laendleimmo-scraper/scraper.py

461 lines
15 KiB
Python

#!/usr/bin/env python3
"""laendleimmo.at scraper — finds listings and auto-submits the contact form."""
import argparse
import json
import sys
import time
import urllib.parse
from pathlib import Path
from typing import Optional
from playwright.sync_api import (
Browser,
Page,
sync_playwright,
TimeoutError as PWTimeout,
)
# Root URL of the site; search and login URLs are built from it, and
# root-relative hrefs found while scraping are prefixed with it.
BASE_URL = "https://www.laendleimmo.at"
# JSON file next to this script that load_config() reads the settings from.
CONFIG_FILE = Path(__file__).parent / "contact_config.json"
# JSON file next to this script that stores the URLs of listings already contacted.
CONTACTED_FILE = Path(__file__).parent / "contacted.json"
# --------------------------------------------------------------------------- #
# Config / state helpers
# --------------------------------------------------------------------------- #
def load_config() -> dict:
    """Return the parsed JSON content of ``CONFIG_FILE``.

    Terminates the process through ``sys.exit`` (with a hint to run
    ``configure.py``) when the file does not exist.
    """
    if CONFIG_FILE.exists():
        raw_text = CONFIG_FILE.read_text(encoding="utf-8")
        return json.loads(raw_text)

    sys.exit(
        f"[!] Config not found at {CONFIG_FILE}\n"
        " Run: python configure.py"
    )
def load_contacted() -> set:
    """Load the URLs of listings that were already contacted.

    Returns:
        The set of URLs stored in ``CONTACTED_FILE``; an empty set when the
        file does not exist yet.

    Exits the process (``sys.exit``) with an explanatory message when the
    file exists but is not a JSON list of strings.
    """
    if not CONTACTED_FILE.exists():
        return set()

    with open(CONTACTED_FILE, encoding="utf-8") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError as exc:
            # Do NOT fall back to an empty set here: that would make every
            # previously contacted listing look new and contact it again.
            sys.exit(
                f"[!] {CONTACTED_FILE} is not valid JSON: {exc}\n"
                " Fix or delete the file, or re-run with --reset"
            )

    # set() over a dict or a string would silently yield keys / characters.
    if not isinstance(data, list) or not all(isinstance(u, str) for u in data):
        sys.exit(
            f"[!] {CONTACTED_FILE} must contain a JSON list of URL strings\n"
            " Fix or delete the file, or re-run with --reset"
        )
    return set(data)
def save_contacted(contacted: set) -> None:
    """Persist the contacted-listing URLs to ``CONTACTED_FILE`` as sorted JSON.

    The data is written to a sibling ``*.tmp`` file first and then renamed
    over the target, so an interruption in the middle of a write cannot
    leave a truncated history file behind.

    Args:
        contacted: URLs of all listings contacted so far.
    """
    tmp_file = CONTACTED_FILE.with_name(CONTACTED_FILE.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(sorted(contacted), f, indent=2)
    # Path.replace overwrites an existing target in a single rename step.
    tmp_file.replace(CONTACTED_FILE)
# --------------------------------------------------------------------------- #
# URL construction
# --------------------------------------------------------------------------- #
def build_search_urls(args) -> list[str]:
    """Translate the parsed CLI options into search-result URLs.

    One URL is produced per (property type, marketing type) combination.
    Without ``--flat``/``--house`` both property types are searched; without
    ``--rent``/``--buy`` no marketing-type filter is added to the query.
    """
    wanted_props = [
        slug
        for enabled, slug in ((args.flat, "wohnung"), (args.house, "haus"))
        if enabled
    ] or ["wohnung", "haus"]

    # A single None entry stands for "no marketing-type filter".
    wanted_marketing: list[Optional[str]] = [
        label
        for enabled, label in ((args.rent, "Mietobjekt"), (args.buy, "Kaufobjekt"))
        if enabled
    ] or [None]

    # Filters that are identical for every generated URL.
    shared_filters: list[tuple[str, str]] = []
    if args.minrooms is not None:
        shared_filters.append(("f[noOfRooms@f]", str(float(args.minrooms))))
    if args.maxprice is not None:
        shared_filters.append(("f[price@t]", str(args.maxprice)))

    result: list[str] = []
    for slug in wanted_props:
        for label in wanted_marketing:
            query_pairs = list(shared_filters)
            if label:
                query_pairs.append(("f[marketingType]", label))

            target = f"{BASE_URL}/{slug}/vorarlberg"
            if query_pairs:
                encoded = urllib.parse.urlencode(
                    query_pairs, quote_via=urllib.parse.quote
                )
                target = f"{target}?{encoded}"
            result.append(target)
    return result
# --------------------------------------------------------------------------- #
# Scraping
# --------------------------------------------------------------------------- #
def _absolute_url(href: str, page_url: str) -> str:
    """Resolve *href* (as found on *page_url*) to an absolute URL.

    Root-relative paths stay anchored at ``BASE_URL`` so results keep matching
    URLs recorded by earlier runs; every other form (``?page=2``, ``page/2``,
    ``//host/path``, already-absolute URLs) is resolved against the page the
    link was found on.
    """
    if href.startswith("/") and not href.startswith("//"):
        return BASE_URL + href
    return urllib.parse.urljoin(page_url, href)


def scrape_listings(page: Page, url: str) -> tuple[list[dict], Optional[str]]:
    """Load one search-result page and extract its listing links.

    Args:
        page: Playwright page used for navigation.
        url: Search-result URL to load.

    Returns:
        ``(listings, next_url)``. ``listings`` holds one
        ``{"url": ..., "snippet": ...}`` dict per distinct link whose href
        contains ``/immobilien/``. ``next_url`` is the absolute URL of the
        next result page, or ``None`` when no usable pagination link was
        found or the page could not be loaded at all.
    """
    try:
        page.goto(url, wait_until="networkidle", timeout=30_000)
    except PWTimeout:
        # Retry with the weaker load condition before giving up on the page.
        try:
            page.goto(url, wait_until="domcontentloaded", timeout=30_000)
        except PWTimeout:
            print(f" [!] Page load timed out: {url}")
            return [], None

    # Base for resolving relative hrefs: the URL actually loaded.
    current_url = page.url or url

    listings: list[dict] = []
    seen: set[str] = set()
    for a in page.query_selector_all("a[href*='/immobilien/']"):
        href: Optional[str] = a.get_attribute("href")
        if not href:
            continue
        full_url = _absolute_url(href, current_url)
        # De-duplicate on the resolved URL so a relative and an absolute
        # link to the same listing count once.
        if full_url in seen:
            continue
        seen.add(full_url)
        snippet = (a.inner_text() or "").strip().replace("\n", " ")[:120]
        listings.append({"url": full_url, "snippet": snippet})

    # Next-page link — try common patterns
    next_url: Optional[str] = None
    for sel in [
        "a[aria-label='Nächste Seite']",
        "a[aria-label='Next']",
        "a[rel='next']",
        ".pagination a:last-child",
        "a.next",
    ]:
        el = page.query_selector(sel)
        if not el:
            continue
        href = el.get_attribute("href")
        if not href or href == "#" or href.lower().startswith("javascript:"):
            continue
        candidate = _absolute_url(href, current_url)
        # A "next" link that points at the page just scraped would make the
        # caller fetch the same page again; try the remaining patterns.
        if candidate in (url, current_url):
            continue
        next_url = candidate
        break
    return listings, next_url
def collect_all_listings(
    page: Page,
    search_urls: list[str],
    max_listings: int,
    already_contacted: set,
) -> list[dict]:
    """Walk the result pages of every search URL and gather new listings.

    Args:
        page: Playwright page used for navigation.
        search_urls: Start URLs; pagination is followed from each of them.
        max_listings: Upper bound on the number of listings returned.
        already_contacted: Listing URLs to skip.

    Returns:
        At most ``max_listings`` listing dicts (as produced by
        ``scrape_listings``), each URL appearing once.
    """
    all_new: list[dict] = []
    # URLs already collected in this run: the same listing can show up on
    # several pages or in several searches and must not be contacted twice.
    queued: set[str] = set()

    for search_url in search_urls:
        url: Optional[str] = search_url
        page_num = 1
        # Result pages already fetched for this search; stops the loop when a
        # "next" link leads back to an earlier page.
        visited_pages: set[str] = set()

        while url and url not in visited_pages and len(all_new) < max_listings:
            visited_pages.add(url)
            print(f" [scrape] page {page_num}: {url}")
            listings, next_url = scrape_listings(page, url)

            new: list[dict] = []
            for listing in listings:
                listing_url = listing["url"]
                if listing_url in already_contacted or listing_url in queued:
                    continue
                queued.add(listing_url)
                new.append(listing)

            all_new.extend(new)
            print(f" {len(listings)} found, {len(new)} new (total new: {len(all_new)})")
            if len(all_new) >= max_listings:
                break
            url = next_url
            page_num += 1
            # Pause between page loads.
            time.sleep(1.5)

    return all_new[:max_listings]
# --------------------------------------------------------------------------- #
# Contact form submission
# --------------------------------------------------------------------------- #
def _try_fill(page: Page, selectors: list[str], value: str) -> bool:
    """Fill the first visible element matched by one of *selectors*.

    Selectors are probed in order. Any exception raised while probing or
    filling is swallowed and the next selector is tried.

    Returns:
        True as soon as one element was filled with *value*, False when no
        selector produced a visible, fillable element.
    """
    for candidate in selectors:
        try:
            field = page.query_selector(candidate)
            if not field or not field.is_visible():
                continue
            field.fill(value)
        except Exception:
            continue
        return True
    return False
def _try_check(page: Page, selectors: list[str]) -> None:
    """Tick the first visible, unchecked checkbox matched by *selectors*.

    Selectors are probed in order and the function stops after the first
    checkbox it managed to tick. An exception while handling one selector is
    swallowed and the next selector is tried.
    """
    for candidate in selectors:
        try:
            boxes = page.query_selector_all(candidate)
            for box in boxes:
                if not box.is_visible() or box.is_checked():
                    continue
                box.check()
                return
        except Exception:
            continue
def login(page: Page, config: dict) -> bool:
    """Fill in and submit the site's login form when credentials are configured.

    Args:
        page: Playwright page used for navigation.
        config: Settings dict; ``login_email`` and ``login_password`` are read.

    Returns:
        True once both credential fields were filled and a submit button was
        clicked. False when credentials are missing, the login page could not
        be loaded, a credential field was not found, or no submit button was
        found. The response to the submission is not inspected, so True does
        not prove that the credentials were accepted.
    """
    email = config.get("login_email", "")
    password = config.get("login_password", "")
    if not email or not password:
        return False

    login_url = f"{BASE_URL}/user/login"
    try:
        page.goto(login_url, wait_until="networkidle", timeout=30_000)
    except PWTimeout:
        # Same fallback as for listing pages: retry with the weaker load
        # condition instead of letting the timeout abort the whole run.
        try:
            page.goto(login_url, wait_until="domcontentloaded", timeout=30_000)
        except PWTimeout:
            return False

    email_filled = _try_fill(
        page,
        ["input[type='email']", "input[name='email']", "input[name='username']"],
        email,
    )
    password_filled = _try_fill(
        page,
        ["input[type='password']", "input[name='password']"],
        password,
    )
    # Submitting a form whose credential fields were never found cannot log
    # in; report failure instead of clicking and claiming success.
    if not (email_filled and password_filled):
        return False

    for sel in [
        "button[type='submit']",
        "input[type='submit']",
        "button:has-text('Anmelden')",
        "button:has-text('Login')",
    ]:
        btn = page.query_selector(sel)
        if btn and btn.is_visible():
            btn.click()
            try:
                page.wait_for_load_state("networkidle", timeout=10_000)
            except PWTimeout:
                # Best effort: continue even if the page never becomes idle.
                pass
            return True
    return False
def _fill_message(page: Page, message: str) -> bool:
    """Type *message* into the contact form's textarea; return whether one was found."""
    if _try_fill(
        page,
        [
            "textarea[name='message']",
            "textarea[name='nachricht']",
            "textarea[name='body']",
            "textarea[id*='message']",
            "textarea[id*='nachricht']",
        ],
        message,
    ):
        return True

    # Fallback: the first visible textarea, whatever it is called.
    for ta in page.query_selector_all("textarea"):
        try:
            if ta.is_visible():
                ta.fill(message)
                return True
        except Exception as exc:
            # Same best-effort policy as _try_fill, but leave a trace.
            print(f" [!] Could not fill a textarea: {exc}")
    return False


def _tick_required_checkboxes(page: Page) -> None:
    """Tick every visible, still unchecked checkbox marked ``required``."""
    for cb in page.query_selector_all("input[type='checkbox'][required]"):
        try:
            if cb.is_visible() and not cb.is_checked():
                cb.check()
        except Exception as exc:
            # One stubborn checkbox must not abort the whole run; the
            # submission result check decides whether the form went through.
            print(f" [!] Could not tick a required checkbox: {exc}")


def _submission_succeeded(page: Page) -> bool:
    """Guess from the page shown after submitting whether the form was accepted.

    Success and error keywords are matched against the text of ``<body>``
    rather than the raw HTML, because words such as "error" or "robot"
    (``<meta name="robots">``) routinely occur in markup and scripts of
    ordinary pages. The CAPTCHA marker is still searched in the raw HTML.
    """
    html = page.content().lower()
    body_text = page.evaluate("document.body ? document.body.innerText : ''")
    visible = (body_text or "").lower()

    if any(
        kw in visible
        for kw in ["erfolgreich", "gesendet", "danke", "thank you", "wurde verschickt"]
    ):
        return True
    # "captcha" also covers "recaptcha".
    if "captcha" in html or "robot" in visible:
        print(" [!] CAPTCHA detected — manual action required")
        return False
    # Assume success if no obvious error message is displayed
    return "error" not in visible and "fehler" not in visible


def submit_contact_form(page: Page, listing_url: str, config: dict) -> bool:
    """Open a listing and submit its 'Anbieter kontaktieren' form.

    Args:
        page: Playwright page used for navigation.
        listing_url: URL of the listing to contact.
        config: Settings dict; ``name``, ``email``, ``phone`` and ``message``
            are read (each defaulting to an empty string).

    Returns:
        True when the form was submitted and the resulting page looks like a
        success (heuristic keyword match); False when the page did not load,
        no message field or submit button was found, a CAPTCHA was detected
        or an error message is displayed.
    """
    try:
        page.goto(listing_url, wait_until="networkidle", timeout=30_000)
    except PWTimeout:
        try:
            page.goto(listing_url, wait_until="domcontentloaded", timeout=30_000)
        except PWTimeout:
            print(" [!] Page load timed out")
            return False

    # Scroll to bottom to trigger lazy-loaded form
    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    time.sleep(1.5)

    message = config.get("message", "")

    # Personal data fields — may or may not be present (absent when logged in)
    _try_fill(
        page,
        [
            "input[name='name']",
            "input[name='fullName']",
            "input[name='contactName']",
            "input[placeholder*='Name']",
            "input[id*='name']:not([type='hidden'])",
        ],
        config.get("name", ""),
    )
    _try_fill(
        page,
        [
            "input[type='email']",
            "input[name='email']",
            "input[name='contactEmail']",
            "input[placeholder*='Mail']",
        ],
        config.get("email", ""),
    )
    _try_fill(
        page,
        [
            "input[name='phone']",
            "input[name='telefon']",
            "input[name='tel']",
            "input[type='tel']",
            "input[placeholder*='Telefon']",
            "input[placeholder*='Phone']",
        ],
        config.get("phone", ""),
    )

    # Message textarea
    if not _fill_message(page, message):
        print(" [!] No message field found — form structure may have changed")
        return False

    # Required consent checkbox
    _try_check(
        page,
        [
            "input[type='checkbox'][name*='zustimm']",
            "input[type='checkbox'][name*='consent']",
            "input[type='checkbox'][name*='datenschutz']",
            "input[type='checkbox'][name*='agree']",
            "input[type='checkbox'][name*='accept']",
        ],
    )
    # All remaining visible required checkboxes
    _tick_required_checkboxes(page)

    # Submit
    for sel in [
        "button:has-text('Anfrage senden')",
        "button:has-text('Anfrage absenden')",
        "button:has-text('Senden')",
        "button[type='submit']",
        "input[type='submit']",
    ]:
        btn = page.query_selector(sel)
        if btn and btn.is_visible():
            btn.click()
            try:
                page.wait_for_load_state("networkidle", timeout=15_000)
            except PWTimeout:
                # Best effort: inspect whatever is on the page by now.
                pass
            return _submission_succeeded(page)

    print(" [!] Submit button not found")
    return False
# --------------------------------------------------------------------------- #
# Main
# --------------------------------------------------------------------------- #
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all search and run options."""
    parser = argparse.ArgumentParser(
        description="Scrape laendleimmo.at and auto-contact matching listings.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --flat --rent --minrooms 2 --maxprice 1200
%(prog)s --house --buy --minrooms 4 --maxprice 500000 --dry-run
%(prog)s --flat --rent --maxprice 900 --no-headless
""",
    )
    parser.add_argument("--minrooms", type=int, metavar="N", help="Minimum number of rooms")
    parser.add_argument("--maxprice", type=int, metavar="EUR", help="Maximum price in €")
    parser.add_argument("--house", action="store_true", help="Search for houses (Haus)")
    parser.add_argument("--flat", action="store_true", help="Search for flats/apartments (Wohnung)")
    parser.add_argument("--rent", action="store_true", help="Rentals only (Mietobjekte)")
    parser.add_argument("--buy", action="store_true", help="Purchases only (Kaufobjekte)")
    parser.add_argument(
        "--dry-run", action="store_true",
        help="List matches without sending any contact forms",
    )
    parser.add_argument(
        "--max-listings", type=int, default=50, metavar="N",
        help="Max new listings to process (default: 50)",
    )
    parser.add_argument(
        "--delay", type=float, default=3.0, metavar="SEC",
        help="Seconds between contact form submissions (default: 3)",
    )
    parser.add_argument(
        "--headless", action="store_true", default=True,
        help="Run browser headlessly (default)",
    )
    parser.add_argument(
        "--no-headless", dest="headless", action="store_false",
        help="Show browser window (useful for debugging / CAPTCHA solving)",
    )
    parser.add_argument(
        "--reset", action="store_true",
        help="Clear the contacted.json history and start fresh",
    )
    return parser


def main() -> None:
    """Command-line entry point: scrape matching listings and contact them.

    Parses the CLI options, optionally clears the contact history, logs in
    when credentials are configured, collects new listings and then either
    prints them (``--dry-run``) or submits the contact form for each one,
    recording every successful submission in ``contacted.json`` immediately.
    """
    args = _build_arg_parser().parse_args()

    if args.reset:
        if CONTACTED_FILE.exists():
            CONTACTED_FILE.unlink()
        print("[i] contacted.json cleared.")

    config = load_config()
    contacted = load_contacted()
    search_urls = build_search_urls(args)

    print("Search URLs:")
    for u in search_urls:
        print(f" {u}")
    print(f"Already contacted: {len(contacted)} listings")
    print()

    with sync_playwright() as pw:
        browser: Browser = pw.chromium.launch(headless=args.headless)
        # try/finally so the browser is closed on every exit path, including
        # the early return below and unexpected exceptions.
        try:
            ctx = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/124.0.0.0 Safari/537.36"
                ),
            )
            page = ctx.new_page()

            # Optional login
            if config.get("login_email"):
                print("[i] Logging in...")
                ok = login(page, config)
                print(f" {'[OK] Logged in' if ok else '[!] Login failed — proceeding without login'}")
                print()

            # Collect listings
            print("[i] Scraping listings...")
            listings = collect_all_listings(page, search_urls, args.max_listings, contacted)
            print(f"\n[i] {len(listings)} new listings to process.\n")

            if not listings:
                print("Nothing new to contact.")
                return

            if args.dry_run:
                print("Dry-run mode — no forms will be submitted.\n")
                for i, listing in enumerate(listings, 1):
                    print(f" {i:3}. {listing['url']}")
                    if listing["snippet"]:
                        print(f" {listing['snippet']}")
            else:
                success_count = 0
                for i, listing in enumerate(listings, 1):
                    url = listing["url"]
                    print(f"[{i}/{len(listings)}] {url}")
                    try:
                        ok = submit_contact_form(page, url, config)
                    except Exception as exc:
                        # Boundary handler: one listing that breaks in an
                        # unexpected way must not abort the remaining batch.
                        # KeyboardInterrupt is not an Exception and still
                        # stops the run.
                        print(f" [!] Unexpected error: {exc}")
                        ok = False
                    if ok:
                        contacted.add(url)
                        # Saved after every success so an interrupted run
                        # does not contact the same listing again.
                        save_contacted(contacted)
                        success_count += 1
                        print(" [OK] Contact form submitted")
                    else:
                        print(" [FAIL] Could not submit form")
                    if i < len(listings):
                        time.sleep(args.delay)
                print(f"\nDone. {success_count}/{len(listings)} forms submitted.")
        finally:
            browser.close()


if __name__ == "__main__":
    main()