SQS Operations Pattern

Purpose

Standardize SQS message handling across all modules: double-encoded envelope parsing, queue naming, exponential backoff requeue, visibility timeout manipulation, queue depth monitoring, and DLQ management.

Double-Encoded Envelope Parsing

SNS messages delivered to SQS are double-encoded. The actual domain event is nested inside an SNS envelope inside the SQS body.

SQS Record
└── body (JSON string)
    └── { "Message": (JSON string) }
        └── { "caseId": 1001, "eventType": "DOCUMENT_PROCESSED", ... }

import json  # stdlib JSON; used throughout these helpers

def preprocess_record(record):
    """Parse the nested SNS message from an SQS record."""
    # Set logging context
    set_event(record)

    # Unwrap: SQS body → SNS envelope → actual message
    body = json.loads(record.get("body", "{}"))
    if not body or "Message" not in body:
        raise KeyError("Missing 'Message' key in record body")

    message = json.loads(body["Message"])

    # Configure logging context from message fields
    configure_log_context(
        caseId=str(message.get("caseId", "")),
        batchId=str(message.get("batchId", "")),
        jobId=str(message.get("jobId", "")),
        documentId=str(message.get("documentId", "")),
        exhibitIds=message.get("exhibitIds", []),
    )

    # Initialize SNS publisher and set case context
    SNS(message)
    set_npcase(str(message.get("caseId")))

    return message
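
In a Lambda consumer, the helper is typically called once per record; this handler is a sketch (handle_message stands in for the module's actual processing function):

def handler(event, context):
    """Lambda entry point: unwrap and process each SQS record."""
    for record in event.get("Records", []):
        message = preprocess_record(record)
        handle_message(message)  # hypothetical per-module processing step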

Queue and Lambda Naming

All per-batch resources follow a consistent naming pattern:

STACK_NAME_PREFIX = os.environ.get("STACK_NAME_PREFIX", "")

def doc_loader_lambda_name(case_id, batch_id):
    return f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"

def doc_loader_queue_name(env, case_id, batch_id):
    # Queue name intentionally matches the Lambda name; env is currently unused
    return f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"

# DLQ name is always: {queue_name}_dlq
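
Given that convention, a one-line helper keeps the suffix in a single place (doc_loader_dlq_name is a hypothetical name, not an existing function):

def doc_loader_dlq_name(env, case_id, batch_id):
    # Appends the documented _dlq suffix to the main queue name
    return f"{doc_loader_queue_name(env, case_id, batch_id)}_dlq"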

Queue URL Construction

account_id = get_account_id()
deploy_region = Config.DEPLOY_REGION
queue_name = doc_loader_queue_name(env, case_id, batch_id)

queue_url = f"https://sqs.{deploy_region}.amazonaws.com/{account_id}/{queue_name}"
dlq_url = f"{queue_url}_dlq"
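
Manual construction avoids an extra API call. If you would rather not hard-code the URL format, SQS can resolve it for you; a sketch using boto3's standard get_queue_url call:

import boto3

sqs = boto3.client("sqs", region_name=deploy_region)
queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
dlq_url = sqs.get_queue_url(QueueName=f"{queue_name}_dlq")["QueueUrl"]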

Exponential Backoff Requeue

When a RecoverableException occurs, requeue the message with increasing delay:

def handle_requeue_exception(record, rq_ex, queue_url):
    """Requeue with exponential backoff, preserving the SNS envelope."""

    # 1. Parse the full nested structure
    body = json.loads(record.get("body", "{}"))
    message = json.loads(body["Message"])
    event_detail = json.loads(message.get("eventDetail", "{}"))

    # 2. Update retry metadata in event_detail
    event_detail.update({
        "retry_count": rq_ex.retry_count,
        "prev_page_count": rq_ex.prev_page_count,
    })

    # 3. Re-encode the full nested structure
    message["eventDetail"] = json.dumps(event_detail)
    body["Message"] = json.dumps(message)
    final_body = json.dumps(body)

    # 4. Calculate exponential delay
    retry_count = rq_ex.retry_count or 0
    calculated_delay = Config.SQS_BASE_DELAY * (2 ** retry_count)
    delay = min(calculated_delay, Config.SQS_MAX_DELAY)

    # With SQS_BASE_DELAY=120 and SQS_MAX_DELAY=900: 120s → 240s → 480s → 900s (capped)

    # 5. Send to queue with delay
    requeue_sqs_message(queue_url, final_body, delay_seconds=delay)

    # 6. Delete original message to prevent duplicate processing
    delete_original_message(record, queue_url)

Critical Detail: Preserve the Envelope

When requeuing, the full SNS envelope structure must be preserved. The document loader's _parse_record() expects the double-encoded format. If you flatten the message during requeue, parsing breaks on retry.
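
To make the failure mode concrete, here is a minimal sketch of the two shapes (field values illustrative):

# WRONG: flattened. body["Message"] no longer exists, so parsing
# raises KeyError on the retry.
flattened = json.dumps({"caseId": 1001, "eventType": "DOCUMENT_PROCESSED"})

# RIGHT: envelope preserved. body → {"Message": ...} → domain event.
preserved = json.dumps(
    {"Message": json.dumps({"caseId": 1001, "eventType": "DOCUMENT_PROCESSED"})}
)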

Sending Messages (New Events)

When publishing a new SQS message (not requeuing), wrap in the SNS envelope:

def send_sqs_message(queue_url, message, delay_seconds=0):
    """Send a new message wrapped in SNS envelope format."""
    client = boto3_client("sqs", region_name=Config.DEPLOY_REGION)
    client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"Message": json.dumps(message)}),
        DelaySeconds=int(delay_seconds),
    )
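
Callers pass the bare domain event; the envelope wrapping happens inside (field values illustrative):

send_sqs_message(
    queue_url,
    {"caseId": 1001, "eventType": "DOCUMENT_PROCESSED", "eventDetail": "{}"},
    delay_seconds=0,
)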

Queue Depth Monitoring

Check all three SQS message states to get the true message count:

def queue_messages_count(queue_url):
    """Total messages = visible + delayed + in-flight."""
    res = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",          # Ready to receive
            "ApproximateNumberOfMessagesDelayed",    # Delay timer active
            "ApproximateNumberOfMessagesNotVisible", # Being processed
        ],
    )
    attrs = res["Attributes"]
    return (
        int(attrs["ApproximateNumberOfMessages"])
        + int(attrs["ApproximateNumberOfMessagesDelayed"])
        + int(attrs["ApproximateNumberOfMessagesNotVisible"])
    )

Why all three? A message in "delayed" state won't show in "visible" count but is still in the queue. A message being processed is "not visible" but shouldn't be considered complete. Missing any state causes premature completion.
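
A typical use is a batch-completion check that polls until the queue is truly empty; a sketch, with the polling interval as an assumption:

import time

def wait_for_queue_drain(queue_url, poll_seconds=30):
    """Block until visible + delayed + in-flight all reach zero."""
    while queue_messages_count(queue_url) > 0:
        time.sleep(poll_seconds)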

Visibility Timeout Manipulation

For slow operations (e.g., MySQL operational errors), extend the visibility timeout so the message isn't retried too quickly:

def change_message_visibility(queue_url, receipt_handle, timeout_seconds):
    """Extend visibility timeout for messages needing more time."""
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=timeout_seconds,  # Typically 120s for DB errors
    )
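
For example, a handler hitting a transient MySQL error can push the retry out two minutes instead of letting the message reappear immediately (OperationalError here is pymysql's; substitute your driver's equivalent):

from pymysql.err import OperationalError

try:
    handle_message(message)  # hypothetical per-module processing step
except OperationalError:
    change_message_visibility(queue_url, record["receiptHandle"], 120)
    raise  # message stays in-flight and reappears after the extended timeout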

Direct-to-DLQ Routing

For PermanentFailureException, bypass the standard maxReceiveCount mechanism and send directly to DLQ:

def send_to_dlq(record, sqs_client=None):
    """Send message directly to DLQ, bypassing maxReceiveCount bouncing."""
    # Parsed message supplies the identifiers (e.g. caseId/batchId) behind queue_name
    message = _parse_message_from_record(record)
    dlq_url = f".../{queue_name}_dlq"  # base URL elided in this excerpt

    client = sqs_client or boto3_client("sqs")
    # Preserve original record body (with SNS envelope) in DLQ
    client.send_message(QueueUrl=dlq_url, MessageBody=record.get("body", "{}"))
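
In a handler's exception ladder this sits alongside the requeue path; a sketch using the exception names from this document:

try:
    message = preprocess_record(record)
    handle_message(message)  # hypothetical per-module processing step
except RecoverableException as rq_ex:
    handle_requeue_exception(record, rq_ex, queue_url)
except PermanentFailureException:
    send_to_dlq(record)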

DLQ Redrive

Move failed messages from DLQ back to the main queue for reprocessing:

def redrive_dlq_messages(npcase_id, batch_id, final_pass=False):
    """Redrive all DLQ messages with markers for tracking."""
    # dlq_url and main_queue_url are derived from npcase_id/batch_id via the
    # naming helpers above (derivation elided in this excerpt)
    total_redriven = 0
    empty_receives = 0

    while True:
        # Long polling only after first empty response (drain stragglers)
        wait_time = 5 if empty_receives > 0 else 0
        response = sqs_client.receive_message(
            QueueUrl=dlq_url, MaxNumberOfMessages=10, WaitTimeSeconds=wait_time
        )
        messages = response.get("Messages", [])

        if not messages:
            empty_receives += 1
            if empty_receives >= 3:  # 3 empty receives = DLQ is drained
                break
            continue

        empty_receives = 0
        for msg in messages:
            # Add redrive markers
            body = json.loads(msg["Body"])
            message_content = json.loads(body["Message"])
            message_content["dlq_redriven"] = True

            event_detail = json.loads(message_content["eventDetail"])
            event_detail["retry_count"] = 0  # Reset retries
            event_detail["dlq_redrive_count"] = event_detail.get("dlq_redrive_count", 0) + 1  # Track redrive passes
            event_detail["dlq_final_pass"] = final_pass  # Signal last chance

            # Re-encode the full nested structure
            message_content["eventDetail"] = json.dumps(event_detail)
            body["Message"] = json.dumps(message_content)
            modified_body = json.dumps(body)

            # Send to main queue, then delete from the DLQ
            sqs_client.send_message(QueueUrl=main_queue_url, MessageBody=modified_body)
            sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg["ReceiptHandle"])
            total_redriven += 1

    return total_redriven
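
The final_pass flag is the caller's signal that this is the last redrive attempt; a typical invocation (illustrative):

# Normal redrive pass: messages get a fresh retry budget.
redrive_dlq_messages(npcase_id, batch_id)

# Last-chance pass: downstream handlers see dlq_final_pass=True.
redrive_dlq_messages(npcase_id, batch_id, final_pass=True)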

eventDetail Parsing

eventDetail may arrive as a dict, a JSON string, or None. Normalize it defensively:

def parse_event_detail(raw_detail):
    """Normalize eventDetail to a dict regardless of input type."""
    if raw_detail is None:
        return {}
    if isinstance(raw_detail, dict):
        return raw_detail
    if isinstance(raw_detail, str):
        try:
            return json.loads(raw_detail) if raw_detail.strip() else {}
        except json.JSONDecodeError:
            return {}
    return {}
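
Each input shape normalizes to a dict, including malformed JSON:

parse_event_detail(None)                  # {}
parse_event_detail({"retry_count": 2})    # {"retry_count": 2}
parse_event_detail('{"retry_count": 2}')  # {"retry_count": 2}
parse_event_detail("not json")            # {}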

Key Rules

  1. Always preserve the SNS envelope when requeuing — parsing depends on it
  2. Check all 3 message states for queue depth — visible + delayed + in-flight
  3. Long-poll only after empty response during DLQ redrive to drain stragglers
  4. Reset retry_count during DLQ redrive — give messages fresh retry budget
  5. Delete original after requeue — prevents duplicate processing
  6. Parse eventDetail defensively — it can be string, dict, or None