SQS Lambda Handler Pattern¶
Purpose¶
Standardize how Lambda functions process SQS messages with consistent error handling, context management, and partial batch failure support.
Handler Structure¶
```python
# handlers/index.py
from typing import Any, Dict


def lambda_handler(event: Dict[str, Any], _context) -> Dict[str, Any]:
    """Process SQS batch with individual message error handling."""
    batch_item_failures = []
    for record in event.get("Records", []):
        try:
            # 1. Set request context
            set_event(record)
            message = _parse_record(record)
            set_npcase(str(message.get("caseId")))

            # 2. Lazy-load shared context (once per invocation)
            if not get_batch_context():
                with writer_session() as session:
                    batch_data = _load_batch_context(session, message)
                    set_batch_context(batch_data)

            # 3. Initialize event publisher
            SNS(message)

            # 4. Route to business logic
            process_event_type(message)
        except Exception as e:
            _handle_exception(e, record, batch_item_failures)
        finally:
            # 5. Reset per-record context
            set_event(None)
            configure_log_context()

    # 5 (cont.). Reset per-invocation context after the loop
    set_batch_context(None)

    # 6. Report partial failures
    return {"batchItemFailures": batch_item_failures}
```
The Six Steps¶
1. Set Request Context¶
Store the current SQS record and case ID in ContextVars for logging and tracing.
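The `set_event`/`set_npcase` helpers are project-specific; a minimal sketch of how they could be built on Python's `contextvars` (the names mirror the handler, the implementation is an assumption):

```python
from contextvars import ContextVar
from typing import Optional

# One ContextVar per piece of per-record state; setting None clears it.
_current_event: ContextVar[Optional[dict]] = ContextVar("current_event", default=None)
_current_npcase: ContextVar[Optional[str]] = ContextVar("current_npcase", default=None)


def set_event(record: Optional[dict]) -> None:
    """Store the raw SQS record so logging/tracing code can read it."""
    _current_event.set(record)


def set_npcase(case_id: Optional[str]) -> None:
    """Store the current case ID for log enrichment."""
    _current_npcase.set(case_id)


def get_npcase() -> Optional[str]:
    return _current_npcase.get()
```

Because ContextVars are scoped to the running context, each invocation sees only the state it set itself.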
2. Lazy-Load Shared Context¶
Some data is shared across all records in a batch (e.g., batch metadata). Load it once on the first record, reuse for subsequent records.
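A self-contained sketch of the lazy-load guard, with a stub loader counting "database" hits to show the context is populated only once (helper names and the `batchId` field are illustrative):

```python
from contextvars import ContextVar
from typing import Optional

_batch_context: ContextVar[Optional[dict]] = ContextVar("batch_context", default=None)


def get_batch_context() -> Optional[dict]:
    return _batch_context.get()


def set_batch_context(data: Optional[dict]) -> None:
    _batch_context.set(data)


# Stub loader that counts database hits (illustrative only).
calls = {"db": 0}


def _load_batch_context_stub(message: dict) -> dict:
    calls["db"] += 1
    return {"batch_id": message.get("batchId")}


# Three records from the same batch: only the first triggers a load.
for msg in [{"batchId": "b-1"}] * 3:
    if not get_batch_context():
        set_batch_context(_load_batch_context_stub(msg))
```

After the loop, `calls["db"]` is 1: three records, one database hit.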
3. Initialize Event Publisher¶
Set up the SNS publisher with base message fields from the incoming event.
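The publisher itself is project-specific. One plausible minimal shape, inferred only from the `SNS(message)` call here and the `SNS.message_json` reference in the job-processor variant below, is a class that captures base fields at construction time (everything in this sketch is an assumption):

```python
class SNS:
    """Hypothetical publisher: holds base fields for every event published
    while this record is being processed. Real publishing logic omitted."""

    message_json: dict = {}

    def __init__(self, message: dict) -> None:
        # Capture base fields from the incoming message (field names assumed).
        type(self).message_json = {
            "caseId": message.get("caseId"),
            "eventType": message.get("eventType"),
        }
```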
4. Route to Business Logic¶
Dispatch based on event type to the appropriate handler function:
```python
def process_event_type(message: dict) -> None:
    event_type = message.get("eventType")
    handlers = {
        EventType.DOCUMENT_PROCESSED.value: handle_document_processed,
        EventType.BATCH_DOCUMENT_RELATIONS.value: handle_batch_relations,
        EventType.BATCH_ATTACHMENTS.value: handle_batch_attachments,
    }
    handler = handlers.get(event_type)
    if handler:
        handler(message)
    else:
        raise PermanentFailureException(f"Unknown event type: {event_type}")
```
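A runnable, self-contained version of the dispatch-table pattern with a stub handler, showing both the happy path and the permanent-failure path for unknown types (the `EventType` value and stub are illustrative):

```python
from enum import Enum


class EventType(Enum):
    DOCUMENT_PROCESSED = "DOCUMENT_PROCESSED"


class PermanentFailureException(Exception):
    pass


handled = []


def handle_document_processed(message: dict) -> None:
    handled.append(message["eventType"])


def process_event_type(message: dict) -> None:
    handlers = {EventType.DOCUMENT_PROCESSED.value: handle_document_processed}
    handler = handlers.get(message.get("eventType"))
    if handler is None:
        raise PermanentFailureException(f"Unknown event type: {message.get('eventType')}")
    handler(message)


process_event_type({"eventType": "DOCUMENT_PROCESSED"})
```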
5. Reset Per-Record Context¶
Clean up context in `finally` after each record so state never leaks into the next record. After the loop, reset the per-invocation batch context as well.
6. Report Partial Failures¶
Return failed message IDs so SQS retries only the failed records, not the entire batch.
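The response contract is fixed by Lambda's SQS integration (with `ReportBatchItemFailures` enabled on the event source mapping): each failed record is reported by its SQS `messageId` under the `itemIdentifier` key. The IDs below are placeholders:

```python
# Shape of the partial-failure response; "msg-id-*" are placeholder messageIds.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "msg-id-2"},
        {"itemIdentifier": "msg-id-5"},
    ]
}
```

An empty `batchItemFailures` list tells SQS the whole batch succeeded.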
Message Parsing¶
SQS messages from SNS are double-encoded:
```python
import json


def _parse_record(record: dict) -> dict:
    """Extract the actual message from the SNS→SQS envelope."""
    body = json.loads(record.get("body", "{}"))
    message_str = body.get("Message", "{}")
    return json.loads(message_str)
```
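To see the double encoding concretely, here is a fabricated SNS-originated record run through the parser (`Type: Notification` and `Message` are part of the standard SNS envelope; all field values are illustrative):

```python
import json


def _parse_record(record: dict) -> dict:
    body = json.loads(record.get("body", "{}"))
    return json.loads(body.get("Message", "{}"))


# The business message is JSON inside the "Message" field, which itself
# sits inside the JSON string that SQS delivers as "body".
record = {
    "messageId": "abc-123",
    "body": json.dumps({
        "Type": "Notification",
        "Message": json.dumps({"eventType": "DOCUMENT_PROCESSED", "caseId": 42}),
    }),
}

message = _parse_record(record)
```

A single `json.loads` on the body would yield the SNS envelope, not the business message; hence the two-step parse.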
Exception Handling¶
See patterns/exception-hierarchy.md for the full exception routing table.
Job Processor Variant¶
For single-record processing (job orchestration), use a simpler pattern:
```python
def lambda_handler(event: Dict[str, Any], _context) -> Dict[str, Any]:
    """Single record — entire invocation fails on error."""
    for record in event.get("Records", []):
        try:
            message = preprocess_record(record)
            SNS(message)
            process_single_record(message)
        except Exception:
            # Publish error event for monitoring
            SNS._publish_direct({**SNS.message_json, "status": "FAILED"})
            raise  # Fail entire invocation → SQS retries
    return {"statusCode": 200, "body": "ok"}
```
Key Rules¶
- Always return `batchItemFailures` — enables partial batch processing
- Parse the double-encoded envelope — SNS wraps the message before SQS delivers it
- Reset context in `finally` — prevents record context leaking to the next record
- Lazy-load shared context — don't hit the database for every record in the batch
- Unknown event types are permanent failures — don't retry what you can't handle