Skip to content

Chunk Dispatch and Polling Pattern

Purpose

Process large datasets by splitting them into chunks dispatched as separate SNS/SQS messages, then poll for completion using an adaptive state machine. This prevents Lambda timeouts on large batches and enables parallel processing.

Chunk Dispatch Pattern

When a dataset exceeds a threshold, split into chunks and dispatch via SQS:

# shell/attachment_model_ops.py

ATTACHMENT_CHUNK_SIZE = Config.ATTACHMENT_CHUNK_SIZE  # Default: 100

def launch_attachment_chunker(session, import_attributes, exhibit_ids, total_pages):
    """Dispatch page creation in chunks to avoid Lambda timeout."""
    npcase_id = import_attributes["npcase_id"]
    batch_id = import_attributes["batch_id"]
    job_id = import_attributes["job_id"]

    for i in range(0, total_pages, ATTACHMENT_CHUNK_SIZE):
        start_pos = i
        end_pos = min(i + ATTACHMENT_CHUNK_SIZE, total_pages)

        send_to_batch_sqs_queue(
            npcase_id=npcase_id,
            batch_id=batch_id,
            job_id=job_id,
            event_type=EventType.BATCH_ATTACHMENTS,
            message_delay=5,
            event_detail={"startPos": start_pos, "endPos": end_pos},
            document_id=import_attributes["document_id"],
            exhibit_ids=str(exhibit_ids),
        )

For document relations, chunk with a MAX_CHUNKS safety limit:

# shell/batch_document_relation_ops.py

BATCH_CHUNK_SIZE = 100
MAX_CHUNKS = 1000  # Safety limit to prevent infinite dispatch

def dispatch_relation_chunks(session, npcase_id, batch_id, job_id, total_relations):
    """Split relation processing into chunks dispatched via SQS."""
    chunk_count = 0

    for start_id in range(0, total_relations, BATCH_CHUNK_SIZE):
        end_id = min(start_id + BATCH_CHUNK_SIZE, total_relations)

        send_to_batch_sqs_queue(
            npcase_id=npcase_id,
            batch_id=batch_id,
            job_id=job_id,
            event_type=EventType.BATCH_DOCUMENT_RELATIONS,
            event_detail={
                "chunkSize": BATCH_CHUNK_SIZE,
                "startId": start_id,
                "endId": end_id,
            },
        )

        chunk_count += 1
        if chunk_count == MAX_CHUNKS:
            log_message("warning",
                f"Hit MAX_CHUNKS limit ({MAX_CHUNKS}). "
                f"Remaining relations will not be processed.")
            break

Dual-Path Decision: Inline vs Chunked

For small datasets, process inline. For large datasets, dispatch chunks:

def process_attachments(session, import_attributes, exhibit_ids, total_pages):
    """Inline for small docs, chunked for large ones."""
    if total_pages > ATTACHMENT_CHUNK_SIZE:
        # Large document — dispatch chunks for parallel processing
        launch_attachment_chunker(session, import_attributes, exhibit_ids, total_pages)
    else:
        # Small document — process inline in this Lambda invocation
        create_attachments(session, import_attributes, exhibit_ids, ...)

Polling State Machine

After dispatching chunks, poll for completion using a state machine with adaptive delays.

Polling Scenarios

# shell/utils/polling_helpers.py

class PollingScenario(Enum):
    ALL_PROCESSED = "all_processed"       # Target reached — done
    PROGRESS_MADE = "progress_made"       # Count changed — keep going
    RETRY_NEEDED = "retry_needed"         # Count unchanged, retries remain
    RETRIES_EXHAUSTED = "retries_exhausted"  # Stuck — accept partial failure

Scenario Determination

Generic for both decreasing counts (relations draining to 0) and increasing counts (attachments building to target):

def determine_polling_scenario(
    current_count, previous_count, retry_count, target_count=None
):
    """Determine polling state from counts and retry budget.

    Args:
        current_count: Current item count
        previous_count: Count from previous poll (-1 for initial)
        retry_count: Current retry attempt
        target_count: Optional target for increasing counts.
                      If None, target is 0 (decreasing to zero).
    """
    # Done?
    if (target_count is None and current_count == 0) or \
       (target_count is not None and current_count == target_count):
        return PollingScenario.ALL_PROCESSED

    # First poll — always counts as progress
    if previous_count == -1:
        return PollingScenario.PROGRESS_MADE

    # Making progress?
    if target_count is None:
        if current_count < previous_count:  # Decreasing toward 0
            return PollingScenario.PROGRESS_MADE
    else:
        if current_count > previous_count:  # Increasing toward target
            return PollingScenario.PROGRESS_MADE

    # Stuck but retries remain
    if retry_count < MAX_RETRIES:
        return PollingScenario.RETRY_NEEDED

    # Stuck and out of retries
    return PollingScenario.RETRIES_EXHAUSTED

Adaptive Delay Based on Dataset Size

Larger datasets need longer delays between polls:

CHUNK_DELAY_THRESHOLDS = {
    500: 60,     # < 500 items → 1 minute
    1000: 120,   # < 1000 items → 2 minutes
    2000: 180,   # < 2000 items → 3 minutes
    5000: 240,   # < 5000 items → 4 minutes
}
DEFAULT_CHUNK_DELAY = 300  # 5 minutes for very large datasets

def get_chunker_delay(current_count):
    """Adaptive delay — shorter for small datasets, longer for large."""
    for threshold, delay in CHUNK_DELAY_THRESHOLDS.items():
        if current_count < threshold:
            return delay
    return DEFAULT_CHUNK_DELAY

Polling Loop (Orchestrator)

The batch processor Lambda uses polling to wait for chunk completion:

def poll_for_completion(message):
    """Poll until all chunks are processed or retries exhausted."""
    current_count = get_incomplete_count(session, npcase_id, batch_id)
    previous_count = message.get("eventDetail", {}).get("previousCount", -1)
    retry_count = message.get("eventDetail", {}).get("retryCount", 0)

    scenario = determine_polling_scenario(current_count, previous_count, retry_count)

    if scenario == PollingScenario.ALL_PROCESSED:
        # All chunks done — proceed to next pipeline step
        publish_completion_event(message)

    elif scenario == PollingScenario.PROGRESS_MADE:
        # Making progress — poll again with reset retry count
        delay = get_chunker_delay(current_count)
        requeue_with_counts(message, current_count, retry_count=0, delay=delay)

    elif scenario == PollingScenario.RETRY_NEEDED:
        # No progress but retries remain — poll again
        delay = RETRY_DELAY  # 120s
        requeue_with_counts(message, current_count, retry_count + 1, delay=delay)

    elif scenario == PollingScenario.RETRIES_EXHAUSTED:
        # Stuck — report incomplete items, proceed anyway
        report_incomplete_items(session, npcase_id, batch_id)
        publish_completion_event(message)  # Don't block the batch forever

Complete Flow

Document arrives (500 pages)
total_pages > ATTACHMENT_CHUNK_SIZE (100)?
    │ Yes
Dispatch 5 chunk messages to SQS:
    ├── BATCH_ATTACHMENTS {startPos: 0, endPos: 100}
    ├── BATCH_ATTACHMENTS {startPos: 100, endPos: 200}
    ├── BATCH_ATTACHMENTS {startPos: 200, endPos: 300}
    ├── BATCH_ATTACHMENTS {startPos: 300, endPos: 400}
    └── BATCH_ATTACHMENTS {startPos: 400, endPos: 500}
    ▼ (5 Lambda invocations process in parallel)
Batch Processor polls:
    ├── Poll 1: 300/500 done → PROGRESS_MADE → requeue (delay: 60s)
    ├── Poll 2: 450/500 done → PROGRESS_MADE → requeue (delay: 60s)
    ├── Poll 3: 500/500 done → ALL_PROCESSED → continue pipeline
Next pipeline step (ES indexing, etc.)

State Persistence in Polling

Polling state lives in the SQS message, not in the database. Each requeued polling message carries its own context:

def requeue_with_counts(
    message: dict,
    current_count: int,
    retry_count: int,
    delay: int,
) -> None:
    """Requeue polling message with updated state."""
    event_detail = parse_event_detail(message.get("eventDetail"))
    event_detail["previousCount"] = current_count
    event_detail["retryCount"] = retry_count

    message["eventDetail"] = json.dumps(event_detail)

    # Requeue to the same batch SQS queue with delay
    send_sqs_message(queue_url, message, delay_seconds=delay)

Why Message-Based State (Not Database)

  • No extra database writes — polling happens frequently; DB writes per poll add load
  • Naturally scoped — state dies with the message if the batch is cancelled
  • No cleanup needed — unlike checkpoint rows, no table to prune after completion
  • Concurrent safety — each polling message is independent, no shared mutable state

What get_incomplete_count() Queries

The polling function queries process_checkpoints to count documents that haven't reached PROCESS_COMPLETE:

def get_incomplete_count(session: Session, npcase_id: int, batch_id: int) -> int:
    """Count documents still being processed in this batch."""
    return (
        session.query(ProcessCheckpoints)
        .filter(
            ProcessCheckpoints.npcase_id == npcase_id,
            ProcessCheckpoints.batch_id == batch_id,
            ProcessCheckpoints.checkpoint_id < Checkpoints.PROCESS_COMPLETE.value,
        )
        .count()
    )

Interrupted Polling Recovery

If the polling Lambda times out or crashes, the SQS visibility timeout expires and the message is automatically redelivered. Because state is in the message, polling resumes with the last known counts — no recovery logic needed.

Key Rules

  1. Always set MAX_CHUNKS — safety limit to prevent infinite dispatch loops
  2. Use adaptive delays — small datasets poll faster, large datasets poll slower
  3. Reset retry count on progress — only count consecutive stalls
  4. Don't block foreverRETRIES_EXHAUSTED proceeds with partial results, doesn't hang
  5. Inline for small, chunked for large — threshold decision avoids dispatch overhead for small jobs
  6. Carry state in the messagepreviousCount, retryCount travel with the SQS message, not in a database
  7. Generic polling logic — works for both increasing (attachments → target) and decreasing (relations → 0) counts
  8. No database writes for polling state — avoids extra load and cleanup overhead
Ask the Architecture ×

Ask questions about Nextpoint architecture, patterns, rules, or any module. Powered by Claude Opus 4.6.