Compare commits

...

2 commits

Author SHA1 Message Date
cb617dd38a perf + simpler: composite index, range-filtered protokoll, simpler profile
- Migration v9 adds idx_applications_user_flat_started on
  (user_id, flat_id, started_at DESC). Covers latest_applications_by_flat
  inner GROUP BY and the outer JOIN without a table scan.
- Push the protokoll date range into SQL instead of pulling 5000 rows
  into Python and filtering there: new audit_in_range / errors_in_range
  helpers with a shared _range_filter_rows impl. Protokoll page limits
  500, CSV export 5000.
- _row_to_profile collapses to `dict(profile_row)`. ProfileModel (Pydantic)
  already validates and coerces types on the apply side, extras ignored.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 19:16:45 +02:00
eb73b5e415 correctness batch: atomic writes, task refs, hmac, import-star, pickle
Per review §2:

- web/db.py: new _tx() context manager wraps multi-statement writers in
  BEGIN IMMEDIATE … COMMIT/ROLLBACK (our connections run in autocommit
  mode, so plain `with _lock:` doesn't give atomicity). partnership_accept
  (UPDATE + DELETE) and cleanup_retention (3 deletes/updates) now use it.
- Fire-and-forget tasks: add module-level _bg_tasks sets in web/app.py and
  web/enrichment.py. A _spawn() helper holds a strong ref until the task
  finishes so the GC can't drop it mid-flight (CPython's event loop only
  weakly references pending tasks).
- apply/main.py: require_api_key uses hmac.compare_digest, matching web's
  check. Also imports now use explicit names instead of `from settings import *`.
- apply/language.py: replace `from settings import *` + `from paths import *`
  with explicit imports — this is the pattern that caused the LANGUAGE
  NameError earlier.
- alert/utils.py: pickle-based hash_any_object → deterministic JSON+sha256.
  Cheaper, portable across Python versions, no pickle attack surface.
- web/notifications.py: /fehler links repointed to /bewerbungen (the
  former page doesn't exist).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 19:14:26 +02:00
8 changed files with 113 additions and 45 deletions

View file

@ -1,6 +1,11 @@
import hashlib import hashlib
import pickle import json
def hash_any_object(var) -> str:
    """Return a deterministic sha256 hex digest of a JSON-encodable value.

    Successor to the old pickle-based hash: JSON with sorted keys is stable
    across Python versions and dict orderings, and carries none of pickle's
    attack surface — all this needs to be is a change-detection fingerprint.
    Non-JSON types are stringified via ``default=str`` before hashing.
    """
    payload = json.dumps(var, sort_keys=True, ensure_ascii=False, default=str)
    digest = hashlib.sha256(payload.encode("utf-8"))
    return digest.hexdigest()

View file

@ -1,9 +1,9 @@
import tomllib import tomllib
from settings import *
from paths import *
import logging import logging
from settings import LANGUAGE
from paths import TRANSLATIONS_FILE
logger = logging.getLogger("flat-apply") logger = logging.getLogger("flat-apply")
with open(TRANSLATIONS_FILE, "rb") as f: with open(TRANSLATIONS_FILE, "rb") as f:

View file

@ -1,3 +1,4 @@
import hmac
import logging import logging
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
@ -71,7 +72,7 @@ class ApplyResponse(BaseModel):
def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None:
    """FastAPI dependency gating internal endpoints on X-Internal-Api-Key.

    Raises 503 when the server has no key configured, and 401 when the
    header is missing or does not match. hmac.compare_digest keeps the
    comparison constant-time.
    """
    if not INTERNAL_API_KEY:
        raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured")
    supplied = x_internal_api_key or ""
    if not supplied or not hmac.compare_digest(supplied, INTERNAL_API_KEY):
        raise HTTPException(status_code=401, detail="invalid api key")

View file

@ -69,6 +69,17 @@ logger = logging.getLogger("web")
apply_client = ApplyClient() apply_client = ApplyClient()
# Strong refs for fire-and-forget tasks. asyncio.create_task only weakly
# references tasks from the event loop — the GC could drop them mid-flight.
_bg_tasks: set[asyncio.Task] = set()
def _spawn(coro) -> asyncio.Task:
t = asyncio.create_task(coro)
_bg_tasks.add(t)
t.add_done_callback(_bg_tasks.discard)
return t
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# App + Jinja # App + Jinja
@ -326,7 +337,7 @@ def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None
profile_snapshot=profile, profile_snapshot=profile,
) )
asyncio.create_task(asyncio.to_thread( _spawn(asyncio.to_thread(
_finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms, _finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms,
)) ))
@ -784,12 +795,11 @@ def _parse_date_range(from_str: str | None, to_str: str | None) -> tuple[str | N
return start, end return start, end
def _collect_events(start_iso: str | None, end_iso: str | None) -> list[dict]: def _collect_events(start_iso: str | None, end_iso: str | None,
limit: int = 500) -> list[dict]:
users = {row["id"]: row["username"] for row in db.list_users()} users = {row["id"]: row["username"] for row in db.list_users()}
events: list[dict] = [] events: list[dict] = []
for a in db.recent_audit(None, limit=5000): for a in db.audit_in_range(start_iso, end_iso, limit=limit):
if start_iso and a["timestamp"] < start_iso: continue
if end_iso and a["timestamp"] >= end_iso: continue
events.append({ events.append({
"kind": "audit", "ts": a["timestamp"], "source": "web", "kind": "audit", "ts": a["timestamp"], "source": "web",
"actor": a["actor"], "action": a["action"], "actor": a["actor"], "action": a["action"],
@ -797,9 +807,7 @@ def _collect_events(start_iso: str | None, end_iso: str | None) -> list[dict]:
"user": users.get(a["user_id"], ""), "user": users.get(a["user_id"], ""),
"ip": a["ip"] or "", "ip": a["ip"] or "",
}) })
for e in db.recent_errors(None, limit=5000): for e in db.errors_in_range(start_iso, end_iso, limit=limit):
if start_iso and e["timestamp"] < start_iso: continue
if end_iso and e["timestamp"] >= end_iso: continue
events.append({ events.append({
"kind": "error", "ts": e["timestamp"], "source": e["source"], "kind": "error", "ts": e["timestamp"], "source": e["source"],
"actor": e["source"], "action": e["kind"], "actor": e["source"], "action": e["kind"],
@ -852,7 +860,7 @@ def tab_admin(request: Request, section: str):
to_str = q.get("to") or "" to_str = q.get("to") or ""
start_iso, end_iso = _parse_date_range(from_str or None, to_str or None) start_iso, end_iso = _parse_date_range(from_str or None, to_str or None)
ctx.update({ ctx.update({
"events": _collect_events(start_iso, end_iso)[:500], "events": _collect_events(start_iso, end_iso, limit=500),
"from_str": from_str, "to_str": to_str, "from_str": from_str, "to_str": to_str,
}) })
elif section == "benutzer": elif section == "benutzer":
@ -875,7 +883,7 @@ def tab_logs_export(request: Request):
import csv as _csv import csv as _csv
q = request.query_params q = request.query_params
start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None) start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None)
events = _collect_events(start_iso, end_iso) events = _collect_events(start_iso, end_iso, limit=5000)
buf = io.StringIO() buf = io.StringIO()
w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL) w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL)

View file

@ -7,25 +7,14 @@ logger = logging.getLogger("web.apply_client")
def _row_to_profile(profile_row) -> dict: def _row_to_profile(profile_row) -> dict:
"""Convert a user_profiles row to the apply service Profile dict.""" """Convert a user_profiles row into the payload dict for /apply.
Apply-side ProfileModel (Pydantic) validates + coerces types; we just
hand over whatever the row has. `updated_at` and any other extra keys
are ignored by the model."""
if profile_row is None: if profile_row is None:
return {} return {}
keys = [ return dict(profile_row)
"salutation", "firstname", "lastname", "email", "telephone",
"street", "house_number", "postcode", "city",
"is_possessing_wbs", "wbs_type", "wbs_valid_till",
"wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs",
"immomio_email", "immomio_password",
]
d = {}
for k in keys:
try:
d[k] = profile_row[k]
except (KeyError, IndexError):
pass
for k in ("is_possessing_wbs", "is_prio_wbs"):
d[k] = bool(d.get(k) or 0)
return d
class ApplyClient: class ApplyClient:

View file

@ -9,6 +9,7 @@ import json
import logging import logging
import sqlite3 import sqlite3
import threading import threading
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Iterable, Optional from typing import Any, Iterable, Optional
@ -42,6 +43,23 @@ def _get_conn() -> sqlite3.Connection:
return c return c
@contextmanager
def _tx():
    """Run the enclosed statements as one atomic write.

    Our connections run in autocommit mode (isolation_level=None), so a bare
    `with _lock:` serializes writers but gives no atomicity. This wraps the
    body in BEGIN IMMEDIATE ... COMMIT and rolls back (re-raising) on any
    exception. Combine with _lock to avoid SQLITE_BUSY between writers.
    """
    conn = _get_conn()
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield conn
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Schema # Schema
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -242,6 +260,11 @@ MIGRATIONS: list[str] = [
CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id); CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id);
CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id); CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id);
""", """,
# 0009: composite index for latest_applications_by_flat + last_application_for_flat
"""
CREATE INDEX IF NOT EXISTS idx_applications_user_flat_started
ON applications(user_id, flat_id, started_at DESC);
""",
] ]
@ -762,6 +785,36 @@ def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
).fetchall()) ).fetchall())
def _range_filter_rows(table: str, ts_col: str, start_iso: Optional[str],
                       end_iso: Optional[str], limit: int) -> list[sqlite3.Row]:
    """Fetch rows from an append-only *table*, newest first.

    The optional half-open window [start_iso, end_iso) is applied in SQL so
    we never drag thousands of rows into Python only to discard most of
    them. Either bound may be None for an open end.
    """
    predicates: list[str] = []
    args: list[Any] = []
    if start_iso:
        predicates.append(f"{ts_col} >= ?")
        args.append(start_iso)
    if end_iso:
        predicates.append(f"{ts_col} < ?")
        args.append(end_iso)
    where_sql = f"WHERE {' AND '.join(predicates)}" if predicates else ""
    args.append(limit)
    query = f"SELECT * FROM {table} {where_sql} ORDER BY {ts_col} DESC LIMIT ?"
    return list(_get_conn().execute(query, args).fetchall())
def audit_in_range(start_iso: Optional[str], end_iso: Optional[str],
                   limit: int = 500) -> list[sqlite3.Row]:
    """audit_log rows in the half-open window [start_iso, end_iso), newest
    first, at most *limit* rows; either bound may be None for open-ended."""
    return _range_filter_rows("audit_log", "timestamp", start_iso, end_iso, limit)
def errors_in_range(start_iso: Optional[str], end_iso: Optional[str],
                    limit: int = 500) -> list[sqlite3.Row]:
    """errors rows in the half-open window [start_iso, end_iso), newest
    first, at most *limit* rows; either bound may be None for open-ended."""
    return _range_filter_rows("errors", "timestamp", start_iso, end_iso, limit)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Retention cleanup # Retention cleanup
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -845,13 +898,13 @@ def partnership_accept(request_id: int, user_id: int) -> bool:
if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id): if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id):
return False return False
partner_id = row["from_user_id"] partner_id = row["from_user_id"]
with _lock: with _lock, _tx() as c:
_get_conn().execute( c.execute(
"UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?", "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?",
(now_iso(), request_id), (now_iso(), request_id),
) )
# clean up any stale pending requests touching either user # clean up any stale pending requests touching either user
_get_conn().execute( c.execute(
"""DELETE FROM partnerships """DELETE FROM partnerships
WHERE status = 'pending' WHERE status = 'pending'
AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""", AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""",
@ -899,12 +952,12 @@ def partner_flat_actions(partner_id: int) -> dict:
def cleanup_retention() -> dict: def cleanup_retention() -> dict:
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds") cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
stats = {} stats = {}
with _lock: with _lock, _tx() as c:
for table in ("errors", "audit_log"): for table in ("errors", "audit_log"):
cur = _get_conn().execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) cur = c.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
stats[table] = cur.rowcount stats[table] = cur.rowcount
# Drop forensics from old applications (but keep the row itself for history) # Drop forensics from old applications (but keep the row itself for history)
cur = _get_conn().execute( cur = c.execute(
"UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL", "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
(cutoff,), (cutoff,),
) )

View file

@ -218,8 +218,20 @@ def _record_failure(flat_id: str, step: str, reason: str) -> None:
pass pass
# Holding strong references to every spawned task so asyncio doesn't GC them
# mid-flight. create_task only weakly references tasks from the event loop.
_bg_tasks: set[asyncio.Task] = set()
def _spawn(coro) -> asyncio.Task:
t = asyncio.create_task(coro)
_bg_tasks.add(t)
t.add_done_callback(_bg_tasks.discard)
return t
def kick(flat_id: str) -> None:
    """Fire-and-forget enrichment of one flat, run in a worker thread."""
    _spawn(asyncio.to_thread(enrich_flat_sync, flat_id))
async def _backfill_runner() -> None: async def _backfill_runner() -> None:
@ -234,5 +246,5 @@ async def _backfill_runner() -> None:
def kick_backfill() -> int:
    """Start the async backfill runner and report how many flats (up to the
    200-row query cap) still need enrichment."""
    outstanding = db.flats_needing_enrichment(limit=200)
    _spawn(_backfill_runner())
    return len(outstanding)

View file

@ -84,8 +84,8 @@ def on_apply_ok(user_id: int, flat: dict, message: str) -> None:
def on_apply_fail(user_id: int, flat: dict, message: str) -> None:
    """Send the 'application failed' notification for *flat* to *user_id*.

    Falls back to the listing link when the flat has no address, and points
    the user at /bewerbungen for details.
    """
    addr = flat.get("address") or flat.get("link")
    link = f"{PUBLIC_URL}/bewerbungen"
    plain = f"Bewerbung fehlgeschlagen: {addr}\n{message}\n{link}"
    markdown = (f"*Bewerbung fehlgeschlagen*\n{addr}\n{message}\n"
                f"[Bewerbungen ansehen]({link})")
    notify_user(user_id, "apply_fail", subject="[wohnungsdidi] Bewerbung fehlgeschlagen",
                body_plain=plain, body_markdown=markdown)