From 617c76cb54e4532ed7b1bce95357022ee43239e1 Mon Sep 17 00:00:00 2001 From: EiSiMo Date: Tue, 21 Apr 2026 19:01:27 +0200 Subject: [PATCH] db: thread-local SQLite connections + busy_timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The module-level _conn was being hit from FastAPI handlers, the retention daemon thread, and asyncio.to_thread workers simultaneously. Sharing a single sqlite3.Connection across threads is unsafe (cursors collide) even with check_same_thread=False and WAL. The writer _lock didn't cover readers, so a reader cursor could race a writer mid-statement. Switch to threading.local(): each thread gets its own Connection via _get_conn(). WAL handles concurrent readers/writer at the DB level; busy_timeout=5000 absorbs short-lived "database is locked" when two threads both try to BEGIN IMMEDIATE. The write-serialising _lock stays — it keeps multi-statement writer blocks atomic and avoids busy-loop on concurrent writers. External access via db._conn replaced with a new db.has_running_application helper (the only caller outside db.py). Co-Authored-By: Claude Opus 4.7 (1M context) --- web/app.py | 6 +-- web/db.py | 154 ++++++++++++++++++++++++++++++----------------------- 2 files changed, 88 insertions(+), 72 deletions(-) diff --git a/web/app.py b/web/app.py index 6a59651..750be75 100644 --- a/web/app.py +++ b/web/app.py @@ -274,11 +274,7 @@ def _filter_summary(f) -> str: def _has_running_application(user_id: int) -> bool: - row = db._conn.execute( - "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1", - (user_id,), - ).fetchone() - return row is not None + return db.has_running_application(user_id) def _finish_apply_background(app_id: int, user_id: int, flat_id: str, url: str, diff --git a/web/db.py b/web/db.py index d18c6fe..a926d4c 100644 --- a/web/db.py +++ b/web/db.py @@ -16,7 +16,13 @@ from settings import DB_PATH, RETENTION_DAYS logger = logging.getLogger("web.db") +# WAL mode permits any number of concurrent readers and one writer at a time, +# but a single sqlite3.Connection object is not safe to share across threads +# (cursors can collide). We keep one connection per thread via threading.local +# and a module-level write lock so concurrent writers don't trip +# "database is locked" during BEGIN IMMEDIATE. _lock = threading.Lock() +_local = threading.local() def _connect() -> sqlite3.Connection: @@ -24,10 +30,16 @@ def _connect() -> sqlite3.Connection: conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") + conn.execute("PRAGMA busy_timeout=5000") return conn -_conn: sqlite3.Connection = _connect() +def _get_conn() -> sqlite3.Connection: + c = getattr(_local, "conn", None) + if c is None: + c = _connect() + _local.conn = c + return c # --------------------------------------------------------------------------- @@ -235,7 +247,7 @@ MIGRATIONS: list[str] = [ def _current_version() -> int: try: - row = _conn.execute("SELECT COALESCE(MAX(version), 0) AS v FROM schema_version").fetchone() + row = _get_conn().execute("SELECT COALESCE(MAX(version), 0) AS v FROM schema_version").fetchone() return int(row["v"]) if row else 0 except sqlite3.Error: return 0 @@ -243,15 +255,15 @@ def _current_version() -> int: def init_db() -> None: with _lock: - _conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)") + _get_conn().execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)") current = _current_version() for i, script in enumerate(MIGRATIONS, start=1): if i <= current: continue logger.info("applying migration v%d", i) - _conn.executescript(script) - _conn.execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,)) - _conn.execute( + _get_conn().executescript(script) + _get_conn().execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,)) + _get_conn().execute( "INSERT OR IGNORE INTO system_state(key, value) VALUES ('last_alert_heartbeat', '')" ) logger.info("DB initialized (schema v%d)", _current_version()) @@ -269,13 +281,13 @@ SECRET_KEYS = ("ANTHROPIC_API_KEY", "BERLIN_WOHNEN_USERNAME", "BERLIN_WOHNEN_PAS def get_secret(key: str) -> Optional[str]: - row = _conn.execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone() + row = _get_conn().execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone() return row["value"] if row else None def set_secret(key: str, value: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT INTO secrets(key, value, updated_at) VALUES (?, ?, ?) " "ON CONFLICT(key) DO UPDATE SET value = excluded.value, " " updated_at = excluded.updated_at", @@ -284,7 +296,7 @@ def set_secret(key: str, value: str) -> None: def all_secrets() -> dict[str, str]: - rows = _conn.execute("SELECT key, value FROM secrets").fetchall() + rows = _get_conn().execute("SELECT key, value FROM secrets").fetchall() return {r["key"]: r["value"] for r in rows} @@ -307,13 +319,13 @@ def seed_secrets_from_env() -> None: # --------------------------------------------------------------------------- def get_state(key: str) -> Optional[str]: - row = _conn.execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone() + row = _get_conn().execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone() return row["value"] if row else None def set_state(key: str, value: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT INTO system_state(key, value) VALUES (?, ?) " "ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value), @@ -333,13 +345,13 @@ def _ensure_user_rows(user_id: int) -> None: "INSERT OR IGNORE INTO user_notifications(user_id, updated_at) VALUES (?, ?)", "INSERT OR IGNORE INTO user_preferences(user_id, updated_at) VALUES (?, ?)", ): - _conn.execute(q, (user_id, ts)) + _get_conn().execute(q, (user_id, ts)) def create_user(username: str, password_hash: str, is_admin: bool = False) -> int: ts = now_iso() with _lock: - cur = _conn.execute( + cur = _get_conn().execute( "INSERT INTO users(username, password_hash, is_admin, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?)", (username, password_hash, 1 if is_admin else 0, ts, ts), @@ -351,22 +363,22 @@ def create_user(username: str, password_hash: str, is_admin: bool = False) -> in def get_user_by_username(username: str) -> Optional[sqlite3.Row]: - return _conn.execute( + return _get_conn().execute( "SELECT * FROM users WHERE username = ? COLLATE NOCASE AND disabled = 0", (username,) ).fetchone() def get_user(user_id: int) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() def list_users() -> list[sqlite3.Row]: - return list(_conn.execute("SELECT * FROM users ORDER BY username").fetchall()) + return list(_get_conn().execute("SELECT * FROM users ORDER BY username").fetchall()) def set_user_password(user_id: int, password_hash: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?", (password_hash, now_iso(), user_id), ) @@ -374,7 +386,7 @@ def set_user_password(user_id: int, password_hash: str) -> None: def set_user_disabled(user_id: int, disabled: bool) -> None: with _lock: - _conn.execute( + _get_conn().execute( "UPDATE users SET disabled = ?, updated_at = ? WHERE id = ?", (1 if disabled else 0, now_iso(), user_id), ) @@ -385,7 +397,7 @@ def delete_user(user_id: int) -> None: preferences, rejections, applications). Audit/error logs stay (user_id column is nullable).""" with _lock: - _conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) + _get_conn().execute("DELETE FROM users WHERE id = ?", (user_id,)) # --------------------------------------------------------------------------- @@ -394,7 +406,7 @@ def delete_user(user_id: int) -> None: def get_profile(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)).fetchone() def update_profile(user_id: int, data: dict) -> None: @@ -412,12 +424,12 @@ def update_profile(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_profiles SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_profiles SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_filters(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,)).fetchone() def update_filters(user_id: int, data: dict) -> None: @@ -430,12 +442,12 @@ def update_filters(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_filters SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_filters SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_notifications(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,)).fetchone() def update_notifications(user_id: int, data: dict) -> None: @@ -450,12 +462,12 @@ def update_notifications(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_notifications SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_notifications SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_preferences(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,)).fetchone() def update_preferences(user_id: int, data: dict) -> None: @@ -470,7 +482,7 @@ def update_preferences(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_preferences SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_preferences SET {cols}, updated_at = ? WHERE user_id = ?", vals) # --------------------------------------------------------------------------- @@ -480,20 +492,20 @@ def update_preferences(user_id: int, data: dict) -> None: def upsert_flat(payload: dict) -> bool: flat_id = str(payload["id"]) with _lock: - existing = _conn.execute( + existing = _get_conn().execute( "SELECT id, lat, lng FROM flats WHERE id = ?", (flat_id,) ).fetchone() if existing: # Backfill coords on old rows that pre-date the lat/lng migration. if (existing["lat"] is None or existing["lng"] is None) \ and payload.get("lat") is not None and payload.get("lng") is not None: - _conn.execute( + _get_conn().execute( "UPDATE flats SET lat = ?, lng = ? WHERE id = ?", (payload["lat"], payload["lng"], flat_id), ) return False c = payload.get("connectivity") or {} - _conn.execute( + _get_conn().execute( """INSERT INTO flats( id, link, address, rooms, size, total_rent, sqm_price, year_built, wbs, connectivity_morning_time, connectivity_night_time, address_link_gmaps, @@ -515,20 +527,20 @@ def upsert_flat(payload: dict) -> bool: def recent_flats(limit: int = 50) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM flats ORDER BY discovered_at DESC LIMIT ?", (limit,) ).fetchall()) def get_flat(flat_id: str) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM flats WHERE id = ?", (flat_id,)).fetchone() + return _get_conn().execute("SELECT * FROM flats WHERE id = ?", (flat_id,)).fetchone() def set_flat_enrichment(flat_id: str, status: str, enrichment: Optional[dict] = None, image_count: int = 0) -> None: with _lock: - _conn.execute( + _get_conn().execute( """UPDATE flats SET enrichment_status = ?, enrichment_json = ?, enrichment_updated_at = ?, @@ -541,7 +553,7 @@ def set_flat_enrichment(flat_id: str, status: str, def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT id, link FROM flats WHERE enrichment_status IN ('pending', 'failed') ORDER BY discovered_at DESC LIMIT ?""", @@ -550,7 +562,7 @@ def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]: def enrichment_counts() -> dict: - row = _conn.execute( + row = _get_conn().execute( """SELECT COUNT(*) AS total, SUM(CASE WHEN enrichment_status = 'ok' THEN 1 ELSE 0 END) AS ok, @@ -573,7 +585,7 @@ def enrichment_counts() -> dict: def start_application(user_id: int, flat_id: str, url: str, triggered_by: str, submit_forms: bool, profile_snapshot: dict) -> int: with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """INSERT INTO applications( user_id, flat_id, url, triggered_by, submit_forms_used, started_at, profile_snapshot_json @@ -587,7 +599,7 @@ def start_application(user_id: int, flat_id: str, url: str, triggered_by: str, def finish_application(app_id: int, success: bool, message: str, provider: str = "", forensics: Optional[dict] = None) -> None: with _lock: - _conn.execute( + _get_conn().execute( """UPDATE applications SET finished_at = ?, success = ?, message = ?, provider = ?, forensics_json = ? WHERE id = ?""", @@ -598,17 +610,17 @@ def finish_application(app_id: int, success: bool, message: str, def get_application(app_id: int) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM applications WHERE id = ?", (app_id,)).fetchone() + return _get_conn().execute("SELECT * FROM applications WHERE id = ?", (app_id,)).fetchone() def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3.Row]: if user_id is None: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT a.*, f.address, f.link FROM applications a LEFT JOIN flats f ON f.id = a.flat_id ORDER BY a.started_at DESC LIMIT ?""", (limit,) ).fetchall()) - return list(_conn.execute( + return list(_get_conn().execute( """SELECT a.*, f.address, f.link FROM applications a LEFT JOIN flats f ON f.id = a.flat_id WHERE a.user_id = ? @@ -623,7 +635,7 @@ def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3 def reject_flat(user_id: int, flat_id: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT OR IGNORE INTO flat_rejections(user_id, flat_id, rejected_at) VALUES (?, ?, ?)", (user_id, flat_id, now_iso()), ) @@ -631,21 +643,21 @@ def reject_flat(user_id: int, flat_id: str) -> None: def unreject_flat(user_id: int, flat_id: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "DELETE FROM flat_rejections WHERE user_id = ? AND flat_id = ?", (user_id, flat_id), ) def rejected_flat_ids(user_id: int) -> set[str]: - rows = _conn.execute( + rows = _get_conn().execute( "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (user_id,) ).fetchall() return {row["flat_id"] for row in rows} def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT f.*, r.rejected_at FROM flat_rejections r JOIN flats f ON f.id = r.flat_id WHERE r.user_id = ? @@ -655,7 +667,7 @@ def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]: def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Row]: - return _conn.execute( + return _get_conn().execute( """SELECT * FROM applications WHERE user_id = ? AND flat_id = ? ORDER BY started_at DESC LIMIT 1""", @@ -663,6 +675,14 @@ def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Ro ).fetchone() +def has_running_application(user_id: int) -> bool: + row = _get_conn().execute( + "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1", + (user_id,), + ).fetchone() + return row is not None + + # --------------------------------------------------------------------------- # Errors # --------------------------------------------------------------------------- @@ -671,7 +691,7 @@ def log_error(source: str, kind: str, summary: str, user_id: Optional[int] = None, application_id: Optional[int] = None, context: Optional[dict] = None) -> int: with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """INSERT INTO errors(timestamp, user_id, source, kind, summary, application_id, context_json) VALUES (?, ?, ?, ?, ?, ?, ?)""", (now_iso(), user_id, source, kind, summary, application_id, @@ -683,24 +703,24 @@ def log_error(source: str, kind: str, summary: str, def recent_errors(user_id: Optional[int], limit: int = 100, include_global: bool = False) -> list[sqlite3.Row]: if user_id is None: - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM errors ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall()) if include_global: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT * FROM errors WHERE user_id = ? OR user_id IS NULL ORDER BY timestamp DESC LIMIT ?""", (user_id, limit), ).fetchall()) - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM errors WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?", (user_id, limit), ).fetchall()) def get_error(error_id: int) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM errors WHERE id = ?", (error_id,)).fetchone() + return _get_conn().execute("SELECT * FROM errors WHERE id = ?", (error_id,)).fetchone() # --------------------------------------------------------------------------- @@ -710,7 +730,7 @@ def get_error(error_id: int) -> Optional[sqlite3.Row]: def log_audit(actor: str, action: str, details: str = "", user_id: Optional[int] = None, ip: str = "") -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT INTO audit_log(timestamp, user_id, actor, action, details, ip) " "VALUES (?, ?, ?, ?, ?, ?)", (now_iso(), user_id, actor, action, details, ip), @@ -719,10 +739,10 @@ def log_audit(actor: str, action: str, details: str = "", def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]: if user_id is None: - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM audit_log ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall()) - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM audit_log WHERE user_id = ? OR user_id IS NULL ORDER BY timestamp DESC LIMIT ?", (user_id, limit), ).fetchall()) @@ -737,7 +757,7 @@ def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]: # --------------------------------------------------------------------------- def get_accepted_partnership(user_id: int) -> Optional[sqlite3.Row]: - return _conn.execute( + return _get_conn().execute( """SELECT * FROM partnerships WHERE status = 'accepted' AND (from_user_id = ? OR to_user_id = ?) LIMIT 1""", @@ -754,7 +774,7 @@ def get_partner_user(user_id: int) -> Optional[sqlite3.Row]: def partnership_incoming(user_id: int) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT p.*, u.username AS from_username FROM partnerships p JOIN users u ON u.id = p.from_user_id WHERE p.status = 'pending' AND p.to_user_id = ? @@ -764,7 +784,7 @@ def partnership_incoming(user_id: int) -> list[sqlite3.Row]: def partnership_outgoing(user_id: int) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT p.*, u.username AS to_username FROM partnerships p JOIN users u ON u.id = p.to_user_id WHERE p.status = 'pending' AND p.from_user_id = ? @@ -782,7 +802,7 @@ def partnership_request(from_id: int, to_id: int) -> Optional[int]: return None with _lock: # Reject duplicate in either direction. - dup = _conn.execute( + dup = _get_conn().execute( """SELECT id FROM partnerships WHERE (from_user_id = ? AND to_user_id = ?) OR (from_user_id = ? AND to_user_id = ?)""", @@ -790,7 +810,7 @@ def partnership_request(from_id: int, to_id: int) -> Optional[int]: ).fetchone() if dup: return None - cur = _conn.execute( + cur = _get_conn().execute( "INSERT INTO partnerships(from_user_id, to_user_id, status, created_at) " "VALUES (?, ?, 'pending', ?)", (from_id, to_id, now_iso()), @@ -801,7 +821,7 @@ def partnership_request(from_id: int, to_id: int) -> Optional[int]: def partnership_accept(request_id: int, user_id: int) -> bool: """Accept a pending request addressed to user_id. Also wipes any other pending rows involving either partner.""" - row = _conn.execute( + row = _get_conn().execute( "SELECT * FROM partnerships WHERE id = ? AND status = 'pending'", (request_id,), ).fetchone() @@ -812,12 +832,12 @@ def partnership_accept(request_id: int, user_id: int) -> bool: return False partner_id = row["from_user_id"] with _lock: - _conn.execute( + _get_conn().execute( "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?", (now_iso(), request_id), ) # clean up any stale pending requests touching either user - _conn.execute( + _get_conn().execute( """DELETE FROM partnerships WHERE status = 'pending' AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""", @@ -829,7 +849,7 @@ def partnership_accept(request_id: int, user_id: int) -> bool: def partnership_decline(request_id: int, user_id: int) -> bool: """Decline an incoming pending request (deletes the row).""" with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """DELETE FROM partnerships WHERE id = ? AND status = 'pending' AND (to_user_id = ? OR from_user_id = ?)""", @@ -841,7 +861,7 @@ def partnership_decline(request_id: int, user_id: int) -> bool: def partnership_unlink(user_id: int) -> bool: """Remove the current accepted partnership (either side can call).""" with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """DELETE FROM partnerships WHERE status = 'accepted' AND (from_user_id = ? OR to_user_id = ?)""", @@ -853,10 +873,10 @@ def partnership_unlink(user_id: int) -> bool: def partner_flat_actions(partner_id: int) -> dict: """Flats the partner has touched. 'applied' = any application (regardless of outcome); 'rejected' = in flat_rejections.""" - applied = {r["flat_id"] for r in _conn.execute( + applied = {r["flat_id"] for r in _get_conn().execute( "SELECT DISTINCT flat_id FROM applications WHERE user_id = ?", (partner_id,) ).fetchall()} - rejected = {r["flat_id"] for r in _conn.execute( + rejected = {r["flat_id"] for r in _get_conn().execute( "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (partner_id,) ).fetchall()} return {"applied": applied, "rejected": rejected} @@ -867,10 +887,10 @@ def cleanup_retention() -> dict: stats = {} with _lock: for table in ("errors", "audit_log"): - cur = _conn.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) + cur = _get_conn().execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) stats[table] = cur.rowcount # Drop forensics from old applications (but keep the row itself for history) - cur = _conn.execute( + cur = _get_conn().execute( "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL", (cutoff,), )