Chunk Dispatch and Polling Pattern¶
Purpose¶
Process large datasets by splitting them into chunks dispatched as separate SNS/SQS messages, then poll for completion using an adaptive state machine. This prevents Lambda timeouts on large batches and enables parallel processing.
Chunk Dispatch Pattern¶
When a dataset exceeds a threshold, split into chunks and dispatch via SQS:

# shell/attachment_model_ops.py
ATTACHMENT_CHUNK_SIZE = Config.ATTACHMENT_CHUNK_SIZE  # Default: 100

def launch_attachment_chunker(session, import_attributes, exhibit_ids, total_pages):
    """Dispatch page creation in chunks to avoid Lambda timeout."""
    npcase_id = import_attributes["npcase_id"]
    batch_id = import_attributes["batch_id"]
    job_id = import_attributes["job_id"]

    for i in range(0, total_pages, ATTACHMENT_CHUNK_SIZE):
        start_pos = i
        end_pos = min(i + ATTACHMENT_CHUNK_SIZE, total_pages)
        send_to_batch_sqs_queue(
            npcase_id=npcase_id,
            batch_id=batch_id,
            job_id=job_id,
            event_type=EventType.BATCH_ATTACHMENTS,
            message_delay=5,
            event_detail={"startPos": start_pos, "endPos": end_pos},
            document_id=import_attributes["document_id"],
            exhibit_ids=str(exhibit_ids),
        )
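The loop above produces half-open [start, end) ranges, with the final chunk clipped to the page count. A standalone sketch of that boundary arithmetic (the chunk size of 100 is the stated default, assumed fixed here):

```python
ATTACHMENT_CHUNK_SIZE = 100  # assumed default from the config above

def chunk_ranges(total_pages, chunk_size=ATTACHMENT_CHUNK_SIZE):
    """Yield half-open (start_pos, end_pos) ranges covering total_pages."""
    for i in range(0, total_pages, chunk_size):
        yield i, min(i + chunk_size, total_pages)

# 520 pages -> 6 chunks; the last chunk is short
print(list(chunk_ranges(520)))
# [(0, 100), (100, 200), (200, 300), (300, 400), (400, 500), (500, 520)]
```

Because the ranges are half-open, each page index lands in exactly one chunk and no page is processed twice.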
For document relations, chunk with a MAX_CHUNKS safety limit:

# shell/batch_document_relation_ops.py
BATCH_CHUNK_SIZE = 100
MAX_CHUNKS = 1000  # Safety limit to prevent infinite dispatch

def dispatch_relation_chunks(session, npcase_id, batch_id, job_id, total_relations):
    """Split relation processing into chunks dispatched via SQS."""
    chunk_count = 0
    for start_id in range(0, total_relations, BATCH_CHUNK_SIZE):
        end_id = min(start_id + BATCH_CHUNK_SIZE, total_relations)
        send_to_batch_sqs_queue(
            npcase_id=npcase_id,
            batch_id=batch_id,
            job_id=job_id,
            event_type=EventType.BATCH_DOCUMENT_RELATIONS,
            event_detail={
                "chunkSize": BATCH_CHUNK_SIZE,
                "startId": start_id,
                "endId": end_id,
            },
        )
        chunk_count += 1
        if chunk_count == MAX_CHUNKS:
            log_message("warning",
                        f"Hit MAX_CHUNKS limit ({MAX_CHUNKS}). "
                        f"Remaining relations will not be processed.")
            break
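The number of messages the loop actually sends is therefore the chunk count capped at MAX_CHUNKS. A quick sketch of that arithmetic, using the constants above:

```python
import math

BATCH_CHUNK_SIZE = 100
MAX_CHUNKS = 1000

def chunks_dispatched(total_relations):
    """How many chunk messages the loop above sends before the cap."""
    needed = math.ceil(total_relations / BATCH_CHUNK_SIZE)
    return min(needed, MAX_CHUNKS)

print(chunks_dispatched(50_000))   # 500: under the cap, everything dispatched
print(chunks_dispatched(250_000))  # 1000: capped, 150,000 relations left undispatched
```

At the cap, MAX_CHUNKS * BATCH_CHUNK_SIZE = 100,000 relations are covered per dispatch pass; anything beyond that triggers the warning and is dropped from this pass.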
Dual-Path Decision: Inline vs Chunked¶
For small datasets, process inline. For large datasets, dispatch chunks:
def process_attachments(session, import_attributes, exhibit_ids, total_pages):
    """Inline for small docs, chunked for large ones."""
    if total_pages > ATTACHMENT_CHUNK_SIZE:
        # Large document — dispatch chunks for parallel processing
        launch_attachment_chunker(session, import_attributes, exhibit_ids, total_pages)
    else:
        # Small document — process inline in this Lambda invocation
        create_attachments(session, import_attributes, exhibit_ids, ...)
Polling State Machine¶
After dispatching chunks, poll for completion using a state machine with adaptive delays.
Polling Scenarios¶
# shell/utils/polling_helpers.py
class PollingScenario(Enum):
    ALL_PROCESSED = "all_processed"          # Target reached — done
    PROGRESS_MADE = "progress_made"          # Count changed — keep going
    RETRY_NEEDED = "retry_needed"            # Count unchanged, retries remain
    RETRIES_EXHAUSTED = "retries_exhausted"  # Stuck — accept partial failure
Scenario Determination¶
Generic for both decreasing counts (relations draining to 0) and increasing counts (attachments building to target):
def determine_polling_scenario(
    current_count, previous_count, retry_count, target_count=None
):
    """Determine polling state from counts and retry budget.

    Args:
        current_count: Current item count
        previous_count: Count from previous poll (-1 for initial)
        retry_count: Current retry attempt
        target_count: Optional target for increasing counts.
            If None, target is 0 (decreasing to zero).
    """
    # Done?
    if (target_count is None and current_count == 0) or \
       (target_count is not None and current_count == target_count):
        return PollingScenario.ALL_PROCESSED

    # First poll — always counts as progress
    if previous_count == -1:
        return PollingScenario.PROGRESS_MADE

    # Making progress?
    if target_count is None:
        if current_count < previous_count:  # Decreasing toward 0
            return PollingScenario.PROGRESS_MADE
    else:
        if current_count > previous_count:  # Increasing toward target
            return PollingScenario.PROGRESS_MADE

    # Stuck but retries remain
    if retry_count < MAX_RETRIES:
        return PollingScenario.RETRY_NEEDED

    # Stuck and out of retries
    return PollingScenario.RETRIES_EXHAUSTED
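The scenario logic can be exercised standalone. A self-contained sketch (MAX_RETRIES assumed to be 3 for illustration; the real value lives in the module's config) showing both a count draining to zero and a stalled count building toward a target:

```python
from enum import Enum

MAX_RETRIES = 3  # assumed value for this sketch

class PollingScenario(Enum):
    ALL_PROCESSED = "all_processed"
    PROGRESS_MADE = "progress_made"
    RETRY_NEEDED = "retry_needed"
    RETRIES_EXHAUSTED = "retries_exhausted"

def determine_polling_scenario(current_count, previous_count, retry_count, target_count=None):
    # Done: reached 0 (decreasing mode) or the target (increasing mode)
    if (target_count is None and current_count == 0) or \
       (target_count is not None and current_count == target_count):
        return PollingScenario.ALL_PROCESSED
    if previous_count == -1:  # first poll always counts as progress
        return PollingScenario.PROGRESS_MADE
    if target_count is None:
        if current_count < previous_count:
            return PollingScenario.PROGRESS_MADE
    elif current_count > previous_count:
        return PollingScenario.PROGRESS_MADE
    if retry_count < MAX_RETRIES:
        return PollingScenario.RETRY_NEEDED
    return PollingScenario.RETRIES_EXHAUSTED

# Decreasing mode: relations draining to 0
assert determine_polling_scenario(40, -1, 0) is PollingScenario.PROGRESS_MADE  # first poll
assert determine_polling_scenario(10, 40, 0) is PollingScenario.PROGRESS_MADE  # shrinking
assert determine_polling_scenario(0, 10, 0) is PollingScenario.ALL_PROCESSED   # drained

# Increasing mode: attachments building toward 500, stalled at 450
assert determine_polling_scenario(450, 450, 2, target_count=500) is PollingScenario.RETRY_NEEDED
assert determine_polling_scenario(450, 450, 3, target_count=500) is PollingScenario.RETRIES_EXHAUSTED
```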
Adaptive Delay Based on Dataset Size¶
Larger datasets need longer delays between polls:
CHUNK_DELAY_THRESHOLDS = {
    500: 60,    # < 500 items → 1 minute
    1000: 120,  # < 1000 items → 2 minutes
    2000: 180,  # < 2000 items → 3 minutes
    5000: 240,  # < 5000 items → 4 minutes
}
DEFAULT_CHUNK_DELAY = 300  # 5 minutes for very large datasets

def get_chunker_delay(current_count):
    """Adaptive delay — shorter for small datasets, longer for large."""
    for threshold, delay in CHUNK_DELAY_THRESHOLDS.items():
        if current_count < threshold:
            return delay
    return DEFAULT_CHUNK_DELAY
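A quick check of the lookup, using the same constants. Note a subtle dependency: the loop returns at the first matching threshold, so it relies on the dict's keys being in ascending insertion order (guaranteed preserved in Python 3.7+):

```python
CHUNK_DELAY_THRESHOLDS = {500: 60, 1000: 120, 2000: 180, 5000: 240}
DEFAULT_CHUNK_DELAY = 300

def get_chunker_delay(current_count):
    # First threshold strictly greater than current_count wins;
    # thresholds must stay in ascending order for this to be correct.
    for threshold, delay in CHUNK_DELAY_THRESHOLDS.items():
        if current_count < threshold:
            return delay
    return DEFAULT_CHUNK_DELAY

print(get_chunker_delay(120))   # 60  (< 500)
print(get_chunker_delay(1500))  # 180 (< 2000)
print(get_chunker_delay(9000))  # 300 (falls through to the default)
```

Boundaries are exclusive: a count of exactly 500 gets the 120s delay, not 60s.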
Polling Loop (Orchestrator)¶
The batch processor Lambda uses polling to wait for chunk completion:
def poll_for_completion(message):
    """Poll until all chunks are processed or retries exhausted."""
    current_count = get_incomplete_count(session, npcase_id, batch_id)
    previous_count = message.get("eventDetail", {}).get("previousCount", -1)
    retry_count = message.get("eventDetail", {}).get("retryCount", 0)

    scenario = determine_polling_scenario(current_count, previous_count, retry_count)

    if scenario == PollingScenario.ALL_PROCESSED:
        # All chunks done — proceed to next pipeline step
        publish_completion_event(message)
    elif scenario == PollingScenario.PROGRESS_MADE:
        # Making progress — poll again with reset retry count
        delay = get_chunker_delay(current_count)
        requeue_with_counts(message, current_count, retry_count=0, delay=delay)
    elif scenario == PollingScenario.RETRY_NEEDED:
        # No progress but retries remain — poll again
        delay = RETRY_DELAY  # 120s
        requeue_with_counts(message, current_count, retry_count + 1, delay=delay)
    elif scenario == PollingScenario.RETRIES_EXHAUSTED:
        # Stuck — report incomplete items, proceed anyway
        report_incomplete_items(session, npcase_id, batch_id)
        publish_completion_event(message)  # Don't block the batch forever
Complete Flow¶
Document arrives (500 pages)
│
▼
total_pages > ATTACHMENT_CHUNK_SIZE (100)?
│ Yes
▼
Dispatch 5 chunk messages to SQS:
├── BATCH_ATTACHMENTS {startPos: 0, endPos: 100}
├── BATCH_ATTACHMENTS {startPos: 100, endPos: 200}
├── BATCH_ATTACHMENTS {startPos: 200, endPos: 300}
├── BATCH_ATTACHMENTS {startPos: 300, endPos: 400}
└── BATCH_ATTACHMENTS {startPos: 400, endPos: 500}
│
▼ (5 Lambda invocations process in parallel)
│
Batch Processor polls:
├── Poll 1: 300/500 done → PROGRESS_MADE → requeue (delay: 60s)
├── Poll 2: 450/500 done → PROGRESS_MADE → requeue (delay: 60s)
├── Poll 3: 500/500 done → ALL_PROCESSED → continue pipeline
│
▼
Next pipeline step (ES indexing, etc.)
State Persistence in Polling¶
Polling state lives in the SQS message, not in the database. Each requeued polling message carries its own context:
def requeue_with_counts(
    message: dict,
    current_count: int,
    retry_count: int,
    delay: int,
) -> None:
    """Requeue polling message with updated state."""
    event_detail = parse_event_detail(message.get("eventDetail"))
    event_detail["previousCount"] = current_count
    event_detail["retryCount"] = retry_count
    message["eventDetail"] = json.dumps(event_detail)

    # Requeue to the same batch SQS queue with delay
    send_sqs_message(queue_url, message, delay_seconds=delay)
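A minimal simulation of the requeue round trip, with the SQS send stubbed out as an injected callable, showing that previousCount and retryCount survive the JSON serialization and are readable on the next poll (parse_event_detail is assumed here to be a thin json.loads wrapper):

```python
import json

def requeue_with_counts(message, current_count, retry_count, delay, send):
    """Same state-carrying logic as above, with the SQS send injected."""
    event_detail = json.loads(message.get("eventDetail") or "{}")
    event_detail["previousCount"] = current_count
    event_detail["retryCount"] = retry_count
    message["eventDetail"] = json.dumps(event_detail)
    send(message, delay_seconds=delay)

sent = []
message = {"eventDetail": json.dumps({"startPos": 0, "endPos": 500})}
requeue_with_counts(message, current_count=300, retry_count=0, delay=60,
                    send=lambda m, delay_seconds: sent.append((m, delay_seconds)))

# The next poll reads its state straight out of the redelivered message
redelivered, delay = sent[0]
state = json.loads(redelivered["eventDetail"])
print(state["previousCount"], state["retryCount"], delay)  # 300 0 60
```

The original event detail (startPos, endPos) is preserved alongside the polling counters, so the same message keeps its chunk context across every requeue.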
Why Message-Based State (Not Database)¶
- No extra database writes — polling happens frequently; DB writes per poll add load
- Naturally scoped — state dies with the message if the batch is cancelled
- No cleanup needed — unlike checkpoint rows, no table to prune after completion
- Concurrent safety — each polling message is independent, no shared mutable state
What get_incomplete_count() Queries¶
The polling function queries process_checkpoints to count documents that haven't reached PROCESS_COMPLETE:

def get_incomplete_count(session: Session, npcase_id: int, batch_id: int) -> int:
    """Count documents still being processed in this batch."""
    return (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
            ProcessCheckpoints.checkpoint_id < Checkpoints.PROCESS_COMPLETE.value,
        )
        .count()
    )
Interrupted Polling Recovery¶
If the polling Lambda times out or crashes, the SQS visibility timeout expires and the message is automatically redelivered. Because state is in the message, polling resumes with the last known counts — no recovery logic needed.
Key Rules¶
- Always set MAX_CHUNKS — safety limit to prevent infinite dispatch loops
- Use adaptive delays — small datasets poll faster, large datasets poll slower
- Reset retry count on progress — only count consecutive stalls
- Don't block forever — RETRIES_EXHAUSTED proceeds with partial results, doesn't hang
- Inline for small, chunked for large — threshold decision avoids dispatch overhead for small jobs
- Carry state in the message — previousCount and retryCount travel with the SQS message, not in a database
- Generic polling logic — works for both increasing (attachments → target) and decreasing (relations → 0) counts
- No database writes for polling state — avoids extra load and cleanup overhead