import hmac
import logging
from contextlib import asynccontextmanager
from urllib.parse import urljoin, urlparse

from fastapi import Depends, FastAPI, Header, HTTPException, status
from playwright.async_api import ViewportSize, async_playwright
from pydantic import BaseModel, Field
from rich.console import Console
from rich.logging import RichHandler

import providers
from actions import Recorder
from classes.application_result import ApplicationResult
from classes.profile import Profile
from language import _
from providers._provider import ApplyContext
from settings import BROWSER_HEIGHT, BROWSER_LOCALE, BROWSER_WIDTH, HEADLESS, INTERNAL_API_KEY


def setup_logging():
    """Route root logging through a Rich handler and set per-logger levels."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-5s %(name)s: %(message)s",
        datefmt="%H:%M:%S",
        handlers=[RichHandler(markup=False, console=Console(width=140), show_time=False, show_path=False)],
    )
    logging.getLogger("flat-apply").setLevel(logging.DEBUG)
    logging.getLogger("playwright").setLevel(logging.INFO)


logger = logging.getLogger("flat-apply")
setup_logging()


class ProfileModel(BaseModel):
    """Applicant profile as received over the API; every field has a default."""

    salutation: str = "Herr"
    firstname: str = ""
    lastname: str = ""
    email: str = ""
    telephone: str = ""
    street: str = ""
    house_number: str = ""
    postcode: str = ""
    city: str = ""
    is_possessing_wbs: bool = False
    wbs_type: str = "0"
    wbs_valid_till: str = "1970-01-01"
    wbs_rooms: int = 0
    wbs_adults: int = 0
    wbs_children: int = 0
    is_prio_wbs: bool = False
    immomio_email: str = ""
    immomio_password: str = ""


class ApplyRequest(BaseModel):
    """Request body of ``POST /apply``."""

    url: str
    profile: ProfileModel
    submit_forms: bool = False
    application_id: int | None = None  # echoed back in logs


class ApplyResponse(BaseModel):
    """Response body of ``POST /apply``."""

    success: bool
    message: str
    provider: str
    application_id: int | None = None
    forensics: dict


def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None:
    """FastAPI dependency that checks the ``X-Internal-Api-Key`` header.

    Raises:
        HTTPException: 503 when ``INTERNAL_API_KEY`` is not configured,
            401 when the header is missing or does not match.
    """
    if not INTERNAL_API_KEY:
        raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured")
    # Compare as bytes: hmac.compare_digest raises TypeError on str arguments
    # containing non-ASCII characters, which would surface as a 500 instead of
    # a clean 401 when a client sends such a header value.
    if not x_internal_api_key or not hmac.compare_digest(
        x_internal_api_key.encode("utf-8"),
        INTERNAL_API_KEY.encode("utf-8"),
    ):
        raise HTTPException(status_code=401, detail="invalid api key")


@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Log the registered providers once at application startup."""
    logger.info("apply ready, providers: %s", sorted(providers.PROVIDERS))
    yield


app = FastAPI(lifespan=lifespan, title="lazyflat-apply", docs_url=None, redoc_url=None)


@app.get("/health")
def health():
    """Report service status and the sorted list of known provider domains."""
    return {"status": "ok", "providers": sorted(providers.PROVIDERS)}


@app.post("/apply", response_model=ApplyResponse, dependencies=[Depends(require_api_key)])
async def apply(req: ApplyRequest):
    """Run the provider matching the URL's domain and report the outcome.

    The domain is the lower-cased netloc of ``req.url`` without a leading
    ``www.``. Unknown domains yield an unsuccessful response with an empty
    ``provider``. Exceptions raised by the provider are logged, recorded in
    the forensics trail and turned into an unsuccessful response rather than
    propagated.
    """
    url = req.url.strip()
    domain = urlparse(url).netloc.lower().removeprefix("www.")
    logger.info(
        "apply request application_id=%s domain=%s submit=%s",
        req.application_id,
        domain,
        req.submit_forms,
    )

    recorder = Recorder(url)
    recorder.step(
        "request.received",
        detail=f"application_id={req.application_id} domain={domain} submit={req.submit_forms}",
    )

    if domain not in providers.PROVIDERS:
        recorder.step("unsupported_provider", "warn", domain)
        result = ApplicationResult(False, message=_("unsupported_association"))
        return ApplyResponse(
            success=False,
            message=str(result),
            provider="",
            application_id=req.application_id,
            forensics=recorder.to_json(),
        )

    provider = providers.PROVIDERS[domain]
    profile = Profile.from_dict(req.profile.model_dump())
    ctx = ApplyContext(profile=profile, submit_forms=req.submit_forms, recorder=recorder)

    try:
        result = await provider.apply_for_flat(url, ctx)
        logger.info("apply outcome application_id=%s: %r", req.application_id, result)
    except Exception as e:
        # Top-level boundary: a crashing provider must still produce a
        # response that carries the forensics collected so far.
        logger.exception("apply crashed application_id=%s", req.application_id)
        recorder.step("exception", "error", f"{type(e).__name__}: {e}")
        result = ApplicationResult(False, f"Script Error:\n{e}")

    return ApplyResponse(
        success=result.success,
        message=str(result),
        provider=provider.domain,
        application_id=req.application_id,
        forensics=recorder.to_json(),
    )


class FetchListingRequest(BaseModel):
    """Request body of ``POST /internal/fetch-listing``."""

    url: str


class FetchListingResponse(BaseModel):
    """Response body of ``POST /internal/fetch-listing``."""

    final_url: str
    html: str
    image_urls: list[str]


# NOTE: applied as a slice on the HTML *string*, so the cap is counted in
# characters rather than encoded bytes.
MAX_FETCH_HTML_BYTES = 400_000
MAX_FETCH_IMAGES = 30

# Only web URLs may be handed to the browser; this keeps file:// and other
# local schemes out of page.goto().
_ALLOWED_FETCH_SCHEMES = ("http", "https")

# Substrings (matched against the lower-cased absolute URL) that mark an image
# as an icon/logo-style asset to be dropped.
_SKIPPED_IMAGE_MARKERS = ("logo", "favicon", "sprite", "icon", ".svg")

# Runs in the page: for every <img>, collect its src, its data-src attribute
# and the first URL of its srcset attribute.
_COLLECT_IMAGES_JS = """() => {
    const out = [];
    document.querySelectorAll('img').forEach((img) => {
        if (img.src) out.push(img.src);
        const ds = img.getAttribute('data-src');
        if (ds) out.push(ds);
        const ss = img.getAttribute('srcset');
        if (ss) {
            const first = ss.split(',')[0].trim().split(' ')[0];
            if (first) out.push(first);
        }
    });
    return out;
}"""


def _select_image_urls(candidates: list[str], base_url: str) -> list[str]:
    """Absolutize, dedupe and filter raw image candidates.

    Empty entries and ``data:`` URIs are skipped, each remaining candidate is
    resolved against ``base_url``, duplicates are dropped, URLs containing one
    of ``_SKIPPED_IMAGE_MARKERS`` are discarded, and collection stops once
    ``MAX_FETCH_IMAGES`` URLs have been kept. Input order is preserved.
    """
    seen: set[str] = set()
    selected: list[str] = []
    for candidate in candidates:
        if not candidate or candidate.startswith("data:"):
            continue
        absolute = urljoin(base_url, candidate)
        if absolute in seen:
            continue
        seen.add(absolute)
        lowered = absolute.lower()
        if any(marker in lowered for marker in _SKIPPED_IMAGE_MARKERS):
            continue
        selected.append(absolute)
        if len(selected) >= MAX_FETCH_IMAGES:
            break
    return selected


@app.post(
    "/internal/fetch-listing",
    response_model=FetchListingResponse,
    dependencies=[Depends(require_api_key)],
)
async def fetch_listing(req: FetchListingRequest):
    """Headless Playwright fetch of a flat listing — returns page HTML +
    absolute image URLs. Used by the web service's LLM enrichment pipeline
    so we look like a real browser and don't get bounced by bot guards.

    Raises:
        HTTPException: 400 when the URL is empty or is not an absolute
            http(s) URL.
    """
    url = req.url.strip()
    if not url:
        raise HTTPException(400, "url required")

    # Reject anything that is not a plain web URL before a browser is started.
    # NOTE(review): this does not stop http(s) URLs that point at internal
    # hosts; add host filtering here if that matters for the deployment.
    try:
        parsed = urlparse(url)
    except ValueError:
        raise HTTPException(400, "url must be an absolute http(s) URL") from None
    if parsed.scheme not in _ALLOWED_FETCH_SCHEMES or not parsed.netloc:
        raise HTTPException(400, "url must be an absolute http(s) URL")

    logger.info("fetch-listing url=%s", url)

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=HEADLESS,
            args=["--disable-blink-features=AutomationControlled"],
        )
        try:
            context = await browser.new_context(
                viewport=ViewportSize({"width": BROWSER_WIDTH, "height": BROWSER_HEIGHT}),
                locale=BROWSER_LOCALE,
            )
            page = await context.new_page()
            await page.goto(url, timeout=30_000)
            try:
                await page.wait_for_load_state("networkidle", timeout=10_000)
            except Exception:
                # Best effort: a page that never goes network-idle is still
                # scraped in whatever state it has reached.
                logger.debug("fetch-listing networkidle wait failed url=%s", url, exc_info=True)
            final_url = page.url
            html = await page.content()
            # Collect image candidates: <img src> + data-src + srcset first URL.
            raw_imgs: list[str] = await page.evaluate(_COLLECT_IMAGES_JS)
        finally:
            await browser.close()

    # Absolutize, dedupe, drop tiny icons/data-uris.
    image_urls = _select_image_urls(raw_imgs, final_url)

    return FetchListingResponse(
        final_url=final_url,
        html=html[:MAX_FETCH_HTML_BYTES],
        image_urls=image_urls,
    )