#!/usr/bin/env python3
"""laendleimmo.at scraper — finds listings and auto-submits the contact form."""

import argparse
import json
import sys
import time
import urllib.parse
from pathlib import Path
from typing import Optional

try:
    from playwright.sync_api import (
        Browser,
        Page,
        sync_playwright,
        TimeoutError as PWTimeout,
    )
except ImportError:
    # Playwright is optional at import time so the URL/state helpers below stay
    # importable on their own; main() refuses to run without it.
    Browser = Page = object
    sync_playwright = None

    class PWTimeout(Exception):
        """Stand-in so ``except PWTimeout`` clauses stay valid without Playwright."""


BASE_URL = "https://www.laendleimmo.at"
CONFIG_FILE = Path(__file__).parent / "contact_config.json"
CONTACTED_FILE = Path(__file__).parent / "contacted.json"

# Substrings looked for in the lower-cased page HTML after submitting the form.
_SUCCESS_MARKERS = ("erfolgreich", "gesendet", "danke", "thank you", "wurde verschickt")
_CAPTCHA_MARKERS = ("captcha", "robot", "recaptcha")
_ERROR_MARKERS = ("error", "fehler")


# --------------------------------------------------------------------------- #
# Config / state helpers
# --------------------------------------------------------------------------- #

def load_config() -> dict:
    """Load the contact configuration from CONFIG_FILE.

    Exits the process with a message if the file is missing, is not valid
    JSON, or does not contain a JSON object.
    """
    if not CONFIG_FILE.exists():
        sys.exit(
            f"[!] Config not found at {CONFIG_FILE}\n"
            " Run: python configure.py"
        )
    try:
        with open(CONFIG_FILE, encoding="utf-8") as f:
            config = json.load(f)
    except json.JSONDecodeError as exc:
        sys.exit(f"[!] Config at {CONFIG_FILE} is not valid JSON: {exc}")
    if not isinstance(config, dict):
        sys.exit(f"[!] Config at {CONFIG_FILE} must be a JSON object")
    return config


def load_contacted() -> set:
    """Return the set of listing URLs stored in CONTACTED_FILE.

    A missing file yields an empty set. A corrupt file aborts the run instead
    of being treated as empty, because an empty history would cause every
    listing to be contacted again.
    """
    if not CONTACTED_FILE.exists():
        return set()
    try:
        with open(CONTACTED_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError as exc:
        sys.exit(
            f"[!] {CONTACTED_FILE} is not valid JSON: {exc}\n"
            " Fix the file or run with --reset"
        )
    if not isinstance(data, list):
        sys.exit(
            f"[!] {CONTACTED_FILE} must contain a JSON list\n"
            " Fix the file or run with --reset"
        )
    return set(data)


def save_contacted(contacted: set) -> None:
    """Persist the contacted URLs to CONTACTED_FILE as a sorted JSON list."""
    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave a truncated history behind.
    tmp_file = CONTACTED_FILE.with_name(CONTACTED_FILE.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(sorted(contacted), f, indent=2)
    tmp_file.replace(CONTACTED_FILE)


# --------------------------------------------------------------------------- #
# URL construction
# --------------------------------------------------------------------------- #

def build_search_urls(args) -> list[str]:
    """Build one search URL per (property type, marketing type) combination.

    Reads ``flat``, ``house``, ``rent``, ``buy``, ``minrooms`` and ``maxprice``
    from ``args``. With neither ``flat`` nor ``house`` set, both property types
    are searched; with neither ``rent`` nor ``buy`` set, no marketing-type
    filter is added.
    """
    prop_types: list[str] = []
    if args.flat:
        prop_types.append("wohnung")
    if args.house:
        prop_types.append("haus")
    if not prop_types:
        prop_types = ["wohnung", "haus"]

    marketing_types: list[Optional[str]] = []
    if args.rent:
        marketing_types.append("Mietobjekt")
    if args.buy:
        marketing_types.append("Kaufobjekt")
    if not marketing_types:
        marketing_types = [None]  # no filter → all transaction types

    urls: list[str] = []
    for prop_type in prop_types:
        for marketing_type in marketing_types:
            params: list[tuple[str, str]] = []
            if args.minrooms is not None:
                # The room count is sent in float form, e.g. "2.0".
                params.append(("f[noOfRooms@f]", str(float(args.minrooms))))
            if args.maxprice is not None:
                params.append(("f[price@t]", str(args.maxprice)))
            if marketing_type:
                params.append(("f[marketingType]", marketing_type))

            base = f"{BASE_URL}/{prop_type}/vorarlberg"
            if params:
                qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
                urls.append(f"{base}?{qs}")
            else:
                urls.append(base)
    return urls


# --------------------------------------------------------------------------- #
# Scraping
# --------------------------------------------------------------------------- #

def _goto(page: Page, url: str) -> bool:
    """Navigate to ``url``; return False if the page could not be loaded in time.

    Waits for "networkidle" first and retries with the weaker
    "domcontentloaded" condition if that times out.
    """
    for wait_until in ("networkidle", "domcontentloaded"):
        try:
            page.goto(url, wait_until=wait_until, timeout=30_000)
        except PWTimeout:
            continue
        return True
    return False


def _is_followable(href: Optional[str]) -> bool:
    """Return True if ``href`` is a real link rather than a fragment/script stub."""
    if not href:
        return False
    return not href.startswith("#") and not href.lower().startswith("javascript:")


def scrape_listings(page: Page, url: str) -> tuple[list[dict], Optional[str]]:
    """Return (listings_on_page, next_page_url_or_None).

    Each listing is a dict with the absolute ``url`` and a ``snippet`` of at
    most 120 characters taken from the link text. If the page cannot be loaded,
    an empty list and no next page are returned.
    """
    if not _goto(page, url):
        print(" [!] Page load timed out")
        return [], None

    listings: list[dict] = []
    seen: set[str] = set()
    for a in page.query_selector_all("a[href*='/immobilien/']"):
        href: Optional[str] = a.get_attribute("href")
        if not href:
            continue
        # urljoin also handles relative and protocol-relative hrefs, which
        # plain BASE_URL concatenation would leave unresolved.
        full_url = urllib.parse.urljoin(url, href)
        if full_url in seen:
            continue
        seen.add(full_url)
        snippet = (a.inner_text() or "").strip().replace("\n", " ")[:120]
        listings.append({"url": full_url, "snippet": snippet})

    # Next-page link — try common patterns
    next_url: Optional[str] = None
    for sel in [
        "a[aria-label='Nächste Seite']",
        "a[aria-label='Next']",
        "a[rel='next']",
        ".pagination a:last-child",
        "a.next",
    ]:
        el = page.query_selector(sel)
        if el:
            href = el.get_attribute("href")
            if _is_followable(href):
                # Pagination links may be query-only ("?page=2"); resolve them
                # against the page they were found on.
                next_url = urllib.parse.urljoin(url, href)
                break

    return listings, next_url


def collect_all_listings(
    page: Page,
    search_urls: list[str],
    max_listings: int,
    already_contacted: set,
) -> list[dict]:
    """Walk every search URL and its result pages, collecting new listings.

    Listings whose URL is in ``already_contacted`` are skipped, and each URL is
    returned at most once even if it shows up on several pages or searches.
    Collection stops once ``max_listings`` new listings have been gathered.
    """
    all_new: list[dict] = []
    seen_urls: set[str] = set()
    for search_url in search_urls:
        url: Optional[str] = search_url
        page_num = 1
        # Guards against a "next" link that points back at a visited page.
        visited_pages: set[str] = set()
        while url and url not in visited_pages and len(all_new) < max_listings:
            visited_pages.add(url)
            print(f" [scrape] page {page_num}: {url}")
            listings, next_url = scrape_listings(page, url)

            new: list[dict] = []
            for item in listings:
                item_url = item["url"]
                if item_url in already_contacted or item_url in seen_urls:
                    continue
                seen_urls.add(item_url)
                new.append(item)
            all_new.extend(new)
            print(f" {len(listings)} found, {len(new)} new (total new: {len(all_new)})")

            if len(all_new) >= max_listings:
                break
            url = next_url
            page_num += 1
            time.sleep(1.5)  # pause between page requests
    return all_new[:max_listings]


# --------------------------------------------------------------------------- #
# Contact form submission
# --------------------------------------------------------------------------- #

def _try_fill(page: Page, selectors: list[str], value: str) -> bool:
    """Fill the first visible element matching any selector; return True if filled."""
    for sel in selectors:
        try:
            el = page.query_selector(sel)
            if el and el.is_visible():
                el.fill(value)
                return True
        except Exception:
            # Best effort: a selector that fails just means "try the next one".
            pass
    return False


def _try_check(page: Page, selectors: list[str]) -> None:
    """Tick the first visible, unchecked checkbox matching any selector."""
    for sel in selectors:
        try:
            for cb in page.query_selector_all(sel):
                if cb.is_visible() and not cb.is_checked():
                    cb.check()
                    return
        except Exception:
            # Best effort: a selector that fails just means "try the next one".
            pass


def _click_first_visible(page: Page, selectors: list[str], settle_timeout_ms: int) -> bool:
    """Click the first visible element matching any selector, then wait for the network.

    Returns True if something was clicked. A timeout while waiting for
    "networkidle" afterwards is ignored.
    """
    for sel in selectors:
        btn = page.query_selector(sel)
        if btn and btn.is_visible():
            btn.click()
            try:
                page.wait_for_load_state("networkidle", timeout=settle_timeout_ms)
            except PWTimeout:
                pass
            return True
    return False


def login(page: Page, config: dict) -> bool:
    """Attempt site login if credentials are configured.

    Returns True once the login form has been filled in and submitted, and
    False if credentials are missing, the page did not load, or a form field
    or the submit button could not be found.

    NOTE(review): the result of the login itself is not verified — True only
    means the form was submitted.
    """
    email = config.get("login_email", "")
    password = config.get("login_password", "")
    if not email or not password:
        return False

    if not _goto(page, f"{BASE_URL}/user/login"):
        return False

    email_filled = _try_fill(
        page,
        ["input[type='email']", "input[name='email']", "input[name='username']"],
        email,
    )
    password_filled = _try_fill(
        page,
        ["input[type='password']", "input[name='password']"],
        password,
    )
    if not (email_filled and password_filled):
        # Submitting without both credentials entered cannot succeed.
        return False

    return _click_first_visible(
        page,
        [
            "button[type='submit']",
            "input[type='submit']",
            "button:has-text('Anmelden')",
            "button:has-text('Login')",
        ],
        10_000,
    )


def submit_contact_form(page: Page, listing_url: str, config: dict) -> bool:
    """Navigate to listing and submit 'Anbieter kontaktieren' form.

    Reads ``message``, ``name``, ``email`` and ``phone`` from ``config``.
    Returns True if the form was submitted and the resulting page shows a
    success keyword or no error keyword; False otherwise.
    """
    message = config.get("message", "")
    if not message:
        # Never send a blank inquiry.
        print(" [!] No message configured — nothing to send")
        return False

    if not _goto(page, listing_url):
        print(" [!] Page load timed out")
        return False

    # Scroll to bottom to trigger lazy-loaded form
    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    time.sleep(1.5)

    # Personal data fields — may or may not be present (absent when logged in)
    personal_fields = [
        (
            [
                "input[name='name']",
                "input[name='fullName']",
                "input[name='contactName']",
                "input[placeholder*='Name']",
                "input[id*='name']:not([type='hidden'])",
            ],
            config.get("name", ""),
        ),
        (
            [
                "input[type='email']",
                "input[name='email']",
                "input[name='contactEmail']",
                "input[placeholder*='Mail']",
            ],
            config.get("email", ""),
        ),
        (
            [
                "input[name='phone']",
                "input[name='telefon']",
                "input[name='tel']",
                "input[type='tel']",
                "input[placeholder*='Telefon']",
                "input[placeholder*='Phone']",
            ],
            config.get("phone", ""),
        ),
    ]
    for selectors, value in personal_fields:
        # Filling with an empty string would wipe whatever is already in the field.
        if value:
            _try_fill(page, selectors, value)

    # Message textarea
    filled_msg = _try_fill(
        page,
        [
            "textarea[name='message']",
            "textarea[name='nachricht']",
            "textarea[name='body']",
            "textarea[id*='message']",
            "textarea[id*='nachricht']",
        ],
        message,
    )
    if not filled_msg:
        # Fall back to the first visible textarea on the page.
        for ta in page.query_selector_all("textarea"):
            if ta.is_visible():
                ta.fill(message)
                filled_msg = True
                break
    if not filled_msg:
        print(" [!] No message field found — form structure may have changed")
        return False

    # Required consent checkbox
    _try_check(
        page,
        [
            "input[type='checkbox'][name*='zustimm']",
            "input[type='checkbox'][name*='consent']",
            "input[type='checkbox'][name*='datenschutz']",
            "input[type='checkbox'][name*='agree']",
            "input[type='checkbox'][name*='accept']",
        ],
    )
    # All remaining visible required checkboxes
    for cb in page.query_selector_all("input[type='checkbox'][required]"):
        if cb.is_visible() and not cb.is_checked():
            cb.check()

    # Submit
    clicked = _click_first_visible(
        page,
        [
            "button:has-text('Anfrage senden')",
            "button:has-text('Anfrage absenden')",
            "button:has-text('Senden')",
            "button[type='submit']",
            "input[type='submit']",
        ],
        15_000,
    )
    if not clicked:
        print(" [!] Submit button not found")
        return False

    # NOTE(review): these are substring checks against the full page HTML, so
    # a marker such as "robot" would also match e.g. a robots meta tag —
    # TODO confirm the heuristic against the live site.
    body = page.content().lower()
    # Check for success text
    if any(kw in body for kw in _SUCCESS_MARKERS):
        return True
    # Check for CAPTCHA
    if any(kw in body for kw in _CAPTCHA_MARKERS):
        print(" [!] CAPTCHA detected — manual action required")
        return False
    # Assume success if no obvious error page
    return not any(kw in body for kw in _ERROR_MARKERS)


# --------------------------------------------------------------------------- #
# Main
# --------------------------------------------------------------------------- #

def _run_session(
    page: Page,
    args: argparse.Namespace,
    config: dict,
    contacted: set,
    search_urls: list[str],
) -> None:
    """Log in (if configured), collect new listings and list or contact them."""
    # Optional login
    if config.get("login_email"):
        print("[i] Logging in...")
        ok = login(page, config)
        print(f" {'[OK] Logged in' if ok else '[!] Login failed — proceeding without login'}")
        print()

    # Collect listings
    print("[i] Scraping listings...")
    listings = collect_all_listings(page, search_urls, args.max_listings, contacted)
    print(f"\n[i] {len(listings)} new listings to process.\n")

    if not listings:
        print("Nothing new to contact.")
        return

    if args.dry_run:
        print("Dry-run mode — no forms will be submitted.\n")
        for i, listing in enumerate(listings, 1):
            print(f" {i:3}. {listing['url']}")
            if listing["snippet"]:
                print(f" {listing['snippet']}")
        return

    success_count = 0
    for i, listing in enumerate(listings, 1):
        url = listing["url"]
        print(f"[{i}/{len(listings)}] {url}")
        try:
            ok = submit_contact_form(page, url, config)
        except Exception as exc:
            # Top-level boundary: one broken listing must not abort the run.
            print(f" [!] Unexpected error: {exc}")
            ok = False
        if ok:
            contacted.add(url)
            # Saved after every success so an aborted run keeps its history.
            save_contacted(contacted)
            success_count += 1
            print(" [OK] Contact form submitted")
        else:
            print(" [FAIL] Could not submit form")
        if i < len(listings):
            time.sleep(args.delay)

    print(f"\nDone. {success_count}/{len(listings)} forms submitted.")


def main() -> None:
    """Parse the command line, scrape matching listings and contact them."""
    parser = argparse.ArgumentParser(
        description="Scrape laendleimmo.at and auto-contact matching listings.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --flat --rent --minrooms 2 --maxprice 1200
  %(prog)s --house --buy --minrooms 4 --maxprice 500000 --dry-run
  %(prog)s --flat --rent --maxprice 900 --no-headless
""",
    )
    parser.add_argument("--minrooms", type=int, metavar="N", help="Minimum number of rooms")
    parser.add_argument("--maxprice", type=int, metavar="EUR", help="Maximum price in €")
    parser.add_argument("--house", action="store_true", help="Search for houses (Haus)")
    parser.add_argument("--flat", action="store_true", help="Search for flats/apartments (Wohnung)")
    parser.add_argument("--rent", action="store_true", help="Rentals only (Mietobjekte)")
    parser.add_argument("--buy", action="store_true", help="Purchases only (Kaufobjekte)")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="List matches without sending any contact forms",
    )
    parser.add_argument(
        "--max-listings",
        type=int,
        default=50,
        metavar="N",
        help="Max new listings to process (default: 50)",
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=3.0,
        metavar="SEC",
        help="Seconds between contact form submissions (default: 3)",
    )
    parser.add_argument(
        "--headless",
        action="store_true",
        default=True,
        help="Run browser headlessly (default)",
    )
    parser.add_argument(
        "--no-headless",
        dest="headless",
        action="store_false",
        help="Show browser window (useful for debugging / CAPTCHA solving)",
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Clear the contacted.json history and start fresh",
    )
    args = parser.parse_args()

    if args.delay < 0:
        # time.sleep() rejects negative values; fail before any form is sent.
        parser.error("--delay must not be negative")

    if sync_playwright is None:
        sys.exit(
            "[!] Playwright is not installed\n"
            " Run: pip install playwright && playwright install chromium"
        )

    if args.reset:
        if CONTACTED_FILE.exists():
            CONTACTED_FILE.unlink()
        print("[i] contacted.json cleared.")

    config = load_config()
    if not args.dry_run and not config.get("message"):
        sys.exit(
            f"[!] No message configured in {CONFIG_FILE}\n"
            " Run: python configure.py"
        )
    contacted = load_contacted()
    search_urls = build_search_urls(args)

    print("Search URLs:")
    for u in search_urls:
        print(f" {u}")
    print(f"Already contacted: {len(contacted)} listings")
    print()

    with sync_playwright() as pw:
        browser: Browser = pw.chromium.launch(headless=args.headless)
        try:
            ctx = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/124.0.0.0 Safari/537.36"
                ),
            )
            page = ctx.new_page()
            _run_session(page, args, config, contacted, search_urls)
        finally:
            # Close the browser on every exit path, including errors.
            browser.close()


if __name__ == "__main__":
    main()