Compare commits

..

No commits in common. "cb617dd38aecfe58a18c22072207081194f91de3" and "77098c82df02aec2df47cfb3b310e75d8af97e5a" have entirely different histories.

8 changed files with 45 additions and 113 deletions

View file

@@ -1,11 +1,6 @@
import hashlib import hashlib
import json import pickle
def hash_any_object(var):
def hash_any_object(var) -> str: byte_stream = pickle.dumps(var)
"""Deterministic sha256 over a JSON-encodable value. Replaces the older return hashlib.sha256(byte_stream).hexdigest()
pickle-based hash: pickle is nondeterministic across Python versions /
dict-ordering tweaks and pulls more attack surface than we need for a
plain change-detection fingerprint."""
encoded = json.dumps(var, sort_keys=True, ensure_ascii=False, default=str)
return hashlib.sha256(encoded.encode("utf-8")).hexdigest()

View file

@@ -1,8 +1,8 @@
import tomllib import tomllib
import logging
from settings import LANGUAGE from settings import *
from paths import TRANSLATIONS_FILE from paths import *
import logging
logger = logging.getLogger("flat-apply") logger = logging.getLogger("flat-apply")

View file

@@ -1,4 +1,3 @@
import hmac
import logging import logging
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
@@ -72,7 +71,7 @@ class ApplyResponse(BaseModel):
def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None: def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None:
if not INTERNAL_API_KEY: if not INTERNAL_API_KEY:
raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured") raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured")
if not x_internal_api_key or not hmac.compare_digest(x_internal_api_key, INTERNAL_API_KEY): if x_internal_api_key != INTERNAL_API_KEY:
raise HTTPException(status_code=401, detail="invalid api key") raise HTTPException(status_code=401, detail="invalid api key")

View file

@@ -69,17 +69,6 @@ logger = logging.getLogger("web")
apply_client = ApplyClient() apply_client = ApplyClient()
# Strong refs for fire-and-forget tasks. asyncio.create_task only weakly
# references tasks from the event loop — the GC could drop them mid-flight.
_bg_tasks: set[asyncio.Task] = set()
def _spawn(coro) -> asyncio.Task:
t = asyncio.create_task(coro)
_bg_tasks.add(t)
t.add_done_callback(_bg_tasks.discard)
return t
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# App + Jinja # App + Jinja
@@ -337,7 +326,7 @@ def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None
profile_snapshot=profile, profile_snapshot=profile,
) )
_spawn(asyncio.to_thread( asyncio.create_task(asyncio.to_thread(
_finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms, _finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms,
)) ))
@@ -795,11 +784,12 @@ def _parse_date_range(from_str: str | None, to_str: str | None) -> tuple[str | N
return start, end return start, end
def _collect_events(start_iso: str | None, end_iso: str | None, def _collect_events(start_iso: str | None, end_iso: str | None) -> list[dict]:
limit: int = 500) -> list[dict]:
users = {row["id"]: row["username"] for row in db.list_users()} users = {row["id"]: row["username"] for row in db.list_users()}
events: list[dict] = [] events: list[dict] = []
for a in db.audit_in_range(start_iso, end_iso, limit=limit): for a in db.recent_audit(None, limit=5000):
if start_iso and a["timestamp"] < start_iso: continue
if end_iso and a["timestamp"] >= end_iso: continue
events.append({ events.append({
"kind": "audit", "ts": a["timestamp"], "source": "web", "kind": "audit", "ts": a["timestamp"], "source": "web",
"actor": a["actor"], "action": a["action"], "actor": a["actor"], "action": a["action"],
@@ -807,7 +797,9 @@ def _collect_events(start_iso: str | None, end_iso: str | None,
"user": users.get(a["user_id"], ""), "user": users.get(a["user_id"], ""),
"ip": a["ip"] or "", "ip": a["ip"] or "",
}) })
for e in db.errors_in_range(start_iso, end_iso, limit=limit): for e in db.recent_errors(None, limit=5000):
if start_iso and e["timestamp"] < start_iso: continue
if end_iso and e["timestamp"] >= end_iso: continue
events.append({ events.append({
"kind": "error", "ts": e["timestamp"], "source": e["source"], "kind": "error", "ts": e["timestamp"], "source": e["source"],
"actor": e["source"], "action": e["kind"], "actor": e["source"], "action": e["kind"],
@@ -860,7 +852,7 @@ def tab_admin(request: Request, section: str):
to_str = q.get("to") or "" to_str = q.get("to") or ""
start_iso, end_iso = _parse_date_range(from_str or None, to_str or None) start_iso, end_iso = _parse_date_range(from_str or None, to_str or None)
ctx.update({ ctx.update({
"events": _collect_events(start_iso, end_iso, limit=500), "events": _collect_events(start_iso, end_iso)[:500],
"from_str": from_str, "to_str": to_str, "from_str": from_str, "to_str": to_str,
}) })
elif section == "benutzer": elif section == "benutzer":
@@ -883,7 +875,7 @@ def tab_logs_export(request: Request):
import csv as _csv import csv as _csv
q = request.query_params q = request.query_params
start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None) start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None)
events = _collect_events(start_iso, end_iso, limit=5000) events = _collect_events(start_iso, end_iso)
buf = io.StringIO() buf = io.StringIO()
w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL) w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL)

View file

@@ -7,14 +7,25 @@ logger = logging.getLogger("web.apply_client")
def _row_to_profile(profile_row) -> dict: def _row_to_profile(profile_row) -> dict:
"""Convert a user_profiles row into the payload dict for /apply. """Convert a user_profiles row to the apply service Profile dict."""
Apply-side ProfileModel (Pydantic) validates + coerces types; we just
hand over whatever the row has. `updated_at` and any other extra keys
are ignored by the model."""
if profile_row is None: if profile_row is None:
return {} return {}
return dict(profile_row) keys = [
"salutation", "firstname", "lastname", "email", "telephone",
"street", "house_number", "postcode", "city",
"is_possessing_wbs", "wbs_type", "wbs_valid_till",
"wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs",
"immomio_email", "immomio_password",
]
d = {}
for k in keys:
try:
d[k] = profile_row[k]
except (KeyError, IndexError):
pass
for k in ("is_possessing_wbs", "is_prio_wbs"):
d[k] = bool(d.get(k) or 0)
return d
class ApplyClient: class ApplyClient:

View file

@@ -9,7 +9,6 @@ import json
import logging import logging
import sqlite3 import sqlite3
import threading import threading
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Iterable, Optional from typing import Any, Iterable, Optional
@@ -43,23 +42,6 @@ def _get_conn() -> sqlite3.Connection:
return c return c
@contextmanager
def _tx():
"""Atomic BEGIN IMMEDIATE … COMMIT/ROLLBACK. Required for multi-statement
writes because our connections run in autocommit mode (isolation_level=None).
Combine with _lock to serialize writers and avoid BUSY.
"""
c = _get_conn()
c.execute("BEGIN IMMEDIATE")
try:
yield c
except Exception:
c.execute("ROLLBACK")
raise
else:
c.execute("COMMIT")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Schema # Schema
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -260,11 +242,6 @@ MIGRATIONS: list[str] = [
CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id); CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id);
CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id); CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id);
""", """,
# 0009: composite index for latest_applications_by_flat + last_application_for_flat
"""
CREATE INDEX IF NOT EXISTS idx_applications_user_flat_started
ON applications(user_id, flat_id, started_at DESC);
""",
] ]
@@ -785,36 +762,6 @@ def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
).fetchall()) ).fetchall())
def _range_filter_rows(table: str, ts_col: str, start_iso: Optional[str],
end_iso: Optional[str], limit: int) -> list[sqlite3.Row]:
"""Date-range filtered fetch from an append-only table. Pushes the
timestamp filter into SQL so we don't drag 5000 rows into Python just
to discard most of them."""
clauses, params = [], []
if start_iso:
clauses.append(f"{ts_col} >= ?")
params.append(start_iso)
if end_iso:
clauses.append(f"{ts_col} < ?")
params.append(end_iso)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params.append(limit)
return list(_get_conn().execute(
f"SELECT * FROM {table} {where} ORDER BY {ts_col} DESC LIMIT ?",
params,
).fetchall())
def audit_in_range(start_iso: Optional[str], end_iso: Optional[str],
limit: int = 500) -> list[sqlite3.Row]:
return _range_filter_rows("audit_log", "timestamp", start_iso, end_iso, limit)
def errors_in_range(start_iso: Optional[str], end_iso: Optional[str],
limit: int = 500) -> list[sqlite3.Row]:
return _range_filter_rows("errors", "timestamp", start_iso, end_iso, limit)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Retention cleanup # Retention cleanup
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -898,13 +845,13 @@ def partnership_accept(request_id: int, user_id: int) -> bool:
if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id): if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id):
return False return False
partner_id = row["from_user_id"] partner_id = row["from_user_id"]
with _lock, _tx() as c: with _lock:
c.execute( _get_conn().execute(
"UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?", "UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?",
(now_iso(), request_id), (now_iso(), request_id),
) )
# clean up any stale pending requests touching either user # clean up any stale pending requests touching either user
c.execute( _get_conn().execute(
"""DELETE FROM partnerships """DELETE FROM partnerships
WHERE status = 'pending' WHERE status = 'pending'
AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""", AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""",
@@ -952,12 +899,12 @@ def partner_flat_actions(partner_id: int) -> dict:
def cleanup_retention() -> dict: def cleanup_retention() -> dict:
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds") cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
stats = {} stats = {}
with _lock, _tx() as c: with _lock:
for table in ("errors", "audit_log"): for table in ("errors", "audit_log"):
cur = c.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,)) cur = _get_conn().execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
stats[table] = cur.rowcount stats[table] = cur.rowcount
# Drop forensics from old applications (but keep the row itself for history) # Drop forensics from old applications (but keep the row itself for history)
cur = c.execute( cur = _get_conn().execute(
"UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL", "UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
(cutoff,), (cutoff,),
) )

View file

@@ -218,20 +218,8 @@ def _record_failure(flat_id: str, step: str, reason: str) -> None:
pass pass
# Holding strong references to every spawned task so asyncio doesn't GC them
# mid-flight. create_task only weakly references tasks from the event loop.
_bg_tasks: set[asyncio.Task] = set()
def _spawn(coro) -> asyncio.Task:
t = asyncio.create_task(coro)
_bg_tasks.add(t)
t.add_done_callback(_bg_tasks.discard)
return t
def kick(flat_id: str) -> None: def kick(flat_id: str) -> None:
_spawn(asyncio.to_thread(enrich_flat_sync, flat_id)) asyncio.create_task(asyncio.to_thread(enrich_flat_sync, flat_id))
async def _backfill_runner() -> None: async def _backfill_runner() -> None:
@@ -246,5 +234,5 @@ async def _backfill_runner() -> None:
def kick_backfill() -> int: def kick_backfill() -> int:
pending = db.flats_needing_enrichment(limit=200) pending = db.flats_needing_enrichment(limit=200)
_spawn(_backfill_runner()) asyncio.create_task(_backfill_runner())
return len(pending) return len(pending)

View file

@@ -84,8 +84,8 @@ def on_apply_ok(user_id: int, flat: dict, message: str) -> None:
def on_apply_fail(user_id: int, flat: dict, message: str) -> None: def on_apply_fail(user_id: int, flat: dict, message: str) -> None:
addr = flat.get("address") or flat.get("link") addr = flat.get("address") or flat.get("link")
body = f"Bewerbung fehlgeschlagen: {addr}\n{message}\n{PUBLIC_URL}/bewerbungen" body = f"Bewerbung fehlgeschlagen: {addr}\n{message}\n{PUBLIC_URL}/fehler"
md = (f"*Bewerbung fehlgeschlagen*\n{addr}\n{message}\n" md = (f"*Bewerbung fehlgeschlagen*\n{addr}\n{message}\n"
f"[Bewerbungen ansehen]({PUBLIC_URL}/bewerbungen)") f"[Fehler ansehen]({PUBLIC_URL}/fehler)")
notify_user(user_id, "apply_fail", subject="[wohnungsdidi] Bewerbung fehlgeschlagen", notify_user(user_id, "apply_fail", subject="[wohnungsdidi] Bewerbung fehlgeschlagen",
body_plain=body, body_markdown=md) body_plain=body, body_markdown=md)