lazyflat/web/common.py
EiSiMo f1e26b38d0 refactor: split web/app.py into routers
app.py was ~1300 lines with every route, helper, and middleware mixed
together. Split into:

- app.py (~100 lines): FastAPI bootstrap, lifespan, /health, security
  headers, Jinja filter registration, include_router calls
- common.py: shared helpers (templates, apply_client, base_context,
  _is_htmx, client_ip, require_internal, time helpers, filter helpers,
  apply-gate helpers, _kick_apply / _finish_apply_background,
  _bg_tasks, _spawn, _mask_secret, _has_running_application, BERLIN_TZ)
- routes/auth.py: /login (GET+POST), /logout
- routes/wohnungen.py: /, /partials/wohnungen, /partials/wohnung/{id},
  /flat-images/{slug}/{idx}, /actions/apply|reject|unreject|auto-apply|
  submit-forms|reset-circuit|filters|enrich-all|enrich-flat; owns
  _wohnungen_context + _wohnungen_partial_or_redirect
- routes/bewerbungen.py: /bewerbungen, /bewerbungen/{id}/report.zip
- routes/einstellungen.py: /einstellungen, /einstellungen/{section},
  /actions/profile|notifications|account/password|partner/*; owns
  VALID_SECTIONS
- routes/admin.py: /logs redirect, /admin, /admin/{section},
  /logs/export.csv, /actions/users/*|secrets; owns ADMIN_SECTIONS,
  _parse_date_range, _collect_events
- routes/internal.py: /internal/flats|heartbeat|error|secrets

Route-diff before/after is empty — all 41 routes + /static mount
preserved. No behavior changes, pure mechanical split.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 19:27:12 +02:00

279 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Shared helpers and singletons used by the route modules.
Extracted from the old monolithic app.py. Signatures and behaviour are kept
byte-for-byte identical with the original helpers so the routers can import
from here without any behavioural change.
"""
import asyncio
import hmac
import logging
from datetime import datetime, timedelta, timezone
from typing import Any
from fastapi import Depends, HTTPException, Header, Request
from fastapi.templating import Jinja2Templates
# Time zone used for every human-readable timestamp rendered by this module.
try:
    from zoneinfo import ZoneInfo

    BERLIN_TZ = ZoneInfo("Europe/Berlin")
except Exception:
    # Best-effort fallback (zoneinfo missing or no tz database installed).
    # Log it: otherwise every displayed time is silently off by the
    # Berlin/UTC offset and nothing explains why.
    logging.getLogger("web").warning(
        "Europe/Berlin time zone unavailable, falling back to UTC",
        exc_info=True,
    )
    BERLIN_TZ = timezone.utc
import db
import notifications
from apply_client import ApplyClient, _row_to_profile
from auth import issue_csrf_token
from settings import APPLY_FAILURE_THRESHOLD, INTERNAL_API_KEY
# Module-wide logger; the helpers below log under the "web" logger name.
logger = logging.getLogger("web")
# Jinja is instantiated here so every router uses the same environment and
# the filters registered in app.py (de_dt, iso_utc, flat_slug) are visible
# to every template rendered via this instance.
templates = Jinja2Templates(directory="templates")
# Single ApplyClient instance shared by the apply-gate and orchestration
# helpers further down in this module.
apply_client = ApplyClient()
# Fire-and-forget tasks are parked in this set so that a strong reference
# exists for as long as they run; the event loop alone only keeps a weak one
# and the task could otherwise be garbage-collected mid-flight.
_bg_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> asyncio.Task:
    """Schedule *coro* as a background task and keep it alive until done.

    The task is registered in ``_bg_tasks`` and removes itself from the set
    once it finishes. Returns the created task.
    """
    task = asyncio.create_task(coro)
    _bg_tasks.add(task)
    # Drop our reference as soon as the task completes.
    task.add_done_callback(_bg_tasks.discard)
    return task
# ---------------------------------------------------------------------------
# Time helpers
# ---------------------------------------------------------------------------
def _parse_iso(s: str | None) -> datetime | None:
if not s:
return None
try:
# Python handles +00:00 but not trailing Z. Storage always uses +00:00.
return datetime.fromisoformat(s)
except ValueError:
return None
def _de_dt(s: str | None) -> str:
    """Format an ISO timestamp as 'DD.MM.YYYY HH:MM' in the BERLIN_TZ zone.

    Naive timestamps are interpreted as UTC. Returns an empty string when the
    input is missing or cannot be parsed.
    """
    parsed = _parse_iso(s)
    if parsed is None:
        return ""
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    local = aware.astimezone(BERLIN_TZ)
    return local.strftime("%d.%m.%Y %H:%M")
def _iso_utc(s: str | None) -> str:
    """Normalise an ISO timestamp to a UTC ISO string with second precision.

    Naive timestamps are interpreted as UTC. Returns an empty string when the
    input is missing or cannot be parsed. (Used for data attributes read by
    JS, per the original docstring.)
    """
    parsed = _parse_iso(s)
    if parsed is None:
        return ""
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    in_utc = aware.astimezone(timezone.utc)
    return in_utc.isoformat(timespec="seconds")
def _last_scrape_utc() -> str:
    """Return the stored ``last_alert_heartbeat`` state as a UTC ISO string.

    Returns an empty string when the state value is missing or unparseable.
    """
    # Same normalisation as _iso_utc: naive values are treated as UTC and the
    # result is rendered with second precision.
    heartbeat = db.get_state("last_alert_heartbeat")
    return _iso_utc(heartbeat)
# ---------------------------------------------------------------------------
# Request helpers
# ---------------------------------------------------------------------------
def client_ip(request: Request) -> str:
    """Return the best-effort client address for *request*.

    Prefers the first entry of ``X-Forwarded-For``; falls back to the socket
    peer address, or ``"unknown"`` when the request has no client info.
    """
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # reverse proxy overwrites it — confirm against the deployment.
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        first_hop = forwarded.split(",")[0].strip()
        # A malformed header such as ", 10.0.0.1" has an empty first entry;
        # fall through to the peer address instead of returning "".
        if first_hop:
            return first_hop
    return request.client.host if request.client else "unknown"
def require_internal(x_internal_api_key: str | None = Header(default=None)) -> None:
    """FastAPI dependency guarding internal endpoints with a shared key.

    Raises:
        HTTPException: 401 when the ``X-Internal-Api-Key`` header is missing,
            when no internal key is configured, or when the two differ.
    """
    supplied = x_internal_api_key
    expected = INTERNAL_API_KEY
    if not supplied or not expected:
        raise HTTPException(status_code=401, detail="invalid internal key")
    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters. The header is attacker-controlled, so compare
    # bytes to get a clean 401 instead of an unhandled 500.
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid internal key")
def base_context(request: Request, user, active_tab: str) -> dict:
    """Build the template context shared by every page.

    Contains the request, the user row, a CSRF token issued for the user's
    id, the active tab name and an ``is_admin`` boolean.
    """
    csrf_token = issue_csrf_token(user["id"])
    admin_flag = bool(user["is_admin"])
    context = {
        "request": request,
        "user": user,
        "csrf": csrf_token,
        "active_tab": active_tab,
        "is_admin": admin_flag,
    }
    return context
def _is_htmx(request: Request) -> bool:
    """Return True when the request carries ``HX-Request: true`` (case-insensitive value)."""
    marker = request.headers.get("hx-request", "")
    return marker.lower() == "true"
# ---------------------------------------------------------------------------
# Filter helpers
# ---------------------------------------------------------------------------
FILTER_KEYS = (
    "rooms_min",
    "rooms_max",
    "max_rent",
    "min_size",
    "wbs_required",
    "max_age_hours",
)


def _has_filters(f) -> bool:
    """Return True if at least one FILTER_KEYS entry of *f* holds a real value.

    ``None``, the empty string and zero count as "not set". A falsy *f*, or an
    object without a ``keys`` attribute, never has filters.
    """
    if not f or not hasattr(f, "keys"):
        return False
    unset = (None, "", 0, 0.0)
    return any(f[key] not in unset for key in FILTER_KEYS)
def _alert_status(notifications_row) -> tuple[str, str]:
    """Return ``(label, chip_kind)`` describing the user's alarm setup.

    Only a Telegram channel with both bot token and chat id counts as active.
    The default 'ui' channel is not treated as an alarm, since the dashboard
    only shows matches while the user is looking at it.
    """
    not_configured = ("nicht eingerichtet", "warn")
    if not notifications_row:
        return not_configured

    channel = (notifications_row["channel"] or "ui").strip()
    if channel != "telegram":
        return not_configured

    # Telegram needs both credentials before it can deliver anything.
    if notifications_row["telegram_bot_token"] and notifications_row["telegram_chat_id"]:
        return "aktiv (Telegram)", "ok"
    return "unvollständig", "warn"
def _filter_summary(f) -> str:
    """Render the active filters of *f* as a short one-line summary.

    Returns an empty string when no filter is set. Parts are joined with
    " · ", for example "2–3 Zi · ≤ 1500 € · ≥ 60 m² · WBS · ≤ 24 h alt".
    """
    if not _has_filters(f):
        return ""

    def fmt(x) -> str:
        # "%g" drops a trailing ".0" so 2.0 renders as "2".
        return "" if x is None else ("%g" % x)

    parts: list[str] = []
    rmin, rmax = f["rooms_min"], f["rooms_max"]
    if rmin or rmax:
        # Without a separator min=2/max=3 would read as "23 Zi".
        parts.append(f"{fmt(rmin)}–{fmt(rmax)} Zi")
    if f["max_rent"]:
        # Direction marker and unit make the bare number interpretable.
        parts.append(f"≤ {int(f['max_rent'])} €")
    if f["min_size"]:
        parts.append(f"≥ {int(f['min_size'])} m²")
    if f["wbs_required"] == "yes":
        parts.append("WBS")
    elif f["wbs_required"] == "no":
        parts.append("ohne WBS")
    if f["max_age_hours"]:
        parts.append(f"≤ {int(f['max_age_hours'])} h alt")
    return " · ".join(parts)
# ---------------------------------------------------------------------------
# Apply gates and orchestration
# ---------------------------------------------------------------------------
def _manual_apply_allowed() -> tuple[bool, str]:
    """Gate for the manual 'Bewerben' button.

    Returns ``(True, "")`` when the apply service reports healthy, otherwise
    ``(False, reason)``. The health check is the only condition.
    """
    if apply_client.health():
        return True, ""
    return False, "apply-service nicht erreichbar"
def _auto_apply_allowed(prefs) -> bool:
    """Gate for the auto-apply trigger.

    Requires auto-apply to be enabled in *prefs* and the apply circuit to be
    closed; only then is the apply service health check consulted.
    """
    if not prefs["auto_apply_enabled"] or prefs["apply_circuit_open"]:
        return False
    return apply_client.health()
def _has_running_application(user_id: int) -> bool:
    """Return whether *user_id* has a running application (delegates to ``db.has_running_application``)."""
    return db.has_running_application(user_id)
def _update_apply_circuit(app_id: int, user_id: int, url: str, success: bool,
                          message: str, provider: str) -> None:
    """Maintain the per-user failure counter and open the circuit at the threshold."""
    prefs = db.get_preferences(user_id)
    if success:
        db.update_preferences(user_id, {"apply_recent_failures": 0})
        return

    failures = int(prefs["apply_recent_failures"] or 0) + 1
    updates = {"apply_recent_failures": failures}
    if failures >= APPLY_FAILURE_THRESHOLD:
        updates["apply_circuit_open"] = 1
        db.log_error(source="apply", kind="circuit_open", user_id=user_id,
                     summary=f"{failures} aufeinanderfolgende Fehler",
                     application_id=app_id)
    db.update_preferences(user_id, updates)
    db.log_error(source="apply", kind="apply_failure", user_id=user_id,
                 summary=message or "Bewerbung fehlgeschlagen",
                 application_id=app_id,
                 context={"provider": provider, "url": url})


def _notify_apply_result(user_id: int, flat_id: str, url: str, success: bool,
                         message: str) -> None:
    """Send the success/failure notification for one finished application."""
    flat = db.get_flat(flat_id)
    # The flat may no longer be in the database; fall back to neutral values.
    flat_dict = {"address": flat["address"] if flat else "", "link": url,
                 "rooms": flat["rooms"] if flat else None,
                 "total_rent": flat["total_rent"] if flat else None}
    if success:
        notifications.on_apply_ok(user_id, flat_dict, message)
    else:
        notifications.on_apply_fail(user_id, flat_dict, message)


def _finish_apply_background(app_id: int, user_id: int, flat_id: str, url: str,
                             profile: dict, submit_forms: bool) -> None:
    """Run the apply call for an existing application row and record the outcome.

    Called on a worker thread AFTER the application row already exists; the
    HTMX response has already shipped the running state to the user.

    Steps: call the apply service, finish the application row, update the
    failure counter / circuit breaker, notify the user, write an audit entry.
    """
    logger.info("apply.running user=%s flat=%s application=%s submit=%s",
                user_id, flat_id, app_id, submit_forms)
    try:
        result = apply_client.apply(url=url, profile=profile,
                                    submit_forms=submit_forms, application_id=app_id)
    except Exception as exc:
        # This runs as a fire-and-forget task: an escaping exception would
        # leave the row in its running state forever and never be reported.
        # Record it as a failed attempt instead.
        logger.exception("apply.crashed user=%s flat=%s application=%s",
                         user_id, flat_id, app_id)
        result = {"success": False,
                  "message": f"apply-service Fehler: {type(exc).__name__}"}
    # Treat a missing/empty result like a failure without details.
    result = result or {}

    success = bool(result.get("success"))
    message = result.get("message", "")
    provider = result.get("provider", "")
    forensics = result.get("forensics") or {}
    db.finish_application(app_id, success=success, message=message,
                          provider=provider, forensics=forensics)

    _update_apply_circuit(app_id, user_id, url, success, message, provider)
    _notify_apply_result(user_id, flat_id, url, success, message)
    db.log_audit("system", "apply_finished", f"app={app_id} success={success}", user_id=user_id)
def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None:
    """Create the application row now and run the actual apply in the background.

    The row is inserted synchronously so the immediate HTMX response can
    already render its "läuft…" (running) state. The long-running Playwright
    call is then handed to a worker thread via ``asyncio.to_thread``.
    """
    preferences = db.get_preferences(user_id)
    profile = _row_to_profile(db.get_profile(user_id))
    submit_forms = bool(preferences["submit_forms"])

    application_id = db.start_application(
        user_id=user_id,
        flat_id=flat_id,
        url=url,
        triggered_by=triggered_by,
        submit_forms=submit_forms,
        profile_snapshot=profile,
    )

    # Fire-and-forget; _spawn keeps the task referenced until it completes.
    worker = asyncio.to_thread(
        _finish_apply_background,
        application_id, user_id, flat_id, url, profile, submit_forms,
    )
    _spawn(worker)
# ---------------------------------------------------------------------------
# Misc
# ---------------------------------------------------------------------------
def _mask_secret(value: str) -> str:
    """Return a display-safe version of a secret.

    Empty input yields "". Values of up to 10 characters are replaced
    entirely by bullets (one per character). Longer values keep their first
    6 and last 4 characters with an ellipsis marking the hidden middle.
    """
    if not value:
        return ""
    if len(value) <= 10:
        # An empty mask character would render every short secret as "",
        # indistinguishable from "no secret set".
        return "•" * len(value)
    # The ellipsis shows that the middle was elided, not that the secret is
    # exactly ten characters long.
    return value[:6] + "…" + value[-4:]