# Idempotent Handler Pattern

## Purpose
Every event handler must produce the same result whether it runs once or many times for the same event. SNS and SQS guarantee only at-least-once delivery, so duplicate processing is expected behavior, not an exceptional case.
## Strategy 1: Checkpoint-Based Idempotency
For document processing pipelines, the checkpoint acts as the idempotency key:
```python
def load_document(session, import_attributes):
    # npcase_id, batch_id, and document_id are derived from import_attributes
    checkpoint = get_checkpoint(session, npcase_id, batch_id, document_id)
    if checkpoint and checkpoint.checkpoint_id == Checkpoints.PROCESS_COMPLETE.value:
        # Already processed — idempotent skip
        log_message("info", f"Document {document_id} already processed")
        return
    if checkpoint:
        # Partially processed — resume from checkpoint (also idempotent)
        DocumentProcessor(checkpoint, import_attributes, initial=False).run()
    else:
        # First attempt
        new_checkpoint = create_checkpoint(...)
        if new_checkpoint:
            DocumentProcessor(new_checkpoint, import_attributes, initial=True).run()
        else:
            raise RecoverableException(document_id, "Another Lambda is processing")
```
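The checkpoint flow above can be reduced to a runnable sketch. Everything here is illustrative: an in-memory dict stands in for the checkpoint table, and `CheckpointStore`, the return values, and the state names are assumptions, not the production API.

```python
PROCESS_COMPLETE = "PROCESS_COMPLETE"

class CheckpointStore:
    """Illustrative stand-in for the checkpoint table."""

    def __init__(self):
        self._states = {}  # (npcase_id, batch_id, document_id) -> state

    def get(self, key):
        return self._states.get(key)

    def create(self, key):
        # Returning None mimics the "another Lambda is processing" branch
        if key in self._states:
            return None
        self._states[key] = "STARTED"
        return key

    def complete(self, key):
        self._states[key] = PROCESS_COMPLETE

def load_document(store, key, process):
    state = store.get(key)
    if state == PROCESS_COMPLETE:
        return "skipped"      # already done: idempotent no-op
    if state is not None:
        process(key)          # resume partial work
        store.complete(key)
        return "resumed"
    if store.create(key) is None:
        return "contended"    # another worker holds the checkpoint
    process(key)
    store.complete(key)
    return "processed"
```

Running the handler twice with the same key does the work exactly once; the second call short-circuits to the idempotent skip.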
## Strategy 2: Database Unique Constraints
Use composite unique keys to prevent duplicate records:
```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.exc import IntegrityError

# Deduplication table: the composite primary key is the idempotency constraint
class DocDedupe(Base):
    __tablename__ = "doc_dedupe"
    npcase_id = Column(Integer, primary_key=True)
    message_id = Column(String(255), primary_key=True)
    bcc = Column(String(255), primary_key=True)
    md5 = Column(String(32), primary_key=True)
    doc_type = Column(String(50), primary_key=True)

# Insert-or-skip pattern
try:
    session.add(DocDedupe(**dedup_key))
    session.flush()
except IntegrityError:
    session.rollback()
    # Duplicate — already exists, skip processing
```
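The insert-or-skip behavior can be demonstrated end to end with the stdlib `sqlite3` driver, which raises its own `IntegrityError` on a composite-key violation. The table shape mirrors `DocDedupe`, but the helper name `insert_or_skip` is illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE doc_dedupe (
        npcase_id INTEGER, message_id TEXT, bcc TEXT, md5 TEXT, doc_type TEXT,
        PRIMARY KEY (npcase_id, message_id, bcc, md5, doc_type)
    )
""")

def insert_or_skip(conn, key):
    try:
        conn.execute("INSERT INTO doc_dedupe VALUES (?, ?, ?, ?, ?)", key)
        conn.commit()
        return True   # first time this key is seen: process the document
    except sqlite3.IntegrityError:
        conn.rollback()
        return False  # duplicate delivery: skip processing

key = (1, "msg-1", "a@example.com", "d41d8cd98f00b204e9800998ecf8427e", "email")
first = insert_or_skip(conn, key)   # inserts the row
second = insert_or_skip(conn, key)  # hits the composite PK, skipped
```

Because the database enforces the constraint, this stays correct even when two workers race to insert the same key: exactly one insert wins.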
## Strategy 3: Hash-Based Deduplication
For content that may exceed column size limits:
```python
import hashlib

from sqlalchemy import Column, Integer, String

class TagDedupe(Base):
    __tablename__ = "tag_dedupe"
    npcase_id = Column(Integer, primary_key=True)
    name = Column(String(255), primary_key=True)  # SHA-256 hex digest of full content
    custom_field_id = Column(Integer, primary_key=True)

# Hash long content to fit in the primary-key column
tag_hash = hashlib.sha256(full_tag_value.encode()).hexdigest()
```
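A small self-contained sketch of the hashing step; `tag_dedupe_key` is a hypothetical helper, but it shows why the digest always fits: a SHA-256 hex digest is 64 characters regardless of input length, and the same content always yields the same key, so retries deduplicate cleanly.

```python
import hashlib

def tag_dedupe_key(npcase_id, full_tag_value, custom_field_id):
    # Arbitrarily long content collapses to a 64-character digest,
    # which fits comfortably in the String(255) name column
    tag_hash = hashlib.sha256(full_tag_value.encode("utf-8")).hexdigest()
    return (npcase_id, tag_hash, custom_field_id)

long_value = "x" * 10_000
key1 = tag_dedupe_key(1, long_value, 42)
key2 = tag_dedupe_key(1, long_value, 42)  # same content, same key
```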
## Strategy 4: DLQ Redrive Markers

Messages redriven from the DLQ carry markers to prevent infinite loops:
```python
def process_event(message: dict):
    if message.get("dlq_redriven") and message["eventDetail"].get("dlq_final_pass"):
        # Final redrive pass — if this fails, stop silently
        try:
            _process(message)
        except Exception:
            raise SilentSuccessException("DLQ final pass — accepting failure")
    else:
        _process(message)
```
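One way the markers might be set is sketched below; `redrive` and `max_redrives` are assumptions about the redrive machinery, shown only to illustrate how a bounded attempt count eventually flips `dlq_final_pass` and terminates the loop.

```python
def redrive(message: dict, max_redrives: int = 3) -> dict:
    # Mark the message as having passed through the DLQ and count attempts
    detail = message.setdefault("eventDetail", {})
    detail["redrive_count"] = detail.get("redrive_count", 0) + 1
    message["dlq_redriven"] = True
    if detail["redrive_count"] >= max_redrives:
        # Last pass: the handler swallows any failure after this point
        detail["dlq_final_pass"] = True
    return message

msg = {"body": "payload"}
for _ in range(3):
    msg = redrive(msg)
```

After the third redrive the final-pass marker is set, so the next failure is accepted rather than requeued, and the loop cannot continue indefinitely.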
## Rules
- Check before write — always verify if the operation has already been performed
- Use database constraints — they're the ultimate safety net against race conditions
- Composite keys for deduplication — single-field keys are rarely sufficient
- Log duplicates as info, not warning — they're expected behavior
- DLQ redrive must terminate — max redrive count prevents infinite loops
- Never assume exactly-once — design every handler as if it will run twice