""" SQLite data layer for lazyflat. Multi-user: users, per-user profiles/filters/notifications/preferences. All per-user rows are 1:1 with users. Errors and forensics are retained for 14 days and cleaned up periodically. """ import json import logging import sqlite3 import threading from contextlib import contextmanager from datetime import datetime, timedelta, timezone from typing import Any, Iterable, Optional from settings import DB_PATH, RETENTION_DAYS logger = logging.getLogger("web.db") # WAL mode permits any number of concurrent readers and one writer at a time, # but a single sqlite3.Connection object is not safe to share across threads # (cursors can collide). We keep one connection per thread via threading.local # and a module-level write lock so concurrent writers don't trip # "database is locked" during BEGIN IMMEDIATE. _lock = threading.Lock() _local = threading.local() def _connect() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH, isolation_level=None, check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA busy_timeout=5000") return conn def _get_conn() -> sqlite3.Connection: c = getattr(_local, "conn", None) if c is None: c = _connect() _local.conn = c return c @contextmanager def _tx(): """Atomic BEGIN IMMEDIATE … COMMIT/ROLLBACK. Required for multi-statement writes because our connections run in autocommit mode (isolation_level=None). Combine with _lock to serialize writers and avoid BUSY. """ c = _get_conn() c.execute("BEGIN IMMEDIATE") try: yield c except Exception: c.execute("ROLLBACK") raise else: c.execute("COMMIT") # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- MIGRATIONS: list[str] = [ # 0001: drop legacy single-user tables if present (only matters on upgrade # from the pre-multi-user commit; no production data yet). 
Safe for fresh DBs. """ DROP TABLE IF EXISTS applications; DROP TABLE IF EXISTS flats; DROP TABLE IF EXISTS state; DROP TABLE IF EXISTS audit_log; """, # 0002: base + users """ CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY); CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL COLLATE NOCASE, password_hash TEXT NOT NULL, is_admin INTEGER NOT NULL DEFAULT 0, disabled INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS user_profiles ( user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, salutation TEXT DEFAULT '', firstname TEXT DEFAULT '', lastname TEXT DEFAULT '', email TEXT DEFAULT '', telephone TEXT DEFAULT '', street TEXT DEFAULT '', house_number TEXT DEFAULT '', postcode TEXT DEFAULT '', city TEXT DEFAULT '', is_possessing_wbs INTEGER NOT NULL DEFAULT 0, wbs_type TEXT DEFAULT '0', wbs_valid_till TEXT DEFAULT '1970-01-01', wbs_rooms INTEGER NOT NULL DEFAULT 0, wbs_adults INTEGER NOT NULL DEFAULT 0, wbs_children INTEGER NOT NULL DEFAULT 0, is_prio_wbs INTEGER NOT NULL DEFAULT 0, immomio_email TEXT DEFAULT '', immomio_password TEXT DEFAULT '', updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS user_filters ( user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, rooms_min REAL, rooms_max REAL, max_rent REAL, min_size REAL, max_morning_commute REAL, wbs_required TEXT DEFAULT '', -- '', 'yes', 'no' updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS user_notifications ( user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, channel TEXT NOT NULL DEFAULT 'ui', -- 'ui' | 'telegram' | 'email' telegram_bot_token TEXT DEFAULT '', telegram_chat_id TEXT DEFAULT '', email_address TEXT DEFAULT '', notify_on_match INTEGER NOT NULL DEFAULT 1, notify_on_apply_success INTEGER NOT NULL DEFAULT 1, notify_on_apply_fail INTEGER NOT NULL DEFAULT 1, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS 
user_preferences ( user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, auto_apply_enabled INTEGER NOT NULL DEFAULT 0, submit_forms INTEGER NOT NULL DEFAULT 0, kill_switch INTEGER NOT NULL DEFAULT 0, apply_circuit_open INTEGER NOT NULL DEFAULT 0, apply_recent_failures INTEGER NOT NULL DEFAULT 0, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS flats ( id TEXT PRIMARY KEY, link TEXT NOT NULL, address TEXT, rooms REAL, size REAL, total_rent REAL, sqm_price REAL, year_built TEXT, wbs TEXT, connectivity_morning_time REAL, connectivity_night_time REAL, address_link_gmaps TEXT, payload_json TEXT NOT NULL, discovered_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_flats_discovered ON flats(discovered_at DESC); CREATE TABLE IF NOT EXISTS applications ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, flat_id TEXT NOT NULL REFERENCES flats(id), url TEXT NOT NULL, triggered_by TEXT NOT NULL, -- 'user' | 'auto' submit_forms_used INTEGER NOT NULL DEFAULT 0, provider TEXT DEFAULT '', started_at TEXT NOT NULL, finished_at TEXT, success INTEGER, message TEXT, profile_snapshot_json TEXT, forensics_json TEXT -- structured payload from apply service ); CREATE INDEX IF NOT EXISTS idx_applications_user ON applications(user_id, started_at DESC); CREATE INDEX IF NOT EXISTS idx_applications_started ON applications(started_at DESC); CREATE INDEX IF NOT EXISTS idx_applications_flat ON applications(flat_id); CREATE TABLE IF NOT EXISTS errors ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, user_id INTEGER, source TEXT NOT NULL, -- 'apply'|'alert'|'web'|'system' kind TEXT NOT NULL, -- 'apply_failure'|'scraper_error'|... 
summary TEXT NOT NULL, application_id INTEGER, context_json TEXT ); CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp DESC); CREATE INDEX IF NOT EXISTS idx_errors_user ON errors(user_id, timestamp DESC); CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, user_id INTEGER, actor TEXT NOT NULL, action TEXT NOT NULL, details TEXT, ip TEXT ); CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC); CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_log(user_id, timestamp DESC); CREATE TABLE IF NOT EXISTS system_state ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); """, # 0003: lat/lng for map view """ ALTER TABLE flats ADD COLUMN lat REAL; ALTER TABLE flats ADD COLUMN lng REAL; """, # 0004: per-user rejections — flats the user doesn't want in the list anymore """ CREATE TABLE IF NOT EXISTS flat_rejections ( user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, flat_id TEXT NOT NULL REFERENCES flats(id), rejected_at TEXT NOT NULL, PRIMARY KEY (user_id, flat_id) ); CREATE INDEX IF NOT EXISTS idx_rejections_user ON flat_rejections(user_id); """, # 0005: LLM enrichment — extracted details + downloaded image count per flat """ ALTER TABLE flats ADD COLUMN enrichment_json TEXT; ALTER TABLE flats ADD COLUMN enrichment_status TEXT NOT NULL DEFAULT 'pending'; ALTER TABLE flats ADD COLUMN enrichment_updated_at TEXT; ALTER TABLE flats ADD COLUMN image_count INTEGER NOT NULL DEFAULT 0; """, # 0006: time filter — hide flats older than X hours (NULL = no limit) """ ALTER TABLE user_filters ADD COLUMN max_age_hours INTEGER; """, # 0007: secrets table — API keys / scraper creds editable from admin UI """ CREATE TABLE IF NOT EXISTS secrets ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ); """, # 0008: partnerships — pair up two user accounts so each sees the other's # apply/reject interactions in the flat list. 
""" CREATE TABLE IF NOT EXISTS partnerships ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, to_user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, status TEXT NOT NULL DEFAULT 'pending', -- 'pending' | 'accepted' created_at TEXT NOT NULL, accepted_at TEXT, UNIQUE(from_user_id, to_user_id), CHECK(from_user_id != to_user_id) ); CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id); CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id); """, # 0009: composite index for latest_applications_by_flat + last_application_for_flat """ CREATE INDEX IF NOT EXISTS idx_applications_user_flat_started ON applications(user_id, flat_id, started_at DESC); """, # 0010: flats go offline — mark globally so they drop out of every user's list """ ALTER TABLE flats ADD COLUMN offline_at TEXT; CREATE INDEX IF NOT EXISTS idx_flats_offline ON flats(offline_at); """, ] def _current_version() -> int: try: row = _get_conn().execute("SELECT COALESCE(MAX(version), 0) AS v FROM schema_version").fetchone() return int(row["v"]) if row else 0 except sqlite3.Error: return 0 def init_db() -> None: with _lock: _get_conn().execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)") current = _current_version() for i, script in enumerate(MIGRATIONS, start=1): if i <= current: continue logger.info("applying migration v%d", i) _get_conn().executescript(script) _get_conn().execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,)) _get_conn().execute( "INSERT OR IGNORE INTO system_state(key, value) VALUES ('last_alert_heartbeat', '')" ) logger.info("DB initialized (schema v%d)", _current_version()) def now_iso() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") # --------------------------------------------------------------------------- # Secrets (admin-editable, source of truth for runtime creds) # 
# ---------------------------------------------------------------------------
# Secrets (admin-editable, source of truth for runtime creds)
# ---------------------------------------------------------------------------

SECRET_KEYS = ("ANTHROPIC_API_KEY", "BERLIN_WOHNEN_USERNAME", "BERLIN_WOHNEN_PASSWORD")


def get_secret(key: str) -> Optional[str]:
    """Return the stored secret value, or None if the key is absent."""
    row = _get_conn().execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone()
    return row["value"] if row else None


def set_secret(key: str, value: str) -> None:
    """Insert or overwrite a secret (upsert keyed on `key`)."""
    with _lock:
        _get_conn().execute(
            "INSERT INTO secrets(key, value, updated_at) VALUES (?, ?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value, "
            "    updated_at = excluded.updated_at",
            (key, value, now_iso()),
        )


def all_secrets() -> dict[str, str]:
    """All secrets as a plain {key: value} dict."""
    rows = _get_conn().execute("SELECT key, value FROM secrets").fetchall()
    return {r["key"]: r["value"] for r in rows}


def seed_secrets_from_env() -> None:
    """Copy env values into the DB for any secret key that's still empty.

    Idempotent: existing DB values are never overwritten.
    """
    import os  # local import: only needed on this one startup path

    for k in SECRET_KEYS:
        existing = get_secret(k)
        if existing:
            continue
        env_val = os.environ.get(k, "")
        if env_val:
            set_secret(k, env_val)
            logger.info("seeded secret %s from env", k)


# ---------------------------------------------------------------------------
# System state
# ---------------------------------------------------------------------------

def get_state(key: str) -> Optional[str]:
    """Return a system_state value, or None if the key is absent."""
    row = _get_conn().execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone()
    return row["value"] if row else None


def set_state(key: str, value: str) -> None:
    """Insert or overwrite a system_state entry (upsert keyed on `key`)."""
    with _lock:
        _get_conn().execute(
            "INSERT INTO system_state(key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            (key, value),
        )


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

def _ensure_user_rows(user_id: int) -> None:
    """Create the four 1:1 per-user rows (profile/filters/notifications/
    preferences) if any are missing. Acquires _lock — callers must NOT already
    hold it (threading.Lock is not reentrant)."""
    ts = now_iso()
    with _lock:
        for q in (
            "INSERT OR IGNORE INTO user_profiles(user_id, updated_at) VALUES (?, ?)",
            "INSERT OR IGNORE INTO user_filters(user_id, updated_at) VALUES (?, ?)",
            "INSERT OR IGNORE INTO user_notifications(user_id, updated_at) VALUES (?, ?)",
            "INSERT OR IGNORE INTO user_preferences(user_id, updated_at) VALUES (?, ?)",
        ):
            _get_conn().execute(q, (user_id, ts))


def create_user(username: str, password_hash: str, is_admin: bool = False) -> int:
    """Create a user plus its per-user rows; returns the new user id.

    Raises sqlite3.IntegrityError on a duplicate username (UNIQUE NOCASE).
    """
    ts = now_iso()
    with _lock:
        cur = _get_conn().execute(
            "INSERT INTO users(username, password_hash, is_admin, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (username, password_hash, 1 if is_admin else 0, ts, ts),
        )
        uid = cur.lastrowid
    # Must run AFTER releasing _lock: _ensure_user_rows takes _lock itself and
    # threading.Lock is not reentrant — calling it inside the critical section
    # above would deadlock. The brief window between the two lock scopes is
    # harmless because _ensure_user_rows is idempotent (INSERT OR IGNORE).
    _ensure_user_rows(uid)
    logger.info("created user id=%s username=%s admin=%s", uid, username, is_admin)
    return uid


def get_user_by_username(username: str) -> Optional[sqlite3.Row]:
    """Case-insensitive lookup of an *enabled* user, or None."""
    return _get_conn().execute(
        "SELECT * FROM users WHERE username = ? COLLATE NOCASE AND disabled = 0",
        (username,),
    ).fetchone()


def get_user(user_id: int) -> Optional[sqlite3.Row]:
    """Fetch a user by id (disabled users included), or None."""
    return _get_conn().execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()


def list_users() -> list[sqlite3.Row]:
    """All users (including disabled), ordered by username."""
    return list(_get_conn().execute("SELECT * FROM users ORDER BY username").fetchall())


def set_user_password(user_id: int, password_hash: str) -> None:
    """Replace a user's password hash."""
    with _lock:
        _get_conn().execute(
            "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?",
            (password_hash, now_iso(), user_id),
        )


def set_user_disabled(user_id: int, disabled: bool) -> None:
    """Enable/disable an account. Disabled users fail get_user_by_username."""
    with _lock:
        _get_conn().execute(
            "UPDATE users SET disabled = ?, updated_at = ? WHERE id = ?",
            (1 if disabled else 0, now_iso(), user_id),
        )
WHERE id = ?", (1 if disabled else 0, now_iso(), user_id), ) def delete_user(user_id: int) -> None: """Remove a user and everything that cascades (profile, filters, notifications, preferences, rejections, applications). Audit/error logs stay (user_id column is nullable).""" with _lock: _get_conn().execute("DELETE FROM users WHERE id = ?", (user_id,)) # --------------------------------------------------------------------------- # User profile / filters / notifications / preferences # --------------------------------------------------------------------------- def get_profile(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) return _get_conn().execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)).fetchone() def update_profile(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = { "salutation", "firstname", "lastname", "email", "telephone", "street", "house_number", "postcode", "city", "is_possessing_wbs", "wbs_type", "wbs_valid_till", "wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs", "immomio_email", "immomio_password", } clean = {k: v for k, v in data.items() if k in allowed} if not clean: return cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: _get_conn().execute(f"UPDATE user_profiles SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_filters(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) return _get_conn().execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,)).fetchone() def update_filters(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = {"rooms_min", "rooms_max", "max_rent", "min_size", "wbs_required", "max_age_hours"} clean = {k: data.get(k) for k in allowed if k in data} if not clean: return cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: _get_conn().execute(f"UPDATE user_filters SET {cols}, updated_at = ? 
WHERE user_id = ?", vals) def get_notifications(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) return _get_conn().execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,)).fetchone() def update_notifications(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = { "channel", "telegram_bot_token", "telegram_chat_id", "notify_on_match", "notify_on_apply_success", "notify_on_apply_fail", } clean = {k: v for k, v in data.items() if k in allowed} if not clean: return cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: _get_conn().execute(f"UPDATE user_notifications SET {cols}, updated_at = ? WHERE user_id = ?", vals) def get_preferences(user_id: int) -> sqlite3.Row: _ensure_user_rows(user_id) return _get_conn().execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,)).fetchone() def update_preferences(user_id: int, data: dict) -> None: _ensure_user_rows(user_id) allowed = { "auto_apply_enabled", "submit_forms", "apply_circuit_open", "apply_recent_failures", } clean = {k: v for k, v in data.items() if k in allowed} if not clean: return cols = ", ".join(f"{k} = ?" for k in clean) vals = list(clean.values()) + [now_iso(), user_id] with _lock: _get_conn().execute(f"UPDATE user_preferences SET {cols}, updated_at = ? WHERE user_id = ?", vals) # --------------------------------------------------------------------------- # Flats # --------------------------------------------------------------------------- def upsert_flat(payload: dict) -> bool: flat_id = str(payload["id"]) with _lock: existing = _get_conn().execute( "SELECT id, lat, lng FROM flats WHERE id = ?", (flat_id,) ).fetchone() if existing: # Backfill coords on old rows that pre-date the lat/lng migration. if (existing["lat"] is None or existing["lng"] is None) \ and payload.get("lat") is not None and payload.get("lng") is not None: _get_conn().execute( "UPDATE flats SET lat = ?, lng = ? 
WHERE id = ?", (payload["lat"], payload["lng"], flat_id), ) return False c = payload.get("connectivity") or {} _get_conn().execute( """INSERT INTO flats( id, link, address, rooms, size, total_rent, sqm_price, year_built, wbs, connectivity_morning_time, connectivity_night_time, address_link_gmaps, payload_json, discovered_at, lat, lng ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( flat_id, payload.get("link", ""), payload.get("address", ""), payload.get("rooms"), payload.get("size"), payload.get("total_rent"), payload.get("sqm_price"), str(payload.get("year_built", "")), str(payload.get("wbs", "")), c.get("morning_time"), c.get("night_time"), payload.get("address_link_gmaps"), json.dumps(payload, default=str), now_iso(), payload.get("lat"), payload.get("lng"), ), ) return True def recent_flats(limit: int = 50) -> list[sqlite3.Row]: return list(_get_conn().execute( "SELECT * FROM flats WHERE offline_at IS NULL " "ORDER BY discovered_at DESC LIMIT ?", (limit,) ).fetchall()) def mark_flat_offline(flat_id: str) -> None: """Flag a flat as no longer reachable. Idempotent — the first offline timestamp wins so we can tell how long it's been gone.""" with _lock: _get_conn().execute( "UPDATE flats SET offline_at = ? WHERE id = ? AND offline_at IS NULL", (now_iso(), flat_id), ) def get_flat(flat_id: str) -> Optional[sqlite3.Row]: return _get_conn().execute("SELECT * FROM flats WHERE id = ?", (flat_id,)).fetchone() def set_flat_enrichment(flat_id: str, status: str, enrichment: Optional[dict] = None, image_count: int = 0) -> None: with _lock: _get_conn().execute( """UPDATE flats SET enrichment_status = ?, enrichment_json = ?, enrichment_updated_at = ?, image_count = ? 
WHERE id = ?""", (status, json.dumps(enrichment) if enrichment is not None else None, now_iso(), image_count, flat_id), ) def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]: return list(_get_conn().execute( """SELECT id, link FROM flats WHERE enrichment_status IN ('pending', 'failed') ORDER BY discovered_at DESC LIMIT ?""", (limit,), ).fetchall()) def enrichment_counts() -> dict: row = _get_conn().execute( """SELECT COUNT(*) AS total, SUM(CASE WHEN enrichment_status = 'ok' THEN 1 ELSE 0 END) AS ok, SUM(CASE WHEN enrichment_status = 'pending' THEN 1 ELSE 0 END) AS pending, SUM(CASE WHEN enrichment_status = 'failed' THEN 1 ELSE 0 END) AS failed FROM flats""" ).fetchone() return { "total": int(row["total"] or 0), "ok": int(row["ok"] or 0), "pending": int(row["pending"] or 0), "failed": int(row["failed"] or 0), } # --------------------------------------------------------------------------- # Applications # --------------------------------------------------------------------------- def start_application(user_id: int, flat_id: str, url: str, triggered_by: str, submit_forms: bool, profile_snapshot: dict) -> int: with _lock: cur = _get_conn().execute( """INSERT INTO applications( user_id, flat_id, url, triggered_by, submit_forms_used, started_at, profile_snapshot_json ) VALUES (?, ?, ?, ?, ?, ?, ?)""", (user_id, flat_id, url, triggered_by, 1 if submit_forms else 0, now_iso(), json.dumps(profile_snapshot)), ) return cur.lastrowid def finish_application(app_id: int, success: bool, message: str, provider: str = "", forensics: Optional[dict] = None) -> None: with _lock: _get_conn().execute( """UPDATE applications SET finished_at = ?, success = ?, message = ?, provider = ?, forensics_json = ? 
WHERE id = ?""", (now_iso(), 1 if success else 0, message, provider, json.dumps(forensics) if forensics is not None else None, app_id), ) def get_application(app_id: int) -> Optional[sqlite3.Row]: return _get_conn().execute("SELECT * FROM applications WHERE id = ?", (app_id,)).fetchone() def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3.Row]: if user_id is None: return list(_get_conn().execute( """SELECT a.*, f.address, f.link FROM applications a LEFT JOIN flats f ON f.id = a.flat_id ORDER BY a.started_at DESC LIMIT ?""", (limit,) ).fetchall()) return list(_get_conn().execute( """SELECT a.*, f.address, f.link FROM applications a LEFT JOIN flats f ON f.id = a.flat_id WHERE a.user_id = ? ORDER BY a.started_at DESC LIMIT ?""", (user_id, limit), ).fetchall()) # --------------------------------------------------------------------------- # Rejections (flats a user doesn't want to see anymore) # --------------------------------------------------------------------------- def reject_flat(user_id: int, flat_id: str) -> None: with _lock: _get_conn().execute( "INSERT OR IGNORE INTO flat_rejections(user_id, flat_id, rejected_at) VALUES (?, ?, ?)", (user_id, flat_id, now_iso()), ) def unreject_flat(user_id: int, flat_id: str) -> None: with _lock: _get_conn().execute( "DELETE FROM flat_rejections WHERE user_id = ? AND flat_id = ?", (user_id, flat_id), ) def rejected_flat_ids(user_id: int) -> set[str]: rows = _get_conn().execute( "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (user_id,) ).fetchall() return {row["flat_id"] for row in rows} def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]: return list(_get_conn().execute( """SELECT f.*, r.rejected_at FROM flat_rejections r JOIN flats f ON f.id = r.flat_id WHERE r.user_id = ? 
ORDER BY r.rejected_at DESC LIMIT ?""", (user_id, limit), ).fetchall()) def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Row]: return _get_conn().execute( """SELECT * FROM applications WHERE user_id = ? AND flat_id = ? ORDER BY started_at DESC LIMIT 1""", (user_id, flat_id), ).fetchone() def latest_applications_by_flat(user_id: int) -> dict: """Return a dict {flat_id: latest-application-row} for all flats the user has interacted with. One query instead of N.""" rows = _get_conn().execute( """SELECT a.* FROM applications a JOIN ( SELECT flat_id, MAX(started_at) AS maxstart FROM applications WHERE user_id = ? GROUP BY flat_id ) m ON m.flat_id = a.flat_id AND m.maxstart = a.started_at WHERE a.user_id = ?""", (user_id, user_id), ).fetchall() return {r["flat_id"]: r for r in rows} def has_running_application(user_id: int) -> bool: row = _get_conn().execute( "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1", (user_id,), ).fetchone() return row is not None # --------------------------------------------------------------------------- # Errors # --------------------------------------------------------------------------- def log_error(source: str, kind: str, summary: str, user_id: Optional[int] = None, application_id: Optional[int] = None, context: Optional[dict] = None) -> int: with _lock: cur = _get_conn().execute( """INSERT INTO errors(timestamp, user_id, source, kind, summary, application_id, context_json) VALUES (?, ?, ?, ?, ?, ?, ?)""", (now_iso(), user_id, source, kind, summary, application_id, json.dumps(context) if context else None), ) return cur.lastrowid def recent_errors(user_id: Optional[int], limit: int = 100, include_global: bool = False) -> list[sqlite3.Row]: if user_id is None: return list(_get_conn().execute( "SELECT * FROM errors ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall()) if include_global: return list(_get_conn().execute( """SELECT * FROM errors WHERE user_id = ? 
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------

def log_audit(actor: str, action: str, details: str = "",
              user_id: Optional[int] = None, ip: str = "") -> None:
    """Append one audit record (user_id may be None for system actions)."""
    with _lock:
        _get_conn().execute(
            "INSERT INTO audit_log(timestamp, user_id, actor, action, details, ip) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (now_iso(), user_id, actor, action, details, ip),
        )


def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
    """Newest audit entries first.

    user_id=None: everything; otherwise the user's entries plus the
    user-less (user_id IS NULL) system entries.
    """
    conn = _get_conn()
    if user_id is None:
        cur = conn.execute(
            "SELECT * FROM audit_log ORDER BY timestamp DESC LIMIT ?", (limit,)
        )
    else:
        cur = conn.execute(
            "SELECT * FROM audit_log WHERE user_id = ? OR user_id IS NULL "
            "ORDER BY timestamp DESC LIMIT ?",
            (user_id, limit),
        )
    return list(cur.fetchall())


def _range_filter_rows(table: str, ts_col: str, start_iso: Optional[str],
                       end_iso: Optional[str], limit: int) -> list[sqlite3.Row]:
    """Date-range filtered fetch from an append-only table.

    Pushes the timestamp filter into SQL so we don't drag thousands of rows
    into Python just to discard most of them. `table` and `ts_col` are only
    ever supplied by the two wrappers below (never user input), so the
    f-string interpolation is injection-safe.
    """
    conditions: list[str] = []
    args: list[Any] = []
    if start_iso:
        conditions.append(f"{ts_col} >= ?")
        args.append(start_iso)
    if end_iso:
        conditions.append(f"{ts_col} < ?")
        args.append(end_iso)
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    args.append(limit)
    cur = _get_conn().execute(
        f"SELECT * FROM {table} {where} ORDER BY {ts_col} DESC LIMIT ?", args
    )
    return list(cur.fetchall())


def audit_in_range(start_iso: Optional[str], end_iso: Optional[str],
                   limit: int = 500) -> list[sqlite3.Row]:
    """Audit entries within [start_iso, end_iso), newest first."""
    return _range_filter_rows("audit_log", "timestamp", start_iso, end_iso, limit)


def errors_in_range(start_iso: Optional[str], end_iso: Optional[str],
                    limit: int = 500) -> list[sqlite3.Row]:
    """Error entries within [start_iso, end_iso), newest first."""
    return _range_filter_rows("errors", "timestamp", start_iso, end_iso, limit)


# ---------------------------------------------------------------------------
# Partnerships
# ---------------------------------------------------------------------------

def get_accepted_partnership(user_id: int) -> Optional[sqlite3.Row]:
    """The user's accepted partnership row (either direction), or None."""
    return _get_conn().execute(
        """SELECT * FROM partnerships
           WHERE status = 'accepted' AND (from_user_id = ? OR to_user_id = ?)
           LIMIT 1""",
        (user_id, user_id),
    ).fetchone()


def get_partner_user(user_id: int) -> Optional[sqlite3.Row]:
    """The users-row of the accepted partner, or None if unpartnered."""
    link = get_accepted_partnership(user_id)
    if link is None:
        return None
    # Exactly one side is user_id (CHECK forbids self-partnership).
    partner_id = link["from_user_id"] if link["to_user_id"] == user_id else link["to_user_id"]
    return get_user(partner_id)


def partnership_incoming(user_id: int) -> list[sqlite3.Row]:
    """Pending requests addressed TO user_id, newest first."""
    cur = _get_conn().execute(
        """SELECT p.*, u.username AS from_username
           FROM partnerships p JOIN users u ON u.id = p.from_user_id
           WHERE p.status = 'pending' AND p.to_user_id = ?
           ORDER BY p.created_at DESC""",
        (user_id,),
    )
    return list(cur.fetchall())
ORDER BY p.created_at DESC""", (user_id,), ).fetchall()) def partnership_outgoing(user_id: int) -> list[sqlite3.Row]: return list(_get_conn().execute( """SELECT p.*, u.username AS to_username FROM partnerships p JOIN users u ON u.id = p.to_user_id WHERE p.status = 'pending' AND p.from_user_id = ? ORDER BY p.created_at DESC""", (user_id,), ).fetchall()) def partnership_request(from_id: int, to_id: int) -> Optional[int]: """Create a pending request. Returns id or None if rejected (self, already linked on either side, or a pending/accepted row already exists).""" if from_id == to_id: return None if get_accepted_partnership(from_id) or get_accepted_partnership(to_id): return None with _lock: # Reject duplicate in either direction. dup = _get_conn().execute( """SELECT id FROM partnerships WHERE (from_user_id = ? AND to_user_id = ?) OR (from_user_id = ? AND to_user_id = ?)""", (from_id, to_id, to_id, from_id), ).fetchone() if dup: return None cur = _get_conn().execute( "INSERT INTO partnerships(from_user_id, to_user_id, status, created_at) " "VALUES (?, ?, 'pending', ?)", (from_id, to_id, now_iso()), ) return cur.lastrowid def partnership_accept(request_id: int, user_id: int) -> bool: """Accept a pending request addressed to user_id. Also wipes any other pending rows involving either partner.""" row = _get_conn().execute( "SELECT * FROM partnerships WHERE id = ? AND status = 'pending'", (request_id,), ).fetchone() if not row or row["to_user_id"] != user_id: return False # Don't allow accept if either side already has an accepted partner. if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id): return False partner_id = row["from_user_id"] with _lock, _tx() as c: c.execute( "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?", (now_iso(), request_id), ) # clean up any stale pending requests touching either user c.execute( """DELETE FROM partnerships WHERE status = 'pending' AND (from_user_id IN (?, ?) 
OR to_user_id IN (?, ?))""", (user_id, partner_id, user_id, partner_id), ) return True def partnership_decline(request_id: int, user_id: int) -> bool: """Decline an incoming pending request (deletes the row).""" with _lock: cur = _get_conn().execute( """DELETE FROM partnerships WHERE id = ? AND status = 'pending' AND (to_user_id = ? OR from_user_id = ?)""", (request_id, user_id, user_id), ) return cur.rowcount > 0 def partnership_unlink(user_id: int) -> bool: """Remove the current accepted partnership (either side can call).""" with _lock: cur = _get_conn().execute( """DELETE FROM partnerships WHERE status = 'accepted' AND (from_user_id = ? OR to_user_id = ?)""", (user_id, user_id), ) return cur.rowcount > 0 def partner_flat_actions(partner_id: int) -> dict: """Flats the partner has touched. 'applied' = any application (regardless of outcome); 'rejected' = in flat_rejections.""" applied = {r["flat_id"] for r in _get_conn().execute( "SELECT DISTINCT flat_id FROM applications WHERE user_id = ?", (partner_id,) ).fetchall()} rejected = {r["flat_id"] for r in _get_conn().execute( "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (partner_id,) ).fetchall()} return {"applied": applied, "rejected": rejected} def cleanup_retention() -> dict: cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds") stats = {} with _lock, _tx() as c: for table in ("errors", "audit_log"): cur = c.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) stats[table] = cur.rowcount # Drop forensics from old applications (but keep the row itself for history) cur = c.execute( "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL", (cutoff,), ) stats["applications_forensics_wiped"] = cur.rowcount if any(v for v in stats.values()): logger.info("retention cleanup: %s", stats) return stats