Idempotent Handler Pattern

Purpose

Every event handler must produce the same result whether it runs once or multiple times for the same event. SNS and SQS guarantee at-least-once delivery, so duplicate processing is expected, not exceptional.
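A minimal illustration of the contract (not Nextpoint code): a handler keyed by message ID leaves the system in the same state whether the message is delivered once or redelivered.

```python
# Stand-in for a durable checkpoint/dedupe store.
processed: dict = {}

def handle(message: dict) -> str:
    msg_id = message["message_id"]
    if msg_id in processed:              # duplicate delivery: idempotent skip
        return processed[msg_id]
    result = message["payload"].upper()  # the "real" work
    processed[msg_id] = result
    return result

event = {"message_id": "m-1", "payload": "hello"}
first = handle(event)
second = handle(event)  # simulated at-least-once redelivery
assert first == second == "HELLO"
assert len(processed) == 1  # exactly one record despite two invocations
```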

Strategy 1: Checkpoint-Based Idempotency

For document processing pipelines, the checkpoint acts as the idempotency key:

def load_document(session, import_attributes, npcase_id, batch_id, document_id):
    checkpoint = get_checkpoint(session, npcase_id, batch_id, document_id)

    if checkpoint and checkpoint.checkpoint_id == Checkpoints.PROCESS_COMPLETE.value:
        # Already processed — idempotent skip
        log_message("info", f"Document {document_id} already processed")
        return

    if checkpoint:
        # Partially processed — resume from checkpoint (also idempotent)
        DocumentProcessor(checkpoint, import_attributes, initial=False).run()
    else:
        # First attempt
        new_checkpoint = create_checkpoint(...)
        if new_checkpoint:
            DocumentProcessor(new_checkpoint, import_attributes, initial=True).run()
        else:
            raise RecoverableException(document_id, "Another Lambda is processing")
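The key to the first-attempt branch is that `create_checkpoint` must be atomic: exactly one Lambda wins the document, and losers see `None`. A hypothetical sketch of that mechanic, using SQLite's composite primary key in place of the real checkpoint table (table and column names are assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE checkpoint (
        npcase_id INTEGER, batch_id INTEGER, document_id INTEGER,
        checkpoint_id TEXT,
        PRIMARY KEY (npcase_id, batch_id, document_id)
    )
""")

def create_checkpoint(npcase_id, batch_id, document_id):
    try:
        # The composite primary key makes this INSERT an atomic claim.
        conn.execute(
            "INSERT INTO checkpoint VALUES (?, ?, ?, 'STARTED')",
            (npcase_id, batch_id, document_id),
        )
        return (npcase_id, batch_id, document_id)
    except sqlite3.IntegrityError:
        return None  # another Lambda already holds this document

assert create_checkpoint(1, 1, 42) is not None  # first worker wins
assert create_checkpoint(1, 1, 42) is None      # concurrent duplicate loses
```

The `None` return is what drives the `RecoverableException` path above: the losing worker backs off and retries later, by which time the winner has either completed or advanced the checkpoint.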

Strategy 2: Database Unique Constraints

Use composite unique keys to prevent duplicate records:

# Deduplication table
from sqlalchemy import Column, Integer, String  # Base comes from declarative_base()

class DocDedupe(Base):
    __tablename__ = "doc_dedupe"
    npcase_id = Column(Integer, primary_key=True)
    message_id = Column(String(255), primary_key=True)
    bcc = Column(String(255), primary_key=True)
    md5 = Column(String(32), primary_key=True)
    doc_type = Column(String(50), primary_key=True)

# Insert-or-skip pattern
from sqlalchemy.exc import IntegrityError

try:
    session.add(DocDedupe(**dedup_key))
    session.flush()
except IntegrityError:
    session.rollback()
    # Duplicate — already exists, skip processing
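A self-contained demonstration of the same insert-or-skip pattern, using the stdlib `sqlite3` module instead of a SQLAlchemy session so it can run anywhere (the key values are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE doc_dedupe (
        npcase_id INTEGER, message_id TEXT, bcc TEXT, md5 TEXT, doc_type TEXT,
        PRIMARY KEY (npcase_id, message_id, bcc, md5, doc_type)
    )
""")

def insert_or_skip(key: tuple) -> bool:
    """Return True if the row was new, False if it was a duplicate."""
    try:
        conn.execute("INSERT INTO doc_dedupe VALUES (?, ?, ?, ?, ?)", key)
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate: constraint fired, skip processing

key = (1, "msg-1", "a@b.com", "d41d8cd98f00b204e9800998ecf8427e", "email")
assert insert_or_skip(key) is True   # first delivery processes
assert insert_or_skip(key) is False  # redelivery is rejected by the database
```

Because the constraint lives in the database, it holds even when two Lambdas race on the same message: one INSERT commits, the other fails.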

Strategy 3: Hash-Based Deduplication

For content that may exceed column size limits:

class TagDedupe(Base):
    __tablename__ = "tag_dedupe"
    npcase_id = Column(Integer, primary_key=True)
    name = Column(String(255), primary_key=True)        # SHA256 of full content
    custom_field_id = Column(Integer, primary_key=True)

# Hash long content to fit in PK column
import hashlib
tag_hash = hashlib.sha256(full_tag_value.encode()).hexdigest()
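Why this works: a SHA-256 hexdigest is always 64 characters, so arbitrarily long tag values collapse to a fixed width that fits comfortably in `String(255)`, and the hash is deterministic, so a duplicate event produces the same primary key and hits the constraint:

```python
import hashlib

short = hashlib.sha256(b"confidential").hexdigest()
long_ = hashlib.sha256(("x" * 100_000).encode()).hexdigest()

# Fixed width regardless of input size.
assert len(short) == len(long_) == 64

# Deterministic: re-processing the same content yields the same key.
assert hashlib.sha256(b"confidential").hexdigest() == short
```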

Strategy 4: DLQ Redrive Markers

Messages redriven from DLQ carry markers to prevent infinite loops:

def process_event(message: dict):
    if message.get("dlq_redriven") and message.get("eventDetail", {}).get("dlq_final_pass"):
        # Final redrive pass — if this fails, stop silently
        try:
            _process(message)
        except Exception:
            raise SilentSuccessException("DLQ final pass — accepting failure")
    else:
        _process(message)
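For these markers to guarantee termination, the redrive side must stamp them. A hypothetical sketch (the counter field and the limit of 3 are assumptions, not from the source):

```python
MAX_REDRIVES = 3  # assumed policy limit

def mark_for_redrive(message: dict) -> dict:
    """Hypothetical helper: stamp a DLQ message before re-queueing it."""
    count = message.get("dlq_redrive_count", 0) + 1
    message["dlq_redriven"] = True
    message["dlq_redrive_count"] = count
    if count >= MAX_REDRIVES:
        # Last chance: the handler will treat failure as silent success.
        message.setdefault("eventDetail", {})["dlq_final_pass"] = True
    return message

msg = {"eventDetail": {}}
for _ in range(MAX_REDRIVES):
    msg = mark_for_redrive(msg)

assert msg["dlq_redrive_count"] == 3
assert msg["eventDetail"]["dlq_final_pass"] is True  # loop is bounded
```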

Rules

  1. Check before write — always verify if the operation has already been performed
  2. Use database constraints — they're the ultimate safety net against race conditions
  3. Composite keys for deduplication — single-field keys are rarely sufficient
  4. Log duplicates as info, not warning — they're expected behavior
  5. DLQ redrive must terminate — max redrive count prevents infinite loops
  6. Never assume exactly-once — design every handler as if it will run twice