* DB: users + user_profiles/filters/notifications/preferences; applications gets user_id + forensics_json + profile_snapshot_json; new errors table with 14d retention; schema versioning via MIGRATIONS list * auth: password hashes in DB (argon2); env vars seed first admin; per-user sessions; CSRF bound to user id * apply: personal info/WBS moved out of env into the request body; providers take an ApplyContext with Profile + submit_forms; full Playwright recorder (step log, console, page errors, network, screenshots, final HTML) * web: five top-level tabs (Wohnungen/Bewerbungen/Logs/Fehler/Einstellungen); settings sub-tabs profil/filter/benachrichtigungen/account/benutzer; per-user matching, auto-apply and notifications (UI/Telegram/SMTP); red auto-apply switch on Wohnungen tab; forensics detail view for bewerbungen and fehler; retention background thread Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
52 lines
1.6 KiB
Python
52 lines
1.6 KiB
Python
"""
|
|
Per-user filter matching.
|
|
|
|
Each user has one row in user_filters. A flat matches the user if all
|
|
of the user's non-null constraints are satisfied. Empty filters = matches all.
|
|
"""
|
|
import logging
|
|
from typing import Iterable
|
|
|
|
logger = logging.getLogger("web.matching")
|
|
|
|
|
|
def flat_matches_filter(flat: dict, f: dict | None) -> bool:
|
|
"""f is a user_filters row converted to dict (or None = no filter set)."""
|
|
if not f:
|
|
return True
|
|
|
|
rooms = flat.get("rooms") or 0.0
|
|
rent = flat.get("total_rent") or 0.0
|
|
size = flat.get("size") or 0.0
|
|
commute = (flat.get("connectivity") or {}).get("morning_time") or 0.0
|
|
wbs_str = str(flat.get("wbs", "")).strip().lower()
|
|
|
|
if f.get("rooms_min") is not None and rooms < float(f["rooms_min"]):
|
|
return False
|
|
if f.get("rooms_max") is not None and rooms > float(f["rooms_max"]):
|
|
return False
|
|
if f.get("max_rent") is not None and rent > float(f["max_rent"]):
|
|
return False
|
|
if f.get("min_size") is not None and size < float(f["min_size"]):
|
|
return False
|
|
if f.get("max_morning_commute") is not None and commute > float(f["max_morning_commute"]):
|
|
return False
|
|
|
|
wbs_req = (f.get("wbs_required") or "").strip().lower()
|
|
if wbs_req == "yes":
|
|
if not wbs_str or wbs_str in ("kein", "nein", "no", "ohne", "-"):
|
|
return False
|
|
elif wbs_req == "no":
|
|
if wbs_str and wbs_str not in ("kein", "nein", "no", "ohne", "-"):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def row_to_dict(row) -> dict:
|
|
if row is None:
|
|
return {}
|
|
try:
|
|
return {k: row[k] for k in row.keys()}
|
|
except Exception:
|
|
return dict(row)
|