enrichment: drop LLM for structured info, dedup images by sha + phash
Per user request, the LLM is no longer asked to extract rooms/size/rent/WBS — those come from the inberlinwohnen.de scraper which is reliable. Haiku is now used for one narrow job: pick which <img> URLs from the listing page are actual flat photos (vs. logos, badges, ads, employee portraits). On any LLM failure the unfiltered candidate list passes through. Image dedup runs in two tiers: 1. SHA256 of bytes — drops different URLs that point to byte-identical files 2. Perceptual hash (Pillow + imagehash, Hamming distance ≤ 5) — drops the "same image at a different resolution" duplicates from srcset / CDN variants that were filling galleries with 2–4× copies UI: - Wohnungsliste falls back to scraper-only display (rooms/size/rent/wbs) - Detail panel only shows images + "Zur Original-Anzeige →"; description / features / pros & cons / kv table are gone - Per-row "erneut versuchen" link + the "analysiert…/?" status chips were tied to LLM extraction and are removed; the header "Bilder nachladen (N)" button still surfaces pending/failed batches for admins Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
374368e4af
commit
0aa4c6c2bb
6 changed files with 137 additions and 233 deletions
|
|
@ -1,19 +1,20 @@
|
|||
"""Flat-enrichment pipeline.
|
||||
"""Image enrichment pipeline.
|
||||
|
||||
For each new flat we:
|
||||
1. Ask the apply service to fetch the listing via Playwright (bypasses bot guards)
|
||||
2. Feed the HTML to Haiku via `llm.extract_flat_details` → structured dict
|
||||
3. Download each image URL directly into /data/flats/<slug>/NN.<ext>
|
||||
4. Persist result on the flat row (enrichment_json + image_count + status)
|
||||
2. Optionally let Haiku narrow the <img> URL list down to actual flat photos
|
||||
3. Download the images into /data/flats/<slug>/NN.<ext>, deduplicating by
|
||||
exact bytes (SHA256) and visual similarity (perceptual hash)
|
||||
|
||||
Kicked as a detached asyncio task from /internal/flats so scraping stays fast.
|
||||
Every failure is caught, stashed in enrichment_json as {"_error": "...", ...}
|
||||
and mirrored into the errors log so /logs/protokoll explains what went wrong.
|
||||
Structured fields (rooms / size / rent / WBS) come from the inberlinwohnen.de
|
||||
scraper and are not re-derived by the LLM. Failures land in
|
||||
enrichment_status='failed' with the reason in enrichment_json._error.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
|
|
@ -33,12 +34,12 @@ IMAGES_DIR = DATA_DIR / "flats"
|
|||
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
MAX_IMAGES = 12
|
||||
MAX_IMAGE_BYTES = 3_000_000 # 3 MB per image
|
||||
MAX_IMAGE_BYTES = 5_000_000
|
||||
IMAGE_TIMEOUT = 15
|
||||
PHASH_DUPLICATE_THRESHOLD = 5 # Hamming distance ≤ N → considered the same picture
|
||||
|
||||
|
||||
class EnrichmentError(Exception):
|
||||
"""Raised by each pipeline step with a human-readable reason."""
|
||||
def __init__(self, step: str, reason: str):
|
||||
self.step = step
|
||||
self.reason = reason
|
||||
|
|
@ -46,7 +47,6 @@ class EnrichmentError(Exception):
|
|||
|
||||
|
||||
def flat_slug(flat_id: str) -> str:
|
||||
"""Filesystem-safe short identifier for a flat (IDs are URLs)."""
|
||||
return hashlib.sha1(flat_id.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
||||
|
|
@ -77,25 +77,50 @@ def _fetch_listing(url: str) -> dict:
|
|||
raise EnrichmentError("fetch", "apply returned non-JSON response")
|
||||
|
||||
|
||||
def _ext_from_response(resp: requests.Response, url: str) -> str:
|
||||
ct = resp.headers.get("content-type", "").split(";")[0].strip().lower()
|
||||
def _ext_for(content_type: str, url: str) -> str:
|
||||
ct = content_type.split(";")[0].strip().lower()
|
||||
if ct:
|
||||
ext = mimetypes.guess_extension(ct) or ""
|
||||
if ext:
|
||||
return ext.replace(".jpe", ".jpg")
|
||||
path = urlparse(url).path
|
||||
_, ext = os.path.splitext(path)
|
||||
_, ext = os.path.splitext(urlparse(url).path)
|
||||
return ext.lower() or ".jpg"
|
||||
|
||||
|
||||
def _phash(data: bytes):
|
||||
"""Return an imagehash.ImageHash for the bytes, or None if unsupported."""
|
||||
try:
|
||||
from PIL import Image
|
||||
import imagehash
|
||||
except ImportError:
|
||||
return None
|
||||
try:
|
||||
with Image.open(io.BytesIO(data)) as img:
|
||||
img.load()
|
||||
return imagehash.phash(img)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _download_images(flat_id: str, urls: list[str], referer: str) -> int:
|
||||
"""Download up to MAX_IMAGES distinct flat photos. Dedup tiers:
|
||||
1. Skip URLs that fail / aren't images
|
||||
2. Skip exact byte-equal duplicates (different URL pointing to same file)
|
||||
3. Skip visually-equivalent images (same picture re-encoded at a
|
||||
different size — common with srcset/CDN variants)
|
||||
"""
|
||||
d = flat_image_dir(flat_id)
|
||||
for old in d.iterdir():
|
||||
try: old.unlink()
|
||||
except OSError: pass
|
||||
|
||||
seen_sha: set[str] = set()
|
||||
seen_phash: list = [] # list of imagehash objects
|
||||
|
||||
saved = 0
|
||||
for raw_url in urls[:MAX_IMAGES]:
|
||||
for raw_url in urls:
|
||||
if saved >= MAX_IMAGES:
|
||||
break
|
||||
try:
|
||||
r = requests.get(
|
||||
raw_url,
|
||||
|
|
@ -109,48 +134,53 @@ def _download_images(flat_id: str, urls: list[str], referer: str) -> int:
|
|||
ct = r.headers.get("content-type", "").split(";")[0].strip().lower()
|
||||
if not ct.startswith("image/"):
|
||||
continue
|
||||
ext = _ext_from_response(r, raw_url)
|
||||
path = d / f"{saved + 1:02d}{ext}"
|
||||
total = 0
|
||||
with open(path, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=65_536):
|
||||
if not chunk:
|
||||
continue
|
||||
total += len(chunk)
|
||||
if total > MAX_IMAGE_BYTES:
|
||||
break
|
||||
f.write(chunk)
|
||||
if total == 0:
|
||||
path.unlink(missing_ok=True)
|
||||
data = bytearray()
|
||||
for chunk in r.iter_content(chunk_size=65_536):
|
||||
if not chunk:
|
||||
continue
|
||||
data.extend(chunk)
|
||||
if len(data) > MAX_IMAGE_BYTES:
|
||||
break
|
||||
if not data:
|
||||
continue
|
||||
saved += 1
|
||||
except requests.RequestException as e:
|
||||
logger.info("image download failed %s: %s", raw_url, e)
|
||||
continue
|
||||
|
||||
data_bytes = bytes(data)
|
||||
|
||||
sha = hashlib.sha256(data_bytes).hexdigest()
|
||||
if sha in seen_sha:
|
||||
continue
|
||||
seen_sha.add(sha)
|
||||
|
||||
ph = _phash(data_bytes)
|
||||
if ph is not None:
|
||||
if any((ph - prev) <= PHASH_DUPLICATE_THRESHOLD for prev in seen_phash):
|
||||
continue
|
||||
seen_phash.append(ph)
|
||||
|
||||
ext = _ext_for(ct, raw_url)
|
||||
path = d / f"{saved + 1:02d}{ext}"
|
||||
path.write_bytes(data_bytes)
|
||||
saved += 1
|
||||
|
||||
return saved
|
||||
|
||||
|
||||
def enrich_flat_sync(flat_id: str) -> None:
|
||||
"""Run the full enrichment pipeline for one flat. Blocking."""
|
||||
flat = db.get_flat(flat_id)
|
||||
if not flat:
|
||||
return
|
||||
url = flat["link"]
|
||||
logger.info("enrich start flat=%s url=%s", flat_id, url)
|
||||
logger.info("enrich start flat=%s", flat_id)
|
||||
|
||||
try:
|
||||
listing = _fetch_listing(url)
|
||||
html = listing.get("html") or ""
|
||||
final_url = listing.get("final_url") or url
|
||||
if not html.strip():
|
||||
raise EnrichmentError("fetch", "apply returned empty HTML")
|
||||
|
||||
details = llm.extract_flat_details(html, final_url)
|
||||
if details is None:
|
||||
raise EnrichmentError("llm", "model returned no tool_use or call failed (see web logs)")
|
||||
|
||||
image_urls = listing.get("image_urls") or []
|
||||
image_count = _download_images(flat_id, image_urls, referer=url)
|
||||
candidates = listing.get("image_urls") or []
|
||||
if candidates:
|
||||
candidates = llm.select_flat_image_urls(candidates, listing.get("final_url") or url)
|
||||
image_count = _download_images(flat_id, candidates, referer=url)
|
||||
except EnrichmentError as e:
|
||||
_record_failure(flat_id, e.step, e.reason)
|
||||
return
|
||||
|
|
@ -159,7 +189,7 @@ def enrich_flat_sync(flat_id: str) -> None:
|
|||
_record_failure(flat_id, "crash", f"{type(e).__name__}: {e}")
|
||||
return
|
||||
|
||||
db.set_flat_enrichment(flat_id, "ok", enrichment=details, image_count=image_count)
|
||||
db.set_flat_enrichment(flat_id, "ok", enrichment=None, image_count=image_count)
|
||||
logger.info("enrich done flat=%s images=%d", flat_id, image_count)
|
||||
|
||||
|
||||
|
|
@ -179,7 +209,6 @@ def _record_failure(flat_id: str, step: str, reason: str) -> None:
|
|||
|
||||
|
||||
def kick(flat_id: str) -> None:
|
||||
"""Fire-and-forget enrichment in a background thread."""
|
||||
asyncio.create_task(asyncio.to_thread(enrich_flat_sync, flat_id))
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue