lazyflat/web/common.py
EiSiMo b5b4908ee7 feat(web): footer build SHA shows "(latest)" or "(N behind)"
Footer now compares the running SOURCE_COMMIT against origin/main via
Gitea's compare API and renders "build <sha> (latest)" when up to date
or "build <sha> (N behind)" otherwise — so it's obvious from any page
whether the deploy is current.

Per-SHA cache (60s TTL, 1.5s timeout) keeps the lookup off the hot path
in steady state. Network or parse errors return None and the parens
suffix is just hidden — the SHA itself always renders, so a flaky git
host can never break the layout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 11:21:24 +02:00

322 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Shared helpers and singletons used by the route modules.
Extracted from the old monolithic app.py. Signatures and behaviour are kept
byte-for-byte identical with the original helpers so the routers can import
from here without any behavioural change.
"""
import asyncio
import hmac
import logging
from datetime import datetime, timedelta, timezone
from typing import Any
from fastapi import Depends, HTTPException, Header, Request
from fastapi.templating import Jinja2Templates
try:
from zoneinfo import ZoneInfo
BERLIN_TZ = ZoneInfo("Europe/Berlin")
except Exception:
BERLIN_TZ = timezone.utc
import db
import notifications
from apply_client import ApplyClient, _row_to_profile
from auth import issue_csrf_token
from settings import APPLY_FAILURE_THRESHOLD, GIT_COMMIT, INTERNAL_API_KEY
from version import commits_behind_main
logger = logging.getLogger("web")
# Jinja is instantiated here so every router uses the same environment and
# the filters registered in app.py (de_dt, iso_utc, flat_slug) are visible
# to every template rendered via this instance.
templates = Jinja2Templates(directory="templates")
apply_client = ApplyClient()
# Strong refs for fire-and-forget tasks. asyncio.create_task only weakly
# references tasks from the event loop — the GC could drop them mid-flight.
_bg_tasks: set[asyncio.Task] = set()
def _spawn(coro) -> asyncio.Task:
t = asyncio.create_task(coro)
_bg_tasks.add(t)
t.add_done_callback(_bg_tasks.discard)
return t
# ---------------------------------------------------------------------------
# Time helpers
# ---------------------------------------------------------------------------
def _parse_iso(s: str | None) -> datetime | None:
if not s:
return None
try:
# Python handles +00:00 but not trailing Z. Storage always uses +00:00.
return datetime.fromisoformat(s)
except ValueError:
return None
def _de_dt(s: str | None) -> str:
"""UTC ISO → 'DD.MM.YYYY HH:MM' in Europe/Berlin."""
dt = _parse_iso(s)
if dt is None:
return ""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(BERLIN_TZ).strftime("%d.%m.%Y %H:%M")
def _iso_utc(s: str | None) -> str:
"""Pass-through of a UTC ISO string (for data-attr in JS)."""
dt = _parse_iso(s)
if dt is None:
return ""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat(timespec="seconds")
def _last_scrape_utc() -> str:
hb = db.get_state("last_alert_heartbeat")
dt = _parse_iso(hb)
if dt is None:
return ""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat(timespec="seconds")
# ---------------------------------------------------------------------------
# Request helpers
# ---------------------------------------------------------------------------
def client_ip(request: Request) -> str:
xff = request.headers.get("x-forwarded-for")
if xff:
return xff.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def require_internal(x_internal_api_key: str | None = Header(default=None)) -> None:
if not x_internal_api_key or not hmac.compare_digest(x_internal_api_key, INTERNAL_API_KEY):
raise HTTPException(status_code=401, detail="invalid internal key")
def base_context(request: Request, user, active_tab: str) -> dict:
behind = commits_behind_main(GIT_COMMIT)
if behind is None:
git_status = ""
elif behind == 0:
git_status = "latest"
else:
git_status = f"{behind} behind"
return {
"request": request,
"user": user,
"csrf": issue_csrf_token(user["id"]),
"active_tab": active_tab,
"is_admin": bool(user["is_admin"]),
"git_commit": GIT_COMMIT,
"git_commit_short": GIT_COMMIT[:7] if GIT_COMMIT and GIT_COMMIT != "dev" else GIT_COMMIT,
"git_status": git_status,
}
def _is_htmx(request: Request) -> bool:
return request.headers.get("hx-request", "").lower() == "true"
# ---------------------------------------------------------------------------
# Filter helpers
# ---------------------------------------------------------------------------
FILTER_KEYS = ("rooms_min", "rooms_max", "max_rent", "min_size",
"wbs_required", "max_age_hours", "districts")
def _has_filters(f) -> bool:
if not f:
return False
for k in FILTER_KEYS:
v = f[k] if hasattr(f, "keys") else None
if v not in (None, "", 0, 0.0):
return True
return False
def _alert_status(notifications_row) -> tuple[str, str]:
"""Return (label, chip_kind) for the user's alarm (notification) setup.
'aktiv' only if a real push channel is configured with credentials.
'ui' is not a real alarm — the dashboard already shows matches when you
happen to be looking.
"""
if not notifications_row:
return "nicht eingerichtet", "warn"
ch = (notifications_row["channel"] or "ui").strip()
if ch == "telegram":
if notifications_row["telegram_bot_token"] and notifications_row["telegram_chat_id"]:
return "aktiv (Telegram)", "ok"
return "unvollständig", "warn"
return "nicht eingerichtet", "warn"
def _filter_summary(f) -> str:
if not _has_filters(f):
return ""
parts = []
rmin, rmax = f["rooms_min"], f["rooms_max"]
if rmin or rmax:
def fmt(x):
return "" if x is None else ("%g" % x)
parts.append(f"{fmt(rmin)}{fmt(rmax)} Zi")
if f["max_rent"]:
parts.append(f"{int(f['max_rent'])}")
if f["min_size"]:
parts.append(f"{int(f['min_size'])}")
if f["wbs_required"] == "yes":
parts.append("WBS")
elif f["wbs_required"] == "no":
parts.append("ohne WBS")
if f["max_age_hours"]:
parts.append(f"{int(f['max_age_hours'])} h alt")
try:
districts_csv = (f["districts"] or "").strip()
except (KeyError, IndexError):
districts_csv = ""
if districts_csv:
n = sum(1 for d in districts_csv.split(",") if d.strip())
parts.append(f"{n} Bezirk{'e' if n != 1 else ''}")
return " · ".join(parts)
# ---------------------------------------------------------------------------
# Apply gates and orchestration
# ---------------------------------------------------------------------------
def _manual_apply_allowed() -> tuple[bool, str]:
"""Manual 'Bewerben' button is only blocked if the apply service is down."""
if not apply_client.health():
return False, "apply-service nicht erreichbar"
return True, ""
def _auto_apply_allowed(prefs) -> bool:
"""Auto-apply trigger gate: user must have enabled it and the circuit must be closed."""
if not prefs["auto_apply_enabled"]:
return False
if prefs["apply_circuit_open"]:
return False
return apply_client.health()
def _has_running_application(user_id: int) -> bool:
return db.has_running_application(user_id)
# Apply returns success=False with one of these phrases when the listing has
# been taken down or deactivated. We treat that as "the flat is gone" rather
# than "the apply pipeline is broken" — no circuit-breaker hit, no fail
# notification, and the flat is hidden from every user's list.
_OFFLINE_MESSAGE_MARKERS = (
"inserat offline", "inserat deaktiviert", # de
"ad offline", "ad deactivated", # en
)
def _is_offline_result(message: str) -> bool:
if not message:
return False
m = message.lower()
return any(marker in m for marker in _OFFLINE_MESSAGE_MARKERS)
def _finish_apply_background(app_id: int, user_id: int, flat_id: str, url: str,
profile: dict, submit_forms: bool) -> None:
"""Called on a worker thread AFTER the application row already exists.
The HTMX response has already shipped the running state to the user."""
logger.info("apply.running user=%s flat=%s application=%s submit=%s",
user_id, flat_id, app_id, submit_forms)
result = apply_client.apply(url=url, profile=profile,
submit_forms=submit_forms, application_id=app_id)
success = bool(result.get("success"))
message = result.get("message", "")
provider = result.get("provider", "")
forensics = result.get("forensics") or {}
db.finish_application(app_id, success=success, message=message,
provider=provider, forensics=forensics)
offline = not success and _is_offline_result(message)
if offline:
db.mark_flat_offline(flat_id)
prefs = db.get_preferences(user_id)
if success or offline:
# Offline isn't a pipeline failure — reset the counter so a streak of
# stale listings doesn't trip the circuit breaker.
db.update_preferences(user_id, {"apply_recent_failures": 0})
else:
failures = int(prefs["apply_recent_failures"] or 0) + 1
updates = {"apply_recent_failures": failures}
if failures >= APPLY_FAILURE_THRESHOLD:
updates["apply_circuit_open"] = 1
db.log_error(source="apply", kind="circuit_open", user_id=user_id,
summary=f"{failures} aufeinanderfolgende Fehler",
application_id=app_id)
db.update_preferences(user_id, updates)
db.log_error(source="apply", kind="apply_failure", user_id=user_id,
summary=message or "Bewerbung fehlgeschlagen",
application_id=app_id,
context={"provider": provider, "url": url})
flat = db.get_flat(flat_id)
flat_dict = {"address": flat["address"] if flat else "", "link": url,
"rooms": flat["rooms"] if flat else None,
"total_rent": flat["total_rent"] if flat else None}
if success:
notifications.on_apply_ok(user_id, flat_dict, message)
elif not offline:
notifications.on_apply_fail(user_id, flat_dict, message)
outcome = "success" if success else ("offline" if offline else "failure")
db.log_audit("system", "apply_finished",
f"app={app_id} outcome={outcome}", user_id=user_id)
def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None:
"""Insert the application row synchronously so the immediate HTMX response
already renders the row's "läuft…" state. The long-running Playwright call
is then offloaded to a background thread."""
prefs = db.get_preferences(user_id)
profile_row = db.get_profile(user_id)
profile = _row_to_profile(profile_row)
submit_forms = bool(prefs["submit_forms"])
app_id = db.start_application(
user_id=user_id, flat_id=flat_id, url=url,
triggered_by=triggered_by, submit_forms=submit_forms,
profile_snapshot=profile,
)
_spawn(asyncio.to_thread(
_finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms,
))
# ---------------------------------------------------------------------------
# Misc
# ---------------------------------------------------------------------------
def _mask_secret(value: str) -> str:
if not value:
return ""
if len(value) <= 10:
return "" * len(value)
return value[:6] + "" + value[-4:]