From 617c76cb54e4532ed7b1bce95357022ee43239e1 Mon Sep 17 00:00:00 2001 From: EiSiMo Date: Tue, 21 Apr 2026 19:01:27 +0200 Subject: [PATCH 1/3] db: thread-local SQLite connections + busy_timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The module-level _conn was being hit from FastAPI handlers, the retention daemon thread, and asyncio.to_thread workers simultaneously. Sharing a single sqlite3.Connection across threads is unsafe (cursors collide) even with check_same_thread=False and WAL. The writer _lock didn't cover readers, so a reader cursor could race a writer mid-statement. Switch to threading.local(): each thread gets its own Connection via _get_conn(). WAL handles concurrent readers/writer at the DB level; busy_timeout=5000 absorbs short-lived "database is locked" when two threads both try to BEGIN IMMEDIATE. The write-serialising _lock stays — it keeps multi-statement writer blocks atomic and avoids busy-loop on concurrent writers. External access via db._conn replaced with a new db.has_running_application helper (the only caller outside db.py). Co-Authored-By: Claude Opus 4.7 (1M context) --- web/app.py | 6 +-- web/db.py | 154 ++++++++++++++++++++++++++++++----------------------- 2 files changed, 88 insertions(+), 72 deletions(-) diff --git a/web/app.py b/web/app.py index 6a59651..750be75 100644 --- a/web/app.py +++ b/web/app.py @@ -274,11 +274,7 @@ def _filter_summary(f) -> str: def _has_running_application(user_id: int) -> bool: - row = db._conn.execute( - "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1", - (user_id,), - ).fetchone() - return row is not None + return db.has_running_application(user_id) def _finish_apply_background(app_id: int, user_id: int, flat_id: str, url: str, diff --git a/web/db.py b/web/db.py index d18c6fe..a926d4c 100644 --- a/web/db.py +++ b/web/db.py @@ -16,7 +16,13 @@ from settings import DB_PATH, RETENTION_DAYS logger = logging.getLogger("web.db") +# WAL mode permits any number of concurrent readers and one writer at a time, +# but a single sqlite3.Connection object is not safe to share across threads +# (cursors can collide). We keep one connection per thread via threading.local +# and a module-level write lock so concurrent writers don't trip +# "database is locked" during BEGIN IMMEDIATE. _lock = threading.Lock() +_local = threading.local() def _connect() -> sqlite3.Connection: @@ -24,10 +30,16 @@ def _connect() -> sqlite3.Connection: conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") + conn.execute("PRAGMA busy_timeout=5000") return conn -_conn: sqlite3.Connection = _connect() +def _get_conn() -> sqlite3.Connection: + c = getattr(_local, "conn", None) + if c is None: + c = _connect() + _local.conn = c + return c # --------------------------------------------------------------------------- @@ -235,7 +247,7 @@ MIGRATIONS: list[str] = [ def _current_version() -> int: try: - row = _conn.execute("SELECT COALESCE(MAX(version), 0) AS v FROM schema_version").fetchone() + row = _get_conn().execute("SELECT COALESCE(MAX(version), 0) AS v FROM schema_version").fetchone() return int(row["v"]) if row else 0 except sqlite3.Error: return 0 @@ -243,15 +255,15 @@ def _current_version() -> int: def init_db() -> None: with _lock: - _conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)") + _get_conn().execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)") current = _current_version() for i, script in enumerate(MIGRATIONS, start=1): if i <= current: continue logger.info("applying migration v%d", i) - _conn.executescript(script) - _conn.execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,)) - _conn.execute( + _get_conn().executescript(script) + _get_conn().execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,)) + _get_conn().execute( "INSERT OR IGNORE INTO system_state(key, value) VALUES ('last_alert_heartbeat', '')" ) logger.info("DB initialized (schema v%d)", _current_version()) @@ -269,13 +281,13 @@ SECRET_KEYS = ("ANTHROPIC_API_KEY", "BERLIN_WOHNEN_USERNAME", "BERLIN_WOHNEN_PAS def get_secret(key: str) -> Optional[str]: - row = _conn.execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone() + row = _get_conn().execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone() return row["value"] if row else None def set_secret(key: str, value: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT INTO secrets(key, value, updated_at) VALUES (?, ?, ?) " "ON CONFLICT(key) DO UPDATE SET value = excluded.value, " " updated_at = excluded.updated_at", @@ -284,7 +296,7 @@ def set_secret(key: str, value: str) -> None: def all_secrets() -> dict[str, str]: - rows = _conn.execute("SELECT key, value FROM secrets").fetchall() + rows = _get_conn().execute("SELECT key, value FROM secrets").fetchall() return {r["key"]: r["value"] for r in rows} @@ -307,13 +319,13 @@ def seed_secrets_from_env() -> None: # --------------------------------------------------------------------------- def get_state(key: str) -> Optional[str]: - row = _conn.execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone() + row = _get_conn().execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone() return row["value"] if row else None def set_state(key: str, value: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT INTO system_state(key, value) VALUES (?, ?) " "ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value), @@ -333,13 +345,13 @@ def _ensure_user_rows(user_id: int) -> None: "INSERT OR IGNORE INTO user_notifications(user_id, updated_at) VALUES (?, ?)", "INSERT OR IGNORE INTO user_preferences(user_id, updated_at) VALUES (?, ?)", ): - _conn.execute(q, (user_id, ts)) + _get_conn().execute(q, (user_id, ts)) def create_user(username: str, password_hash: str, is_admin: bool = False) -> int: ts = now_iso() with _lock: - cur = _conn.execute( + cur = _get_conn().execute( "INSERT INTO users(username, password_hash, is_admin, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?)", (username, password_hash, 1 if is_admin else 0, ts, ts), @@ -351,22 +363,22 @@ def create_user(username: str, password_hash: str, is_admin: bool = False) -> in def get_user_by_username(username: str) -> Optional[sqlite3.Row]: - return _conn.execute( + return _get_conn().execute( "SELECT * FROM users WHERE username = ? COLLATE NOCASE AND disabled = 0", (username,) ).fetchone() def get_user(user_id: int) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() def list_users() -> list[sqlite3.Row]: - return list(_conn.execute("SELECT * FROM users ORDER BY username").fetchall()) + return list(_get_conn().execute("SELECT * FROM users ORDER BY username").fetchall()) def set_user_password(user_id: int, password_hash: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?", (password_hash, now_iso(), user_id), ) @@ -374,7 +386,7 @@ def set_user_password(user_id: int, password_hash: str) -> None: def set_user_disabled(user_id: int, disabled: bool) -> None: with _lock: - _conn.execute( + _get_conn().execute( "UPDATE users SET disabled = ?, updated_at = ? WHERE id = ?", (1 if disabled else 0, now_iso(), user_id), ) @@ -385,7 +397,7 @@ def delete_user(user_id: int) -> None: preferences, rejections, applications). Audit/error logs stay (user_id column is nullable).""" with _lock: - _conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) + _get_conn().execute("DELETE FROM users WHERE id = ?", (user_id,)) # --------------------------------------------------------------------------- @@ -394,7 +406,7 @@ def delete_user(user_id: int) -> None: def get_profile(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)).fetchone() def update_profile(user_id: int, data: dict) -> None: @@ -412,12 +424,12 @@ def update_profile(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_profiles SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_profiles SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_filters(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,)).fetchone() def update_filters(user_id: int, data: dict) -> None: @@ -430,12 +442,12 @@ def update_filters(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_filters SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_filters SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_notifications(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,)).fetchone() def update_notifications(user_id: int, data: dict) -> None: @@ -450,12 +462,12 @@ def update_notifications(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_notifications SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_notifications SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_preferences(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) - return _conn.execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,)).fetchone() + return _get_conn().execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,)).fetchone() def update_preferences(user_id: int, data: dict) -> None: @@ -470,7 +482,7 @@ def update_preferences(user_id: int, data: dict) -> None: cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: - _conn.execute(f"UPDATE user_preferences SET {cols}, updated_at = ? WHERE user_id = ?", vals) + _get_conn().execute(f"UPDATE user_preferences SET {cols}, updated_at = ? WHERE user_id = ?", vals) # --------------------------------------------------------------------------- @@ -480,20 +492,20 @@ def update_preferences(user_id: int, data: dict) -> None: def upsert_flat(payload: dict) -> bool: flat_id = str(payload["id"]) with _lock: - existing = _conn.execute( + existing = _get_conn().execute( "SELECT id, lat, lng FROM flats WHERE id = ?", (flat_id,) ).fetchone() if existing: # Backfill coords on old rows that pre-date the lat/lng migration. if (existing["lat"] is None or existing["lng"] is None) \ and payload.get("lat") is not None and payload.get("lng") is not None: - _conn.execute( + _get_conn().execute( "UPDATE flats SET lat = ?, lng = ? WHERE id = ?", (payload["lat"], payload["lng"], flat_id), ) return False c = payload.get("connectivity") or {} - _conn.execute( + _get_conn().execute( """INSERT INTO flats( id, link, address, rooms, size, total_rent, sqm_price, year_built, wbs, connectivity_morning_time, connectivity_night_time, address_link_gmaps, @@ -515,20 +527,20 @@ def upsert_flat(payload: dict) -> bool: def recent_flats(limit: int = 50) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM flats ORDER BY discovered_at DESC LIMIT ?", (limit,) ).fetchall()) def get_flat(flat_id: str) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM flats WHERE id = ?", (flat_id,)).fetchone() + return _get_conn().execute("SELECT * FROM flats WHERE id = ?", (flat_id,)).fetchone() def set_flat_enrichment(flat_id: str, status: str, enrichment: Optional[dict] = None, image_count: int = 0) -> None: with _lock: - _conn.execute( + _get_conn().execute( """UPDATE flats SET enrichment_status = ?, enrichment_json = ?, enrichment_updated_at = ?, @@ -541,7 +553,7 @@ def set_flat_enrichment(flat_id: str, status: str, def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT id, link FROM flats WHERE enrichment_status IN ('pending', 'failed') ORDER BY discovered_at DESC LIMIT ?""", @@ -550,7 +562,7 @@ def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]: def enrichment_counts() -> dict: - row = _conn.execute( + row = _get_conn().execute( """SELECT COUNT(*) AS total, SUM(CASE WHEN enrichment_status = 'ok' THEN 1 ELSE 0 END) AS ok, @@ -573,7 +585,7 @@ def enrichment_counts() -> dict: def start_application(user_id: int, flat_id: str, url: str, triggered_by: str, submit_forms: bool, profile_snapshot: dict) -> int: with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """INSERT INTO applications( user_id, flat_id, url, triggered_by, submit_forms_used, started_at, profile_snapshot_json @@ -587,7 +599,7 @@ def start_application(user_id: int, flat_id: str, url: str, triggered_by: str, def finish_application(app_id: int, success: bool, message: str, provider: str = "", forensics: Optional[dict] = None) -> None: with _lock: - _conn.execute( + _get_conn().execute( """UPDATE applications SET finished_at = ?, success = ?, message = ?, provider = ?, forensics_json = ? WHERE id = ?""", @@ -598,17 +610,17 @@ def finish_application(app_id: int, success: bool, message: str, def get_application(app_id: int) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM applications WHERE id = ?", (app_id,)).fetchone() + return _get_conn().execute("SELECT * FROM applications WHERE id = ?", (app_id,)).fetchone() def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3.Row]: if user_id is None: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT a.*, f.address, f.link FROM applications a LEFT JOIN flats f ON f.id = a.flat_id ORDER BY a.started_at DESC LIMIT ?""", (limit,) ).fetchall()) - return list(_conn.execute( + return list(_get_conn().execute( """SELECT a.*, f.address, f.link FROM applications a LEFT JOIN flats f ON f.id = a.flat_id WHERE a.user_id = ? @@ -623,7 +635,7 @@ def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3 def reject_flat(user_id: int, flat_id: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT OR IGNORE INTO flat_rejections(user_id, flat_id, rejected_at) VALUES (?, ?, ?)", (user_id, flat_id, now_iso()), ) @@ -631,21 +643,21 @@ def reject_flat(user_id: int, flat_id: str) -> None: def unreject_flat(user_id: int, flat_id: str) -> None: with _lock: - _conn.execute( + _get_conn().execute( "DELETE FROM flat_rejections WHERE user_id = ? AND flat_id = ?", (user_id, flat_id), ) def rejected_flat_ids(user_id: int) -> set[str]: - rows = _conn.execute( + rows = _get_conn().execute( "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (user_id,) ).fetchall() return {row["flat_id"] for row in rows} def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT f.*, r.rejected_at FROM flat_rejections r JOIN flats f ON f.id = r.flat_id WHERE r.user_id = ? @@ -655,7 +667,7 @@ def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]: def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Row]: - return _conn.execute( + return _get_conn().execute( """SELECT * FROM applications WHERE user_id = ? AND flat_id = ? ORDER BY started_at DESC LIMIT 1""", @@ -663,6 +675,14 @@ def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Ro ).fetchone() +def has_running_application(user_id: int) -> bool: + row = _get_conn().execute( + "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1", + (user_id,), + ).fetchone() + return row is not None + + # --------------------------------------------------------------------------- # Errors # --------------------------------------------------------------------------- @@ -671,7 +691,7 @@ def log_error(source: str, kind: str, summary: str, user_id: Optional[int] = None, application_id: Optional[int] = None, context: Optional[dict] = None) -> int: with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """INSERT INTO errors(timestamp, user_id, source, kind, summary, application_id, context_json) VALUES (?, ?, ?, ?, ?, ?, ?)""", (now_iso(), user_id, source, kind, summary, application_id, @@ -683,24 +703,24 @@ def log_error(source: str, kind: str, summary: str, def recent_errors(user_id: Optional[int], limit: int = 100, include_global: bool = False) -> list[sqlite3.Row]: if user_id is None: - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM errors ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall()) if include_global: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT * FROM errors WHERE user_id = ? OR user_id IS NULL ORDER BY timestamp DESC LIMIT ?""", (user_id, limit), ).fetchall()) - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM errors WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?", (user_id, limit), ).fetchall()) def get_error(error_id: int) -> Optional[sqlite3.Row]: - return _conn.execute("SELECT * FROM errors WHERE id = ?", (error_id,)).fetchone() + return _get_conn().execute("SELECT * FROM errors WHERE id = ?", (error_id,)).fetchone() # --------------------------------------------------------------------------- @@ -710,7 +730,7 @@ def get_error(error_id: int) -> Optional[sqlite3.Row]: def log_audit(actor: str, action: str, details: str = "", user_id: Optional[int] = None, ip: str = "") -> None: with _lock: - _conn.execute( + _get_conn().execute( "INSERT INTO audit_log(timestamp, user_id, actor, action, details, ip) " "VALUES (?, ?, ?, ?, ?, ?)", (now_iso(), user_id, actor, action, details, ip), @@ -719,10 +739,10 @@ def log_audit(actor: str, action: str, details: str = "", def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]: if user_id is None: - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM audit_log ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall()) - return list(_conn.execute( + return list(_get_conn().execute( "SELECT * FROM audit_log WHERE user_id = ? OR user_id IS NULL ORDER BY timestamp DESC LIMIT ?", (user_id, limit), ).fetchall()) @@ -737,7 +757,7 @@ def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]: # --------------------------------------------------------------------------- def get_accepted_partnership(user_id: int) -> Optional[sqlite3.Row]: - return _conn.execute( + return _get_conn().execute( """SELECT * FROM partnerships WHERE status = 'accepted' AND (from_user_id = ? OR to_user_id = ?) LIMIT 1""", @@ -754,7 +774,7 @@ def get_partner_user(user_id: int) -> Optional[sqlite3.Row]: def partnership_incoming(user_id: int) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT p.*, u.username AS from_username FROM partnerships p JOIN users u ON u.id = p.from_user_id WHERE p.status = 'pending' AND p.to_user_id = ? @@ -764,7 +784,7 @@ def partnership_incoming(user_id: int) -> list[sqlite3.Row]: def partnership_outgoing(user_id: int) -> list[sqlite3.Row]: - return list(_conn.execute( + return list(_get_conn().execute( """SELECT p.*, u.username AS to_username FROM partnerships p JOIN users u ON u.id = p.to_user_id WHERE p.status = 'pending' AND p.from_user_id = ? @@ -782,7 +802,7 @@ def partnership_request(from_id: int, to_id: int) -> Optional[int]: return None with _lock: # Reject duplicate in either direction. - dup = _conn.execute( + dup = _get_conn().execute( """SELECT id FROM partnerships WHERE (from_user_id = ? AND to_user_id = ?) OR (from_user_id = ? AND to_user_id = ?)""", @@ -790,7 +810,7 @@ def partnership_request(from_id: int, to_id: int) -> Optional[int]: ).fetchone() if dup: return None - cur = _conn.execute( + cur = _get_conn().execute( "INSERT INTO partnerships(from_user_id, to_user_id, status, created_at) " "VALUES (?, ?, 'pending', ?)", (from_id, to_id, now_iso()), @@ -801,7 +821,7 @@ def partnership_request(from_id: int, to_id: int) -> Optional[int]: def partnership_accept(request_id: int, user_id: int) -> bool: """Accept a pending request addressed to user_id. Also wipes any other pending rows involving either partner.""" - row = _conn.execute( + row = _get_conn().execute( "SELECT * FROM partnerships WHERE id = ? AND status = 'pending'", (request_id,), ).fetchone() @@ -812,12 +832,12 @@ def partnership_accept(request_id: int, user_id: int) -> bool: return False partner_id = row["from_user_id"] with _lock: - _conn.execute( + _get_conn().execute( "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?", (now_iso(), request_id), ) # clean up any stale pending requests touching either user - _conn.execute( + _get_conn().execute( """DELETE FROM partnerships WHERE status = 'pending' AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""", @@ -829,7 +849,7 @@ def partnership_accept(request_id: int, user_id: int) -> bool: def partnership_decline(request_id: int, user_id: int) -> bool: """Decline an incoming pending request (deletes the row).""" with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """DELETE FROM partnerships WHERE id = ? AND status = 'pending' AND (to_user_id = ? OR from_user_id = ?)""", @@ -841,7 +861,7 @@ def partnership_decline(request_id: int, user_id: int) -> bool: def partnership_unlink(user_id: int) -> bool: """Remove the current accepted partnership (either side can call).""" with _lock: - cur = _conn.execute( + cur = _get_conn().execute( """DELETE FROM partnerships WHERE status = 'accepted' AND (from_user_id = ? OR to_user_id = ?)""", @@ -853,10 +873,10 @@ def partnership_unlink(user_id: int) -> bool: def partner_flat_actions(partner_id: int) -> dict: """Flats the partner has touched. 'applied' = any application (regardless of outcome); 'rejected' = in flat_rejections.""" - applied = {r["flat_id"] for r in _conn.execute( + applied = {r["flat_id"] for r in _get_conn().execute( "SELECT DISTINCT flat_id FROM applications WHERE user_id = ?", (partner_id,) ).fetchall()} - rejected = {r["flat_id"] for r in _conn.execute( + rejected = {r["flat_id"] for r in _get_conn().execute( "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (partner_id,) ).fetchall()} return {"applied": applied, "rejected": rejected} @@ -867,10 +887,10 @@ def cleanup_retention() -> dict: stats = {} with _lock: for table in ("errors", "audit_log"): - cur = _conn.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) + cur = _get_conn().execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) stats[table] = cur.rowcount # Drop forensics from old applications (but keep the row itself for history) - cur = _conn.execute( + cur = _get_conn().execute( "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL", (cutoff,), ) From ebb11178e7bb9c388dd5def630d6f1f0cc586604 Mon Sep 17 00:00:00 2001 From: EiSiMo Date: Tue, 21 Apr 2026 19:06:05 +0200 Subject: [PATCH 2/3] chore: sweep dead code across all three services MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review §1 — verified no callers before each deletion: - _next_scrape_utc (context dict key never read by any template) - ALERT_SCRAPE_INTERVAL_SECONDS settings import (only _next_scrape_utc read it) - alert/paths.py (imported by nothing) - alert/settings.py LANGUAGE (alert doesn't use translations.toml) - alert/main.py: the vestigial `c = {}` connectivity dict, the comment about re-enabling it, and the entire connectivity block in _flat_payload — the web-side columns stay NULL on insert now - alert/maps.py: DESTINATIONS, calculate_score, _get_next_weekday, _calculate_transfers (only geocode is used in the scraper) - alert/flat.py: connectivity + display_address properties, _connectivity field, unused datetime import - apply/utils.py str_to_preview (no callers) — file removed - web/matching.py: max_morning_commute + commute check - web/app.py: don't pass connectivity dict into flat_matches_filter, don't write email_address through update_notifications - web/db.py: get_error (no callers); drop kill_switch, max_morning_commute, email_address from their allowed-sets so they're not writable through update_* anymore - web/settings.py + docker-compose.yml: SMTP_HOST/PORT/USERNAME/PASSWORD/ FROM/STARTTLS (notifications.py is telegram-only now) DB columns themselves (kill_switch, email_address, max_morning_commute, connectivity_morning_time, connectivity_night_time) stay in the schema — SQLite can't drop them cheaply and they're harmless. Co-Authored-By: Claude Opus 4.7 (1M context) --- alert/flat.py | 17 ---------- alert/main.py | 11 ------- alert/maps.py | 81 +--------------------------------------------- alert/paths.py | 7 ---- alert/settings.py | 1 - apply/utils.py | 12 ------- docker-compose.yml | 6 ---- web/app.py | 15 +-------- web/db.py | 10 ++---- web/matching.py | 3 -- web/settings.py | 8 ----- 11 files changed, 5 insertions(+), 166 deletions(-) delete mode 100644 alert/paths.py delete mode 100644 apply/utils.py diff --git a/alert/flat.py b/alert/flat.py index 54958b8..e9f828d 100644 --- a/alert/flat.py +++ b/alert/flat.py @@ -27,7 +27,6 @@ class Flat: self.raw_data = data self.id = self.link # we could use data.get('id', None) but link is easier to debug self.gmaps = maps.Maps() - self._connectivity = None self._coords = None self.address_link_gmaps = f"https://www.google.com/maps/search/?api=1&query={quote(self.address)}" @@ -54,24 +53,8 @@ class Flat: return self.total_rent / self.size return 0.0 - @property - def connectivity(self): - if not self._connectivity: - self._connectivity = self.gmaps.calculate_score(self.address) - return self._connectivity - @property def coords(self): if self._coords is None: self._coords = self.gmaps.geocode(self.address) or (None, None) return self._coords - - @property - def display_address(self): - if ',' in self.address: - parts = self.address.split(',', 1) - street_part = parts[0].strip() - city_part = parts[1].replace(',', '').strip() - return f"{street_part}\n{city_part}" - else: - return self.address diff --git a/alert/main.py b/alert/main.py index 0b63345..204d4ee 100644 --- a/alert/main.py +++ b/alert/main.py @@ -31,11 +31,6 @@ class FlatAlerter: self.last_response_hash = "" def _flat_payload(self, flat: Flat) -> dict: - # Transit-connectivity is disabled to save Google-Maps quota. The - # helper on Flat (flat.connectivity → Maps.calculate_score) is - # intentionally kept so it can be re-enabled without re-writing code — - # just replace the empty dict with `flat.connectivity` when needed. - c: dict = {} lat, lng = flat.coords return { "id": flat.id, @@ -60,12 +55,6 @@ class FlatAlerter: "address_link_gmaps": flat.address_link_gmaps, "lat": lat, "lng": lng, - "connectivity": { - "morning_time": c.get("morning_time", 0), - "morning_transfers": c.get("morning_transfers", 0), - "night_time": c.get("night_time", 0), - "night_transfers": c.get("night_transfers", 0), - }, "raw_data": flat.raw_data, } diff --git a/alert/maps.py b/alert/maps.py index f07ce4c..7002ed4 100644 --- a/alert/maps.py +++ b/alert/maps.py @@ -1,25 +1,12 @@ import logging import googlemaps -from datetime import datetime, timedelta, time as dt_time from settings import GMAPS_API_KEY logger = logging.getLogger("flat-alert") -class Maps: - DESTINATIONS = { - "Hbf": "Berlin Hauptbahnhof", - "Friedrichstr": "Friedrichstraße, Berlin", - "Kotti": "Kottbusser Tor, Berlin", - "Warschauer": "Warschauer Straße, Berlin", - "Ostkreuz": "Ostkreuz, Berlin", - "Nollendorf": "Nollendorfplatz, Berlin", - "Zoo": "Zoologischer Garten, Berlin", - "Kudamm": "Kurfürstendamm, Berlin", - "Gesundbrunnen": "Gesundbrunnen, Berlin", - "Hermannplatz": "Hermannplatz, Berlin" - } +class Maps: def __init__(self): self.gmaps = googlemaps.Client(key=GMAPS_API_KEY) @@ -36,69 +23,3 @@ class Maps: except Exception as e: logger.warning("geocode failed for %r: %s", address, e) return None - - def _get_next_weekday(self, date, weekday): - days_ahead = weekday - date.weekday() - if days_ahead <= 0: - days_ahead += 7 - return date + timedelta(days_ahead) - - def _calculate_transfers(self, steps): - transit_count = sum(1 for step in steps if step['travel_mode'] == 'TRANSIT') - return max(0, transit_count - 1) - - def calculate_score(self, origin_address): - now = datetime.now() - # Next Monday 8:00 AM - next_monday = self._get_next_weekday(now, 0) - morning_departure = datetime.combine(next_monday.date(), dt_time(8, 0)) - # Next Sunday 2:00 AM - next_sunday = self._get_next_weekday(now, 6) - night_departure = datetime.combine(next_sunday.date(), dt_time(2, 0)) - - total_morning_minutes = 0 - total_morning_transfers = 0 - total_night_minutes = 0 - total_night_transfers = 0 - dest_count = 0 - - for key, dest_address in self.DESTINATIONS.items(): - # Morning: Flat -> Center - routes_morning = self.gmaps.directions( - origin=origin_address, - destination=dest_address, - mode="transit", - departure_time=morning_departure - ) - - # Night: Center -> Flat - routes_night = self.gmaps.directions( - origin=dest_address, - destination=origin_address, - mode="transit", - departure_time=night_departure - ) - - if routes_morning: - leg = routes_morning[0]['legs'][0] - total_morning_minutes += leg['duration']['value'] / 60 - total_morning_transfers += self._calculate_transfers(leg['steps']) - - if routes_night: - leg = routes_night[0]['legs'][0] - total_night_minutes += leg['duration']['value'] / 60 - total_night_transfers += self._calculate_transfers(leg['steps']) - - dest_count += 1 - - avg_m_time = total_morning_minutes / dest_count if dest_count else 0 - avg_m_trans = total_morning_transfers / dest_count if dest_count else 0 - avg_n_time = total_night_minutes / dest_count if dest_count else 0 - avg_n_trans = total_night_transfers / dest_count if dest_count else 0 - - return { - 'morning_time': avg_m_time, - 'morning_transfers': avg_m_trans, - 'night_time': avg_n_time, - 'night_transfers': avg_n_trans - } diff --git a/alert/paths.py b/alert/paths.py deleted file mode 100644 index d12fff4..0000000 --- a/alert/paths.py +++ /dev/null @@ -1,7 +0,0 @@ -import os - -DATA_DIR = "data" -ALREADY_NOTIFIED_FILE = "data/already_notified.txt" - -# create dirs if they do not exist yet. -os.makedirs(DATA_DIR, exist_ok=True) \ No newline at end of file diff --git a/alert/settings.py b/alert/settings.py index 0a04d1f..f905f4f 100644 --- a/alert/settings.py +++ b/alert/settings.py @@ -13,7 +13,6 @@ def _required(key: str) -> str: return val -LANGUAGE: str = getenv("LANGUAGE", "en") TIME_INTERVALL: int = int(getenv("SLEEP_INTERVALL", "60")) # web backend: alert POSTs discovered flats here diff --git a/apply/utils.py b/apply/utils.py deleted file mode 100644 index 9824937..0000000 --- a/apply/utils.py +++ /dev/null @@ -1,12 +0,0 @@ -import logging - -logger = logging.getLogger("flat-apply") - -def str_to_preview(string, max_length): - if not max_length > 3: - raise ValueError('max_length must be greater than 3') - - first_line = string.split('\n')[0] - if len(first_line) > max_length: - return first_line[:max_length-3] + '...' - return first_line diff --git a/docker-compose.yml b/docker-compose.yml index 016b515..43ca255 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,12 +22,6 @@ services: - RETENTION_DAYS=${RETENTION_DAYS:-14} - RETENTION_RUN_INTERVAL_SECONDS=${RETENTION_RUN_INTERVAL_SECONDS:-3600} - PUBLIC_URL=${PUBLIC_URL:-https://flat.lab.moritz.run} - - SMTP_HOST=${SMTP_HOST:-} - - SMTP_PORT=${SMTP_PORT:-587} - - SMTP_USERNAME=${SMTP_USERNAME:-} - - SMTP_PASSWORD=${SMTP_PASSWORD:-} - - SMTP_FROM=${SMTP_FROM:-wohnungsdidi@localhost} - - SMTP_STARTTLS=${SMTP_STARTTLS:-true} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-} - ANTHROPIC_MODEL=${ANTHROPIC_MODEL:-claude-haiku-4-5-20251001} volumes: diff --git a/web/app.py b/web/app.py index 750be75..dc0924d 100644 --- a/web/app.py +++ b/web/app.py @@ -53,7 +53,6 @@ from auth import ( ) from matching import flat_matches_filter, row_to_dict from settings import ( - ALERT_SCRAPE_INTERVAL_SECONDS, APPLY_FAILURE_THRESHOLD, INTERNAL_API_KEY, PUBLIC_URL, @@ -200,16 +199,6 @@ def _auto_apply_allowed(prefs) -> bool: return apply_client.health() -def _next_scrape_utc() -> str: - hb = db.get_state("last_alert_heartbeat") - dt = _parse_iso(hb) - if dt is None: - return "" - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return (dt + timedelta(seconds=ALERT_SCRAPE_INTERVAL_SECONDS)).astimezone(timezone.utc).isoformat(timespec="seconds") - - def _last_scrape_utc() -> str: hb = db.get_state("last_alert_heartbeat") dt = _parse_iso(hb) @@ -425,7 +414,7 @@ def _wohnungen_context(user) -> dict: continue if not flat_matches_filter({ "rooms": f["rooms"], "total_rent": f["total_rent"], "size": f["size"], - "wbs": f["wbs"], "connectivity": {"morning_time": f["connectivity_morning_time"]}, + "wbs": f["wbs"], }, filters): continue last = db.last_application_for_flat(uid, f["id"]) @@ -499,7 +488,6 @@ def _wohnungen_context(user) -> dict: "apply_allowed": allowed, "apply_block_reason": reason, "apply_reachable": apply_client.health(), - "next_scrape_utc": _next_scrape_utc(), "last_scrape_utc": _last_scrape_utc(), "has_running_apply": has_running, "poll_interval": 3 if has_running else 30, @@ -1063,7 +1051,6 @@ async def action_notifications(request: Request, user=Depends(require_user)): "channel": channel, "telegram_bot_token": form.get("telegram_bot_token", ""), "telegram_chat_id": form.get("telegram_chat_id", ""), - "email_address": "", "notify_on_match": _b("notify_on_match"), "notify_on_apply_success": _b("notify_on_apply_success"), "notify_on_apply_fail": _b("notify_on_apply_fail"), diff --git a/web/db.py b/web/db.py index a926d4c..86e8e7a 100644 --- a/web/db.py +++ b/web/db.py @@ -435,7 +435,7 @@ def get_filters(user_id: int) -> sqlite3.Row: def update_filters(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = {"rooms_min", "rooms_max", "max_rent", "min_size", - "max_morning_commute", "wbs_required", "max_age_hours"} + "wbs_required", "max_age_hours"} clean = {k: data.get(k) for k in allowed if k in data} if not clean: return @@ -453,7 +453,7 @@ def get_notifications(user_id: int) -> sqlite3.Row: def update_notifications(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = { - "channel", "telegram_bot_token", "telegram_chat_id", "email_address", + "channel", "telegram_bot_token", "telegram_chat_id", "notify_on_match", "notify_on_apply_success", "notify_on_apply_fail", } clean = {k: v for k, v in data.items() if k in allowed} @@ -473,7 +473,7 @@ def get_preferences(user_id: int) -> sqlite3.Row: def update_preferences(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = { - "auto_apply_enabled", "submit_forms", "kill_switch", + "auto_apply_enabled", "submit_forms", "apply_circuit_open", "apply_recent_failures", } clean = {k: v for k, v in data.items() if k in allowed} @@ -719,10 +719,6 @@ def recent_errors(user_id: Optional[int], limit: int = 100, ).fetchall()) -def get_error(error_id: int) -> Optional[sqlite3.Row]: - return _get_conn().execute("SELECT * FROM errors WHERE id = ?", (error_id,)).fetchone() - - # --------------------------------------------------------------------------- # Audit log # --------------------------------------------------------------------------- diff --git a/web/matching.py b/web/matching.py index 09bf0ea..f468277 100644 --- a/web/matching.py +++ b/web/matching.py @@ -18,7 +18,6 @@ def flat_matches_filter(flat: dict, f: dict | None) -> bool: rooms = flat.get("rooms") or 0.0 rent = flat.get("total_rent") or 0.0 size = flat.get("size") or 0.0 - commute = (flat.get("connectivity") or {}).get("morning_time") or 0.0 wbs_str = str(flat.get("wbs", "")).strip().lower() if f.get("rooms_min") is not None and rooms < float(f["rooms_min"]): @@ -29,8 +28,6 @@ def flat_matches_filter(flat: dict, f: dict | None) -> bool: return False if f.get("min_size") is not None and size < float(f["min_size"]): return False - if f.get("max_morning_commute") is not None and commute > float(f["max_morning_commute"]): - return False wbs_req = (f.get("wbs_required") or "").strip().lower() if wbs_req == "yes": diff --git a/web/settings.py b/web/settings.py index ca95511..343450b 100644 --- a/web/settings.py +++ b/web/settings.py @@ -55,14 +55,6 @@ RETENTION_RUN_INTERVAL_SECONDS: int = int(getenv("RETENTION_RUN_INTERVAL_SECONDS LOGIN_RATE_LIMIT: int = int(getenv("LOGIN_RATE_LIMIT", "5")) LOGIN_RATE_WINDOW_SECONDS: int = int(getenv("LOGIN_RATE_WINDOW_SECONDS", "900")) -# --- Email (system-wide SMTP for notifications) ------------------------------- -SMTP_HOST: str = getenv("SMTP_HOST", "") -SMTP_PORT: int = int(getenv("SMTP_PORT", "587")) -SMTP_USERNAME: str = getenv("SMTP_USERNAME", "") -SMTP_PASSWORD: str = getenv("SMTP_PASSWORD", "") -SMTP_FROM: str = getenv("SMTP_FROM", "wohnungsdidi@localhost") -SMTP_STARTTLS: bool = getenv("SMTP_STARTTLS", "true").lower() in ("true", "1", "yes", "on") - # --- App URL (used to build links in notifications) --------------------------- PUBLIC_URL: str = getenv("PUBLIC_URL", "https://flat.lab.moritz.run") From 77098c82df02aec2df47cfb3b310e75d8af97e5a Mon Sep 17 00:00:00 2001 From: EiSiMo Date: Tue, 21 Apr 2026 19:06:48 +0200 Subject: [PATCH 3/3] perf: one grouped query for latest per-flat application, not N MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _wohnungen_context was calling last_application_for_flat(uid, f.id) inside the flats loop — at 100 flats that's 101 queries per poll, and with running applications the page polls every 3s. New db.latest_applications_by_flat(user_id) returns {flat_id: row} via a single grouped self-join. Co-Authored-By: Claude Opus 4.7 (1M context) --- web/app.py | 6 ++++-- web/db.py | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/web/app.py b/web/app.py index dc0924d..a23f9e1 100644 --- a/web/app.py +++ b/web/app.py @@ -400,6 +400,9 @@ def _wohnungen_context(user) -> dict: age_cutoff = None if max_age_hours: age_cutoff = datetime.now(timezone.utc) - timedelta(hours=int(max_age_hours)) + # One query for this user's latest application per flat, instead of a + # per-flat query inside the loop. + latest_apps = db.latest_applications_by_flat(uid) flats_view = [] for f in flats: if f["id"] in rejected: @@ -417,8 +420,7 @@ def _wohnungen_context(user) -> dict: "wbs": f["wbs"], }, filters): continue - last = db.last_application_for_flat(uid, f["id"]) - flats_view.append({"row": f, "last": last}) + flats_view.append({"row": f, "last": latest_apps.get(f["id"])}) rejected_view = db.rejected_flats(uid) enrichment_counts = db.enrichment_counts() diff --git a/web/db.py b/web/db.py index 86e8e7a..43a763e 100644 --- a/web/db.py +++ b/web/db.py @@ -675,6 +675,24 @@ def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Ro ).fetchone() +def latest_applications_by_flat(user_id: int) -> dict: + """Return a dict {flat_id: latest-application-row} for all flats the user + has interacted with. One query instead of N.""" + rows = _get_conn().execute( + """SELECT a.* + FROM applications a + JOIN ( + SELECT flat_id, MAX(started_at) AS maxstart + FROM applications + WHERE user_id = ? + GROUP BY flat_id + ) m ON m.flat_id = a.flat_id AND m.maxstart = a.started_at + WHERE a.user_id = ?""", + (user_id, user_id), + ).fetchall() + return {r["flat_id"]: r for r in rows} + + def has_running_application(user_id: int) -> bool: row = _get_conn().execute( "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1",