Reference Implementation: documentloader

Overview

The documentloader module ingests documents into the Nextpoint eDiscovery platform. It processes files from various sources, extracts metadata, creates exhibits and attachments, indexes to Elasticsearch, and tracks progress through an 11-step checkpoint pipeline.

This is the first module built with the architecture patterns defined in this repo and serves as the reference implementation for all future modules.

Pattern Mapping

| Pattern | documentloader Implementation |
| --- | --- |
| Hexagonal boundaries | core/ contains process.py, checkpoints.py, exceptions.py; shell/ contains db/, es/, utils/ |
| Exception hierarchy | RecoverableException, PermanentFailureException, SilentSuccessException, DBTransactionError, CheckpointUpdateError |
| SNS events | 11 EventType enum values (JOB_STARTED through LOADER_CANCELLED) |
| SQS handler | index.py — batch processing with partial failure support |
| Checkpoint pipeline | 11-step state machine (PROCESS_STARTED → PROCESS_COMPLETE) |
| Database sessions | writer_session, reader_session, core_reader_session, core_writer_session |
| Retry/resilience | @retry_on_db_conflict, SQS exponential backoff (120s→900s), DLQ redrive (max 2x) |
| Idempotency | Checkpoint-based (composite PK), DocDedupe table, TagDedupe with SHA256 |
| Multi-tenancy | Per-case database schema: {RDS_DBNAME}_case_{case_id} |
| Job processor orchestration | 925-line job_processor_index.py — per-batch infrastructure lifecycle (create SQS/DLQ/Lambda/SNS subscription, monitor queue depth, multi-pass DLQ redrive, atomic teardown) |
| Elasticsearch bulk indexing | _process_in_batches() with 504 ALB timeout recovery, _verify_bulk_indexed(), threshold-based bulk/individual switching (3), batch size 50 |
| SQS operations | Double-encoded envelope parsing, {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id} naming, exponential backoff requeue (120s→900s), 3-state queue depth monitoring, direct-to-DLQ routing |
| Structured logging | JSON formatter with level normalization (WARNING→WARN, CRITICAL→FATAL), module-level _log_context dict, "nil" for missing values, silenced boto3/botocore/urllib3 loggers |
| Config management | 4 environments × 3 regions CONFIG_MAP, lazy Secrets Manager caching, env var overrides with int() casting, import-time validation (warn, don't crash) |
| CDK infrastructure | Two-stack composition (CommonResourcesStack + DocLoad), 3 Lambdas (DocumentLoader/JobProcessor/BatchProcessor), separate RDS writer/reader proxies, SNS shared in prod / separate in dev, Python 3.13 layer, private subnets only |
| ORM models | 18 models: 8 core entities, 6 relationship tables, 3 deduplication tables with composite PKs. Soft deletes via delete_at_gmt. JSON columns for flexible config (near_dupe_info, bulk_redaction_info). Builder pattern for Exhibits construction from ExhibitData schema |

Architecture

documentloader/
├── lib/lambda/src/loader/
│   ├── core/
│   │   ├── exceptions.py          # 5 exception types controlling message flow
│   │   ├── checkpoints.py         # 11-step checkpoint enum
│   │   ├── process.py             # DocumentProcessor pipeline orchestrator
│   │   ├── process_helpers.py     # Exhibit extraction, polling utilities
│   │   ├── models/
│   │   │   ├── db_models.py       # Exhibits, Attachments, Tags, Batches, etc.
│   │   │   └── schemas.py         # TypedDict/Pydantic validation
│   │   └── utils/
│   │       ├── db_transaction.py  # @retry_on_db_conflict decorator
│   │       ├── context_data.py    # ContextVar management
│   │       └── helpers.py         # Structured logging
│   ├── shell/
│   │   ├── db/database.py         # Session management (4 session types)
│   │   ├── es/es_ops.py           # Elasticsearch bulk indexing
│   │   ├── *_ops.py               # Domain-specific DB operations
│   │   └── utils/
│   │       ├── sns_ops.py         # SNS publisher with EventType enum
│   │       ├── sqs_ops.py         # SQS requeue, DLQ redrive, visibility
│   │       ├── s3_ops.py          # S3 file operations
│   │       └── aws_secrets.py     # Secrets Manager with caching
│   ├── index.py                   # Document loader Lambda handler
│   ├── job_processor_index.py     # Job orchestration Lambda handler
│   └── config.py                  # Centralized Config class
├── lib/lambda/tests/
│   ├── conftest.py                # 23 env vars, auto-use fixtures
│   └── ...                        # Unit and integration tests
└── lib/                           # CDK infrastructure (TypeScript)

Key Design Decisions

Two Lambda Functions

  • Job Processor: Orchestrates batch lifecycle (create queues, manage concurrency, finalize)
  • Document Loader: Processes individual documents (11-step checkpoint pipeline)

Checkpoint as Distributed Lock

The composite primary key (npcase_id, batch_id, document_id) prevents duplicate processing. First Lambda to INSERT wins; second gets a RecoverableException and retries later.
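A minimal sketch of this claim-by-INSERT pattern, using SQLite in place of MySQL (the table layout and exception message are illustrative; only the exception name comes from the module's hierarchy):

```python
import sqlite3

class RecoverableException(Exception):
    """Signals SQS to redeliver the message later."""

def claim_checkpoint(conn, npcase_id, batch_id, document_id):
    # First Lambda to INSERT the composite key wins; a duplicate key
    # violates the primary key and is surfaced as a retryable condition.
    try:
        conn.execute(
            "INSERT INTO checkpoints (npcase_id, batch_id, document_id, step) "
            "VALUES (?, ?, ?, 'PROCESS_STARTED')",
            (npcase_id, batch_id, document_id),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        conn.rollback()
        raise RecoverableException("checkpoint already claimed; retry later")

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints ("
    " npcase_id INTEGER, batch_id INTEGER, document_id INTEGER, step TEXT,"
    " PRIMARY KEY (npcase_id, batch_id, document_id))"
)
claim_checkpoint(conn, 1, 10, 100)        # first claim succeeds
try:
    claim_checkpoint(conn, 1, 10, 100)    # second claim loses the race
except RecoverableException as e:
    outcome = str(e)
```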

SNS Filter Policies for Fan-Out

Each batch gets its own SQS queue with a filter policy scoped to (eventType, caseId, batchId). This prevents cross-batch message delivery without application-level filtering.

Three Deduplication Strategies

  1. Checkpoint table — pipeline-level idempotency
  2. DocDedupe table — content-level deduplication (MD5 + message_id composite)
  3. TagDedupe table — SHA256 hash for content exceeding varchar limits
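The third strategy rests on a simple property: a SHA-256 digest has a fixed width, so arbitrarily long tag content can always be deduplicated against an indexable column. A sketch (the function name and column limit are illustrative):

```python
import hashlib

VARCHAR_LIMIT = 255  # illustrative column width; the actual limit is not stated here

def tag_dedupe_key(tag_value: str) -> str:
    """Return a fixed-width dedupe key for tag content of any length."""
    return hashlib.sha256(tag_value.encode("utf-8")).hexdigest()

long_tag = "privileged/" + "x" * 1000   # far beyond any varchar limit
key = tag_dedupe_key(long_tag)
```

The hex digest is always 64 characters, so the dedupe column never overflows regardless of input size.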

Three Lambda Handlers

Handler 1: index.py (Document Loader)

Processes individual documents through the 11-step checkpoint pipeline. Handles three event types:

| Event Type | Action |
| --- | --- |
| DOCUMENT_PROCESSED | Run full 11-step checkpoint pipeline via load_document() |
| BATCH_ATTACHMENTS | Chunked attachment creation (startPos/endPos ranges) |
| BATCH_DOCUMENT_RELATIONS | Family document relation linking |

Returns {"batchItemFailures": [...]} for partial batch failures.
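The partial-failure contract can be sketched as follows (handler structure only; `process_record` is a hypothetical stand-in for the real pipeline):

```python
def process_record(record):
    """Hypothetical per-message processor used only for this sketch."""
    if record["body"] == "bad":
        raise ValueError("simulated processing failure")

def handler(event, context=None):
    failures = []
    for record in event["Records"]:
        try:
            process_record(record)
        except Exception:
            # Report only the failed message IDs; SQS redelivers
            # just these instead of the whole batch.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}

event = {"Records": [
    {"messageId": "m1", "body": "ok"},
    {"messageId": "m2", "body": "bad"},
]}
result = handler(event)
```

Note that returning an empty `batchItemFailures` list tells SQS the entire batch succeeded.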

Handler 2: job_processor_index.py (Job Orchestrator)

925-line handler managing per-batch infrastructure lifecycle (batch_size=1):

| Event Type | Action |
| --- | --- |
| JOB_STARTED | Validate batch state → clone base Lambda → create SQS/DLQ → SNS subscribe with filter policy |
| JOB_FINISHED | Re-enqueue once for SQS consistency → check queue depth → multi-pass DLQ redrive → trigger BATCH_END_START |
| IMPORT_CANCELLED | Partial or full teardown depending on processing state |
| BATCH_END_FINISHED | Atomic batch status update → clean AWS resources (Lambda, SQS, SNS) → publish notifications |

Handler 3: batch_index.py (Batch Relations/Attachments)

Handles chunked operations dispatched by handler 1:

  • Processes attachment creation in ATTACHMENT_CHUNK_SIZE=100 page chunks
  • Polls for attachment consistency when page counts mismatch
  • Triggers document relation creation in batches

Dynamic Per-Batch Lambda Creation

The Job Processor clones the base Lambda to create isolated per-batch processing:

JOB_STARTED event received
  → Validate batch not cancelled/stuck
  → Check no active Lambda already exists
  → Create per-batch Lambda: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
  → Create SQS queue + DLQ (maxReceiveCount=3)
  → Subscribe to SNS with filter policy:
      { eventType: [DOCUMENT_PROCESSED, BATCH_ATTACHMENTS, BATCH_DOCUMENT_RELATIONS],
        caseId: [{"numeric": ["=", case_id]}],
        batchId: [{"numeric": ["=", batch_id]}] }
  → Grant IAM permissions (Secrets Manager, RDS Proxy, SQS, SNS)
  → Publish LOADER_STARTED event

On BATCH_END_FINISHED, all per-batch resources are atomically torn down.
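The filter policy shown in the JOB_STARTED flow can be built as a plain dict (the helper function is illustrative; the policy shape comes from the flow above):

```python
def batch_filter_policy(case_id: int, batch_id: int) -> dict:
    """Build the SNS filter policy that scopes a queue to one batch."""
    return {
        "eventType": ["DOCUMENT_PROCESSED", "BATCH_ATTACHMENTS",
                      "BATCH_DOCUMENT_RELATIONS"],
        "caseId": [{"numeric": ["=", case_id]}],
        "batchId": [{"numeric": ["=", batch_id]}],
    }

# The policy is attached at subscribe time, roughly:
#   sns.subscribe(TopicArn=..., Protocol="sqs", Endpoint=queue_arn,
#                 Attributes={"FilterPolicy": json.dumps(policy)})
policy = batch_filter_policy(42, 7)
```

Because filtering happens in SNS, a message for another batch is never delivered to this queue, so the Lambda never spends invocations discarding foreign messages.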

DLQ Multi-Pass Redrive

When JOB_FINISHED arrives, the Job Processor redrives DLQ messages:

Pass 1: Reset retry_count=0, set dlq_redrive_count=1, dlq_final_pass=false
  → Messages get fresh retry attempts in main queue
  → If still failing → back to DLQ

Pass 2: Reset retry_count=0, set dlq_redrive_count=2, dlq_final_pass=true
  → Final attempt — if family doc still missing:
      → Raise SilentSuccessException (log warning, don't fail)
      → SNS notification sent before silencing

Max redrive passes controlled by SQS_MAX_DLQ_REDRIVES (default: 2).
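The per-pass attribute rewrite described above can be sketched as a pure function (field names mirror the doc; the helper itself is illustrative):

```python
def redrive_attributes(message: dict, redrive_pass: int, max_redrives: int = 2) -> dict:
    """Rewrite message attributes for one DLQ redrive pass."""
    updated = dict(message)
    updated["retry_count"] = 0                         # fresh retries in main queue
    updated["dlq_redrive_count"] = redrive_pass
    updated["dlq_final_pass"] = redrive_pass >= max_redrives
    return updated

msg = {"document_id": 1, "retry_count": 3, "dlq_redrive_count": 0}
pass1 = redrive_attributes(msg, 1)   # not final: failures go back to DLQ
pass2 = redrive_attributes(pass1, 2) # final: failures are silenced after notification
```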

Attachment Chunking & Polling

Documents with many pages use chunked attachment creation:

if exhibit_page_count <= ATTACHMENT_CHUNK_SIZE (100):
    create all verified pages inline
    ES index all attachments
else:
    dispatch chunks via BATCH_ATTACHMENTS events:
      {startPos: 0, endPos: 100}
      {startPos: 100, endPos: 200}
      ...
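The chunk ranges dispatched above follow a simple half-open pattern, which can be generated like this (the generator function is illustrative):

```python
ATTACHMENT_CHUNK_SIZE = 100  # value from the doc

def attachment_chunks(page_count: int, size: int = ATTACHMENT_CHUNK_SIZE):
    """Yield the startPos/endPos ranges dispatched as BATCH_ATTACHMENTS events."""
    for start in range(0, page_count, size):
        yield {"startPos": start, "endPos": min(start + size, page_count)}

chunks = list(attachment_chunks(250))
# Three events: [0,100), [100,200), and a short final chunk [200,250)
```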

Polling scenarios when exhibit_page_count != attachment_count:

| Scenario | Condition | Action |
| --- | --- | --- |
| ALL_PROCESSED | actual == target | Continue pipeline |
| PROGRESS_MADE | actual > previous | Reset retry count, requeue |
| RETRY_NEEDED | actual == previous, retries < max | Increment retry, requeue |
| MAX_RETRIES_EXCEEDED | retries >= max | Raise RecoverableException |
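The decision table reads naturally as a cascade of guards; a sketch (the function is illustrative, not the module's actual code):

```python
def classify_poll(actual: int, target: int, previous: int,
                  retries: int, max_retries: int) -> str:
    """Classify one polling pass over attachment counts."""
    if actual == target:
        return "ALL_PROCESSED"        # continue pipeline
    if actual > previous:
        return "PROGRESS_MADE"        # reset retry count, requeue
    if retries < max_retries:
        return "RETRY_NEEDED"         # increment retry, requeue
    return "MAX_RETRIES_EXCEEDED"     # raise RecoverableException

# Stuck at 80 of 100 pages with retries exhausted:
state = classify_poll(actual=80, target=100, previous=80, retries=5, max_retries=5)
```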

ContextVar-Based Request Context

Thread-safe context propagation using Python contextvars:

npcase_context = ContextVar("npcase_id")      # Current case ID
sqs_event_var = ContextVar("sqs_event")        # Current SQS record
batch_context_var = ContextVar("batch_context") # Batch metadata (lazy-loaded)

Batch context lazy loading: Fetched from core DB on first access per Lambda invocation, then reused for all messages in the batch. Avoids redundant DB queries when processing multiple messages from the same batch.
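The lazy-loading pattern can be sketched with a `ContextVar` default of `None` (the fetch function and its return shape are illustrative):

```python
from contextvars import ContextVar

batch_context_var: ContextVar = ContextVar("batch_context", default=None)
db_calls = 0  # counts simulated core-DB fetches

def _fetch_batch_from_db(batch_id):
    """Hypothetical core-DB lookup, used only for this sketch."""
    global db_calls
    db_calls += 1
    return {"batch_id": batch_id, "status": "loading"}

def get_batch_context(batch_id):
    # First access in an invocation hits the DB; later accesses
    # in the same context reuse the cached value.
    ctx = batch_context_var.get()
    if ctx is None:
        ctx = _fetch_batch_from_db(batch_id)
        batch_context_var.set(ctx)
    return ctx

first = get_batch_context(7)
second = get_batch_context(7)   # served from the ContextVar, no second query
```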

Database Connection Management

# writer_session(npcase_id="123")
# → mysql+pymysql://{user}:{pwd}@{WRITER_PROXY}/nextpoint_case_123

Pool Configuration:
  pool_size: 5
  max_overflow: 5
  pool_timeout: 60s
  pool_recycle: 1800s (30 min)
  isolation_level: READ COMMITTED (writers only)

Four session types:

| Session | Endpoint | Database | Use Case |
| --- | --- | --- | --- |
| writer_session(case_id) | RDS Writer Proxy | {RDS_DBNAME}_case_{case_id} | Per-case writes |
| reader_session(case_id) | RDS Reader Proxy | {RDS_DBNAME}_case_{case_id} | Per-case reads |
| core_writer_session() | RDS Writer Proxy | {RDS_DBNAME} | Shared DB writes (Batches, Npcases) |
| core_reader_session() | RDS Reader Proxy | {RDS_DBNAME} | Shared DB reads |
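The URL construction behind all four session types reduces to one rule: append `_case_{case_id}` for per-case sessions, use the bare database name for core sessions. A sketch with placeholder credentials (the function itself is illustrative):

```python
def case_database_url(host, user, pwd, base_db, case_id=None):
    """Build the MySQL URL for a per-case or shared core database."""
    db = f"{base_db}_case_{case_id}" if case_id else base_db
    return f"mysql+pymysql://{user}:{pwd}@{host}/{db}"

writer = case_database_url("writer-proxy", "app", "secret", "nextpoint", "123")
core = case_database_url("reader-proxy", "app", "secret", "nextpoint")
```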

Concurrent Write Safety

Three strategies prevent data corruption under concurrent Lambda processing:

1. Checkpoint unique constraint (distributed lock):

UNIQUE (npcase_id, batch_id, nge_document_id)
-- First INSERT wins; second gets IntegrityError → RecoverableException

2. SAVEPOINT for DocDedupe insertion:

session.execute(text("SAVEPOINT dedupe_insert"))
try:
    session.execute(insert(DocDedupe).values(...))
    session.execute(text("RELEASE SAVEPOINT dedupe_insert"))
except IntegrityError:
    session.execute(text("ROLLBACK TO dedupe_insert"))
    # Already exists — safe to continue without failing transaction

3. @retry_on_db_conflict decorator:

  • Catches MySQL error codes 1213 (deadlock) and 1205 (lock wait timeout)
  • Auto-rollback + retry with jitter delay
  • Max retries: 3
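The decorator's behavior can be sketched without a real database (the driver error class is a stand-in; jitter is shortened so the sketch runs quickly):

```python
import functools
import random
import time

RETRYABLE_MYSQL_CODES = {1213, 1205}  # deadlock, lock wait timeout

class FakeDBError(Exception):
    """Stand-in for a driver error carrying a MySQL error code."""
    def __init__(self, code):
        self.code = code

def retry_on_db_conflict(max_retries=3):
    """Illustrative reimplementation of the retry decorator described above."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except FakeDBError as e:
                    # Non-retryable codes, or the last attempt, propagate.
                    if e.code not in RETRYABLE_MYSQL_CODES or attempt == max_retries:
                        raise
                    time.sleep(random.uniform(0, 0.01))  # jitter delay
        return wrapper
    return decorator

calls = {"n": 0}

@retry_on_db_conflict()
def flaky_write():
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeDBError(1213)  # deadlock twice, then succeed
    return "committed"

result = flaky_write()
```

The real decorator also performs a session rollback before retrying; that step is omitted here since there is no session in the sketch.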

Elasticsearch Bulk Indexing

# Index naming: {env}_{case_id}_exhibits (via alias)
# Routing: by case_id for query performance

Bulk Configuration:
  ES_BULK_BATCH_SIZE: 50 documents per request
  ES_BULK_THRESHOLD: 3 failures before switching to individual indexing
  ES_RETRY_ATTEMPTS: 3 with exponential backoff (2^attempt seconds)
  ES_TIMEOUT: 180s

# 504 ALB timeout recovery:
#   If bulk request hits ALB timeout, verify which docs were indexed
#   via _verify_bulk_indexed(), then retry only missing ones

Mapping configuration: YAML-based (exhibit.yml, deposition_volume.yml) with custom analyzers for full-text search.

S3 Operations

# Timeout configuration:
custom_config = BotoConfig(
    connect_timeout=10,   # 10s connection timeout
    read_timeout=30,      # 30s read timeout
    retries={"max_attempts": 5, "mode": "standard"}
)

# Operations:
read_json_from_s3(path)  # Metadata files, extraction results
read_txt_from_s3(path)   # Extracted text content

Multi-Region Deployment

CONFIG_MAP is a {environment}-{region} matrix providing region-specific Elasticsearch endpoints:

| Environment | us-east-1 | us-west-1 | ca-central-1 |
| --- | --- | --- | --- |
| staging | staging-c2-elasticsearch7-alb | stg-prd-c2-elasticsearch-alb | staging-c5-alb-elasticsearch |
| qa | qa-c2-elasticsearch7-alb | dummy | dummy |
| production | production-c2-search.nextpoint.com | production-elasticsearch-alb | stg-prd-c2-elasticsearch-alb |

Region enum: US_EAST_1, US_WEST_1, CA_CENTRAL_1
Environment enum: DEV, STAGE, QA, PROD

Config lookup: CONFIG_MAP[f"{ENV}-{REGION}"] — falls back to prod-us-east-1 if key not found. Development ignores region entirely.
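The lookup-with-fallback reduces to a single `dict.get` (the map below is a trimmed illustration of the matrix above; the helper function is illustrative):

```python
CONFIG_MAP = {
    # trimmed illustration of the {environment}-{region} matrix
    "prod-us-east-1": {"es_host": "production-c2-search.nextpoint.com"},
    "staging-us-east-1": {"es_host": "staging-c2-elasticsearch7-alb"},
}

def lookup_config(env: str, region: str) -> dict:
    """Key lookup with the prod-us-east-1 fallback described above."""
    return CONFIG_MAP.get(f"{env}-{region}", CONFIG_MAP["prod-us-east-1"])

cfg = lookup_config("staging", "us-east-1")
fallback = lookup_config("qa", "eu-west-1")  # unknown key falls back to prod-us-east-1
```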

Per-Batch Resource Cleanup (Stack Teardown)

On BATCH_END_FINISHED, the Job Processor performs atomic teardown of all per-batch AWS resources:

process_batch_end_finished(message):
  1. Update batch.loader_status = "complete" (in writer_session)
  2. Unsubscribe SNS subscription (by queue name match)
  3. Detach SQS event source mapping from Lambda
  4. Delete per-batch Lambda function
  5. Delete main SQS queue
  6. Delete DLQ (only if empty — preserves failed messages for investigation)
  7. Publish LOADER_FINISHED event

Idempotent cleanup: Each step uses _safely_cleanup_resource() wrapper that catches and logs errors without failing the overall teardown. If a resource is already deleted, the operation succeeds silently.
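The wrapper's log-and-continue behavior can be sketched like this (the function body is illustrative; only the name `_safely_cleanup_resource` comes from the doc):

```python
import logging

logger = logging.getLogger("teardown")

def safely_cleanup_resource(step_name, fn, *args):
    """Run one teardown step; log and continue on failure so a missing
    resource never aborts the rest of the teardown."""
    try:
        fn(*args)
        return True
    except Exception:
        logger.warning("cleanup step %s failed; continuing", step_name)
        return False

def delete_queue(name):
    # Simulate a resource that was already deleted by a previous attempt.
    raise RuntimeError("queue already deleted")

failed_step = safely_cleanup_resource("delete_sqs", delete_queue, "doc_loader_1_2")
clean_step = safely_cleanup_resource("noop", lambda: None)
```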

Pre-Lambda cleanup (_cleanup_pre_lambda_resources): If JOB_STARTED detects stale resources from a previous failed attempt, it cleans up SNS subscriptions and SQS queues before creating fresh ones — prevents orphaned resources from accumulating.

DLQ preservation: The DLQ is only deleted if its message count is zero. Non-empty DLQs are preserved for manual investigation of permanently failed documents.

Lessons Learned

  1. Exponential backoff base matters — 120s base delay gives the system time to clear deadlocks. Starting too low causes retry storms.
  2. DLQ redrive needs a termination condition — without max redrive count, messages cycle indefinitely between main queue and DLQ.
  3. SilentSuccessException is essential — without it, successfully-handled edge cases (already processed, final DLQ pass) get reported as errors.
  4. Engine cache size limits prevent Lambda OOM — without LRU eviction, Lambdas processing many cases accumulate connections until memory exhaustion.
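The LRU eviction mentioned in lesson 4 can be sketched with an `OrderedDict` (the class is illustrative, not the module's actual cache; real code would call `engine.dispose()` on eviction to release pooled connections):

```python
from collections import OrderedDict

class EngineCache:
    """Bounded per-case engine cache with least-recently-used eviction."""
    def __init__(self, max_size=4):
        self.max_size = max_size
        self._engines = OrderedDict()
        self.disposed = []  # case IDs whose engines were evicted

    def get(self, case_id, factory):
        if case_id in self._engines:
            self._engines.move_to_end(case_id)  # mark as recently used
            return self._engines[case_id]
        engine = self._engines[case_id] = factory(case_id)
        if len(self._engines) > self.max_size:
            old_id, _old_engine = self._engines.popitem(last=False)
            self.disposed.append(old_id)  # real code: _old_engine.dispose()
        return engine

cache = EngineCache(max_size=2)
cache.get("1", lambda c: f"engine-{c}")
cache.get("2", lambda c: f"engine-{c}")
cache.get("3", lambda c: f"engine-{c}")  # evicts the engine for case 1
```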