Elasticsearch Bulk Indexing Pattern¶
Purpose¶
Efficiently index and delete documents in Elasticsearch with batch optimization, retry logic, ALB timeout recovery, and partial failure handling.
Bulk vs Individual Threshold¶
Use bulk operations only when the item count exceeds a threshold. Most documents have few pages, so individual indexing avoids bulk overhead for small batches.
BULK_THRESHOLD = Config.ES_BULK_THRESHOLD # Default: 3
BULK_BATCH_SIZE = Config.ES_BULK_BATCH_SIZE # Default: 50
ES_RETRY_ATTEMPTS = Config.ES_RETRY_ATTEMPTS # Default: 3
def es_index_attachments(session, npcase_id, attachment_ids):
    """Switch between bulk and individual indexing based on count."""
    if Config.is_dev():
        return  # Skip ES indexing in development

    index_name = get_index_name_from_alias(get_alias(npcase_id))
    if not index_name:
        SNS.send_index_not_found_message()
        return

    if len(attachment_ids) > BULK_THRESHOLD:
        _bulk_index_attachments(session, attachment_ids, index_name)
    else:
        for attachment_id in attachment_ids:
            _create_es_attachment(session, attachment_id, index_name)
Generic Batch Processor¶
A reusable pattern for any bulk ES operation (index, delete, update):
def _process_in_batches(items, batch_size, action_builder, operation_name):
    """Common bulk processing pattern for ES operations.

    Args:
        items: List of IDs to process
        batch_size: How many items per bulk request (default 50)
        action_builder: Function that builds an ES action from an item ID.
            Returns a (metadata, source) tuple or None to skip.
        operation_name: Human-readable name for logging ("indexed", "deleted")
    """
    total_processed = 0
    batch_count = 0

    for i in range(0, len(items), batch_size):
        batch_items = items[i : i + batch_size]
        actions = []
        for item in batch_items:
            action = action_builder(item)
            if action:
                if isinstance(action, tuple):
                    actions.append(action[0])  # metadata (index/delete directive)
                    if action[1]:  # source document (None for deletes)
                        actions.append(action[1])
                else:
                    actions.append(action)

        if actions:
            try:
                response = _retry_es_operation(lambda: es.bulk(body=actions))
            except ElasticsearchException as e:
                if getattr(e, "status_code", None) == 504:
                    # ALB timeout — verify documents landed before failing
                    if _verify_bulk_indexed(actions):
                        total_processed += len(batch_items)
                        batch_count += 1
                        continue
                raise

            # Inspect partial failures in the bulk response
            if response and response.get("errors"):
                error_details = []
                for item in response.get("items", []):
                    for op, details in item.items():
                        if details.get("status", 200) >= 400:
                            error_details.append(
                                f"{op} {details.get('_id')}: "
                                f"{details.get('error', {}).get('reason')}"
                            )
                if error_details:
                    _handle_es_error(
                        f"Bulk {operation_name} partial failures",
                        {"errors": error_details[:5]},  # Limit to first 5
                    )

        total_processed += len(batch_items)
        batch_count += 1

    log_message(
        "info",
        f"Bulk {operation_name} {total_processed}/{len(items)} items in {batch_count} batches",
    )
    return True
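The `action_builder` contract can be illustrated with two hypothetical builders (the names and index name below are illustrative, not the actual builders): an index operation returns a (metadata, source) tuple, while a delete returns metadata with a `None` source.

```python
def build_index_action(attachment_id):
    """Hypothetical builder: index ops return (metadata, source)."""
    metadata = {"index": {"_index": "prod_123_exhibits", "_id": str(attachment_id)}}
    source = {"attachment_id": attachment_id, "text": "sample extracted text"}
    return metadata, source

def build_delete_action(attachment_id):
    """Hypothetical builder: delete ops carry metadata only, so source is None."""
    metadata = {"delete": {"_index": "prod_123_exhibits", "_id": str(attachment_id)}}
    return metadata, None
```

Passing `build_index_action` to `_process_in_batches` would then interleave metadata and source lines exactly as the ES bulk API expects, while `build_delete_action` produces metadata-only entries.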
504 ALB Timeout Recovery¶
This is a critical production pattern. When Elasticsearch sits behind an ALB, the ALB may time out and return a 504 even though ES completed the write. Always verify before reporting failure.
Single Document¶
def create_document(index_name, document_id, document, routing=None):
    try:
        _retry_es_operation(
            lambda: es.index(
                index=index_name, id=document_id, body=document, routing=routing
            )
        )
        return True
    except ElasticsearchException as e:
        if getattr(e, "status_code", None) == 504:
            # ALB killed the response, but ES may have written the doc
            try:
                if es.exists(index=index_name, id=document_id, routing=routing):
                    log_message(
                        "info",
                        f"Document {document_id} confirmed after 504 — indexing succeeded",
                    )
                    return True
            except ElasticsearchException:
                pass  # exists() check failed too — fall through to error
        _handle_es_error(f"Could not index {document_id}", {"error": str(e)})
        return False
Bulk Operations¶
def _verify_bulk_indexed(actions):
    """After a 504 on bulk, verify every document in the batch exists."""
    try:
        for action in actions:
            if "index" in action:
                meta = action["index"]
                if not es.exists(
                    index=meta["_index"], id=meta["_id"], routing=meta.get("routing")
                ):
                    return False  # At least one doc missing — bulk failed
        return True  # All docs confirmed
    except ElasticsearchException:
        return False  # Can't verify — assume failure
Retry with Exponential Backoff¶
def _retry_es_operation(operation, max_retries=3, backoff_factor=2):
    """Retry on ES timeout with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return operation()
        except ElasticsearchException as e:
            if "timeout" in str(e).lower() and attempt < max_retries - 1:
                wait_time = backoff_factor ** attempt  # 1s, 2s, 4s
                log_message(
                    "warning",
                    f"ES timeout, retrying in {wait_time}s ({attempt + 1}/{max_retries})",
                )
                time.sleep(wait_time)
                continue
            raise
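The backoff schedule (1s, 2s, 4s for `backoff_factor=2`) can be exercised in isolation. This standalone sketch mirrors the loop above but takes the retryable exception type and the sleep function as parameters, so it runs without an ES client and without real waits:

```python
import time

def retry_with_backoff(operation, max_retries=3, backoff_factor=2,
                       retryable=(TimeoutError,), sleep=time.sleep):
    """Generic retry loop: wait 1s, 2s, 4s... then re-raise on exhaustion."""
    for attempt in range(max_retries):
        try:
            return operation()
        except retryable:
            if attempt < max_retries - 1:
                sleep(backoff_factor ** attempt)
                continue
            raise

calls = {"n": 0}
waits = []  # record the backoff delays instead of sleeping

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("read timeout")
    return "ok"

result = retry_with_backoff(flaky, sleep=waits.append)
# result == "ok" after two failures; recorded waits are [1, 2]
```

Injecting `sleep` this way also makes the backoff schedule unit-testable, which is hard to do against the real `time.sleep`.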
Index Alias Pattern¶
Use aliases for zero-downtime reindexing:
def get_alias(npcase_id):
    """Environment-specific alias: {env}_{npcase_id}_exhibits"""
    return f"{Config.ENV}_{npcase_id}_exhibits"

def get_index_name_from_alias(alias_name):
    """Resolve alias to concrete index name."""
    try:
        response = es.indices.get_alias(name=alias_name)
        return list(response.keys())[0]
    except ElasticsearchException as e:
        if getattr(e, "status_code", None) == 404:
            return None  # Alias doesn't exist
        raise
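Zero-downtime reindexing then repoints the alias from the old index to the new one in a single atomic `update_aliases` call. A sketch of the swap payload, assuming a hypothetical helper name (the remove and add actions are applied together, so readers never see a missing alias):

```python
def build_alias_swap(alias_name, old_index, new_index):
    """Build the atomic remove+add payload for es.indices.update_aliases()."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias_name}},
            {"add": {"index": new_index, "alias": alias_name}},
        ]
    }

# Usage sketch (not executed here):
# es.indices.update_aliases(body=build_alias_swap(
#     "prod_123_exhibits", "prod_123_exhibits_v1", "prod_123_exhibits_v2"))
```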
YAML Mapping Loading¶
ES mappings stored as YAML files alongside the ES ops module:
MAPPING_TYPES = {"Exhibit": "exhibit", "DepositionVolume": "deposition_volume"}

def mapping_for_index_type(index_type):
    """Load the YAML mapping file for an index type (defaults to exhibit)."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(
        current_dir, f"{MAPPING_TYPES.get(index_type, 'exhibit')}.yml"
    )
    with open(file_path, "r") as f:
        return yaml.safe_load(f)
Error Reporting via SNS¶
ES errors are published as WARNING events for monitoring:
def _handle_es_error(error_message, context=None):
    log_message("error", f"Elasticsearch Error: {error_message}")
    SNS.publish_sns_message({
        "eventType": "WARNING",
        "eventDetail": {
            "activity": "adding text to index",
            "message": error_message,
            **(context or {}),
        },
    })
Configuration Tuning¶
| Parameter | Default | Description |
|---|---|---|
| ES_TIMEOUT | 180s | Client request timeout |
| ES_BULK_THRESHOLD | 3 | Min items before using bulk API |
| ES_BULK_BATCH_SIZE | 50 | Items per bulk request |
| ES_RETRY_ATTEMPTS | 3 | Max retries on timeout |
All configurable via environment variables for per-module tuning.
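A minimal sketch of how such a loader might read these variables with their documented defaults (the function name and dict shape are illustrative, not the actual Config implementation):

```python
import os

# Documented defaults for the ES tuning parameters
ES_DEFAULTS = {
    "ES_TIMEOUT": 180,
    "ES_BULK_THRESHOLD": 3,
    "ES_BULK_BATCH_SIZE": 50,
    "ES_RETRY_ATTEMPTS": 3,
}

def load_es_config(env=None):
    """Read each parameter from the environment, falling back to its default."""
    env = os.environ if env is None else env
    return {key: int(env.get(key, default)) for key, default in ES_DEFAULTS.items()}
```

Passing an explicit dict, e.g. `load_es_config({"ES_BULK_BATCH_SIZE": "100"})`, overrides only the batch size and keeps the other defaults, which is the per-module tuning behavior described above.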
Key Rules¶
- Always check for 504 before failing — ALB timeouts don't mean the write failed
- Bulk only above threshold — avoid bulk overhead for small batches
- Inspect partial failures — bulk responses can have per-item errors
- Skip ES in dev — Config.is_dev() guards all public functions
- Report errors via SNS — monitoring systems consume WARNING events
- Use aliases, not concrete index names — enables zero-downtime reindexing