# Checkpoint Pipeline Pattern
Orchestration pattern: Plan-and-Execute — define all steps upfront (the "plan"), then execute sequentially with resumability. Compare with Hierarchical (parallel delegation, used by pr-review) and ReAct (dynamic tool selection, not used in NGE).
## Purpose
Long-running document processing must be resumable. If a Lambda times out, gets throttled, or hits a transient error, processing resumes from the last completed step — not from the beginning.
## How It Works
- Define processing as a sequence of named checkpoints (the plan)
- Before starting, check if a checkpoint exists (resume vs. fresh start)
- After each step, persist the checkpoint to the database
- On resume, skip completed steps and continue from the last checkpoint
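The loop above can be sketched generically. This is an illustration, not NGE code: step names and handlers are invented, and a plain dict stands in for the persisted checkpoint row. Note the ordering guarantee it demonstrates: the checkpoint advances only *after* a step succeeds, so a crash mid-step re-runs that step (at-least-once per step), while all earlier steps are skipped.

```python
# Generic sketch of the checkpoint loop (illustration, not NGE code):
# progress is "persisted" after each step, so a rerun resumes at the
# step that was in flight when the crash happened.
STEPS = ["started", "exhibit_created", "indexed", "complete"]

def run_pipeline(state, handlers):
    """Advance from the last persisted checkpoint to completion."""
    while state["checkpoint"] != "complete":
        current = state["checkpoint"]
        handlers[current](state)  # do the step's work; may raise
        # advance the checkpoint only after the step succeeds
        state["checkpoint"] = STEPS[STEPS.index(current) + 1]
    return state

calls = []
attempts = {"exhibit_created": 0}

def step(name):
    def handler(state):
        calls.append(name)
        if name == "exhibit_created" and attempts[name] == 0:
            attempts[name] += 1
            raise RuntimeError("simulated Lambda timeout")
    return handler

handlers = {s: step(s) for s in STEPS[:-1]}
state = {"checkpoint": "started"}  # the persisted checkpoint row

try:
    run_pipeline(state, handlers)   # first attempt dies mid-pipeline
except RuntimeError:
    pass
resumed_from = state["checkpoint"]  # "exhibit_created": step 1 is not redone
run_pipeline(state, handlers)       # second delivery resumes and finishes
```

The first step runs exactly once across both attempts; only the step that was interrupted repeats.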
## Checkpoint Enum
```python
# core/checkpoints.py
from enum import Enum


class Checkpoints(Enum):
    PROCESS_STARTED = 0
    DUPLICATION_HANDLED = 1
    EXHIBIT_CREATED = 2
    ATTACHMENTS_CREATED = 3
    TAGGINGS_CREATED = 4
    BATCH_SOURCES_CREATED = 5
    CUSTODIAN_EXHIBITS_CREATED = 6
    MODEL_AUDITS_SKIPPED = 7
    FOLDERING_CREATED = 8
    ES_INDEXING_COMPLETE = 9
    PROCESS_COMPLETE = 10
```
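Because the values form a dense, ordered sequence, a stored `checkpoint_id` round-trips through the enum and `value + 1` always names the next step. For example:

```python
from enum import Enum

class Checkpoints(Enum):
    PROCESS_STARTED = 0
    DUPLICATION_HANDLED = 1
    EXHIBIT_CREATED = 2
    ATTACHMENTS_CREATED = 3
    TAGGINGS_CREATED = 4
    BATCH_SOURCES_CREATED = 5
    CUSTODIAN_EXHIBITS_CREATED = 6
    MODEL_AUDITS_SKIPPED = 7
    FOLDERING_CREATED = 8
    ES_INDEXING_COMPLETE = 9
    PROCESS_COMPLETE = 10

# A row persisted with checkpoint_id=4 resumes at the step after taggings:
current = Checkpoints(4)
advanced = Checkpoints(current.value + 1)
```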
## Database Model
```python
# core/models/db_models.py
from typing import Optional

from sqlalchemy import String, Text
from sqlalchemy.orm import Mapped, mapped_column


class ProcessCheckpoints(Base):
    __tablename__ = "process_checkpoints"

    npcase_id: Mapped[int] = mapped_column(primary_key=True)
    batch_id: Mapped[int] = mapped_column(primary_key=True)
    nge_document_id: Mapped[str] = mapped_column(String(255), primary_key=True)
    checkpoint_id: Mapped[int] = mapped_column(default=0)
    exhibit_ids: Mapped[Optional[str]] = mapped_column(Text)
    index_documents: Mapped[bool] = mapped_column(default=True)
```
The composite primary key (npcase_id, batch_id, nge_document_id) ensures at most one checkpoint row per document per batch.
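That uniqueness guarantee can be demonstrated with any SQL engine. A minimal sqlite3 sketch (table simplified to the key columns; not the real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE process_checkpoints (
           npcase_id INTEGER,
           batch_id INTEGER,
           nge_document_id TEXT,
           checkpoint_id INTEGER DEFAULT 0,
           PRIMARY KEY (npcase_id, batch_id, nge_document_id)
       )"""
)
conn.execute("INSERT INTO process_checkpoints VALUES (1, 10, 'doc-a', 0)")

# The same document in a *different* batch is a separate row ...
conn.execute("INSERT INTO process_checkpoints VALUES (1, 11, 'doc-a', 0)")

# ... but a second row for the same (case, batch, document) is rejected.
try:
    conn.execute("INSERT INTO process_checkpoints VALUES (1, 10, 'doc-a', 0)")
    duplicate_allowed = True
except sqlite3.IntegrityError:
    duplicate_allowed = False

row_count = conn.execute("SELECT COUNT(*) FROM process_checkpoints").fetchone()[0]
```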
## Pipeline Orchestrator
```python
# core/process.py
class DocumentProcessor:
    def __init__(self, checkpoint, import_attributes, initial=True):
        self.checkpoint = checkpoint
        self.import_attributes = import_attributes
        self.initial = initial

    def run(self):
        """Execute pipeline from current checkpoint to completion."""
        # Define pipeline steps: each checkpoint maps to the handler
        # that advances the document to the next checkpoint
        pipeline = {
            Checkpoints.PROCESS_STARTED: self._handle_duplication,
            Checkpoints.DUPLICATION_HANDLED: self._create_exhibit,
            Checkpoints.EXHIBIT_CREATED: self._create_attachments,
            Checkpoints.ATTACHMENTS_CREATED: self._create_taggings,
            Checkpoints.TAGGINGS_CREATED: self._create_batch_sources,
            Checkpoints.BATCH_SOURCES_CREATED: self._create_custodian_exhibits,
            Checkpoints.CUSTODIAN_EXHIBITS_CREATED: self._skip_model_audits,
            Checkpoints.MODEL_AUDITS_SKIPPED: self._create_foldering,
            Checkpoints.FOLDERING_CREATED: self._index_elasticsearch,
            Checkpoints.ES_INDEXING_COMPLETE: self._complete_process,
        }
        # Execute from current checkpoint
        while self.checkpoint.checkpoint_id != Checkpoints.PROCESS_COMPLETE.value:
            current = Checkpoints(self.checkpoint.checkpoint_id)
            handler = pipeline[current]
            with writer_session(npcase_id=self.checkpoint.npcase_id) as session:
                handler(session)
                update_checkpoint(session, self.checkpoint, current.value + 1)
            # Refresh checkpoint for next iteration
            self.checkpoint = get_checkpoint(...)
```
## Entry Point: Resume or Start
```python
def load_document(session, import_attributes):
    """Resume existing or start new processing pipeline."""
    # npcase_id, batch_id, and document_id are carried on import_attributes
    checkpoint = get_checkpoint(session, npcase_id, batch_id, document_id)
    if checkpoint and checkpoint.checkpoint_id == Checkpoints.PROCESS_COMPLETE.value:
        # Already done — skip
        log_message("info", f"Document {document_id} already processed")
        return
    if checkpoint:
        # Resume from last checkpoint
        DocumentProcessor(checkpoint, import_attributes, initial=False).run()
    else:
        # First attempt — create initial checkpoint
        new_checkpoint = create_checkpoint(
            session, npcase_id, batch_id, document_id, Checkpoints.PROCESS_STARTED
        )
        if new_checkpoint:
            DocumentProcessor(new_checkpoint, import_attributes, initial=True).run()
        else:
            # Race condition — another Lambda claimed this document
            raise RecoverableException(document_id, "Another Lambda is processing")
```
## Race Condition Handling
The composite primary key acts as a distributed lock:

- First Lambda to INSERT the checkpoint "wins" and processes the document
- Second Lambda's INSERT fails → raises `RecoverableException` → message requeued
- When the first Lambda completes, the requeued message sees `PROCESS_COMPLETE` and skips
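`create_checkpoint` itself is not shown in this excerpt; under the semantics described above it would attempt the INSERT and report a lost race (here by returning a boolean; the real helper presumably returns the row or `None`). A stdlib sketch of the claim/lose flow, with sqlite3 standing in for the real database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE process_checkpoints ("
    " npcase_id INTEGER, batch_id INTEGER, nge_document_id TEXT,"
    " checkpoint_id INTEGER DEFAULT 0,"
    " PRIMARY KEY (npcase_id, batch_id, nge_document_id))"
)

def create_checkpoint(npcase_id, batch_id, document_id):
    """Claim a document: True if this worker won the INSERT race."""
    try:
        conn.execute(
            "INSERT INTO process_checkpoints VALUES (?, ?, ?, 0)",
            (npcase_id, batch_id, document_id),
        )
        return True
    except sqlite3.IntegrityError:
        return False  # another worker already holds the row -> requeue

# Two Lambdas receive the same SQS message for the same document:
lambda_a_won = create_checkpoint(1, 10, "doc-a")
lambda_b_won = create_checkpoint(1, 10, "doc-a")
```

Only the first INSERT succeeds; the loser surfaces the failure so the message can be requeued rather than processed twice.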
## When to Use This Pattern
- Processing has multiple distinct steps that each take significant time
- Steps are independently committable (each step's result can be persisted)
- Lambda timeouts are a real risk (processing can take minutes)
- Duplicate delivery is expected (SNS/SQS at-least-once)
## Checkpoint Cleanup
Checkpoint rows accumulate in the process_checkpoints table over time. They
must be cleaned up after batch completion to prevent unbounded table growth.
### When to Clean Up
Cleanup happens during BATCH_END_FINISHED processing in the job processor,
after all documents in the batch are confirmed complete (or accepted as failed).
```python
def cleanup_batch_checkpoints(
    session: Session,
    npcase_id: int,
    batch_id: int,
) -> int:
    """Delete all checkpoints for a completed batch."""
    deleted = (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
        )
        .delete(synchronize_session="fetch")
    )
    log_message("info", f"Cleaned up {deleted} checkpoints for batch {batch_id}")
    return deleted
```
### Cleanup Rules
- ONLY clean up after `BATCH_END_FINISHED`; never during active processing
- Delete ALL checkpoints for the batch (completed, failed, and orphaned)
- Use `@retry_on_db_conflict`; cleanup may conflict with late-arriving document processing
- Log the count of deleted rows for audit
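The `@retry_on_db_conflict` decorator is referenced but not defined in this excerpt. A generic retry decorator of that shape might look like the following; the exception type, attempt count, and backoff values are all assumptions, not the project's actual settings:

```python
import functools
import time

class DBConflictError(Exception):
    """Stand-in for the project's conflict/serialization error type."""

def retry_on_db_conflict(max_attempts=3, base_delay=0.1):
    """Retry the wrapped call when a conflicting transaction aborts it."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except DBConflictError:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the conflict
                    time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
        return wrapper
    return decorator

attempts = []

@retry_on_db_conflict(max_attempts=3, base_delay=0)
def flaky_cleanup():
    attempts.append(1)
    if len(attempts) < 3:
        raise DBConflictError("late-arriving document touched the table")
    return "cleaned"

result = flaky_cleanup()
```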
### Stale Checkpoint Detection
For batches that are cancelled or abandoned without a BATCH_END_FINISHED event,
a periodic scan can detect and clean up stale checkpoints:
```python
from datetime import datetime, timedelta, timezone

STALE_CHECKPOINT_DAYS = 7


def find_stale_checkpoints(session: Session) -> list[tuple[int, int]]:
    """Find batch/case pairs with checkpoints older than the stale threshold."""
    # Assumes ProcessCheckpoints has an updated_at timestamp column
    # (not shown in the model excerpt above)
    cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_CHECKPOINT_DAYS)
    return (
        session.query(
            ProcessCheckpoints.npcase_id,
            ProcessCheckpoints.batch_id,
        )
        .filter(ProcessCheckpoints.updated_at < cutoff)
        .distinct()
        .all()
    )
```
## When NOT to Use This Pattern
- Simple request/response operations (single database write)
- Operations that complete in under 5 seconds
- Operations where partial completion is worse than starting over