Adds a collapsible 12-Bezirk checkbox list to the filter tab. The UI speaks Bezirke; internally the match runs on the PLZ extracted from flat.address and resolved to a dominant Bezirk via a curated 187-PLZ map (berlin_districts.py). - Migration 0011 adds user_filters.districts (CSV of selected names) - Empty stored value = no filter = all Bezirke ticked in the UI. Submitting "all ticked" or "none ticked" both normalise to empty so the defaults and the nuclear state mean the same thing. - When a Bezirk filter is active, flats with an unknown/unmapped PLZ are excluded — if the user bothered to narrow by district, sneaking in unplaceable flats would be the wrong default. - Filter summary on the Wohnungen page shows "N Bezirke" so it's visible the filter is active. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
311 lines
11 KiB
Python
311 lines
11 KiB
Python
"""Shared helpers and singletons used by the route modules.
|
||
|
||
Extracted from the old monolithic app.py. Signatures and behaviour are kept
|
||
byte-for-byte identical with the original helpers so the routers can import
|
||
from here without any behavioural change.
|
||
"""
|
||
import asyncio
|
||
import hmac
|
||
import logging
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any
|
||
|
||
from fastapi import Depends, HTTPException, Header, Request
|
||
from fastapi.templating import Jinja2Templates
|
||
|
||
try:
    from zoneinfo import ZoneInfo
    BERLIN_TZ = ZoneInfo("Europe/Berlin")
except Exception:
    # Best-effort fallback: if zoneinfo or the "Europe/Berlin" zone cannot be
    # loaded, everything that formats with BERLIN_TZ shows UTC times instead.
    BERLIN_TZ = timezone.utc
|
||
|
||
import db
|
||
import notifications
|
||
from apply_client import ApplyClient, _row_to_profile
|
||
from auth import issue_csrf_token
|
||
from settings import APPLY_FAILURE_THRESHOLD, INTERNAL_API_KEY
|
||
|
||
|
||
# Logger used by the helpers in this module (channel name "web").
logger = logging.getLogger("web")

# Jinja is instantiated here so every router uses the same environment and
# the filters registered in app.py (de_dt, iso_utc, flat_slug) are visible
# to every template rendered via this instance.
templates = Jinja2Templates(directory="templates")

# Single ApplyClient instance used by the apply gates and orchestration below.
apply_client = ApplyClient()
|
||
|
||
# Fire-and-forget tasks are pinned in this set. The event loop only holds
# weak references to tasks created with asyncio.create_task, so without a
# strong reference the GC could drop a task while it is still running.
_bg_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it is done.

    The task removes itself from ``_bg_tasks`` via a done-callback, so the
    set only ever contains tasks that have not finished yet.
    Returns the created task.
    """
    task = asyncio.create_task(coro)
    # Register the cleanup first; done-callbacks never fire synchronously,
    # so the task is guaranteed to be in the set before discard can run.
    task.add_done_callback(_bg_tasks.discard)
    _bg_tasks.add(task)
    return task
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Time helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_iso(s: str | None) -> datetime | None:
|
||
if not s:
|
||
return None
|
||
try:
|
||
# Python handles +00:00 but not trailing Z. Storage always uses +00:00.
|
||
return datetime.fromisoformat(s)
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _de_dt(s: str | None) -> str:
    """Format an ISO timestamp as 'DD.MM.YYYY HH:MM' in ``BERLIN_TZ``.

    Empty or unparseable input yields an empty string. A timestamp without
    an offset is treated as UTC before conversion.
    """
    parsed = _parse_iso(s)
    if parsed is None:
        return ""
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    local = aware.astimezone(BERLIN_TZ)
    return local.strftime("%d.%m.%Y %H:%M")
|
||
|
||
|
||
def _iso_utc(s: str | None) -> str:
    """Normalise an ISO timestamp to a UTC ISO string (for data-attr in JS).

    The value is re-serialised rather than passed through verbatim: it is
    converted to UTC and emitted with second precision. Empty or
    unparseable input yields an empty string; a timestamp without an
    offset is treated as UTC.
    """
    parsed = _parse_iso(s)
    if parsed is None:
        return ""
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    as_utc = aware.astimezone(timezone.utc)
    return as_utc.isoformat(timespec="seconds")
|
||
|
||
|
||
def _last_scrape_utc() -> str:
    """Return the stored ``last_alert_heartbeat`` state as a UTC ISO string.

    Yields an empty string when the state value is missing or unparseable.
    The normalisation (naive means UTC, second precision) is exactly what
    ``_iso_utc`` does, so the work is delegated to it.
    """
    heartbeat = db.get_state("last_alert_heartbeat")
    return _iso_utc(heartbeat)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Request helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def client_ip(request: Request) -> str:
    """Best-effort client address for *request*.

    Uses the first entry of the ``X-Forwarded-For`` header when that header
    is present and non-empty; otherwise the peer host of the connection, or
    "unknown" when the request carries no client info.
    """
    # NOTE(review): the header is taken at face value here — presumably the
    # app only runs behind a proxy that sets it; confirm with the deployment.
    forwarded = request.headers.get("x-forwarded-for")
    if not forwarded:
        peer = request.client
        return peer.host if peer else "unknown"
    first_hop, _, _ = forwarded.partition(",")
    return first_hop.strip()
|
||
|
||
|
||
def require_internal(x_internal_api_key: str | None = Header(default=None)) -> None:
    """Dependency that rejects requests lacking the internal API key.

    Compares the ``X-Internal-Api-Key`` header against ``INTERNAL_API_KEY``
    in constant time.

    Raises:
        HTTPException: 401 when the header is missing, empty or wrong.
    """
    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters. Comparing UTF-8 bytes keeps a crafted header
    # value on the 401 path instead of surfacing as a server error.
    supplied = (x_internal_api_key or "").encode("utf-8")
    expected = (INTERNAL_API_KEY or "").encode("utf-8")
    if not supplied or not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="invalid internal key")
|
||
|
||
|
||
def base_context(request: Request, user, active_tab: str) -> dict:
    """Build the template context shared by all pages.

    Contains the request, the user row, a CSRF token issued for
    ``user["id"]``, the name of the active tab and an ``is_admin`` flag
    derived from ``user["is_admin"]``.
    """
    csrf_token = issue_csrf_token(user["id"])
    admin_flag = bool(user["is_admin"])
    context: dict = {}
    context["request"] = request
    context["user"] = user
    context["csrf"] = csrf_token
    context["active_tab"] = active_tab
    context["is_admin"] = admin_flag
    return context
|
||
|
||
|
||
def _is_htmx(request: Request) -> bool:
    """True when the request carries an ``HX-Request: true`` header (case-insensitive value)."""
    marker = request.headers.get("hx-request", "")
    return marker.lower() == "true"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Filter helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
FILTER_KEYS = ("rooms_min", "rooms_max", "max_rent", "min_size",
               "wbs_required", "max_age_hours", "districts")


def _has_filters(f) -> bool:
    """Report whether the filter row *f* has at least one filter set.

    *f* is any mapping-like row (something exposing ``keys``). A value
    counts as "set" unless it is ``None``, an empty string or zero.
    Falsy rows and objects without ``keys`` have no filters.

    A key from ``FILTER_KEYS`` that is absent from the row is treated as
    unset instead of raising. This matches how ``_filter_summary`` guards
    its ``districts`` lookup — presumably for rows that lack that column.
    """
    if not f or not hasattr(f, "keys"):
        return False
    for key in FILTER_KEYS:
        try:
            value = f[key]
        except (KeyError, IndexError):
            # dict-like rows raise KeyError, sqlite3.Row raises IndexError
            continue
        if value not in (None, "", 0, 0.0):
            return True
    return False
|
||
|
||
|
||
def _alert_status(notifications_row) -> tuple[str, str]:
    """Return (label, chip_kind) describing the user's alarm setup.

    Only a Telegram channel with both bot token and chat id counts as
    active. A Telegram channel missing either credential is reported as
    incomplete. Every other case — no row, no channel, or the 'ui'
    channel, which is not a real push alarm — is "not set up".
    """
    not_configured = ("nicht eingerichtet", "warn")
    if not notifications_row:
        return not_configured

    channel = (notifications_row["channel"] or "ui").strip()
    if channel != "telegram":
        return not_configured

    # Short-circuit on purpose: the chat id is only looked up when a token exists.
    if not notifications_row["telegram_bot_token"] or not notifications_row["telegram_chat_id"]:
        return "unvollständig", "warn"
    return "aktiv (Telegram)", "ok"
|
||
|
||
|
||
def _filter_summary(f) -> str:
    """Render the active filters of row *f* as one short, ' · '-joined line.

    Returns "—" when no filter is set. Otherwise the parts appear in a
    fixed order: room range, max rent, min size, WBS requirement, max age
    and the number of selected districts. A row without a ``districts``
    entry is handled as if no district were selected.
    """
    if not _has_filters(f):
        return "—"

    def room_bound(value):
        # Open end of the range is drawn as a dash; numbers drop trailing zeros.
        return "–" if value is None else "%g" % value

    summary = []

    lower, upper = f["rooms_min"], f["rooms_max"]
    if lower or upper:
        summary.append(f"{room_bound(lower)}–{room_bound(upper)} Zi")

    rent = f["max_rent"]
    if rent:
        summary.append(f"≤ {int(rent)} €")

    size = f["min_size"]
    if size:
        summary.append(f"≥ {int(size)} m²")

    wbs = f["wbs_required"]
    if wbs == "yes":
        summary.append("WBS")
    elif wbs == "no":
        summary.append("ohne WBS")

    age = f["max_age_hours"]
    if age:
        summary.append(f"≤ {int(age)} h alt")

    try:
        csv = (f["districts"] or "").strip()
    except (KeyError, IndexError):
        csv = ""
    if csv:
        count = len([name for name in csv.split(",") if name.strip()])
        noun = "Bezirk" if count == 1 else "Bezirke"
        summary.append(f"{count} {noun}")

    return " · ".join(summary)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Apply gates and orchestration
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _manual_apply_allowed() -> tuple[bool, str]:
    """Gate for the manual 'Bewerben' button.

    Returns ``(True, "")`` when the apply service reports healthy, else
    ``(False, reason)``. An unreachable service is the only blocker.
    """
    if apply_client.health():
        return True, ""
    return False, "apply-service nicht erreichbar"
|
||
|
||
|
||
def _auto_apply_allowed(prefs) -> bool:
    """Gate for the auto-apply trigger.

    Requires the user to have enabled auto-apply and the circuit breaker to
    be closed. Only then is the apply service's health queried, and its
    result returned.
    """
    if not prefs["auto_apply_enabled"] or prefs["apply_circuit_open"]:
        return False
    return apply_client.health()
|
||
|
||
|
||
def _has_running_application(user_id: int) -> bool:
    """Thin wrapper around ``db.has_running_application`` for *user_id*."""
    running = db.has_running_application(user_id)
    return running
|
||
|
||
|
||
# When a listing has been taken down or deactivated, apply comes back with
# success=False and a message containing one of these phrases. That means
# "the flat is gone", not "the apply pipeline is broken": it must not count
# towards the circuit breaker, must not trigger a failure notification, and
# the flat is hidden from every user's list.
_OFFLINE_MESSAGE_MARKERS = (
    # de
    "inserat offline",
    "inserat deaktiviert",
    # en
    "ad offline",
    "ad deactivated",
)


def _is_offline_result(message: str) -> bool:
    """True when *message* contains one of the offline markers (case-insensitive).

    An empty or missing message is never an offline result.
    """
    if not message:
        return False
    lowered = message.lower()
    for marker in _OFFLINE_MESSAGE_MARKERS:
        if marker in lowered:
            return True
    return False
|
||
|
||
|
||
def _finish_apply_background(app_id: int, user_id: int, flat_id: str, url: str,
                             profile: dict, submit_forms: bool) -> None:
    """Run the apply call for an existing application row and record the outcome.

    Called on a worker thread AFTER the application row already exists.
    The HTMX response has already shipped the running state to the user.

    Steps, in order:
      1. call the apply service and store its result on the application row;
      2. if the failure message marks the listing as offline, flag the flat
         offline;
      3. update the user's failure counter / circuit breaker;
      4. send the success or failure notification (none for offline);
      5. write an audit entry with the outcome.

    Args:
        app_id: id of the application row created before this call.
        user_id: user on whose behalf the application runs.
        flat_id: id of the flat being applied for.
        url: listing URL passed to the apply service.
        profile: applicant profile passed to the apply service.
        submit_forms: forwarded to the apply service unchanged.
    """
    logger.info("apply.running user=%s flat=%s application=%s submit=%s",
                user_id, flat_id, app_id, submit_forms)
    # NOTE(review): nothing here catches an exception from apply(); if it can
    # raise, finish_application() below is never reached for this row — confirm
    # that ApplyClient.apply always returns a result dict.
    result = apply_client.apply(url=url, profile=profile,
                                submit_forms=submit_forms, application_id=app_id)
    success = bool(result.get("success"))
    message = result.get("message", "")
    provider = result.get("provider", "")
    forensics = result.get("forensics") or {}

    db.finish_application(app_id, success=success, message=message,
                          provider=provider, forensics=forensics)

    # A failed apply whose message says the listing is gone is its own outcome.
    offline = not success and _is_offline_result(message)
    if offline:
        db.mark_flat_offline(flat_id)

    # prefs is only read in the failure branch (current failure count).
    prefs = db.get_preferences(user_id)
    if success or offline:
        # Offline isn't a pipeline failure — reset the counter so a streak of
        # stale listings doesn't trip the circuit breaker.
        db.update_preferences(user_id, {"apply_recent_failures": 0})
    else:
        failures = int(prefs["apply_recent_failures"] or 0) + 1
        updates = {"apply_recent_failures": failures}
        if failures >= APPLY_FAILURE_THRESHOLD:
            # Threshold reached: open the circuit and log that separately
            # from the individual failure logged below.
            updates["apply_circuit_open"] = 1
            db.log_error(source="apply", kind="circuit_open", user_id=user_id,
                         summary=f"{failures} aufeinanderfolgende Fehler",
                         application_id=app_id)
        db.update_preferences(user_id, updates)
        db.log_error(source="apply", kind="apply_failure", user_id=user_id,
                     summary=message or "Bewerbung fehlgeschlagen",
                     application_id=app_id,
                     context={"provider": provider, "url": url})

    # Minimal flat description for the notification; falls back to empty
    # values when the flat row cannot be found.
    flat = db.get_flat(flat_id)
    flat_dict = {"address": flat["address"] if flat else "", "link": url,
                 "rooms": flat["rooms"] if flat else None,
                 "total_rent": flat["total_rent"] if flat else None}
    if success:
        notifications.on_apply_ok(user_id, flat_dict, message)
    elif not offline:
        # Offline results deliberately send no notification at all.
        notifications.on_apply_fail(user_id, flat_dict, message)

    outcome = "success" if success else ("offline" if offline else "failure")
    db.log_audit("system", "apply_finished",
                 f"app={app_id} outcome={outcome}", user_id=user_id)
|
||
|
||
|
||
def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None:
    """Create the application row now, then run the apply call in the background.

    The row is inserted synchronously so the immediate HTMX response can
    already render its "läuft…" state. The long-running apply call itself
    is handed to a worker thread through ``_spawn``.
    """
    preferences = db.get_preferences(user_id)
    profile = _row_to_profile(db.get_profile(user_id))
    should_submit = bool(preferences["submit_forms"])

    application_id = db.start_application(
        user_id=user_id,
        flat_id=flat_id,
        url=url,
        triggered_by=triggered_by,
        submit_forms=should_submit,
        profile_snapshot=profile,
    )

    worker = asyncio.to_thread(
        _finish_apply_background,
        application_id, user_id, flat_id, url, profile, should_submit,
    )
    _spawn(worker)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Misc
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _mask_secret(value: str) -> str:
    """Mask *value* for display.

    Empty input gives "". Values of up to 10 characters are replaced
    entirely by bullets. Longer values keep their first 6 and last 4
    characters around an ellipsis.
    """
    if not value:
        return ""
    length = len(value)
    if length > 10:
        # NOTE(review): 6 + 4 characters stay visible, so an 11-character
        # value is masked by a single character only.
        return "".join((value[:6], "…", value[-4:]))
    return "•" * length
|