The module-level _conn was being hit from FastAPI handlers, the retention daemon thread, and asyncio.to_thread workers simultaneously. Sharing a single sqlite3.Connection across threads is unsafe (cursors collide) even with check_same_thread=False and WAL. The writer _lock didn't cover readers, so a reader cursor could race a writer mid-statement. Switch to threading.local(): each thread gets its own Connection via _get_conn(). WAL handles concurrent readers/writer at the DB level; busy_timeout=5000 absorbs short-lived "database is locked" when two threads both try to BEGIN IMMEDIATE. The write-serialising _lock stays — it keeps multi-statement writer blocks atomic and avoids busy-loop on concurrent writers. External access via db._conn replaced with a new db.has_running_application helper (the only caller outside db.py). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
900 lines
34 KiB
Python
900 lines
34 KiB
Python
"""
|
|
SQLite data layer for wohnungsdidi.
|
|
|
|
Multi-user: users, per-user profiles/filters/notifications/preferences.
|
|
All per-user rows are 1:1 with users. Errors and forensics are retained
|
|
for 14 days and cleaned up periodically.
|
|
"""
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Iterable, Optional
|
|
|
|
from settings import DB_PATH, RETENTION_DAYS
|
|
|
|
logger = logging.getLogger("web.db")
|
|
|
|
# WAL mode permits any number of concurrent readers and one writer at a time,
# but a single sqlite3.Connection object is not safe to share across threads
# (cursors can collide). We keep one connection per thread via threading.local
# and a module-level write lock so concurrent writers don't trip
# "database is locked" during BEGIN IMMEDIATE.
_lock = threading.Lock()
_local = threading.local()


def _connect() -> sqlite3.Connection:
    """Open a fresh connection configured for this app.

    isolation_level=None -> autocommit (no implicit transactions);
    WAL for concurrent readers alongside one writer; FK enforcement on;
    busy_timeout absorbs short-lived lock contention between threads.
    """
    conn = sqlite3.connect(DB_PATH, isolation_level=None, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.execute("PRAGMA busy_timeout=5000")
    return conn


def _get_conn() -> sqlite3.Connection:
    """Return this thread's cached connection, creating it on first use."""
    c = getattr(_local, "conn", None)
    if c is None:
        c = _connect()
        _local.conn = c
    return c
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# Ordered migration scripts; script N is recorded as schema_version N after it
# is applied. Never reorder or edit an already-shipped entry — append instead.
MIGRATIONS: list[str] = [
    # 0001: drop legacy single-user tables if present (only matters on upgrade
    # from the pre-multi-user commit; no production data yet). Safe for fresh DBs.
    """
    DROP TABLE IF EXISTS applications;
    DROP TABLE IF EXISTS flats;
    DROP TABLE IF EXISTS state;
    DROP TABLE IF EXISTS audit_log;
    """,
    # 0002: base + users
    """
    CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY);

    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL COLLATE NOCASE,
        password_hash TEXT NOT NULL,
        is_admin INTEGER NOT NULL DEFAULT 0,
        disabled INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_profiles (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        salutation TEXT DEFAULT '',
        firstname TEXT DEFAULT '',
        lastname TEXT DEFAULT '',
        email TEXT DEFAULT '',
        telephone TEXT DEFAULT '',
        street TEXT DEFAULT '',
        house_number TEXT DEFAULT '',
        postcode TEXT DEFAULT '',
        city TEXT DEFAULT '',
        is_possessing_wbs INTEGER NOT NULL DEFAULT 0,
        wbs_type TEXT DEFAULT '0',
        wbs_valid_till TEXT DEFAULT '1970-01-01',
        wbs_rooms INTEGER NOT NULL DEFAULT 0,
        wbs_adults INTEGER NOT NULL DEFAULT 0,
        wbs_children INTEGER NOT NULL DEFAULT 0,
        is_prio_wbs INTEGER NOT NULL DEFAULT 0,
        immomio_email TEXT DEFAULT '',
        immomio_password TEXT DEFAULT '',
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_filters (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        rooms_min REAL,
        rooms_max REAL,
        max_rent REAL,
        min_size REAL,
        max_morning_commute REAL,
        wbs_required TEXT DEFAULT '', -- '', 'yes', 'no'
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_notifications (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        channel TEXT NOT NULL DEFAULT 'ui', -- 'ui' | 'telegram' | 'email'
        telegram_bot_token TEXT DEFAULT '',
        telegram_chat_id TEXT DEFAULT '',
        email_address TEXT DEFAULT '',
        notify_on_match INTEGER NOT NULL DEFAULT 1,
        notify_on_apply_success INTEGER NOT NULL DEFAULT 1,
        notify_on_apply_fail INTEGER NOT NULL DEFAULT 1,
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_preferences (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        auto_apply_enabled INTEGER NOT NULL DEFAULT 0,
        submit_forms INTEGER NOT NULL DEFAULT 0,
        kill_switch INTEGER NOT NULL DEFAULT 0,
        apply_circuit_open INTEGER NOT NULL DEFAULT 0,
        apply_recent_failures INTEGER NOT NULL DEFAULT 0,
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS flats (
        id TEXT PRIMARY KEY,
        link TEXT NOT NULL,
        address TEXT,
        rooms REAL,
        size REAL,
        total_rent REAL,
        sqm_price REAL,
        year_built TEXT,
        wbs TEXT,
        connectivity_morning_time REAL,
        connectivity_night_time REAL,
        address_link_gmaps TEXT,
        payload_json TEXT NOT NULL,
        discovered_at TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_flats_discovered ON flats(discovered_at DESC);

    CREATE TABLE IF NOT EXISTS applications (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        flat_id TEXT NOT NULL REFERENCES flats(id),
        url TEXT NOT NULL,
        triggered_by TEXT NOT NULL, -- 'user' | 'auto'
        submit_forms_used INTEGER NOT NULL DEFAULT 0,
        provider TEXT DEFAULT '',
        started_at TEXT NOT NULL,
        finished_at TEXT,
        success INTEGER,
        message TEXT,
        profile_snapshot_json TEXT,
        forensics_json TEXT -- structured payload from apply service
    );
    CREATE INDEX IF NOT EXISTS idx_applications_user ON applications(user_id, started_at DESC);
    CREATE INDEX IF NOT EXISTS idx_applications_started ON applications(started_at DESC);
    CREATE INDEX IF NOT EXISTS idx_applications_flat ON applications(flat_id);

    CREATE TABLE IF NOT EXISTS errors (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        user_id INTEGER,
        source TEXT NOT NULL, -- 'apply'|'alert'|'web'|'system'
        kind TEXT NOT NULL, -- 'apply_failure'|'scraper_error'|...
        summary TEXT NOT NULL,
        application_id INTEGER,
        context_json TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp DESC);
    CREATE INDEX IF NOT EXISTS idx_errors_user ON errors(user_id, timestamp DESC);

    CREATE TABLE IF NOT EXISTS audit_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        user_id INTEGER,
        actor TEXT NOT NULL,
        action TEXT NOT NULL,
        details TEXT,
        ip TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
    CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_log(user_id, timestamp DESC);

    CREATE TABLE IF NOT EXISTS system_state (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL
    );
    """,
    # 0003: lat/lng for map view
    """
    ALTER TABLE flats ADD COLUMN lat REAL;
    ALTER TABLE flats ADD COLUMN lng REAL;
    """,
    # 0004: per-user rejections — flats the user doesn't want in the list anymore
    """
    CREATE TABLE IF NOT EXISTS flat_rejections (
        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        flat_id TEXT NOT NULL REFERENCES flats(id),
        rejected_at TEXT NOT NULL,
        PRIMARY KEY (user_id, flat_id)
    );
    CREATE INDEX IF NOT EXISTS idx_rejections_user ON flat_rejections(user_id);
    """,
    # 0005: LLM enrichment — extracted details + downloaded image count per flat
    """
    ALTER TABLE flats ADD COLUMN enrichment_json TEXT;
    ALTER TABLE flats ADD COLUMN enrichment_status TEXT NOT NULL DEFAULT 'pending';
    ALTER TABLE flats ADD COLUMN enrichment_updated_at TEXT;
    ALTER TABLE flats ADD COLUMN image_count INTEGER NOT NULL DEFAULT 0;
    """,
    # 0006: time filter — hide flats older than X hours (NULL = no limit)
    """
    ALTER TABLE user_filters ADD COLUMN max_age_hours INTEGER;
    """,
    # 0007: secrets table — API keys / scraper creds editable from admin UI
    """
    CREATE TABLE IF NOT EXISTS secrets (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL,
        updated_at TEXT NOT NULL
    );
    """,
    # 0008: partnerships — pair up two user accounts so each sees the other's
    # apply/reject interactions in the flat list.
    """
    CREATE TABLE IF NOT EXISTS partnerships (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        from_user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        to_user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        status TEXT NOT NULL DEFAULT 'pending', -- 'pending' | 'accepted'
        created_at TEXT NOT NULL,
        accepted_at TEXT,
        UNIQUE(from_user_id, to_user_id),
        CHECK(from_user_id != to_user_id)
    );
    CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id);
    CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id);
    """,
]
|
|
|
|
|
|
def _current_version() -> int:
    """Return the highest applied migration number (0 for a fresh DB).

    Any sqlite3.Error (e.g. schema_version table missing on a brand-new
    database file) is treated as "no migrations applied yet".
    """
    try:
        row = _get_conn().execute(
            "SELECT COALESCE(MAX(version), 0) AS v FROM schema_version"
        ).fetchone()
        return int(row["v"]) if row else 0
    except sqlite3.Error:
        return 0


def init_db() -> None:
    """Create/upgrade the schema by applying any pending MIGRATIONS in order.

    Idempotent: already-applied migrations (version <= current) are skipped,
    and the heartbeat seed row uses INSERT OR IGNORE.
    """
    with _lock:
        conn = _get_conn()  # hoisted: same thread-local connection throughout
        conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)")
        current = _current_version()
        for i, script in enumerate(MIGRATIONS, start=1):
            if i <= current:
                continue
            logger.info("applying migration v%d", i)
            conn.executescript(script)
            conn.execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (i,))
        conn.execute(
            "INSERT OR IGNORE INTO system_state(key, value) VALUES ('last_alert_heartbeat', '')"
        )
    # Logged after the lock is released; _current_version only reads.
    logger.info("DB initialized (schema v%d)", _current_version())
|
|
|
|
|
|
def now_iso() -> str:
    """Current UTC time as an ISO-8601 string with seconds precision.

    Timezone-aware, so the string carries a '+00:00' suffix; lexicographic
    order matches chronological order, which the timestamp indexes rely on.
    """
    return datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Secrets (admin-editable, source of truth for runtime creds)
# ---------------------------------------------------------------------------

SECRET_KEYS = ("ANTHROPIC_API_KEY", "BERLIN_WOHNEN_USERNAME", "BERLIN_WOHNEN_PASSWORD")


def get_secret(key: str) -> Optional[str]:
    """Return the stored secret value, or None when the key is unset."""
    row = _get_conn().execute("SELECT value FROM secrets WHERE key = ?", (key,)).fetchone()
    return row["value"] if row else None


def set_secret(key: str, value: str) -> None:
    """Insert or overwrite a secret (upsert keyed on `key`)."""
    with _lock:
        _get_conn().execute(
            "INSERT INTO secrets(key, value, updated_at) VALUES (?, ?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value, "
            " updated_at = excluded.updated_at",
            (key, value, now_iso()),
        )


def all_secrets() -> dict[str, str]:
    """All stored secrets as a {key: value} dict."""
    rows = _get_conn().execute("SELECT key, value FROM secrets").fetchall()
    return {r["key"]: r["value"] for r in rows}


def seed_secrets_from_env() -> None:
    """Copy env values into the DB for any secret key that's still empty.

    Idempotent: existing DB values are never overwritten.
    """
    import os  # local import keeps module top-level free of the dependency
    for k in SECRET_KEYS:
        if get_secret(k):
            # Non-empty DB value wins over the environment.
            continue
        env_val = os.environ.get(k, "")
        if env_val:
            set_secret(k, env_val)
            logger.info("seeded secret %s from env", k)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# System state
# ---------------------------------------------------------------------------

def get_state(key: str) -> Optional[str]:
    """Read a value from system_state; None when the key is absent."""
    row = _get_conn().execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone()
    return row["value"] if row else None


def set_state(key: str, value: str) -> None:
    """Upsert a system_state key/value pair."""
    with _lock:
        _get_conn().execute(
            "INSERT INTO system_state(key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            (key, value),
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

def _ensure_user_rows(user_id: int) -> None:
    """Create the four per-user 1:1 rows if missing (INSERT OR IGNORE).

    Acquires the module write lock itself — callers must NOT hold _lock
    (it is a plain, non-reentrant threading.Lock).
    """
    ts = now_iso()
    with _lock:
        for q in (
            "INSERT OR IGNORE INTO user_profiles(user_id, updated_at) VALUES (?, ?)",
            "INSERT OR IGNORE INTO user_filters(user_id, updated_at) VALUES (?, ?)",
            "INSERT OR IGNORE INTO user_notifications(user_id, updated_at) VALUES (?, ?)",
            "INSERT OR IGNORE INTO user_preferences(user_id, updated_at) VALUES (?, ?)",
        ):
            _get_conn().execute(q, (user_id, ts))


def create_user(username: str, password_hash: str, is_admin: bool = False) -> int:
    """Insert a user plus their per-user 1:1 rows; return the new user id.

    Raises sqlite3.IntegrityError on a duplicate username (UNIQUE NOCASE).
    """
    ts = now_iso()
    with _lock:
        cur = _get_conn().execute(
            "INSERT INTO users(username, password_hash, is_admin, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (username, password_hash, 1 if is_admin else 0, ts, ts),
        )
        uid = cur.lastrowid
    # _ensure_user_rows takes the non-reentrant _lock itself, so it must run
    # after the lock above is released — calling it inside would deadlock.
    _ensure_user_rows(uid)
    logger.info("created user id=%s username=%s admin=%s", uid, username, is_admin)
    return uid


def get_user_by_username(username: str) -> Optional[sqlite3.Row]:
    """Look up an *enabled* user by case-insensitive username (login path)."""
    return _get_conn().execute(
        "SELECT * FROM users WHERE username = ? COLLATE NOCASE AND disabled = 0", (username,)
    ).fetchone()


def get_user(user_id: int) -> Optional[sqlite3.Row]:
    """Fetch a user row by id (disabled users included)."""
    return _get_conn().execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()


def list_users() -> list[sqlite3.Row]:
    """All users ordered by username."""
    return list(_get_conn().execute("SELECT * FROM users ORDER BY username").fetchall())


def set_user_password(user_id: int, password_hash: str) -> None:
    """Replace the stored password hash and bump updated_at."""
    with _lock:
        _get_conn().execute(
            "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?",
            (password_hash, now_iso(), user_id),
        )


def set_user_disabled(user_id: int, disabled: bool) -> None:
    """Toggle login access; disabled users are invisible to get_user_by_username."""
    with _lock:
        _get_conn().execute(
            "UPDATE users SET disabled = ?, updated_at = ? WHERE id = ?",
            (1 if disabled else 0, now_iso(), user_id),
        )


def delete_user(user_id: int) -> None:
    """Remove a user and everything that cascades (profile, filters, notifications,
    preferences, rejections, applications). Audit/error logs stay (user_id column
    is nullable)."""
    with _lock:
        _get_conn().execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# User profile / filters / notifications / preferences
# ---------------------------------------------------------------------------

def _update_user_row(table: str, user_id: int, allowed: set, data: dict) -> None:
    """Shared UPDATE builder for the per-user 1:1 tables.

    Writes only whitelisted keys from `data` and bumps updated_at; a call
    with nothing updatable is a no-op. `table` and the column names are
    hard-coded constants supplied by the wrappers below (never user input),
    so interpolating them into the SQL is safe.
    """
    clean = {k: v for k, v in data.items() if k in allowed}
    if not clean:
        return
    cols = ", ".join(f"{k} = ?" for k in clean)
    vals = list(clean.values()) + [now_iso(), user_id]
    with _lock:
        _get_conn().execute(
            f"UPDATE {table} SET {cols}, updated_at = ? WHERE user_id = ?", vals
        )


def get_profile(user_id: int) -> sqlite3.Row:
    """The user's profile row (created on demand)."""
    _ensure_user_rows(user_id)
    return _get_conn().execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)).fetchone()


def update_profile(user_id: int, data: dict) -> None:
    """Write whitelisted profile fields from `data`; unknown keys are ignored."""
    _ensure_user_rows(user_id)
    allowed = {
        "salutation", "firstname", "lastname", "email", "telephone",
        "street", "house_number", "postcode", "city",
        "is_possessing_wbs", "wbs_type", "wbs_valid_till",
        "wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs",
        "immomio_email", "immomio_password",
    }
    _update_user_row("user_profiles", user_id, allowed, data)


def get_filters(user_id: int) -> sqlite3.Row:
    """The user's flat-filter row (created on demand)."""
    _ensure_user_rows(user_id)
    return _get_conn().execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,)).fetchone()


def update_filters(user_id: int, data: dict) -> None:
    """Write whitelisted filter fields from `data`; unknown keys are ignored."""
    _ensure_user_rows(user_id)
    allowed = {"rooms_min", "rooms_max", "max_rent", "min_size",
               "max_morning_commute", "wbs_required", "max_age_hours"}
    _update_user_row("user_filters", user_id, allowed, data)


def get_notifications(user_id: int) -> sqlite3.Row:
    """The user's notification-settings row (created on demand)."""
    _ensure_user_rows(user_id)
    return _get_conn().execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,)).fetchone()


def update_notifications(user_id: int, data: dict) -> None:
    """Write whitelisted notification fields from `data`; unknown keys are ignored."""
    _ensure_user_rows(user_id)
    allowed = {
        "channel", "telegram_bot_token", "telegram_chat_id", "email_address",
        "notify_on_match", "notify_on_apply_success", "notify_on_apply_fail",
    }
    _update_user_row("user_notifications", user_id, allowed, data)


def get_preferences(user_id: int) -> sqlite3.Row:
    """The user's auto-apply preference row (created on demand)."""
    _ensure_user_rows(user_id)
    return _get_conn().execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,)).fetchone()


def update_preferences(user_id: int, data: dict) -> None:
    """Write whitelisted preference fields from `data`; unknown keys are ignored."""
    _ensure_user_rows(user_id)
    allowed = {
        "auto_apply_enabled", "submit_forms", "kill_switch",
        "apply_circuit_open", "apply_recent_failures",
    }
    _update_user_row("user_preferences", user_id, allowed, data)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Flats
# ---------------------------------------------------------------------------

def upsert_flat(payload: dict) -> bool:
    """Insert a scraped flat; return True when the row is new.

    For an existing row only lat/lng are backfilled (rows pre-dating the
    coordinate migration); everything else is left untouched and False is
    returned.
    """
    flat_id = str(payload["id"])
    with _lock:
        existing = _get_conn().execute(
            "SELECT id, lat, lng FROM flats WHERE id = ?", (flat_id,)
        ).fetchone()
        if existing:
            # Backfill coords on old rows that pre-date the lat/lng migration.
            if (existing["lat"] is None or existing["lng"] is None) \
                    and payload.get("lat") is not None and payload.get("lng") is not None:
                _get_conn().execute(
                    "UPDATE flats SET lat = ?, lng = ? WHERE id = ?",
                    (payload["lat"], payload["lng"], flat_id),
                )
            return False
        c = payload.get("connectivity") or {}
        _get_conn().execute(
            """INSERT INTO flats(
                   id, link, address, rooms, size, total_rent, sqm_price, year_built, wbs,
                   connectivity_morning_time, connectivity_night_time, address_link_gmaps,
                   payload_json, discovered_at, lat, lng
               ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                flat_id, payload.get("link", ""), payload.get("address", ""),
                payload.get("rooms"), payload.get("size"), payload.get("total_rent"),
                payload.get("sqm_price"), str(payload.get("year_built", "")),
                str(payload.get("wbs", "")),
                c.get("morning_time"), c.get("night_time"),
                payload.get("address_link_gmaps"),
                # default=str: payload may contain dates etc.; stringified on purpose
                json.dumps(payload, default=str),
                now_iso(),
                payload.get("lat"), payload.get("lng"),
            ),
        )
        return True


def recent_flats(limit: int = 50) -> list[sqlite3.Row]:
    """Most recently discovered flats, newest first."""
    return list(_get_conn().execute(
        "SELECT * FROM flats ORDER BY discovered_at DESC LIMIT ?", (limit,)
    ).fetchall())


def get_flat(flat_id: str) -> Optional[sqlite3.Row]:
    """Fetch one flat by id, or None."""
    return _get_conn().execute("SELECT * FROM flats WHERE id = ?", (flat_id,)).fetchone()


def set_flat_enrichment(flat_id: str, status: str,
                        enrichment: Optional[dict] = None,
                        image_count: int = 0) -> None:
    """Record the outcome of an LLM enrichment pass for a flat.

    `status` is written as given (callers use 'ok'/'pending'/'failed');
    enrichment_json is NULL when no dict is provided.
    """
    with _lock:
        _get_conn().execute(
            """UPDATE flats SET enrichment_status = ?,
                                enrichment_json = ?,
                                enrichment_updated_at = ?,
                                image_count = ?
               WHERE id = ?""",
            (status,
             json.dumps(enrichment) if enrichment is not None else None,
             now_iso(), image_count, flat_id),
        )


def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]:
    """Flats still awaiting (or that failed) enrichment, newest first."""
    return list(_get_conn().execute(
        """SELECT id, link FROM flats
           WHERE enrichment_status IN ('pending', 'failed')
           ORDER BY discovered_at DESC LIMIT ?""",
        (limit,),
    ).fetchall())


def enrichment_counts() -> dict:
    """Aggregate enrichment progress: total/ok/pending/failed counts.

    SUM over an empty table yields NULL, hence the `or 0` guards.
    """
    row = _get_conn().execute(
        """SELECT
               COUNT(*) AS total,
               SUM(CASE WHEN enrichment_status = 'ok' THEN 1 ELSE 0 END) AS ok,
               SUM(CASE WHEN enrichment_status = 'pending' THEN 1 ELSE 0 END) AS pending,
               SUM(CASE WHEN enrichment_status = 'failed' THEN 1 ELSE 0 END) AS failed
           FROM flats"""
    ).fetchone()
    return {
        "total": int(row["total"] or 0),
        "ok": int(row["ok"] or 0),
        "pending": int(row["pending"] or 0),
        "failed": int(row["failed"] or 0),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Applications
# ---------------------------------------------------------------------------

def start_application(user_id: int, flat_id: str, url: str, triggered_by: str,
                      submit_forms: bool, profile_snapshot: dict) -> int:
    """Open an application attempt (finished_at stays NULL until completion).

    `profile_snapshot` is frozen as JSON so later profile edits don't alter
    the record of what was submitted. Returns the new application id.
    """
    with _lock:
        cur = _get_conn().execute(
            """INSERT INTO applications(
                   user_id, flat_id, url, triggered_by, submit_forms_used,
                   started_at, profile_snapshot_json
               ) VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (user_id, flat_id, url, triggered_by, 1 if submit_forms else 0,
             now_iso(), json.dumps(profile_snapshot)),
        )
        return cur.lastrowid


def finish_application(app_id: int, success: bool, message: str,
                       provider: str = "", forensics: Optional[dict] = None) -> None:
    """Close an application attempt with its outcome and optional forensics."""
    with _lock:
        _get_conn().execute(
            """UPDATE applications SET finished_at = ?, success = ?, message = ?,
                                       provider = ?, forensics_json = ?
               WHERE id = ?""",
            (now_iso(), 1 if success else 0, message, provider,
             json.dumps(forensics) if forensics is not None else None,
             app_id),
        )


def get_application(app_id: int) -> Optional[sqlite3.Row]:
    """Fetch one application by id, or None."""
    return _get_conn().execute("SELECT * FROM applications WHERE id = ?", (app_id,)).fetchone()


def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3.Row]:
    """Latest applications joined with flat address/link.

    user_id=None returns all users' applications (admin view); LEFT JOIN keeps
    rows whose flat has since disappeared.
    """
    if user_id is None:
        return list(_get_conn().execute(
            """SELECT a.*, f.address, f.link
               FROM applications a LEFT JOIN flats f ON f.id = a.flat_id
               ORDER BY a.started_at DESC LIMIT ?""", (limit,)
        ).fetchall())
    return list(_get_conn().execute(
        """SELECT a.*, f.address, f.link
           FROM applications a LEFT JOIN flats f ON f.id = a.flat_id
           WHERE a.user_id = ?
           ORDER BY a.started_at DESC LIMIT ?""",
        (user_id, limit),
    ).fetchall())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Rejections (flats a user doesn't want to see anymore)
# ---------------------------------------------------------------------------

def reject_flat(user_id: int, flat_id: str) -> None:
    """Hide a flat for this user (idempotent via INSERT OR IGNORE)."""
    with _lock:
        _get_conn().execute(
            "INSERT OR IGNORE INTO flat_rejections(user_id, flat_id, rejected_at) VALUES (?, ?, ?)",
            (user_id, flat_id, now_iso()),
        )


def unreject_flat(user_id: int, flat_id: str) -> None:
    """Un-hide a previously rejected flat (no-op when not rejected)."""
    with _lock:
        _get_conn().execute(
            "DELETE FROM flat_rejections WHERE user_id = ? AND flat_id = ?",
            (user_id, flat_id),
        )


def rejected_flat_ids(user_id: int) -> set[str]:
    """Flat ids this user has rejected, as a set for O(1) membership tests."""
    rows = _get_conn().execute(
        "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (user_id,)
    ).fetchall()
    return {row["flat_id"] for row in rows}


def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]:
    """Full flat rows the user rejected, most recently rejected first."""
    return list(_get_conn().execute(
        """SELECT f.*, r.rejected_at
           FROM flat_rejections r JOIN flats f ON f.id = r.flat_id
           WHERE r.user_id = ?
           ORDER BY r.rejected_at DESC LIMIT ?""",
        (user_id, limit),
    ).fetchall())


def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Row]:
    """The user's most recent application for one flat, or None."""
    return _get_conn().execute(
        """SELECT * FROM applications
           WHERE user_id = ? AND flat_id = ?
           ORDER BY started_at DESC LIMIT 1""",
        (user_id, flat_id),
    ).fetchone()


def has_running_application(user_id: int) -> bool:
    """True when the user has an application still in flight (finished_at NULL)."""
    row = _get_conn().execute(
        "SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1",
        (user_id,),
    ).fetchone()
    return row is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------

def log_error(source: str, kind: str, summary: str,
              user_id: Optional[int] = None, application_id: Optional[int] = None,
              context: Optional[dict] = None) -> int:
    """Record an error row; returns its id.

    user_id=None marks a global (system-wide) error. An empty context dict is
    stored as NULL, matching the original truthiness check.
    """
    with _lock:
        cur = _get_conn().execute(
            """INSERT INTO errors(timestamp, user_id, source, kind, summary, application_id, context_json)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (now_iso(), user_id, source, kind, summary, application_id,
             json.dumps(context) if context else None),
        )
        return cur.lastrowid


def recent_errors(user_id: Optional[int], limit: int = 100,
                  include_global: bool = False) -> list[sqlite3.Row]:
    """Latest errors, newest first.

    user_id=None -> all errors (admin view); otherwise that user's errors,
    optionally also the global (user_id IS NULL) ones.
    """
    if user_id is None:
        return list(_get_conn().execute(
            "SELECT * FROM errors ORDER BY timestamp DESC LIMIT ?", (limit,)
        ).fetchall())
    if include_global:
        return list(_get_conn().execute(
            """SELECT * FROM errors
               WHERE user_id = ? OR user_id IS NULL
               ORDER BY timestamp DESC LIMIT ?""",
            (user_id, limit),
        ).fetchall())
    return list(_get_conn().execute(
        "SELECT * FROM errors WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?",
        (user_id, limit),
    ).fetchall())


def get_error(error_id: int) -> Optional[sqlite3.Row]:
    """Fetch one error row by id, or None."""
    return _get_conn().execute("SELECT * FROM errors WHERE id = ?", (error_id,)).fetchone()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------

def log_audit(actor: str, action: str, details: str = "",
              user_id: Optional[int] = None, ip: str = "") -> None:
    """Append an audit entry; user_id=None marks a system-level action."""
    with _lock:
        _get_conn().execute(
            "INSERT INTO audit_log(timestamp, user_id, actor, action, details, ip) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (now_iso(), user_id, actor, action, details, ip),
        )


def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
    """Latest audit entries, newest first.

    user_id=None -> everything (admin view); otherwise the user's entries
    plus system-level (user_id IS NULL) ones.
    """
    if user_id is None:
        return list(_get_conn().execute(
            "SELECT * FROM audit_log ORDER BY timestamp DESC LIMIT ?", (limit,)
        ).fetchall())
    return list(_get_conn().execute(
        "SELECT * FROM audit_log WHERE user_id = ? OR user_id IS NULL ORDER BY timestamp DESC LIMIT ?",
        (user_id, limit),
    ).fetchall())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Retention cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# ---------------------------------------------------------------------------
# Partnerships
# ---------------------------------------------------------------------------

def get_accepted_partnership(user_id: int) -> Optional[sqlite3.Row]:
    """The user's accepted partnership row (either direction), or None."""
    return _get_conn().execute(
        """SELECT * FROM partnerships
           WHERE status = 'accepted'
             AND (from_user_id = ? OR to_user_id = ?) LIMIT 1""",
        (user_id, user_id),
    ).fetchone()


def get_partner_user(user_id: int) -> Optional[sqlite3.Row]:
    """The user row of this user's accepted partner, or None when unlinked."""
    row = get_accepted_partnership(user_id)
    if not row:
        return None
    other = row["to_user_id"] if row["from_user_id"] == user_id else row["from_user_id"]
    return get_user(other)


def partnership_incoming(user_id: int) -> list[sqlite3.Row]:
    """Pending requests addressed TO this user, with the sender's username."""
    return list(_get_conn().execute(
        """SELECT p.*, u.username AS from_username
           FROM partnerships p JOIN users u ON u.id = p.from_user_id
           WHERE p.status = 'pending' AND p.to_user_id = ?
           ORDER BY p.created_at DESC""",
        (user_id,),
    ).fetchall())


def partnership_outgoing(user_id: int) -> list[sqlite3.Row]:
    """Pending requests sent BY this user, with the recipient's username."""
    return list(_get_conn().execute(
        """SELECT p.*, u.username AS to_username
           FROM partnerships p JOIN users u ON u.id = p.to_user_id
           WHERE p.status = 'pending' AND p.from_user_id = ?
           ORDER BY p.created_at DESC""",
        (user_id,),
    ).fetchall())


def partnership_request(from_id: int, to_id: int) -> Optional[int]:
    """Create a pending request. Returns id or None if rejected (self, already
    linked on either side, or a pending/accepted row already exists)."""
    if from_id == to_id:
        return None
    if get_accepted_partnership(from_id) or get_accepted_partnership(to_id):
        return None
    with _lock:
        # Reject duplicate in either direction.
        dup = _get_conn().execute(
            """SELECT id FROM partnerships
               WHERE (from_user_id = ? AND to_user_id = ?)
                  OR (from_user_id = ? AND to_user_id = ?)""",
            (from_id, to_id, to_id, from_id),
        ).fetchone()
        if dup:
            return None
        cur = _get_conn().execute(
            "INSERT INTO partnerships(from_user_id, to_user_id, status, created_at) "
            "VALUES (?, ?, 'pending', ?)",
            (from_id, to_id, now_iso()),
        )
        return cur.lastrowid


def partnership_accept(request_id: int, user_id: int) -> bool:
    """Accept a pending request addressed to user_id. Also wipes any other
    pending rows involving either partner.

    NOTE(review): the eligibility checks below run before _lock is taken, so
    two simultaneous accepts could in principle race — confirm acceptable.
    """
    row = _get_conn().execute(
        "SELECT * FROM partnerships WHERE id = ? AND status = 'pending'",
        (request_id,),
    ).fetchone()
    if not row or row["to_user_id"] != user_id:
        return False
    # Don't allow accept if either side already has an accepted partner.
    if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id):
        return False
    partner_id = row["from_user_id"]
    with _lock:
        _get_conn().execute(
            "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?",
            (now_iso(), request_id),
        )
        # clean up any stale pending requests touching either user
        _get_conn().execute(
            """DELETE FROM partnerships
               WHERE status = 'pending'
                 AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""",
            (user_id, partner_id, user_id, partner_id),
        )
    return True


def partnership_decline(request_id: int, user_id: int) -> bool:
    """Decline an incoming pending request (deletes the row)."""
    with _lock:
        cur = _get_conn().execute(
            """DELETE FROM partnerships
               WHERE id = ? AND status = 'pending'
                 AND (to_user_id = ? OR from_user_id = ?)""",
            (request_id, user_id, user_id),
        )
        return cur.rowcount > 0


def partnership_unlink(user_id: int) -> bool:
    """Remove the current accepted partnership (either side can call)."""
    with _lock:
        cur = _get_conn().execute(
            """DELETE FROM partnerships
               WHERE status = 'accepted'
                 AND (from_user_id = ? OR to_user_id = ?)""",
            (user_id, user_id),
        )
        return cur.rowcount > 0


def partner_flat_actions(partner_id: int) -> dict:
    """Flats the partner has touched. 'applied' = any application (regardless
    of outcome); 'rejected' = in flat_rejections. Both values are sets of
    flat ids."""
    applied = {r["flat_id"] for r in _get_conn().execute(
        "SELECT DISTINCT flat_id FROM applications WHERE user_id = ?", (partner_id,)
    ).fetchall()}
    rejected = {r["flat_id"] for r in _get_conn().execute(
        "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (partner_id,)
    ).fetchall()}
    return {"applied": applied, "rejected": rejected}
|
|
|
|
|
|
def cleanup_retention() -> dict:
    """Purge rows older than RETENTION_DAYS; return per-table delete counts.

    Deletes old errors and audit_log rows outright; for applications only the
    forensics_json blob is wiped — the row itself is kept for history.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
    stats: dict = {}
    with _lock:
        # Table names come from a fixed tuple, so the f-string SQL is safe.
        for table in ("errors", "audit_log"):
            cur = _get_conn().execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
            stats[table] = cur.rowcount
        # Drop forensics from old applications (but keep the row itself for history)
        cur = _get_conn().execute(
            "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
            (cutoff,),
        )
        stats["applications_forensics_wiped"] = cur.rowcount
    if any(stats.values()):
        logger.info("retention cleanup: %s", stats)
    return stats