Elasticsearch Bulk Indexing Pattern

Purpose

Efficiently index and delete documents in Elasticsearch with batch optimization, retry logic, ALB timeout recovery, and partial failure handling.

Bulk vs Individual Threshold

Use bulk operations only when the item count exceeds a threshold. Most documents have few pages, so individual indexing avoids bulk overhead for small batches.

BULK_THRESHOLD = Config.ES_BULK_THRESHOLD     # Default: 3
BULK_BATCH_SIZE = Config.ES_BULK_BATCH_SIZE   # Default: 50
ES_RETRY_ATTEMPTS = Config.ES_RETRY_ATTEMPTS  # Default: 3

def es_index_attachments(session, npcase_id, attachment_ids):
    """Switch between bulk and individual based on count."""
    if Config.is_dev():
        return  # Skip ES indexing in development

    index_name = get_index_name_from_alias(get_alias(npcase_id))
    if not index_name:
        SNS.send_index_not_found_message()
        return

    if len(attachment_ids) > BULK_THRESHOLD:
        _bulk_index_attachments(session, attachment_ids, index_name)
    else:
        for attachment_id in attachment_ids:
            _create_es_attachment(session, attachment_id, index_name)

Generic Batch Processor

A reusable pattern for any bulk ES operation (index, delete, update):

def _process_in_batches(items, batch_size, action_builder, operation_name):
    """Common bulk processing pattern for ES operations.

    Args:
        items: List of IDs to process
        batch_size: How many items per bulk request (default 50)
        action_builder: Function that builds an ES action from an item ID.
                        Returns (metadata_dict, source_dict) tuple or None to skip.
        operation_name: Human-readable name for logging ("indexed", "deleted")
    """
    total_processed = 0
    batch_count = 0

    for i in range(0, len(items), batch_size):
        batch_items = items[i : i + batch_size]
        actions = []

        for item in batch_items:
            action = action_builder(item)
            if action:
                if isinstance(action, tuple):
                    actions.append(action[0])       # metadata (index/delete directive)
                    if action[1]:                    # source document (None for deletes)
                        actions.append(action[1])
                else:
                    actions.append(action)

        if actions:
            try:
                response = _retry_es_operation(lambda: es.bulk(body=actions))
            except ElasticsearchException as e:
                if getattr(e, "status_code", None) == 504:
                    # ALB timeout — verify documents landed before failing
                    if _verify_bulk_indexed(actions):
                        total_processed += len(batch_items)
                        batch_count += 1
                        continue
                raise

            # Inspect partial failures in bulk response
            if response and response.get("errors"):
                error_details = []
                for item in response.get("items", []):
                    for op, details in item.items():
                        if details.get("status", 200) >= 400:
                            error_details.append(
                                f"{op} {details.get('_id')}: "
                                f"{details.get('error', {}).get('reason')}"
                            )
                if error_details:
                    _handle_es_error(
                        f"Bulk {operation_name} partial failures",
                        {"errors": error_details[:5]}  # Limit to first 5
                    )

            total_processed += len(batch_items)
            batch_count += 1

    log_message("info",
        f"Bulk {operation_name} {total_processed}/{len(items)} items in {batch_count} batches"
    )
    return True
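To illustrate the `action_builder` contract the docstring describes, here are hedged sketches of builders for the index and delete cases. The factory names and the `load_source` callback are illustrative, not part of the module:

```python
def make_index_action_builder(index_name, load_source):
    """Build an action_builder for indexing.

    `load_source` maps an item ID to its source dict, or None to skip the item.
    """
    def build(item_id):
        source = load_source(item_id)
        if source is None:
            return None  # skipped by _process_in_batches
        metadata = {"index": {"_index": index_name, "_id": str(item_id)}}
        return (metadata, source)  # metadata line + source document
    return build

def make_delete_action_builder(index_name):
    """Build an action_builder for deletes: metadata only, no source document."""
    def build(item_id):
        return ({"delete": {"_index": index_name, "_id": str(item_id)}}, None)
    return build
```

Deletes return `None` as the second tuple element, which is why `_process_in_batches` only appends `action[1]` when it is truthy.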

504 ALB Timeout Recovery

This is a critical production pattern. When Elasticsearch sits behind an ALB, the ALB may time out (504) even though ES completed the write. Always verify before reporting failure.

Single Document

def create_document(index_name, document_id, document, routing=None):
    try:
        _retry_es_operation(
            lambda: es.index(index=index_name, id=document_id, body=document, routing=routing)
        )
        return True
    except ElasticsearchException as e:
        if getattr(e, "status_code", None) == 504:
            # ALB killed the response, but ES may have written the doc
            try:
                if es.exists(index=index_name, id=document_id, routing=routing):
                    log_message("info",
                        f"Document {document_id} confirmed after 504 — indexing succeeded"
                    )
                    return True
            except ElasticsearchException:
                pass  # exists() check failed too — fall through to error
        _handle_es_error(f"Could not index {document_id}", {"error": str(e)})
        return False

Bulk Operations

def _verify_bulk_indexed(actions):
    """After a 504 on bulk, verify every document in the batch exists."""
    try:
        for action in actions:
            if "index" in action:
                meta = action["index"]
                if not es.exists(
                    index=meta["_index"], id=meta["_id"], routing=meta.get("routing")
                ):
                    return False  # At least one doc missing — bulk failed
        return True  # All docs confirmed
    except ElasticsearchException:
        return False  # Can't verify — assume failure

Retry with Exponential Backoff

def _retry_es_operation(operation, max_retries=3, backoff_factor=2):
    """Retry on ES timeout with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return operation()
        except ElasticsearchException as e:
            if "timeout" in str(e).lower() and attempt < max_retries - 1:
                wait_time = backoff_factor ** attempt  # 1s, 2s, 4s
                log_message("warning",
                    f"ES timeout, retrying in {wait_time}s ({attempt + 1}/{max_retries})"
                )
                time.sleep(wait_time)
                continue
            raise
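The backoff behavior can be demonstrated without a live cluster. This self-contained sketch uses a stand-in exception class and an injectable sleep function in place of `ElasticsearchException` and `time.sleep`:

```python
import time

class FakeTimeoutError(Exception):
    """Stand-in for ElasticsearchException in this demonstration."""

def retry_with_backoff(operation, max_retries=3, backoff_factor=2, sleep=time.sleep):
    """Same retry shape as _retry_es_operation, with injectable sleep for testing."""
    for attempt in range(max_retries):
        try:
            return operation()
        except FakeTimeoutError as e:
            if "timeout" in str(e).lower() and attempt < max_retries - 1:
                sleep(backoff_factor ** attempt)  # 1s, 2s, 4s, ...
                continue
            raise

attempts = []
def flaky():
    """Fails twice with a timeout, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeTimeoutError("read timeout")
    return "ok"

waits = []  # capture the backoff delays instead of sleeping
result = retry_with_backoff(flaky, sleep=waits.append)
```

After running, `result` is `"ok"` and `waits` is `[1, 2]`: the operation succeeded on the third attempt after two exponentially spaced retries.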

Index Alias Pattern

Use aliases for zero-downtime reindexing:

def get_alias(npcase_id):
    """Environment-specific alias: {env}_{npcase_id}_exhibits"""
    return f"{Config.ENV}_{npcase_id}_exhibits"

def get_index_name_from_alias(alias_name):
    """Resolve alias to concrete index name."""
    try:
        response = es.indices.get_alias(name=alias_name)
        return list(response.keys())[0]
    except ElasticsearchException as e:
        if getattr(e, "status_code", None) == 404:
            return None  # Alias doesn't exist
        raise
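The zero-downtime part comes from repointing the alias atomically: build and populate the new index, then remove the old mapping and add the new one in a single `update_aliases` request so readers never see a missing alias. A sketch, with hypothetical helper names around the standard elasticsearch-py `es.indices.update_aliases` call:

```python
def build_alias_swap_actions(alias_name, old_index, new_index):
    """Build the remove-then-add actions for an atomic alias swap."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias_name}},
            {"add": {"index": new_index, "alias": alias_name}},
        ]
    }

def swap_alias(alias_name, old_index, new_index):
    # Both actions apply in one atomic request; there is no window
    # where the alias resolves to nothing.
    es.indices.update_aliases(
        body=build_alias_swap_actions(alias_name, old_index, new_index)
    )
```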

YAML Mapping Loading

ES mappings stored as YAML files alongside the ES ops module:

MAPPING_TYPES = {"Exhibit": "exhibit", "DepositionVolume": "deposition_volume"}

current_dir = os.path.dirname(os.path.abspath(__file__))  # mappings live beside this module

def mapping_for_index_type(index_type):
    """Load YAML mapping file for index type."""
    file_path = os.path.join(current_dir, f"{MAPPING_TYPES.get(index_type, 'exhibit')}.yml")
    with open(file_path, "r") as f:
        return yaml.safe_load(f)

Error Reporting via SNS

ES errors are published as WARNING events for monitoring:

def _handle_es_error(error_message, context=None):
    log_message("error", f"Elasticsearch Error: {error_message}")
    SNS.publish_sns_message({
        "eventType": "WARNING",
        "eventDetail": {
            "activity": "adding text to index",
            "message": error_message,
            **(context or {}),
        },
    })

Configuration Tuning

Parameter           Default  Description
ES_TIMEOUT          180s     Client request timeout
ES_BULK_THRESHOLD   3        Min items before using bulk API
ES_BULK_BATCH_SIZE  50       Items per bulk request
ES_RETRY_ATTEMPTS   3        Max retries on timeout

All configurable via environment variables for per-module tuning.

Key Rules

  1. Always check for 504 before failing — ALB timeouts don't mean the write failed
  2. Bulk only above threshold — avoid bulk overhead for small batches
  3. Inspect partial failures — bulk responses can have per-item errors
  4. Skip ES in dev — guard all public functions with Config.is_dev()
  5. Report errors via SNS — monitoring systems consume WARNING events
  6. Use aliases, not concrete index names — enables zero-downtime reindexing