Skip to content

SQS Lambda Handler Pattern

Purpose

Standardize how Lambda functions process SQS messages with consistent error handling, context management, and partial batch failure support.

Handler Structure

# handlers/index.py

from typing import Any, Dict

def lambda_handler(event: Dict[str, Any], _context) -> Dict[str, Any]:
    """Process SQS batch with individual message error handling."""
    batch_item_failures = []

    for record in event.get("Records", []):
        try:
            # 1. Set request context
            set_event(record)
            message = _parse_record(record)
            set_npcase(str(message.get("caseId")))

            # 2. Lazy-load shared context (once per invocation)
            if not get_batch_context():
                with writer_session() as session:
                    batch_data = _load_batch_context(session, message)
                    set_batch_context(batch_data)

            # 3. Initialize event publisher
            SNS(message)

            # 4. Route to business logic
            process_event_type(message)

        except Exception as e:
            _handle_exception(e, record, batch_item_failures)

        finally:
            # 5. Reset per-record context
            set_event(None)
            configure_log_context()

    # 6. Reset per-invocation context
    set_batch_context(None)

    # 7. Report partial failures
    return {"batchItemFailures": batch_item_failures}

The Six Steps

1. Set Request Context

Store the current SQS record and case ID in ContextVars for logging and tracing.

2. Lazy-Load Shared Context

Some data is shared across all records in a batch (e.g., batch metadata). Load it once on the first record, reuse for subsequent records.

3. Initialize Event Publisher

Set up the SNS publisher with base message fields from the incoming event.

4. Route to Business Logic

Dispatch based on event type to the appropriate handler function:

def process_event_type(message: dict) -> None:
    event_type = message.get("eventType")
    handlers = {
        EventType.DOCUMENT_PROCESSED.value: handle_document_processed,
        EventType.BATCH_DOCUMENT_RELATIONS.value: handle_batch_relations,
        EventType.BATCH_ATTACHMENTS.value: handle_batch_attachments,
    }
    handler = handlers.get(event_type)
    if handler:
        handler(message)
    else:
        raise PermanentFailureException(f"Unknown event type: {event_type}")

5. Reset Per-Record Context

Clean up context after each record to prevent leaking between records.

6. Report Partial Failures

Return failed message IDs so SQS retries only the failed records, not the entire batch.

Message Parsing

SQS messages from SNS are double-encoded:

def _parse_record(record: dict) -> dict:
    """Extract the actual message from SNS→SQS envelope."""
    body = json.loads(record.get("body", "{}"))
    message_str = body.get("Message", "{}")
    return json.loads(message_str)

Exception Handling

See patterns/exception-hierarchy.md for the full exception routing table.

Job Processor Variant

For single-record processing (job orchestration), use a simpler pattern:

def lambda_handler(event: Dict[str, Any], _context) -> Dict[str, Any]:
    """Single record — entire invocation fails on error."""
    for record in event.get("Records", []):
        try:
            message = preprocess_record(record)
            SNS(message)
            process_single_record(message)
        except Exception as e:
            # Publish error event for monitoring
            SNS._publish_direct({**SNS.message_json, "status": "FAILED"})
            raise  # Fail entire invocation → SQS retries

    return {"statusCode": 200, "body": "ok"}

Key Rules

  1. Always return batchItemFailures — enables partial batch processing
  2. Parse the double-encoded envelope — SNS wraps the message before SQS delivers it
  3. Reset context in finally — prevents record context leaking to the next record
  4. Lazy-load shared context — don't hit the database for every record in the batch
  5. Unknown event types are permanent failures — don't retry what you can't handle
Ask the Architecture ×

Ask questions about Nextpoint architecture, patterns, rules, or any module. Powered by Claude Opus 4.6.