Skip to content

Concurrent Write Safety Patterns

Purpose

Prevent deadlocks, duplicate inserts, and race conditions when multiple Lambda functions write to the same database tables concurrently. These patterns work together with @retry_on_db_conflict and READ COMMITTED isolation.

Pattern 1: Pessimistic Locking with Ordered Keys

Problem: Two Lambdas locking rows A then B and B then A deadlock each other.

Solution: Always sort the IDs before locking. Consistent lock ordering eliminates deadlocks by construction.

# shell/custodian_ops.py

def create_custodian_exhibits(session, exhibit_custodian_pairs, user_id):
    """Bulk create junction records with deadlock-safe locking.

    Args:
        session: Active SQLAlchemy session (outer transaction already open).
        exhibit_custodian_pairs: Iterable of (exhibit_id, custodian_id) tuples.
        user_id: ID recorded on each new junction row.

    Note: the trailing ``...`` in the record constructor is a documentation
    placeholder for additional audit columns — this snippet is not runnable
    as-is.
    """

    # 1. Sort IDs for consistent locking order — THIS PREVENTS DEADLOCKS
    # (deduplicate via a set, then sort so every Lambda locks in the same order)
    custodian_ids_to_lock = sorted(list({pair[1] for pair in exhibit_custodian_pairs}))

    # 2. Acquire pessimistic locks in sorted order
    if custodian_ids_to_lock:
        acquire_lock_with_retry(session, Custodians, custodian_ids_to_lock, is_list_filter=True)

    # 3. Check which records already exist (idempotency)
    # One OR-of-ANDs filter so existence is checked in a single round-trip.
    pair_filters = [
        and_(
            CustodianExhibits.exhibit_id == eid,
            CustodianExhibits.custodian_id == cid,
        )
        for eid, cid in exhibit_custodian_pairs
    ]
    # Result rows are (exhibit_id, custodian_id) tuples; collect into a set
    # for O(1) membership tests below.
    existing_set = set(
        session.query(CustodianExhibits.exhibit_id, CustodianExhibits.custodian_id)
        .filter(or_(*pair_filters))
        .all()
    )

    # 4. Insert only new records
    new_records = [
        CustodianExhibits(custodian_id=cid, exhibit_id=eid, user_id=user_id, ...)
        for eid, cid in exhibit_custodian_pairs
        if (eid, cid) not in existing_set
    ]
    if new_records:
        session.bulk_save_objects(new_records)

Lock Acquisition with Deadlock Retry

Generic helper for acquiring row-level locks with automatic deadlock retry:

# shell/utils/db_helpers.py

# Retry tuning — overridable per environment without a code change.
DB_DEADLOCK_RETRY_ATTEMPTS = int(os.environ.get("DB_DEADLOCK_RETRY_ATTEMPTS", "5"))
DB_DEADLOCK_RETRY_DELAY = float(os.environ.get("DB_DEADLOCK_RETRY_DELAY", "0.2"))

# MySQL error code ER_LOCK_DEADLOCK.
_MYSQL_DEADLOCK = 1213

def acquire_lock_with_retry(
    session, model, filter_value, field_name="id",
    is_list_filter=False, attempts=DB_DEADLOCK_RETRY_ATTEMPTS,
    delay=DB_DEADLOCK_RETRY_DELAY,
):
    """Acquire row-level lock with deadlock retry.

    Always queries with ORDER BY to ensure consistent lock ordering.

    Args:
        session: Active SQLAlchemy session.
        model: ORM model class whose rows are locked.
        filter_value: Single value, or a list when ``is_list_filter`` is True.
        field_name: Column to filter and order by (default "id").
        is_list_filter: Use an IN (...) filter instead of equality.
        attempts: Maximum tries before giving up.
        delay: Seconds to sleep between deadlock retries.

    Returns:
        The locked ORM rows (SELECT ... FOR UPDATE).

    Raises:
        OperationalError: on a non-deadlock database error, or once all
            deadlock retry attempts are exhausted.
    """
    column = getattr(model, field_name)

    for attempt in range(attempts):
        try:
            query = session.query(model)
            if is_list_filter:
                query = query.filter(column.in_(filter_value))
            else:
                query = query.filter(column == filter_value)

            # ORDER BY ensures consistent lock ordering across Lambdas
            return query.order_by(column).with_for_update().all()

        except OperationalError as e:
            # BUG FIX: the MySQL error code lives on the wrapped DBAPI
            # exception (e.orig.args[0]); e.args[0] is SQLAlchemy's message
            # string and never equals 1213, so deadlocks were re-raised
            # instead of retried.
            orig_args = getattr(e.orig, "args", ())
            if not orig_args or orig_args[0] != _MYSQL_DEADLOCK:
                raise
            log_message("warning",
                f"Deadlock on {model.__name__}.{field_name}, "
                f"attempt {attempt + 1}/{attempts}")
            session.rollback()
            if attempt + 1 < attempts:
                time.sleep(delay)  # no point sleeping after the final attempt

    raise OperationalError(None, None,
        f"Failed to acquire lock after {attempts} attempts")

Pattern 2: SAVEPOINT Nested Transactions (Insert-or-Get)

Problem: Two Lambdas try to insert the same record simultaneously. One succeeds, the other gets IntegrityError. Without a SAVEPOINT, the IntegrityError poisons the entire transaction.

Solution: Use session.begin_nested() (SAVEPOINT) to isolate the insert. If it fails, only the SAVEPOINT rolls back — the outer transaction survives.

For single records:

# shell/taggings_ops.py

def create_taggings(session, exhibit_id, tag_id, user_id):
    """Insert a Taggings row, tolerating a concurrent duplicate insert.

    The insert runs inside a SAVEPOINT (``begin_nested``) so that a losing
    race only rolls back the savepoint — the outer transaction stays usable.
    """
    record = Taggings(exhibit_id=exhibit_id, tag_id=tag_id, user_id=user_id)
    try:
        with session.begin_nested():  # SAVEPOINT isolates a possible IntegrityError
            session.add(record)
    except IntegrityError:
        # A concurrent writer won the race; the savepoint rolled back cleanly.
        log_message("info", f"Skipping duplicate tagging: exhibit={exhibit_id}, tag={tag_id}")

For insert-or-get pattern:

# shell/custodian_ops.py

def insert_or_get_custodian(session, name, import_attributes):
    """Get existing or create new custodian, safely handling races.

    Args:
        session: Active SQLAlchemy session.
        name: Custodian name to look up or insert (assumed unique — the
            race-handling below relies on a unique constraint; confirm).
        import_attributes: Import context; unused in this excerpt —
            presumably feeds the elided constructor kwargs.

    Returns:
        The custodian's primary-key id (existing or newly inserted).

    Note: ``now_utc`` and the trailing ``...`` are documentation placeholders
    for timestamp/audit columns — this snippet is not runnable as-is.
    """
    # Try to find existing first
    custodian = session.query(Custodians).filter_by(name=name).first()
    if custodian:
        return custodian.id

    # Attempt insert in SAVEPOINT
    # If another Lambda already inserted, @retry_on_db_conflict on the caller
    # will catch the IntegrityError, rollback, and retry — finding the record
    # on the next attempt.
    with session.begin_nested():
        new_custodian = Custodians(name=name, created_at_gmt=now_utc, ...)
        session.add(new_custodian)
        session.flush()  # Get the auto-increment ID
        return new_custodian.id

Pattern 3: INSERT IGNORE for Bulk Operations

Problem: Bulk inserting hundreds of rows where some may already exist. Individual SAVEPOINT per row is too slow.

Solution: MySQL INSERT IGNORE skips duplicate rows silently in a single statement — no round-trips, no exceptions.

# shell/taggings_ops.py

from sqlalchemy.dialects.mysql import insert

def create_np_taggings(session, tags, exhibit_ids, import_attributes):
    """Bulk create taggings, silently skipping duplicates via INSERT IGNORE."""
    user_id = int(import_attributes["user_id"])

    # Build the cross-product of exhibits × tags, skipping null tag ids.
    rows = []
    for exhibit_id in exhibit_ids:
        for tag_id in tags:
            if tag_id is not None:
                rows.append({"exhibit_id": exhibit_id, "tag_id": tag_id, "user_id": user_id})

    if not rows:
        return

    # INSERT IGNORE — MySQL silently skips rows that violate unique constraints
    session.execute(insert(Taggings).prefix_with("IGNORE").values(rows))
    log_message("info", f"Bulk inserted/ignored {len(rows)} taggings")

When to use which:

| Approach | Use When | Performance |
| --- | --- | --- |
| SAVEPOINT (begin_nested) | Single record, need the ID back | 1 round-trip per record |
| INSERT IGNORE | Bulk insert, don't need returned IDs | 1 statement for all records |
| Pessimistic lock + check | Need to prevent duplicates AND read existing | 2 queries (lock + check) |

Pattern 4: Column Length Truncation

Problem: External data may exceed column length limits, causing MySQL to silently truncate or error.

Solution: Reflect column length from the ORM model and truncate proactively:

# shell/utils/data_sanitizer.py

def truncate_string(model_class, column_name, value):
    """Truncate string to column's max length using SQLAlchemy reflection.

    Args:
        model_class: ORM model class owning the column.
        column_name: Name of the string column on ``model_class``.
        value: Candidate value; non-strings pass through untouched.

    Returns:
        ``value`` unchanged, or its prefix cut to the column's declared
        length when the column defines one and the string exceeds it.
    """
    if not isinstance(value, str):
        return value

    try:
        column_property = getattr(model_class, column_name)
        # String columns expose .type.length; columns without a length
        # (TEXT, etc.) yield None and are passed through.
        max_length = getattr(column_property.type, "length", None)

        if max_length is not None and len(value) > max_length:
            # BUG FIX: the separator was lost, so the message concatenated
            # the two lengths into one unreadable number (e.g. "12040").
            log_message("warning",
                f"{model_class.__tablename__}.{column_name} truncated: "
                f"{len(value)} → {max_length}")
            return value[:max_length]
    except AttributeError:
        # Column missing or not a typed column (e.g. a relationship) —
        # best-effort: return the value unmodified.
        pass

    return value

Pattern 5: Data Sanitization Pipeline

Problem: External data contains emoji (4-byte UTF-8), encoding errors, nil strings, and values that exceed TEXT column limits. MySQL's utf8mb3 encoding silently truncates at the first 4-byte character.

Solution: Multi-stage sanitization before any database write:

# shell/utils/document_properties.py

import re

# Strip every code point beyond the Basic Multilingual Plane (U+10000 and up).
# These need 4 bytes in UTF-8, which MySQL utf8mb3 cannot store; the old
# pattern only covered U+1F600-U+1F6FF and missed emoji like 🍩 (U+1F369),
# causing production data loss (see: NGE-2339).
_EMOJI_PATTERN = re.compile("[\U00010000-\U0010ffff]+", flags=re.UNICODE)

def remove_emojis(value: str) -> str:
    """Return *value* with all supplementary-plane (4-byte UTF-8) characters removed."""
    stripped = _EMOJI_PATTERN.sub("", value)
    return stripped

def _normalize_encoding(value: str) -> str:
    """Round-trip through UTF-8 to replace invalid characters."""
    return value.encode("utf-8", errors="replace").decode("utf-8")

def _sanitize_value(value):
    """Full sanitization pipeline for a single value.

    Non-strings pass through untouched; the Rails nil marker becomes None;
    everything else is encoding-normalized and stripped of 4-byte characters.
    """
    if not isinstance(value, str):
        return value
    if value == "<nil>":
        # Rails serializes nil as the literal string "<nil>" — map to None.
        return None
    return remove_emojis(_normalize_encoding(value))

TEXT Column Overflow with Smart Key Dropping

When serialized properties exceed the 65,535-byte TEXT column limit, drop the largest entries first to preserve short, useful metadata:

_TEXT_COLUMN_MAX_BYTES = 65_535  # MySQL TEXT column capacity, in bytes

def _truncate_to_text_column(properties: dict) -> str:
    """Drop largest entries first until data fits TEXT column.

    Serializes ``properties`` (JSON wrapped in YAML — presumably matching the
    Rails column format; confirm) and, when the UTF-8 byte length exceeds the
    TEXT limit, removes the biggest key/value pairs until the result fits.

    Note: the ``...`` arguments are documentation placeholders for serializer
    options — this snippet is not runnable as-is.
    """
    json_str = json.dumps(properties, ensure_ascii=False)
    result = yaml.dump(json_str, ...)
    # Measure in bytes, not characters: the MySQL limit is a byte limit and
    # multi-byte UTF-8 inflates the size.
    if len(result.encode("utf-8")) <= _TEXT_COLUMN_MAX_BYTES:
        return result

    # Sort entries by serialized size, largest first
    sized_entries = []
    for k, v in properties.items():
        entry_json = json.dumps({k: v}, ensure_ascii=False)
        sized_entries.append((len(entry_json.encode("utf-8")), k))
    sized_entries.sort(reverse=True)

    # Drop largest until we fit — re-serialize before each deletion so the
    # loop stops as soon as the remaining dict fits.
    trimmed = dict(properties)
    for _, key in sized_entries:
        current = yaml.dump(json.dumps(trimmed, ...), ...)
        if len(current.encode("utf-8")) <= _TEXT_COLUMN_MAX_BYTES:
            break
        del trimmed[key]

    log_message("warning",
        f"document_properties exceeded TEXT limit. "
        f"Dropped {len(properties) - len(trimmed)}/{len(properties)} keys")

    # Surface the data loss operationally (SNS), not just in logs.
    SNS.publish_sns_message({"eventType": "WARNING", ...})
    return yaml.dump(json.dumps(trimmed, ...), ...)

Key Rules

  1. Sort IDs before locking — consistent ordering eliminates deadlocks by construction
  2. Use SAVEPOINTs for insert-or-get — isolates IntegrityError from outer transaction
  3. Use INSERT IGNORE for bulk — single statement, no per-row exceptions
  4. Reflect column lengths — don't hardcode max lengths, read from ORM model
  5. Strip ALL 4-byte Unicode, not just common emoji — the supplementary plane is wider than U+1F600-U+1F6FF
  6. Drop largest entries first on overflow — preserves short, useful metadata
  7. Log truncation and notify via SNS — data loss should be visible, not silent
Ask the Architecture ×

Ask questions about Nextpoint architecture, patterns, rules, or any module. Powered by Claude Opus 4.6.