* apply: Recorder.step_snap(page, name) captures both a JPEG screenshot and the page HTML for every major moment; every provider now calls step_snap at each logical step so failure reports contain the exact DOM and rendered state at every stage of the flow * ZIP report: each snapshot becomes snapshots/NN_<label>.jpg + snapshots/NN_<label>.html for AI-assisted debugging * web: Wohnungsliste zeigt nur noch Flats, die die eigenen Filter treffen; Match-Chip entfernt (Liste ist jetzt implizit matchend) * UI komplett auf Deutsch: Protokoll statt Logs, Administrator statt admin, Trockenmodus statt dry-run, Automatik pausiert statt circuit open, Alarm statt Alert, Abmelden statt Logout * Wohnungen-Header: Zeile 1 Info (Alarm + Filter), Zeile 2 Schalter mit echten Radio-Paaren (An/Aus) für Automatisch bewerben und Trockenmodus; hx-confirm auf den kritischen Radios; per-form CSS für sichtbaren Check-State * Protokoll: von/bis-Datumsfilter (Berliner Zeit) + CSV-Download (/logs/export.csv) mit UTC + lokaler Zeit Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
135 lines
6.5 KiB
Python
135 lines
6.5 KiB
Python
import logging
|
|
|
|
from actions import create_dummy_pdf, open_page
|
|
from classes.application_result import ApplicationResult
|
|
from language import _
|
|
from providers._provider import ApplyContext, Provider
|
|
|
|
logger = logging.getLogger("flat-apply")
|
|
|
|
|
|
class Gewobag(Provider):
|
|
@property
|
|
def domain(self) -> str:
|
|
return "gewobag.de"
|
|
|
|
async def apply_for_flat(self, url: str, ctx: ApplyContext) -> ApplicationResult:
|
|
r = ctx.recorder
|
|
p = ctx.profile
|
|
async with open_page(url, recorder=r) as page:
|
|
await r.step_snap(page, "cookies.check")
|
|
btn = page.get_by_text("Alle Cookies akzeptieren")
|
|
if await btn.is_visible():
|
|
await btn.click()
|
|
await r.step_snap(page, "cookies.accepted")
|
|
|
|
await r.step_snap(page, "ad.exists_check")
|
|
if await page.get_by_text("Mietangebot nicht gefunden").first.is_visible():
|
|
await r.step_snap(page, "abort.not_found", status="warn")
|
|
return ApplicationResult(False, message=_("not_found"))
|
|
|
|
await r.step_snap(page, "ad.still_open_check")
|
|
if await page.locator('#immo-mediation-notice').is_visible():
|
|
await r.step_snap(page, "abort.deactivated", status="warn")
|
|
return ApplicationResult(False, message=_("ad_deactivated"))
|
|
|
|
await r.step_snap(page, "form.open")
|
|
await page.get_by_role("button", name="Anfrage senden").first.click()
|
|
|
|
await r.step_snap(page, "form.senior_check")
|
|
if await self._is_senior_flat(page):
|
|
await r.step_snap(page, "abort.senior_flat", status="warn")
|
|
return ApplicationResult(False, _("senior_flat"))
|
|
|
|
await r.step_snap(page, "form.special_wbs_check")
|
|
if await self._is_special_needs_wbs(page):
|
|
await r.step_snap(page, "abort.special_wbs", status="warn")
|
|
return ApplicationResult(False, _("special_need_wbs_flat"))
|
|
|
|
await r.step_snap(page, "iframe.locate")
|
|
form_iframe = page.frame_locator("#contact-iframe")
|
|
|
|
async def fill_field(locator, filling):
|
|
field = form_iframe.locator(locator)
|
|
if await field.is_visible():
|
|
await field.fill(filling)
|
|
await page.wait_for_timeout(100)
|
|
|
|
async def select_field(locator, selection):
|
|
field = form_iframe.locator(locator)
|
|
if await field.is_visible():
|
|
await field.click()
|
|
await page.wait_for_timeout(100)
|
|
await form_iframe.get_by_role("option", name=selection, exact=True).click()
|
|
await page.wait_for_timeout(100)
|
|
|
|
async def check_checkbox(locator):
|
|
field = form_iframe.locator(locator)
|
|
if await field.first.is_visible():
|
|
await field.evaluate_all("elements => elements.forEach(el => el.click())")
|
|
await page.wait_for_timeout(100)
|
|
|
|
async def upload_files(locator, files=None):
|
|
if not files:
|
|
create_dummy_pdf()
|
|
files = ["DummyPDF.pdf"]
|
|
sec = form_iframe.locator(locator)
|
|
if await sec.count() > 0:
|
|
await sec.locator("input[type='file']").set_input_files(files)
|
|
await page.wait_for_timeout(2000)
|
|
|
|
await r.step_snap(page, "form.fill_personal")
|
|
await select_field("#salutation-dropdown", p.salutation)
|
|
await fill_field("#firstName", p.firstname)
|
|
await fill_field("#lastName", p.lastname)
|
|
await fill_field("#email", p.email)
|
|
await fill_field("#phone-number", p.telephone)
|
|
await fill_field("#street", p.street)
|
|
await fill_field("#house-number", p.house_number)
|
|
await fill_field("#zip-code", p.postcode)
|
|
await fill_field("#city", p.city)
|
|
await fill_field("input[id*='anzahl_erwachsene']", str(p.adult_count))
|
|
await fill_field("input[id*='anzahl_kinder']", str(p.children_count))
|
|
await fill_field("input[id*='gesamtzahl_der_einziehenden_personen']", str(p.person_count))
|
|
|
|
await r.step_snap(page, "form.wbs_fill")
|
|
await check_checkbox("[data-cy*='wbs_available'][data-cy*='-Ja']")
|
|
await fill_field("input[id*='wbs_valid_until']", p.wbs_valid_till_dt().strftime("%d.%m.%Y"))
|
|
await select_field("input[id*='wbs_max_number_rooms']", f"{p.wbs_rooms} Räume")
|
|
await select_field("input[id*='art_bezeichnung_des_wbs']", f"WBS {p.wbs_type}")
|
|
await select_field("input[id*='fuer_wen']", "Für mich selbst")
|
|
await fill_field("input[id*='telephone_number']", p.telephone)
|
|
await check_checkbox("input[id*='datenschutzhinweis']")
|
|
await upload_files("el-application-form-document-upload", files=None)
|
|
await r.step_snap(page, "form.filled")
|
|
|
|
if not ctx.submit_forms:
|
|
await r.step_snap(page, "submit.dry_run")
|
|
return ApplicationResult(True, _("application_success_dry"))
|
|
|
|
await r.step_snap(page, "submit.click")
|
|
await form_iframe.get_by_role("button", name="Anfrage versenden").click()
|
|
await page.wait_for_timeout(5000)
|
|
|
|
await r.step_snap(page, "success.check", detail=page.url)
|
|
if page.url.startswith("https://www.gewobag.de/daten-uebermittelt/"):
|
|
return ApplicationResult(True)
|
|
elif await self._is_missing_fields_warning(page):
|
|
await r.step_snap(page, "missing_fields_detected", status="warn")
|
|
return ApplicationResult(False, _("missing_fields"))
|
|
await r.step_snap(page, "success.not_found", status="warn")
|
|
return ApplicationResult(False, _("submit_conformation_msg_not_found"))
|
|
|
|
async def _is_senior_flat(self, page):
|
|
f = page.frame_locator("#contact-iframe")
|
|
return await f.locator("label[for*='mindestalter_seniorenwohnhaus_erreicht']").first.is_visible()
|
|
|
|
async def _is_special_needs_wbs(self, page):
|
|
f = page.frame_locator("#contact-iframe")
|
|
return await f.locator("label[for*='wbs_mit_besonderem_wohnbedarf_vorhanden']").first.is_visible()
|
|
|
|
async def _is_missing_fields_warning(self, page):
|
|
f = page.frame_locator("#contact-iframe")
|
|
msg = f.locator("span.ant-alert-message",
|
|
has_text="Es wurden nicht alle Felder korrekt befüllt.")
|
|
return await msg.first.is_visible()
|