lazyflat/web/db.py
EiSiMo d13f9c5b6e feat(filter): Berlin-Bezirk filter in Einstellungen
Adds a collapsible 12-Bezirk checkbox list to the filter tab. The UI
speaks Bezirke; internally the match runs on the PLZ extracted from
flat.address and resolved to a dominant Bezirk via a curated 187-PLZ
map (berlin_districts.py).

- Migration 0011 adds user_filters.districts (CSV of selected names)
- Empty stored value = no filter = all Bezirke ticked in the UI.
  Submitting "all ticked" or "none ticked" both normalise to empty
  so the defaults and the nuclear state mean the same thing.
- When a Bezirk filter is active, flats with an unknown/unmapped PLZ
  are excluded — if the user bothered to narrow by district, sneaking
  in unplaceable flats would be the wrong default.
- Filter summary on the Wohnungen page shows "N Bezirke" so it's
  visible the filter is active.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 10:05:55 +02:00

988 lines
37 KiB
Python

"""
SQLite data layer for lazyflat.
Multi-user: users, per-user profiles/filters/notifications/preferences.
All per-user rows are 1:1 with users. Errors and forensics are retained
for 14 days and cleaned up periodically.
"""
import json
import logging
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import Any, Iterable, Optional
from settings import DB_PATH, RETENTION_DAYS
logger = logging.getLogger("web.db")

# WAL mode permits any number of concurrent readers and one writer at a time,
# but a single sqlite3.Connection object is not safe to share across threads
# (cursors can collide). We keep one connection per thread via threading.local
# and a module-level write lock so concurrent writers don't trip
# "database is locked" during BEGIN IMMEDIATE.
_lock = threading.Lock()    # serializes all writers; NOT reentrant — never nest
_local = threading.local()  # per-thread connection cache (see _get_conn)
def _connect() -> sqlite3.Connection:
    """Open a new connection configured for this app: autocommit mode,
    WAL journaling, enforced foreign keys, 5 s busy timeout, dict-like rows."""
    conn = sqlite3.connect(DB_PATH, isolation_level=None, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    for pragma in ("journal_mode=WAL", "foreign_keys=ON", "busy_timeout=5000"):
        conn.execute(f"PRAGMA {pragma}")
    return conn
def _get_conn() -> sqlite3.Connection:
    """Return this thread's cached connection, creating it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is not None:
        return conn
    _local.conn = _connect()
    return _local.conn
@contextmanager
def _tx():
    """Write-transaction scope: BEGIN IMMEDIATE up front, COMMIT on clean exit,
    ROLLBACK (and re-raise) on any exception. Needed for multi-statement writes
    because connections run in autocommit mode (isolation_level=None); pair
    with _lock so concurrent writers stay serialized.
    """
    conn = _get_conn()
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield conn
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
MIGRATIONS: list[str] = [
# 0001: drop legacy single-user tables if present (only matters on upgrade
# from the pre-multi-user commit; no production data yet). Safe for fresh DBs.
"""
DROP TABLE IF EXISTS applications;
DROP TABLE IF EXISTS flats;
DROP TABLE IF EXISTS state;
DROP TABLE IF EXISTS audit_log;
""",
# 0002: base + users
"""
CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL COLLATE NOCASE,
password_hash TEXT NOT NULL,
is_admin INTEGER NOT NULL DEFAULT 0,
disabled INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_profiles (
user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
salutation TEXT DEFAULT '',
firstname TEXT DEFAULT '',
lastname TEXT DEFAULT '',
email TEXT DEFAULT '',
telephone TEXT DEFAULT '',
street TEXT DEFAULT '',
house_number TEXT DEFAULT '',
postcode TEXT DEFAULT '',
city TEXT DEFAULT '',
is_possessing_wbs INTEGER NOT NULL DEFAULT 0,
wbs_type TEXT DEFAULT '0',
wbs_valid_till TEXT DEFAULT '1970-01-01',
wbs_rooms INTEGER NOT NULL DEFAULT 0,
wbs_adults INTEGER NOT NULL DEFAULT 0,
wbs_children INTEGER NOT NULL DEFAULT 0,
is_prio_wbs INTEGER NOT NULL DEFAULT 0,
immomio_email TEXT DEFAULT '',
immomio_password TEXT DEFAULT '',
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_filters (
user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
rooms_min REAL,
rooms_max REAL,
max_rent REAL,
min_size REAL,
max_morning_commute REAL,
wbs_required TEXT DEFAULT '', -- '', 'yes', 'no'
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_notifications (
user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
channel TEXT NOT NULL DEFAULT 'ui', -- 'ui' | 'telegram' | 'email'
telegram_bot_token TEXT DEFAULT '',
telegram_chat_id TEXT DEFAULT '',
email_address TEXT DEFAULT '',
notify_on_match INTEGER NOT NULL DEFAULT 1,
notify_on_apply_success INTEGER NOT NULL DEFAULT 1,
notify_on_apply_fail INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_preferences (
user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
auto_apply_enabled INTEGER NOT NULL DEFAULT 0,
submit_forms INTEGER NOT NULL DEFAULT 0,
kill_switch INTEGER NOT NULL DEFAULT 0,
apply_circuit_open INTEGER NOT NULL DEFAULT 0,
apply_recent_failures INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS flats (
id TEXT PRIMARY KEY,
link TEXT NOT NULL,
address TEXT,
rooms REAL,
size REAL,
total_rent REAL,
sqm_price REAL,
year_built TEXT,
wbs TEXT,
connectivity_morning_time REAL,
connectivity_night_time REAL,
address_link_gmaps TEXT,
payload_json TEXT NOT NULL,
discovered_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_flats_discovered ON flats(discovered_at DESC);
CREATE TABLE IF NOT EXISTS applications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
flat_id TEXT NOT NULL REFERENCES flats(id),
url TEXT NOT NULL,
triggered_by TEXT NOT NULL, -- 'user' | 'auto'
submit_forms_used INTEGER NOT NULL DEFAULT 0,
provider TEXT DEFAULT '',
started_at TEXT NOT NULL,
finished_at TEXT,
success INTEGER,
message TEXT,
profile_snapshot_json TEXT,
forensics_json TEXT -- structured payload from apply service
);
CREATE INDEX IF NOT EXISTS idx_applications_user ON applications(user_id, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_applications_started ON applications(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_applications_flat ON applications(flat_id);
CREATE TABLE IF NOT EXISTS errors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
user_id INTEGER,
source TEXT NOT NULL, -- 'apply'|'alert'|'web'|'system'
kind TEXT NOT NULL, -- 'apply_failure'|'scraper_error'|...
summary TEXT NOT NULL,
application_id INTEGER,
context_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_errors_user ON errors(user_id, timestamp DESC);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
user_id INTEGER,
actor TEXT NOT NULL,
action TEXT NOT NULL,
details TEXT,
ip TEXT
);
CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_log(user_id, timestamp DESC);
CREATE TABLE IF NOT EXISTS system_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
""",
# 0003: lat/lng for map view
"""
ALTER TABLE flats ADD COLUMN lat REAL;
ALTER TABLE flats ADD COLUMN lng REAL;
""",
# 0004: per-user rejections — flats the user doesn't want in the list anymore
"""
CREATE TABLE IF NOT EXISTS flat_rejections (
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
flat_id TEXT NOT NULL REFERENCES flats(id),
rejected_at TEXT NOT NULL,
PRIMARY KEY (user_id, flat_id)
);
CREATE INDEX IF NOT EXISTS idx_rejections_user ON flat_rejections(user_id);
""",
# 0005: LLM enrichment — extracted details + downloaded image count per flat
"""
ALTER TABLE flats ADD COLUMN enrichment_json TEXT;
ALTER TABLE flats ADD COLUMN enrichment_status TEXT NOT NULL DEFAULT 'pending';
ALTER TABLE flats ADD COLUMN enrichment_updated_at TEXT;
ALTER TABLE flats ADD COLUMN image_count INTEGER NOT NULL DEFAULT 0;
""",
# 0006: time filter — hide flats older than X hours (NULL = no limit)
"""
ALTER TABLE user_filters ADD COLUMN max_age_hours INTEGER;
""",
# 0007: secrets table — API keys / scraper creds editable from admin UI
"""
CREATE TABLE IF NOT EXISTS secrets (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
""",
# 0008: partnerships — pair up two user accounts so each sees the other's
# apply/reject interactions in the flat list.
"""
CREATE TABLE IF NOT EXISTS partnerships (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
to_user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
status TEXT NOT NULL DEFAULT 'pending', -- 'pending' | 'accepted'
created_at TEXT NOT NULL,
accepted_at TEXT,
UNIQUE(from_user_id, to_user_id),
CHECK(from_user_id != to_user_id)
);
CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id);
CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id);
""",
# 0009: composite index for latest_applications_by_flat + last_application_for_flat
"""
CREATE INDEX IF NOT EXISTS idx_applications_user_flat_started
ON applications(user_id, flat_id, started_at DESC);
""",
# 0010: flats go offline — mark globally so they drop out of every user's list
"""
ALTER TABLE flats ADD COLUMN offline_at TEXT;
CREATE INDEX IF NOT EXISTS idx_flats_offline ON flats(offline_at);
""",
# 0011: per-user Berlin-Bezirk filter. CSV of selected Bezirk names;
# empty = no filter (= all Bezirke match).
"""
ALTER TABLE user_filters ADD COLUMN districts TEXT NOT NULL DEFAULT '';
""",
]
def _current_version() -> int:
    """Highest applied migration number; 0 for a fresh or uninitialized DB."""
    try:
        row = _get_conn().execute(
            "SELECT COALESCE(MAX(version), 0) AS v FROM schema_version"
        ).fetchone()
    except sqlite3.Error:
        # schema_version missing entirely — treat as version 0.
        return 0
    return int(row["v"]) if row else 0
def init_db() -> None:
    """Create or upgrade the schema by applying any MIGRATIONS newer than the
    recorded version, then seed baseline system_state keys. Idempotent."""
    with _lock:
        # Bootstrap the version table so _current_version() works on a fresh DB.
        _get_conn().execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)")
        current = _current_version()
        for i, script in enumerate(MIGRATIONS, start=1):
            if i <= current:
                continue  # already applied
            logger.info("applying migration v%d", i)
            _get_conn().executescript(script)
            _get_conn().execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,))
        # After all migrations: system_state is guaranteed to exist (migration
        # 0002), so seed the key the alert heartbeat reads unconditionally.
        _get_conn().execute(
            "INSERT OR IGNORE INTO system_state(key, value) VALUES ('last_alert_heartbeat', '')"
        )
        logger.info("DB initialized (schema v%d)", _current_version())
def now_iso() -> str:
    """Return the current UTC moment as an ISO-8601 string (seconds precision)."""
    ts = datetime.now(timezone.utc)
    return ts.isoformat(timespec="seconds")
# ---------------------------------------------------------------------------
# Secrets (admin-editable, source of truth for runtime creds)
# ---------------------------------------------------------------------------
# Secret names the admin UI may edit; also the set seeded from env at boot.
SECRET_KEYS = ("ANTHROPIC_API_KEY", "BERLIN_WOHNEN_USERNAME", "BERLIN_WOHNEN_PASSWORD")
def get_secret(key: str) -> Optional[str]:
    """Fetch one secret's value, or None if it was never stored."""
    row = _get_conn().execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone()
    if row is None:
        return None
    return row["value"]
def set_secret(key: str, value: str) -> None:
    """Insert or overwrite a secret (upsert keyed on `key`)."""
    sql = (
        "INSERT INTO secrets(key, value, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value, "
        " updated_at = excluded.updated_at"
    )
    with _lock:
        _get_conn().execute(sql, (key, value, now_iso()))
def all_secrets() -> dict[str, str]:
    """Every stored secret as a {key: value} dict."""
    cur = _get_conn().execute("SELECT key, value FROM secrets")
    return {row["key"]: row["value"] for row in cur.fetchall()}
def seed_secrets_from_env() -> None:
    """Copy env values into the DB for any secret key that's still empty.
    Idempotent: existing DB values are never overwritten."""
    import os
    for key in SECRET_KEYS:
        if get_secret(key):
            continue  # already set in the DB — DB wins over env
        value = os.environ.get(key, "")
        if value:
            set_secret(key, value)
            logger.info("seeded secret %s from env", key)
# ---------------------------------------------------------------------------
# System state
# ---------------------------------------------------------------------------
def get_state(key: str) -> Optional[str]:
    """Read one system_state value; None when the key is absent."""
    row = _get_conn().execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone()
    return None if row is None else row["value"]
def set_state(key: str, value: str) -> None:
    """Write (upsert) one system_state key."""
    sql = (
        "INSERT INTO system_state(key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    with _lock:
        _get_conn().execute(sql, (key, value))
# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------
def _ensure_user_rows(user_id: int) -> None:
    """Guarantee the four 1:1 per-user rows exist (no-op when already there).

    Acquires _lock — callers must NOT already hold it (it is not reentrant).
    """
    ts = now_iso()
    tables = ("user_profiles", "user_filters", "user_notifications", "user_preferences")
    with _lock:
        conn = _get_conn()
        for table in tables:
            conn.execute(
                f"INSERT OR IGNORE INTO {table}(user_id, updated_at) VALUES (?, ?)",
                (user_id, ts),
            )
def create_user(username: str, password_hash: str, is_admin: bool = False) -> int:
    """Insert a new user plus their four 1:1 per-user rows; return the new id.

    Raises sqlite3.IntegrityError when the username is already taken
    (UNIQUE ... COLLATE NOCASE on users.username).
    """
    ts = now_iso()
    with _lock:
        cur = _get_conn().execute(
            "INSERT INTO users(username, password_hash, is_admin, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (username, password_hash, 1 if is_admin else 0, ts, ts),
        )
        uid = cur.lastrowid
    # Must run AFTER releasing _lock: _ensure_user_rows acquires the same
    # non-reentrant module lock and would deadlock if called while holding it.
    _ensure_user_rows(uid)
    logger.info("created user id=%s username=%s admin=%s", uid, username, is_admin)
    return uid
def get_user_by_username(username: str) -> Optional[sqlite3.Row]:
    """Look up an *enabled* user by case-insensitive username."""
    sql = "SELECT * FROM users WHERE username = ? COLLATE NOCASE AND disabled = 0"
    return _get_conn().execute(sql, (username,)).fetchone()
def get_user(user_id: int) -> Optional[sqlite3.Row]:
    """Fetch a user row by primary key (disabled users included)."""
    cur = _get_conn().execute("SELECT * FROM users WHERE id = ?", (user_id,))
    return cur.fetchone()
def list_users() -> list[sqlite3.Row]:
    """All users (enabled and disabled), sorted by username."""
    cur = _get_conn().execute("SELECT * FROM users ORDER BY username")
    return list(cur.fetchall())
def set_user_password(user_id: int, password_hash: str) -> None:
    """Replace a user's password hash and bump updated_at."""
    params = (password_hash, now_iso(), user_id)
    with _lock:
        _get_conn().execute(
            "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?", params
        )
def set_user_disabled(user_id: int, disabled: bool) -> None:
    """Toggle a login's disabled flag; disabled users are filtered out by
    get_user_by_username()."""
    flag = 1 if disabled else 0
    with _lock:
        _get_conn().execute(
            "UPDATE users SET disabled = ?, updated_at = ? WHERE id = ?",
            (flag, now_iso(), user_id),
        )
def delete_user(user_id: int) -> None:
    """Delete a user; ON DELETE CASCADE removes the dependent rows (profile,
    filters, notifications, preferences, rejections, applications). Error and
    audit rows survive because their user_id column is nullable."""
    with _lock:
        _get_conn().execute("DELETE FROM users WHERE id = ?", (user_id,))
# ---------------------------------------------------------------------------
# User profile / filters / notifications / preferences
# ---------------------------------------------------------------------------
def get_profile(user_id: int) -> sqlite3.Row:
    """Return the user's profile row, creating the 1:1 rows first if missing."""
    _ensure_user_rows(user_id)
    cur = _get_conn().execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,))
    return cur.fetchone()
def update_profile(user_id: int, data: dict) -> None:
    """Whitelist-update profile columns from `data`; unknown keys are ignored
    and an empty update is a no-op."""
    _ensure_user_rows(user_id)
    allowed = {
        "salutation", "firstname", "lastname", "email", "telephone",
        "street", "house_number", "postcode", "city",
        "is_possessing_wbs", "wbs_type", "wbs_valid_till",
        "wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs",
        "immomio_email", "immomio_password",
    }
    updates = {k: data[k] for k in data if k in allowed}
    if not updates:
        return
    set_clause = ", ".join(f"{col} = ?" for col in updates)
    args = [*updates.values(), now_iso(), user_id]
    with _lock:
        _get_conn().execute(
            f"UPDATE user_profiles SET {set_clause}, updated_at = ? WHERE user_id = ?", args
        )
def get_filters(user_id: int) -> sqlite3.Row:
    """Return the user's filter row, creating the 1:1 rows first if missing."""
    _ensure_user_rows(user_id)
    cur = _get_conn().execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,))
    return cur.fetchone()
def update_filters(user_id: int, data: dict) -> None:
    """Whitelist-update the user's search filters from `data`.

    NOTE(review): the schema also has user_filters.max_morning_commute, which
    this whitelist omits, so that column can't be changed here — confirm
    whether the commute filter is still meant to be editable.
    """
    _ensure_user_rows(user_id)
    allowed = {"rooms_min", "rooms_max", "max_rent", "min_size",
               "wbs_required", "max_age_hours", "districts"}
    updates = {k: data.get(k) for k in allowed if k in data}
    if not updates:
        return
    set_clause = ", ".join(f"{col} = ?" for col in updates)
    args = [*updates.values(), now_iso(), user_id]
    with _lock:
        _get_conn().execute(
            f"UPDATE user_filters SET {set_clause}, updated_at = ? WHERE user_id = ?", args
        )
def get_notifications(user_id: int) -> sqlite3.Row:
    """Return the user's notification row, creating the 1:1 rows if missing."""
    _ensure_user_rows(user_id)
    cur = _get_conn().execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,))
    return cur.fetchone()
def update_notifications(user_id: int, data: dict) -> None:
    """Whitelist-update notification settings from `data`.

    NOTE(review): the schema has user_notifications.email_address and channel
    'email', but this whitelist omits email_address — confirm whether the
    email channel's address is meant to be settable through this path.
    """
    _ensure_user_rows(user_id)
    allowed = {
        "channel", "telegram_bot_token", "telegram_chat_id",
        "notify_on_match", "notify_on_apply_success", "notify_on_apply_fail",
    }
    updates = {k: data[k] for k in data if k in allowed}
    if not updates:
        return
    set_clause = ", ".join(f"{col} = ?" for col in updates)
    args = [*updates.values(), now_iso(), user_id]
    with _lock:
        _get_conn().execute(
            f"UPDATE user_notifications SET {set_clause}, updated_at = ? WHERE user_id = ?", args
        )
def get_preferences(user_id: int) -> sqlite3.Row:
    """Return the user's preferences row, creating the 1:1 rows if missing."""
    _ensure_user_rows(user_id)
    cur = _get_conn().execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,))
    return cur.fetchone()
def update_preferences(user_id: int, data: dict) -> None:
    """Whitelist-update auto-apply preferences from `data`.

    NOTE(review): user_preferences.kill_switch exists in the schema but is not
    in this whitelist, so it can't be flipped here — presumably set elsewhere;
    verify against the callers.
    """
    _ensure_user_rows(user_id)
    allowed = {
        "auto_apply_enabled", "submit_forms",
        "apply_circuit_open", "apply_recent_failures",
    }
    updates = {k: data[k] for k in data if k in allowed}
    if not updates:
        return
    set_clause = ", ".join(f"{col} = ?" for col in updates)
    args = [*updates.values(), now_iso(), user_id]
    with _lock:
        _get_conn().execute(
            f"UPDATE user_preferences SET {set_clause}, updated_at = ? WHERE user_id = ?", args
        )
# ---------------------------------------------------------------------------
# Flats
# ---------------------------------------------------------------------------
def upsert_flat(payload: dict) -> bool:
    """Insert a newly-scraped flat; return True iff the row is new.

    Existing rows are left untouched except for a one-time lat/lng backfill on
    rows created before the coords migration (0003). Returns False for known ids.
    """
    flat_id = str(payload["id"])
    with _lock:
        existing = _get_conn().execute(
            "SELECT id, lat, lng FROM flats WHERE id = ?", (flat_id,)
        ).fetchone()
        if existing:
            # Backfill coords on old rows that pre-date the lat/lng migration.
            if (existing["lat"] is None or existing["lng"] is None) \
                    and payload.get("lat") is not None and payload.get("lng") is not None:
                _get_conn().execute(
                    "UPDATE flats SET lat = ?, lng = ? WHERE id = ?",
                    (payload["lat"], payload["lng"], flat_id),
                )
            return False
        # Commute times live in a nested "connectivity" dict when present.
        c = payload.get("connectivity") or {}
        _get_conn().execute(
            """INSERT INTO flats(
                   id, link, address, rooms, size, total_rent, sqm_price, year_built, wbs,
                   connectivity_morning_time, connectivity_night_time, address_link_gmaps,
                   payload_json, discovered_at, lat, lng
               ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                flat_id, payload.get("link", ""), payload.get("address", ""),
                payload.get("rooms"), payload.get("size"), payload.get("total_rent"),
                payload.get("sqm_price"), str(payload.get("year_built", "")),
                str(payload.get("wbs", "")),
                c.get("morning_time"), c.get("night_time"),
                payload.get("address_link_gmaps"),
                # full scrape payload kept verbatim for later re-processing
                json.dumps(payload, default=str),
                now_iso(),
                payload.get("lat"), payload.get("lng"),
            ),
        )
        return True
def recent_flats(limit: int = 50) -> list[sqlite3.Row]:
    """Newest `limit` flats that are still online (offline_at unset)."""
    sql = ("SELECT * FROM flats WHERE offline_at IS NULL "
           "ORDER BY discovered_at DESC LIMIT ?")
    return list(_get_conn().execute(sql, (limit,)).fetchall())
def mark_flat_offline(flat_id: str) -> None:
    """Stamp a flat as no longer reachable. Idempotent: only the first call
    writes, so offline_at records when it was first seen missing."""
    with _lock:
        _get_conn().execute(
            "UPDATE flats SET offline_at = ? WHERE id = ? AND offline_at IS NULL",
            (now_iso(), flat_id),
        )
def get_flat(flat_id: str) -> Optional[sqlite3.Row]:
    """One flat row by id, or None (offline flats included)."""
    cur = _get_conn().execute("SELECT * FROM flats WHERE id = ?", (flat_id,))
    return cur.fetchone()
def set_flat_enrichment(flat_id: str, status: str,
                        enrichment: Optional[dict] = None,
                        image_count: int = 0) -> None:
    """Record the LLM-enrichment outcome (status, payload, image count) for a flat."""
    payload = None if enrichment is None else json.dumps(enrichment)
    with _lock:
        _get_conn().execute(
            """UPDATE flats SET enrichment_status = ?,
                                enrichment_json = ?,
                                enrichment_updated_at = ?,
                                image_count = ?
               WHERE id = ?""",
            (status, payload, now_iso(), image_count, flat_id),
        )
def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]:
    """Ids/links of flats still awaiting (or having failed) enrichment, newest first."""
    sql = """SELECT id, link FROM flats
             WHERE enrichment_status IN ('pending', 'failed')
             ORDER BY discovered_at DESC LIMIT ?"""
    return list(_get_conn().execute(sql, (limit,)).fetchall())
def enrichment_counts() -> dict:
    """Aggregate enrichment_status tallies across all flats."""
    row = _get_conn().execute(
        """SELECT
               COUNT(*) AS total,
               SUM(CASE WHEN enrichment_status = 'ok' THEN 1 ELSE 0 END) AS ok,
               SUM(CASE WHEN enrichment_status = 'pending' THEN 1 ELSE 0 END) AS pending,
               SUM(CASE WHEN enrichment_status = 'failed' THEN 1 ELSE 0 END) AS failed
           FROM flats"""
    ).fetchone()
    # SUM over zero rows yields NULL; coalesce everything to int 0.
    return {key: int(row[key] or 0) for key in ("total", "ok", "pending", "failed")}
# ---------------------------------------------------------------------------
# Applications
# ---------------------------------------------------------------------------
def start_application(user_id: int, flat_id: str, url: str, triggered_by: str,
                      submit_forms: bool, profile_snapshot: dict) -> int:
    """Open a new application row (finished_at NULL = in flight); return its id."""
    snapshot = json.dumps(profile_snapshot)
    with _lock:
        cur = _get_conn().execute(
            """INSERT INTO applications(
                   user_id, flat_id, url, triggered_by, submit_forms_used,
                   started_at, profile_snapshot_json
               ) VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (user_id, flat_id, url, triggered_by, 1 if submit_forms else 0,
             now_iso(), snapshot),
        )
    return cur.lastrowid
def finish_application(app_id: int, success: bool, message: str,
                       provider: str = "", forensics: Optional[dict] = None) -> None:
    """Close out an application row with its outcome and optional forensics."""
    forensics_json = None if forensics is None else json.dumps(forensics)
    with _lock:
        _get_conn().execute(
            """UPDATE applications SET finished_at = ?, success = ?, message = ?,
                                       provider = ?, forensics_json = ?
               WHERE id = ?""",
            (now_iso(), 1 if success else 0, message, provider, forensics_json, app_id),
        )
def get_application(app_id: int) -> Optional[sqlite3.Row]:
    """One application row by id, or None."""
    cur = _get_conn().execute("SELECT * FROM applications WHERE id = ?", (app_id,))
    return cur.fetchone()
def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3.Row]:
    """Latest applications joined with flat address/link.

    user_id=None returns all users' applications; otherwise just that user's.
    """
    base = ("SELECT a.*, f.address, f.link "
            "FROM applications a LEFT JOIN flats f ON f.id = a.flat_id ")
    if user_id is None:
        sql = base + "ORDER BY a.started_at DESC LIMIT ?"
        args: tuple = (limit,)
    else:
        sql = base + "WHERE a.user_id = ? ORDER BY a.started_at DESC LIMIT ?"
        args = (user_id, limit)
    return list(_get_conn().execute(sql, args).fetchall())
# ---------------------------------------------------------------------------
# Rejections (flats a user doesn't want to see anymore)
# ---------------------------------------------------------------------------
def reject_flat(user_id: int, flat_id: str) -> None:
    """Hide a flat for this user (idempotent via INSERT OR IGNORE)."""
    row = (user_id, flat_id, now_iso())
    with _lock:
        _get_conn().execute(
            "INSERT OR IGNORE INTO flat_rejections(user_id, flat_id, rejected_at) VALUES (?, ?, ?)",
            row,
        )
def unreject_flat(user_id: int, flat_id: str) -> None:
    """Undo a rejection so the flat shows up again for this user."""
    with _lock:
        _get_conn().execute(
            "DELETE FROM flat_rejections WHERE user_id = ? AND flat_id = ?",
            (user_id, flat_id),
        )
def rejected_flat_ids(user_id: int) -> set[str]:
    """All flat ids this user has rejected."""
    cur = _get_conn().execute(
        "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (user_id,)
    )
    return {r["flat_id"] for r in cur.fetchall()}
def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]:
    """Full flat rows the user rejected, most recent rejection first."""
    sql = """SELECT f.*, r.rejected_at
             FROM flat_rejections r JOIN flats f ON f.id = r.flat_id
             WHERE r.user_id = ?
             ORDER BY r.rejected_at DESC LIMIT ?"""
    return list(_get_conn().execute(sql, (user_id, limit)).fetchall())
def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Row]:
    """The user's most recent application for one flat, or None."""
    sql = """SELECT * FROM applications
             WHERE user_id = ? AND flat_id = ?
             ORDER BY started_at DESC LIMIT 1"""
    return _get_conn().execute(sql, (user_id, flat_id)).fetchone()
def latest_applications_by_flat(user_id: int) -> dict:
    """Map {flat_id: newest application row} for every flat the user has an
    application for — one query instead of N per-flat lookups."""
    sql = """SELECT a.*
             FROM applications a
             JOIN (
                 SELECT flat_id, MAX(started_at) AS maxstart
                 FROM applications
                 WHERE user_id = ?
                 GROUP BY flat_id
             ) m ON m.flat_id = a.flat_id AND m.maxstart = a.started_at
             WHERE a.user_id = ?"""
    out: dict = {}
    for row in _get_conn().execute(sql, (user_id, user_id)).fetchall():
        # If two applications share the exact same started_at, the last one
        # returned wins — acceptable for a "latest" view.
        out[row["flat_id"]] = row
    return out
def has_running_application(user_id: int) -> bool:
    """True if the user has any application still lacking a finished_at."""
    hit = _get_conn().execute(
        "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1",
        (user_id,),
    ).fetchone()
    return hit is not None
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
def log_error(source: str, kind: str, summary: str,
              user_id: Optional[int] = None, application_id: Optional[int] = None,
              context: Optional[dict] = None) -> int:
    """Append a row to the errors table; return the new row id."""
    ctx = json.dumps(context) if context else None
    with _lock:
        cur = _get_conn().execute(
            """INSERT INTO errors(timestamp, user_id, source, kind, summary, application_id, context_json)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (now_iso(), user_id, source, kind, summary, application_id, ctx),
        )
    return cur.lastrowid
def recent_errors(user_id: Optional[int], limit: int = 100,
                  include_global: bool = False) -> list[sqlite3.Row]:
    """Newest error rows. user_id=None → everyone; include_global also pulls
    rows whose user_id is NULL (system-wide errors)."""
    if user_id is None:
        sql = "SELECT * FROM errors ORDER BY timestamp DESC LIMIT ?"
        args: tuple = (limit,)
    elif include_global:
        sql = ("SELECT * FROM errors WHERE user_id = ? OR user_id IS NULL "
               "ORDER BY timestamp DESC LIMIT ?")
        args = (user_id, limit)
    else:
        sql = "SELECT * FROM errors WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?"
        args = (user_id, limit)
    return list(_get_conn().execute(sql, args).fetchall())
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------
def log_audit(actor: str, action: str, details: str = "",
              user_id: Optional[int] = None, ip: str = "") -> None:
    """Append one audit_log row (user_id may be None for system actions)."""
    row = (now_iso(), user_id, actor, action, details, ip)
    with _lock:
        _get_conn().execute(
            "INSERT INTO audit_log(timestamp, user_id, actor, action, details, ip) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            row,
        )
def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
    """Newest audit rows; the per-user view also includes system rows (NULL user)."""
    if user_id is None:
        sql = "SELECT * FROM audit_log ORDER BY timestamp DESC LIMIT ?"
        args: tuple = (limit,)
    else:
        sql = ("SELECT * FROM audit_log WHERE user_id = ? OR user_id IS NULL "
               "ORDER BY timestamp DESC LIMIT ?")
        args = (user_id, limit)
    return list(_get_conn().execute(sql, args).fetchall())
def _range_filter_rows(table: str, ts_col: str, start_iso: Optional[str],
                       end_iso: Optional[str], limit: int) -> list[sqlite3.Row]:
    """Date-range filtered fetch from an append-only table, newest first.

    Pushes the timestamp filter into SQL rather than discarding rows in Python.
    `table`/`ts_col` are interpolated into the statement, so callers must pass
    fixed identifiers only (values still go through bind parameters).
    """
    conditions: list[str] = []
    params: list[Any] = []
    if start_iso:
        conditions.append(f"{ts_col} >= ?")
        params.append(start_iso)
    if end_iso:
        conditions.append(f"{ts_col} < ?")
        params.append(end_iso)
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    params.append(limit)
    sql = f"SELECT * FROM {table} {where} ORDER BY {ts_col} DESC LIMIT ?"
    return list(_get_conn().execute(sql, params).fetchall())
def audit_in_range(start_iso: Optional[str], end_iso: Optional[str],
                   limit: int = 500) -> list[sqlite3.Row]:
    """Audit rows whose timestamp falls in [start_iso, end_iso), newest first."""
    return _range_filter_rows("audit_log", "timestamp", start_iso, end_iso, limit)
def errors_in_range(start_iso: Optional[str], end_iso: Optional[str],
                    limit: int = 500) -> list[sqlite3.Row]:
    """Error rows whose timestamp falls in [start_iso, end_iso), newest first."""
    return _range_filter_rows("errors", "timestamp", start_iso, end_iso, limit)
# ---------------------------------------------------------------------------
# Retention cleanup
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Partnerships
# ---------------------------------------------------------------------------
def get_accepted_partnership(user_id: int) -> Optional[sqlite3.Row]:
    """The accepted partnership row this user is on either side of, if any."""
    sql = """SELECT * FROM partnerships
             WHERE status = 'accepted'
               AND (from_user_id = ? OR to_user_id = ?) LIMIT 1"""
    return _get_conn().execute(sql, (user_id, user_id)).fetchone()
def get_partner_user(user_id: int) -> Optional[sqlite3.Row]:
    """The accepted partner's user row, or None when unpartnered."""
    link = get_accepted_partnership(user_id)
    if link is None:
        return None
    if link["from_user_id"] == user_id:
        return get_user(link["to_user_id"])
    return get_user(link["from_user_id"])
def partnership_incoming(user_id: int) -> list[sqlite3.Row]:
    """Pending requests addressed to user_id, newest first, with sender name."""
    sql = """SELECT p.*, u.username AS from_username
             FROM partnerships p JOIN users u ON u.id = p.from_user_id
             WHERE p.status = 'pending' AND p.to_user_id = ?
             ORDER BY p.created_at DESC"""
    return list(_get_conn().execute(sql, (user_id,)).fetchall())
def partnership_outgoing(user_id: int) -> list[sqlite3.Row]:
    """Pending requests sent by user_id, newest first, with recipient name."""
    sql = """SELECT p.*, u.username AS to_username
             FROM partnerships p JOIN users u ON u.id = p.to_user_id
             WHERE p.status = 'pending' AND p.from_user_id = ?
             ORDER BY p.created_at DESC"""
    return list(_get_conn().execute(sql, (user_id,)).fetchall())
def partnership_request(from_id: int, to_id: int) -> Optional[int]:
    """Create a pending request. Returns id or None if rejected (self, already
    linked on either side, or a pending/accepted row already exists).

    NOTE(review): the accepted-partnership pre-checks run outside _lock, so two
    racing calls could both pass them; the damage is bounded by the in-lock
    duplicate check and the UNIQUE(from_user_id, to_user_id) constraint.
    """
    if from_id == to_id:
        return None
    if get_accepted_partnership(from_id) or get_accepted_partnership(to_id):
        return None
    with _lock:
        # Reject duplicate in either direction.
        dup = _get_conn().execute(
            """SELECT id FROM partnerships
               WHERE (from_user_id = ? AND to_user_id = ?)
                  OR (from_user_id = ? AND to_user_id = ?)""",
            (from_id, to_id, to_id, from_id),
        ).fetchone()
        if dup:
            return None
        cur = _get_conn().execute(
            "INSERT INTO partnerships(from_user_id, to_user_id, status, created_at) "
            "VALUES (?, ?, 'pending', ?)",
            (from_id, to_id, now_iso()),
        )
        return cur.lastrowid
def partnership_accept(request_id: int, user_id: int) -> bool:
    """Accept a pending request addressed to user_id. Also wipes any other
    pending rows involving either partner. Returns True on success.

    NOTE(review): the validation reads happen before _lock/_tx are taken
    (check-then-act), so two racing accepts could in principle both pass the
    checks — confirm whether that window matters for this deployment.
    """
    row = _get_conn().execute(
        "SELECT * FROM partnerships WHERE id = ? AND status = 'pending'",
        (request_id,),
    ).fetchone()
    if not row or row["to_user_id"] != user_id:
        return False
    # Don't allow accept if either side already has an accepted partner.
    if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id):
        return False
    partner_id = row["from_user_id"]
    with _lock, _tx() as c:
        c.execute(
            "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?",
            (now_iso(), request_id),
        )
        # clean up any stale pending requests touching either user
        # (the accepted row itself survives — its status is no longer 'pending')
        c.execute(
            """DELETE FROM partnerships
               WHERE status = 'pending'
                 AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""",
            (user_id, partner_id, user_id, partner_id),
        )
    return True
def partnership_decline(request_id: int, user_id: int) -> bool:
    """Delete a pending request involving user_id (recipient or sender);
    True if a row was actually removed."""
    sql = """DELETE FROM partnerships
             WHERE id = ? AND status = 'pending'
               AND (to_user_id = ? OR from_user_id = ?)"""
    with _lock:
        cur = _get_conn().execute(sql, (request_id, user_id, user_id))
    return cur.rowcount > 0
def partnership_unlink(user_id: int) -> bool:
    """Drop the accepted partnership touching user_id (either side can call);
    True if one existed."""
    sql = """DELETE FROM partnerships
             WHERE status = 'accepted'
               AND (from_user_id = ? OR to_user_id = ?)"""
    with _lock:
        cur = _get_conn().execute(sql, (user_id, user_id))
    return cur.rowcount > 0
def partner_flat_actions(partner_id: int) -> dict:
    """Flat ids the partner has touched: {'applied': set, 'rejected': set}.
    'applied' counts any application regardless of outcome."""
    conn = _get_conn()
    applied_rows = conn.execute(
        "SELECT DISTINCT flat_id FROM applications WHERE user_id = ?", (partner_id,)
    ).fetchall()
    rejected_rows = conn.execute(
        "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (partner_id,)
    ).fetchall()
    return {
        "applied": {r["flat_id"] for r in applied_rows},
        "rejected": {r["flat_id"] for r in rejected_rows},
    }
def cleanup_retention() -> dict:
    """Purge errors/audit rows older than RETENTION_DAYS and strip forensics
    payloads from old applications (rows themselves are kept for history).
    Returns per-table affected-row counts."""
    cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
    stats: dict = {}
    with _lock, _tx() as c:
        for table in ("errors", "audit_log"):
            stats[table] = c.execute(
                f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)
            ).rowcount
        # Drop forensics from old applications (but keep the row itself for history)
        wiped = c.execute(
            "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
            (cutoff,),
        )
        stats["applications_forensics_wiped"] = wiped.rowcount
    if any(stats.values()):
        logger.info("retention cleanup: %s", stats)
    return stats