Lambda + SQS Integration Pattern¶
Purpose¶
Define the complete pattern for Lambda functions triggered by SQS messages: event source configuration, batch processing semantics, partial failure reporting, concurrency control, timeout alignment, dynamic Lambda cloning, and the two Lambda archetypes used across Nextpoint modules.
Two Lambda Archetypes¶
1. Batch Processing Lambda (Document Loader)¶
Processes multiple records per invocation with partial failure support. Failed records are reported individually; successful records are auto-deleted.
SQS Queue → Lambda (batch size 10, 60s batching window)
│
├── Record 1: ✅ success → auto-deleted
├── Record 2: ❌ failure → reported in batchItemFailures
├── Record 3: ✅ success → auto-deleted
└── ...
│
▼
{"batchItemFailures": [{"itemIdentifier": "record-2-id"}]}
Event source configuration:
event_source_response = lambda_client.create_event_source_mapping(
EventSourceArn=queue_arn,
FunctionName=function_name,
Enabled=True,
BatchSize=10, # Up to 10 records per invocation
MaximumBatchingWindowInSeconds=60, # Wait up to 60s to fill batch
FunctionResponseTypes=["ReportBatchItemFailures"], # Enable partial failure
ScalingConfig={"MaximumConcurrency": concurrency}, # Cap concurrent executions
)
CDK equivalent:
documentLoader.addEventSource(new SqsEventSource(queue, {
batchSize: 10,
maxBatchingWindow: cdk.Duration.seconds(60),
reportBatchItemFailures: true,
maxConcurrency: concurrency,
}));
Handler pattern:
def lambda_handler(event, _context):
batch_item_failures = []
for record in event.get("Records", []):
try:
set_event(record)
message = _parse_record(record)
set_npcase(str(message.get("caseId")))
_setup_logging_context(message)
# Lazy-load batch context once per invocation
if not get_batch_context():
with writer_session() as session:
batch_data = _get_batch_data_for_import(session, message.get("batchId"))
set_batch_context(batch_data)
SNS(message)
process_event_type(message=message)
except Exception as e:
_handle_exception(e, record, batch_item_failures)
finally:
set_event(None)
configure_log_context()
set_batch_context(None)
return {"batchItemFailures": batch_item_failures}
2. Single-Record Lambda (Job Processor, Batch Processor)¶
Processes one record per invocation. On failure, the entire invocation fails and SQS retries the message after the visibility timeout.
SQS Queue → Lambda (batch size 1, max concurrency 40)
│
└── Record 1: ❌ failure → raise → SQS retries
CDK configuration mirrors the Document Loader's SqsEventSource, but with batchSize: 1, no maxBatchingWindow, no reportBatchItemFailures, and maxConcurrency: 40.
Handler pattern:
def lambda_handler(event, _context):
for record in event.get("Records", []):
try:
process_single_record(record)
except Exception as e:
# Publish error notification
SNS._publish_direct({**SNS.message_json, "eventType": "ERROR", ...})
raise # Fail entire invocation → SQS retries
return {"statusCode": 200, "body": "ok"}
When to Use Which¶
| Use Case | Archetype | Batch Size | Why |
|---|---|---|---|
| High-volume document processing | Batch | 10 | Throughput; partial failures don't block the batch |
| Job orchestration | Single | 1 | Each job needs sequential state machine logic |
| Batch lifecycle management | Single | 1 | Cleanup must succeed or retry entirely |
| Low-volume, order-sensitive work | Single | 1 | Simpler error handling, guaranteed ordering |
Partial Failure Reporting (ReportBatchItemFailures)¶
This is the most important SQS+Lambda configuration decision. Without ReportBatchItemFailures, one failed record causes SQS to retry the entire batch — including records that already succeeded — leading to duplicate processing and wasted compute.
With it, only failed records are retried:
def _handle_exception(e, record, batch_item_failures):
message_id = record.get("messageId", "N/A")
if isinstance(e, PermanentFailureException):
# Direct to DLQ — do NOT add to batchItemFailures
# SQS auto-deletes records NOT in the failures list
try:
send_to_dlq(record)
except Exception as dlq_err:
# Fallback: report as failure so SQS retries via standard path
batch_item_failures.append({"itemIdentifier": message_id})
change_message_visibility(_parse_record(record), record.get("receiptHandle"), 300)
elif isinstance(e, RecoverableException):
# Requeue with backoff — do NOT add to batchItemFailures
# Original message auto-deleted after successful requeue
try:
handle_requeue_exception(record, e)
except Exception as requeue_err:
batch_item_failures.append({"itemIdentifier": message_id})
elif isinstance(e, (json.JSONDecodeError, KeyError, ValueError)):
# Parsing error — report failure, extend visibility to 5 min
batch_item_failures.append({"itemIdentifier": message_id})
change_message_visibility(_parse_record(record), record.get("receiptHandle"), 300)
elif isinstance(e, OperationalError):
# Database error — report failure, extend visibility to 2 min
batch_item_failures.append({"itemIdentifier": message_id})
change_message_visibility(_parse_record(record), record.get("receiptHandle"), 120)
elif isinstance(e, SilentSuccessException):
# Intentional skip — do NOT report as failure
# SQS auto-deletes the message (treated as success)
pass
else:
# Unknown error — report failure, standard SQS retry
batch_item_failures.append({"itemIdentifier": message_id})
Key Insight: What Goes in batchItemFailures¶
| Exception | In batchItemFailures? | Why |
|---|---|---|
| PermanentFailure | No | Sent directly to DLQ; SQS auto-deletes |
| Recoverable | No | Requeued with backoff; original auto-deleted |
| SilentSuccess | No | Not an error; SQS auto-deletes |
| OperationalError | Yes | Standard SQS retry with visibility timeout |
| Parse errors | Yes | Standard SQS retry with visibility timeout |
| Unknown errors | Yes | Standard SQS retry |
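The contract in the table can be exercised without AWS: every record whose ID appears in batchItemFailures is redelivered after the visibility timeout, and every record not in the list is deleted. A minimal simulation of that split (record shape and function name are illustrative, not part of the codebase):

```python
def split_batch(records: list[dict], batch_item_failures: list[dict]) -> tuple[list[str], list[str]]:
    """Partition a delivered batch the way SQS interprets the handler response:
    IDs listed in batchItemFailures are redelivered; everything else is deleted."""
    failed_ids = {f["itemIdentifier"] for f in batch_item_failures}
    deleted = [r["messageId"] for r in records if r["messageId"] not in failed_ids]
    retried = [r["messageId"] for r in records if r["messageId"] in failed_ids]
    return deleted, retried
```

Note the consequence for the "No" rows above: a PermanentFailure or SilentSuccess record lands in the deleted set, which is exactly why those paths must handle the message themselves (DLQ send, intentional skip) before the handler returns.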
Timeout and Visibility Alignment¶
Rule: SQS visibility timeout ≥ Lambda timeout.
If the visibility timeout expires before Lambda finishes, SQS delivers the message to another invocation — causing duplicate processing.
| Component | Timeout | Purpose |
|---|---|---|
| Lambda timeout | 15 minutes (900s) | Max execution time |
| SQS visibility timeout | 15 minutes (900s) | Message invisible while processing |
| DLQ visibility timeout | 15 minutes (900s) | Same as main queue |
Set in CDK:
const queue = new sqs.Queue(this, 'Queue', {
visibilityTimeout: cdk.Duration.minutes(15), // Match Lambda timeout
});
Set in dynamic queue creation:
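A minimal sketch of the dynamic path, assuming a boto3 sqs_client; the queue name and helper are illustrative, and the attribute key is the real SQS CreateQueue attribute name (SQS expects string values):

```python
LAMBDA_TIMEOUT_SECONDS = 900  # 15 minutes, matching the Lambda timeout above

def visibility_attributes(timeout_seconds: int = LAMBDA_TIMEOUT_SECONDS) -> dict:
    """Build CreateQueue attributes, enforcing visibility >= Lambda timeout."""
    if timeout_seconds < LAMBDA_TIMEOUT_SECONDS:
        raise ValueError("SQS visibility timeout must be >= Lambda timeout")
    return {"VisibilityTimeout": str(timeout_seconds)}  # SQS wants strings

# sqs_client.create_queue(
#     QueueName=f"doc_loader_{case_id}_{batch_id}",  # illustrative name
#     Attributes=visibility_attributes(),
# )
```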
Dynamic Lambda Cloning¶
The Job Processor creates per-batch Lambda functions by cloning from the CDK-deployed source Lambda:
def launch_lambda(case_id, batch_id, concurrency):
# 1. Download source Lambda code
source_lambda = f"{STACK_NAME_PREFIX}-DocumentLoader"
zip_content = download_lambda_code(lambda_client, source_lambda)
# 2. Create new function with per-batch env vars
new_lambda = f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"
create_lambda(
lambda_client=lambda_client,
function_name=new_lambda,
zip_content=zip_content, # Same code as source
runtime="python3.13",
handler="index.lambda_handler",
role_arn=role_arn,
env_vars={
**base_env_vars,
"RDS_CASE_DBNAME": f"{rds_dbname}_case_{case_id}", # Per-case DB
},
vpc_config=vpc_config,
layers=layer_arns,
tags=TAGS,
timeout=900,
memory_size=256,
log_group_name=log_group_name,
)
# 3. Attach to per-batch SQS queue
attach_event_source(lambda_client, sqs_client, new_lambda, queue_url, concurrency)
# 4. Set reserved concurrency from case config
lambda_client.put_function_concurrency(
FunctionName=new_lambda,
ReservedConcurrentExecutions=concurrency,
)
Lambda Creation Details¶
lambda_client.create_function(
FunctionName=function_name,
Runtime=runtime,
Role=role_arn,
Handler=handler,
Code={"ZipFile": zip_content},
Environment={"Variables": env_vars},
VpcConfig=vpc_config,
Layers=layers,
Timeout=timeout,
MemorySize=memory_size,
Publish=True,
PackageType="Zip",
Tags=tags,
LoggingConfig={"LogGroup": log_group_name}, # Pre-created log group
)
    # Allow recursive invocation — the job processor re-enqueues to its own queue
lambda_client.put_function_recursion_config(
FunctionName=function_name,
RecursiveLoop="Allow",
)
Log Group Must Pre-Exist¶
Lambda won't create its own log group — it's pre-created by CDK. The assert_log_group_exists() check runs before create_function():
def assert_log_group_exists(log_group_name):
"""Raise RuntimeError if log group doesn't exist."""
paginator = logs_client.get_paginator("describe_log_groups")
for page in paginator.paginate(logGroupNamePrefix=log_group_name):
for grp in page.get("logGroups", []):
if grp["logGroupName"] == log_group_name:
return
raise RuntimeError(f"Required log group '{log_group_name}' does not exist")
Idempotent Creation¶
create_function handles ResourceConflictException (the function already exists) gracefully:
try:
    lambda_client.create_function(...)  # full call shown above
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        log_message("info", "Lambda function already exists.")
    else:
        raise
Concurrency Control¶
Static (CDK-Deployed Lambdas)¶
| Lambda | Reserved Concurrency | Reason |
|---|---|---|
| DocumentLoader | None | Dynamically created per-batch |
| JobProcessor | 40 | Prevents runaway job orchestration |
| BatchProcessor | 40 | Prevents runaway batch lifecycle |
Dynamic (Per-Batch Lambdas)¶
Reserved concurrency set from Npcases.max_processing_daemons (default: 6):
concurrency = get_npcase_max_processing_daemons(npcase_id) # Per-case setting
lambda_client.put_function_concurrency(
FunctionName=doc_loader_lambda,
ReservedConcurrentExecutions=concurrency,
)
This controls how many documents are processed in parallel per batch.
Event Source MaximumConcurrency¶
Separate from Lambda reserved concurrency — this caps how many Lambda invocations the event source mapping creates:
event_source_response = lambda_client.create_event_source_mapping(
...
ScalingConfig={"MaximumConcurrency": concurrency},
)
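The two knobs compose: reserved concurrency caps the function account-wide, while ScalingConfig caps only this event source. A sketch of the clamp the mapping call needs (the 2..1000 range is the documented bound for SQS MaximumConcurrency; the helper name is illustrative):

```python
def scaling_config(concurrency: int) -> dict:
    """Clamp to the valid SQS event-source MaximumConcurrency range (2..1000)."""
    return {"MaximumConcurrency": min(max(concurrency, 2), 1000)}
```

Passing a per-case value of 1 would otherwise be rejected by create_event_source_mapping, since 2 is the floor.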
Lambda Configuration Summary¶
| Parameter | Document Loader | Job Processor | Batch Processor |
|---|---|---|---|
| Runtime | Python 3.13 | Python 3.13 | Python 3.13 |
| Memory | 256 MB | 128 MB (default) | 256 MB |
| Timeout | 15 min | 15 min | 15 min |
| Reserved Concurrency | Per-case (default 6) | 40 | 40 |
| Batch Size | 10 | 1 | 1 |
| Batching Window | 60s | None | None |
| Report Failures | Yes | No (single-record) | No (single-record) |
| RecursiveLoop | Allow | Allow | Allow |
| VPC | Private subnets | Private subnets | Private subnets |
| Layer | Python packages | Python packages | Python packages |
Global State in Lambda¶
Lambda reuses the runtime across invocations. Module-level objects persist:
# database.py — engine cache persists across invocations
_engines_cache: dict[str, ScopedSession] = {}
_cache_access_times: dict[str, float] = {}
Rules for global state:
- AWS clients (boto3.client) — safe to cache globally
- Engine cache — safe with LRU eviction and proper disposal
- Logger — safe (Lambda is single-request-at-a-time)
- ContextVars (npcase_id, event, batch_context) — must reset in handler's finally block
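The ContextVar rule can be sketched as follows (the variable name mirrors the ones above; the handler body is illustrative):

```python
import contextvars

# Lambda reuses the runtime between invocations, so a value set here is still
# visible on the next invocation unless the handler clears it.
npcase_id: contextvars.ContextVar = contextvars.ContextVar("npcase_id", default=None)

def lambda_handler(event, _context):
    npcase_id.set(event.get("caseId"))
    try:
        return {"case": npcase_id.get()}
    finally:
        npcase_id.set(None)  # reset so the next (reused) invocation starts clean
```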
Key Rules¶
- Match visibility timeout to Lambda timeout — prevents duplicate delivery
- Use ReportBatchItemFailures for batch processing — prevents entire-batch retry
- Batch size 1 for orchestrators — simplifies error handling and state management
- Reserved concurrency on orchestrators — prevents runaway scaling
- RecursiveLoop = Allow — required when Lambda re-enqueues to its own queue
- Pre-create log groups — Lambda with LoggingConfig requires the group to exist
- Idempotent Lambda creation — handle ResourceConflictException gracefully
- Reset ContextVars in finally — global state persists across invocations
- PermanentFailure NOT in batchItemFailures — send to DLQ directly, SQS auto-deletes
- SilentSuccess NOT in batchItemFailures — treated as success, SQS auto-deletes