ui: slim 4-card strip, admin-only system log, HTMX apply, title cleanup

* Wohnungen header: single slim row with Alert status · Filter summary ·
  Auto-Bewerben toggle · Trockenmodus toggle. Big filter panel removed —
  filters live only in /einstellungen/filter.
* Alert status: 'nicht eingerichtet' until the user has actual filters (+
  valid notification creds if telegram/email). 'aktiv' otherwise.
* Logs tab: admin-only (gated both in layout and server-side). Shows merged
  audit + errors across all users, sorted newest-first, capped at 300.
* Apply, auto-apply, trockenmodus and circuit reset buttons post via HTMX and
  swap the Wohnungen body. While any application is still running for the
  user the poll interval drops from 30s to 3s so status flips to 'beworben'
  or 'fehlgeschlagen' almost immediately.
* Browser tab title is now always 'lazyflat'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Moritz 2026-04-21 11:25:59 +02:00
parent 332d9eea19
commit 04b591fa9e
9 changed files with 204 additions and 136 deletions

View file

@ -164,6 +164,19 @@ def base_context(request: Request, user, active_tab: str) -> dict:
}
def _is_htmx(request: Request) -> bool:
return request.headers.get("hx-request", "").lower() == "true"
def _wohnungen_partial_or_redirect(request: Request, user):
"""If called via HTMX, render the body partial; otherwise redirect to /."""
if _is_htmx(request):
ctx = base_context(request, user, "wohnungen")
ctx.update(_wohnungen_context(user))
return templates.TemplateResponse("_wohnungen_body.html", ctx)
return RedirectResponse("/", status_code=303)
def _manual_apply_allowed() -> tuple[bool, str]:
"""Manual 'Bewerben' button is only blocked if the apply service is down."""
if not apply_client.health():
@ -190,6 +203,61 @@ def _next_scrape_utc() -> str:
return (dt + timedelta(seconds=ALERT_SCRAPE_INTERVAL_SECONDS)).astimezone(timezone.utc).isoformat(timespec="seconds")
FILTER_KEYS = ("rooms_min", "rooms_max", "max_rent", "min_size", "max_morning_commute", "wbs_required")
def _has_filters(f) -> bool:
if not f:
return False
for k in FILTER_KEYS:
v = f[k] if hasattr(f, "keys") else None
if v not in (None, "", 0, 0.0):
return True
return False
def _alert_status(filters_row, notifications_row) -> tuple[str, str]:
"""Return (label, chip_kind) describing the user's alert setup."""
if not _has_filters(filters_row):
return "nicht eingerichtet", "warn"
ch = (notifications_row["channel"] if notifications_row else "ui") or "ui"
if ch == "telegram" and not (notifications_row["telegram_bot_token"] and notifications_row["telegram_chat_id"]):
return "Benachrichtigung fehlt", "warn"
if ch == "email" and not notifications_row["email_address"]:
return "Benachrichtigung fehlt", "warn"
return "aktiv", "ok"
def _filter_summary(f) -> str:
if not _has_filters(f):
return ""
parts = []
rmin, rmax = f["rooms_min"], f["rooms_max"]
if rmin or rmax:
def fmt(x):
return "" if x is None else ("%g" % x)
parts.append(f"{fmt(rmin)}{fmt(rmax)} Zi")
if f["max_rent"]:
parts.append(f"{int(f['max_rent'])}")
if f["min_size"]:
parts.append(f"{int(f['min_size'])}")
if f["max_morning_commute"]:
parts.append(f"{int(f['max_morning_commute'])} min")
if f["wbs_required"] == "yes":
parts.append("WBS")
elif f["wbs_required"] == "no":
parts.append("ohne WBS")
return " · ".join(parts)
def _has_running_application(user_id: int) -> bool:
row = db._conn.execute(
"SELECT 1 FROM applications WHERE user_id = ? AND finished_at IS NULL LIMIT 1",
(user_id,),
).fetchone()
return row is not None
def _run_apply_background(user_id: int, flat_id: str, url: str, triggered_by: str) -> None:
prefs = db.get_preferences(user_id)
profile_row = db.get_profile(user_id)
@ -308,6 +376,7 @@ def logout(request: Request):
def _wohnungen_context(user) -> dict:
uid = user["id"]
filters_row = db.get_filters(uid)
notif_row = db.get_notifications(uid)
prefs = db.get_preferences(uid)
filters = row_to_dict(filters_row)
flats = db.recent_flats(100)
@ -325,9 +394,13 @@ def _wohnungen_context(user) -> dict:
})
allowed, reason = _manual_apply_allowed()
alert_label, alert_chip = _alert_status(filters_row, notif_row)
has_running = _has_running_application(uid)
return {
"flats": flats_view,
"filters": filters,
"alert_label": alert_label,
"alert_chip": alert_chip,
"filter_summary": _filter_summary(filters_row),
"auto_apply_enabled": bool(prefs["auto_apply_enabled"]),
"submit_forms": bool(prefs["submit_forms"]),
"circuit_open": bool(prefs["apply_circuit_open"]),
@ -335,9 +408,9 @@ def _wohnungen_context(user) -> dict:
"apply_allowed": allowed,
"apply_block_reason": reason,
"apply_reachable": apply_client.health(),
"last_alert_heartbeat": db.get_state("last_alert_heartbeat") or "",
"next_scrape_utc": _next_scrape_utc(),
"scrape_interval_seconds": ALERT_SCRAPE_INTERVAL_SECONDS,
"has_running_apply": has_running,
"poll_interval": 3 if has_running else 30,
}
@ -400,7 +473,7 @@ async def action_auto_apply(
db.update_preferences(user["id"], {"auto_apply_enabled": new})
db.log_audit(user["username"], "auto_apply", "on" if new else "off",
user_id=user["id"], ip=client_ip(request))
return RedirectResponse("/", status_code=303)
return _wohnungen_partial_or_redirect(request, user)
@app.post("/actions/reset-circuit")
@ -412,7 +485,7 @@ async def action_reset_circuit(
require_csrf(user["id"], csrf)
db.update_preferences(user["id"], {"apply_circuit_open": 0, "apply_recent_failures": 0})
db.log_audit(user["username"], "reset_circuit", user_id=user["id"], ip=client_ip(request))
return RedirectResponse("/", status_code=303)
return _wohnungen_partial_or_redirect(request, user)
@app.post("/actions/apply")
@ -432,7 +505,7 @@ async def action_apply(
db.log_audit(user["username"], "trigger_apply", f"flat_id={flat_id}",
user_id=user["id"], ip=client_ip(request))
_kick_apply(user["id"], flat_id, flat["link"], "user")
return RedirectResponse("/", status_code=303)
return _wohnungen_partial_or_redirect(request, user)
# ---------------------------------------------------------------------------
@ -546,8 +619,39 @@ def tab_logs(request: Request):
u = current_user(request)
if not u:
return RedirectResponse("/login", status_code=303)
if not u["is_admin"]:
raise HTTPException(403, "admin only")
# Merge audit + errors across the whole system for a unified view.
users = {row["id"]: row["username"] for row in db.list_users()}
events: list[dict] = []
for a in db.recent_audit(None, limit=500):
events.append({
"kind": "audit",
"ts": a["timestamp"],
"source": "web",
"actor": a["actor"],
"action": a["action"],
"details": a["details"] or "",
"user": users.get(a["user_id"], ""),
"ip": a["ip"] or "",
})
for e in db.recent_errors(None, limit=500):
events.append({
"kind": "error",
"ts": e["timestamp"],
"source": e["source"],
"actor": e["source"],
"action": e["kind"],
"details": e["summary"] or "",
"user": users.get(e["user_id"], "") if e["user_id"] else "",
"ip": "",
})
events.sort(key=lambda x: x["ts"], reverse=True)
events = events[:300]
ctx = base_context(request, u, "logs")
ctx["events"] = db.recent_audit(u["id"], limit=200)
ctx["events"] = events
return templates.TemplateResponse("logs.html", ctx)
@ -676,7 +780,9 @@ async def action_submit_forms(
db.update_preferences(user["id"], {"submit_forms": new})
db.log_audit(user["username"], "submit_forms", "on" if new else "off",
user_id=user["id"], ip=client_ip(request))
return RedirectResponse("/einstellungen/profil", status_code=303)
if _is_htmx(request):
return _wohnungen_partial_or_redirect(request, user)
return RedirectResponse(request.headers.get("referer", "/einstellungen/profil"), status_code=303)
@app.post("/actions/users/create")