* remove the kill-switch: auto-apply toggle is the single on/off; manual
'Bewerben' button now only gated by apply reachability; circuit breaker
stays but only gates auto-apply (manual bypasses, so a user can retry)
* Berlin-timezone date filter (de_dt) formats timestamps as DD.MM.YYYY HH:MM
everywhere; storage stays UTC
* Wohnungen: live 'entdeckt vor X' on every flat + 'nächste Aktualisierung in Xs'
countdown in the header, driven by /static/app.js; HTMX polls body every 30s
* drop the Fehler tab entirely; failed applications now carry a
'Fehler-Report herunterladen (ZIP)' link -> /bewerbungen/{id}/report.zip
bundles application.json, flat.json, profile_snapshot.json, forensics.json,
step_log.txt, page.html, console/errors/network JSONs, and decoded
screenshots/*.jpg for AI-assisted debugging
* trim the 'sensibel' blurb from the Profil tab
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
779 lines
28 KiB
Python
779 lines
28 KiB
Python
"""
|
|
lazyflat web app.
|
|
|
|
Four tabs:
|
|
- / → Wohnungen (all flats, per-user match highlighting, filter block, auto-apply switch)
|
|
- /bewerbungen → Bewerbungen (history + forensics; failed apps expose a ZIP report download)
|
|
- /logs → Logs (user-scoped audit log)
|
|
- /einstellungen/<section> → Einstellungen: profile, filter, notifications, account, admin users
|
|
|
|
All state-changing POSTs require CSRF. Internal endpoints require INTERNAL_API_KEY.
|
|
"""
|
|
import asyncio
|
|
import base64
|
|
import hmac
|
|
import io
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import zipfile
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
from fastapi import Depends, FastAPI, Form, Header, HTTPException, Request, Response, status
|
|
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
# Timezone used by the `de_dt` template filter. Storage stays UTC; only the
# rendering is converted.
try:
    from zoneinfo import ZoneInfo

    BERLIN_TZ = ZoneInfo("Europe/Berlin")
except Exception:
    # Either zoneinfo could not be imported or the "Europe/Berlin" key could
    # not be loaded. Keep the app running on UTC, but make the degradation
    # visible instead of silently rendering UTC as if it were local time.
    logging.getLogger("web").warning(
        "Europe/Berlin timezone unavailable; timestamps will be rendered in UTC",
        exc_info=True,
    )
    BERLIN_TZ = timezone.utc
|
|
|
|
import db
|
|
import notifications
|
|
import retention
|
|
from apply_client import ApplyClient, _row_to_profile
|
|
from auth import (
|
|
bootstrap_admin,
|
|
clear_session_cookie,
|
|
current_user,
|
|
hash_password,
|
|
issue_csrf_token,
|
|
issue_session_cookie,
|
|
rate_limit_login,
|
|
require_admin,
|
|
require_csrf,
|
|
require_user,
|
|
verify_login,
|
|
)
|
|
from matching import flat_matches_filter, row_to_dict
|
|
from settings import (
|
|
ALERT_SCRAPE_INTERVAL_SECONDS,
|
|
APPLY_FAILURE_THRESHOLD,
|
|
INTERNAL_API_KEY,
|
|
PUBLIC_URL,
|
|
)
|
|
|
|
|
|
# Root logging configuration: INFO and above, time-of-day timestamps only.
logging.basicConfig(
    format="%(asctime)s %(levelname)-5s %(name)s: %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
# Only WARNING and above is let through from the urllib3 logger.
logging.getLogger("urllib3").setLevel(logging.WARNING)

# Module logger used by every handler in this file.
logger = logging.getLogger("web")

# Single shared client for all calls to the apply service (health + apply).
apply_client = ApplyClient()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# App + Jinja
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """FastAPI lifespan hook: run the startup steps once, then hand over to the app.

    Startup order is: initialise the database, bootstrap the admin account,
    start the retention job. Nothing is done after the ``yield`` (no explicit
    shutdown work).
    """
    for startup_step in (db.init_db, bootstrap_admin, retention.start):
        startup_step()
    logger.info("web service ready")
    yield
|
|
|
|
|
|
# The ASGI application. The docs, redoc and OpenAPI routes are switched off.
app = FastAPI(
    lifespan=lifespan,
    title="lazyflat",
    docs_url=None,
    redoc_url=None,
    openapi_url=None,
)
# Static assets are served straight from the ./static directory.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 environment for every HTML response rendered below.
templates = Jinja2Templates(directory="templates")
|
|
|
|
|
|
def _parse_iso(s: str | None) -> datetime | None:
|
|
if not s:
|
|
return None
|
|
try:
|
|
# Python handles +00:00 but not trailing Z. Storage always uses +00:00.
|
|
return datetime.fromisoformat(s)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _de_dt(s: str | None) -> str:
    """Jinja filter: render an ISO timestamp as 'DD.MM.YYYY HH:MM' in BERLIN_TZ.

    Timestamps without an offset are treated as UTC. Unparseable or empty
    input renders as an empty string.
    """
    parsed = _parse_iso(s)
    if parsed is None:
        return ""
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    local = aware.astimezone(BERLIN_TZ)
    return local.strftime("%d.%m.%Y %H:%M")
|
|
|
|
|
|
def _iso_utc(s: str | None) -> str:
    """Jinja filter: normalise an ISO timestamp to UTC with second precision.

    Intended for data attributes consumed by JS. Timestamps without an offset
    are treated as UTC; unparseable or empty input yields an empty string.
    """
    parsed = _parse_iso(s)
    if parsed is None:
        return ""
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    in_utc = aware.astimezone(timezone.utc)
    return in_utc.isoformat(timespec="seconds")
|
|
|
|
|
|
# Expose the timestamp helpers to templates as `de_dt` and `iso_utc`.
templates.env.filters.update({"de_dt": _de_dt, "iso_utc": _iso_utc})
|
|
|
|
|
|
@app.middleware("http")
async def security_headers(request: Request, call_next):
    """Attach default security headers to every response.

    Each header is only set when the downstream handler has not already set
    it (``setdefault``), so individual routes can still override a value.
    """
    content_security_policy = (
        "default-src 'self'; "
        "script-src 'self' https://cdn.tailwindcss.com https://unpkg.com; "
        "style-src 'self' https://cdn.tailwindcss.com 'unsafe-inline'; "
        "img-src 'self' data:; connect-src 'self'; frame-ancestors 'none';"
    )
    defaults = (
        ("X-Frame-Options", "DENY"),
        ("X-Content-Type-Options", "nosniff"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
        ("Permissions-Policy", "geolocation=(), camera=(), microphone=()"),
        ("Content-Security-Policy", content_security_policy),
    )

    resp: Response = await call_next(request)
    for header_name, header_value in defaults:
        resp.headers.setdefault(header_name, header_value)
    return resp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def client_ip(request: Request) -> str:
    """Best-effort client address for audit logging and login throttling.

    Uses the first entry of ``X-Forwarded-For`` when the header is present,
    otherwise the socket peer, otherwise the literal "unknown".

    NOTE(review): the forwarded header is taken at face value here; this is
    only trustworthy if a reverse proxy in front of the app sets it — confirm
    against the deployment.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if not forwarded:
        peer = request.client
        return peer.host if peer else "unknown"
    first_hop, _, _ = forwarded.partition(",")
    return first_hop.strip()
|
|
|
|
|
|
def require_internal(x_internal_api_key: str | None = Header(default=None)) -> None:
    """FastAPI dependency guarding the /internal/* endpoints.

    Args:
        x_internal_api_key: value of the ``X-Internal-Api-Key`` request header.

    Raises:
        HTTPException: 401 when the header is missing, empty, or does not
            match INTERNAL_API_KEY.
    """
    # Compare as bytes: hmac.compare_digest() only accepts ASCII when given
    # str and raises TypeError otherwise, which would turn a bogus header
    # containing non-ASCII characters into a 500 instead of a clean 401.
    supplied = (x_internal_api_key or "").encode("utf-8")
    expected = (INTERNAL_API_KEY or "").encode("utf-8")
    if not supplied or not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="invalid internal key")
|
|
|
|
|
|
def base_context(request: Request, user, active_tab: str) -> dict:
    """Template context shared by every authenticated page.

    Args:
        request: the current request (passed through to the template).
        user: the logged-in user row; ``id`` and ``is_admin`` are read.
        active_tab: identifier of the tab being rendered.

    Returns:
        A new dict with request, user, a CSRF token issued for the user,
        the active tab and an ``is_admin`` boolean.
    """
    return dict(
        request=request,
        user=user,
        csrf=issue_csrf_token(user["id"]),
        active_tab=active_tab,
        is_admin=bool(user["is_admin"]),
    )
|
|
|
|
|
|
def _manual_apply_allowed() -> tuple[bool, str]:
    """Gate for the manual 'Bewerben' button.

    Returns:
        ``(True, "")`` when the apply service answers its health check,
        otherwise ``(False, <reason shown to the user>)``. The circuit
        breaker is not consulted here.
    """
    reachable = apply_client.health()
    return (True, "") if reachable else (False, "apply-service nicht erreichbar")
|
|
|
|
|
|
def _auto_apply_allowed(prefs) -> bool:
|
|
"""Auto-apply trigger gate: user must have enabled it and the circuit must be closed."""
|
|
if not prefs["auto_apply_enabled"]:
|
|
return False
|
|
if prefs["apply_circuit_open"]:
|
|
return False
|
|
return apply_client.health()
|
|
|
|
|
|
def _next_scrape_utc() -> str:
    """Estimated time of the next scrape as a UTC ISO string (seconds precision).

    Computed as the stored ``last_alert_heartbeat`` plus
    ALERT_SCRAPE_INTERVAL_SECONDS. Returns "" when no parseable heartbeat is
    stored. A heartbeat without an offset is treated as UTC.
    """
    last_beat = _parse_iso(db.get_state("last_alert_heartbeat"))
    if last_beat is None:
        return ""
    if last_beat.tzinfo is None:
        last_beat = last_beat.replace(tzinfo=timezone.utc)
    due = last_beat + timedelta(seconds=ALERT_SCRAPE_INTERVAL_SECONDS)
    return due.astimezone(timezone.utc).isoformat(timespec="seconds")
|
|
|
|
|
|
def _run_apply_background(user_id: int, flat_id: str, url: str, triggered_by: str) -> None:
    """Run one application attempt end-to-end (blocking; meant for a worker thread).

    Steps: snapshot the user's profile, record the application as started,
    call the apply service, record the outcome, update the per-user failure
    counter / circuit breaker, send the notification, write an audit entry.

    Args:
        user_id: user on whose behalf the application is sent.
        flat_id: id of the flat being applied for.
        url: listing URL handed to the apply service.
        triggered_by: origin tag stored with the application
            (callers in this file pass "user" or "auto").
    """
    prefs = db.get_preferences(user_id)
    profile = _row_to_profile(db.get_profile(user_id))
    submit_forms = bool(prefs["submit_forms"])

    app_id = db.start_application(
        user_id=user_id, flat_id=flat_id, url=url,
        triggered_by=triggered_by, submit_forms=submit_forms,
        profile_snapshot=profile,
    )

    logger.info("apply.start user=%s flat=%s application=%s submit=%s",
                user_id, flat_id, app_id, submit_forms)

    try:
        result = apply_client.apply(url=url, profile=profile,
                                    submit_forms=submit_forms, application_id=app_id)
    except Exception as exc:
        # Without this guard an exception here would leave the application
        # row unfinished and skip the failure bookkeeping and notification.
        # Treat it as a failed attempt.
        logger.exception("apply.crash user=%s flat=%s application=%s",
                         user_id, flat_id, app_id)
        result = {"success": False, "message": f"apply request crashed: {exc}"}

    success = bool(result.get("success"))
    message = result.get("message", "")
    provider = result.get("provider", "")
    forensics = result.get("forensics") or {}

    db.finish_application(app_id, success=success, message=message,
                          provider=provider, forensics=forensics)

    if success:
        db.update_preferences(user_id, {"apply_recent_failures": 0})
    else:
        # Re-read the counter: the apply call can take a while and other
        # attempts for the same user may have finished meanwhile. Using the
        # value read before the call would overwrite their increments.
        fresh_prefs = db.get_preferences(user_id)
        failures = int(fresh_prefs["apply_recent_failures"] or 0) + 1
        updates = {"apply_recent_failures": failures}
        if failures >= APPLY_FAILURE_THRESHOLD:
            updates["apply_circuit_open"] = 1
            db.log_error(source="apply", kind="circuit_open", user_id=user_id,
                         summary=f"{failures} aufeinanderfolgende Fehler",
                         application_id=app_id)
        db.update_preferences(user_id, updates)
        db.log_error(source="apply", kind="apply_failure", user_id=user_id,
                     summary=message or "Bewerbung fehlgeschlagen",
                     application_id=app_id,
                     context={"provider": provider, "url": url})

    # Minimal flat description for the notification; the flat row may be
    # missing, in which case only the link is known.
    flat = db.get_flat(flat_id)
    flat_dict = {"address": flat["address"] if flat else "", "link": url,
                 "rooms": flat["rooms"] if flat else None,
                 "total_rent": flat["total_rent"] if flat else None}
    if success:
        notifications.on_apply_ok(user_id, flat_dict, message)
    else:
        notifications.on_apply_fail(user_id, flat_dict, message)

    db.log_audit("system", "apply_finished", f"app={app_id} success={success}", user_id=user_id)
|
|
|
|
|
|
# Strong references to in-flight apply tasks. The event loop only keeps weak
# references to tasks, so a fire-and-forget task with no other referrer may be
# garbage-collected before it finishes.
_background_apply_tasks: set = set()


def _on_apply_task_done(task) -> None:
    """Drop the finished task from the registry and surface any crash in the log."""
    _background_apply_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("apply background task failed", exc_info=exc)


def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None:
    """Start an application attempt in a worker thread without waiting for it.

    Must be called from within a running event loop (asyncio.create_task).
    The arguments are forwarded unchanged to ``_run_apply_background``.
    """
    task = asyncio.create_task(asyncio.to_thread(
        _run_apply_background, user_id, flat_id, url, triggered_by,
    ))
    _background_apply_tasks.add(task)
    task.add_done_callback(_on_apply_task_done)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public routes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/health")
def health():
    """Liveness probe: always answers with a static OK payload, no auth required."""
    return dict(status="ok")
|
|
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
def login_form(request: Request, error: str | None = None):
    """Render the login page, or send an already logged-in user to the start page."""
    already_logged_in = current_user(request)
    if not already_logged_in:
        page_context = {"request": request, "error": error}
        return templates.TemplateResponse("login.html", page_context)
    return RedirectResponse("/", status_code=303)
|
|
|
|
|
|
@app.post("/login")
def login_submit(request: Request, username: str = Form(...), password: str = Form(...)):
    """Handle the login form.

    Order of checks: per-IP rate limit first (429 + audit + error log), then
    credential verification (401 + audit). On success a session cookie is
    issued and the user is redirected to the start page.
    """
    def _login_page(error_text: str, http_status: int):
        # Re-render the login form with an error message and the given status.
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "error": error_text},
            status_code=http_status,
        )

    ip = client_ip(request)

    if not rate_limit_login(ip):
        db.log_audit(username or "?", "login_rate_limited", ip=ip)
        db.log_error(
            source="web",
            kind="rate_limit",
            summary=f"login throttled for {ip}",
            context={"username": username or ""},
        )
        return _login_page(
            "Zu viele Versuche. Bitte später erneut.",
            status.HTTP_429_TOO_MANY_REQUESTS,
        )

    user = verify_login(username, password)
    if not user:
        db.log_audit(username or "?", "login_failed", ip=ip)
        return _login_page("Login fehlgeschlagen.", status.HTTP_401_UNAUTHORIZED)

    response = RedirectResponse("/", status_code=303)
    issue_session_cookie(response, user["id"])
    db.log_audit(user["username"], "login_success", user_id=user["id"], ip=ip)
    return response
|
|
|
|
|
|
@app.post("/logout")
def logout(request: Request):
    """Clear the session cookie and redirect to the login page.

    The logout is written to the audit log only when a user was actually
    logged in.
    """
    session_user = current_user(request)
    response = RedirectResponse("/login", status_code=303)
    clear_session_cookie(response)
    if not session_user:
        return response
    db.log_audit(
        session_user["username"],
        "logout",
        user_id=session_user["id"],
        ip=client_ip(request),
    )
    return response
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tab: Wohnungen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _wohnungen_context(user) -> dict:
    """Build the template context for the Wohnungen tab (full page and HTMX partial).

    Args:
        user: the logged-in user row; only ``id`` is read.

    Returns:
        Dict with the 100 most recent flats (each with its match flag against
        the user's filter and the user's last application for it), the filter
        values, the apply/auto-apply state and the scrape timing information.
    """
    uid = user["id"]
    filters_row = db.get_filters(uid)
    prefs = db.get_preferences(uid)
    filters = row_to_dict(filters_row)

    flats_view = []
    for f in db.recent_flats(100):
        # Reshape the flat row into the structure the matcher is given here
        # (commute time nested under "connectivity").
        match_input = {
            "rooms": f["rooms"], "total_rent": f["total_rent"], "size": f["size"],
            "wbs": f["wbs"], "connectivity": {"morning_time": f["connectivity_morning_time"]},
        }
        flats_view.append({
            "row": f,
            "matched": flat_matches_filter(match_input, filters),
            "last": db.last_application_for_flat(uid, f["id"]),
        })

    # One health probe per render: _manual_apply_allowed() is true exactly
    # when the apply service answered its health check, so reuse that result
    # for "apply_reachable" instead of probing the service a second time.
    allowed, reason = _manual_apply_allowed()
    return {
        "flats": flats_view,
        "filters": filters,
        "auto_apply_enabled": bool(prefs["auto_apply_enabled"]),
        "submit_forms": bool(prefs["submit_forms"]),
        "circuit_open": bool(prefs["apply_circuit_open"]),
        "apply_failures": int(prefs["apply_recent_failures"] or 0),
        "apply_allowed": allowed,
        "apply_block_reason": reason,
        "apply_reachable": allowed,
        "last_alert_heartbeat": db.get_state("last_alert_heartbeat") or "",
        "next_scrape_utc": _next_scrape_utc(),
        "scrape_interval_seconds": ALERT_SCRAPE_INTERVAL_SECONDS,
    }
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def tab_wohnungen(request: Request):
    """Start page: the Wohnungen tab. Anonymous visitors are redirected to /login."""
    viewer = current_user(request)
    if viewer:
        page_context = {
            **base_context(request, viewer, "wohnungen"),
            **_wohnungen_context(viewer),
        }
        return templates.TemplateResponse("wohnungen.html", page_context)
    return RedirectResponse("/login", status_code=303)
|
|
|
|
|
|
@app.get("/partials/wohnungen", response_class=HTMLResponse)
def partial_wohnungen(request: Request, user=Depends(require_user)):
    """Render only the body fragment of the Wohnungen tab.

    Uses the same context as the full page; authentication is enforced by the
    ``require_user`` dependency.
    """
    fragment_context = {
        **base_context(request, user, "wohnungen"),
        **_wohnungen_context(user),
    }
    return templates.TemplateResponse("_wohnungen_body.html", fragment_context)
|
|
|
|
|
|
def _optional_float(raw: str | None, field: str) -> float | None:
    """Parse an optional numeric form field; a decimal comma is accepted.

    Returns:
        None for a blank field, otherwise the parsed number.

    Raises:
        HTTPException: 400 when the text is not a finite number. Without this
            the ValueError from float() would surface as a 500.
    """
    import math

    text = (raw or "").strip().replace(",", ".")
    if not text:
        return None
    try:
        number = float(text)
    except ValueError:
        raise HTTPException(400, f"invalid number for {field}") from None
    if not math.isfinite(number):
        # float() happily parses "nan" and "inf"; neither is a usable bound.
        raise HTTPException(400, f"invalid number for {field}")
    return number


@app.post("/actions/filters")
async def action_save_filters(
    request: Request,
    csrf: str = Form(...),
    rooms_min: str = Form(""),
    rooms_max: str = Form(""),
    max_rent: str = Form(""),
    min_size: str = Form(""),
    max_morning_commute: str = Form(""),
    wbs_required: str = Form(""),
    user=Depends(require_user),
):
    """Save the user's flat filter and redirect back to the Wohnungen tab.

    Blank numeric fields are stored as None (no bound). Invalid numbers are
    rejected with 400 before anything is written.

    Raises:
        HTTPException: 400 for a non-numeric or non-finite numeric field;
            whatever ``require_csrf`` raises for a bad token.
    """
    require_csrf(user["id"], csrf)

    numeric_fields = {
        "rooms_min": rooms_min,
        "rooms_max": rooms_max,
        "max_rent": max_rent,
        "min_size": min_size,
        "max_morning_commute": max_morning_commute,
    }
    # Validate everything first so a bad field never causes a partial update.
    updates = {name: _optional_float(raw, name) for name, raw in numeric_fields.items()}
    updates["wbs_required"] = (wbs_required or "").strip()

    db.update_filters(user["id"], updates)
    db.log_audit(user["username"], "filters.updated", user_id=user["id"], ip=client_ip(request))
    return RedirectResponse("/", status_code=303)
|
|
|
|
|
|
@app.post("/actions/auto-apply")
async def action_auto_apply(
    request: Request,
    value: str = Form(...),
    csrf: str = Form(...),
    user=Depends(require_user),
):
    """Switch the user's auto-apply preference on or off.

    Only the exact form value "on" enables it; anything else disables it.
    """
    require_csrf(user["id"], csrf)
    enabled = int(value == "on")
    db.update_preferences(user["id"], {"auto_apply_enabled": enabled})
    db.log_audit(
        user["username"],
        "auto_apply",
        "on" if enabled else "off",
        user_id=user["id"],
        ip=client_ip(request),
    )
    return RedirectResponse("/", status_code=303)
|
|
|
|
|
|
@app.post("/actions/reset-circuit")
async def action_reset_circuit(
    request: Request,
    csrf: str = Form(...),
    user=Depends(require_user),
):
    """Close the user's apply circuit breaker and zero the failure counter."""
    require_csrf(user["id"], csrf)
    cleared_state = {"apply_circuit_open": 0, "apply_recent_failures": 0}
    db.update_preferences(user["id"], cleared_state)
    db.log_audit(
        user["username"],
        "reset_circuit",
        user_id=user["id"],
        ip=client_ip(request),
    )
    return RedirectResponse("/", status_code=303)
|
|
|
|
|
|
@app.post("/actions/apply")
async def action_apply(
    request: Request,
    flat_id: str = Form(...),
    csrf: str = Form(...),
    user=Depends(require_user),
):
    """Manually trigger an application for one flat.

    The attempt itself runs in the background; this handler only validates,
    audits and redirects.

    Raises:
        HTTPException: 409 when manual apply is currently blocked, 404 when
            the flat id is unknown.
    """
    require_csrf(user["id"], csrf)

    may_apply, block_reason = _manual_apply_allowed()
    if not may_apply:
        raise HTTPException(409, f"apply disabled: {block_reason}")

    target = db.get_flat(flat_id)
    if not target:
        raise HTTPException(404, "flat not found")

    db.log_audit(
        user["username"],
        "trigger_apply",
        f"flat_id={flat_id}",
        user_id=user["id"],
        ip=client_ip(request),
    )
    _kick_apply(user["id"], flat_id, target["link"], "user")
    return RedirectResponse("/", status_code=303)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tab: Bewerbungen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/bewerbungen", response_class=HTMLResponse)
def tab_bewerbungen(request: Request):
    """Bewerbungen tab: the user's 100 most recent applications."""
    viewer = current_user(request)
    if viewer is None or not viewer:
        return RedirectResponse("/login", status_code=303)
    page_context = base_context(request, viewer, "bewerbungen")
    page_context["applications"] = db.recent_applications(viewer["id"], limit=100)
    return templates.TemplateResponse("bewerbungen.html", page_context)
|
|
|
|
|
|
@app.get("/bewerbungen/{app_id}", response_class=HTMLResponse)
def bewerbung_detail(request: Request, app_id: int):
    """Detail page for one application, including forensics and the profile snapshot.

    Visible to the owning user and to admins; everyone else gets a 404 so the
    existence of the application is not revealed.
    """
    viewer = current_user(request)
    if not viewer:
        return RedirectResponse("/login", status_code=303)

    record = db.get_application(app_id)
    may_view = bool(record) and (record["user_id"] == viewer["id"] or viewer["is_admin"])
    if not may_view:
        raise HTTPException(404, "not found")

    raw_forensics = record["forensics_json"]
    raw_profile = record["profile_snapshot_json"]
    page_context = base_context(request, viewer, "bewerbungen")
    page_context.update({
        "application": record,
        "forensics": json.loads(raw_forensics) if raw_forensics else None,
        "profile_snapshot": json.loads(raw_profile) if raw_profile else {},
    })
    return templates.TemplateResponse("bewerbung_detail.html", page_context)
|
|
|
|
|
|
@app.get("/bewerbungen/{app_id}/report.zip")
def bewerbung_zip(request: Request, app_id: int):
    """Download a ZIP bundle with everything recorded about one application.

    The archive is assembled in memory and contains the application row, the
    flat, the profile snapshot, the raw forensics plus a readable step log,
    the final page HTML (if captured), console/error/network captures and the
    decoded screenshots.

    Raises:
        HTTPException: 401 when not logged in; 404 when the application does
            not exist or belongs to another user (admins may read all).
    """
    import re

    u = current_user(request)
    if not u:
        raise HTTPException(401)
    a = db.get_application(app_id)
    if not a or (a["user_id"] != u["id"] and not u["is_admin"]):
        raise HTTPException(404)

    flat = db.get_flat(a["flat_id"])
    forensics = json.loads(a["forensics_json"]) if a["forensics_json"] else {}
    profile = json.loads(a["profile_snapshot_json"]) if a["profile_snapshot_json"] else {}
    # The two large JSON columns get their own files; keep them out of the metadata.
    app_meta = {k: a[k] for k in a.keys() if k not in ("forensics_json", "profile_snapshot_json")}

    def _step_line(step: dict) -> str:
        # Tolerate null or odd values in captured steps: one malformed entry
        # must not make the whole report download fail.
        try:
            elapsed = float(step.get("ts") or 0)
        except (TypeError, ValueError):
            elapsed = 0.0
        name = str(step.get("step") or "?")
        state = str(step.get("status") or "")
        return f"[{elapsed:7.2f}s] {name:<24} {state:<5} {step.get('detail', '')}"

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(
            "README.txt",
            f"lazyflat application report\n"
            f"application_id={a['id']}\n"
            f"flat_id={a['flat_id']}\n"
            f"provider={a['provider']}\n"
            f"success={a['success']}\n"
            f"started_at_utc={a['started_at']}\n"
            f"finished_at_utc={a['finished_at']}\n"
            f"submit_forms_used={bool(a['submit_forms_used'])}\n"
            f"\n"
            f"Contents:\n"
            f" application.json DB row + metadata\n"
            f" flat.json Flat details at discovery time\n"
            f" profile_snapshot.json Profile used for this attempt\n"
            f" forensics.json Full captured forensics\n"
            f" step_log.txt Human-readable step log\n"
            f" page.html Final page HTML (if captured)\n"
            f" console.json Browser console entries\n"
            f" errors.json Browser pageerror events\n"
            f" network.json Network requests + partial responses\n"
            f" screenshots/*.jpg Screenshots at key moments\n"
        )
        zf.writestr("application.json", json.dumps(app_meta, indent=2, default=str))
        zf.writestr("flat.json", json.dumps(dict(flat) if flat else {}, indent=2, default=str))
        zf.writestr("profile_snapshot.json", json.dumps(profile, indent=2, default=str))
        zf.writestr("forensics.json", json.dumps(forensics, indent=2, default=str))

        # `or []` also covers a key that is present but null.
        step_lines = [_step_line(s) for s in forensics.get("steps") or []]
        zf.writestr("step_log.txt", "\n".join(step_lines))

        if forensics.get("final_html"):
            zf.writestr("page.html", forensics["final_html"])
        zf.writestr("console.json", json.dumps(forensics.get("console", []), indent=2))
        zf.writestr("errors.json", json.dumps(forensics.get("errors", []), indent=2))
        zf.writestr("network.json", json.dumps(forensics.get("network", []), indent=2))

        for idx, s in enumerate(forensics.get("screenshots") or [], start=1):
            b64 = s.get("b64_jpeg", "")
            if not b64:
                continue
            try:
                data = base64.b64decode(b64)
            except (ValueError, TypeError) as exc:
                # Best effort: skip the broken screenshot but leave a trace
                # (binascii.Error is a ValueError subclass).
                logger.warning("report.zip: skipping undecodable screenshot %s of application %s: %s",
                               idx, a["id"], exc)
                continue
            # Keep word characters, dots and dashes; everything else (path
            # separators, colons, whitespace, ...) becomes "_" so the label
            # cannot alter the archive member path.
            label = re.sub(r"[^\w.-]", "_", str(s.get("label") or f"step{idx}"))
            zf.writestr(f"screenshots/{idx:02d}_{label}.jpg", data)

    buf.seek(0)
    filename = f"lazyflat-report-{a['id']}.zip"
    return StreamingResponse(
        buf, media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tab: Logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/logs", response_class=HTMLResponse)
def tab_logs(request: Request):
    """Logs tab: the 200 most recent audit events for the logged-in user."""
    viewer = current_user(request)
    if viewer:
        page_context = base_context(request, viewer, "logs")
        page_context["events"] = db.recent_audit(viewer["id"], limit=200)
        return templates.TemplateResponse("logs.html", page_context)
    return RedirectResponse("/login", status_code=303)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tab: Einstellungen (sub-tabs)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Sub-pages accepted by /einstellungen/{section}; any other value is a 404.
# "benutzer" is additionally restricted to admins in tab_settings.
VALID_SECTIONS = ("profil", "filter", "benachrichtigungen", "account", "benutzer")
|
|
|
|
|
|
@app.get("/einstellungen", response_class=HTMLResponse)
def tab_settings_root(request: Request):
    """Bare /einstellungen always forwards to the profile section."""
    default_section_url = "/einstellungen/profil"
    return RedirectResponse(default_section_url, status_code=303)
|
|
|
|
|
|
@app.get("/einstellungen/{section}", response_class=HTMLResponse)
def tab_settings(request: Request, section: str):
    """Render one section of the settings tab.

    Raises:
        HTTPException: 404 for an unknown section, 403 when a non-admin
            requests the user administration section.
    """
    u = current_user(request)
    if not u:
        return RedirectResponse("/login", status_code=303)
    if section not in VALID_SECTIONS:
        raise HTTPException(404)
    if section == "benutzer" and not u["is_admin"]:
        raise HTTPException(403)

    ctx = base_context(request, u, "einstellungen")
    ctx["section"] = section

    # Per-section data: context key plus a loader that is only called for the
    # section being rendered. "account" needs no extra data.
    section_data = {
        "profil": ("profile", lambda: db.get_profile(u["id"])),
        "filter": ("filters", lambda: row_to_dict(db.get_filters(u["id"]))),
        "benachrichtigungen": ("notifications", lambda: db.get_notifications(u["id"])),
        "benutzer": ("users", db.list_users),
    }
    if section in section_data:
        ctx_key, load = section_data[section]
        ctx[ctx_key] = load()
    return templates.TemplateResponse("einstellungen.html", ctx)
|
|
|
|
|
|
@app.post("/actions/profile")
async def action_profile(request: Request, user=Depends(require_user)):
    """Save the applicant profile from the raw form and return to the profile page.

    Missing text fields are stored as empty strings, checkboxes as 0/1, and
    the WBS counters as integers (0 when blank or not a number).
    """
    form = await request.form()
    require_csrf(user["id"], form.get("csrf", ""))

    def _flag(field_name) -> bool:
        # Checkbox-style value: any of the usual truthy spellings counts as set.
        return form.get(field_name, "").lower() in ("true", "on", "yes", "1")

    def _count(field_name) -> int:
        # Integer field; blank or malformed input falls back to 0.
        try:
            return int(form.get(field_name) or 0)
        except ValueError:
            return 0

    plain_text_fields = (
        "salutation", "firstname", "lastname", "email", "telephone",
        "street", "house_number", "postcode", "city",
    )
    updates = {field: form.get(field, "") for field in plain_text_fields}
    updates["is_possessing_wbs"] = int(_flag("is_possessing_wbs"))
    updates["wbs_type"] = form.get("wbs_type", "0")
    updates["wbs_valid_till"] = form.get("wbs_valid_till", "1970-01-01")
    for counter in ("wbs_rooms", "wbs_adults", "wbs_children"):
        updates[counter] = _count(counter)
    updates["is_prio_wbs"] = int(_flag("is_prio_wbs"))
    updates["immomio_email"] = form.get("immomio_email", "")
    updates["immomio_password"] = form.get("immomio_password", "")

    db.update_profile(user["id"], updates)
    db.log_audit(user["username"], "profile.updated", user_id=user["id"], ip=client_ip(request))
    return RedirectResponse("/einstellungen/profil", status_code=303)
|
|
|
|
|
|
@app.post("/actions/notifications")
async def action_notifications(request: Request, user=Depends(require_user)):
    """Save the notification settings and return to the notifications section.

    The channel defaults to "ui"; the three notify_on_* switches are stored
    as 0/1.
    """
    form = await request.form()
    require_csrf(user["id"], form.get("csrf", ""))

    def _switch(field_name) -> int:
        # Checkbox-style value mapped to 1 (set) or 0 (anything else/missing).
        return int(form.get(field_name, "").lower() in ("on", "true", "1", "yes"))

    settings_update = {"channel": form.get("channel", "ui")}
    for text_field in ("telegram_bot_token", "telegram_chat_id", "email_address"):
        settings_update[text_field] = form.get(text_field, "")
    for toggle in ("notify_on_match", "notify_on_apply_success", "notify_on_apply_fail"):
        settings_update[toggle] = _switch(toggle)

    db.update_notifications(user["id"], settings_update)
    db.log_audit(user["username"], "notifications.updated", user_id=user["id"], ip=client_ip(request))
    return RedirectResponse("/einstellungen/benachrichtigungen", status_code=303)
|
|
|
|
|
|
@app.post("/actions/account/password")
async def action_password(
    request: Request,
    old_password: str = Form(""),
    new_password: str = Form(""),
    new_password_repeat: str = Form(""),
    csrf: str = Form(...),
    user=Depends(require_user),
):
    """Change the logged-in user's password.

    Every outcome is a redirect back to the account section; the query string
    carries the result (err=mismatch, err=tooshort, err=wrongold, or ok=1).
    Checks run in that order, so the old password is only verified once the
    new one is acceptable.
    """
    from auth import verify_hash

    def _back_to_account(target: str):
        return RedirectResponse(target, status_code=303)

    require_csrf(user["id"], csrf)

    repeated_correctly = bool(new_password) and new_password == new_password_repeat
    if not repeated_correctly:
        return _back_to_account("/einstellungen/account?err=mismatch")
    if len(new_password) < 10:
        return _back_to_account("/einstellungen/account?err=tooshort")

    account_row = db.get_user_by_username(user["username"])
    old_password_ok = bool(account_row) and verify_hash(account_row["password_hash"], old_password)
    if not old_password_ok:
        return _back_to_account("/einstellungen/account?err=wrongold")

    db.set_user_password(user["id"], hash_password(new_password))
    db.log_audit(user["username"], "password.changed", user_id=user["id"], ip=client_ip(request))
    return _back_to_account("/einstellungen/account?ok=1")
|
|
|
|
|
|
@app.post("/actions/submit-forms")
async def action_submit_forms(
    request: Request,
    value: str = Form(...),
    csrf: str = Form(...),
    user=Depends(require_user),
):
    """Switch the user's ``submit_forms`` preference on or off.

    Only the exact form value "on" enables it; anything else disables it.
    """
    require_csrf(user["id"], csrf)
    enabled = int(value == "on")
    db.update_preferences(user["id"], {"submit_forms": enabled})
    db.log_audit(
        user["username"],
        "submit_forms",
        "on" if enabled else "off",
        user_id=user["id"],
        ip=client_ip(request),
    )
    return RedirectResponse("/einstellungen/profil", status_code=303)
|
|
|
|
|
|
@app.post("/actions/users/create")
async def action_users_create(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    is_admin: str = Form(""),
    csrf: str = Form(...),
    admin=Depends(require_admin),
):
    """Admin action: create a new user account.

    Redirects to the user list with ``ok=1`` on success or ``err=exists``
    when the database rejects the insert with an integrity error.

    Raises:
        HTTPException: 400 when the username is blank or the password is
            shorter than 10 characters.
    """
    require_csrf(admin["id"], csrf)

    clean_name = (username or "").strip()
    if not (clean_name and len(password) >= 10):
        raise HTTPException(400, "username required, password >= 10 chars")

    grant_admin = is_admin.lower() in ("on", "true", "yes", "1")
    try:
        uid = db.create_user(clean_name, hash_password(password), is_admin=grant_admin)
    except sqlite3.IntegrityError:
        return RedirectResponse("/einstellungen/benutzer?err=exists", status_code=303)

    db.log_audit(
        admin["username"],
        "user.created",
        f"new_user={clean_name} id={uid}",
        user_id=admin["id"],
        ip=client_ip(request),
    )
    return RedirectResponse("/einstellungen/benutzer?ok=1", status_code=303)
|
|
|
|
|
|
@app.post("/actions/users/disable")
async def action_users_disable(
    request: Request,
    target_id: int = Form(...),
    value: str = Form(...),
    csrf: str = Form(...),
    admin=Depends(require_admin),
):
    """Admin action: disable (value "on") or re-enable (any other value) a user.

    Raises:
        HTTPException: 400 when an admin targets their own account.
    """
    require_csrf(admin["id"], csrf)
    if target_id == admin["id"]:
        raise HTTPException(400, "refusing to disable self")

    disable = value == "on"
    db.set_user_disabled(target_id, disable)
    db.log_audit(
        admin["username"],
        "user.toggle_disable",
        f"target={target_id} disabled={disable}",
        user_id=admin["id"],
        ip=client_ip(request),
    )
    return RedirectResponse("/einstellungen/benutzer", status_code=303)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.post("/internal/flats")
async def internal_submit_flat(
    payload: dict,
    _guard: None = Depends(require_internal),
):
    """Internal endpoint: ingest one flat and fan it out to matching users.

    A flat that was already stored returns ``{"status": "duplicate"}`` with
    no further work. For a new flat, every enabled user whose filter matches
    gets an audit entry and a match notification, and — if their auto-apply
    gate is open — a background application.

    Raises:
        HTTPException: 400 when ``id`` or ``link`` is missing from the payload.
    """
    if not (payload.get("id") and payload.get("link")):
        raise HTTPException(400, "id and link required")

    if not db.upsert_flat(payload):
        return {"status": "duplicate"}

    flat_id = payload["id"]
    for account in db.list_users():
        if account["disabled"]:
            continue
        account_filter = row_to_dict(db.get_filters(account["id"]))
        if not flat_matches_filter(payload, account_filter):
            continue

        audit_detail = f"user={account['username']} flat={flat_id}"
        db.log_audit("alert", "flat_matched", audit_detail, user_id=account["id"])
        notifications.on_match(account["id"], payload)

        if not _auto_apply_allowed(db.get_preferences(account["id"])):
            continue
        _kick_apply(account["id"], str(flat_id), payload["link"], "auto")
        db.log_audit("system", "auto_apply_kick", audit_detail, user_id=account["id"])

    return {"status": "ok"}
|
|
|
|
|
|
@app.post("/internal/heartbeat")
async def internal_heartbeat(payload: dict, _g: None = Depends(require_internal)):
    """Internal endpoint: record "now" as the last heartbeat of the reporting service.

    The state key is ``last_<service>_heartbeat``; a payload without a
    ``service`` field is recorded under "unknown".
    """
    reporting_service = payload.get("service", "unknown")
    state_key = f"last_{reporting_service}_heartbeat"
    db.set_state(state_key, db.now_iso())
    return dict(status="ok")
|
|
|
|
|
|
@app.post("/internal/error")
async def internal_report_error(
    payload: dict,
    _g: None = Depends(require_internal),
):
    """Internal endpoint: store an error reported by another service.

    Missing fields fall back to source "unknown", kind "error" and an empty
    summary; ``context`` is passed through as-is (None when absent).
    """
    error_record = {
        "source": payload.get("source", "unknown"),
        "kind": payload.get("kind", "error"),
        "summary": payload.get("summary", ""),
        "context": payload.get("context"),
    }
    db.log_error(**error_record)
    return dict(status="ok")
|