"""Admin-only routes: protocol log viewer, user management, secrets, CSV export, and the /logs legacy redirect.""" import io import sqlite3 from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response from fastapi.responses import HTMLResponse, RedirectResponse import db from auth import current_user, hash_password, require_admin, require_csrf from common import ( BERLIN_TZ, _de_dt, _mask_secret, base_context, client_ip, templates, ) router = APIRouter() ADMIN_SECTIONS = ("protokoll", "benutzer", "geheimnisse") def _parse_date_range(from_str: str | None, to_str: str | None) -> tuple[str | None, str | None]: """Parse 'YYYY-MM-DD' local-Berlin date inputs into UTC ISO bounds. Bounds are inclusive start-of-day and start-of-next-day.""" def _to_utc_iso(s: str, end_of_day: bool) -> str | None: try: d = datetime.strptime(s, "%Y-%m-%d").replace(tzinfo=BERLIN_TZ) except ValueError: return None if end_of_day: d = d + timedelta(days=1) return d.astimezone(timezone.utc).isoformat(timespec="seconds") start = _to_utc_iso(from_str, False) if from_str else None end = _to_utc_iso(to_str, True) if to_str else None return start, end def _collect_events(start_iso: str | None, end_iso: str | None, limit: int = 500) -> list[dict]: users = {row["id"]: row["username"] for row in db.list_users()} events: list[dict] = [] for a in db.audit_in_range(start_iso, end_iso, limit=limit): events.append({ "kind": "audit", "ts": a["timestamp"], "source": "web", "actor": a["actor"], "action": a["action"], "details": a["details"] or "", "user": users.get(a["user_id"], ""), "ip": a["ip"] or "", }) for e in db.errors_in_range(start_iso, end_iso, limit=limit): events.append({ "kind": "error", "ts": e["timestamp"], "source": e["source"], "actor": e["source"], "action": e["kind"], "details": e["summary"] or "", "user": users.get(e["user_id"], "") if e["user_id"] else "", "ip": "", }) events.sort(key=lambda x: x["ts"], reverse=True) return events @router.get("/logs") def tab_logs_legacy(): # Old top-level Protokoll tab was merged into /admin/protokoll. return RedirectResponse("/admin/protokoll", status_code=301) @router.get("/admin", response_class=HTMLResponse) def tab_admin_root(request: Request): return RedirectResponse("/admin/protokoll", status_code=303) @router.get("/admin/{section}", response_class=HTMLResponse) def tab_admin(request: Request, section: str): u = current_user(request) if not u: return RedirectResponse("/login", status_code=303) if not u["is_admin"]: raise HTTPException(403, "admin only") if section not in ADMIN_SECTIONS: raise HTTPException(404) ctx = base_context(request, u, "admin") ctx["section"] = section if section == "protokoll": q = request.query_params from_str = q.get("from") or "" to_str = q.get("to") or "" start_iso, end_iso = _parse_date_range(from_str or None, to_str or None) ctx.update({ "events": _collect_events(start_iso, end_iso, limit=500), "from_str": from_str, "to_str": to_str, }) elif section == "benutzer": ctx["users"] = db.list_users() elif section == "geheimnisse": secrets = db.all_secrets() ctx["secrets_masked"] = {k: _mask_secret(secrets.get(k, "")) for k in db.SECRET_KEYS} ctx["secret_flash"] = request.query_params.get("ok") return templates.TemplateResponse("admin.html", ctx) @router.get("/logs/export.csv") def tab_logs_export(request: Request): u = current_user(request) if not u: raise HTTPException(401) if not u["is_admin"]: raise HTTPException(403) import csv as _csv q = request.query_params start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None) events = _collect_events(start_iso, end_iso, limit=5000) buf = io.StringIO() w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL) w.writerow(["timestamp_utc", "timestamp_berlin", "kind", "source", "actor", "action", "user", "details", "ip"]) for e in events: w.writerow([ e["ts"], _de_dt(e["ts"]), e["kind"], e["source"], e["actor"], e["action"], e["user"], e["details"], e["ip"], ]) body = buf.getvalue().encode("utf-8") filename = "lazyflat-protokoll" if q.get("from"): filename += f"-{q['from']}" if q.get("to"): filename += f"-bis-{q['to']}" filename += ".csv" return Response( content=body, media_type="text/csv; charset=utf-8", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.post("/actions/users/create") async def action_users_create( request: Request, username: str = Form(...), password: str = Form(...), is_admin: str = Form(""), csrf: str = Form(...), admin=Depends(require_admin), ): require_csrf(admin["id"], csrf) username = (username or "").strip() if not username or len(password) < 10: raise HTTPException(400, "username required, password >= 10 chars") try: uid = db.create_user(username, hash_password(password), is_admin=(is_admin.lower() in ("on", "true", "yes", "1"))) except sqlite3.IntegrityError: return RedirectResponse("/admin/benutzer?err=exists", status_code=303) db.log_audit(admin["username"], "user.created", f"new_user={username} id={uid}", user_id=admin["id"], ip=client_ip(request)) return RedirectResponse("/admin/benutzer?ok=1", status_code=303) @router.post("/actions/users/disable") async def action_users_disable( request: Request, target_id: int = Form(...), value: str = Form(...), csrf: str = Form(...), admin=Depends(require_admin), ): require_csrf(admin["id"], csrf) if target_id == admin["id"]: raise HTTPException(400, "refusing to disable self") db.set_user_disabled(target_id, value == "on") db.log_audit(admin["username"], "user.toggle_disable", f"target={target_id} disabled={value=='on'}", user_id=admin["id"], ip=client_ip(request)) return RedirectResponse("/admin/benutzer", status_code=303) @router.post("/actions/users/delete") async def action_users_delete( request: Request, target_id: int = Form(...), csrf: str = Form(...), admin=Depends(require_admin), ): require_csrf(admin["id"], csrf) if target_id == admin["id"]: raise HTTPException(400, "refusing to delete self") target = db.get_user(target_id) if not target: return RedirectResponse("/admin/benutzer", status_code=303) db.delete_user(target_id) db.log_audit(admin["username"], "user.deleted", f"target={target_id} username={target['username']}", user_id=admin["id"], ip=client_ip(request)) return RedirectResponse("/admin/benutzer?deleted=1", status_code=303) @router.post("/actions/secrets") async def action_secrets(request: Request, admin=Depends(require_admin)): form = await request.form() require_csrf(admin["id"], form.get("csrf", "")) changed = [] for key in db.SECRET_KEYS: raw = (form.get(key) or "").strip() if not raw: continue db.set_secret(key, raw) changed.append(key) db.log_audit(admin["username"], "secrets.updated", ",".join(changed) or "no-op", user_id=admin["id"], ip=client_ip(request)) return RedirectResponse("/admin/geheimnisse?ok=1", status_code=303)