
Lambda + SQS Integration Pattern

Purpose

Define the complete pattern for Lambda functions triggered by SQS messages: event source configuration, batch processing semantics, partial failure reporting, concurrency control, timeout alignment, dynamic Lambda cloning, and the two Lambda archetypes used across Nextpoint modules.

Two Lambda Archetypes

1. Batch Processing Lambda (Document Loader)

Processes multiple records per invocation with partial failure support. Failed records are reported individually; successful records are auto-deleted.

SQS Queue → Lambda (batch size 10, 60s batching window)
                ├── Record 1: ✅ success → auto-deleted
                ├── Record 2: ❌ failure → reported in batchItemFailures
                ├── Record 3: ✅ success → auto-deleted
                └── ...
            {"batchItemFailures": [{"itemIdentifier": "record-2-id"}]}
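
For reference, each invocation in this pattern receives an SQS event of roughly the following shape. This is a minimal sketch: the field values are placeholders, and the `caseId`/`batchId` body keys mirror the ones the handler code later in this document parses.

```python
import json

# Minimal sketch of an SQS-triggered Lambda event; all values are placeholders
sample_event = {
    "Records": [
        {
            "messageId": "record-1-id",
            "receiptHandle": "AQEB-example-receipt-handle",
            "body": json.dumps({"caseId": 42, "batchId": 7}),
            "eventSource": "aws:sqs",
        }
    ]
}

# The handler's _parse_record step presumably amounts to decoding the JSON body
message = json.loads(sample_event["Records"][0]["body"])
```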

Event source configuration:

event_source_response = lambda_client.create_event_source_mapping(
    EventSourceArn=queue_arn,
    FunctionName=function_name,
    Enabled=True,
    BatchSize=10,                                    # Up to 10 records per invocation
    MaximumBatchingWindowInSeconds=60,               # Wait up to 60s to fill batch
    FunctionResponseTypes=["ReportBatchItemFailures"],  # Enable partial failure
    ScalingConfig={"MaximumConcurrency": concurrency},  # Cap concurrent executions
)

CDK equivalent:

documentLoader.addEventSource(new SqsEventSource(queue, {
  batchSize: 10,
  maxBatchingWindow: cdk.Duration.seconds(60),
  reportBatchItemFailures: true,
  maxConcurrency: concurrency,
}));

Handler pattern:

def lambda_handler(event, _context):
    batch_item_failures = []

    for record in event.get("Records", []):
        try:
            set_event(record)
            message = _parse_record(record)
            set_npcase(str(message.get("caseId")))
            _setup_logging_context(message)

            # Lazy-load batch context once per invocation
            if not get_batch_context():
                with writer_session() as session:
                    batch_data = _get_batch_data_for_import(session, message.get("batchId"))
                    set_batch_context(batch_data)

            SNS(message)
            process_event_type(message=message)

        except Exception as e:
            _handle_exception(e, record, batch_item_failures)
        finally:
            set_event(None)
            configure_log_context()

    set_batch_context(None)
    return {"batchItemFailures": batch_item_failures}

2. Single-Record Lambda (Job Processor, Batch Processor)

Processes one record per invocation. On failure, the entire invocation fails and SQS retries the message after the visibility timeout.

SQS Queue → Lambda (batch size 1, max concurrency 40)
                └── Record 1: ❌ failure → raise → SQS retries

CDK configuration:

jobProcessor.addEventSource(new SqsEventSource(jobQueue, {
  batchSize: 1,
  maxConcurrency: 40,
}));

Handler pattern:

def lambda_handler(event, _context):
    for record in event.get("Records", []):
        try:
            process_single_record(record)
        except Exception as e:
            # Publish error notification
            SNS._publish_direct({**SNS.message_json, "eventType": "ERROR", ...})
            raise  # Fail entire invocation → SQS retries

    return {"statusCode": 200, "body": "ok"}
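
The retry loop above ends at a DLQ once a redrive policy is configured on the source queue. A minimal sketch of building those queue attributes (the `maxReceiveCount` of 5 is an assumed value, not a documented setting):

```python
import json

def redrive_policy_attributes(dlq_arn: str, max_receive_count: int = 5) -> dict:
    """Build SQS queue attributes that route repeatedly failing messages to a DLQ."""
    # After max_receive_count unsuccessful receives, SQS moves the message to
    # the dead-letter queue instead of redelivering it to the consumer
    return {
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receive_count,
        })
    }
```

The returned dict can be merged into the `Attributes` argument of `create_queue` or `set_queue_attributes`.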

When to Use Which

| Use Case | Archetype | Batch Size | Why |
| --- | --- | --- | --- |
| High-volume document processing | Batch | 10 | Throughput; partial failures don't block the batch |
| Job orchestration | Single | 1 | Each job needs sequential state machine logic |
| Batch lifecycle management | Single | 1 | Cleanup must succeed or retry entirely |
| Low-volume, order-sensitive work | Single | 1 | Simpler error handling, guaranteed ordering |

Partial Failure Reporting (ReportBatchItemFailures)

This is the most important SQS+Lambda configuration decision.

Without ReportBatchItemFailures, one failed record causes SQS to retry the entire batch — including records that already succeeded. This leads to duplicate processing and wasted compute.

With it, only failed records are retried:

def _handle_exception(e, record, batch_item_failures):
    message_id = record.get("messageId", "N/A")

    if isinstance(e, PermanentFailureException):
        # Direct to DLQ — do NOT add to batchItemFailures
        # SQS auto-deletes records NOT in the failures list
        try:
            send_to_dlq(record)
        except Exception as dlq_err:
            # Fallback: report as failure so SQS retries via standard path
            batch_item_failures.append({"itemIdentifier": message_id})
            change_message_visibility(_parse_record(record), record.get("receiptHandle"), 300)

    elif isinstance(e, RecoverableException):
        # Requeue with backoff — do NOT add to batchItemFailures
        # Original message auto-deleted after successful requeue
        try:
            handle_requeue_exception(record, e)
        except Exception as requeue_err:
            batch_item_failures.append({"itemIdentifier": message_id})

    elif isinstance(e, (json.JSONDecodeError, KeyError, ValueError)):
        # Parsing error — report failure, extend visibility to 5 min
        batch_item_failures.append({"itemIdentifier": message_id})
        change_message_visibility(_parse_record(record), record.get("receiptHandle"), 300)

    elif isinstance(e, OperationalError):
        # Database error — report failure, extend visibility to 2 min
        batch_item_failures.append({"itemIdentifier": message_id})
        change_message_visibility(_parse_record(record), record.get("receiptHandle"), 120)

    elif isinstance(e, SilentSuccessException):
        # Intentional skip — do NOT report as failure
        # SQS auto-deletes the message (treated as success)
        pass

    else:
        # Unknown error — report failure, standard SQS retry
        batch_item_failures.append({"itemIdentifier": message_id})
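
The net effect of the dispatcher can be illustrated with a simplified stand-in for the handler loop, in which every error is treated as retryable (`run_batch` and `process` are hypothetical helpers, not the real code):

```python
def run_batch(records, process):
    """Simplified stand-in for the handler loop: report only retryable failures."""
    batch_item_failures = []
    for record in records:
        try:
            process(record)
        except Exception:
            # The real code dispatches on exception type; here every error
            # takes the standard-retry path and is reported to SQS
            batch_item_failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": batch_item_failures}

def process(record):
    # Hypothetical worker: record 2 simulates a parse error
    if record["messageId"] == "record-2-id":
        raise ValueError("simulated parse error")

records = [{"messageId": f"record-{i}-id"} for i in (1, 2, 3)]
result = run_batch(records, process)
# result == {"batchItemFailures": [{"itemIdentifier": "record-2-id"}]}
```

Records 1 and 3 are absent from the response, so SQS deletes them; only record 2 becomes visible again for retry.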

Key Insight: What Goes in batchItemFailures

| Exception | In batchItemFailures? | Why |
| --- | --- | --- |
| PermanentFailure | No | Sent directly to DLQ; SQS auto-deletes |
| Recoverable | No | Requeued with backoff; original auto-deleted |
| SilentSuccess | No | Not an error; SQS auto-deletes |
| OperationalError | Yes | Standard SQS retry with visibility timeout |
| Parse errors | Yes | Standard SQS retry with visibility timeout |
| Unknown errors | Yes | Standard SQS retry |
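
The exception names in the table come from this document; their definitions are not shown, but the dispatch logic suggests they behave like simple marker classes along these lines (a sketch, docstrings inferred from the handling above):

```python
class PermanentFailureException(Exception):
    """Unrecoverable; route the record straight to the DLQ."""

class RecoverableException(Exception):
    """Transient; requeue with backoff instead of the standard SQS retry."""

class SilentSuccessException(Exception):
    """Intentional skip; treat the record as successfully processed."""
```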

Timeout and Visibility Alignment

Rule: SQS visibility timeout ≥ Lambda timeout.

If the visibility timeout expires before Lambda finishes, SQS delivers the message to another invocation — causing duplicate processing.

| Component | Timeout | Purpose |
| --- | --- | --- |
| Lambda timeout | 15 minutes (900s) | Max execution time |
| SQS visibility timeout | 15 minutes (900s) | Message invisible while processing |
| DLQ visibility timeout | 15 minutes (900s) | Same as main queue |
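
The alignment rule can be enforced at deploy time with a small check. A sketch assuming boto3-style clients are passed in (the checker itself is hypothetical, but `get_function_configuration` and `get_queue_attributes` are real boto3 calls):

```python
def check_timeout_alignment(lambda_client, sqs_client, function_name, queue_url):
    """Raise if the queue could redeliver a message while Lambda is still running."""
    # Lambda timeout in seconds, from the function configuration
    lambda_timeout = lambda_client.get_function_configuration(
        FunctionName=function_name
    )["Timeout"]
    # SQS returns attribute values as strings
    visibility = int(
        sqs_client.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["VisibilityTimeout"]
        )["Attributes"]["VisibilityTimeout"]
    )
    if visibility < lambda_timeout:
        raise RuntimeError(
            f"Visibility timeout {visibility}s < Lambda timeout {lambda_timeout}s"
        )
```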

Set in CDK:

const queue = new sqs.Queue(this, 'Queue', {
  visibilityTimeout: cdk.Duration.minutes(15),  // Match Lambda timeout
});

Set in dynamic queue creation:

sqs_client.create_queue(
    QueueName=queue_name,
    Attributes={"VisibilityTimeout": "900"},
)

Dynamic Lambda Cloning

The Job Processor creates per-batch Lambda functions by cloning from the CDK-deployed source Lambda:

def launch_lambda(case_id, batch_id, concurrency):
    # 1. Download source Lambda code
    source_lambda = f"{STACK_NAME_PREFIX}-DocumentLoader"
    zip_content = download_lambda_code(lambda_client, source_lambda)

    # 2. Create new function with per-batch env vars
    new_lambda = f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"
    create_lambda(
        lambda_client=lambda_client,
        function_name=new_lambda,
        zip_content=zip_content,         # Same code as source
        runtime="python3.13",
        handler="index.lambda_handler",
        role_arn=role_arn,
        env_vars={
            **base_env_vars,
            "RDS_CASE_DBNAME": f"{rds_dbname}_case_{case_id}",  # Per-case DB
        },
        vpc_config=vpc_config,
        layers=layer_arns,
        tags=TAGS,
        timeout=900,
        memory_size=256,
        log_group_name=log_group_name,
    )

    # 3. Attach to per-batch SQS queue
    attach_event_source(lambda_client, sqs_client, new_lambda, queue_url, concurrency)

    # 4. Set reserved concurrency from case config
    lambda_client.put_function_concurrency(
        FunctionName=new_lambda,
        ReservedConcurrentExecutions=concurrency,
    )
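
`download_lambda_code` is not defined in this document. A plausible sketch, assuming it wraps `get_function`, which returns a presigned URL for the deployment package under `Code.Location`:

```python
import urllib.request

def download_lambda_code(lambda_client, function_name: str) -> bytes:
    """Fetch the deployment package bytes for an existing function."""
    # get_function returns a short-lived presigned S3 URL for the package
    resp = lambda_client.get_function(FunctionName=function_name)
    with urllib.request.urlopen(resp["Code"]["Location"]) as f:
        return f.read()
```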

Lambda Creation Details

lambda_client.create_function(
    FunctionName=function_name,
    Runtime=runtime,
    Role=role_arn,
    Handler=handler,
    Code={"ZipFile": zip_content},
    Environment={"Variables": env_vars},
    VpcConfig=vpc_config,
    Layers=layers,
    Timeout=timeout,
    MemorySize=memory_size,
    Publish=True,
    PackageType="Zip",
    Tags=tags,
    LoggingConfig={"LogGroup": log_group_name},  # Pre-created log group
)

# Enable recursive loop — job processor re-enqueues to its own queue
lambda_client.put_function_recursion_config(
    FunctionName=function_name,
    RecursiveLoop="Allow",
)

Log Group Must Pre-Exist

Lambda won't create its own log group — it's pre-created by CDK. The assert_log_group_exists() check runs before create_function():

def assert_log_group_exists(log_group_name):
    """Raise RuntimeError if log group doesn't exist."""
    paginator = logs_client.get_paginator("describe_log_groups")
    for page in paginator.paginate(logGroupNamePrefix=log_group_name):
        for grp in page.get("logGroups", []):
            if grp["logGroupName"] == log_group_name:
                return
    raise RuntimeError(f"Required log group '{log_group_name}' does not exist")

Idempotent Creation

create_function handles ResourceConflictException (already exists) gracefully:

except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        log_message("info", "Lambda function already exists.")
    else:
        raise

Concurrency Control

Static (CDK-Deployed Lambdas)

| Lambda | Reserved Concurrency | Reason |
| --- | --- | --- |
| DocumentLoader | None | Dynamically created per-batch |
| JobProcessor | 40 | Prevents runaway job orchestration |
| BatchProcessor | 40 | Prevents runaway batch lifecycle |

Dynamic (Per-Batch Lambdas)

Reserved concurrency set from Npcases.max_processing_daemons (default: 6):

concurrency = get_npcase_max_processing_daemons(npcase_id)  # Per-case setting
lambda_client.put_function_concurrency(
    FunctionName=doc_loader_lambda,
    ReservedConcurrentExecutions=concurrency,
)

This controls how many documents are processed in parallel per batch.

Event Source MaximumConcurrency

Separate from Lambda reserved concurrency — this caps how many concurrent invocations the event source mapping will run against the function:

event_source_response = lambda_client.create_event_source_mapping(
    ...
    ScalingConfig={"MaximumConcurrency": concurrency},
)

Lambda Configuration Summary

| Parameter | Document Loader | Job Processor | Batch Processor |
| --- | --- | --- | --- |
| Runtime | Python 3.13 | Python 3.13 | Python 3.13 |
| Memory | 256 MB | 128 MB (default) | 256 MB |
| Timeout | 15 min | 15 min | 15 min |
| Reserved Concurrency | Per-case (default 6) | 40 | 40 |
| Batch Size | 10 | 1 | 1 |
| Batching Window | 60s | None | None |
| Report Failures | Yes | No (single-record) | No (single-record) |
| RecursiveLoop | Allow | Allow | Allow |
| VPC | Private subnets | Private subnets | Private subnets |
| Layer | Python packages | Python packages | Python packages |

Global State in Lambda

Lambda reuses the execution environment across warm invocations, so module-level objects persist between them:

# index.py — global, created once per Lambda cold start
sqs_client = boto3_client("sqs")
# database.py — engine cache persists across invocations
_engines_cache: dict[str, ScopedSession] = {}
_cache_access_times: dict[str, float] = {}
# helpers.py — logger persists across invocations
_logger: Optional[logging.Logger] = None

Rules for global state:

  - AWS clients (boto3.client) — safe to cache globally
  - Engine cache — safe with LRU eviction and proper disposal
  - Logger — safe (Lambda is single-request-at-a-time)
  - ContextVars (npcase_id, event, batch_context) — must reset in handler's finally block

Key Rules

  1. Match visibility timeout to Lambda timeout — prevents duplicate delivery
  2. Use ReportBatchItemFailures for batch processing — prevents entire-batch retry
  3. Batch size 1 for orchestrators — simplifies error handling and state management
  4. Reserved concurrency on orchestrators — prevents runaway scaling
  5. RecursiveLoop = Allow — required when Lambda re-enqueues to its own queue
  6. Pre-create log groups — Lambda with LoggingConfig requires the group to exist
  7. Idempotent Lambda creation — handle ResourceConflictException gracefully
  8. Reset ContextVars in finally — global state persists across invocations
  9. PermanentFailure NOT in batchItemFailures — send to DLQ directly, SQS auto-deletes
  10. SilentSuccess NOT in batchItemFailures — treated as success, SQS auto-deletes