SQS Operations Pattern¶
Purpose¶
Standardize SQS message handling across all modules: double-encoded envelope parsing, queue naming, exponential backoff requeue, visibility timeout manipulation, queue depth monitoring, and DLQ management.
Double-Encoded Envelope Parsing¶
SNS messages delivered to SQS are double-encoded. The actual domain event is nested inside an SNS envelope inside the SQS body.
SQS Record
└── body (JSON string)
    └── { "Message": (JSON string) }
        └── { "caseId": 1001, "eventType": "DOCUMENT_PROCESSED", ... }
def preprocess_record(record):
    """Parse the nested SNS message from an SQS record."""
    # Set logging context
    set_event(record)

    # Unwrap: SQS body → SNS envelope → actual message
    body = json.loads(record.get("body", "{}"))
    if not body or "Message" not in body:
        raise KeyError("Missing 'Message' key in record body")
    message = json.loads(body["Message"])

    # Configure logging context from message fields
    configure_log_context(
        caseId=str(message.get("caseId", "")),
        batchId=str(message.get("batchId", "")),
        jobId=str(message.get("jobId", "")),
        documentId=str(message.get("documentId", "")),
        exhibitIds=message.get("exhibitIds", []),
    )

    # Initialize SNS publisher and set case context
    SNS(message)
    set_npcase(str(message.get("caseId")))
    return message
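The double-encoding round trip can be seen with nothing but the standard `json` module. This is an illustrative sketch with sample values, not production code:

```python
import json

# Build a sample SQS record the way SNS-to-SQS delivery would (sample values)
domain_event = {"caseId": 1001, "eventType": "DOCUMENT_PROCESSED"}
sns_envelope = {"Message": json.dumps(domain_event)}
record = {"body": json.dumps(sns_envelope)}

# Unwrap: SQS body → SNS envelope → actual message
body = json.loads(record["body"])
message = json.loads(body["Message"])
assert message["caseId"] == 1001
```

Note the two `json.loads` calls: one decode is never enough, and three is too many.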
Queue and Lambda Naming¶
All per-batch resources follow a consistent naming pattern:
STACK_NAME_PREFIX = os.environ.get("STACK_NAME_PREFIX", "")

def doc_loader_lambda_name(case_id, batch_id):
    return f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"

def doc_loader_queue_name(env, case_id, batch_id):
    return f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"

# DLQ name is always: {queue_name}_dlq
Queue URL Construction¶
account_id = get_account_id()
deploy_region = Config.DEPLOY_REGION
queue_name = doc_loader_queue_name(env, case_id, batch_id)
queue_url = f"https://sqs.{deploy_region}.amazonaws.com/{account_id}/{queue_name}"
dlq_url = f"{queue_url}_dlq"
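With sample values plugged in (the prefix, account ID, and region below are illustrative assumptions, not real values), the construction produces:

```python
# Sample values (illustrative only)
STACK_NAME_PREFIX = "np"
account_id = "123456789012"
deploy_region = "us-east-1"
case_id, batch_id = 1001, 7

queue_name = f"{STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}"
queue_url = f"https://sqs.{deploy_region}.amazonaws.com/{account_id}/{queue_name}"
dlq_url = f"{queue_url}_dlq"
# queue_url → "https://sqs.us-east-1.amazonaws.com/123456789012/np_doc_loader_1001_7"
```

Because the DLQ name is always `{queue_name}_dlq`, its URL can be derived by suffixing the main queue URL rather than issuing a second `GetQueueUrl` call.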
Exponential Backoff Requeue¶
When a RecoverableException occurs, requeue the message with increasing delay:
def handle_requeue_exception(record, rq_ex):
    """Requeue with exponential backoff, preserving the SNS envelope."""
    # 1. Parse the full nested structure
    body = json.loads(record.get("body", "{}"))
    message = json.loads(body["Message"])
    event_detail = json.loads(message.get("eventDetail", "{}"))

    # 2. Update retry metadata in event_detail
    event_detail.update({
        "retry_count": rq_ex.retry_count,
        "prev_page_count": rq_ex.prev_page_count,
    })

    # 3. Re-encode the full nested structure
    message["eventDetail"] = json.dumps(event_detail)
    body["Message"] = json.dumps(message)
    final_body = json.dumps(body)

    # 4. Calculate exponential delay
    retry_count = rq_ex.retry_count or 0
    calculated_delay = Config.SQS_BASE_DELAY * (2 ** retry_count)
    delay = min(calculated_delay, Config.SQS_MAX_DELAY)
    # Sequence: 120s → 240s → 480s → 900s (capped)

    # 5. Send to queue with delay
    requeue_sqs_message(queue_url, final_body, delay_seconds=delay)

    # 6. Delete original message to prevent duplicate processing
    delete_original_message(record, queue_url)
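The delay sequence in the comment follows from the formula. Assuming `SQS_BASE_DELAY = 120` and `SQS_MAX_DELAY = 900` (values inferred from the documented sequence; note that 900s is also SQS's hard `DelaySeconds` ceiling), the first four retries compute as:

```python
SQS_BASE_DELAY = 120  # assumed from the documented sequence
SQS_MAX_DELAY = 900   # 900s (15 min) is also SQS's maximum DelaySeconds

delays = [min(SQS_BASE_DELAY * (2 ** n), SQS_MAX_DELAY) for n in range(4)]
# → [120, 240, 480, 900]   (retry 3 would be 960s, capped at 900)
```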
Critical Detail: Preserve the Envelope¶
When requeuing, the full SNS envelope structure must be preserved. The document
loader's _parse_record() expects the double-encoded format. If you flatten
the message during requeue, parsing breaks on retry.
Sending Messages (New Events)¶
When publishing a new SQS message (not requeuing), wrap in the SNS envelope:
def send_sqs_message(queue_url, message, delay_seconds=0):
    """Send a new message wrapped in SNS envelope format."""
    client = boto3_client("sqs", region_name=Config.DEPLOY_REGION)
    client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"Message": json.dumps(message)}),
        DelaySeconds=int(delay_seconds),
    )
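Because the sender wraps and the consumer unwraps with the same double-encoding, the two sides round-trip cleanly. A minimal sketch (the `wrap_in_envelope` helper is hypothetical, mirroring the `MessageBody` encoding above):

```python
import json

def wrap_in_envelope(message):
    # Same encoding as send_sqs_message's MessageBody argument
    return json.dumps({"Message": json.dumps(message)})

body = wrap_in_envelope({"caseId": 1001, "eventType": "DOCUMENT_PROCESSED"})

# The consumer-side unwrap (as in preprocess_record) recovers the original
recovered = json.loads(json.loads(body)["Message"])
assert recovered["eventType"] == "DOCUMENT_PROCESSED"
```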
Queue Depth Monitoring¶
Check all three SQS message states to get the true message count:
def queue_messages_count(queue_url):
    """Total messages = visible + delayed + in-flight."""
    res = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",            # Ready to receive
            "ApproximateNumberOfMessagesDelayed",     # Delay timer active
            "ApproximateNumberOfMessagesNotVisible",  # Being processed
        ],
    )
    attrs = res["Attributes"]
    return (
        int(attrs["ApproximateNumberOfMessages"])
        + int(attrs["ApproximateNumberOfMessagesDelayed"])
        + int(attrs["ApproximateNumberOfMessagesNotVisible"])
    )
Why all three? A message in "delayed" state won't show in "visible" count but is still in the queue. A message being processed is "not visible" but shouldn't be considered complete. Missing any state causes premature completion.
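A typical consumer of this count is a drain loop that waits for a batch queue to empty before declaring completion. A sketch under stated assumptions (`wait_for_queue_drain` is a hypothetical helper; `count_fn` stands in for `queue_messages_count` so the loop can be tested without AWS):

```python
import time

def wait_for_queue_drain(queue_url, count_fn, poll_seconds=5, timeout=600):
    """Poll until the total (visible + delayed + in-flight) count hits zero.

    `count_fn` is queue_messages_count or any callable with that contract.
    Returns True when drained, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if count_fn(queue_url) == 0:
            return True
        time.sleep(poll_seconds)
    return False

# Stubbed count_fn draining over three polls (illustrative)
counts = iter([4, 2, 0])
assert wait_for_queue_drain("queue-url", lambda _url: next(counts), poll_seconds=0)
```

Polling a count that ignored delayed or in-flight messages would return True while work is still pending, which is exactly the premature-completion failure described above.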
Visibility Timeout Manipulation¶
For slow operations (e.g., MySQL operational errors), extend the visibility timeout so the message isn't retried too quickly:
def change_message_visibility(queue_url, receipt_handle, timeout_seconds):
    """Extend visibility timeout for messages needing more time."""
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=timeout_seconds,  # Typically 120s for DB errors
    )
Direct-to-DLQ Routing¶
For PermanentFailureException, bypass the standard maxReceiveCount mechanism and send directly to DLQ:
def send_to_dlq(record, sqs_client=None):
    """Send message directly to DLQ, bypassing maxReceiveCount bouncing."""
    message = _parse_message_from_record(record)
    dlq_url = f".../{queue_name}_dlq"
    client = sqs_client or boto3_client("sqs")
    # Preserve original record body (with SNS envelope) in DLQ
    client.send_message(QueueUrl=dlq_url, MessageBody=record.get("body", "{}"))
DLQ Redrive¶
Move failed messages from DLQ back to the main queue for reprocessing:
def redrive_dlq_messages(npcase_id, batch_id, final_pass=False):
    """Redrive all DLQ messages with markers for tracking."""
    total_redriven = 0
    empty_receives = 0
    while True:
        # Long polling only after first empty response (drain stragglers)
        wait_time = 5 if empty_receives > 0 else 0
        response = sqs_client.receive_message(
            QueueUrl=dlq_url, MaxNumberOfMessages=10, WaitTimeSeconds=wait_time
        )
        messages = response.get("Messages", [])
        if not messages:
            empty_receives += 1
            if empty_receives >= 3:  # 3 empty receives = DLQ is drained
                break
            continue
        empty_receives = 0
        for msg in messages:
            # Add redrive markers
            body = json.loads(msg["Body"])
            message_content = json.loads(body["Message"])
            message_content["dlq_redriven"] = True
            event_detail = json.loads(message_content["eventDetail"])
            event_detail["retry_count"] = 0  # Reset retries
            event_detail["dlq_redrive_count"] = event_detail.get("dlq_redrive_count", 0) + 1  # Track redrive passes
            event_detail["dlq_final_pass"] = final_pass  # Signal last chance

            # Re-encode the full nested structure and send to main queue
            message_content["eventDetail"] = json.dumps(event_detail)
            body["Message"] = json.dumps(message_content)
            modified_body = json.dumps(body)
            sqs_client.send_message(QueueUrl=main_queue_url, MessageBody=modified_body)
            sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg["ReceiptHandle"])
            total_redriven += 1
    return total_redriven
eventDetail Parsing¶
eventDetail may arrive as a dict, a JSON string, or None. Always parse defensively:
def parse_event_detail(raw_detail):
    """Normalize eventDetail to a dict regardless of input type."""
    if raw_detail is None:
        return {}
    if isinstance(raw_detail, dict):
        return raw_detail
    if isinstance(raw_detail, str):
        try:
            return json.loads(raw_detail) if raw_detail.strip() else {}
        except json.JSONDecodeError:
            return {}
    return {}
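The contract of the normalizer above, expressed as a few cases it should satisfy (sample inputs only):

```python
import json

def parse_event_detail(raw_detail):
    """Normalize eventDetail to a dict regardless of input type."""
    if raw_detail is None:
        return {}
    if isinstance(raw_detail, dict):
        return raw_detail
    if isinstance(raw_detail, str):
        try:
            return json.loads(raw_detail) if raw_detail.strip() else {}
        except json.JSONDecodeError:
            return {}
    return {}

assert parse_event_detail(None) == {}
assert parse_event_detail({"retry_count": 2}) == {"retry_count": 2}
assert parse_event_detail('{"retry_count": 2}') == {"retry_count": 2}
assert parse_event_detail("not json") == {}
assert parse_event_detail("   ") == {}
```

Both the empty-string and malformed-JSON cases collapse to `{}` rather than raising, which keeps record processing alive when upstream producers send inconsistent payloads.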
Key Rules¶
- Always preserve the SNS envelope when requeuing — parsing depends on it
- Check all 3 message states for queue depth — visible + delayed + in-flight
- Long-poll only after empty response during DLQ redrive to drain stragglers
- Reset retry_count during DLQ redrive — give messages fresh retry budget
- Delete original after requeue — prevents duplicate processing
- Parse eventDetail defensively — it can be string, dict, or None