# Changelog (from commit message):
# * apply: Recorder.step_snap(page, name) captures both a JPEG screenshot and the page HTML
#   for every major moment; every provider now calls step_snap at each logical step so failure
#   reports contain the exact DOM and rendered state at every stage of the flow
# * ZIP report: each snapshot becomes snapshots/NN_<label>.jpg + snapshots/NN_<label>.html
#   for AI-assisted debugging
# * web: Wohnungsliste zeigt nur noch Flats, die die eigenen Filter treffen; Match-Chip
#   entfernt (Liste ist jetzt implizit matchend)
# * UI komplett auf Deutsch: Protokoll statt Logs, Administrator statt admin, Trockenmodus
#   statt dry-run, Automatik pausiert statt circuit open, Alarm statt Alert, Abmelden statt Logout
# * Wohnungen-Header: Zeile 1 Info (Alarm + Filter), Zeile 2 Schalter mit echten Radio-Paaren
#   (An/Aus) fuer Automatisch bewerben und Trockenmodus; hx-confirm auf den kritischen Radios;
#   per-form CSS fuer sichtbaren Check-State
# * Protokoll: von/bis-Datumsfilter (Berliner Zeit) + CSV-Download (/logs/export.csv)
#   mit UTC + lokaler Zeit
# Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# File metadata: 213 lines, 7.4 KiB, Python
"""
Playwright actions + forensic recorder.

The recorder captures everything a downstream AI agent would need to diagnose
a broken application flow:

* a structured `step_log` (one entry per `recorder.step(...)`)
* browser console logs
* browser errors
* every network request + selective response bodies
* page HTML at finalize time
* screenshots at key moments

Payloads are capped so SQLite stays healthy. Screenshots are base64 JPEGs.
"""

import asyncio
import base64
import logging
import time
from contextlib import asynccontextmanager
from typing import Optional

from playwright.async_api import ViewportSize, async_playwright
from reportlab.pdfgen import canvas

from settings import BROWSER_HEIGHT, BROWSER_LOCALE, BROWSER_WIDTH, HEADLESS, POST_SUBMISSION_SLEEP_MS

# Module-wide logger shared by the recorder and the browser helpers below.
logger = logging.getLogger("flat-apply")

# Payload caps: a single run's telemetry must stay small enough to store in SQLite.
MAX_CONSOLE_ENTRIES = 200     # max console messages (also caps page errors)
MAX_NETWORK_ENTRIES = 150     # max request + response records combined
MAX_BODY_SNIPPET = 2000       # chars of response body kept for HTTP >= 400
MAX_HTML_DUMP = 200_000  # 200 KB
MAX_SCREENSHOTS = 40          # max snap() entries per run
SCREENSHOT_JPEG_QUALITY = 60  # JPEG quality trades size vs. legibility


class Recorder:
    """Captures browser + step telemetry for one apply run.

    Accumulates a structured step log, browser console messages, page
    errors, network traffic (with response-body snippets for HTTP >= 400
    responses), screenshots and HTML dumps.  Every payload is capped by the
    module-level MAX_* constants so the serialized run stays SQLite-friendly.
    """

    def __init__(self, url: str):
        self.started_at = time.time()  # run epoch; every "ts" field is relative to it
        self.url = url
        self.steps: list[dict] = []
        self.console: list[dict] = []
        self.errors: list[dict] = []
        self.network: list[dict] = []
        self.screenshots: list[dict] = []
        self.final_html: Optional[str] = None
        self.final_url: Optional[str] = None
        # Strong references to in-flight response-capture tasks.  asyncio
        # keeps only weak references to tasks, so a bare fire-and-forget
        # create_task(...) can be garbage-collected before it completes and
        # its telemetry silently lost (see asyncio.create_task docs).
        self._response_tasks: set = set()

    # --- step log -----------------------------------------------------------
    def step(self, step_name: str, status: str = "ok", detail: str = "") -> None:
        """Record one logical step and mirror it to the module logger.

        Non-"ok" statuses are logged at WARNING level; ``detail`` is capped
        at 500 chars to bound row size.
        """
        entry = {
            "ts": round(time.time() - self.started_at, 3),
            "step": step_name,
            "status": status,
            "detail": str(detail)[:500],
        }
        self.steps.append(entry)
        log = logger.info if status == "ok" else logger.warning
        log("step %-20s %-4s %s", step_name, status, detail)

    # --- browser hooks ------------------------------------------------------
    def _attach(self, page) -> None:
        """Wire console/pageerror/request/response listeners onto a page."""

        def on_console(msg):
            try:
                text = msg.text
            except Exception:
                text = "<unavailable>"
            if len(self.console) < MAX_CONSOLE_ENTRIES:
                self.console.append({
                    "ts": round(time.time() - self.started_at, 3),
                    "type": getattr(msg, "type", "?"),
                    "text": text[:500],
                })

        def on_pageerror(err):
            # Page errors share the console cap; they are comparatively rare.
            if len(self.errors) < MAX_CONSOLE_ENTRIES:
                self.errors.append({
                    "ts": round(time.time() - self.started_at, 3),
                    "message": str(err)[:1000],
                })

        def on_request(req):
            if len(self.network) < MAX_NETWORK_ENTRIES:
                self.network.append({
                    "ts": round(time.time() - self.started_at, 3),
                    "kind": "request",
                    "method": req.method,
                    "url": req.url,
                    "resource_type": req.resource_type,
                })

        async def on_response(resp):
            if len(self.network) >= MAX_NETWORK_ENTRIES:
                return
            try:
                snippet = ""
                if resp.status >= 400:
                    # Keep a snippet of error bodies only; success bodies
                    # would blow the payload budget.
                    try:
                        body = await resp.text()
                        snippet = body[:MAX_BODY_SNIPPET]
                    except Exception:
                        snippet = ""
                self.network.append({
                    "ts": round(time.time() - self.started_at, 3),
                    "kind": "response",
                    "status": resp.status,
                    "url": resp.url,
                    "body_snippet": snippet,
                })
            except Exception:
                # Telemetry must never break the apply flow.
                pass

        def schedule_response(resp):
            # Hold a strong reference until the task finishes, otherwise the
            # event loop may drop it mid-flight (asyncio.create_task docs).
            task = asyncio.create_task(on_response(resp))
            self._response_tasks.add(task)
            task.add_done_callback(self._response_tasks.discard)

        page.on("console", on_console)
        page.on("pageerror", on_pageerror)
        page.on("request", on_request)
        page.on("response", schedule_response)

    # --- screenshots + html dump -------------------------------------------
    async def snap(self, page, label: str) -> None:
        """Capture screenshot + full page HTML for this moment.

        Silently returns once MAX_SCREENSHOTS entries exist; screenshot or
        HTML failures are logged but never raised, and the partially-filled
        entry is still appended.
        """
        if len(self.screenshots) >= MAX_SCREENSHOTS:
            return
        ts = round(time.time() - self.started_at, 3)
        entry = {"ts": ts, "label": label, "url": page.url,
                 "b64_jpeg": "", "size": 0, "html": "", "html_size": 0}
        try:
            img = await page.screenshot(type="jpeg", quality=SCREENSHOT_JPEG_QUALITY,
                                        full_page=False, timeout=5000)
            entry["b64_jpeg"] = base64.b64encode(img).decode("ascii")
            entry["size"] = len(img)
        except Exception as e:
            logger.warning("snap screenshot failed (%s): %s", label, e)
        try:
            html = await page.content()
            entry["html"] = html[:MAX_HTML_DUMP]
            entry["html_size"] = len(html)  # pre-truncation size, for diagnostics
        except Exception as e:
            logger.warning("snap html failed (%s): %s", label, e)
        self.screenshots.append(entry)

    async def step_snap(self, page, name: str, detail: str = "", status: str = "ok") -> None:
        """Log a step AND capture a screenshot + HTML for it."""
        self.step(name, status, detail)
        await self.snap(page, name)

    async def finalize(self, page) -> None:
        """Dump final URL/HTML and take a closing screenshot."""
        try:
            self.final_url = page.url
            html = await page.content()
            self.final_html = html[:MAX_HTML_DUMP]
        except Exception as e:
            logger.warning("finalize html dump failed: %s", e)
        await self.snap(page, "final")

    def to_json(self) -> dict:
        """Serialize the entire run as a JSON-ready dict."""
        return {
            "url": self.url,
            "final_url": self.final_url,
            "duration_s": round(time.time() - self.started_at, 3),
            "steps": self.steps,
            "console": self.console,
            "errors": self.errors,
            "network": self.network,
            "screenshots": self.screenshots,
            "final_html": self.final_html,
        }


@asynccontextmanager
async def open_page(url: str, recorder: Optional[Recorder] = None):
    """Launch Chromium, navigate to *url*, and yield a ready page.

    When a ``recorder`` is supplied it is attached to the page before
    navigation, logs launch/goto/loaded steps, takes a "loaded" snapshot,
    and is finalized (HTML dump + closing screenshot) on the way out — even
    when the body raises.  The browser always sleeps POST_SUBMISSION_SLEEP_MS
    before closing.
    """
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(
            headless=HEADLESS,
            args=["--disable-blink-features=AutomationControlled"],
        )
        context = await browser.new_context(
            viewport=ViewportSize({"width": BROWSER_WIDTH, "height": BROWSER_HEIGHT}),
            locale=BROWSER_LOCALE,
        )
        page = await context.new_page()

        if recorder:
            recorder._attach(page)
            recorder.step("launch", detail=f"headless={HEADLESS}")
            recorder.step("goto", detail=url)

        await page.goto(url)
        await page.wait_for_load_state("networkidle")

        if recorder:
            recorder.step("loaded", detail=page.url)
            await recorder.snap(page, "loaded")

        try:
            yield page
        finally:
            if recorder:
                try:
                    await recorder.finalize(page)
                except Exception:
                    logger.exception("recorder.finalize failed")
            await page.wait_for_timeout(POST_SUBMISSION_SLEEP_MS)
            await browser.close()


def create_dummy_pdf():
    """Write a one-page placeholder PDF named ``DummyPDF.pdf`` to the CWD."""
    logger.info("creating dummy pdf")
    pdf = canvas.Canvas("DummyPDF.pdf")
    pdf.drawString(100, 750, "Hello! This is a dummy PDF file.")
    pdf.save()