State Validation and Reconciliation Pattern

Purpose

Ensure processing state remains consistent when resuming from checkpoints, detect orphaned or stale state, and provide recovery procedures for interrupted pipelines.

Problem

Lambda functions can be interrupted at any point — timeout, throttle, OOM, or transient infrastructure failure. When a new Lambda picks up the requeued message, it must:

  1. Determine where processing left off
  2. Verify the intermediate state is valid (not corrupted by a partial write)
  3. Resume safely without duplicating work
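
Mechanically, the resume logic can be sketched as an ordered walk over (checkpoint, step) pairs, skipping any stage the checkpoint already covers. The enum members and step names below are illustrative stand-ins, not the real pipeline:

```python
from enum import IntEnum

# Illustrative checkpoint values — the real members live in core/checkpoints.py.
class Checkpoints(IntEnum):
    PROCESS_STARTED = 1
    DUPLICATION_HANDLED = 2
    EXHIBIT_CREATED = 3
    PROCESS_COMPLETE = 4

def resume(current: int, steps) -> int:
    """Run each step whose checkpoint the document has not yet reached."""
    for produces, step in steps:
        if current < produces:
            step()              # do the work for this stage
            current = produces  # then advance past it
    return current

ran = []
steps = [
    (Checkpoints.DUPLICATION_HANDLED, lambda: ran.append("dedupe")),
    (Checkpoints.EXHIBIT_CREATED, lambda: ran.append("exhibit")),
]
# A document interrupted after deduplication resumes at exhibit creation:
resume(Checkpoints.DUPLICATION_HANDLED, steps)
```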

State Transition Validation

Valid Checkpoint Transitions

Checkpoints advance sequentially. A checkpoint can only move forward, never backward:

# core/checkpoints.py

VALID_TRANSITIONS: dict[int, list[int]] = {
    Checkpoints.PROCESS_STARTED.value: [Checkpoints.DUPLICATION_HANDLED.value],
    Checkpoints.DUPLICATION_HANDLED.value: [Checkpoints.EXHIBIT_CREATED.value],
    Checkpoints.EXHIBIT_CREATED.value: [Checkpoints.ATTACHMENTS_CREATED.value],
    # ... each checkpoint has exactly one valid successor
}


def validate_transition(current: int, target: int) -> bool:
    """Verify the checkpoint transition is valid."""
    valid_targets = VALID_TRANSITIONS.get(current, [])
    return target in valid_targets
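
With the transition table in place, a quick check shows what the guard accepts and rejects. The bare integer ids here are illustrative:

```python
# Minimal sketch of the transition table with integer ids (illustrative values).
VALID_TRANSITIONS = {1: [2], 2: [3], 3: [4]}

def validate_transition(current: int, target: int) -> bool:
    return target in VALID_TRANSITIONS.get(current, [])

validate_transition(1, 2)  # True  — immediate successor
validate_transition(1, 3)  # False — skips a checkpoint
validate_transition(3, 2)  # False — moves backward
```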

Guarded Checkpoint Update

def update_checkpoint(
    session: Session,
    checkpoint: ProcessCheckpoints,
    target_checkpoint_id: int,
) -> None:
    """Update checkpoint only if the transition is valid."""
    if checkpoint.checkpoint_id >= target_checkpoint_id:
        # Already at or past this checkpoint — idempotent skip
        log_message("info",
            f"Checkpoint already at {checkpoint.checkpoint_id}, "
            f"skipping update to {target_checkpoint_id}"
        )
        return

    if not validate_transition(checkpoint.checkpoint_id, target_checkpoint_id):
        raise PermanentFailureException(
            f"Invalid checkpoint transition: "
            f"{checkpoint.checkpoint_id}{target_checkpoint_id}"
        )

    checkpoint.checkpoint_id = target_checkpoint_id
    session.flush()
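
Because SQS delivers at least once, the same checkpoint update can arrive twice. The `>=` guard turns the duplicate into a no-op; a stripped-down sketch with a stand-in checkpoint object (no database, illustrative only):

```python
# StubCheckpoint stands in for the ProcessCheckpoints row — illustrative only.
class StubCheckpoint:
    def __init__(self, checkpoint_id: int):
        self.checkpoint_id = checkpoint_id

def guarded_update(cp: StubCheckpoint, target: int, valid: dict) -> str:
    if cp.checkpoint_id >= target:
        return "skipped"    # idempotent no-op on redelivery
    if target not in valid.get(cp.checkpoint_id, []):
        return "rejected"   # invalid jump — would raise PermanentFailureException
    cp.checkpoint_id = target
    return "advanced"

valid = {1: [2], 2: [3]}
cp = StubCheckpoint(2)
guarded_update(cp, 3, valid)  # "advanced"
guarded_update(cp, 3, valid)  # "skipped" — a duplicate delivery is harmless
```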

Orphan Detection

What Creates Orphans

Orphaned state occurs when:

  - Lambda times out mid-checkpoint and the SQS message expires (exceeds maxReceiveCount)
  - A batch is cancelled but some documents are mid-processing
  - DLQ messages are never redriven

Detecting Orphaned Checkpoints

Checkpoints that haven't progressed within a grace period are likely orphaned:

from datetime import datetime, timedelta, timezone

ORPHAN_GRACE_PERIOD = timedelta(hours=2)


def find_orphaned_checkpoints(
    session: Session,
    npcase_id: int,
    batch_id: int,
) -> list[ProcessCheckpoints]:
    """Find checkpoints stuck in intermediate state past the grace period."""
    cutoff = datetime.now(timezone.utc) - ORPHAN_GRACE_PERIOD

    return (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
            ProcessCheckpoints.checkpoint_id > Checkpoints.PROCESS_STARTED.value,
            ProcessCheckpoints.checkpoint_id < Checkpoints.PROCESS_COMPLETE.value,
            ProcessCheckpoints.updated_at < cutoff,
        )
        .all()
    )

Orphan Resolution Strategies

Strategy      | When to Use                          | Implementation
------------- | ------------------------------------ | --------------
Re-enqueue    | Document can be safely reprocessed   | Publish event to trigger reprocessing from current checkpoint
Mark failed   | Document is in unrecoverable state   | Set checkpoint to a FAILED sentinel, report via SNS WARNING
Manual review | Ambiguous state, data integrity risk | Add to DLQ with context, alert operations team

import json

def resolve_orphaned_checkpoint(
    session: Session,
    checkpoint: ProcessCheckpoints,
    strategy: str = "re-enqueue",
) -> None:
    """Resolve an orphaned checkpoint using the specified strategy."""
    if strategy == "re-enqueue":
        # Re-publish the document event — Lambda will resume from current checkpoint
        SNS.publish_sns_message({
            "eventType": EventType.DOCUMENT_PROCESSED.value,
            "documentId": checkpoint.nge_document_id,
            "eventDetail": json.dumps({"orphan_recovery": True}),
        })
    elif strategy == "mark_failed":
        checkpoint.checkpoint_id = CHECKPOINT_FAILED_SENTINEL
        session.flush()
        SNS.publish_sns_message({
            "eventType": "WARNING",
            "eventDetail": json.dumps({
                "activity": "orphan_resolution",
                "message": f"Document {checkpoint.nge_document_id} marked as failed",
            }),
        })
    else:
        # "manual_review" and unknown strategies are not auto-resolved here
        raise ValueError(f"Unknown orphan resolution strategy: {strategy}")
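
A periodic sweep can then combine detection and resolution. The retry-count threshold and the dict field names below are assumptions for illustration, not part of the real model:

```python
def choose_strategy(retry_count: int, max_retries: int = 3) -> str:
    """Pick a resolution strategy per the table above (threshold is illustrative)."""
    if retry_count < max_retries:
        return "re-enqueue"   # safe to reprocess from the current checkpoint
    return "mark_failed"      # give up, report via SNS WARNING

def sweep(orphans: list) -> dict:
    """Map each orphaned document id to its chosen strategy."""
    return {o["id"]: choose_strategy(o["retries"]) for o in orphans}

sweep([{"id": 7, "retries": 0}, {"id": 9, "retries": 5}])
# → {7: "re-enqueue", 9: "mark_failed"}
```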

Intermediate State Verification

When resuming from a checkpoint, verify that the previous step's output actually exists before proceeding.

Verify-Before-Continue Pattern

class DocumentProcessor:
    def _resume_from_checkpoint(self, session: Session) -> None:
        """Verify intermediate state before resuming."""
        current = Checkpoints(self.checkpoint.checkpoint_id)

        if current.value >= Checkpoints.EXHIBIT_CREATED.value:
            # Exhibit should exist if we're past this checkpoint
            exhibit_ids = self.checkpoint.exhibit_ids
            if exhibit_ids:
                ids = [int(x) for x in exhibit_ids.split(",")]
                existing = (
                    session.query(Exhibits.id)
                    .filter(Exhibits.id.in_(ids))
                    .all()
                )
                if len(existing) != len(ids):
                    log_message("warning",
                        f"Expected {len(ids)} exhibits, found {len(existing)}. "
                        f"Rolling back to EXHIBIT_CREATED checkpoint."
                    )
                    self.checkpoint.checkpoint_id = Checkpoints.DUPLICATION_HANDLED.value
                    session.flush()
                    return

Verification Checklist by Checkpoint

Checkpoint           | Verify
-------------------- | ------
EXHIBIT_CREATED      | Exhibit rows exist in database
ATTACHMENTS_CREATED  | Attachment rows exist, page count matches
TAGGINGS_CREATED     | Tag rows exist for exhibit
ES_INDEXING_COMPLETE | Document exists in ES index
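
One way to keep this checklist executable is a table of verifier callables keyed by checkpoint. A sketch — the enum values and the ctx keys (`exhibit_ids`, `page_count`) are hypothetical, and real verifiers would query the database or the ES index:

```python
from enum import IntEnum

# Illustrative enum values — the real members live in core/checkpoints.py.
class Checkpoints(IntEnum):
    EXHIBIT_CREATED = 3
    ATTACHMENTS_CREATED = 4
    TAGGINGS_CREATED = 5
    ES_INDEXING_COMPLETE = 6

# One verifier per checkpoint, returning True when the step's output exists.
VERIFIERS = {
    Checkpoints.EXHIBIT_CREATED: lambda ctx: bool(ctx.get("exhibit_ids")),
    Checkpoints.ATTACHMENTS_CREATED: lambda ctx: ctx.get("page_count", 0) > 0,
}

def verify_up_to(current: int, ctx: dict) -> list:
    """Return checkpoints already passed whose expected output is missing."""
    return [
        cp for cp, check in VERIFIERS.items()
        if current >= cp and not check(ctx)
    ]

verify_up_to(Checkpoints.ATTACHMENTS_CREATED, {"exhibit_ids": "1,2", "page_count": 0})
# → [Checkpoints.ATTACHMENTS_CREATED]
```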

Batch-Level Reconciliation

After all documents in a batch are processed (or timed out), reconcile the batch state before publishing BATCH_END_FINISHED.

def reconcile_batch(
    session: Session,
    npcase_id: int,
    batch_id: int,
) -> dict:
    """Reconcile batch state: count completed, failed, and orphaned."""
    total = get_total_documents(session, npcase_id, batch_id)

    completed = (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
            ProcessCheckpoints.checkpoint_id == Checkpoints.PROCESS_COMPLETE.value,
        )
        .count()
    )

    orphaned = find_orphaned_checkpoints(session, npcase_id, batch_id)

    return {
        "total": total,
        "completed": completed,
        "orphaned": len(orphaned),
        "failed": total - completed - len(orphaned),
    }
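
The caller can use the reconciled counts to decide whether BATCH_END_FINISHED is safe to publish. A hedged sketch of that decision — the outcome labels are illustrative, not an existing API:

```python
def batch_outcome(counts: dict) -> str:
    """Classify a reconciled batch; the labels here are illustrative."""
    if counts["completed"] == counts["total"]:
        return "finished"            # safe to publish BATCH_END_FINISHED
    if counts["orphaned"] > 0:
        return "needs_recovery"      # resolve orphans before closing the batch
    return "finished_with_failures"  # accept partial failure (key rule 7)

batch_outcome({"total": 10, "completed": 10, "orphaned": 0, "failed": 0})  # "finished"
batch_outcome({"total": 10, "completed": 8, "orphaned": 2, "failed": 0})   # "needs_recovery"
```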

Key Rules

  1. Checkpoints only move forward — never decrement a checkpoint value, except the explicit rollback when intermediate-state verification fails on resume
  2. Validate transitions — reject invalid jumps (e.g., STARTED → ES_INDEXING_COMPLETE)
  3. Verify intermediate state on resume — don't assume the previous step's output exists
  4. Orphan detection uses time-based grace period — 2 hours default for Lambda-based processing
  5. Reconcile before batch completion — count completed vs. expected before BATCH_END_FINISHED
  6. Log orphan resolution — every orphan recovery action must be logged for audit
  7. Don't block the batch forever — accept partial failure rather than infinite retry
  8. Idempotent recovery — re-enqueued documents resume from their current checkpoint, not from scratch