Database Session Management Pattern¶
Purpose¶
Provide consistent, safe database access with read/write separation, multi-tenant routing, connection lifecycle management, and deadlock reduction.
Session Context Managers¶
# shell/db/database.py
from contextlib import contextmanager
from typing import Generator, Optional
from sqlalchemy.orm import Session
@contextmanager
def writer_session(npcase_id: Optional[str] = None) -> Generator[Session, None, None]:
"""Write session targeting case-specific database."""
case_id = npcase_id or get_npcase()
if not case_id:
raise ValueError("npcase_id must be provided for a case session.")
db_name = f"{RDS_DBNAME}_case_{case_id}"
with _session_manager(db_name, is_reader=False) as session:
yield session
@contextmanager
def reader_session(npcase_id: Optional[str] = None) -> Generator[Session, None, None]:
"""Read-only session targeting case-specific database (read replica)."""
case_id = npcase_id or get_npcase()
if not case_id:
raise ValueError("npcase_id must be provided for a case session.")
db_name = f"{RDS_DBNAME}_case_{case_id}"
with _session_manager(db_name, is_reader=True) as session:
yield session
@contextmanager
def core_writer_session() -> Generator[Session, None, None]:
"""Write session targeting shared core database."""
with _session_manager(RDS_DBNAME, is_reader=False) as session:
yield session
@contextmanager
def core_reader_session() -> Generator[Session, None, None]:
"""Read-only session targeting shared core database (read replica)."""
with _session_manager(RDS_DBNAME, is_reader=True) as session:
yield session
Session Lifecycle¶
Exception handling is nuanced — different exception types get different log levels and behaviors:
@contextmanager
def _session_manager(db_name: str, is_reader: bool) -> Generator[Session, None, None]:
"""Manage session lifecycle: create, yield, commit/rollback, cleanup."""
SessionFactory = _get_or_create_session_factory(db_name, is_reader=is_reader)
session = SessionFactory()
try:
yield session
if not is_reader:
session.commit()
except RecoverableException as e:
# Expected control-flow exception — debug level, not warning
log_message("debug", f"Rolled back due to planned requeue: {e}")
session.rollback()
raise
except Exception as e:
# Deadlocks: debug level — retry decorator handles them
if (
isinstance(e, OperationalError)
and hasattr(e.orig, "args")
and e.orig.args[0] in (1213, 1205)
):
error_type = "deadlock" if e.orig.args[0] == 1213 else "lock wait timeout"
log_message("debug", f"Database {error_type} — will be retried by decorator")
# PermanentFailure and SilentSuccess: caller logs, skip duplicate
elif isinstance(e, (PermanentFailureException, SilentSuccessException)):
pass
# Everything else: warning with stack trace
else:
log_message("warning", f"Transaction failed: {type(e).__name__}: {e}", exc_info=True)
session.rollback()
raise
finally:
SessionFactory.remove()
Why This Log Level Strategy Matters¶
- Deadlocks at debug — they happen constantly under concurrent writes. Logging
at warning would flood CloudWatch. The
@retry_on_db_conflictdecorator handles them silently; only if retries are exhausted does it become an error. - RecoverableException at debug — it's expected control flow (e.g., "another Lambda is processing this document"), not an error condition.
- PermanentFailure/SilentSuccess skip logging — the handler already logs these with proper context. Double-logging creates noise.
Engine Configuration¶
READ COMMITTED Isolation (Deadlock Reduction)¶
This is the single most important configuration for reducing deadlocks.
MySQL/InnoDB defaults to REPEATABLE READ, which holds row locks for the
entire transaction. Under concurrent Lambda writes to the same case database,
this causes frequent deadlocks.
READ COMMITTED releases row locks after each statement, dramatically reducing
lock contention:
engine_kwargs = {
"pool_pre_ping": True, # Validate connection before use
"pool_recycle": 1800, # Recycle after 30 minutes
"pool_size": 5, # Base pool size
"max_overflow": 5, # Max overflow connections
"pool_timeout": 60, # Wait timeout for connection
}
# CRITICAL: READ COMMITTED only for writers — reduces deadlocks
if not is_reader:
engine_kwargs["isolation_level"] = "READ COMMITTED"
Why only for writers? Reader sessions don't hold locks, so isolation level doesn't affect deadlock frequency. Leaving readers at the default avoids unexpected behavior changes for read-only queries.
Character Set¶
url_object = URL.create(
drivername="mysql+pymysql",
username=Config.RDS_USERNAME(),
password=Config.RDS_PASSWORD(),
host=endpoint,
database=db_name,
query={"charset": "utf8mb3"}, # MySQL 3-byte UTF-8
)
Why utf8mb3? The existing Rails database uses MySQL's 3-byte UTF-8 encoding.
Using utf8mb4 (4-byte) would cause index size mismatches on existing tables.
New modules should match the existing charset.
Thread-Safe Engine Cache with LRU Eviction¶
The engine cache must be thread-safe because Lambda may reuse the same runtime across invocations:
from threading import Lock
_engine_cache_lock = Lock()
_engines_cache: dict[str, ScopedSession] = {}
_cache_access_times: dict[str, float] = {} # LRU tracking
MAX_CACHE_SIZE = 50
def _get_or_create_session_factory(db_name: str, is_reader: bool) -> ScopedSession:
endpoint = _get_proxy_endpoint(is_reader)
cache_key = f"{db_name}_{'reader' if is_reader else 'writer'}"
with _engine_cache_lock:
if cache_key not in _engines_cache:
_cleanup_cache() # LRU evict if at capacity
url_object = URL.create(
drivername="mysql+pymysql",
username=Config.RDS_USERNAME(),
password=Config.RDS_PASSWORD(),
host=endpoint,
database=db_name,
query={"charset": "utf8mb3"},
)
engine_kwargs = {
"pool_pre_ping": True,
"pool_recycle": 1800,
"pool_size": 5,
"max_overflow": 5,
"pool_timeout": 60,
}
if not is_reader:
engine_kwargs["isolation_level"] = "READ COMMITTED"
engine = create_engine(url_object, **engine_kwargs)
_engines_cache[cache_key] = scoped_session(
sessionmaker(bind=engine, class_=Session)
)
_cache_access_times[cache_key] = time.time()
return _engines_cache[cache_key]
LRU Eviction with Proper Disposal¶
When the cache is full, the least recently used engine is properly cleaned up:
def _cleanup_cache() -> None:
"""Evict LRU engine when cache exceeds MAX_CACHE_SIZE."""
if len(_engines_cache) <= MAX_CACHE_SIZE:
return
oldest_key = min(_cache_access_times, key=_cache_access_times.get)
log_message("info", f"Cache limit reached. Evicting: {oldest_key}")
if oldest_key in _engines_cache:
try:
scoped = _engines_cache[oldest_key]
# Clear current thread's session if it exists
if scoped.registry.has():
scoped.remove()
# Dispose the underlying engine (closes all pooled connections)
scoped.bind.dispose()
except Exception as e:
log_message("warning", f"Error disposing engine for {oldest_key}: {e}")
del _engines_cache[oldest_key]
if oldest_key in _cache_access_times:
del _cache_access_times[oldest_key]
Why proper disposal matters: Without engine.dispose(), evicted engines
leave orphaned connections in the pool. In Lambda processing many cases,
this leads to connection exhaustion against RDS Proxy.
Environment Validation at Import Time¶
Database config is validated immediately on import — fail fast if misconfigured:
WRITER_PROXY_ENDPOINT = os.getenv("WRITER_PROXY_ENDPOINT")
READER_PROXY_ENDPOINT = os.getenv("READER_PROXY_ENDPOINT")
RDS_DBNAME = os.getenv("RDS_DBNAME")
def _validate_environment() -> None:
if not all([
WRITER_PROXY_ENDPOINT,
READER_PROXY_ENDPOINT,
RDS_DBNAME,
Config.RDS_USERNAME(),
Config.RDS_PASSWORD(),
]):
raise EnvironmentError(
"One or more required database environment variables are missing."
)
_validate_environment() # Runs at import time
Usage in core/¶
Core code receives sessions as parameters — never creates them:
# core/process.py — CORRECT
def create_exhibit_data(session: Session, import_attributes: dict) -> dict:
"""Pure business logic using passed-in session."""
exhibit = Exhibits(**import_attributes)
session.add(exhibit)
session.flush()
return {"exhibit_id": exhibit.id}
# core/process.py — WRONG (never do this)
def create_exhibit_data(import_attributes: dict) -> dict:
with writer_session() as session: # core/ should NOT create sessions
...
Key Rules¶
- READ COMMITTED for writers — reduces deadlocks by releasing row locks per-statement
- Always use context managers — never create sessions manually
- Writer sessions auto-commit — don't call
session.commit()inside thewithblock - Reader sessions never commit — they're read-only
- Pass sessions to core/ — core never imports from shell/db
- Thread-safe cache with Lock — Lambda runtime may be shared across invocations
- LRU eviction with engine disposal — prevents connection exhaustion
- utf8mb3 charset — matches existing Rails database encoding
- Validate at import time — fail fast if database config is missing
- Deadlocks logged at debug — retry decorator handles them, warning would flood logs
- PermanentFailure/SilentSuccess skip session logging — caller handles it, avoid duplicates
Ask questions about Nextpoint architecture, patterns, rules, or any module. Powered by Claude Opus 4.6.