multi-user: users, per-user profiles/filters/notifications, tab UI, apply forensics

* DB: users + user_profiles/filters/notifications/preferences; applications gets
  user_id + forensics_json + profile_snapshot_json; new errors table
  with 14d retention; schema versioning via MIGRATIONS list
* auth: password hashes in DB (argon2); env vars seed first admin; per-user
  sessions; CSRF bound to user id
* apply: personal info/WBS moved out of env into the request body; providers
  take an ApplyContext with Profile + submit_forms; full Playwright recorder
  (step log, console, page errors, network, screenshots, final HTML)
* web: five top-level tabs (Wohnungen/Bewerbungen/Logs/Fehler/Einstellungen);
  settings sub-tabs profil/filter/benachrichtigungen/account/benutzer;
  per-user matching, auto-apply and notifications (UI/Telegram/SMTP); red
  auto-apply switch on Wohnungen tab; forensics detail view for bewerbungen
  and fehler; retention background thread

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Moritz 2026-04-21 10:52:41 +02:00
parent e663386a19
commit c630b500ef
36 changed files with 2763 additions and 1113 deletions

View file

@ -1,38 +1,198 @@
"""
Playwright actions + forensic recorder.
The recorder captures everything a downstream AI agent would need to diagnose
a broken application flow:
* a structured `step_log` (one entry per `recorder.step(...)`)
* browser console logs
* browser errors
* every network request + selective response bodies
* page HTML at finalize time
* screenshots at key moments
Payloads are capped so SQLite stays healthy. Screenshots are base64 JPEGs.
"""
import asyncio
import base64
import logging
import time
from contextlib import asynccontextmanager
from playwright.async_api import async_playwright, ViewportSize
from typing import Optional
from playwright.async_api import ViewportSize, async_playwright
from reportlab.pdfgen import canvas
from settings import *
import logging
from settings import BROWSER_HEIGHT, BROWSER_LOCALE, BROWSER_WIDTH, HEADLESS, POST_SUBMISSION_SLEEP_MS
logger = logging.getLogger("flat-apply")
MAX_CONSOLE_ENTRIES = 200
MAX_NETWORK_ENTRIES = 150
MAX_BODY_SNIPPET = 2000
MAX_HTML_DUMP = 200_000 # 200 KB
SCREENSHOT_JPEG_QUALITY = 60
class Recorder:
"""Captures browser + step telemetry for one apply run."""
def __init__(self, url: str):
self.started_at = time.time()
self.url = url
self.steps: list[dict] = []
self.console: list[dict] = []
self.errors: list[dict] = []
self.network: list[dict] = []
self.screenshots: list[dict] = []
self.final_html: Optional[str] = None
self.final_url: Optional[str] = None
# --- step log -----------------------------------------------------------
def step(self, step_name: str, status: str = "ok", detail: str = "") -> None:
entry = {
"ts": round(time.time() - self.started_at, 3),
"step": step_name,
"status": status,
"detail": str(detail)[:500],
}
self.steps.append(entry)
log = logger.info if status == "ok" else logger.warning
log("step %-20s %-4s %s", step_name, status, detail)
# --- browser hooks ------------------------------------------------------
def _attach(self, page) -> None:
def on_console(msg):
try:
text = msg.text
except Exception:
text = "<unavailable>"
if len(self.console) < MAX_CONSOLE_ENTRIES:
self.console.append({
"ts": round(time.time() - self.started_at, 3),
"type": getattr(msg, "type", "?"),
"text": text[:500],
})
def on_pageerror(err):
if len(self.errors) < MAX_CONSOLE_ENTRIES:
self.errors.append({
"ts": round(time.time() - self.started_at, 3),
"message": str(err)[:1000],
})
def on_request(req):
if len(self.network) < MAX_NETWORK_ENTRIES:
self.network.append({
"ts": round(time.time() - self.started_at, 3),
"kind": "request",
"method": req.method,
"url": req.url,
"resource_type": req.resource_type,
})
async def on_response(resp):
if len(self.network) >= MAX_NETWORK_ENTRIES:
return
try:
snippet = ""
if resp.status >= 400:
try:
body = await resp.text()
snippet = body[:MAX_BODY_SNIPPET]
except Exception:
snippet = ""
self.network.append({
"ts": round(time.time() - self.started_at, 3),
"kind": "response",
"status": resp.status,
"url": resp.url,
"body_snippet": snippet,
})
except Exception:
pass
page.on("console", on_console)
page.on("pageerror", on_pageerror)
page.on("request", on_request)
page.on("response", lambda r: asyncio.create_task(on_response(r)))
# --- screenshots --------------------------------------------------------
async def snap(self, page, label: str) -> None:
try:
data = await page.screenshot(type="jpeg", quality=SCREENSHOT_JPEG_QUALITY,
full_page=False, timeout=5000)
b64 = base64.b64encode(data).decode("ascii")
self.screenshots.append({
"ts": round(time.time() - self.started_at, 3),
"label": label,
"url": page.url,
"b64_jpeg": b64,
"size": len(data),
})
except Exception as e:
logger.warning("snap failed (%s): %s", label, e)
async def finalize(self, page) -> None:
try:
self.final_url = page.url
html = await page.content()
self.final_html = html[:MAX_HTML_DUMP]
except Exception as e:
logger.warning("finalize html dump failed: %s", e)
await self.snap(page, "final")
def to_json(self) -> dict:
return {
"url": self.url,
"final_url": self.final_url,
"duration_s": round(time.time() - self.started_at, 3),
"steps": self.steps,
"console": self.console,
"errors": self.errors,
"network": self.network,
"screenshots": self.screenshots,
"final_html": self.final_html,
}
@asynccontextmanager
async def open_page(url):
async def open_page(url: str, recorder: Optional[Recorder] = None):
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=HEADLESS,
args=["--disable-blink-features=AutomationControlled"]
args=["--disable-blink-features=AutomationControlled"],
)
context = await browser.new_context(
viewport=ViewportSize({
"width": BROWSER_WIDTH,
"height": BROWSER_HEIGHT}),
locale=BROWSER_LOCALE
viewport=ViewportSize({"width": BROWSER_WIDTH, "height": BROWSER_HEIGHT}),
locale=BROWSER_LOCALE,
)
page = await context.new_page()
if recorder:
recorder._attach(page)
recorder.step("launch", detail=f"headless={HEADLESS}")
if recorder:
recorder.step("goto", detail=url)
await page.goto(url)
await page.wait_for_load_state("networkidle")
if recorder:
recorder.step("loaded", detail=page.url)
await recorder.snap(page, "loaded")
try:
yield page
finally:
if recorder:
try:
await recorder.finalize(page)
except Exception:
logger.exception("recorder.finalize failed")
await page.wait_for_timeout(POST_SUBMISSION_SLEEP_MS)
await browser.close()
def create_dummy_pdf():
logger.info("creating dummy pdf")
c = canvas.Canvas("DummyPDF.pdf")

66
apply/classes/profile.py Normal file
View file

@ -0,0 +1,66 @@
"""Applicant profile passed from web → apply on each request."""
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Profile:
salutation: str = "Herr"
firstname: str = ""
lastname: str = ""
email: str = ""
telephone: str = ""
street: str = ""
house_number: str = ""
postcode: str = ""
city: str = ""
# WBS
is_possessing_wbs: bool = False
wbs_type: str = "0"
wbs_valid_till: str = "1970-01-01" # ISO date string
wbs_rooms: int = 0
wbs_adults: int = 0
wbs_children: int = 0
is_prio_wbs: bool = False
# optional: immomio login for providers that need it
immomio_email: str = ""
immomio_password: str = ""
@property
def person_count(self) -> int:
return self.wbs_adults + self.wbs_children
@property
def adult_count(self) -> int:
return self.wbs_adults
@property
def children_count(self) -> int:
return self.wbs_children
def wbs_valid_till_dt(self) -> datetime:
try:
return datetime.strptime(self.wbs_valid_till, "%Y-%m-%d")
except ValueError:
return datetime(1970, 1, 1)
@classmethod
def from_dict(cls, d: dict) -> "Profile":
safe = {k: d.get(k) for k in cls.__dataclass_fields__.keys() if k in d and d[k] is not None}
# normalise booleans + ints
for k in ("is_possessing_wbs", "is_prio_wbs"):
if k in safe:
v = safe[k]
if isinstance(v, str):
safe[k] = v.lower() in ("true", "1", "yes", "on")
else:
safe[k] = bool(v)
for k in ("wbs_rooms", "wbs_adults", "wbs_children"):
if k in safe:
try:
safe[k] = int(safe[k])
except (TypeError, ValueError):
safe[k] = 0
return cls(**safe)

View file

@ -3,81 +3,125 @@ from contextlib import asynccontextmanager
from urllib.parse import urlparse
from fastapi import Depends, FastAPI, Header, HTTPException, status
from pydantic import BaseModel
from pydantic import BaseModel, Field
from rich.console import Console
from rich.logging import RichHandler
import providers
from actions import Recorder
from classes.application_result import ApplicationResult
from classes.profile import Profile
from language import _
from settings import INTERNAL_API_KEY, log_settings
from providers._provider import ApplyContext
from settings import INTERNAL_API_KEY
def setup_logging():
logging.basicConfig(
level=logging.WARNING,
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(markup=True, console=Console(width=110))],
level=logging.INFO,
format="%(asctime)s %(levelname)-5s %(name)s: %(message)s",
datefmt="%H:%M:%S",
handlers=[RichHandler(markup=False, console=Console(width=140), show_time=False, show_path=False)],
)
logging.getLogger("flat-apply").setLevel(logging.DEBUG)
logging.getLogger("playwright").setLevel(logging.INFO)
logger = logging.getLogger("flat-apply")
setup_logging()
class ProfileModel(BaseModel):
salutation: str = "Herr"
firstname: str = ""
lastname: str = ""
email: str = ""
telephone: str = ""
street: str = ""
house_number: str = ""
postcode: str = ""
city: str = ""
is_possessing_wbs: bool = False
wbs_type: str = "0"
wbs_valid_till: str = "1970-01-01"
wbs_rooms: int = 0
wbs_adults: int = 0
wbs_children: int = 0
is_prio_wbs: bool = False
immomio_email: str = ""
immomio_password: str = ""
class ApplyRequest(BaseModel):
url: str
profile: ProfileModel
submit_forms: bool = False
application_id: int | None = None # echoed back in logs
class ApplyResponse(BaseModel):
success: bool
message: str
provider: str
application_id: int | None = None
forensics: dict
def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None:
if not INTERNAL_API_KEY:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="apply service has no INTERNAL_API_KEY configured",
)
raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured")
if x_internal_api_key != INTERNAL_API_KEY:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid api key")
raise HTTPException(status_code=401, detail="invalid api key")
@asynccontextmanager
async def lifespan(_app: FastAPI):
log_settings()
logger.info(f"apply ready, providers: {sorted(providers.PROVIDERS)}")
logger.info("apply ready, providers: %s", sorted(providers.PROVIDERS))
yield
app = FastAPI(lifespan=lifespan, title="lazyflat-apply")
app = FastAPI(lifespan=lifespan, title="lazyflat-apply", docs_url=None, redoc_url=None)
@app.get("/health")
def health():
return {"status": "ok"}
return {"status": "ok", "providers": sorted(providers.PROVIDERS)}
@app.post("/apply", response_model=ApplyResponse, dependencies=[Depends(require_api_key)])
async def apply(req: ApplyRequest):
url = req.url.strip()
domain = urlparse(url).netloc.lower().removeprefix("www.")
logger.info(f"apply request for domain={domain} url={url}")
logger.info("apply request application_id=%s domain=%s submit=%s",
req.application_id, domain, req.submit_forms)
recorder = Recorder(url)
recorder.step("request.received", detail=f"application_id={req.application_id} domain={domain} submit={req.submit_forms}")
if domain not in providers.PROVIDERS:
logger.warning(f"unsupported provider: {domain}")
recorder.step("unsupported_provider", "warn", domain)
result = ApplicationResult(False, message=_("unsupported_association"))
return ApplyResponse(success=result.success, message=str(result))
return ApplyResponse(
success=False, message=str(result), provider="",
application_id=req.application_id, forensics=recorder.to_json(),
)
provider = providers.PROVIDERS[domain]
profile = Profile.from_dict(req.profile.model_dump())
ctx = ApplyContext(profile=profile, submit_forms=req.submit_forms, recorder=recorder)
try:
provider = providers.PROVIDERS[domain]
result = await provider.apply_for_flat(url)
logger.info(f"application result: {repr(result)}")
result = await provider.apply_for_flat(url, ctx)
logger.info("apply outcome application_id=%s: %r", req.application_id, result)
except Exception as e:
logger.exception("error while applying")
logger.exception("apply crashed application_id=%s", req.application_id)
recorder.step("exception", "error", f"{type(e).__name__}: {e}")
result = ApplicationResult(False, f"Script Error:\n{e}")
return ApplyResponse(success=result.success, message=str(result))
return ApplyResponse(
success=result.success,
message=str(result),
provider=provider.domain,
application_id=req.application_id,
forensics=recorder.to_json(),
)

View file

@ -1,22 +1,37 @@
"""Abstract provider interface. Concrete providers live next to this file."""
import asyncio
from abc import ABC, abstractmethod
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from actions import Recorder
from classes.application_result import ApplicationResult
from classes.profile import Profile
logger = logging.getLogger("flat-apply")
@dataclass
class ApplyContext:
"""Per-request state passed into every provider call."""
profile: Profile
submit_forms: bool
recorder: Recorder
class Provider(ABC):
@property
@abstractmethod
def domain(self) -> str:
"""every flat provider needs a domain"""
pass
...
@abstractmethod
async def apply_for_flat(self, url: str) -> ApplicationResult:
"""every flat provider needs to be able to apply for flats"""
pass
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
...
def test_apply(self, url):
print(asyncio.run(self.apply_for_flat(url)))
def test_apply(self, url, profile: Profile | None = None, submit_forms: bool = False):
rec = Recorder(url)
ctx = ApplyContext(profile or Profile(), submit_forms=submit_forms, recorder=rec)
result = asyncio.run(self.apply_for_flat(url, ctx))
print(repr(result))
return result, rec.to_json()

View file

@ -1,129 +1,101 @@
from actions import *
from language import _
from classes.application_result import ApplicationResult
from providers._provider import Provider
from settings import *
import logging
from actions import open_page
from classes.application_result import ApplicationResult
from language import _
from providers._provider import ApplyContext, Provider
logger = logging.getLogger("flat-apply")
class Degewo(Provider):
@property
def domain(self) -> str:
return "degewo.de"
async def apply_for_flat(self, url) -> ApplicationResult:
async with open_page(url) as page:
logger.info("\tSTEP 1: accepting cookies")
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
r = ctx.recorder
p = ctx.profile
async with open_page(url, recorder=r) as page:
r.step("cookies.check")
cookie_accept_btn = page.locator("#cookie-consent-submit-all")
if await cookie_accept_btn.is_visible():
await cookie_accept_btn.click()
logger.debug("\t\tcookie accept button clicked")
else:
logger.debug("\t\tno cookie accept button found")
r.step("cookies.accepted")
logger.info("\tSTEP 2: check if the page was not found")
r.step("page.404_check")
if page.url == "https://www.degewo.de/immosuche/404":
logger.debug("\t\t'page not found' message found - returning")
return ApplicationResult(
success=False,
message=_("ad_offline"))
logger.debug("\t\t'page not found' message not found")
return ApplicationResult(False, message=_("ad_offline"))
logger.info("\tSTEP 3: check if the ad is deactivated")
r.step("ad.deactivated_check")
if await page.locator("span", has_text="Inserat deaktiviert").is_visible():
logger.debug("\t\t'ad deactivated' message found - returning")
return ApplicationResult(
success=False,
message=_("ad_deactivated"))
logger.debug("\t\t'ad deactivated' message not found")
return ApplicationResult(False, message=_("ad_deactivated"))
logger.info("\tSTEP 4: check if the page moved")
r.step("page.moved_check")
if await page.locator("h1", has_text="Diese Seite ist umgezogen!").is_visible():
logger.debug("\t\t'page moved' message found - returning")
return ApplicationResult(
success=False,
message=_("ad_offline"))
logger.debug("\t\t'page moved' message not found")
return ApplicationResult(False, message=_("ad_offline"))
logger.info("\tSTEP 5: go to the application form")
await page.get_by_role("link", name="Kontakt").click()
logger.info("\tSTEP 6: find the form iframe")
r.step("form.open"); await page.get_by_role("link", name="Kontakt").click()
r.step("iframe.locate")
form_frame = page.frame_locator("iframe[src*='wohnungshelden']")
logger.info("\tSTEP 7: fill the form")
await form_frame.locator("#salutation").fill(SALUTATION)
await form_frame.get_by_role("option", name=SALUTATION, exact=True).click()
await form_frame.locator("#firstName").fill(FIRSTNAME)
await form_frame.locator("#lastName").fill(LASTNAME)
await form_frame.locator("#email").fill(EMAIL)
await form_frame.locator("input[title='Telefonnummer']").fill(TELEPHONE)
await form_frame.locator("input[title='Anzahl einziehende Personen']").fill(str(PERSON_COUNT))
r.step("form.fill_personal")
await form_frame.locator("#salutation").fill(p.salutation)
await form_frame.get_by_role("option", name=p.salutation, exact=True).click()
await form_frame.locator("#firstName").fill(p.firstname)
await form_frame.locator("#lastName").fill(p.lastname)
await form_frame.locator("#email").fill(p.email)
await form_frame.locator("input[title='Telefonnummer']").fill(p.telephone)
await form_frame.locator("input[title='Anzahl einziehende Personen']").fill(str(p.person_count))
await page.wait_for_timeout(1000)
r.step("form.wbs_section")
wbs_question = form_frame.locator("input[id*='wbs_available'][id$='Ja']")
if await wbs_question.is_visible():
if not IS_POSSESSING_WBS:
if not p.is_possessing_wbs:
r.step("wbs_required_abort", "warn")
return ApplicationResult(False, _("wbs_required"))
await wbs_question.click()
await form_frame.locator("input[title='WBS gültig bis']").fill(WBS_VALID_TILL.strftime("%d.%m.%Y"))
await form_frame.locator("input[title='WBS gültig bis']").fill(p.wbs_valid_till_dt().strftime("%d.%m.%Y"))
await page.wait_for_timeout(1000)
wbs_rooms_select = form_frame.locator("ng-select[id*='wbs_max_number_rooms']")
await wbs_rooms_select.click()
await page.wait_for_timeout(1000)
correct_wbs_room_option = form_frame.get_by_role("option", name=str(WBS_ROOMS), exact=True)
await correct_wbs_room_option.click()
await form_frame.get_by_role("option", name=str(p.wbs_rooms), exact=True).click()
await page.wait_for_timeout(1000)
await form_frame.locator("ng-select[id*='fuer_wen_ist_wohnungsanfrage']").click()
await page.wait_for_timeout(1000)
await form_frame.get_by_role("option", name="Für mich selbst").click()
logger.info("\tSTEP 8: submit the form")
if not SUBMIT_FORMS:
logger.debug(f"\t\tdry run - not submitting")
if not ctx.submit_forms:
r.step("submit.dry_run")
return ApplicationResult(success=True, message=_("application_success_dry"))
await form_frame.locator("button[data-cy*='btn-submit']").click()
r.step("submit.click"); await form_frame.locator("button[data-cy*='btn-submit']").click()
await page.wait_for_timeout(3000)
logger.info("\tSTEP 9: check the success")
r.step("success.check")
if await page.locator("h4", has_text="Vielen Dank für das Übermitteln Ihrer Informationen. Sie können dieses Fenster jetzt schließen.").is_visible():
logger.info(f"\t\tsuccess detected by heading")
return ApplicationResult(success=True)
elif await self.is_missing_fields_warning(page):
logger.warning(f"\t\tmissing fields warning detected")
elif await self._missing_fields_warning(page):
r.step("missing_fields_detected", "warn")
return ApplicationResult(success=False, message=_("missing_fields"))
elif await self.is_already_applied_warning(page):
logger.warning(f"\t\talready applied warning detected")
elif await self._already_applied_warning(page):
r.step("already_applied_detected", "warn")
return ApplicationResult(success=False, message=_("already_applied"))
logger.warning(f"\t\tsubmit conformation not found")
r.step("success.not_found", "warn")
return ApplicationResult(success=False, message=_("submit_conformation_msg_not_found"))
async def is_already_applied_warning(self, page):
async def _already_applied_warning(self, page) -> bool:
await page.wait_for_timeout(1000)
form_iframe = page.frame_locator("iframe[src*='wohnungshelden']")
already_applied_warning = form_iframe.locator("span.ant-alert-message",
has_text="Es existiert bereits eine Anfrage mit dieser E-Mail Adresse")
if await already_applied_warning.first.is_visible():
return True
return False
f = page.frame_locator("iframe[src*='wohnungshelden']")
msg = f.locator("span.ant-alert-message",
has_text="Es existiert bereits eine Anfrage mit dieser E-Mail Adresse")
return await msg.first.is_visible()
async def is_missing_fields_warning(self, page):
async def _missing_fields_warning(self, page) -> bool:
await page.wait_for_timeout(1000)
form_iframe = page.frame_locator("iframe[src*='wohnungshelden']")
already_applied_warning = form_iframe.locator("span.ant-alert-message",
has_text="Es wurden nicht alle Felder korrekt befüllt. Bitte prüfen Sie ihre Eingaben")
if await already_applied_warning.first.is_visible():
return True
return False
if __name__ == "__main__":
# url = "https://www.degewo.de/immosuche/details/neubau-mit-wbs-140-160-180-220-mit-besonderem-wohnbedarf-1" # already applied
# url = "https://www.degewo.de/immosuche/details/wohnung-sucht-neuen-mieter-1" # angebot geschlossen
# url = "https://www.degewo.de/immosuche/details/wohnung-sucht-neuen-mieter-145" # seite nicht gefunden
# url = "https://www.degewo.de/immosuche/details/1-zimmer-mit-balkon-3"
# url = "https://www.degewo.de/immosuche/details/2-zimmer-in-gropiusstadt-4"
url = "https://www.degewo.de/immosuche/details/2-zimmer-in-gropiusstadt-4"
provider = Degewo()
provider.test_apply(url)
f = page.frame_locator("iframe[src*='wohnungshelden']")
msg = f.locator("span.ant-alert-message",
has_text="Es wurden nicht alle Felder korrekt befüllt. Bitte prüfen Sie ihre Eingaben")
return await msg.first.is_visible()

View file

@ -1,81 +1,69 @@
from actions import *
from language import _
from classes.application_result import ApplicationResult
from providers._provider import Provider
from settings import *
import logging
from actions import open_page
from classes.application_result import ApplicationResult
from language import _
from providers._provider import ApplyContext, Provider
logger = logging.getLogger("flat-apply")
class Gesobau(Provider):
@property
def domain(self) -> str:
return "gesobau.de"
async def apply_for_flat(self, url) -> ApplicationResult:
async with open_page(url) as page:
logger.info("\tSTEP 1: extracting immomio link")
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
r = ctx.recorder
p = ctx.profile
if not (p.immomio_email and p.immomio_password):
r.step("immomio_creds_missing", "warn")
return ApplicationResult(False, _("missing_fields"))
async with open_page(url, recorder=r) as page:
r.step("extract.immomio_link")
immomio_link = await page.get_by_role("link", name="Jetzt bewerben").get_attribute("href")
logger.info("\tSTEP 2: going to auth page")
await page.goto("https://tenant.immomio.com/de/auth/login")
r.step("auth.goto"); await page.goto("https://tenant.immomio.com/de/auth/login")
await page.wait_for_timeout(1000)
logger.info("\tSTEP 3: logging in")
await page.locator('input[name="email"]').fill(IMMOMIO_EMAIL)
r.step("auth.login")
await page.locator('input[name="email"]').fill(p.immomio_email)
await page.get_by_role("button", name="Anmelden").click()
await page.wait_for_timeout(1000)
await page.locator("#password").fill(IMMOMIO_PASSWORD)
await page.locator("#password").fill(p.immomio_password)
await page.locator("#kc-login").click()
await page.wait_for_timeout(1000)
logger.info("\tSTEP 4: going back to immomio")
r.step("back.to_immomio", detail=immomio_link or "")
await page.goto(immomio_link)
await page.wait_for_timeout(1000)
logger.info("\tSTEP 5: accepting cookies")
r.step("cookies.check")
cookie_accept_btn = page.get_by_role("button", name="Alle erlauben")
if await cookie_accept_btn.is_visible():
await cookie_accept_btn.click()
logger.debug("\t\tcookie accept button clicked")
else:
logger.debug("\t\tno cookie accept button found")
logger.info("\tSTEP 6: click apply now")
await page.get_by_role("button", name="Jetzt bewerben").click()
r.step("apply.click"); await page.get_by_role("button", name="Jetzt bewerben").click()
await page.wait_for_timeout(3000)
logger.info("\tSTEP 7: check if already applied")
r.step("already_applied.check")
if page.url == "https://tenant.immomio.com/de/properties/applications":
return ApplicationResult(False, message=_("already_applied"))
logger.info("\tSTEP 8: clicking answer questions")
r.step("answer_questions.click")
answer_questions_btn = page.get_by_role("button", name="Fragen beantworten")
if await answer_questions_btn.is_visible():
await answer_questions_btn.click()
logger.debug("\t\tanswer questions button clicked")
await page.wait_for_timeout(2000)
else:
logger.debug("\t\tno answer questions button found")
if await answer_questions_btn.is_visible(): # sometimes this button must be clicked twice
if await answer_questions_btn.is_visible():
await answer_questions_btn.click()
logger.debug("\t\tanswer questions button clicked")
await page.wait_for_timeout(2000)
logger.info("\tSTEP 9: verifying success by answer button vanishing")
if not await answer_questions_btn.is_visible(): # TODO better verify success
logger.info("\t\tsuccess detected by answer button vanishing")
r.step("success.check")
if not await answer_questions_btn.is_visible():
return ApplicationResult(True)
logger.info("\t\tsubmit conformation not found")
r.step("success.not_found", "warn")
return ApplicationResult(False, _("submit_conformation_msg_not_found"))
if __name__ == "__main__":
# url = "https://www.gesobau.de/?immo_ref=10-03239-00007-1185" # already applied
# url = "https://www.gesobau.de/mieten/wohnungssuche/detailseite/florastrasse-10-12179-00002-1002-1d4d1a94-b555-48f8-b06d-d6fc02aecb0d/"
url = "https://www.gesobau.de/mieten/wohnungssuche/detailseite/rolandstrasse-10-03020-00007-1052-7f47d893-e659-4e4f-a7cd-5dcd53f4e6d7/"
provider = Gesobau()
provider.test_apply(url)

View file

@ -1,168 +1,128 @@
from actions import *
from language import _
from classes.application_result import ApplicationResult
from providers._provider import Provider
from settings import *
import logging
from actions import create_dummy_pdf, open_page
from classes.application_result import ApplicationResult
from language import _
from providers._provider import ApplyContext, Provider
logger = logging.getLogger("flat-apply")
class Gewobag(Provider):
@property
def domain(self) -> str:
return "gewobag.de"
async def apply_for_flat(self, url) -> ApplicationResult:
async with open_page(url) as page:
logger.info("\tSTEP 1: accepting cookies")
cookie_accept_btn = page.get_by_text("Alle Cookies akzeptieren")
if await cookie_accept_btn.is_visible():
await cookie_accept_btn.click()
logger.debug("\t\tcookie accept button clicked")
else:
logger.debug("\t\tno cookie accept button found")
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
r = ctx.recorder
p = ctx.profile
async with open_page(url, recorder=r) as page:
r.step("cookies.check")
btn = page.get_by_text("Alle Cookies akzeptieren")
if await btn.is_visible():
await btn.click()
r.step("cookies.accepted")
logger.info("\tSTEP 2: check if the page was not found")
r.step("ad.exists_check")
if await page.get_by_text("Mietangebot nicht gefunden").first.is_visible():
logger.debug("\t\t'page not found' message found - returning")
return ApplicationResult(
success=False,
message=_("not_found"))
logger.debug("\t\t'page not found' message not found")
return ApplicationResult(False, message=_("not_found"))
logger.info("\tSTEP 3: check if ad is still open")
r.step("ad.still_open_check")
if await page.locator('#immo-mediation-notice').is_visible():
logger.debug("\t\tad closed notice found - returning")
return ApplicationResult(
success=False,
message=_("ad_deactivated"))
logger.debug("\t\tno ad closed notice found")
return ApplicationResult(False, message=_("ad_deactivated"))
logger.info("\tSTEP 4: go to the application form")
await page.get_by_role("button", name="Anfrage senden").first.click()
r.step("form.open"); await page.get_by_role("button", name="Anfrage senden").first.click()
logger.info("\tSTEP 5: check if the flat is for seniors only")
if await self.is_senior_flat(page):
logger.debug("\t\tflat is for seniors only - returning")
r.step("form.senior_check")
if await self._is_senior_flat(page):
return ApplicationResult(False, _("senior_flat"))
logger.debug("\t\tflat is not seniors only")
logger.info("\tSTEP 6: check if the flat is for special needs wbs only")
if await self.is_special_needs_wbs(page):
logger.debug("\t\tflat is for special needs wbs only - returning")
r.step("form.special_wbs_check")
if await self._is_special_needs_wbs(page):
return ApplicationResult(False, _("special_need_wbs_flat"))
logger.debug("\t\tflat is not for special needs wbs only")
logger.info("\tSTEP 7: find the form iframe")
r.step("iframe.locate")
form_iframe = page.frame_locator("#contact-iframe")
logger.info("\tSTEP 8: define helper functions")
async def fill_field(locator, filling):
logger.debug(f"\t\tfill_field('{locator}', '{filling}')")
field = form_iframe.locator(locator)
if await field.is_visible():
await field.fill(filling)
await page.wait_for_timeout(100)
else:
logger.debug(f"\t\t\tfield was not found")
async def select_field(locator, selection):
logger.debug(f"\t\tselect_field('{locator}', '{selection}')")
field = form_iframe.locator(locator)
if await field.is_visible():
await field.click()
await page.wait_for_timeout(100)
await form_iframe.get_by_role("option", name=selection, exact=True).click()
await page.wait_for_timeout(100)
else:
logger.debug(f"\t\t\tfield was not found")
async def check_checkbox(locator):
logger.debug(f"\t\tcheck_checkbox('{locator}')")
field = form_iframe.locator(locator)
if await field.first.is_visible():
await field.evaluate_all("elements => elements.forEach(el => el.click())")
await page.wait_for_timeout(100)
else:
logger.debug(f"\t\t\tfield was not found")
async def upload_files(locator, files=None):
if not files:
create_dummy_pdf()
files = ["DummyPDF.pdf"]
logger.debug(f"\t\tupload_files('{locator}', {str(files)})")
wbs_upload_section = form_iframe.locator(locator)
if await wbs_upload_section.count() > 0:
await wbs_upload_section.locator("input[type='file']").set_input_files(files)
sec = form_iframe.locator(locator)
if await sec.count() > 0:
await sec.locator("input[type='file']").set_input_files(files)
await page.wait_for_timeout(2000)
else:
logger.debug(f"\t\t\tfield was not found")
r.step("form.fill_personal")
await select_field("#salutation-dropdown", p.salutation)
await fill_field("#firstName", p.firstname)
await fill_field("#lastName", p.lastname)
await fill_field("#email", p.email)
await fill_field("#phone-number", p.telephone)
await fill_field("#street", p.street)
await fill_field("#house-number", p.house_number)
await fill_field("#zip-code", p.postcode)
await fill_field("#city", p.city)
await fill_field("input[id*='anzahl_erwachsene']", str(p.adult_count))
await fill_field("input[id*='anzahl_kinder']", str(p.children_count))
await fill_field("input[id*='gesamtzahl_der_einziehenden_personen']", str(p.person_count))
logger.info("\tSTEP 9: fill the form")
await select_field("#salutation-dropdown", SALUTATION)
await fill_field("#firstName", FIRSTNAME)
await fill_field("#lastName", LASTNAME)
await fill_field("#email", EMAIL)
await fill_field("#phone-number", TELEPHONE)
await fill_field("#street", STREET)
await fill_field("#house-number", HOUSE_NUMBER)
await fill_field("#zip-code", POSTCODE)
await fill_field("#city", CITY)
await fill_field("input[id*='anzahl_erwachsene']", str(ADULT_COUNT))
await fill_field("input[id*='anzahl_kinder']", str(CHILDREN_COUNT))
await fill_field("input[id*='gesamtzahl_der_einziehenden_personen']", str(PERSON_COUNT))
r.step("form.wbs_fill")
await check_checkbox("[data-cy*='wbs_available'][data-cy*='-Ja']")
await fill_field("input[id*='wbs_valid_until']", WBS_VALID_TILL.strftime("%d.%m.%Y"))
await select_field("input[id*='wbs_max_number_rooms']", f"{WBS_ROOMS} Räume")
await select_field("input[id*='art_bezeichnung_des_wbs']", f"WBS {WBS_TYPE}")
await fill_field("input[id*='wbs_valid_until']", p.wbs_valid_till_dt().strftime("%d.%m.%Y"))
await select_field("input[id*='wbs_max_number_rooms']", f"{p.wbs_rooms} Räume")
await select_field("input[id*='art_bezeichnung_des_wbs']", f"WBS {p.wbs_type}")
await select_field("input[id*='fuer_wen']", "Für mich selbst")
await fill_field("input[id*='telephone_number']", TELEPHONE)
await fill_field("input[id*='telephone_number']", p.telephone)
await check_checkbox("input[id*='datenschutzhinweis']")
await upload_files("el-application-form-document-upload", files=None)
logger.info("\tSTEP 10: submit the form")
if not SUBMIT_FORMS:
logger.debug(f"\t\tdry run - not submitting")
if not ctx.submit_forms:
r.step("submit.dry_run")
return ApplicationResult(True, _("application_success_dry"))
await form_iframe.get_by_role("button", name="Anfrage versenden").click()
r.step("submit.click"); await form_iframe.get_by_role("button", name="Anfrage versenden").click()
await page.wait_for_timeout(5000)
logger.info("\tSTEP 11: check the success")
r.step("success.check", detail=page.url)
if page.url.startswith("https://www.gewobag.de/daten-uebermittelt/"):
logger.info(f"\t\tsuccess detected by page url")
return ApplicationResult(True)
elif self.is_missing_fields_warning(page):
logger.warning(f"\t\tmissing fields warning detected")
elif await self._is_missing_fields_warning(page):
r.step("missing_fields_detected", "warn")
return ApplicationResult(False, _("missing_fields"))
else:
logger.warning(f"\t\tneither missing fields nor success detected")
return ApplicationResult(False, _("submit_conformation_msg_not_found"))
r.step("success.not_found", "warn")
return ApplicationResult(False, _("submit_conformation_msg_not_found"))
async def is_senior_flat(self, page):
form_iframe = page.frame_locator("#contact-iframe")
return await form_iframe.locator("label[for*='mindestalter_seniorenwohnhaus_erreicht']").first.is_visible()
async def _is_senior_flat(self, page):
f = page.frame_locator("#contact-iframe")
return await f.locator("label[for*='mindestalter_seniorenwohnhaus_erreicht']").first.is_visible()
async def is_special_needs_wbs(self, page):
form_iframe = page.frame_locator("#contact-iframe")
return await form_iframe.locator("label[for*='wbs_mit_besonderem_wohnbedarf_vorhanden']").first.is_visible()
async def _is_special_needs_wbs(self, page):
f = page.frame_locator("#contact-iframe")
return await f.locator("label[for*='wbs_mit_besonderem_wohnbedarf_vorhanden']").first.is_visible()
async def is_missing_fields_warning(self, page):
form_iframe = page.frame_locator("#contact-iframe")
missing_field_msg = form_iframe.locator("span.ant-alert-message",
has_text="Es wurden nicht alle Felder korrekt befüllt.")
if await missing_field_msg.first.is_visible():
return True
return False
if __name__ == "__main__":
#url = "https://www.gewobag.de/fuer-mietinteressentinnen/mietangebote/0100-01036-0601-0286-vms1/" # wbs
#url = "https://www.gewobag.de/fuer-mietinteressentinnen/mietangebote/7100-72401-0101-0011/" # senior
url = "https://www.gewobag.de/fuer-mietinteressentinnen/mietangebote/6011-31046-0105-0045/" # more wbs fields
#url = "https://www.gewobag.de/fuer-mietinteressentinnen/mietangebote/0100-01036-0401-0191/" # special need wbs
#url = "https://www.gewobag.de/fuer-mietinteressentinnen/mietangebote/0100-02571-0103-0169/"
# url = "https://www.gewobag.de/fuer-mietinteressentinnen/mietangebote/0100-02571-0103-169/" # page not found
provider = Gewobag()
provider.test_apply(url)
async def _is_missing_fields_warning(self, page):
f = page.frame_locator("#contact-iframe")
msg = f.locator("span.ant-alert-message",
has_text="Es wurden nicht alle Felder korrekt befüllt.")
return await msg.first.is_visible()

View file

@ -1,62 +1,55 @@
from actions import *
from language import _
from classes.application_result import ApplicationResult
from providers._provider import Provider
from settings import *
import logging
from actions import open_page
from classes.application_result import ApplicationResult
from language import _
from providers._provider import ApplyContext, Provider
logger = logging.getLogger("flat-apply")
class Howoge(Provider):
@property
def domain(self) -> str:
return "howoge.de"
async def apply_for_flat(self, url) -> ApplicationResult:
async with open_page(url) as page:
logger.info("\tSTEP 1: accepting cookies")
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
r = ctx.recorder
p = ctx.profile
async with open_page(url, recorder=r) as page:
r.step("cookies.check")
cookie_accept_btn = page.get_by_role("button", name="Alles akzeptieren")
if await cookie_accept_btn.is_visible():
await cookie_accept_btn.click()
logger.debug("\t\tcookie accept button clicked")
r.step("cookies.accepted")
else:
logger.debug("\t\tno cookie accept button found")
r.step("cookies.absent")
logger.info("\tSTEP 2: check if the page was not found")
r.step("page.404_check", detail=page.url)
if page.url == "https://www.howoge.de/404":
logger.debug("\t\t'page not found' url found - returning")
return ApplicationResult(
success=False,
message=_("not_found"))
logger.debug("\t\t'page not found' url not found")
return ApplicationResult(False, message=_("not_found"))
logger.info("\tSTEP 3: go to the application form")
r.step("form.open")
await page.get_by_role("link", name="Besichtigung anfragen").click()
logger.info("\tSTEP 4: fill the form")
await page.get_by_text("Ja, ich habe die Hinweise zum WBS zur Kenntnis genommen.").click()
r.step("form.wbs_hint"); await page.get_by_text("Ja, ich habe die Hinweise zum WBS zur Kenntnis genommen.").click()
await page.get_by_role("button", name="Weiter").click()
await page.get_by_text("Ja, ich habe den Hinweis zum Haushaltsnettoeinkommen zur Kenntnis genommen.").click()
r.step("form.income_hint"); await page.get_by_text("Ja, ich habe den Hinweis zum Haushaltsnettoeinkommen zur Kenntnis genommen.").click()
await page.get_by_role("button", name="Weiter").click()
await page.get_by_text("Ja, ich habe den Hinweis zur Bonitätsauskunft zur Kenntnis genommen.").click()
r.step("form.bonitaet_hint"); await page.get_by_text("Ja, ich habe den Hinweis zur Bonitätsauskunft zur Kenntnis genommen.").click()
await page.get_by_role("button", name="Weiter").click()
await page.locator("#immo-form-firstname").fill(FIRSTNAME)
await page.locator("#immo-form-lastname").fill(LASTNAME)
await page.locator("#immo-form-email").fill(EMAIL)
r.step("form.name"); await page.locator("#immo-form-firstname").fill(p.firstname)
await page.locator("#immo-form-lastname").fill(p.lastname)
await page.locator("#immo-form-email").fill(p.email)
logger.info("\tSTEP 5: submit the form")
if not SUBMIT_FORMS:
logger.debug(f"\t\tdry run - not submitting")
if not ctx.submit_forms:
r.step("submit.dry_run")
return ApplicationResult(True, _("application_success_dry"))
await page.get_by_role("button", name="Anfrage senden").click()
logger.info("\tSTEP 6: check the success")
r.step("submit.click"); await page.get_by_role("button", name="Anfrage senden").click()
r.step("success.check")
if await page.get_by_role("heading", name="Vielen Dank.").is_visible():
return ApplicationResult(True)
r.step("success.not_found", "warn")
return ApplicationResult(False, _("submit_conformation_msg_not_found"))
if __name__ == "__main__":
# url = "https://www.howoge.de/wohnungen-gewerbe/wohnungssuche/detail/1770-26279-6.html" # not found
url = "https://www.howoge.de/immobiliensuche/wohnungssuche/detail/1770-27695-194.html"
provider = Howoge()
provider.test_apply(url)

View file

@ -1,65 +1,56 @@
from actions import *
from language import _
from classes.application_result import ApplicationResult
from providers._provider import Provider
from settings import *
import logging
from actions import open_page
from classes.application_result import ApplicationResult
from language import _
from providers._provider import ApplyContext, Provider
logger = logging.getLogger("flat-apply")
class Stadtundland(Provider):
@property
def domain(self) -> str:
return "stadtundland.de"
async def apply_for_flat(self, url) -> ApplicationResult:
async with open_page(url) as page:
logger.info("\tSTEP 1: accepting cookies")
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
r = ctx.recorder
p = ctx.profile
async with open_page(url, recorder=r) as page:
r.step("cookies.check")
cookie_accept_btn = page.get_by_text("Alle akzeptieren")
if await cookie_accept_btn.is_visible():
await cookie_accept_btn.click()
logger.debug("\t\tcookie accept button clicked")
r.step("cookies.accepted")
else:
logger.debug("\t\tno cookie accept button found")
r.step("cookies.absent")
logger.info("\tSTEP 2: check if ad is still open")
r.step("page.offline_check")
if await page.get_by_role("heading", name="Hier ist etwas schief gelaufen").is_visible():
logger.debug("\t\tsomething went wrong notice found - returning")
return ApplicationResult(
success=False,
message=_("ad_offline"))
logger.debug("\t\tsomething went wrong notice not found")
return ApplicationResult(False, message=_("ad_offline"))
logger.info("\tSTEP 3: fill the form")
await page.locator("#name").fill(FIRSTNAME)
await page.locator("#surname").fill(LASTNAME)
await page.locator("#street").fill(STREET)
await page.locator("#houseNo").fill(HOUSE_NUMBER)
await page.locator("#postalCode").fill(POSTCODE)
await page.locator("#city").fill(CITY)
await page.locator("#phone").fill(TELEPHONE)
await page.locator("#email").fill(EMAIL)
r.step("form.fill")
await page.locator("#name").fill(p.firstname)
await page.locator("#surname").fill(p.lastname)
await page.locator("#street").fill(p.street)
await page.locator("#houseNo").fill(p.house_number)
await page.locator("#postalCode").fill(p.postcode)
await page.locator("#city").fill(p.city)
await page.locator("#phone").fill(p.telephone)
await page.locator("#email").fill(p.email)
await page.locator("#privacy").check()
await page.locator("#provision").check()
logger.info("\tSTEP 4: submit the form")
if not SUBMIT_FORMS:
logger.debug(f"\t\tdry run - not submitting")
return ApplicationResult(False, _("application_success_dry"))
await page.get_by_role("button", name="Eingaben prüfen").click()
await page.get_by_role("button", name="Absenden").click()
if not ctx.submit_forms:
r.step("submit.dry_run")
return ApplicationResult(True, _("application_success_dry"))
r.step("submit.review"); await page.get_by_role("button", name="Eingaben prüfen").click()
r.step("submit.send"); await page.get_by_role("button", name="Absenden").click()
await page.wait_for_timeout(2000)
logger.info("\tSTEP 5: check the success")
r.step("success.check")
if await page.locator("p").filter(has_text="Vielen Dank!").is_visible():
logger.info(f"\t\tsuccess detected by paragraph text")
return ApplicationResult(True)
logger.warning(f"\t\tsuccess message not found")
r.step("success.not_found", "warn")
return ApplicationResult(success=False, message=_("submit_conformation_msg_not_found"))
if __name__ == "__main__":
# url = "https://stadtundland.de/wohnungssuche/1001%2F0203%2F00310" # offline
url = "https://stadtundland.de/wohnungssuche/1050%2F8222%2F00091" # wbs
provider = Stadtundland()
provider.test_apply(url)

View file

@ -1,10 +1,10 @@
from actions import *
from language import _
from classes.application_result import ApplicationResult
from providers._provider import Provider
from settings import *
import logging
from actions import open_page
from classes.application_result import ApplicationResult
from language import _
from providers._provider import ApplyContext, Provider
logger = logging.getLogger("flat-apply")
@ -13,88 +13,71 @@ class Wbm(Provider):
def domain(self) -> str:
return "wbm.de"
async def apply_for_flat(self, url) -> ApplicationResult:
async with open_page(url) as page:
logger.info("\tSTEP 1: checking if page not found")
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
r = ctx.recorder
p = ctx.profile
async with open_page(url, recorder=r) as page:
r.step("page.404_check")
if await page.get_by_role("heading", name="Page Not Found").is_visible():
logger.debug("\t\t'page not found' message found - returning")
return ApplicationResult(
success=False,
message=_("not_found"))
logger.debug("\t\t'page not found' message not found")
return ApplicationResult(False, message=_("not_found"))
logger.info("\tSTEP 2: accepting cookies")
r.step("cookies.check")
cookie_accept_btn = page.get_by_text("Alle zulassen")
if await cookie_accept_btn.is_visible():
await cookie_accept_btn.click()
logger.debug("\t\tcookie accept button clicked")
r.step("cookies.accepted")
else:
logger.debug("\t\tno cookie accept button found")
r.step("cookies.absent")
logger.info("\tSTEP 3: removing chatbot help icon")
await page.locator('#removeConvaiseChat').click()
r.step("chatbot.remove")
try:
await page.locator('#removeConvaiseChat').click()
except Exception as e:
r.step("chatbot.remove_failed", "warn", str(e))
logger.info("\tSTEP 4: checking if ad is offline")
r.step("ad.offline_check")
if page.url == "https://www.wbm.de/wohnungen-berlin/angebote/":
logger.debug("\t\t'page not found' url found - returning")
return ApplicationResult(
success=False,
message=_("ad_offline"))
logger.debug("\t\t'page not found' url not found")
return ApplicationResult(False, message=_("ad_offline"))
r.step("form.open"); await page.locator('.openimmo-detail__contact-box-button').click()
logger.info("\tSTEP 5: go to the application form")
await page.locator('.openimmo-detail__contact-box-button').click()
logger.info("\tSTEP 6: filling the application form")
if IS_POSSESSING_WBS:
r.step("form.wbs_section")
if p.is_possessing_wbs:
await page.locator('label[for="powermail_field_wbsvorhanden_1"]').click()
await page.locator("input[name*='[wbsgueltigbis]']").fill(WBS_VALID_TILL.strftime("%Y-%m-%d"))
await page.locator("select[name*='[wbszimmeranzahl]']").select_option(str(WBS_ROOMS))
await page.locator("#powermail_field_einkommensgrenzenacheinkommensbescheinigung9").select_option(WBS_TYPE)
if IS_PRIO_WBS:
await page.locator("input[name*='[wbsgueltigbis]']").fill(p.wbs_valid_till_dt().strftime("%Y-%m-%d"))
await page.locator("select[name*='[wbszimmeranzahl]']").select_option(str(p.wbs_rooms))
await page.locator("#powermail_field_einkommensgrenzenacheinkommensbescheinigung9").select_option(p.wbs_type)
if p.is_prio_wbs:
await page.locator("#powermail_field_wbsmitbesonderemwohnbedarf_1").check(force=True)
else:
await page.locator('label[for="powermail_field_wbsvorhanden_2"]').click()
await page.locator("#powermail_field_anrede").select_option(SALUTATION)
await page.locator("#powermail_field_name").fill(LASTNAME)
await page.locator("#powermail_field_vorname").fill(FIRSTNAME)
await page.locator("#powermail_field_strasse").fill(STREET)
await page.locator("#powermail_field_plz").fill(POSTCODE)
await page.locator("#powermail_field_ort").fill(CITY)
await page.locator("#powermail_field_e_mail").fill(EMAIL)
await page.locator("#powermail_field_telefon").fill(TELEPHONE)
r.step("form.personal")
await page.locator("#powermail_field_anrede").select_option(p.salutation)
await page.locator("#powermail_field_name").fill(p.lastname)
await page.locator("#powermail_field_vorname").fill(p.firstname)
await page.locator("#powermail_field_strasse").fill(p.street)
await page.locator("#powermail_field_plz").fill(p.postcode)
await page.locator("#powermail_field_ort").fill(p.city)
await page.locator("#powermail_field_e_mail").fill(p.email)
await page.locator("#powermail_field_telefon").fill(p.telephone)
await page.locator("#powermail_field_datenschutzhinweis_1").check(force=True)
logger.info("\tSTEP 7: submit the form")
if not SUBMIT_FORMS:
logger.debug(f"\t\tdry run - not submitting")
if not ctx.submit_forms:
r.step("submit.dry_run")
return ApplicationResult(success=True, message=_("application_success_dry"))
await page.get_by_role("button", name="Anfrage absenden").click()
logger.info("\tSTEP 8: check the success")
r.step("submit.click"); await page.get_by_role("button", name="Anfrage absenden").click()
r.step("success.check")
if await page.get_by_text("Wir haben Ihre Anfrage für das Wohnungsangebot erhalten.").is_visible():
logger.info(f"\t\tsuccess detected by text")
return ApplicationResult(True)
elif await self.is_missing_fields_warning(page):
logger.warning(f"\t\tmissing fields warning detected")
elif await self._missing_fields_warning(page):
r.step("missing_fields_detected", "warn")
return ApplicationResult(False, _("missing_fields"))
else:
logger.warning(f"\t\tneither missing fields nor success detected")
return ApplicationResult(success=False, message=_("submit_conformation_msg_not_found"))
r.step("success.not_found", "warn")
return ApplicationResult(success=False, message=_("submit_conformation_msg_not_found"))
async def is_missing_fields_warning(self, page):
async def _missing_fields_warning(self, page) -> bool:
missing_field_msg = page.get_by_text("Dieses Feld muss ausgefüllt werden!").first
if await missing_field_msg.first.is_visible():
return True
return False
if __name__ == "__main__":
# url = "https://www.wbm.de/wohnungen-berlin/angebote/details/4-zimmer-wohnung-in-spandau-1/" # not found
url = "https://www.wbm.de/wohnungen-berlin/angebote/details/wbs-160-180-220-perfekt-fuer-kleine-familien-3-zimmer-wohnung-mit-balkon/"
provider = Wbm()
provider.test_apply(url)
return await missing_field_msg.first.is_visible()

View file

@ -1,6 +1,12 @@
"""
apply service config.
Personal info and WBS fields used to live here as env vars; they now come in
with every /apply request as a Profile. The only remaining settings are
browser + service-level knobs.
"""
import logging
import sys
from datetime import datetime as dt
from os import getenv
from dotenv import load_dotenv
@ -8,103 +14,43 @@ logger = logging.getLogger("flat-apply")
load_dotenv()
def get_env_or_fail(key: str, default: str = None, required: bool = True) -> str:
def get_env_or_fail(key: str, default: str | None = None, required: bool = True) -> str:
value = getenv(key, default)
if required and value is None:
logger.error(f"Missing required environment variable: {key}")
sys.exit(1)
return value
def get_bool_env(key: str, default: str = "False", required: bool = True) -> bool:
return get_env_or_fail(key, default, required).lower() in ("true", "1", "yes", "on")
def get_int_env(key: str, default: str = None, required: bool = True) -> int:
value_str = get_env_or_fail(key, default, required)
def get_bool_env(key: str, default: str = "False") -> bool:
return (getenv(key, default) or "").lower() in ("true", "1", "yes", "on")
def get_int_env(key: str, default: str) -> int:
v = getenv(key, default) or default
try:
return int(value_str)
return int(v)
except ValueError:
logger.error(f"Environment variable {key} must be an integer. Got: {value_str}")
logger.error(f"Env var {key} must be int, got {v!r}")
sys.exit(1)
def get_date_env(key: str, fmt: str = "%Y-%m-%d", default: str = None, required: bool = True) -> dt:
value_str = get_env_or_fail(key, default, required)
try:
return dt.strptime(value_str, fmt)
except ValueError:
logger.error(f"Environment variable {key} must be a date in format {fmt}. Got: {value_str}")
sys.exit(1)
# --- General Settings ---
LANGUAGE: str = get_env_or_fail("LANGUAGE", "de", False)
# --- Browser -----------------------------------------------------------------
HEADLESS: bool = get_bool_env("HEADLESS", "True")
BROWSER_WIDTH: int = get_int_env("BROWSER_WIDTH", "600")
BROWSER_HEIGHT: int = get_int_env("BROWSER_HEIGHT", "800")
BROWSER_LOCALE: str = getenv("BROWSER_LOCALE", "de-DE")
POST_SUBMISSION_SLEEP_MS: int = get_int_env("POST_SUBMISSION_SLEEP_MS", "0")
# --- Browser Settings ---
HEADLESS: bool = get_bool_env("HEADLESS", "True", True)
BROWSER_WIDTH: int = get_int_env("BROWSER_WIDTH", "600", False)
BROWSER_HEIGHT: int = get_int_env("BROWSER_HEIGHT", "800", False)
BROWSER_LOCALE: str = get_env_or_fail("BROWSER_LOCALE", "de-DE", False)
POST_SUBMISSION_SLEEP_MS: int = get_int_env("POST_SUBMISSION_SLEEP_MS", "0", False)
# --- Automation Mode ---
SUBMIT_FORMS: bool = get_bool_env("SUBMIT_FORMS", "False")
# --- HTTP Server ---
HTTP_HOST: str = get_env_or_fail("HTTP_HOST", "0.0.0.0", False)
HTTP_PORT: int = get_int_env("HTTP_PORT", "8000", False)
# --- Personal Information ---
SALUTATION: str = get_env_or_fail("SALUTATION")
LASTNAME: str = get_env_or_fail("LASTNAME")
FIRSTNAME: str = get_env_or_fail("FIRSTNAME")
EMAIL: str = get_env_or_fail("EMAIL")
TELEPHONE: str = get_env_or_fail("TELEPHONE")
STREET: str = get_env_or_fail("STREET")
HOUSE_NUMBER: str = get_env_or_fail("HOUSE_NUMBER")
POSTCODE: str = get_env_or_fail("POSTCODE")
CITY: str = get_env_or_fail("CITY")
# --- WBS Information ---
IS_POSSESSING_WBS: bool = get_bool_env("IS_POSSESSING_WBS", "False")
WBS_TYPE: str = get_env_or_fail("WBS_TYPE", "0", False)
WBS_VALID_TILL: dt = get_date_env("WBS_VALID_TILL", default="1970-01-01", required=False)
WBS_ROOMS: int = get_int_env("WBS_ROOMS", "0", False)
ADULT_COUNT: int = get_int_env("WBS_ADULTS", "0", False)
CHILDREN_COUNT: int = get_int_env("WBS_CHILDREN", "0", False)
PERSON_COUNT: int = ADULT_COUNT + CHILDREN_COUNT
IS_PRIO_WBS: bool = get_bool_env("IS_PRIO_WBS", "False")
# --- Secrets ---
IMMOMIO_EMAIL: str = get_env_or_fail("IMMOMIO_EMAIL", required=False)
IMMOMIO_PASSWORD: str = get_env_or_fail("IMMOMIO_PASSWORD", required=False)
# --- Service -----------------------------------------------------------------
INTERNAL_API_KEY: str = get_env_or_fail("INTERNAL_API_KEY")
def log_settings():
def log_settings() -> None:
logger.debug("--- Settings ---")
logger.debug(f"LANGUAGE: {LANGUAGE}")
logger.debug(f"HEADLESS: {HEADLESS}")
logger.debug(f"BROWSER_WIDTH: {BROWSER_WIDTH}")
logger.debug(f"BROWSER_HEIGHT: {BROWSER_HEIGHT}")
logger.debug(f"BROWSER_LOCALE: {BROWSER_LOCALE}")
logger.debug(f"POST_SUBMISSION_SLEEP_MS: {POST_SUBMISSION_SLEEP_MS}")
logger.debug(f"HEADLESS: {HEADLESS}")
logger.debug(f"SUBMIT_FORMS: {SUBMIT_FORMS}")
logger.debug(f"HTTP_HOST: {HTTP_HOST}")
logger.debug(f"HTTP_PORT: {HTTP_PORT}")
logger.debug(f"SALUTATION: {SALUTATION}")
logger.debug(f"LASTNAME: {LASTNAME}")
logger.debug(f"FIRSTNAME: {FIRSTNAME}")
logger.debug(f"EMAIL: {EMAIL}")
logger.debug(f"TELEPHONE: {TELEPHONE}")
logger.debug(f"STREET: {STREET}")
logger.debug(f"HOUSE_NUMBER: {HOUSE_NUMBER}")
logger.debug(f"POSTCODE: {POSTCODE}")
logger.debug(f"CITY: {CITY}")
logger.debug(f"IS_POSSESSING_WBS: {IS_POSSESSING_WBS}")
logger.debug(f"WBS_TYPE: {WBS_TYPE}")
logger.debug(f"WBS_VALID_TILL: {WBS_VALID_TILL}")
logger.debug(f"WBS_ROOMS: {WBS_ROOMS}")
logger.debug(f"WBS_ADULTS: {ADULT_COUNT}")
logger.debug(f"WBS_CHILDREN: {CHILDREN_COUNT}")
logger.debug(f"PERSON_COUNT: {PERSON_COUNT}")
logger.debug(f"IS_PRIO_WBS: {IS_PRIO_WBS}")
logger.debug(f"IMMOMIO_EMAIL: {IMMOMIO_EMAIL}")
masked_password = "***" if IMMOMIO_PASSWORD else "None"
logger.debug(f"IMMOMIO_PASSWORD: {masked_password}")