Reference Implementation: documentloader¶
Overview¶
The documentloader module ingests documents into the Nextpoint eDiscovery platform. It processes files from various sources, extracts metadata, creates exhibits and attachments, indexes to Elasticsearch, and tracks progress through an 11-step checkpoint pipeline.
This is the first module built with the architecture patterns defined in this repo and serves as the reference implementation for all future modules.
Pattern Mapping¶
| Pattern | documentloader Implementation |
|---|---|
| Hexagonal boundaries | core/ contains process.py, checkpoints.py, exceptions.py; shell/ contains db/, es/, utils/ |
| Exception hierarchy | RecoverableException, PermanentFailureException, SilentSuccessException, DBTransactionError, CheckpointUpdateError |
| SNS events | 11 EventType enum values (JOB_STARTED through LOADER_CANCELLED) |
| SQS handler | index.py — batch processing with partial failure support |
| Checkpoint pipeline | 11-step state machine (PROCESS_STARTED → PROCESS_COMPLETE) |
| Database sessions | writer_session, reader_session, core_reader_session, core_writer_session |
| Retry/resilience | @retry_on_db_conflict, SQS exponential backoff (120s→900s), DLQ redrive (max 2x) |
| Idempotency | Checkpoint-based (composite PK), DocDedupe table, TagDedupe with SHA256 |
| Multi-tenancy | Per-case database schema: {RDS_DBNAME}_case_{case_id} |
| Job processor orchestration | 925-line job_processor_index.py — per-batch infrastructure lifecycle (create SQS/DLQ/Lambda/SNS subscription, monitor queue depth, multi-pass DLQ redrive, atomic teardown) |
| Elasticsearch bulk indexing | _process_in_batches() with 504 ALB timeout recovery, _verify_bulk_indexed(), threshold-based bulk/individual switching (3), batch size 50 |
| SQS operations | Double-encoded envelope parsing, {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id} naming, exponential backoff requeue (120s→900s), 3-state queue depth monitoring, direct-to-DLQ routing |
| Structured logging | JSON formatter with level normalization (WARNING→WARN, CRITICAL→FATAL), module-level _log_context dict, "nil" for missing values, silenced boto3/botocore/urllib3 loggers |
| Config management | 4 environments × 3 regions CONFIG_MAP, lazy Secrets Manager caching, env var overrides with int() casting, import-time validation (warn, don't crash) |
| CDK infrastructure | Two-stack composition (CommonResourcesStack + DocLoad), 3 Lambdas (DocumentLoader/JobProcessor/BatchProcessor), separate RDS writer/reader proxies, SNS shared in prod / separate in dev, Python 3.13 layer, private subnets only |
| ORM models | 18 models: 8 core entities, 6 relationship tables, 3 deduplication tables with composite PKs. Soft deletes via delete_at_gmt. JSON columns for flexible config (near_dupe_info, bulk_redaction_info). Builder pattern for Exhibits construction from ExhibitData schema |
Architecture¶
documentloader/
├── lib/lambda/src/loader/
│ ├── core/
│ │ ├── exceptions.py # 5 exception types controlling message flow
│ │ ├── checkpoints.py # 11-step checkpoint enum
│ │ ├── process.py # DocumentProcessor pipeline orchestrator
│ │ ├── process_helpers.py # Exhibit extraction, polling utilities
│ │ ├── models/
│ │ │ ├── db_models.py # Exhibits, Attachments, Tags, Batches, etc.
│ │ │ └── schemas.py # TypedDict/Pydantic validation
│ │ └── utils/
│ │ ├── db_transaction.py # @retry_on_db_conflict decorator
│ │ ├── context_data.py # ContextVar management
│ │ └── helpers.py # Structured logging
│ ├── shell/
│ │ ├── db/database.py # Session management (4 session types)
│ │ ├── es/es_ops.py # Elasticsearch bulk indexing
│ │ ├── *_ops.py # Domain-specific DB operations
│ │ └── utils/
│ │ ├── sns_ops.py # SNS publisher with EventType enum
│ │ ├── sqs_ops.py # SQS requeue, DLQ redrive, visibility
│ │ ├── s3_ops.py # S3 file operations
│ │ └── aws_secrets.py # Secrets Manager with caching
│ ├── index.py # Document loader Lambda handler
│ ├── job_processor_index.py # Job orchestration Lambda handler
│ └── config.py # Centralized Config class
├── lib/lambda/tests/
│ ├── conftest.py # 23 env vars, auto-use fixtures
│ └── ... # Unit and integration tests
└── lib/ # CDK infrastructure (TypeScript)
Key Design Decisions¶
Two Lambda Functions¶
- Job Processor: Orchestrates batch lifecycle (create queues, manage concurrency, finalize)
- Document Loader: Processes individual documents (11-step checkpoint pipeline)
Checkpoint as Distributed Lock¶
The composite primary key (npcase_id, batch_id, document_id) prevents duplicate processing. First Lambda to INSERT wins; second gets a RecoverableException and retries later.
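A minimal sketch of this first-INSERT-wins pattern, using SQLite in place of MySQL and a plain boolean in place of raising RecoverableException (table and column names follow the text above; the real models live in db_models.py):

```python
import sqlite3

def try_acquire_checkpoint(conn, npcase_id, batch_id, document_id):
    """Attempt to claim a document; returns True if this worker won the race."""
    try:
        conn.execute(
            "INSERT INTO checkpoints (npcase_id, batch_id, document_id, step) "
            "VALUES (?, ?, ?, 'PROCESS_STARTED')",
            (npcase_id, batch_id, document_id),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        # Another Lambda already inserted this row; the caller raises
        # RecoverableException so the message is retried later.
        conn.rollback()
        return False

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints ("
    "npcase_id INTEGER, batch_id INTEGER, document_id INTEGER, step TEXT, "
    "PRIMARY KEY (npcase_id, batch_id, document_id))"
)
```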
SNS Filter Policies for Fan-Out¶
Each batch gets its own SQS queue with a filter policy scoped to (eventType, caseId, batchId). This prevents cross-batch message delivery without application-level filtering.
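The filter policy attached to each per-batch subscription can be sketched as a plain dict (the attribute names and numeric-match syntax follow the SNS filter policy shown later in this document):

```python
def build_filter_policy(case_id, batch_id):
    """Scope a per-batch SQS subscription to exactly one (case, batch) pair."""
    return {
        "eventType": [
            "DOCUMENT_PROCESSED",
            "BATCH_ATTACHMENTS",
            "BATCH_DOCUMENT_RELATIONS",
        ],
        "caseId": [{"numeric": ["=", case_id]}],
        "batchId": [{"numeric": ["=", batch_id]}],
    }
```

The policy is passed as the FilterPolicy subscription attribute when the Job Processor subscribes the batch queue to the shared topic.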
Three Deduplication Strategies¶
- Checkpoint table — pipeline-level idempotency
- DocDedupe table — content-level deduplication (MD5 + message_id composite)
- TagDedupe table — SHA256 hash for content exceeding varchar limits
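The TagDedupe strategy can be sketched as hashing tag content into a fixed-width key, so arbitrarily long tag values fit the unique column (function name is hypothetical):

```python
import hashlib

def tag_dedupe_key(tag_value):
    """SHA256 the tag content so values exceeding varchar limits dedupe on a 64-char key."""
    return hashlib.sha256(tag_value.encode("utf-8")).hexdigest()
```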
Three Lambda Handlers¶
Handler 1: index.py (Document Loader)¶
Processes individual documents through the 11-step checkpoint pipeline. Handles three event types:
| Event Type | Action |
|---|---|
| DOCUMENT_PROCESSED | Run full 11-step checkpoint pipeline via load_document() |
| BATCH_ATTACHMENTS | Chunked attachment creation (startPos/endPos ranges) |
| BATCH_DOCUMENT_RELATIONS | Family document relation linking |
Returns {"batchItemFailures": [...]} for partial batch failures.
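The partial-batch-failure shape can be sketched as follows; process_record stands in for the real per-record pipeline, and only failed message IDs are reported back so SQS redelivers just those:

```python
def handle_batch(event, process_record):
    """SQS partial batch response: succeed the batch, report failed IDs individually."""
    failures = []
    for record in event["Records"]:
        try:
            process_record(record)
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```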
Handler 2: job_processor_index.py (Job Orchestrator)¶
925-line handler managing per-batch infrastructure lifecycle (batch_size=1):
| Event Type | Action |
|---|---|
| JOB_STARTED | Validate batch state → clone base Lambda → create SQS/DLQ → SNS subscribe with filter policy |
| JOB_FINISHED | Re-enqueue once for SQS consistency → check queue depth → multi-pass DLQ redrive → trigger BATCH_END_START |
| IMPORT_CANCELLED | Partial or full teardown depending on processing state |
| BATCH_END_FINISHED | Atomic batch status update → clean AWS resources (Lambda, SQS, SNS) → publish notifications |
Handler 3: batch_index.py (Batch Relations/Attachments)¶
Handles chunked operations dispatched by handler 1:
- Processes attachment creation in ATTACHMENT_CHUNK_SIZE=100 page chunks
- Polls for attachment consistency when page counts mismatch
- Triggers document relation creation in batches
Dynamic Per-Batch Lambda Creation¶
The Job Processor clones the base Lambda to create isolated per-batch processing:
JOB_STARTED event received
→ Validate batch not cancelled/stuck
→ Check no active Lambda already exists
→ Create per-batch Lambda: {STACK_NAME_PREFIX}_doc_loader_{case_id}_{batch_id}
→ Create SQS queue + DLQ (maxReceiveCount=3)
→ Subscribe to SNS with filter policy:
{ eventType: [DOCUMENT_PROCESSED, BATCH_ATTACHMENTS, BATCH_DOCUMENT_RELATIONS],
caseId: [{"numeric": ["=", case_id]}],
batchId: [{"numeric": ["=", batch_id]}] }
→ Grant IAM permissions (Secrets Manager, RDS Proxy, SQS, SNS)
→ Publish LOADER_STARTED event
On BATCH_END_FINISHED, all per-batch resources are atomically torn down.
DLQ Multi-Pass Redrive¶
When JOB_FINISHED arrives, the Job Processor redrives DLQ messages:
Pass 1: Reset retry_count=0, set dlq_redrive_count=1, dlq_final_pass=false
→ Messages get fresh retry attempts in main queue
→ If still failing → back to DLQ
Pass 2: Reset retry_count=0, set dlq_redrive_count=2, dlq_final_pass=true
→ Final attempt — if family doc still missing:
→ Raise SilentSuccessException (log warning, don't fail)
→ SNS notification sent before silencing
Max redrive passes controlled by SQS_MAX_DLQ_REDRIVES (default: 2).
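The per-message bookkeeping for a redrive pass can be sketched as follows (function name is hypothetical; attribute names follow the passes described above, and None signals that redrives are exhausted):

```python
def prepare_redrive(message_attrs, max_redrives=2):
    """Compute the attributes for the next DLQ redrive pass, or None if exhausted."""
    redrive_count = int(message_attrs.get("dlq_redrive_count", 0)) + 1
    if redrive_count > max_redrives:
        return None  # stop cycling; leave the message for manual investigation
    return {
        "retry_count": 0,  # fresh retry budget in the main queue
        "dlq_redrive_count": redrive_count,
        "dlq_final_pass": redrive_count == max_redrives,
    }
```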
Attachment Chunking & Polling¶
Documents with many pages use chunked attachment creation:
if exhibit_page_count <= ATTACHMENT_CHUNK_SIZE (100):
create all verified pages inline
ES index all attachments
else:
dispatch chunks via BATCH_ATTACHMENTS events:
{startPos: 0, endPos: 100}
{startPos: 100, endPos: 200}
...
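The chunk dispatch above can be sketched as a range generator (clamping the last chunk's endPos to the page count is an assumption; the original may emit fixed-width ranges):

```python
def chunk_ranges(page_count, chunk_size=100):
    """Split a document's pages into startPos/endPos chunks for BATCH_ATTACHMENTS events."""
    return [
        {"startPos": start, "endPos": min(start + chunk_size, page_count)}
        for start in range(0, page_count, chunk_size)
    ]
```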
Polling scenarios when exhibit_page_count != attachment_count:
| Scenario | Condition | Action |
|---|---|---|
| ALL_PROCESSED | actual == target | Continue pipeline |
| PROGRESS_MADE | actual > previous | Reset retry count, requeue |
| RETRY_NEEDED | actual == previous, retries < max | Increment retry, requeue |
| MAX_RETRIES_EXCEEDED | retries >= max | Raise RecoverableException |
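The decision table above maps directly to a small classifier; names are illustrative, and the real code raises RecoverableException rather than returning the last state:

```python
from enum import Enum

class PollResult(Enum):
    ALL_PROCESSED = "all_processed"
    PROGRESS_MADE = "progress_made"
    RETRY_NEEDED = "retry_needed"
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"

def classify_poll(actual, target, previous, retries, max_retries=3):
    """Decide the next polling action from current vs. previous attachment counts."""
    if actual == target:
        return PollResult.ALL_PROCESSED
    if actual > previous:
        return PollResult.PROGRESS_MADE   # reset retry count, requeue
    if retries < max_retries:
        return PollResult.RETRY_NEEDED    # increment retry, requeue
    return PollResult.MAX_RETRIES_EXCEEDED  # raise RecoverableException
```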
ContextVar-Based Request Context¶
Thread-safe context propagation using Python contextvars:
npcase_context = ContextVar("npcase_id") # Current case ID
sqs_event_var = ContextVar("sqs_event") # Current SQS record
batch_context_var = ContextVar("batch_context") # Batch metadata (lazy-loaded)
Batch context lazy loading: Fetched from core DB on first access per Lambda invocation, then reused for all messages in the batch. Avoids redundant DB queries when processing multiple messages from the same batch.
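The lazy-loading pattern can be sketched as follows; the loader callable stands in for the real core-DB fetch:

```python
from contextvars import ContextVar

batch_context_var = ContextVar("batch_context", default=None)

def get_batch_context(loader):
    """Fetch batch metadata once per invocation; later calls reuse the cached value."""
    ctx = batch_context_var.get()
    if ctx is None:
        ctx = loader()  # hypothetical core-DB fetch, runs only on first access
        batch_context_var.set(ctx)
    return ctx
```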
Database Connection Management¶
# writer_session(npcase_id="123")
# → mysql+pymysql://{user}:{pwd}@{WRITER_PROXY}/nextpoint_case_123
Pool Configuration:
pool_size: 5
max_overflow: 5
pool_timeout: 60s
pool_recycle: 1800s (30 min)
isolation_level: READ COMMITTED (writers only)
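The URL construction and pool settings above can be sketched as plain Python (the nextpoint default database name is taken from the example URL; in practice it comes from RDS_DBNAME):

```python
# Pool settings mirroring the configuration above; passed to create_engine().
POOL_KWARGS = {
    "pool_size": 5,
    "max_overflow": 5,
    "pool_timeout": 60,
    "pool_recycle": 1800,  # 30 min, recycled before stale-connection errors
}

def case_db_url(case_id, user, password, proxy_host, dbname="nextpoint"):
    """Build the per-case database URL: {RDS_DBNAME}_case_{case_id}."""
    return f"mysql+pymysql://{user}:{password}@{proxy_host}/{dbname}_case_{case_id}"
```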
Four session types:
| Session | Endpoint | Database | Use Case |
|---|---|---|---|
| writer_session(case_id) | RDS Writer Proxy | {RDS_DBNAME}_case_{case_id} | Per-case writes |
| reader_session(case_id) | RDS Reader Proxy | {RDS_DBNAME}_case_{case_id} | Per-case reads |
| core_writer_session() | RDS Writer Proxy | {RDS_DBNAME} | Shared DB writes (Batches, Npcases) |
| core_reader_session() | RDS Reader Proxy | {RDS_DBNAME} | Shared DB reads |
Concurrent Write Safety¶
Three strategies prevent data corruption under concurrent Lambda processing:
1. Checkpoint unique constraint (distributed lock):
UNIQUE (npcase_id, batch_id, nge_document_id)
-- First INSERT wins; second gets IntegrityError → RecoverableException
2. SAVEPOINT for DocDedupe insertion:
session.execute(text("SAVEPOINT dedupe_insert"))
try:
session.execute(insert(DocDedupe).values(...))
session.execute(text("RELEASE SAVEPOINT dedupe_insert"))
except IntegrityError:
session.execute(text("ROLLBACK TO dedupe_insert"))
# Already exists — safe to continue without failing transaction
3. @retry_on_db_conflict decorator:
   - Catches MySQL error codes 1213 (deadlock) and 1205 (lock wait timeout)
   - Auto-rollback + retry with jitter delay
   - Max retries: 3
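A minimal sketch of such a retry decorator, with a stand-in exception type in place of the MySQL driver's OperationalError (the real decorator also rolls back the session before retrying):

```python
import random
import time
from functools import wraps

RETRYABLE_MYSQL_CODES = {1213, 1205}  # deadlock, lock wait timeout

class DBConflictError(Exception):
    """Stand-in for the driver error carrying a MySQL error code."""
    def __init__(self, code):
        super().__init__(code)
        self.code = code

def retry_on_db_conflict(max_retries=3, base_delay=1.0):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except DBConflictError as exc:
                    if exc.code not in RETRYABLE_MYSQL_CODES or attempt == max_retries:
                        raise  # non-retryable code, or retry budget exhausted
                    # jittered delay so colliding Lambdas don't retry in lockstep
                    time.sleep(base_delay + random.random() * base_delay)
        return wrapper
    return decorator
```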
Elasticsearch Bulk Indexing¶
# Index naming: {env}_{case_id}_exhibits (via alias)
# Routing: by case_id for query performance
Bulk Configuration:
ES_BULK_BATCH_SIZE: 50 documents per request
ES_BULK_THRESHOLD: 3 failures before switching to individual indexing
ES_RETRY_ATTEMPTS: 3 with exponential backoff (2^attempt seconds)
ES_TIMEOUT: 180s
# 504 ALB timeout recovery:
# If bulk request hits ALB timeout, verify which docs were indexed
# via _verify_bulk_indexed(), then retry only missing ones
Mapping configuration: YAML-based (exhibit.yml, deposition_volume.yml)
with custom analyzers for full-text search.
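The threshold-based bulk/individual switching can be sketched as follows, with the ES client calls injected as callables (the exact switching semantics are an assumption; the real _process_in_batches also handles 504 verification and retries):

```python
def process_in_batches(docs, bulk_index, index_one, batch_size=50, failure_threshold=3):
    """Bulk-index in batches; after `failure_threshold` failed batches, fall back per-doc."""
    failures = 0
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        if failures >= failure_threshold:
            for doc in batch:
                index_one(doc)  # bulk path disabled for the rest of the run
            continue
        try:
            bulk_index(batch)
        except Exception:
            failures += 1
            for doc in batch:
                index_one(doc)  # recover this batch individually
```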
S3 Operations¶
# Timeout configuration:
custom_config = BotoConfig(
connect_timeout=10, # 10s connection timeout
read_timeout=30, # 30s read timeout
retries={"max_attempts": 5, "mode": "standard"}
)
# Operations:
read_json_from_s3(path) # Metadata files, extraction results
read_txt_from_s3(path) # Extracted text content
Multi-Region Deployment¶
CONFIG_MAP is a {environment}-{region} matrix providing region-specific
Elasticsearch endpoints:
| Environment | us-east-1 | us-west-1 | ca-central-1 |
|---|---|---|---|
| staging | staging-c2-elasticsearch7-alb | stg-prd-c2-elasticsearch-alb | staging-c5-alb-elasticsearch |
| qa | qa-c2-elasticsearch7-alb | dummy | dummy |
| production | production-c2-search.nextpoint.com | production-elasticsearch-alb | stg-prd-c2-elasticsearch-alb |
Region enum: US_EAST_1, US_WEST_1, CA_CENTRAL_1
Environment enum: DEV, STAGE, QA, PROD
Config lookup: CONFIG_MAP[f"{ENV}-{REGION}"] — falls back to prod-us-east-1
if key not found. Development ignores region entirely.
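The lookup-with-fallback can be sketched as a dict access (the key naming and the two sample entries are assumptions; the real CONFIG_MAP covers all four environments and three regions):

```python
CONFIG_MAP = {
    "prod-us-east-1": {"es_host": "production-c2-search.nextpoint.com"},
    "staging-us-east-1": {"es_host": "staging-c2-elasticsearch7-alb"},
}

def lookup_config(env, region):
    """Resolve region-specific config; unknown keys fall back to prod-us-east-1."""
    return CONFIG_MAP.get(f"{env}-{region}", CONFIG_MAP["prod-us-east-1"])
```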
Per-Batch Resource Cleanup (Stack Teardown)¶
On BATCH_END_FINISHED, the Job Processor performs atomic teardown of all
per-batch AWS resources:
process_batch_end_finished(message):
1. Update batch.loader_status = "complete" (in writer_session)
2. Unsubscribe SNS subscription (by queue name match)
3. Detach SQS event source mapping from Lambda
4. Delete per-batch Lambda function
5. Delete main SQS queue
6. Delete DLQ (only if empty — preserves failed messages for investigation)
7. Publish LOADER_FINISHED event
Idempotent cleanup: Each step uses _safely_cleanup_resource() wrapper
that catches and logs errors without failing the overall teardown. If a
resource is already deleted, the operation succeeds silently.
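The log-and-continue wrapper can be sketched as follows (signature is illustrative; the real wrapper is _safely_cleanup_resource):

```python
import logging

logger = logging.getLogger(__name__)

def safely_cleanup_resource(name, cleanup_fn):
    """Run one teardown step; swallow and log errors so one failure can't block the rest."""
    try:
        cleanup_fn()
        return True
    except Exception:
        # An already-deleted resource typically raises here; teardown continues.
        logger.warning("cleanup of %s failed; continuing teardown", name)
        return False
```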
Pre-Lambda cleanup (_cleanup_pre_lambda_resources): If JOB_STARTED
detects stale resources from a previous failed attempt, it cleans up SNS
subscriptions and SQS queues before creating fresh ones — prevents orphaned
resources from accumulating.
DLQ preservation: The DLQ is only deleted if its message count is zero. Non-empty DLQs are preserved for manual investigation of permanently failed documents.
Lessons Learned¶
- Exponential backoff base matters — 120s base delay gives the system time to clear deadlocks. Starting too low causes retry storms.
- DLQ redrive needs a termination condition — without max redrive count, messages cycle indefinitely between main queue and DLQ.
- SilentSuccessException is essential — without it, successfully-handled edge cases (already processed, final DLQ pass) get reported as errors.
- Engine cache size limits prevent Lambda OOM — without LRU eviction, Lambdas processing many cases accumulate connections until memory exhaustion.
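The LRU engine cache from the last lesson can be sketched as follows (class name and size limit are illustrative; dispose() mirrors SQLAlchemy's connection-release call):

```python
from collections import OrderedDict

class EngineCache:
    """Bounded per-case engine cache; evicts and disposes least-recently-used engines."""
    def __init__(self, max_size=10):
        self.max_size = max_size
        self._engines = OrderedDict()

    def get(self, case_id, factory):
        if case_id in self._engines:
            self._engines.move_to_end(case_id)  # mark as most recently used
            return self._engines[case_id]
        engine = factory(case_id)
        self._engines[case_id] = engine
        if len(self._engines) > self.max_size:
            _, old = self._engines.popitem(last=False)  # evict the LRU entry
            if hasattr(old, "dispose"):
                old.dispose()  # release pooled connections before dropping the engine
        return engine
```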