Retry and Resilience Patterns

Purpose

Handle transient failures gracefully across database operations, SQS message processing, and external service calls.

Pattern 1: Database Conflict Retry (@retry_on_db_conflict)

MySQL deadlocks and lock wait timeouts are expected under concurrent writes. Retry with exponential backoff instead of failing immediately.

# core/utils/db_transaction.py

import functools
import time
from sqlalchemy.exc import OperationalError

def retry_on_db_conflict(max_retries: int = 3, base_delay: float = 0.1):
    """Retry on MySQL deadlock (1213) and lock wait timeout (1205)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Session is expected as the first positional arg (or session=...)
            session = args[0] if args else kwargs.get("session")
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except OperationalError as e:
                    error_code = e.orig.args[0] if e.orig else None
                    if error_code in (1213, 1205):  # Deadlock, lock wait timeout
                        last_exception = e
                        if session:
                            session.rollback()  # Discard the failed transaction
                        if attempt < max_retries - 1:
                            delay = base_delay * (2 ** attempt)
                            time.sleep(delay)  # Back off before the next attempt
                    else:
                        raise  # Non-retryable MySQL error

            raise last_exception  # Exhausted retries

        return wrapper
    return decorator

Usage

@retry_on_db_conflict(max_retries=3, base_delay=0.1)
def create_exhibit(session, exhibit_data):
    """Retries automatically on deadlock."""
    exhibit = Exhibits(**exhibit_data)
    session.add(exhibit)
    session.flush()
    return exhibit
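
Note that create_exhibit flushes rather than commits, leaving the transaction boundary to the caller; after a rollback, the retried attempt runs in a fresh transaction. A hypothetical call site (session_factory is illustrative, not a project helper):

# Hypothetical call site; session_factory stands in for however the
# project constructs a SQLAlchemy session.
with session_factory() as session:
    exhibit = create_exhibit(session, {"title": "Exhibit A"})
    session.commit()  # Commit only after the retried unit succeeds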

Pattern 2: SQS Exponential Backoff Requeue

When a message fails with a RecoverableException, requeue it with increasing delay.

# shell/utils/sqs_ops.py

def handle_requeue_exception(record: dict, exception: RecoverableException) -> None:
    """Requeue message with exponential backoff."""
    retry_count = exception.retry_count or 0

    if retry_count >= Config.MAX_RETRIES:
        log_message("error", f"Max retries ({Config.MAX_RETRIES}) exceeded")
        send_to_dlq(record)
        return

    # Calculate delay: 120s → 240s → 480s → 900s (capped)
    base_delay = Config.SQS_BASE_DELAY  # 120 seconds
    max_delay = Config.SQS_MAX_DELAY    # 900 seconds
    delay = min(base_delay * (2 ** retry_count), max_delay)

    # Update retry count in message
    message = _parse_message(record)
    message["eventDetail"]["retry_count"] = retry_count + 1

    # Send to back of queue with delay
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
        DelaySeconds=int(delay),
    )
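
The handler relies only on a retry_count attribute carried by the exception. A minimal sketch of that shape (the real class may hold more context):

# Assumed shape; only retry_count is read by handle_requeue_exception.
class RecoverableException(Exception):
    """Raised when processing fails for a transient, retryable reason."""

    def __init__(self, message: str, retry_count: int = 0):
        super().__init__(message)
        self.retry_count = retry_count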

Backoff Sequence

Attempt 0: 120s delay (2 minutes)
Attempt 1: 240s delay (4 minutes)
Attempt 2: 480s delay (8 minutes)
Attempt 3: 900s delay (15 minutes — capped)
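
The sequence is just the capped-doubling formula from the handler. A quick sanity check, assuming the Config values noted above:

>>> [min(120 * 2 ** n, 900) for n in range(4)]
[120, 240, 480, 900]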

Pattern 3: DLQ Redrive

Messages that exhaust retries land in the Dead Letter Queue. A redrive mechanism moves them back for reprocessing after the transient issue is resolved.

def redrive_dlq_messages(npcase_id: str, batch_id: str,
                          final_pass: bool = False) -> int:
    """Move all DLQ messages back to the main queue."""
    redriven = 0

    while True:
        response = sqs_client.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1,  # Long poll so an empty reply reliably means drained
        )
        messages = response.get("Messages", [])
        if not messages:
            break

        for msg in messages:
            content = json.loads(msg["Body"])
            content["dlq_redriven"] = True
            content["eventDetail"]["dlq_final_pass"] = final_pass
            content["eventDetail"]["retry_count"] = 0  # Reset retries

            sqs_client.send_message(
                QueueUrl=main_queue_url,
                MessageBody=json.dumps(content),
            )
            # Delete from the DLQ only after the send succeeds (at-least-once)
            sqs_client.delete_message(
                QueueUrl=dlq_url,
                ReceiptHandle=msg["ReceiptHandle"],
            )
            redriven += 1

    return redriven

Redrive Policy

  • Maximum redrives: 2 (configurable via SQS_MAX_DLQ_REDRIVES)
  • After max redrives, proceed with batch completion (accept partial failure; see the sketch after this list)
  • Final pass messages that fail again raise SilentSuccessException (stop retrying)
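
A sketch of how those rules might compose; run_dlq_redrive_cycle and wait_for_main_queue_idle are illustrative names, not confirmed source code:

def run_dlq_redrive_cycle(npcase_id: str, batch_id: str) -> None:
    """Hypothetical orchestration of the redrive policy above."""
    max_redrives = Config.SQS_MAX_DLQ_REDRIVES  # 2 by default
    for attempt in range(max_redrives):
        final_pass = attempt == max_redrives - 1
        moved = redrive_dlq_messages(npcase_id, batch_id, final_pass=final_pass)
        if moved == 0:
            return  # DLQ already empty; nothing to redrive
        wait_for_main_queue_idle()  # Illustrative: let redriven messages process
    # Max redrives reached: complete the batch, accepting partial failure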

Pattern 4: Visibility Timeout for Database Errors

MySQL operational errors (not deadlocks, which Pattern 1 retries in-process) get a cooling-off period:

# In handler exception routing
elif isinstance(e, OperationalError):
    batch_item_failures.append({"itemIdentifier": message_id})
    change_message_visibility(record, 120)  # 2-minute timeout

This delays retry without requeuing — the message becomes visible again after the timeout, giving the database time to recover.
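
The change_message_visibility helper is a thin wrapper over the SQS API. A minimal sketch, assuming record is a Lambda SQS event record (which carries the receipt handle under "receiptHandle"):

def change_message_visibility(record: dict, timeout_seconds: int) -> None:
    """Sketch: extend the message's invisibility window instead of requeuing."""
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=record["receiptHandle"],
        VisibilityTimeout=timeout_seconds,
    )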

Summary

Failure Type                 Strategy                Delay Pattern
MySQL deadlock/lock wait     @retry_on_db_conflict   0.1s → 0.2s → 0.4s
Transient processing error   SQS requeue             120s → 240s → 480s → 900s
Exhausted retries            Send to DLQ             N/A
DLQ accumulation             Redrive (max 2x)        Immediate requeue
MySQL operational error      Visibility timeout      120s fixed