per-step screenshot + html snapshots, matches-only list, full German UI, CSV export

* apply: Recorder.step_snap(page, name) captures both a JPEG screenshot and
  the page HTML for every major moment; every provider now calls step_snap at
  each logical step so failure reports contain the exact DOM and rendered
  state at every stage of the flow
* ZIP report: each snapshot becomes snapshots/NN_<label>.jpg +
  snapshots/NN_<label>.html for AI-assisted debugging
* web: Wohnungsliste zeigt nur noch Flats, die die eigenen Filter treffen;
  Match-Chip entfernt (Liste ist jetzt implizit matchend)
* UI komplett auf Deutsch: Protokoll statt Logs, Administrator statt admin,
  Trockenmodus statt dry-run, Automatik pausiert statt circuit open,
  Alarm statt Alert, Abmelden statt Logout
* Wohnungen-Header: Zeile 1 Info (Alarm + Filter), Zeile 2 Schalter mit
  echten Radio-Paaren (An/Aus) für Automatisch bewerben und Trockenmodus;
  hx-confirm auf den kritischen Radios; per-form CSS für sichtbaren Check-State
* Protokoll: von/bis-Datumsfilter (Berliner Zeit) + CSV-Download
  (/logs/export.csv) mit UTC + lokaler Zeit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Moritz 2026-04-21 11:40:12 +02:00
parent 04b591fa9e
commit 7444f90d6a
16 changed files with 360 additions and 202 deletions

View file

@ -383,15 +383,13 @@ def _wohnungen_context(user) -> dict:
flats_view = []
for f in flats:
if not flat_matches_filter({
"rooms": f["rooms"], "total_rent": f["total_rent"], "size": f["size"],
"wbs": f["wbs"], "connectivity": {"morning_time": f["connectivity_morning_time"]},
}, filters):
continue
last = db.last_application_for_flat(uid, f["id"])
flats_view.append({
"row": f,
"matched": flat_matches_filter({
"rooms": f["rooms"], "total_rent": f["total_rent"], "size": f["size"],
"wbs": f["wbs"], "connectivity": {"morning_time": f["connectivity_morning_time"]},
}, filters),
"last": last,
})
flats_view.append({"row": f, "last": last})
allowed, reason = _manual_apply_allowed()
alert_label, alert_chip = _alert_status(filters_row, notif_row)
@ -574,7 +572,8 @@ def bewerbung_zip(request: Request, app_id: int):
f" console.json Browser console entries\n"
f" errors.json Browser pageerror events\n"
f" network.json Network requests + partial responses\n"
f" screenshots/*.jpg Screenshots at key moments\n"
f" snapshots/NN_*.jpg Screenshot at each step (NN = order)\n"
f" snapshots/NN_*.html Page HTML at each step\n"
)
zf.writestr("application.json", json.dumps(app_meta, indent=2, default=str))
zf.writestr("flat.json", json.dumps(dict(flat) if flat else {}, indent=2, default=str))
@ -593,14 +592,17 @@ def bewerbung_zip(request: Request, app_id: int):
zf.writestr("network.json", json.dumps(forensics.get("network", []), indent=2))
for idx, s in enumerate(forensics.get("screenshots", []), start=1):
label = (s.get("label") or f"step{idx}").replace("/", "_").replace(" ", "_")
b64 = s.get("b64_jpeg", "")
if b64:
try:
data = base64.b64decode(b64)
label = (s.get("label") or f"step{idx}").replace("/", "_").replace(" ", "_")
zf.writestr(f"screenshots/{idx:02d}_{label}.jpg", data)
zf.writestr(f"snapshots/{idx:02d}_{label}.jpg", data)
except Exception:
pass
html = s.get("html") or ""
if html:
zf.writestr(f"snapshots/{idx:02d}_{label}.html", html)
buf.seek(0)
filename = f"lazyflat-report-{a['id']}.zip"
@ -614,47 +616,108 @@ def bewerbung_zip(request: Request, app_id: int):
# Tab: Logs
# ---------------------------------------------------------------------------
def _parse_date_range(from_str: str | None, to_str: str | None) -> tuple[str | None, str | None]:
    """Convert 'YYYY-MM-DD' Berlin-local date strings into UTC ISO-8601 bounds.

    Returns (start, end): *start* is the UTC instant of local midnight on
    ``from_str``; *end* is the UTC instant of local midnight on the day
    AFTER ``to_str`` (half-open interval [start, end)).  Either side is
    ``None`` when its input is missing or malformed.
    """
    def _bound(raw: str | None, *, roll_forward: bool) -> str | None:
        if not raw:
            return None
        try:
            local = datetime.strptime(raw, "%Y-%m-%d").replace(tzinfo=BERLIN_TZ)
        except ValueError:
            # Malformed query input: treat as "no bound" rather than erroring.
            return None
        if roll_forward:
            # End bound is exclusive: advance to the start of the next day.
            local += timedelta(days=1)
        return local.astimezone(timezone.utc).isoformat(timespec="seconds")

    return _bound(from_str, roll_forward=False), _bound(to_str, roll_forward=True)
def _collect_events(start_iso: str | None, end_iso: str | None) -> list[dict]:
    """Merge audit and error log rows into one newest-first event list.

    Both streams are normalised to the same dict shape so the template and
    the CSV export can treat them uniformly.  *start_iso*/*end_iso* are UTC
    ISO bounds (half-open: ts >= start and ts < end); ``None`` means
    unbounded on that side.
    """
    usernames = {row["id"]: row["username"] for row in db.list_users()}

    def _within(ts: str) -> bool:
        # Timestamps are ISO-8601 strings, so lexicographic compare is safe.
        if start_iso and ts < start_iso:
            return False
        if end_iso and ts >= end_iso:
            return False
        return True

    events: list[dict] = []
    for a in db.recent_audit(None, limit=5000):
        if _within(a["timestamp"]):
            events.append({
                "kind": "audit", "ts": a["timestamp"], "source": "web",
                "actor": a["actor"], "action": a["action"],
                "details": a["details"] or "",
                "user": usernames.get(a["user_id"], ""),
                "ip": a["ip"] or "",
            })
    for e in db.recent_errors(None, limit=5000):
        if _within(e["timestamp"]):
            events.append({
                "kind": "error", "ts": e["timestamp"], "source": e["source"],
                "actor": e["source"], "action": e["kind"],
                "details": e["summary"] or "",
                "user": usernames.get(e["user_id"], "") if e["user_id"] else "",
                "ip": "",
            })
    events.sort(key=lambda ev: ev["ts"], reverse=True)
    return events
@app.get("/logs", response_class=HTMLResponse)
def tab_logs(request: Request, **kwargs):
    """Render the admin-only Protokoll tab (merged audit + error events).

    Optional ``from``/``to`` query params ('YYYY-MM-DD', Berlin-local)
    bound the event range; at most the 500 newest events are shown.

    NOTE: as displayed, this block contained unified-diff residue — the
    pre-change body and a duplicate ``def`` line were interleaved with the
    post-change body, leaving the function syntactically invalid.  This is
    the reconstructed post-change route.
    """
    u = current_user(request)
    if not u:
        return RedirectResponse("/login", status_code=303)
    if not u["is_admin"]:
        raise HTTPException(403, "admin only")
    q = request.query_params
    from_str = q.get("from") or ""
    to_str = q.get("to") or ""
    start_iso, end_iso = _parse_date_range(from_str or None, to_str or None)
    # Cap the page at 500 events; /logs/export.csv serves the full range.
    events = _collect_events(start_iso, end_iso)[:500]
    ctx = base_context(request, u, "logs")
    # Echo the raw filter strings back so the date inputs keep their values.
    ctx.update({"events": events, "from_str": from_str, "to_str": to_str})
    return templates.TemplateResponse("logs.html", ctx)
@app.get("/logs/export.csv")
def tab_logs_export(request: Request):
    """Stream the merged Protokoll as a CSV download (admin only).

    Each row carries both the stored UTC timestamp and its Berlin-local
    rendering.  Optional ``from``/``to`` query params ('YYYY-MM-DD',
    Berlin-local) bound the exported range and are echoed in the filename.

    Raises HTTPException 401 when unauthenticated, 403 when not admin.
    """
    u = current_user(request)
    if not u:
        raise HTTPException(401)
    if not u["is_admin"]:
        raise HTTPException(403)
    import csv as _csv  # local import: keeps the module's import block untouched
    q = request.query_params
    start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None)
    events = _collect_events(start_iso, end_iso)
    buf = io.StringIO()
    w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL)
    w.writerow(["timestamp_utc", "timestamp_berlin", "kind", "source", "actor", "action", "user", "details", "ip"])
    for e in events:
        w.writerow([
            e["ts"],
            _de_dt(e["ts"]),  # Berlin-local rendering of the UTC timestamp
            e["kind"],
            e["source"],
            e["actor"],
            e["action"],
            e["user"],
            e["details"],
            e["ip"],
        ])
    body = buf.getvalue().encode("utf-8")
    filename = "lazyflat-protokoll"
    if q.get("from"):
        filename += f"-{q['from']}"
    if q.get("to"):
        filename += f"-bis-{q['to']}"
    filename += ".csv"
    # BUG FIX: the computed filename was previously discarded in favour of a
    # hard-coded placeholder string in the Content-Disposition header.
    return Response(
        content=body, media_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
# ---------------------------------------------------------------------------
# Tab: Einstellungen (sub-tabs)
# ---------------------------------------------------------------------------