Concurrent Write Safety Patterns¶
Purpose¶
Prevent deadlocks, duplicate inserts, and race conditions when multiple Lambda
functions write to the same database tables concurrently. These patterns work
together with @retry_on_db_conflict and READ COMMITTED isolation.
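The `@retry_on_db_conflict` decorator itself is not shown in this document. A minimal sketch of what such a conflict-retry decorator could look like, with the attempt count, delay, and exception types all assumptions (`RuntimeError` stands in for `IntegrityError`/`OperationalError` so the sketch stays stdlib-only):

```python
import functools
import time

def retry_on_db_conflict(attempts=3, delay=0.1, conflict_exc=(RuntimeError,)):
    """Hypothetical sketch: re-run the wrapped function on conflict errors."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except conflict_exc:
                    if attempt == attempts - 1:
                        raise  # retries exhausted; surface the conflict
                    time.sleep(delay)
        return wrapper
    return decorator

calls = {"n": 0}

@retry_on_db_conflict(attempts=3, delay=0)
def flaky_write():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated conflict")
    return "ok"

print(flaky_write())  # → "ok" on the third attempt
```

The real decorator presumably also rolls back the session before retrying; that detail is omitted here.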
Pattern 1: Pessimistic Locking with Ordered Keys¶
Problem: Two Lambdas locking rows A then B and B then A deadlock each other.
Solution: Always sort the IDs before locking. Consistent lock ordering eliminates deadlocks by construction.
```python
# shell/custodian_ops.py
def create_custodian_exhibits(session, exhibit_custodian_pairs, user_id):
    """Bulk create junction records with deadlock-safe locking."""
    # 1. Sort IDs for consistent locking order — THIS PREVENTS DEADLOCKS
    custodian_ids_to_lock = sorted({pair[1] for pair in exhibit_custodian_pairs})

    # 2. Acquire pessimistic locks in sorted order
    if custodian_ids_to_lock:
        acquire_lock_with_retry(session, Custodians, custodian_ids_to_lock, is_list_filter=True)

    # 3. Check which records already exist (idempotency)
    pair_filters = [
        and_(
            CustodianExhibits.exhibit_id == eid,
            CustodianExhibits.custodian_id == cid,
        )
        for eid, cid in exhibit_custodian_pairs
    ]
    existing_set = set(
        session.query(CustodianExhibits.exhibit_id, CustodianExhibits.custodian_id)
        .filter(or_(*pair_filters))
        .all()
    )

    # 4. Insert only new records
    new_records = [
        CustodianExhibits(custodian_id=cid, exhibit_id=eid, user_id=user_id, ...)
        for eid, cid in exhibit_custodian_pairs
        if (eid, cid) not in existing_set
    ]
    if new_records:
        session.bulk_save_objects(new_records)
```
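The ordering claim is easy to demonstrate outside the database. In this sketch (not Nextpoint code), two in-process locks stand in for two custodian rows, and both workers acquire them in sorted order, which is the same property the `ORDER BY` in `acquire_lock_with_retry` provides:

```python
import threading

# Two locks standing in for two custodian rows; each worker needs both.
locks = {1: threading.Lock(), 2: threading.Lock()}

def worker(ids, results):
    for i in sorted(ids):  # consistent global order: deadlock is impossible
        locks[i].acquire()
    results.append(ids)
    for i in ids:
        locks[i].release()

results = []
t1 = threading.Thread(target=worker, args=([2, 1], results))
t2 = threading.Thread(target=worker, args=([1, 2], results))
t1.start(); t2.start()
t1.join(); t2.join()
print(len(results))  # → 2: both workers finish
```

Without the `sorted()`, the two workers would request the locks in opposite orders and could block each other forever.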
Lock Acquisition with Deadlock Retry¶
Generic helper for acquiring row-level locks with automatic deadlock retry:
```python
# shell/utils/db_helpers.py
import os
import time

from sqlalchemy.exc import OperationalError

DB_DEADLOCK_RETRY_ATTEMPTS = int(os.environ.get("DB_DEADLOCK_RETRY_ATTEMPTS", "5"))
DB_DEADLOCK_RETRY_DELAY = float(os.environ.get("DB_DEADLOCK_RETRY_DELAY", "0.2"))

def acquire_lock_with_retry(
    session, model, filter_value, field_name="id",
    is_list_filter=False, attempts=DB_DEADLOCK_RETRY_ATTEMPTS,
    delay=DB_DEADLOCK_RETRY_DELAY,
):
    """Acquire row-level lock with deadlock retry.

    Always queries with ORDER BY to ensure consistent lock ordering.
    """
    for attempt in range(attempts):
        try:
            query = session.query(model)
            if is_list_filter:
                query = query.filter(getattr(model, field_name).in_(filter_value))
            else:
                query = query.filter(getattr(model, field_name) == filter_value)
            # ORDER BY ensures consistent lock ordering across Lambdas
            query = query.order_by(getattr(model, field_name))
            return query.with_for_update().all()
        except OperationalError as e:
            # e.orig is the underlying MySQL driver error; code 1213 = deadlock
            if e.orig is not None and e.orig.args[0] == 1213:
                log_message(
                    "warning",
                    f"Deadlock on {model.__name__}.{field_name}, "
                    f"attempt {attempt + 1}/{attempts}",
                )
                session.rollback()
                time.sleep(delay)
            else:
                raise
    raise OperationalError(
        None, None,
        Exception(f"Failed to acquire lock after {attempts} attempts"),
    )
```
Pattern 2: SAVEPOINT Nested Transactions (Insert-or-Get)¶
Problem: Two Lambdas try to insert the same record simultaneously.
One succeeds, the other gets IntegrityError. Without a SAVEPOINT, the
IntegrityError poisons the entire transaction.
Solution: Use session.begin_nested() (SAVEPOINT) to isolate the insert.
If it fails, only the SAVEPOINT rolls back — the outer transaction survives.
For single records:¶
```python
# shell/taggings_ops.py
from sqlalchemy.exc import IntegrityError

def create_taggings(session, exhibit_id, tag_id, user_id):
    """Create tagging with race condition safety via SAVEPOINT."""
    try:
        with session.begin_nested():  # SAVEPOINT
            tagging = Taggings(exhibit_id=exhibit_id, tag_id=tag_id, user_id=user_id)
            session.add(tagging)
    except IntegrityError:
        # Another Lambda inserted first — SAVEPOINT rolled back, outer txn safe
        log_message("info", f"Skipping duplicate tagging: exhibit={exhibit_id}, tag={tag_id}")
```
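The same mechanics can be observed with raw SQL SAVEPOINTs. The sketch below uses `sqlite3` only because it ships with Python; MySQL's SAVEPOINT behavior is the same:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None  # take manual control of transactions
conn.execute(
    "CREATE TABLE taggings (exhibit_id INTEGER, tag_id INTEGER, "
    "UNIQUE (exhibit_id, tag_id))"
)

conn.execute("BEGIN")
conn.execute("INSERT INTO taggings VALUES (1, 7)")  # outer transaction work

try:
    conn.execute("SAVEPOINT sp1")
    conn.execute("INSERT INTO taggings VALUES (1, 7)")  # duplicate insert
    conn.execute("RELEASE sp1")
except sqlite3.IntegrityError:
    # Only the savepoint rolls back; the first insert survives
    conn.execute("ROLLBACK TO sp1")
    conn.execute("RELEASE sp1")

conn.execute("COMMIT")
print(conn.execute("SELECT COUNT(*) FROM taggings").fetchone()[0])  # → 1
```

Without the SAVEPOINT, the duplicate-key error would force a rollback of the whole transaction, losing the first insert as well.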
For insert-or-get pattern:¶
```python
# shell/custodian_ops.py
def insert_or_get_custodian(session, name, import_attributes):
    """Get existing or create new custodian, safely handling races."""
    # Try to find existing first
    custodian = session.query(Custodians).filter_by(name=name).first()
    if custodian:
        return custodian.id

    # Attempt insert in SAVEPOINT.
    # If another Lambda already inserted, @retry_on_db_conflict on the caller
    # will catch the IntegrityError, rollback, and retry — finding the record
    # on the next attempt.
    with session.begin_nested():
        new_custodian = Custodians(name=name, created_at_gmt=now_utc, ...)
        session.add(new_custodian)
        session.flush()  # Get the auto-increment ID
    return new_custodian.id
```
Pattern 3: INSERT IGNORE for Bulk Operations¶
Problem: Bulk inserting hundreds of rows where some may already exist. Individual SAVEPOINT per row is too slow.
Solution: MySQL INSERT IGNORE skips duplicate rows silently in a
single statement — no round-trips, no exceptions.
```python
# shell/taggings_ops.py
from sqlalchemy.dialects.mysql import insert

def create_np_taggings(session, tags, exhibit_ids, import_attributes):
    """Bulk create taggings, silently skipping duplicates."""
    user_id = int(import_attributes["user_id"])
    taggings_to_insert = [
        {"exhibit_id": eid, "tag_id": tid, "user_id": user_id}
        for eid in exhibit_ids
        for tid in tags
        if tid is not None
    ]
    if not taggings_to_insert:
        return

    # INSERT IGNORE — MySQL silently skips rows that violate unique constraints
    stmt = insert(Taggings).prefix_with("IGNORE").values(taggings_to_insert)
    session.execute(stmt)
    log_message("info", f"Bulk inserted/ignored {len(taggings_to_insert)} taggings")
```
When to use which:

| Approach | Use When | Performance |
|---|---|---|
| SAVEPOINT (`begin_nested`) | Single record, need the ID back | 1 round-trip per record |
| INSERT IGNORE | Bulk insert, don't need returned IDs | 1 statement for all records |
| Pessimistic lock + check | Need to prevent duplicates AND read existing | 2 queries (lock + check) |
Pattern 4: Column Length Truncation¶
Problem: External data may exceed column length limits, causing MySQL to silently truncate or error.
Solution: Reflect column length from the ORM model and truncate proactively:
```python
# shell/utils/data_sanitizer.py
def truncate_string(model_class, column_name, value):
    """Truncate string to column's max length using SQLAlchemy reflection."""
    if not isinstance(value, str):
        return value
    try:
        column_property = getattr(model_class, column_name)
        max_length = getattr(column_property.type, "length", None)
        if max_length is not None and len(value) > max_length:
            log_message(
                "warning",
                f"{model_class.__tablename__}.{column_name} truncated: "
                f"{len(value)} → {max_length}",
            )
            return value[:max_length]
    except AttributeError:
        pass
    return value
```
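Because the helper only reads `.type.length` from the column attribute, it can be exercised without a database using stand-in objects of the same shape. In this sketch, `FakeExhibits` is purely illustrative (not a real table), and the helper is repeated with logging omitted so the example is self-contained:

```python
# Minimal stand-ins for SQLAlchemy column attributes.
class _StringType:
    def __init__(self, length):
        self.length = length

class _Column:
    def __init__(self, length):
        self.type = _StringType(length)

class FakeExhibits:  # hypothetical model, mimics a String(10) column
    __tablename__ = "fake_exhibits"
    title = _Column(10)

def truncate_string(model_class, column_name, value):
    """Same logic as the data_sanitizer helper, with logging omitted."""
    if not isinstance(value, str):
        return value
    try:
        max_length = getattr(getattr(model_class, column_name).type, "length", None)
        if max_length is not None and len(value) > max_length:
            return value[:max_length]
    except AttributeError:
        pass
    return value

print(truncate_string(FakeExhibits, "title", "a" * 25))  # → "aaaaaaaaaa"
print(truncate_string(FakeExhibits, "title", 42))        # → 42 (non-strings pass through)
```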
Pattern 5: Data Sanitization Pipeline¶
Problem: External data contains emoji (4-byte UTF-8), encoding errors,
nil strings, and values that exceed TEXT column limits. MySQL's utf8mb3
encoding silently truncates at the first 4-byte character.
Solution: Multi-stage sanitization before any database write:
```python
# shell/utils/document_properties.py
import re

# Match ALL characters above U+FFFF (entire supplementary Unicode plane).
# Previously only covered U+1F600-U+1F6FF — missed emoji like 🍩 (U+1F369),
# causing production data loss (see: NGE-2339)
_EMOJI_PATTERN = re.compile("[\U00010000-\U0010ffff]+", flags=re.UNICODE)

def remove_emojis(value: str) -> str:
    """Strip 4-byte characters that MySQL utf8mb3 can't store."""
    return _EMOJI_PATTERN.sub("", value)

def _normalize_encoding(value: str) -> str:
    """Round-trip through UTF-8 to replace invalid characters."""
    return value.encode("utf-8", errors="replace").decode("utf-8")

def _sanitize_value(value):
    """Full sanitization pipeline for a single value."""
    if isinstance(value, str):
        if value == "<nil>":
            return None  # Rails nil string → Python None
        return remove_emojis(_normalize_encoding(value))  # Encoding → emoji strip
    return value
```
TEXT Column Overflow with Smart Key Dropping¶
When serialized properties exceed the 65,535-byte TEXT column limit, drop the largest entries first to preserve short, useful metadata:
```python
import json

import yaml

_TEXT_COLUMN_MAX_BYTES = 65_535

def _truncate_to_text_column(properties: dict) -> str:
    """Drop largest entries first until data fits TEXT column."""
    json_str = json.dumps(properties, ensure_ascii=False)
    result = yaml.dump(json_str, ...)
    if len(result.encode("utf-8")) <= _TEXT_COLUMN_MAX_BYTES:
        return result

    # Sort entries by serialized size, largest first
    sized_entries = []
    for k, v in properties.items():
        entry_json = json.dumps({k: v}, ensure_ascii=False)
        sized_entries.append((len(entry_json.encode("utf-8")), k))
    sized_entries.sort(reverse=True)

    # Drop largest until we fit
    trimmed = dict(properties)
    for _, key in sized_entries:
        current = yaml.dump(json.dumps(trimmed, ...), ...)
        if len(current.encode("utf-8")) <= _TEXT_COLUMN_MAX_BYTES:
            break
        del trimmed[key]

    log_message(
        "warning",
        f"document_properties exceeded TEXT limit. "
        f"Dropped {len(properties) - len(trimmed)}/{len(properties)} keys",
    )
    SNS.publish_sns_message({"eventType": "WARNING", ...})
    return yaml.dump(json.dumps(trimmed, ...), ...)
```
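Stripped of the YAML serialization step, the drop-largest-first idea can be sketched with JSON alone; the 60-byte limit here is only for illustration, not the real 65,535-byte column limit:

```python
import json

def drop_largest_until_fit(properties: dict, limit: int = 60) -> dict:
    """Simplified overflow handler: JSON only, no YAML wrapping."""
    # Keys ordered by their serialized size, largest first
    sized = sorted(
        properties,
        key=lambda k: len(json.dumps({k: properties[k]}).encode("utf-8")),
        reverse=True,
    )
    trimmed = dict(properties)
    for key in sized:
        if len(json.dumps(trimmed).encode("utf-8")) <= limit:
            break
        del trimmed[key]
    return trimmed

props = {"author": "amy", "title": "memo", "body": "x" * 200}
print(drop_largest_until_fit(props))  # keeps the short metadata keys
```

The oversized `body` entry is dropped first, leaving the small `author` and `title` entries intact, which mirrors how the production helper preserves short, useful metadata.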
Key Rules¶
- Sort IDs before locking — consistent ordering eliminates deadlocks by construction
- Use SAVEPOINTs for insert-or-get — isolates IntegrityError from the outer transaction
- Use INSERT IGNORE for bulk — single statement, no per-row exceptions
- Reflect column lengths — don't hardcode max lengths, read from ORM model
- Strip ALL 4-byte Unicode, not just common emoji — the supplementary plane is wider than U+1F600-U+1F6FF
- Drop largest entries first on overflow — preserves short, useful metadata
- Log truncation and notify via SNS — data loss should be visible, not silent