1. New /admin route with sub-tabs (Protokoll, Benutzer) for admins. Top nav: "Protokoll" dropped, "Admin" added right of Einstellungen. /logs and /einstellungen/benutzer issue 301 redirects to the new paths. Benutzer is no longer part of Einstellungen sub-nav. 2. User_filters.max_age_hours (migration v6) — new dropdown (1–10 h / beliebig) under Einstellungen → Filter; Wohnungen list drops flats older than the cutoff by discovered_at. 3. Header shows "aktualisiert vor X s" instead of a countdown. Template emits data-counter-up-utc with last_alert_heartbeat; app.js ticks up each second. When a scrape runs, the heartbeat updates and the HTMX swap resets the counter naturally. 4. Chevron state synced after HTMX swaps: panes preserved via hx-preserve keep the user's open/closed state, and the sibling button's .open class is re-applied by syncFlatExpandState() on afterSwap — previously a scroll-triggered poll would flip the chevron back to closed while the pane stayed open. 5. "Final absenden" footer removed from the profile page (functionality is unchanged, the switch still sits atop Wohnungen). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
685 lines
25 KiB
Python
685 lines
25 KiB
Python
"""
|
|
SQLite data layer for lazyflat.
|
|
|
|
Multi-user: users, per-user profiles/filters/notifications/preferences.
|
|
All per-user rows are 1:1 with users. Errors and forensics are retained
|
|
for 14 days and cleaned up periodically.
|
|
"""
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Iterable, Optional
|
|
|
|
from settings import DB_PATH, RETENTION_DAYS
|
|
|
|
# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger("web.db")

# Serializes all writes issued through the single shared connection below.
_lock = threading.Lock()
|
|
|
|
|
|
def _connect() -> sqlite3.Connection:
    """Open the shared SQLite connection: autocommit, WAL journal, FK enforcement."""
    connection = sqlite3.connect(DB_PATH, isolation_level=None, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
|
|
|
|
|
|
# One process-wide connection shared across threads (check_same_thread=False);
# writers coordinate via _lock, autocommit mode means no explicit transactions.
_conn: sqlite3.Connection = _connect()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Ordered migration scripts. init_db() runs each entry with executescript()
# exactly once and records its 1-based index in schema_version. Append-only:
# never edit an entry that has shipped — add a new one instead.
MIGRATIONS: list[str] = [
    # 0001: drop legacy single-user tables if present (only matters on upgrade
    # from the pre-multi-user commit; no production data yet). Safe for fresh DBs.
    """
    DROP TABLE IF EXISTS applications;
    DROP TABLE IF EXISTS flats;
    DROP TABLE IF EXISTS state;
    DROP TABLE IF EXISTS audit_log;
    """,
    # 0002: base + users. Four per-user tables are 1:1 with users (user_id is
    # both PK and FK, ON DELETE CASCADE); flats are shared across users.
    """
    CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY);

    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL COLLATE NOCASE,
        password_hash TEXT NOT NULL,
        is_admin INTEGER NOT NULL DEFAULT 0,
        disabled INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_profiles (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        salutation TEXT DEFAULT '',
        firstname TEXT DEFAULT '',
        lastname TEXT DEFAULT '',
        email TEXT DEFAULT '',
        telephone TEXT DEFAULT '',
        street TEXT DEFAULT '',
        house_number TEXT DEFAULT '',
        postcode TEXT DEFAULT '',
        city TEXT DEFAULT '',
        is_possessing_wbs INTEGER NOT NULL DEFAULT 0,
        wbs_type TEXT DEFAULT '0',
        wbs_valid_till TEXT DEFAULT '1970-01-01',
        wbs_rooms INTEGER NOT NULL DEFAULT 0,
        wbs_adults INTEGER NOT NULL DEFAULT 0,
        wbs_children INTEGER NOT NULL DEFAULT 0,
        is_prio_wbs INTEGER NOT NULL DEFAULT 0,
        immomio_email TEXT DEFAULT '',
        immomio_password TEXT DEFAULT '',
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_filters (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        rooms_min REAL,
        rooms_max REAL,
        max_rent REAL,
        min_size REAL,
        max_morning_commute REAL,
        wbs_required TEXT DEFAULT '', -- '', 'yes', 'no'
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_notifications (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        channel TEXT NOT NULL DEFAULT 'ui', -- 'ui' | 'telegram' | 'email'
        telegram_bot_token TEXT DEFAULT '',
        telegram_chat_id TEXT DEFAULT '',
        email_address TEXT DEFAULT '',
        notify_on_match INTEGER NOT NULL DEFAULT 1,
        notify_on_apply_success INTEGER NOT NULL DEFAULT 1,
        notify_on_apply_fail INTEGER NOT NULL DEFAULT 1,
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS user_preferences (
        user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
        auto_apply_enabled INTEGER NOT NULL DEFAULT 0,
        submit_forms INTEGER NOT NULL DEFAULT 0,
        kill_switch INTEGER NOT NULL DEFAULT 0,
        apply_circuit_open INTEGER NOT NULL DEFAULT 0,
        apply_recent_failures INTEGER NOT NULL DEFAULT 0,
        updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS flats (
        id TEXT PRIMARY KEY,
        link TEXT NOT NULL,
        address TEXT,
        rooms REAL,
        size REAL,
        total_rent REAL,
        sqm_price REAL,
        year_built TEXT,
        wbs TEXT,
        connectivity_morning_time REAL,
        connectivity_night_time REAL,
        address_link_gmaps TEXT,
        payload_json TEXT NOT NULL,
        discovered_at TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_flats_discovered ON flats(discovered_at DESC);

    CREATE TABLE IF NOT EXISTS applications (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        flat_id TEXT NOT NULL REFERENCES flats(id),
        url TEXT NOT NULL,
        triggered_by TEXT NOT NULL, -- 'user' | 'auto'
        submit_forms_used INTEGER NOT NULL DEFAULT 0,
        provider TEXT DEFAULT '',
        started_at TEXT NOT NULL,
        finished_at TEXT,
        success INTEGER,
        message TEXT,
        profile_snapshot_json TEXT,
        forensics_json TEXT -- structured payload from apply service
    );
    CREATE INDEX IF NOT EXISTS idx_applications_user ON applications(user_id, started_at DESC);
    CREATE INDEX IF NOT EXISTS idx_applications_started ON applications(started_at DESC);
    CREATE INDEX IF NOT EXISTS idx_applications_flat ON applications(flat_id);

    CREATE TABLE IF NOT EXISTS errors (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        user_id INTEGER,
        source TEXT NOT NULL, -- 'apply'|'alert'|'web'|'system'
        kind TEXT NOT NULL, -- 'apply_failure'|'scraper_error'|...
        summary TEXT NOT NULL,
        application_id INTEGER,
        context_json TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp DESC);
    CREATE INDEX IF NOT EXISTS idx_errors_user ON errors(user_id, timestamp DESC);

    CREATE TABLE IF NOT EXISTS audit_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        user_id INTEGER,
        actor TEXT NOT NULL,
        action TEXT NOT NULL,
        details TEXT,
        ip TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
    CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_log(user_id, timestamp DESC);

    CREATE TABLE IF NOT EXISTS system_state (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL
    );
    """,
    # 0003: lat/lng for map view
    """
    ALTER TABLE flats ADD COLUMN lat REAL;
    ALTER TABLE flats ADD COLUMN lng REAL;
    """,
    # 0004: per-user rejections — flats the user doesn't want in the list anymore
    """
    CREATE TABLE IF NOT EXISTS flat_rejections (
        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        flat_id TEXT NOT NULL REFERENCES flats(id),
        rejected_at TEXT NOT NULL,
        PRIMARY KEY (user_id, flat_id)
    );
    CREATE INDEX IF NOT EXISTS idx_rejections_user ON flat_rejections(user_id);
    """,
    # 0005: LLM enrichment — extracted details + downloaded image count per flat
    """
    ALTER TABLE flats ADD COLUMN enrichment_json TEXT;
    ALTER TABLE flats ADD COLUMN enrichment_status TEXT NOT NULL DEFAULT 'pending';
    ALTER TABLE flats ADD COLUMN enrichment_updated_at TEXT;
    ALTER TABLE flats ADD COLUMN image_count INTEGER NOT NULL DEFAULT 0;
    """,
    # 0006: time filter — hide flats older than X hours (NULL = no limit)
    """
    ALTER TABLE user_filters ADD COLUMN max_age_hours INTEGER;
    """,
]
|
|
|
|
|
|
def _current_version() -> int:
    """Highest applied migration number, or 0 when the table is missing or empty."""
    try:
        result = _conn.execute(
            "SELECT COALESCE(MAX(version), 0) AS v FROM schema_version"
        ).fetchone()
    except sqlite3.Error:
        # schema_version does not exist yet — treat as a fresh database.
        return 0
    return int(result["v"]) if result else 0
|
|
|
|
|
|
def init_db() -> None:
    """Create/upgrade the schema and seed required system_state keys.

    Applies every MIGRATIONS entry whose 1-based index is above the recorded
    schema version, then records the new version.
    """
    with _lock:
        _conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)")
        applied = _current_version()
        for version, script in enumerate(MIGRATIONS, start=1):
            if version > applied:
                logger.info("applying migration v%d", version)
                _conn.executescript(script)
                _conn.execute("INSERT OR IGNORE INTO schema_version(version) VALUES (?)", (version,))
        # Make sure the heartbeat key exists so readers never see a missing row.
        _conn.execute(
            "INSERT OR IGNORE INTO system_state(key, value) VALUES ('last_alert_heartbeat', '')"
        )
        logger.info("DB initialized (schema v%d)", _current_version())
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current UTC moment as an ISO-8601 string (seconds precision)."""
    moment = datetime.now(timezone.utc)
    return moment.isoformat(timespec="seconds")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# System state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_state(key: str) -> Optional[str]:
    """Look up a system_state value; None when the key is absent."""
    found = _conn.execute("SELECT value FROM system_state WHERE key = ?", (key,)).fetchone()
    if found is None:
        return None
    return found["value"]
|
|
|
|
|
|
def set_state(key: str, value: str) -> None:
    """Insert or overwrite one system_state key (SQLite upsert)."""
    sql = (
        "INSERT INTO system_state(key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    with _lock:
        _conn.execute(sql, (key, value))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Users
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _ensure_user_rows(user_id: int) -> None:
    """Guarantee the four 1:1 per-user rows exist; no-op when already present."""
    ts = now_iso()
    statements = (
        "INSERT OR IGNORE INTO user_profiles(user_id, updated_at) VALUES (?, ?)",
        "INSERT OR IGNORE INTO user_filters(user_id, updated_at) VALUES (?, ?)",
        "INSERT OR IGNORE INTO user_notifications(user_id, updated_at) VALUES (?, ?)",
        "INSERT OR IGNORE INTO user_preferences(user_id, updated_at) VALUES (?, ?)",
    )
    with _lock:
        for statement in statements:
            _conn.execute(statement, (user_id, ts))
|
|
|
|
|
|
def create_user(username: str, password_hash: str, is_admin: bool = False) -> int:
    """Insert a new user plus their per-user rows; returns the new user id.

    Raises sqlite3.IntegrityError when the username is already taken
    (UNIQUE COLLATE NOCASE).
    """
    ts = now_iso()
    with _lock:
        cur = _conn.execute(
            "INSERT INTO users(username, password_hash, is_admin, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (username, password_hash, 1 if is_admin else 0, ts, ts),
        )
        uid = cur.lastrowid
    # _ensure_user_rows acquires _lock itself (a non-reentrant threading.Lock),
    # so it must run after the lock above has been released.
    _ensure_user_rows(uid)
    logger.info("created user id=%s username=%s admin=%s", uid, username, is_admin)
    return uid
|
|
|
|
|
|
def get_user_by_username(username: str) -> Optional[sqlite3.Row]:
    """Fetch an enabled user by case-insensitive username, or None."""
    sql = "SELECT * FROM users WHERE username = ? COLLATE NOCASE AND disabled = 0"
    return _conn.execute(sql, (username,)).fetchone()
|
|
|
|
|
|
def get_user(user_id: int) -> Optional[sqlite3.Row]:
    """Fetch a user row by primary key (disabled users included), or None."""
    cur = _conn.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    return cur.fetchone()
|
|
|
|
|
|
def list_users() -> list[sqlite3.Row]:
    """All users — enabled and disabled — sorted by username."""
    rows = _conn.execute("SELECT * FROM users ORDER BY username").fetchall()
    return list(rows)
|
|
|
|
|
|
def set_user_password(user_id: int, password_hash: str) -> None:
    """Replace a user's password hash and bump updated_at."""
    params = (password_hash, now_iso(), user_id)
    with _lock:
        _conn.execute("UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?", params)
|
|
|
|
|
|
def set_user_disabled(user_id: int, disabled: bool) -> None:
    """Toggle the disabled flag (disabled users are filtered out by
    get_user_by_username, i.e. cannot log in)."""
    flag = 1 if disabled else 0
    with _lock:
        _conn.execute(
            "UPDATE users SET disabled = ?, updated_at = ? WHERE id = ?",
            (flag, now_iso(), user_id),
        )
|
|
|
|
|
|
def delete_user(user_id: int) -> None:
    """Delete the user row; profile, filters, notifications, preferences,
    rejections and applications go with it via ON DELETE CASCADE. errors and
    audit_log rows survive because their user_id column is nullable."""
    with _lock:
        _conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User profile / filters / notifications / preferences
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_profile(user_id: int) -> sqlite3.Row:
    """Per-user profile row, created on demand with column defaults."""
    _ensure_user_rows(user_id)
    cur = _conn.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,))
    return cur.fetchone()
|
|
|
|
|
|
def update_profile(user_id: int, data: dict) -> None:
    """Apply whitelisted profile fields from *data*; unknown keys are ignored."""
    _ensure_user_rows(user_id)
    allowed = {
        "salutation", "firstname", "lastname", "email", "telephone",
        "street", "house_number", "postcode", "city",
        "is_possessing_wbs", "wbs_type", "wbs_valid_till",
        "wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs",
        "immomio_email", "immomio_password",
    }
    clean = {key: value for key, value in data.items() if key in allowed}
    if not clean:
        return
    # Column names come from the whitelist above, never from caller input,
    # so the f-string SQL is injection-safe; values stay parameterized.
    assignments = ", ".join(f"{key} = ?" for key in clean)
    params = [*clean.values(), now_iso(), user_id]
    with _lock:
        _conn.execute(
            f"UPDATE user_profiles SET {assignments}, updated_at = ? WHERE user_id = ?",
            params,
        )
|
|
|
|
|
|
def get_filters(user_id: int) -> sqlite3.Row:
    """Per-user filter row, created on demand with column defaults."""
    _ensure_user_rows(user_id)
    cur = _conn.execute("SELECT * FROM user_filters WHERE user_id = ?", (user_id,))
    return cur.fetchone()
|
|
|
|
|
|
def update_filters(user_id: int, data: dict) -> None:
    """Apply whitelisted filter fields; a key present with value None clears
    the column (used e.g. for max_age_hours = 'beliebig')."""
    _ensure_user_rows(user_id)
    allowed = {"rooms_min", "rooms_max", "max_rent", "min_size",
               "max_morning_commute", "wbs_required", "max_age_hours"}
    clean = {key: data.get(key) for key in allowed if key in data}
    if not clean:
        return
    # Whitelisted column names only — safe to interpolate; values parameterized.
    assignments = ", ".join(f"{key} = ?" for key in clean)
    params = [*clean.values(), now_iso(), user_id]
    with _lock:
        _conn.execute(
            f"UPDATE user_filters SET {assignments}, updated_at = ? WHERE user_id = ?",
            params,
        )
|
|
|
|
|
|
def get_notifications(user_id: int) -> sqlite3.Row:
    """Per-user notification settings row, created on demand."""
    _ensure_user_rows(user_id)
    cur = _conn.execute("SELECT * FROM user_notifications WHERE user_id = ?", (user_id,))
    return cur.fetchone()
|
|
|
|
|
|
def update_notifications(user_id: int, data: dict) -> None:
    """Apply whitelisted notification fields from *data*; unknown keys ignored."""
    _ensure_user_rows(user_id)
    allowed = {
        "channel", "telegram_bot_token", "telegram_chat_id", "email_address",
        "notify_on_match", "notify_on_apply_success", "notify_on_apply_fail",
    }
    clean = {key: value for key, value in data.items() if key in allowed}
    if not clean:
        return
    # Whitelisted column names only — safe to interpolate; values parameterized.
    assignments = ", ".join(f"{key} = ?" for key in clean)
    params = [*clean.values(), now_iso(), user_id]
    with _lock:
        _conn.execute(
            f"UPDATE user_notifications SET {assignments}, updated_at = ? WHERE user_id = ?",
            params,
        )
|
|
|
|
|
|
def get_preferences(user_id: int) -> sqlite3.Row:
    """Per-user preference row (auto-apply switches etc.), created on demand."""
    _ensure_user_rows(user_id)
    cur = _conn.execute("SELECT * FROM user_preferences WHERE user_id = ?", (user_id,))
    return cur.fetchone()
|
|
|
|
|
|
def update_preferences(user_id: int, data: dict) -> None:
    """Apply whitelisted preference fields from *data*; unknown keys ignored."""
    _ensure_user_rows(user_id)
    allowed = {
        "auto_apply_enabled", "submit_forms", "kill_switch",
        "apply_circuit_open", "apply_recent_failures",
    }
    clean = {key: value for key, value in data.items() if key in allowed}
    if not clean:
        return
    # Whitelisted column names only — safe to interpolate; values parameterized.
    assignments = ", ".join(f"{key} = ?" for key in clean)
    params = [*clean.values(), now_iso(), user_id]
    with _lock:
        _conn.execute(
            f"UPDATE user_preferences SET {assignments}, updated_at = ? WHERE user_id = ?",
            params,
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Flats
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def upsert_flat(payload: dict) -> bool:
    """Insert a scraped flat; returns True when the flat is new.

    An existing row is left untouched, except that lat/lng are backfilled on
    rows that pre-date the coordinates migration when the new payload has them.
    Raises KeyError if payload lacks an "id".
    """
    flat_id = str(payload["id"])
    with _lock:
        row = _conn.execute(
            "SELECT id, lat, lng FROM flats WHERE id = ?", (flat_id,)
        ).fetchone()
        if row is not None:
            missing_coords = row["lat"] is None or row["lng"] is None
            have_coords = payload.get("lat") is not None and payload.get("lng") is not None
            if missing_coords and have_coords:
                _conn.execute(
                    "UPDATE flats SET lat = ?, lng = ? WHERE id = ?",
                    (payload["lat"], payload["lng"], flat_id),
                )
            return False
        connectivity = payload.get("connectivity") or {}
        _conn.execute(
            """INSERT INTO flats(
                id, link, address, rooms, size, total_rent, sqm_price, year_built, wbs,
                connectivity_morning_time, connectivity_night_time, address_link_gmaps,
                payload_json, discovered_at, lat, lng
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                flat_id,
                payload.get("link", ""),
                payload.get("address", ""),
                payload.get("rooms"),
                payload.get("size"),
                payload.get("total_rent"),
                payload.get("sqm_price"),
                str(payload.get("year_built", "")),
                str(payload.get("wbs", "")),
                connectivity.get("morning_time"),
                connectivity.get("night_time"),
                payload.get("address_link_gmaps"),
                # default=str keeps non-JSON-native values (dates etc.) serializable
                json.dumps(payload, default=str),
                now_iso(),
                payload.get("lat"),
                payload.get("lng"),
            ),
        )
        return True
|
|
|
|
|
|
def recent_flats(limit: int = 50) -> list[sqlite3.Row]:
    """Newest flats first, capped at *limit*."""
    cur = _conn.execute("SELECT * FROM flats ORDER BY discovered_at DESC LIMIT ?", (limit,))
    return list(cur.fetchall())
|
|
|
|
|
|
def get_flat(flat_id: str) -> Optional[sqlite3.Row]:
    """Single flat by id, or None when unknown."""
    cur = _conn.execute("SELECT * FROM flats WHERE id = ?", (flat_id,))
    return cur.fetchone()
|
|
|
|
|
|
def set_flat_enrichment(flat_id: str, status: str,
                        enrichment: Optional[dict] = None,
                        image_count: int = 0) -> None:
    """Record the LLM-enrichment outcome for one flat.

    status is written as given; enrichment_json is cleared (NULL) when
    *enrichment* is None.
    """
    blob = None if enrichment is None else json.dumps(enrichment)
    with _lock:
        _conn.execute(
            """UPDATE flats SET enrichment_status = ?,
                   enrichment_json = ?,
                   enrichment_updated_at = ?,
                   image_count = ?
               WHERE id = ?""",
            (status, blob, now_iso(), image_count, flat_id),
        )
|
|
|
|
|
|
def flats_needing_enrichment(limit: int = 100) -> list[sqlite3.Row]:
    """id/link of flats whose enrichment is pending or previously failed,
    newest first."""
    cur = _conn.execute(
        """SELECT id, link FROM flats
           WHERE enrichment_status IN ('pending', 'failed')
           ORDER BY discovered_at DESC LIMIT ?""",
        (limit,),
    )
    return list(cur.fetchall())
|
|
|
|
|
|
def enrichment_counts() -> dict:
    """Enrichment pipeline progress: {'total', 'ok', 'pending', 'failed'} as ints."""
    row = _conn.execute(
        """SELECT
               COUNT(*) AS total,
               SUM(CASE WHEN enrichment_status = 'ok' THEN 1 ELSE 0 END) AS ok,
               SUM(CASE WHEN enrichment_status = 'pending' THEN 1 ELSE 0 END) AS pending,
               SUM(CASE WHEN enrichment_status = 'failed' THEN 1 ELSE 0 END) AS failed
           FROM flats"""
    ).fetchone()
    # SUM over zero rows yields NULL, hence the "or 0" fallback per field.
    return {name: int(row[name] or 0) for name in ("total", "ok", "pending", "failed")}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Applications
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def start_application(user_id: int, flat_id: str, url: str, triggered_by: str,
                      submit_forms: bool, profile_snapshot: dict) -> int:
    """Open an application row ('user' or 'auto' triggered) and return its id.

    The profile is snapshotted as JSON so later edits don't change the record
    of what was submitted.
    """
    with _lock:
        cur = _conn.execute(
            """INSERT INTO applications(
                user_id, flat_id, url, triggered_by, submit_forms_used,
                started_at, profile_snapshot_json
            ) VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (user_id, flat_id, url, triggered_by, 1 if submit_forms else 0,
             now_iso(), json.dumps(profile_snapshot)),
        )
        return cur.lastrowid
|
|
|
|
|
|
def finish_application(app_id: int, success: bool, message: str,
                       provider: str = "", forensics: Optional[dict] = None) -> None:
    """Close an application row with its outcome and optional forensics payload."""
    forensics_blob = None if forensics is None else json.dumps(forensics)
    with _lock:
        _conn.execute(
            """UPDATE applications SET finished_at = ?, success = ?, message = ?,
                   provider = ?, forensics_json = ?
               WHERE id = ?""",
            (now_iso(), 1 if success else 0, message, provider, forensics_blob, app_id),
        )
|
|
|
|
|
|
def get_application(app_id: int) -> Optional[sqlite3.Row]:
    """Single application row by id, or None when unknown."""
    cur = _conn.execute("SELECT * FROM applications WHERE id = ?", (app_id,))
    return cur.fetchone()
|
|
|
|
|
|
def recent_applications(user_id: Optional[int], limit: int = 50) -> list[sqlite3.Row]:
    """Latest applications joined with flat address/link, newest first.

    user_id=None returns all users' applications (admin view).
    """
    if user_id is None:
        rows = _conn.execute(
            """SELECT a.*, f.address, f.link
               FROM applications a LEFT JOIN flats f ON f.id = a.flat_id
               ORDER BY a.started_at DESC LIMIT ?""",
            (limit,),
        ).fetchall()
    else:
        rows = _conn.execute(
            """SELECT a.*, f.address, f.link
               FROM applications a LEFT JOIN flats f ON f.id = a.flat_id
               WHERE a.user_id = ?
               ORDER BY a.started_at DESC LIMIT ?""",
            (user_id, limit),
        ).fetchall()
    return list(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rejections (flats a user doesn't want to see anymore)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def reject_flat(user_id: int, flat_id: str) -> None:
    """Hide a flat for this user; idempotent (INSERT OR IGNORE on the PK pair)."""
    params = (user_id, flat_id, now_iso())
    with _lock:
        _conn.execute(
            "INSERT OR IGNORE INTO flat_rejections(user_id, flat_id, rejected_at) VALUES (?, ?, ?)",
            params,
        )
|
|
|
|
|
|
def unreject_flat(user_id: int, flat_id: str) -> None:
    """Undo a rejection so the flat shows up in the user's list again."""
    params = (user_id, flat_id)
    with _lock:
        _conn.execute(
            "DELETE FROM flat_rejections WHERE user_id = ? AND flat_id = ?",
            params,
        )
|
|
|
|
|
|
def rejected_flat_ids(user_id: int) -> set[str]:
    """Set of flat ids the user has rejected (for fast membership filtering)."""
    cur = _conn.execute(
        "SELECT flat_id FROM flat_rejections WHERE user_id = ?", (user_id,)
    )
    return {record["flat_id"] for record in cur.fetchall()}
|
|
|
|
|
|
def rejected_flats(user_id: int, limit: int = 200) -> list[sqlite3.Row]:
    """Full flat rows the user rejected, most recently rejected first."""
    cur = _conn.execute(
        """SELECT f.*, r.rejected_at
           FROM flat_rejections r JOIN flats f ON f.id = r.flat_id
           WHERE r.user_id = ?
           ORDER BY r.rejected_at DESC LIMIT ?""",
        (user_id, limit),
    )
    return list(cur.fetchall())
|
|
|
|
|
|
def last_application_for_flat(user_id: int, flat_id: str) -> Optional[sqlite3.Row]:
    """Most recent application this user made for this flat, or None."""
    cur = _conn.execute(
        """SELECT * FROM applications
           WHERE user_id = ? AND flat_id = ?
           ORDER BY started_at DESC LIMIT 1""",
        (user_id, flat_id),
    )
    return cur.fetchone()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Errors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def log_error(source: str, kind: str, summary: str,
              user_id: Optional[int] = None, application_id: Optional[int] = None,
              context: Optional[dict] = None) -> int:
    """Persist one error row and return its id.

    source: 'apply' | 'alert' | 'web' | 'system'; kind is a machine-readable
    class such as 'apply_failure' or 'scraper_error'. user_id/application_id
    are optional links; rows are purged by cleanup_retention().
    """
    # Fix: test `is not None` instead of truthiness so an explicitly-passed
    # empty dict is stored as '{}' rather than silently dropped to NULL —
    # consistent with finish_application() and set_flat_enrichment().
    context_json = json.dumps(context) if context is not None else None
    with _lock:
        cur = _conn.execute(
            """INSERT INTO errors(timestamp, user_id, source, kind, summary, application_id, context_json)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (now_iso(), user_id, source, kind, summary, application_id, context_json),
        )
        return cur.lastrowid
|
|
|
|
|
|
def recent_errors(user_id: Optional[int], limit: int = 100,
                  include_global: bool = False) -> list[sqlite3.Row]:
    """Latest error rows, newest first.

    user_id=None returns everything (admin view); include_global additionally
    pulls in rows with NULL user_id for a specific user.
    """
    if user_id is None:
        query = "SELECT * FROM errors ORDER BY timestamp DESC LIMIT ?"
        params: tuple = (limit,)
    elif include_global:
        query = ("SELECT * FROM errors "
                 "WHERE user_id = ? OR user_id IS NULL "
                 "ORDER BY timestamp DESC LIMIT ?")
        params = (user_id, limit)
    else:
        query = "SELECT * FROM errors WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?"
        params = (user_id, limit)
    return list(_conn.execute(query, params).fetchall())
|
|
|
|
|
|
def get_error(error_id: int) -> Optional[sqlite3.Row]:
    """Single error row by id, or None when unknown."""
    cur = _conn.execute("SELECT * FROM errors WHERE id = ?", (error_id,))
    return cur.fetchone()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def log_audit(actor: str, action: str, details: str = "",
              user_id: Optional[int] = None, ip: str = "") -> None:
    """Append one audit-trail row (purged after RETENTION_DAYS by
    cleanup_retention)."""
    row = (now_iso(), user_id, actor, action, details, ip)
    with _lock:
        _conn.execute(
            "INSERT INTO audit_log(timestamp, user_id, actor, action, details, ip) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            row,
        )
|
|
|
|
|
|
def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
    """Latest audit rows, newest first.

    user_id=None returns everything; a specific user also sees global rows
    (NULL user_id).
    """
    if user_id is None:
        cur = _conn.execute(
            "SELECT * FROM audit_log ORDER BY timestamp DESC LIMIT ?", (limit,)
        )
    else:
        cur = _conn.execute(
            "SELECT * FROM audit_log WHERE user_id = ? OR user_id IS NULL ORDER BY timestamp DESC LIMIT ?",
            (user_id, limit),
        )
    return list(cur.fetchall())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Retention cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def cleanup_retention() -> dict:
    """Purge errors/audit rows older than RETENTION_DAYS and wipe forensics
    from old applications (the application row itself is kept for history).
    Returns a stats dict of affected-row counts."""
    cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
    stats: dict = {}
    with _lock:
        for table in ("errors", "audit_log"):
            # Table names come from the fixed tuple above, never user input.
            stats[table] = _conn.execute(
                f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)
            ).rowcount
        wiped = _conn.execute(
            "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
            (cutoff,),
        )
        stats["applications_forensics_wiped"] = wiped.rowcount
    if any(stats.values()):
        logger.info("retention cleanup: %s", stats)
    return stats
|