# State Validation and Reconciliation Pattern

## Purpose

Ensure processing state remains consistent when resuming from checkpoints, detect orphaned or stale state, and provide recovery procedures for interrupted pipelines.
## Problem
Lambda functions can be interrupted at any point — timeout, throttle, OOM, or transient infrastructure failure. When a new Lambda picks up the requeued message, it must:
- Determine where processing left off
- Verify the intermediate state is valid (not corrupted by a partial write)
- Resume safely without duplicating work
## State Transition Validation

### Valid Checkpoint Transitions

Checkpoints advance sequentially. A checkpoint can only move forward, never backward:
```python
# core/checkpoints.py

VALID_TRANSITIONS: dict[int, list[int]] = {
    Checkpoints.PROCESS_STARTED.value: [Checkpoints.DUPLICATION_HANDLED.value],
    Checkpoints.DUPLICATION_HANDLED.value: [Checkpoints.EXHIBIT_CREATED.value],
    Checkpoints.EXHIBIT_CREATED.value: [Checkpoints.ATTACHMENTS_CREATED.value],
    # ... each checkpoint has exactly one valid successor
}


def validate_transition(current: int, target: int) -> bool:
    """Verify the checkpoint transition is valid."""
    valid_targets = VALID_TRANSITIONS.get(current, [])
    return target in valid_targets
```
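A minimal, self-contained illustration of the transition check. The enum values below are stand-ins for the real `core/checkpoints.py` definitions, not the production values:

```python
# Sketch only — these enum values are illustrative assumptions,
# not the production ones from core/checkpoints.py.
from enum import Enum


class Checkpoints(Enum):
    PROCESS_STARTED = 1
    DUPLICATION_HANDLED = 2
    EXHIBIT_CREATED = 3


VALID_TRANSITIONS: dict[int, list[int]] = {
    Checkpoints.PROCESS_STARTED.value: [Checkpoints.DUPLICATION_HANDLED.value],
    Checkpoints.DUPLICATION_HANDLED.value: [Checkpoints.EXHIBIT_CREATED.value],
}


def validate_transition(current: int, target: int) -> bool:
    return target in VALID_TRANSITIONS.get(current, [])


print(validate_transition(1, 2))  # True — the next sequential checkpoint
print(validate_transition(1, 3))  # False — skipping a step is rejected
print(validate_transition(2, 1))  # False — moving backward is rejected
```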
### Guarded Checkpoint Update
```python
def update_checkpoint(
    session: Session,
    checkpoint: ProcessCheckpoints,
    target_checkpoint_id: int,
) -> None:
    """Update checkpoint only if the transition is valid."""
    if checkpoint.checkpoint_id >= target_checkpoint_id:
        # Already at or past this checkpoint — idempotent skip
        log_message(
            "info",
            f"Checkpoint already at {checkpoint.checkpoint_id}, "
            f"skipping update to {target_checkpoint_id}"
        )
        return

    if not validate_transition(checkpoint.checkpoint_id, target_checkpoint_id):
        raise PermanentFailureException(
            f"Invalid checkpoint transition: "
            f"{checkpoint.checkpoint_id} → {target_checkpoint_id}"
        )

    checkpoint.checkpoint_id = target_checkpoint_id
    session.flush()
```
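The read-check-write above can race if two Lambdas hold the same row. One way to close that window is to express the guard as a single conditional UPDATE, shown here against an in-memory SQLite table; the table and column names are illustrative, not the production schema:

```python
# Sketch: the forward-only guard as a conditional UPDATE, which doubles as a
# lightweight optimistic lock under concurrent Lambda invocations.
# Schema is an illustrative assumption.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE process_checkpoints (id INTEGER PRIMARY KEY, checkpoint_id INTEGER)"
)
conn.execute("INSERT INTO process_checkpoints VALUES (1, 2)")


def advance(conn: sqlite3.Connection, row_id: int, current: int, target: int) -> bool:
    # The WHERE clause makes the write a no-op unless the row is still at the
    # checkpoint we read — a second Lambda that already advanced it wins.
    cur = conn.execute(
        "UPDATE process_checkpoints SET checkpoint_id = ? "
        "WHERE id = ? AND checkpoint_id = ?",
        (target, row_id, current),
    )
    return cur.rowcount == 1


print(advance(conn, 1, 2, 3))  # True — row advanced from 2 to 3
print(advance(conn, 1, 2, 3))  # False — row is no longer at 2, write skipped
```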
## Orphan Detection

### What Creates Orphans

Orphaned state occurs when:

- A Lambda times out mid-checkpoint and the SQS message expires (exceeds maxReceiveCount)
- A batch is cancelled while some documents are mid-processing
- DLQ messages are never redriven
### Detecting Orphaned Checkpoints
Checkpoints that haven't progressed within a grace period are likely orphaned:
```python
from datetime import datetime, timedelta, timezone

ORPHAN_GRACE_PERIOD = timedelta(hours=2)


def find_orphaned_checkpoints(
    session: Session,
    npcase_id: int,
    batch_id: int,
) -> list[ProcessCheckpoints]:
    """Find checkpoints stuck in intermediate state past the grace period."""
    cutoff = datetime.now(timezone.utc) - ORPHAN_GRACE_PERIOD
    return (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
            ProcessCheckpoints.checkpoint_id > Checkpoints.PROCESS_STARTED.value,
            ProcessCheckpoints.checkpoint_id < Checkpoints.PROCESS_COMPLETE.value,
            ProcessCheckpoints.updated_at < cutoff,
        )
        .all()
    )
```
### Orphan Resolution Strategies
| Strategy | When to Use | Implementation |
|---|---|---|
| Re-enqueue | Document can be safely reprocessed | Publish event to trigger reprocessing from current checkpoint |
| Mark failed | Document is in unrecoverable state | Set checkpoint to a FAILED sentinel, report via SNS WARNING |
| Manual review | Ambiguous state, data integrity risk | Add to DLQ with context, alert operations team |
```python
import json


def resolve_orphaned_checkpoint(
    session: Session,
    checkpoint: ProcessCheckpoints,
    strategy: str = "re-enqueue",
) -> None:
    """Resolve an orphaned checkpoint using the specified strategy."""
    if strategy == "re-enqueue":
        # Re-publish the document event — Lambda will resume from current checkpoint
        SNS.publish_sns_message({
            "eventType": EventType.DOCUMENT_PROCESSED.value,
            "documentId": checkpoint.nge_document_id,
            "eventDetail": json.dumps({"orphan_recovery": True}),
        })
    elif strategy == "mark_failed":
        checkpoint.checkpoint_id = CHECKPOINT_FAILED_SENTINEL
        session.flush()
        SNS.publish_sns_message({
            "eventType": "WARNING",
            "eventDetail": json.dumps({
                "activity": "orphan_resolution",
                "message": f"Document {checkpoint.nge_document_id} marked as failed",
            }),
        })
    # The "manual_review" strategy routes the document to the DLQ with
    # context and alerts the operations team (not shown here)
```
## Intermediate State Verification
When resuming from a checkpoint, verify that the previous step's output actually exists before proceeding.
### Verify-Before-Continue Pattern
```python
class DocumentProcessor:
    def _resume_from_checkpoint(self, session: Session) -> None:
        """Verify intermediate state before resuming."""
        current = Checkpoints(self.checkpoint.checkpoint_id)

        if current.value >= Checkpoints.EXHIBIT_CREATED.value:
            # Exhibit should exist if we're past this checkpoint
            exhibit_ids = self.checkpoint.exhibit_ids
            if exhibit_ids:
                ids = [int(x) for x in exhibit_ids.split(",")]
                existing = (
                    session.query(Exhibits.id)
                    .filter(Exhibits.id.in_(ids))
                    .all()
                )
                if len(existing) != len(ids):
                    log_message(
                        "warning",
                        f"Expected {len(ids)} exhibits, found {len(existing)}. "
                        f"Rolling back to DUPLICATION_HANDLED so exhibit "
                        f"creation re-runs."
                    )
                    self.checkpoint.checkpoint_id = Checkpoints.DUPLICATION_HANDLED.value
                    session.flush()
                    return
```

Note that this verify-and-roll-back path is the one deliberate exception to the forward-only rule: the checkpoint is lowered only when the recorded output is provably missing, so the step that should have produced it can re-run.
### Verification Checklist by Checkpoint

| Checkpoint | Verify |
|---|---|
| EXHIBIT_CREATED | Exhibit rows exist in database |
| ATTACHMENTS_CREATED | Attachment rows exist, page count matches |
| TAGGINGS_CREATED | Tag rows exist for exhibit |
| ES_INDEXING_COMPLETE | Document exists in ES index |
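One way to keep these checks from accumulating as nested `if` blocks is a dispatch table of verifier callables, one per checkpoint. In this sketch the `state` dict and its keys are hypothetical stand-ins for the real database and Elasticsearch lookups:

```python
# Sketch: the verification checklist as a dispatch table. The `state` dict
# and its keys are illustrative assumptions, not the production API.
VERIFIERS = {
    "EXHIBIT_CREATED": lambda s: set(s["exhibit_ids"]) <= set(s["db_exhibit_ids"]),
    "ATTACHMENTS_CREATED": lambda s: s["attachment_pages"] == s["expected_pages"],
    "ES_INDEXING_COMPLETE": lambda s: s["doc_in_es_index"],
}


def verify(checkpoint: str, state: dict) -> bool:
    # Checkpoints without a registered verifier pass trivially
    check = VERIFIERS.get(checkpoint)
    return check(state) if check else True


state = {
    "exhibit_ids": [1, 2],
    "db_exhibit_ids": [1, 2, 3],
    "attachment_pages": 10,
    "expected_pages": 10,
    "doc_in_es_index": False,
}

print(verify("EXHIBIT_CREATED", state))       # True — every recorded id is present
print(verify("ES_INDEXING_COMPLETE", state))  # False — document not indexed yet
print(verify("PROCESS_STARTED", state))       # True — no verifier registered
```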
## Batch-Level Reconciliation

After all documents in a batch are processed (or timed out), reconcile the batch state before publishing BATCH_END_FINISHED.
```python
def reconcile_batch(
    session: Session,
    npcase_id: int,
    batch_id: int,
) -> dict:
    """Reconcile batch state: count completed, failed, and orphaned."""
    total = get_total_documents(session, npcase_id, batch_id)

    completed = (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
            ProcessCheckpoints.checkpoint_id == Checkpoints.PROCESS_COMPLETE.value,
        )
        .count()
    )

    orphaned = find_orphaned_checkpoints(session, npcase_id, batch_id)

    return {
        "total": total,
        "completed": completed,
        "orphaned": len(orphaned),
        "failed": total - completed - len(orphaned),
    }
```
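A batch-end handler might consume this summary as follows. The completion criterion shown (every document accounted for and no unresolved orphans) is an illustrative assumption, as is the function name:

```python
# Sketch of a gate on BATCH_END_FINISHED. The criterion is an illustrative
# assumption: accept partial failure, but never complete while orphans remain.
def batch_may_complete(summary: dict) -> bool:
    accounted = summary["completed"] + summary["failed"]
    return accounted == summary["total"] and summary["orphaned"] == 0


print(batch_may_complete(
    {"total": 10, "completed": 9, "orphaned": 0, "failed": 1}
))  # True — partial failure is acceptable
print(batch_may_complete(
    {"total": 10, "completed": 7, "orphaned": 2, "failed": 1}
))  # False — orphans must be resolved (or marked failed) first
```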
## Key Rules
- Checkpoints only move forward — never decrement a checkpoint value
- Validate transitions — reject invalid jumps (e.g., STARTED → ES_INDEXING_COMPLETE)
- Verify intermediate state on resume — don't assume the previous step's output exists
- Orphan detection uses time-based grace period — 2 hours default for Lambda-based processing
- Reconcile before batch completion — count completed vs. expected before BATCH_END_FINISHED
- Log orphan resolution — every orphan recovery action must be logged for audit
- Don't block the batch forever — accept partial failure rather than infinite retry
- Idempotent recovery — re-enqueued documents resume from their current checkpoint, not from scratch