
Checkpoint Pipeline Pattern

Orchestration pattern: Plan-and-Execute — define all steps upfront (the "plan"), then execute sequentially with resumability. Compare with Hierarchical (parallel delegation, used by pr-review) and ReAct (dynamic tool selection, not used in NGE).

Purpose

Long-running document processing must be resumable. If a Lambda times out, gets throttled, or hits a transient error, processing resumes from the last completed step — not from the beginning.

How It Works

  1. Define processing as a sequence of named checkpoints (the plan)
  2. Before starting, check if a checkpoint exists (resume vs. fresh start)
  3. After each step, persist the checkpoint to the database
  4. On resume, skip completed steps and continue from the last checkpoint

Checkpoint Enum

# core/checkpoints.py

from enum import Enum

class Checkpoints(Enum):
    PROCESS_STARTED = 0
    DUPLICATION_HANDLED = 1
    EXHIBIT_CREATED = 2
    ATTACHMENTS_CREATED = 3
    TAGGINGS_CREATED = 4
    BATCH_SOURCES_CREATED = 5
    CUSTODIAN_EXHIBITS_CREATED = 6
    MODEL_AUDITS_SKIPPED = 7
    FOLDERING_CREATED = 8
    ES_INDEXING_COMPLETE = 9
    PROCESS_COMPLETE = 10
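
The integer value is what gets persisted in checkpoint_id, so the enum constructor is what maps a stored integer back to a pipeline step. A minimal illustration (enum abbreviated from the full definition above):

```python
from enum import Enum

# Abbreviated copy of the Checkpoints enum, for illustration only
class Checkpoints(Enum):
    PROCESS_STARTED = 0
    DUPLICATION_HANDLED = 1
    PROCESS_COMPLETE = 10

stored_id = 1                      # integer read from process_checkpoints
current = Checkpoints(stored_id)   # maps back to Checkpoints.DUPLICATION_HANDLED
next_id = current.value + 1        # checkpoint to persist once this step succeeds
```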

Database Model

# core/models/db_models.py

class ProcessCheckpoints(Base):
    __tablename__ = "process_checkpoints"

    npcase_id: Mapped[int] = mapped_column(primary_key=True)
    batch_id: Mapped[int] = mapped_column(primary_key=True)
    nge_document_id: Mapped[str] = mapped_column(String(255), primary_key=True)
    checkpoint_id: Mapped[int] = mapped_column(default=0)
    exhibit_ids: Mapped[Optional[str]] = mapped_column(Text)
    index_documents: Mapped[bool] = mapped_column(default=True)
    # Timestamp refreshed on every write; used by stale-checkpoint detection
    updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now())

The composite primary key (npcase_id, batch_id, nge_document_id) ensures at most one checkpoint row per document per batch.

Pipeline Orchestrator

# core/process.py

class DocumentProcessor:
    def __init__(self, checkpoint, import_attributes, initial=True):
        self.checkpoint = checkpoint
        self.import_attributes = import_attributes
        self.initial = initial

    def run(self):
        """Execute pipeline from current checkpoint to completion."""

        # Define pipeline steps
        pipeline = {
            Checkpoints.PROCESS_STARTED: self._handle_duplication,
            Checkpoints.DUPLICATION_HANDLED: self._create_exhibit,
            Checkpoints.EXHIBIT_CREATED: self._create_attachments,
            Checkpoints.ATTACHMENTS_CREATED: self._create_taggings,
            Checkpoints.TAGGINGS_CREATED: self._create_batch_sources,
            Checkpoints.BATCH_SOURCES_CREATED: self._create_custodian_exhibits,
            Checkpoints.CUSTODIAN_EXHIBITS_CREATED: self._skip_model_audits,
            Checkpoints.MODEL_AUDITS_SKIPPED: self._create_foldering,
            Checkpoints.FOLDERING_CREATED: self._index_elasticsearch,
            Checkpoints.ES_INDEXING_COMPLETE: self._complete_process,
        }

        # Execute from current checkpoint
        while self.checkpoint.checkpoint_id != Checkpoints.PROCESS_COMPLETE.value:
            current = Checkpoints(self.checkpoint.checkpoint_id)
            handler = pipeline[current]

            with writer_session(npcase_id=self.checkpoint.npcase_id) as session:
                handler(session)
                update_checkpoint(session, self.checkpoint, current.value + 1)

            # Refresh checkpoint for next iteration
            self.checkpoint = get_checkpoint(...)

Entry Point: Resume or Start

def load_document(session, import_attributes):
    """Resume existing or start new processing pipeline."""

    # npcase_id, batch_id, and document_id are derived from import_attributes
    checkpoint = get_checkpoint(session, npcase_id, batch_id, document_id)

    if checkpoint and checkpoint.checkpoint_id == Checkpoints.PROCESS_COMPLETE.value:
        # Already done — skip
        log_message("info", f"Document {document_id} already processed")
        return

    if checkpoint:
        # Resume from last checkpoint
        DocumentProcessor(checkpoint, import_attributes, initial=False).run()
    else:
        # First attempt — create initial checkpoint
        new_checkpoint = create_checkpoint(
            session, npcase_id, batch_id, document_id, Checkpoints.PROCESS_STARTED
        )
        if new_checkpoint:
            DocumentProcessor(new_checkpoint, import_attributes, initial=True).run()
        else:
            # Race condition — another Lambda claimed this document
            raise RecoverableException(document_id, "Another Lambda is processing")

Race Condition Handling

The composite primary key acts as a distributed lock:

  • First Lambda to INSERT the checkpoint "wins" and processes the document
  • Second Lambda's INSERT fails → raises RecoverableException → message requeued
  • When the first Lambda completes, the requeued message sees PROCESS_COMPLETE and skips
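
The claim step can be sketched without a database by modeling first-INSERT-wins with a dict. This is an illustrative stand-in, not the actual implementation — the real create_checkpoint relies on the primary-key constraint raising an integrity error on the second INSERT:

```python
# Models first-INSERT-wins: only one writer can create the row for a given
# (npcase_id, batch_id, document_id); every later writer gets None and requeues.
_claims: dict[tuple[int, int, str], int] = {}

def create_checkpoint(npcase_id: int, batch_id: int, document_id: str,
                      initial_checkpoint: int = 0):
    key = (npcase_id, batch_id, document_id)
    if key in _claims:
        # In the real code: IntegrityError on INSERT -> RecoverableException
        return None
    _claims[key] = initial_checkpoint
    return initial_checkpoint
```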

When to Use This Pattern

  • Processing has multiple distinct steps that each take significant time
  • Steps are independently committable (each step's result can be persisted)
  • Lambda timeouts are a real risk (processing can take minutes)
  • Duplicate delivery is expected (SNS/SQS at-least-once)

Checkpoint Cleanup

Checkpoint rows accumulate in the process_checkpoints table over time. They must be cleaned up after batch completion to prevent unbounded table growth.

When to Clean Up

Cleanup happens during BATCH_END_FINISHED processing in the job processor, after all documents in the batch are confirmed complete (or accepted as failed).

def cleanup_batch_checkpoints(
    session: Session,
    npcase_id: int,
    batch_id: int,
) -> int:
    """Delete all checkpoints for a completed batch."""
    deleted = (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
        )
        .delete(synchronize_session="fetch")
    )
    log_message("info", f"Cleaned up {deleted} checkpoints for batch {batch_id}")
    return deleted

Cleanup Rules

  • ONLY clean up after BATCH_END_FINISHED — never during active processing
  • Delete ALL checkpoints for the batch (completed, failed, and orphaned)
  • Use @retry_on_db_conflict — cleanup may conflict with late-arriving document processing
  • Log the count of deleted rows for audit
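
@retry_on_db_conflict is referenced above but not defined in this section. A hedged sketch of such a decorator — the exception type, attempt count, and backoff schedule are all assumptions:

```python
import functools
import time

class DBConflictError(Exception):
    """Stand-in for the driver's serialization/conflict error."""

def retry_on_db_conflict(max_attempts: int = 3, base_delay: float = 0.1):
    """Retry the wrapped function when a conflicting transaction wins, with exponential backoff."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except DBConflictError:
                    if attempt == max_attempts:
                        raise  # out of retries — surface the conflict
                    time.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator
```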

Stale Checkpoint Detection

For batches that are cancelled or abandoned without a BATCH_END_FINISHED event, a periodic scan can detect and clean up stale checkpoints:

STALE_CHECKPOINT_DAYS = 7

def find_stale_checkpoints(session: Session) -> list[tuple[int, int]]:
    """Find batch/case pairs with checkpoints older than the stale threshold."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_CHECKPOINT_DAYS)
    return (
        session.query(
            ProcessCheckpoints.npcase_id,
            ProcessCheckpoints.batch_id,
        )
        .filter(ProcessCheckpoints.updated_at < cutoff)
        .distinct()
        .all()
    )
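
One way a periodic sweep could combine the scan with per-batch cleanup. The scheduling mechanism (e.g. a daily cron-triggered Lambda) is an assumption, and the two helpers are stubbed here so the sketch is self-contained:

```python
# Stubs standing in for the real functions defined above
def find_stale_checkpoints(session):
    return [(1, 10), (1, 11)]  # stub: (npcase_id, batch_id) pairs past the threshold

def cleanup_batch_checkpoints(session, npcase_id, batch_id):
    return 5  # stub: number of rows deleted for the batch

def sweep_stale_checkpoints(session) -> int:
    """Delete checkpoints for every stale batch; return total rows removed."""
    total = 0
    for npcase_id, batch_id in find_stale_checkpoints(session):
        total += cleanup_batch_checkpoints(session, npcase_id, batch_id)
    return total
```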

When NOT to Use This Pattern

  • Simple request/response operations (single database write)
  • Operations that complete in under 5 seconds
  • Operations where partial completion is worse than starting over