
Job Processor Orchestration Pattern

Purpose

Manage the lifecycle of per-batch processing infrastructure: create SQS queues, DLQs, Lambda functions, and SNS subscriptions when a batch starts, monitor progress, coordinate DLQ redrives, and tear everything down when the batch completes.

This is the most complex pattern in the platform. It coordinates the entire processing lifecycle for a batch of documents.

Architecture Overview

JOB_STARTED event
┌─────────────────────────────────┐
│  Job Processor Lambda           │
│  (single-record, batch size 1)  │
│                                 │
│  1. Validate batch state        │
│  2. Check for stuck imports     │
│  3. Verify batch ordering       │
│  4. Create per-batch infra      │
│  5. Monitor queue depth         │
│  6. Coordinate DLQ redrives     │
│  7. Tear down infrastructure    │
└─────────────────────────────────┘
    │                    │
    ▼                    ▼
┌──────────┐    ┌──────────────┐
│ SQS Queue│    │ DLQ          │
│ (per     │    │ (per batch)  │
│  batch)  │    │              │
└────┬─────┘    └──────────────┘
     │
     ▼
┌──────────────────────────────┐
│ Document Loader Lambda       │
│ (per batch, N concurrency)   │
│                              │
│ Processes documents via      │
│ checkpoint pipeline          │
└──────────────────────────────┘

Batch Validation State Machine

Before creating infrastructure, the job processor validates that this batch should be processed. This prevents duplicate infrastructure and enforces sequential batch ordering per case.

class BatchValidationResult(Enum):
    SKIP_CANCELLED = "skip_cancelled"     # Batch was cancelled — do nothing
    SKIP_DUPLICATE = "skip_duplicate"     # Lambda already exists and running
    SKIP_INCOMPLETE = "skip_incomplete"   # Lambda exists but status is stale
    REQUEUE_DB_ERROR = "requeue_db_error" # DB unavailable — retry later
    CONTINUE = "continue"                 # Proceed with infrastructure creation

Validation Flow

def process_job_started(message):
    npcase_id = int(message.get("caseId"))
    batch_id = int(message.get("batchId"))

    # Step 1: Validate batch exists and isn't terminal
    batch, result = _validate_batch_state(npcase_id, batch_id, message)
    if result != BatchValidationResult.CONTINUE:
        return

    # Step 2: Check if Lambda already exists (idempotency)
    lambda_result = _handle_lambda_state(batch, npcase_id, batch_id)
    if lambda_result != BatchValidationResult.CONTINUE:
        return

    # Step 3: Check for stuck imports and batch ordering
    if _can_process_current_job(message):
        _launch_batch_processing(npcase_id, batch_id)

Stuck Import Detection

Active imports are detected by checking:

  1. Is another batch's loader_status "processing"?
  2. Does that batch's Lambda function still exist?
  3. Is the lock recent (within the 15-minute grace period)?

from datetime import datetime, timedelta

def _check_and_handle_stuck_imports(npcase_id, current_batch_id):
    """Return True if another batch is genuinely active (block this one);
    clean up any orphaned batches and return False otherwise."""
    now = datetime.utcnow()
    grace_period = now - timedelta(minutes=15)

    potentially_active = session.query(Batches).filter(
        Batches.npcase_id == npcase_id,
        Batches.id != current_batch_id,
        Batches.loader_status == "processing",
    ).all()

    for batch in potentially_active:
        lambda_exists = _does_lambda_exist(batch.npcase_id, batch.id)
        lock_time = batch.loader_status_updated_at_gmt

        if lambda_exists:
            return True  # Genuinely active — block current batch

        if lock_time and lock_time > grace_period:
            return True  # Recent lock, Lambda may be starting — block

        # Orphaned batch — clean up its lock and keep checking the rest
        batch.loader_status = "in-complete"

    return False

Sequential Batch Ordering

Only one batch per case processes at a time. The next batch is determined by:

  1. Query non-terminal batches ordered by ID
  2. Skip batches with terminal loader_status
  3. Skip stuck batches (active status but no "processing" loader_status for 6+ hours)
  4. First non-skipped batch must match current batch_id

If current batch isn't next in line, requeue with 5-minute delay.
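The selection logic can be sketched in pure Python. This is an illustration, not the production query: the batch records are simplified to dicts, and the terminal status values and the exact "stuck" condition are assumptions.

```python
from datetime import timedelta

STUCK_CUTOFF = timedelta(hours=6)
TERMINAL_STATUSES = {"complete", "cancelled"}  # assumed terminal values

def next_batch_id(batches, now):
    """batches: dicts with id, loader_status, status_updated_at.
    Returns the id of the batch that should process next, or None."""
    for b in sorted(batches, key=lambda b: b["id"]):
        if b["loader_status"] in TERMINAL_STATUSES:
            continue  # step 2: skip terminal batches
        is_stuck = (
            b["loader_status"] != "processing"
            and b.get("status_updated_at") is not None
            and now - b["status_updated_at"] >= STUCK_CUTOFF
        )
        if is_stuck:
            continue  # step 3: skip stuck batches
        return b["id"]  # step 4: first survivor is next in line
    return None
```

The caller compares the result against the current batch_id; on a mismatch the message is requeued with a 5-minute delay, as described above.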

Per-Batch Infrastructure Creation

def launch_lambda(case_id, batch_id, concurrency):
    """Create all per-batch AWS resources."""
    sqs_client = boto3_client("sqs")
    lambda_client = boto3_client("lambda")

    # 1. Create main SQS queue
    #    Naming: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
    queue_url, queue_arn = create_queue(sqs_client, case_id, batch_id)

    # 2. Create Dead Letter Queue
    #    Naming: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}_dlq
    dlq_arn = create_dlq(sqs_client, case_id, batch_id)

    # 3. Configure redrive policy (main queue → DLQ after 3 failed receives)
    update_queue_with_dlq(sqs_client, queue_url, dlq_arn)

    # 4. Set queue policy allowing SNS to send messages
    update_queue_policy(queue_url, queue_arn, sns_topic_arn)

    # 5. Subscribe queue to SNS with filter policy
    subscribe_queue_to_sns(queue_arn, sns_topic_arn, case_id, batch_id)

    # 6. Grant IAM permissions to Lambda role
    #    - SecretsManager: GetSecretValue, DescribeSecret
    #    - RDS Proxy: rds-db:connect
    #    - SQS (dev only): ReceiveMessage, DeleteMessage, etc.
    grant_permissions(iam_client, role_name, policies)

    # 7. Create Lambda function (cloned from source with per-batch env vars)
    create_lambda(lambda_client, lambda_name, zip_content, ...)

    # 8. Attach event source mapping (SQS → Lambda)
    attach_event_source(lambda_client, sqs_client, lambda_name, queue_url, concurrency)

    # 9. Set reserved concurrency (from Npcases.max_processing_daemons)
    lambda_client.put_function_concurrency(
        FunctionName=lambda_name,
        ReservedConcurrentExecutions=concurrency,
    )

SNS Filter Policy

Each batch's queue only receives events for that specific batch:

filter_policy = {
    "eventType": ["DOCUMENT_PROCESSED", "BATCH_DOCUMENT_RELATIONS", "BATCH_ATTACHMENTS"],
    "caseId": [{"numeric": ["=", case_id]}],
    "batchId": [{"numeric": ["=", batch_id]}],
}
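A sketch of how the subscription step might apply this policy via boto3. The helper names and parameters are assumptions; SNS evaluates the `FilterPolicy` server-side, so the queue never receives other batches' events:

```python
import json

def build_filter_policy(case_id: int, batch_id: int) -> dict:
    """Filter policy matching only this batch's three event types."""
    return {
        "eventType": ["DOCUMENT_PROCESSED", "BATCH_DOCUMENT_RELATIONS", "BATCH_ATTACHMENTS"],
        "caseId": [{"numeric": ["=", case_id]}],
        "batchId": [{"numeric": ["=", batch_id]}],
    }

def subscribe_queue_to_sns(sns_client, queue_arn, topic_arn, case_id, batch_id):
    # FilterPolicy must be passed as a JSON string in the Attributes map
    return sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="sqs",
        Endpoint=queue_arn,
        Attributes={"FilterPolicy": json.dumps(build_filter_policy(case_id, batch_id))},
    )
```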

Resource Naming Convention

Lambda:  {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
Queue:   {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
DLQ:     {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}_dlq
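All three names derive from one base string, so a single helper (a sketch; the real code may just inline the f-strings) keeps them consistent:

```python
def batch_resource_names(stack_prefix: str, case_id: int, batch_id: int) -> dict:
    """Derive all per-batch AWS resource names from one base string."""
    base = f"{stack_prefix}_doc_loader_{case_id}_{batch_id}"
    return {"lambda": base, "queue": base, "dlq": f"{base}_dlq"}
```

For example, `batch_resource_names("prod", 42, 7)["dlq"]` yields `"prod_doc_loader_42_7_dlq"`.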

Queue Depth Monitoring and Completion

When a JOB_FINISHED event arrives, the job processor monitors the document loader's queue to determine when processing is actually complete.

def process_job_finished(message):
    # Re-enqueue once for SQS attribute consistency delay
    if not message.get("reenqueued"):
        message["reenqueued"] = True
        send_sqs_message(jobs_queue_url, message, delay=60)
        return

    # Check queue depth (visible + delayed + in-flight messages)
    if queue_messages_count(doc_loader_queue_url) > 0:
        # Still processing — check again in 3 minutes
        send_sqs_message(jobs_queue_url, message, delay=180)
        return

    # Queue empty — check DLQ
    dlq_count = queue_messages_count(dlq_url)
    dlq_redrive_count = message.get("dlq_redrive_count", 0)

    if dlq_count > 0 and dlq_redrive_count <= max_redrives:
        # Redrive DLQ messages back to main queue
        is_final = (dlq_redrive_count == max_redrives)
        redrive_dlq_messages(npcase_id, batch_id, final_pass=is_final)
        message["dlq_redrive_count"] = dlq_redrive_count + 1
        send_sqs_message(jobs_queue_url, message, delay=180)
    else:
        # Done — trigger batch end
        SNS._publish_direct({
            **SNS.message_json,
            "eventType": "BATCH_END_START",
            "status": "SUCCESS",
        })
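The `redrive_dlq_messages` helper isn't shown here. One possible implementation, sketched as a receive/send/delete loop taking the SQS client and queue URLs explicitly (the real helper may instead use SQS's native `StartMessageMoveTask` redrive API, and what `final_pass` does to each message is not specified in this doc):

```python
def redrive_dlq_messages(sqs_client, dlq_url, main_queue_url, final_pass=False):
    """Move all messages from the DLQ back onto the main queue."""
    while True:
        resp = sqs_client.receive_message(
            QueueUrl=dlq_url, MaxNumberOfMessages=10, WaitTimeSeconds=1
        )
        messages = resp.get("Messages", [])
        if not messages:
            break  # DLQ drained
        for msg in messages:
            # final_pass would mark the message as a last attempt here
            sqs_client.send_message(QueueUrl=main_queue_url, MessageBody=msg["Body"])
            sqs_client.delete_message(
                QueueUrl=dlq_url, ReceiptHandle=msg["ReceiptHandle"]
            )
```

Deleting from the DLQ only after the send succeeds means a crash mid-redrive can duplicate a message but never lose one, which suits the at-least-once semantics of the loader pipeline.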

Queue Depth Calculation

All three SQS message states must be checked:

def queue_messages_count(queue_url):
    """Sum all message states — visible, delayed, and in-flight."""
    attrs = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",          # Visible, ready to receive
            "ApproximateNumberOfMessagesDelayed",    # Waiting for delay to expire
            "ApproximateNumberOfMessagesNotVisible", # In-flight (being processed)
        ],
    )
    return (
        int(attrs["ApproximateNumberOfMessages"])
        + int(attrs["ApproximateNumberOfMessagesDelayed"])
        + int(attrs["ApproximateNumberOfMessagesNotVisible"])
    )

Atomic Teardown

When BATCH_END_FINISHED arrives, tear down all per-batch resources in order:

def process_batch_end_finished(message):
    npcase_id = int(message.get("caseId"))
    batch_id = int(message.get("batchId"))

    # 1. Update batch status atomically
    with writer_session(npcase_id=npcase_id) as session:
        batch = session.get(Batches, batch_id)
        if batch:
            batch.loader_status = "complete"

    # 2. Clean up AWS resources (each step is idempotent)
    _safely_cleanup_resource(unsubscribe_sns, "SNS subscription")
    _safely_cleanup_resource(detach_event_source, "Event source mapping")
    _safely_cleanup_resource(delete_lambda, "Lambda function")
    _safely_cleanup_resource(delete_sqs_queue, "Main SQS queue")

    # 3. Delete DLQ only if empty
    if not queue_messages_count(dlq_url):
        _safely_cleanup_resource(delete_sqs_queue_dlq, "DLQ")

    # 4. Truncate temp tables (doc_dedupe, tag_dedupe, process_checkpoints)
    cleanup_temp_tables(npcase_id, batch_id)

    # 5. Publish final notifications
    SNS._publish_direct({**SNS.message_json, "eventType": "LOADER_FINISHED"})

Safe Cleanup Helper

Each cleanup step is wrapped to handle "resource already gone" gracefully:

def _safely_cleanup_resource(cleanup_action, description):
    try:
        cleanup_action()
        return True
    except Exception as e:
        if "non-existent" in str(e).lower() or "not found" in str(e).lower():
            log_message("info", f"Already gone: {description}")
            return True
        log_message("error", f"Cleanup failed: {description}: {e}")
        return False

Event Routing

The job processor handles four event types:

Event               Trigger                              Action
JOB_STARTED         Rails publishes when batch is ready  Validate, create infra, launch
JOB_FINISHED        Rails publishes when all docs sent   Monitor queues, redrive DLQ, finalize
IMPORT_CANCELLED    User cancels import                  Tear down (full or partial)
BATCH_END_FINISHED  Internal: batch processing complete  Atomic cleanup of all resources

Key Design Decisions

  1. Single-record batch size — Job processor Lambda processes one SQS record at a time. On failure, the entire invocation fails and SQS retries the message.

  2. Re-enqueue for consistency — JOB_FINISHED always re-enqueues once with 60s delay because SQS queue depth attributes are eventually consistent.

  3. Lambda existence as lock — Whether a Lambda function exists is used as a distributed lock to prevent duplicate infrastructure creation.

  4. Grace period for orphan detection — 15-minute grace period before declaring a batch orphaned, to account for Lambda cold starts and infrastructure creation time.

  5. Idempotent cleanup — Every cleanup step handles "resource already gone" gracefully so teardown can be retried safely.
