Job Processor Orchestration Pattern¶
Purpose¶
Manage the lifecycle of per-batch processing infrastructure: create SQS queues, DLQs, Lambda functions, and SNS subscriptions when a batch starts, monitor progress, coordinate DLQ redrives, and tear everything down when the batch completes.
This is the most complex pattern in the platform. It coordinates the entire processing lifecycle for a batch of documents.
Architecture Overview¶
JOB_STARTED event
│
▼
┌─────────────────────────────────┐
│ Job Processor Lambda │
│ (single-record, batch size 1) │
│ │
│ 1. Validate batch state │
│ 2. Check for stuck imports │
│ 3. Verify batch ordering │
│ 4. Create per-batch infra │
│ 5. Monitor queue depth │
│ 6. Coordinate DLQ redrives │
│ 7. Tear down infrastructure │
└─────────────────────────────────┘
│ │
▼ ▼
┌──────────┐ ┌──────────────┐
│ SQS Queue│ │ DLQ │
│ (per │ │ (per batch) │
│ batch) │ │ │
└────┬─────┘ └──────────────┘
│
▼
┌──────────────────────────────┐
│ Document Loader Lambda │
│ (per batch, N concurrency) │
│ │
│ Processes documents via │
│ checkpoint pipeline │
└──────────────────────────────┘
Batch Validation State Machine¶
Before creating infrastructure, the job processor validates that this batch should be processed. This prevents duplicate infrastructure and enforces sequential batch ordering per case.
class BatchValidationResult(Enum):
    SKIP_CANCELLED = "skip_cancelled"      # Batch was cancelled — do nothing
    SKIP_DUPLICATE = "skip_duplicate"      # Lambda already exists and running
    SKIP_INCOMPLETE = "skip_incomplete"    # Lambda exists but status is stale
    REQUEUE_DB_ERROR = "requeue_db_error"  # DB unavailable — retry later
    CONTINUE = "continue"                  # Proceed with infrastructure creation
Validation Flow¶
def process_job_started(message):
    npcase_id = int(message.get("caseId"))
    batch_id = int(message.get("batchId"))

    # Step 1: Validate batch exists and isn't terminal
    batch, result = _validate_batch_state(npcase_id, batch_id, message)
    if result != BatchValidationResult.CONTINUE:
        return

    # Step 2: Check if Lambda already exists (idempotency)
    lambda_result = _handle_lambda_state(batch, npcase_id, batch_id)
    if lambda_result != BatchValidationResult.CONTINUE:
        return

    # Step 3: Check for stuck imports and batch ordering
    if _can_process_current_job(message):
        _launch_batch_processing(npcase_id, batch_id)
Stuck Import Detection¶
Active imports are detected by checking:

1. Is another batch's loader_status "processing"?
2. Does that batch's Lambda function still exist?
3. Is the lock recent (within the 15-minute grace period)?
def _check_and_handle_stuck_imports(npcase_id, current_batch_id):
    """Check for orphaned batches and clean them up."""
    potentially_active = session.query(Batches).filter(
        Batches.npcase_id == npcase_id,
        Batches.id != current_batch_id,
        Batches.loader_status == "processing",
    ).all()

    for batch in potentially_active:
        lambda_exists = _does_lambda_exist(batch.npcase_id, batch.id)
        lock_time = batch.loader_status_updated_at_gmt
        grace_period = now - timedelta(minutes=15)

        if lambda_exists:
            return True  # Genuinely active — block current batch
        if lock_time and lock_time > grace_period:
            return True  # Recent lock, Lambda may be starting — block
        # Orphaned batch (no Lambda, stale or missing lock) — clean up and continue
        batch.loader_status = "in-complete"

    return False  # No active imports — current batch may proceed
Sequential Batch Ordering¶
Only one batch per case processes at a time. The next batch is determined by:

1. Query non-terminal batches ordered by ID.
2. Skip batches with a terminal loader_status.
3. Skip stuck batches (active status but no "processing" loader_status for 6+ hours).
4. The first non-skipped batch must match the current batch_id.

If the current batch isn't next in line, it is requeued with a 5-minute delay.
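The ordering check above can be sketched as a pure function. This is a minimal illustration, not the platform's implementation: `next_batch_in_line`, the tuple shape, and the assumed terminal status values are all hypothetical; only the skip rules come from the steps above.

```python
from datetime import datetime, timedelta

TERMINAL_STATUSES = {"complete", "cancelled"}  # assumed terminal loader_status values
STUCK_THRESHOLD = timedelta(hours=6)

def next_batch_in_line(batches, now):
    """Return the id of the first eligible batch, or None.

    `batches` is a list of (id, loader_status, updated_at) tuples
    already ordered by batch ID.
    """
    for batch_id, loader_status, updated_at in batches:
        if loader_status in TERMINAL_STATUSES:
            continue  # terminal loader_status — skip
        stuck = (
            loader_status != "processing"
            and updated_at is not None
            and now - updated_at > STUCK_THRESHOLD
        )
        if stuck:
            continue  # active status but stalled for 6+ hours — skip
        return batch_id  # first non-skipped batch is next in line
    return None
```

The current batch may launch only if its ID equals the returned value; otherwise the JOB_STARTED message is requeued.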
Per-Batch Infrastructure Creation¶
def launch_lambda(case_id, batch_id, concurrency):
    """Create all per-batch AWS resources."""
    sqs_client = boto3_client("sqs")
    lambda_client = boto3_client("lambda")

    # 1. Create main SQS queue
    #    Naming: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
    queue_url, queue_arn = create_queue(sqs_client, case_id, batch_id)

    # 2. Create Dead Letter Queue
    #    Naming: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}_dlq
    dlq_arn = create_dlq(sqs_client, case_id, batch_id)

    # 3. Configure redrive policy (main queue → DLQ after 3 failed receives)
    update_queue_with_dlq(sqs_client, queue_url, dlq_arn)

    # 4. Set queue policy allowing SNS to send messages
    update_queue_policy(queue_url, queue_arn, sns_topic_arn)

    # 5. Subscribe queue to SNS with filter policy
    subscribe_queue_to_sns(queue_arn, sns_topic_arn, case_id, batch_id)

    # 6. Grant IAM permissions to the Lambda role
    #    - SecretsManager: GetSecretValue, DescribeSecret
    #    - RDS Proxy: rds-db:connect
    #    - SQS (dev only): ReceiveMessage, DeleteMessage, etc.
    grant_permissions(iam_client, role_name, policies)

    # 7. Create Lambda function (cloned from source with per-batch env vars)
    create_lambda(lambda_client, new_lambda_name, zip_content, ...)

    # 8. Attach event source mapping (SQS → Lambda)
    attach_event_source(lambda_client, sqs_client, lambda_name, queue_url, concurrency)

    # 9. Set reserved concurrency (from Npcases.max_processing_daemons)
    lambda_client.put_function_concurrency(
        FunctionName=lambda_name,
        ReservedConcurrentExecutions=concurrency,
    )
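Step 3 above wires the main queue to its DLQ. A minimal sketch of how the redrive policy attributes could be built — the helper name `redrive_policy_attributes` is hypothetical; the `maxReceiveCount` of 3 comes from the listing above, and the attribute keys follow the standard SQS `RedrivePolicy` format:

```python
import json

def redrive_policy_attributes(dlq_arn, max_receives=3):
    """Build SQS attributes wiring a main queue to its DLQ.

    After `max_receives` failed receives, SQS moves the message to the DLQ.
    Note: SQS requires maxReceiveCount serialized as a string inside the
    JSON-encoded RedrivePolicy attribute.
    """
    return {
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": str(max_receives),
        })
    }

# Applied to the main queue roughly like:
#   sqs_client.set_queue_attributes(
#       QueueUrl=queue_url,
#       Attributes=redrive_policy_attributes(dlq_arn),
#   )
```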
SNS Filter Policy¶
Each batch's queue only receives events for that specific batch:
filter_policy = {
    "eventType": ["DOCUMENT_PROCESSED", "BATCH_DOCUMENT_RELATIONS", "BATCH_ATTACHMENTS"],
    "caseId": [{"numeric": ["=", case_id]}],
    "batchId": [{"numeric": ["=", batch_id]}],
}
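The policy is attached when the queue is subscribed to the topic. A sketch of how that could look — the helper name is hypothetical, and the commented `subscribe` call assumes the topic ARN and client names from context; only the policy contents come from the listing above:

```python
import json

def filter_policy_attributes(case_id, batch_id):
    """Serialize the per-batch filter policy for an SNS subscription."""
    policy = {
        "eventType": ["DOCUMENT_PROCESSED", "BATCH_DOCUMENT_RELATIONS", "BATCH_ATTACHMENTS"],
        "caseId": [{"numeric": ["=", case_id]}],
        "batchId": [{"numeric": ["=", batch_id]}],
    }
    return {"FilterPolicy": json.dumps(policy)}

# Applied at subscription time, roughly:
#   sns_client.subscribe(
#       TopicArn=sns_topic_arn,
#       Protocol="sqs",
#       Endpoint=queue_arn,
#       Attributes=filter_policy_attributes(case_id, batch_id),
#   )
```

SNS evaluates the policy server-side, so messages for other batches are dropped before they ever reach the queue.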
Resource Naming Convention¶
Lambda: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
Queue: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
DLQ: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}_dlq
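The convention above can be captured in one small helper. This is an illustrative sketch — `batch_resource_names` is a hypothetical function name; the name templates are taken verbatim from the convention:

```python
def batch_resource_names(stack_prefix, case_id, batch_id):
    """Derive all per-batch resource names from the shared convention."""
    base = f"{stack_prefix}_doc_loader_{case_id}_{batch_id}"
    return {
        "lambda": base,          # Lambda function name
        "queue": base,           # main SQS queue name
        "dlq": f"{base}_dlq",    # dead letter queue name
    }
```

Deriving every name from one base string keeps creation and teardown symmetric: teardown can reconstruct the names from just the case and batch IDs.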
Queue Depth Monitoring and Completion¶
When a JOB_FINISHED event arrives, the job processor monitors the document loader's queue to determine when processing is actually complete.
def process_job_finished(message):
    # Re-enqueue once for SQS attribute consistency delay
    if not message.get("reenqueued"):
        message["reenqueued"] = True
        send_sqs_message(jobs_queue_url, message, delay=60)
        return

    # Check queue depth (visible + delayed + in-flight messages)
    if queue_messages_count(doc_loader_queue_url) > 0:
        # Still processing — check again in 3 minutes
        send_sqs_message(jobs_queue_url, message, delay=180)
        return

    # Queue empty — check DLQ
    dlq_count = queue_messages_count(dlq_url)
    dlq_redrive_count = message.get("dlq_redrive_count", 0)

    if dlq_count > 0 and dlq_redrive_count <= max_redrives:
        # Redrive DLQ messages back to main queue
        is_final = (dlq_redrive_count == max_redrives)
        redrive_dlq_messages(npcase_id, batch_id, final_pass=is_final)
        message["dlq_redrive_count"] = dlq_redrive_count + 1
        send_sqs_message(jobs_queue_url, message, delay=180)
    else:
        # Done — trigger batch end
        SNS._publish_direct({
            **SNS.message_json,
            "eventType": "BATCH_END_START",
            "status": "SUCCESS",
        })
Queue Depth Calculation¶
All three SQS message states must be checked:
def queue_messages_count(queue_url):
    """Sum all message states — visible, delayed, and in-flight."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",            # Visible, ready to receive
            "ApproximateNumberOfMessagesDelayed",     # Waiting for delay to expire
            "ApproximateNumberOfMessagesNotVisible",  # In-flight (being processed)
        ],
    )
    attrs = response["Attributes"]  # get_queue_attributes nests values under "Attributes"
    return (
        int(attrs["ApproximateNumberOfMessages"])
        + int(attrs["ApproximateNumberOfMessagesDelayed"])
        + int(attrs["ApproximateNumberOfMessagesNotVisible"])
    )
Atomic Teardown¶
When BATCH_END_FINISHED arrives, tear down all per-batch resources in order:
def process_batch_end_finished(message):
    npcase_id = int(message.get("caseId"))
    batch_id = int(message.get("batchId"))

    # 1. Update batch status atomically
    with writer_session(npcase_id=npcase_id) as session:
        batch = session.get(Batches, batch_id)
        if batch:
            batch.loader_status = "complete"

    # 2. Clean up AWS resources (each step is idempotent)
    _safely_cleanup_resource(unsubscribe_sns, "SNS subscription")
    _safely_cleanup_resource(detach_event_source, "Event source mapping")
    _safely_cleanup_resource(delete_lambda, "Lambda function")
    _safely_cleanup_resource(delete_sqs_queue, "Main SQS queue")

    # 3. Delete DLQ only if empty
    if not queue_messages_count(dlq_url):
        _safely_cleanup_resource(delete_sqs_queue_dlq, "DLQ")

    # 4. Truncate temp tables (doc_dedupe, tag_dedupe, process_checkpoints)
    cleanup_temp_tables(npcase_id, batch_id)

    # 5. Publish final notifications
    SNS._publish_direct({...eventType: "LOADER_FINISHED"...})
Safe Cleanup Helper¶
Each cleanup step is wrapped to handle "resource already gone" gracefully:
def _safely_cleanup_resource(cleanup_action, description):
    try:
        cleanup_action()
        return True
    except Exception as e:
        if "non-existent" in str(e).lower() or "not found" in str(e).lower():
            log_message("info", f"Already gone: {description}")
            return True
        log_message("error", f"Cleanup failed: {description}: {e}")
        return False
Event Routing¶
The job processor handles four event types:
| Event | Trigger | Action |
|---|---|---|
| JOB_STARTED | Rails publishes when batch is ready | Validate, create infra, launch |
| JOB_FINISHED | Rails publishes when all docs sent | Monitor queues, redrive DLQ, finalize |
| IMPORT_CANCELLED | User cancels import | Tear down (full or partial) |
| BATCH_END_FINISHED | Internal: batch processing complete | Atomic cleanup of all resources |
Key Design Decisions¶
- Single-record batch size — Job processor Lambda processes one SQS record at a time. On failure, the entire invocation fails and SQS retries the message.
- Re-enqueue for consistency — JOB_FINISHED always re-enqueues once with 60s delay because SQS queue depth attributes are eventually consistent.
- Lambda existence as lock — Whether a Lambda function exists is used as a distributed lock to prevent duplicate infrastructure creation.
- Grace period for orphan detection — 15-minute grace period before declaring a batch orphaned, to account for Lambda cold starts and infrastructure creation time.
- Idempotent cleanup — Every cleanup step handles "resource already gone" gracefully so teardown can be retried safely.
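The first decision — one record per invocation — is enforced by the event source mapping configuration. A sketch of how those settings could be assembled; the function name is hypothetical, and the keys match the standard Lambda `create_event_source_mapping` parameters:

```python
def job_processor_event_source_config(queue_arn, function_name):
    """Event source mapping settings for the job processor.

    BatchSize=1 means each invocation handles exactly one SQS record:
    a failure poisons only that message, and SQS retries it alone.
    """
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": 1,
        "Enabled": True,
    }

# Passed to Lambda roughly as:
#   lambda_client.create_event_source_mapping(
#       **job_processor_event_source_config(queue_arn, function_name)
#   )
```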