"""Точка входа парсера лидов. ПРИМЕРЫ ЗАПУСКА: # Полный pipeline (парсинг + Tier 2 + ЕГРЮЛ + rescore + export) одной командой python main.py --full --category "автосервис,салон красоты" --limit 30 # Парсинг 1 категории на Я.Картах python main.py --source yandex --category "кафе" --limit 30 # HH.ru — компании которые ищут "руки" (нет CRM = +3 hh_signal в скоринг) python main.py --source hh # все signal-запросы python main.py --source hh --category "оператор колл-центра" # только один python main.py --source hh --enrich --enrich-egrul --rescore --export # полный pipeline для HH # Все категории на Яндекс.Картах (~3-4 часа) python main.py --source yandex # Только Tier 2 — анализ сайтов (без браузера) python main.py --enrich # Только ЕГРЮЛ — обогатить ИНН/директором/датой регистрации python main.py --enrich-egrul # Только пересчёт score у всех (после обновления формулы) python main.py --rescore # Комбинированно: обогатить и пересчитать без парсинга python main.py --enrich --enrich-egrul --rescore --export # Просто экспорт того что уже в БД (горячие лиды) python main.py --export --min-score 5 # Показать статистику python main.py --stats """ import argparse import logging import random import sys import time from datetime import datetime import config from database import ( cleanup_bad_director_names, finish_source_run, fix_categories_from_runs, get_all_leads, get_connection, get_hh_leads_without_website, get_leads_for_egrul, get_leads_for_enrichment, get_leads_for_finance, get_stats, init_db, start_source_run, update_egrul, update_enrichment, update_finance, update_lead_contacts, update_lead_website, update_score, upsert_lead, ) from export.csv_export import export_day, export_master, export_run, export_to_csv from logger_setup import configure_root_logger, setup_logger from scoring import annotate_with_score, calculate_score # ─────────────────────────────────────────────────────────────────────── # Логгер: цветной вывод (colorlog) # ─────────────────────────────────────────────────────────────────────── configure_root_logger(level=logging.INFO) # для всех модулей (parsers/, enricher/) logger = setup_logger("main") # ─────────────────────────────────────────────────────────────────────── # Раннеры по источникам # ─────────────────────────────────────────────────────────────────────── def _save_leads(conn, leads: list[dict], run_id: int | None = None) -> tuple[int, int]: """Скоринг + upsert. Возвращает (inserted, merged). Если задан run_id — каждый лид связывается с прогоном через lead_in_run. Blacklisted-лиды (крупные банки, госструктуры, сети ритейла) пропускаются — им бесполезен холодный outreach. """ from enricher.blacklist import is_blacklisted inserted = 0 merged = 0 skipped_blacklist = 0 for lead in leads: name = lead.get("name") or "" is_bl, bl_reason = is_blacklisted(name) if is_bl: skipped_blacklist += 1 logger.debug(f" ⊘ blacklist skip: {name!r} ({bl_reason})") continue annotate_with_score(lead) result = upsert_lead(conn, lead, run_id=run_id) if result == "inserted": inserted += 1 elif result == "merged": merged += 1 if skipped_blacklist: logger.info(f" ⊘ Пропущено blacklisted: {skipped_blacklist}") return inserted, merged def _format_location(city: str, district: str | None) -> str: """Шапка локации для sources_log.city: 'Москва' / 'Москва и МО / Балашиха'.""" return f"{city} / {district}" if district else city def run_yandex(conn, categories: list[str], city: str, max_cards: int, district: str | None = None) -> list[int]: """Прогон по Яндекс.Картам. Каждая категория = отдельный прогон (run_id в sources_log) + связка лидов через lead_in_run. Это даёт возможность экспортировать CSV конкретного прогона. Возвращает список run_id всех созданных прогонов — для последующего auto-export. """ from parsers.yandex_maps import parse_yandex_maps location_label = _format_location(city, district) run_ids: list[int] = [] for idx, category in enumerate(categories, start=1): logger.info(f"\n{'═' * 60}") logger.info(f"📍 Категория [{idx}/{len(categories)}]: {category} ({location_label})") logger.info(f"{'═' * 60}") run_id = start_source_run(conn, "yandex_maps", category, location_label) run_ids.append(run_id) try: leads = parse_yandex_maps({ "query": category, "city": city, "district": district, "max_cards": max_cards, }) except Exception as e: logger.exception(f"Парсер упал на категории {category}: {e}") finish_source_run(conn, run_id, 0, 0, 0, 1, "error", str(e)) continue inserted, merged = _save_leads(conn, leads, run_id=run_id) finish_source_run(conn, run_id, len(leads), inserted, merged, 0, "ok") logger.info( f"✓ Категория {category}: найдено {len(leads)}, " f"вставлено {inserted}, мержей {merged} (run #{run_id})" ) # Пауза между категориями if idx < len(categories): from parsers.base import BaseParser BaseParser().sleep_between_categories() return run_ids # ─────────────────────────────────────────────────────────────────────── # HH.ru runner # ─────────────────────────────────────────────────────────────────────── def run_hh(conn, queries: list[str], city: str, max_pages: int = None) -> list[int]: """Прогон HH.ru. Один signal-запрос = один прогон (run_id). Возвращает список run_id всех созданных прогонов — для последующего auto-export. """ from parsers.hh import parse_hh_signal pages = max_pages or config.HH_MAX_PAGES_PER_QUERY run_ids: list[int] = [] for idx, query in enumerate(queries, start=1): logger.info(f"\n{'═' * 60}") logger.info(f"💼 HH signal-запрос [{idx}/{len(queries)}]: {query} ({city})") logger.info(f"{'═' * 60}") run_id = start_source_run(conn, "hh", query, city) run_ids.append(run_id) try: leads = parse_hh_signal( query=query, city=city, max_pages=pages, period_days=config.HH_PERIOD_DAYS, ) except Exception as e: logger.exception(f"HH парсер упал на запросе '{query}': {e}") finish_source_run(conn, run_id, 0, 0, 0, 1, "error", str(e)) continue inserted, merged = _save_leads(conn, leads, run_id=run_id) finish_source_run(conn, run_id, len(leads), inserted, merged, 0, "ok") logger.info( f"✓ HH '{query}': найдено {len(leads)}, " f"вставлено {inserted}, мержей {merged} (run #{run_id})" ) # Пауза между запросами (anti-rate-limit) if idx < len(queries): time.sleep(config.HH_PAUSE_BETWEEN_QUERIES) return run_ids # ─────────────────────────────────────────────────────────────────────── # Tier 2 — enrichment + rescore # ─────────────────────────────────────────────────────────────────────── def run_enrichment(conn, limit: int | None = None) -> int: """Прогон website_analyzer по лидам с website. Возвращает кол-во обогащённых. После каждого лида сразу пересчитывается score (новая инфа → новый скор). """ from enricher.website_analyzer import analyze_website leads = get_leads_for_enrichment(conn, limit=limit, only_unchecked=True) if not leads: logger.info("Нет лидов для enrichment (все уже обогащены или нет website).") return 0 logger.info(f"\n🔧 Tier 2 enrichment: {len(leads)} лидов с website") enriched_count = 0 for idx, row in enumerate(leads, start=1): lead_id, name, website = row["id"], row["name"], row["website"] logger.info(f"[{idx}/{len(leads)}] {name} → {website[:60]}") info = analyze_website(website, timeout=10) update_enrichment(conn, lead_id, info) # Сливаем найденные на сайте email и доп.телефоны в лида (мерж без дублей). added_e, added_p = update_lead_contacts( conn, lead_id, emails_found=info.get("emails_found"), phones_found=info.get("phones_found"), ) # ─── ИНН/ОГРН/КПП с сайта (152-ФЗ disclosure) ────────────────────── # По закону юр.лица должны публиковать реквизиты на сайте. # Если нашли ИНН в footer — это самый надёжный путь для брендов, # которые Rusprofile не индексирует по имени ("Кафе Пушкинъ" и т.п.). site_inn = info.get("inn") site_ogrn = info.get("ogrn") site_kpp = info.get("kpp") inn_added = False if site_inn or site_ogrn or site_kpp: # Не перезаписываем уже заполненные поля current = dict(conn.execute( "SELECT inn, ogrn FROM leads WHERE id = ?", (lead_id,) ).fetchone()) updates = {} if site_inn and not current.get("inn"): updates["inn"] = site_inn inn_added = True if site_ogrn and not current.get("ogrn"): updates["ogrn"] = site_ogrn if site_kpp: # kpp нет в leads — игнорируем (хранится в ЕГРЮЛ-связанных полях) pass if updates: try: sets = ", ".join(f"{k} = ?" for k in updates) vals = list(updates.values()) + [lead_id] conn.execute(f"UPDATE leads SET {sets} WHERE id = ?", vals) conn.commit() except Exception as e: logger.warning(f" ⚠ update inn/ogrn: {e}") # Если нашли ИНН на сайте, сразу идём в Rusprofile by-inn за директором rusprofile_note = "" if inn_added: try: from enricher.egrul_enricher import enrich_egrul_by_inn egrul_info = enrich_egrul_by_inn(site_inn, company_name=name) if egrul_info.get("egrul_status") == "found": # Записываем то что НЕ заполнено egrul_updates = {} for k in ("director_name", "registration_date", "ogrn"): if egrul_info.get(k): existing = conn.execute( f"SELECT {k} FROM leads WHERE id = ?", (lead_id,) ).fetchone()[0] if not existing: egrul_updates[k] = egrul_info[k] # Помечаем как проверенный в ЕГРЮЛ now = datetime.now().isoformat(timespec="seconds") egrul_updates["egrul_checked_at"] = now egrul_updates["egrul_status"] = "found" if egrul_updates: sets = ", ".join(f"{k} = ?" for k in egrul_updates) vals = list(egrul_updates.values()) + [lead_id] conn.execute(f"UPDATE leads SET {sets} WHERE id = ?", vals) conn.commit() director = egrul_info.get("director_name") or "—" rusprofile_note = f" +ЕГРЮЛ:директор={director[:30]}" except Exception as e: logger.warning(f" ⚠ Rusprofile by-inn: {e}") # Пересчитать score (теперь с Tier 2 данными + ИНН с сайта) full_lead = dict(conn.execute("SELECT * FROM leads WHERE id = ?", (lead_id,)).fetchone()) score, breakdown = calculate_score(full_lead) update_score(conn, lead_id, score, breakdown) # Краткий вывод что нашли cms = info.get("cms_type") or "—" alive = "✓" if info.get("site_alive") else "✗" chat = "💬" if info.get("has_live_chat") else "—" booking = "📅" if info.get("has_online_booking") else "—" analytics = "📊" if info.get("has_analytics") else "—" contacts_note = "" if added_e or added_p: contacts_note = f" +{added_e}email +{added_p}тел" inn_note = f" +ИНН={site_inn}" if inn_added else "" logger.info( f" {alive} cms={cms} chat={chat} book={booking} analytics={analytics} → score={score}" f"{contacts_note}{inn_note}{rusprofile_note}" ) enriched_count += 1 # Вежливая пауза между сайтами (2026-05-18: 0.5 → 0.2 для ускорения) time.sleep(0.2) logger.info(f"\n✅ Обогащено: {enriched_count}") return enriched_count def run_egrul_enrichment(conn, limit: int | None = None) -> int: """ЕГРЮЛ-обогащение через DaData (primary) + Rusprofile (fallback). Три ветки в зависимости от состояния лида: • Уже полный (inn + director + website + phone_primary) → skip • Есть ИНН → DaData findById (точный поиск по ИНН) + Rusprofile fallback • Нет ИНН → DaData suggest (по имени, индексирует бренды) + Rusprofile fallback по имени DaData первый потому что: • Индексирует бренды («Шоколадница», «ВкусВилл») по которым Rusprofile not_found • Без капчи, надёжный JSON-API • 10К запросов/день бесплатно update_egrul не перезаписывает поля которые уже заполнены в БД. Возвращает количество ОБРАБОТАННЫХ (не считая skipped). """ from enricher.egrul_enricher import enrich_egrul, enrich_egrul_by_inn from enricher.dadata_enricher import enrich_via_dadata, enrich_via_dadata_by_inn leads = get_leads_for_egrul(conn, limit=limit, only_unchecked=True) if not leads: logger.info("Нет лидов для ЕГРЮЛ-обогащения (все уже проверены).") return 0 logger.info(f"\n🏛 ЕГРЮЛ enrichment: {len(leads)} лидов на входе") skipped = 0 found = 0 not_found = 0 duplicates = 0 errors = 0 processed = 0 for idx, row in enumerate(leads, start=1): lead_id = row["id"] name = row["name"] city = row["city"] if "city" in row.keys() else None existing_inn = row["inn"] if "inn" in row.keys() else None existing_director = row["director_name"] if "director_name" in row.keys() else None existing_website = row["website"] if "website" in row.keys() else None existing_phone = row["phone_primary"] if "phone_primary" in row.keys() else None # ─── Ветка A: уже полный лид → skip ────────────────────────────── if existing_inn and existing_director and (existing_website or existing_phone): skipped += 1 # Помечаем как проверенный, чтобы в следующий раз не дёргать now = datetime.now().isoformat(timespec="seconds") conn.execute( "UPDATE leads SET egrul_checked_at = ?, egrul_status = ? WHERE id = ?", (now, "skipped_already_full", lead_id), ) conn.commit() continue # ─── Ветка B: есть ИНН → искать по ИНН ─────────────────────────── if existing_inn: logger.info(f"[{idx}/{len(leads)}] ИНН={existing_inn} → DaData by-inn") try: # 1. DaData findById — самый надёжный путь info = enrich_via_dadata_by_inn(existing_inn) # 2. Если DaData не нашла — fallback на Rusprofile if info.get("egrul_status") != "found": info = enrich_egrul_by_inn(existing_inn, company_name=name) except Exception as e: logger.exception(f" ⚠ Падение by-inn: {e}") errors += 1 time.sleep(1.0) continue else: # ─── Ветка C: нет ИНН → DaData suggest по имени ───────────── logger.info(f"[{idx}/{len(leads)}] {name}") try: # 1. DaData suggest — индексирует бренды («Шоколадница») info = enrich_via_dadata(name, city=city) # 2. Если DaData не нашла — fallback на Rusprofile if info.get("egrul_status") != "found": info = enrich_egrul(name, city=city) except Exception as e: logger.exception(f" ⚠ Падение by-name: {e}") errors += 1 time.sleep(1.0) continue # Фильтруем info — не перезаписываем уже заполненные поля filtered_info = dict(info) for k, existing_val in ( ("inn", existing_inn), ("director_name", existing_director), ("website", existing_website), ("phone_primary", existing_phone), ("address", row["address"] if "address" in row.keys() else None), ): if existing_val and filtered_info.get(k): # Уже есть — не трогаем filtered_info.pop(k) try: update_status = update_egrul(conn, lead_id, filtered_info) except Exception as e: logger.exception(f" ⚠ Падение update_egrul: {e}") errors += 1 time.sleep(1.0) continue processed += 1 # Пересчитать score (registration_date / director могли появиться) full_lead = dict(conn.execute("SELECT * FROM leads WHERE id = ?", (lead_id,)).fetchone()) score, breakdown = calculate_score(full_lead) update_score(conn, lead_id, score, breakdown) status = info.get("egrul_status") if status == "found": found += 1 director = info.get("director_name") or existing_director or "—" reg = info.get("registration_date") or "—" web = info.get("website") or existing_website or "—" phone = info.get("phone_primary") or existing_phone or "—" dup_marker = " (DUP по ИНН)" if update_status == "duplicate" else "" logger.info( f" ✓ директор={director} | рег.={reg} | сайт={web[:50]} | " f"тел.={phone} → score={score}{dup_marker}" ) if update_status == "duplicate": duplicates += 1 elif status == "not_found": not_found += 1 logger.info(f" ✗ не найдено (ни DaData, ни Rusprofile)") else: errors += 1 logger.warning(f" ⚠ ошибка ЕГРЮЛ") time.sleep(0.7) logger.info( f"\n✅ ЕГРЮЛ: skipped={skipped} (уже полные), " f"найдено {found} (DUP {duplicates}), не найдено {not_found}, " f"ошибок {errors}, всего обработано {processed}/{len(leads)}" ) return processed def run_financials(conn, limit: int | None = None) -> int: """Добор финансов (сотрудники + оборот) по ООО-лидам через DaData findById. Идёт по лидам с ИНН 10 цифр (ООО) без finance_checked_at. ИП пропускаются (не сдают отчётность). Источник — DaData (поля employee_count + finance.income). """ from enricher.dadata_enricher import enrich_via_dadata_by_inn leads = get_leads_for_finance(conn, limit=limit, only_unchecked=True) if not leads: logger.info("Нет ООО-лидов для добора финансов (все проверены либо нет ИНН).") return 0 logger.info(f"\n💰 Финансы (DaData findById): {len(leads)} ООО-лидов") found = 0 for idx, row in enumerate(leads, start=1): inn = row["inn"] try: info = enrich_via_dadata_by_inn(inn) except Exception as e: logger.warning(f" ⚠ DaData финансы {inn}: {e}") time.sleep(0.5) continue wrote = update_finance(conn, row["id"], info) if wrote: found += 1 logger.info( f"[{idx}/{len(leads)}] {row['name'][:28]}: " f"👥{info.get('employee_count')} 💰{info.get('revenue')} ({info.get('finance_year')})" ) else: logger.info(f"[{idx}/{len(leads)}] {row['name'][:28]}: финансов нет в ФНС") time.sleep(0.3) logger.info(f"\n✅ Финансы добавлены: {found}/{len(leads)}") return found def run_find_sites( conn, limit: int | None = None, run_ids: list[int] | None = None, source_filter: str | None = None, ) -> int: """Найти website для лидов где его нет, через DuckDuckGo поиск. Ищем по «name + ИНН + сайт». DDG отдаёт первый non-aggregator результат. Сразу после этого обычный --enrich (website_analyzer) достанет email/phone со страницы Контакты найденного сайта. run_ids: если задано — обрабатываются ТОЛЬКО лиды из этих прогонов (через таблицу lead_in_run). Это поведение по умолчанию в --full, чтобы не ходить по всем 2500+ лидам БД при каждом прогоне. Если None — идём по ВСЕМ лидам без website (медленно, использовать только при явном `--find-sites` без `--source`). Возвращает количество лидов которым добавили website. """ from enricher.contacts_finder import find_company_website # Условия фильтра с плейсхолдером алиаса ({p}='' для простого запроса, # 'l.' для запроса с JOIN lead_in_run). source — через параметр (?), а не # интерполяцию: исключает SQL-инъекцию и хрупкий .replace() по строке SQL. conds = [ "{p}name IS NOT NULL AND {p}name != ''", "({p}website IS NULL OR {p}website = '')", "({p}site_checked_at IS NULL OR {p}site_checked_at = '')", ] if source_filter: conds.append("{p}source = ?") if run_ids: # Ограничение: только лиды из указанных прогонов placeholders = ",".join("?" for _ in run_ids) where = " AND ".join(c.format(p="l.") for c in conds) sql = ( f"SELECT DISTINCT l.id, l.name, l.inn FROM leads l " f"JOIN lead_in_run lir ON lir.lead_id = l.id " f"WHERE lir.run_id IN ({placeholders}) AND ({where}) " f"ORDER BY l.id" ) params = list(run_ids) + ([source_filter] if source_filter else []) else: where = " AND ".join(c.format(p="") for c in conds) sql = f"SELECT id, name, inn FROM leads WHERE {where} ORDER BY id" params = [source_filter] if source_filter else [] if limit: sql += f" LIMIT {int(limit)}" leads = conn.execute(sql, params).fetchall() if not leads: logger.info("Нет лидов для поиска сайтов (у всех есть website или site_checked_at).") return 0 logger.info(f"\n🔎 Поиск сайтов через DDG: {len(leads)} лидов") found = 0 for idx, row in enumerate(leads, start=1): name = row["name"] inn = row["inn"] logger.info(f"[{idx}/{len(leads)}] {name[:55]} ИНН={inn or '—'}") try: site = find_company_website(name, inn=inn) except Exception as e: logger.warning(f" ⚠ ошибка поиска: {e}") time.sleep(1.0) continue if site: conn.execute( "UPDATE leads SET website = ?, has_website = 1 WHERE id = ?", (site, row["id"]), ) conn.commit() found += 1 logger.info(f" ✓ {site}") else: logger.info(f" ✗ сайт не найден") # Пауза между запросами к DDG — вежливость и анти-rate-limit time.sleep(random.uniform(1.2, 2.2)) logger.info(f"\n✅ Сайтов найдено: {found}/{len(leads)}") return found def run_hh_websites(conn, limit: int | None = None) -> int: """Пройти по страницам HH-employer'ов, добрать website / email / phones. Цель — закрыть дыру: HH в выдаче не отдаёт сайт, но на странице `hh.ru/employer/{id}` он часто указан. После добавления website обычный --enrich (Tier 2) даст email со страниц сайтов. Возвращает количество лидов которым добавили website. """ from parsers.hh_employers import parse_hh_employer_pages leads = get_hh_leads_without_website(conn, limit=limit) if not leads: logger.info("Нет HH-лидов без website (все уже обогащены или нет HH-лидов).") return 0 logger.info(f"\n🔍 HH employer-page enrichment: {len(leads)} лидов") # Преобразуем source_id 'hh_12345' → employer_id '12345' для построения URL payload: list[dict] = [] for row in leads: source_id = row["source_id"] or "" if source_id.startswith("hh_"): payload.append({ "lead_id": row["id"], "employer_id": source_id[3:], "name": row["name"], }) if not payload: logger.info("Нет валидных HH-employer ID для парсинга.") return 0 # Botasaurus-парсинг страниц employer'ов. # Employer-страницы легче поисковой выдачи, паузы можно короче чем # config.MIN/MAX_DELAY (которые осторожны для самого HH парсера). results = parse_hh_employer_pages({ "leads": payload, "delay_min": 1.0, "delay_max": 2.5, }) # Запись в БД: website + найденные на странице email/phones websites_added = 0 contacts_added = 0 for r in results: lead_id = r["lead_id"] website = r.get("website") emails = r.get("emails") or [] phones = r.get("phones") or [] if website: if update_lead_website(conn, lead_id, website): websites_added += 1 if emails or phones: added_e, added_p = update_lead_contacts( conn, lead_id, emails_found=emails, phones_found=phones, ) if added_e or added_p: contacts_added += 1 logger.info( f"\n✅ HH websites: добавлено {websites_added} сайтов " f"+ {contacts_added} лидов получили доп. контакты со страницы employer'а" ) return websites_added def run_rescore(conn) -> int: """Пересчитать score у всех лидов в БД (после изменения формулы).""" leads = get_all_leads(conn) logger.info(f"\n🔄 Пересчёт score для {len(leads)} лидов") changed = 0 for row in leads: lead = dict(row) old_score = lead.get("score") or 0 new_score, breakdown = calculate_score(lead) # Пишем всегда: даже при том же числе score формат breakdown мог # измениться (v4 flags → v5 pain_products/band) — CRM читает breakdown. update_score(conn, lead["id"], new_score, breakdown) if new_score != old_score: changed += 1 logger.info(f"✅ Score изменился у {changed} из {len(leads)} лидов") return changed # ─────────────────────────────────────────────────────────────────────── # CLI # ─────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Парсер лидов 44AS") parser.add_argument( "--source", choices=["yandex", "hh", "2gis", "vk", "all"], help="Какой источник парсить", ) parser.add_argument( "--category", help=( "Категория или список через запятую. " "Пример: --category 'автосервис,салон красоты,стоматология'. " "Если не указано — все из config.CATEGORIES" ), ) parser.add_argument( "--city", default=config.ACTIVE_CITY, help=( f"Регион / город (по умолчанию: {config.ACTIVE_CITY}). " f"Известные: {', '.join(config.CITIES.keys())}. " f"Если указать неизвестный — он автоматически уйдёт как район в 'Москва и МО'" ), ) parser.add_argument( "--district", default=None, help=( "Район внутри города (Митино, Бутово, Беляево, ...). " "Добавляется к поисковому запросу и сохраняется в БД отдельным полем" ), ) parser.add_argument( "--limit", type=int, default=config.MAX_CARDS_PER_CATEGORY, help="Максимум карточек на 1 категорию", ) parser.add_argument( "--export", action="store_true", help="После парсинга экспортировать в CSV", ) parser.add_argument( "--min-score", type=int, default=0, help="Минимальный score для экспорта", ) parser.add_argument( "--stats", action="store_true", help="Показать статистику и выйти", ) parser.add_argument( "--enrich", action="store_true", help="Tier 2: проанализировать сайты лидов (CMS, чат, запись, аналитика)", ) parser.add_argument( "--enrich-egrul", action="store_true", dest="enrich_egrul", help="Tier 3: обогатить лиды через ЕГРЮЛ (ИНН, директор, дата регистрации)", ) parser.add_argument( "--enrich-finance", action="store_true", dest="enrich_finance", help="Финансы: добор числа сотрудников + оборота по ООО через DaData (ИП не сдают отчётность)", ) parser.add_argument( "--rescore", action="store_true", help="Пересчитать score у всех лидов (после изменения формулы)", ) parser.add_argument( "--full", action="store_true", help=( "Полный pipeline: парсинг → enrichment (сайт + ЕГРЮЛ) → rescore → export. " "Эквивалент --source yandex --enrich --enrich-egrul --rescore --export" ), ) parser.add_argument( "--cleanup-directors", action="store_true", dest="cleanup_directors", help=( "Очистить лидов у которых в director_name записана должность вместо ФИО " "и сбросить флаг egrul_checked_at для повторного обогащения" ), ) parser.add_argument( "--fix-categories", action="store_true", dest="fix_categories", help=( "Одноразовая миграция: переписать lead.category на поисковый запрос " "(query) из sources_log. До правки 2026-05-18 Я.Карты писали в category " "то что показала карточка, HH — название вакансии. Теперь во всех " "парсерах category = query (то что Ян ввёл)." ), ) parser.add_argument( "--rescan-sites", action="store_true", dest="rescan_sites", help=( "Сбросить site_checked_at у всех лидов — следующий --enrich пройдёт по ним " "заново. Нужно для перепрогона enricher на уже обогащённых лидах " "(например, после расширения сбора контактов в analyze_website)." ), ) parser.add_argument( "--hh-enrich-websites", action="store_true", dest="hh_enrich_websites", help=( "Парсинг страниц работодателей HH (hh.ru/employer/{id}) для дозаполнения " "поля website у HH-лидов. После этого обычный --enrich (Tier 2) " "пройдёт по этим сайтам и даст email/доп.телефоны. Workflow: " "сначала --source hh --enrich-egrul → потом --hh-enrich-websites → " "потом --enrich. Можно одной командой." ), ) parser.add_argument( "--find-sites", action="store_true", dest="find_sites", help=( "Для лидов БЕЗ website ищет сайт компании через DuckDuckGo по " "имени + ИНН. Нужен для WB-лидов (ЕГРЮЛ не содержит сайта). " "После этого обычный --enrich достанет email/phone со страницы Контакты. " "В --full запускается автоматически перед --enrich." ), ) parser.add_argument( "--find-sites-limit", type=int, default=None, dest="find_sites_limit", metavar="N", help="Ограничить кол-во лидов для поиска сайта (тест: --find-sites-limit 10)", ) parser.add_argument( "--find-sites-source", type=str, default=None, dest="find_sites_source", metavar="SRC", help=( "Только лиды этого источника (hh / yandex_maps). " "Полезно когда нужно дозалить сайты по конкретному источнику." ), ) parser.add_argument( "--hh-limit", type=int, default=None, dest="hh_limit", metavar="N", help=( "Ограничить количество HH-лидов для добора сайта (--hh-enrich-websites). " "Полезно для теста: --hh-limit 20 займёт ~1-2 минуты вместо ~1.5 часа." ), ) parser.add_argument( "--export-master", action="store_true", dest="export_master", help="Сохранить snapshot всей БД в exports/_master/all_leads.csv (перезапись)", ) parser.add_argument( "--export-run", type=int, default=None, dest="export_run", metavar="N", help="Пересоздать CSV конкретного прогона по его ID из sources_log", ) args = parser.parse_args() # --full раскрывается во все шаги. # Source указывается ЯВНО (--source yandex / --source hh / --source all), # default = yandex чтобы случайно не запустить тяжёлый прогон по всем источникам. if args.full: args.source = args.source or "yandex" # HH employer-pages нужны ТОЛЬКО когда парсим HH (у Я.Карт сайт есть в карточке). # Раньше флаг ставился всегда → `--full --source yandex` гонял HH-обогащение по # ВСЕЙ базе HH-лидов без сайта (~час впустую). Теперь — только при source hh/all # (как и обещает README: «+ --hh-enrich-websites если source=hh»). args.hh_enrich_websites = args.hh_enrich_websites or args.source in ("hh", "all") args.find_sites = True # ⬅ ищем сайты для лидов где website пуст (DDG fallback) args.enrich = True # → website-analyzer достаёт email/phone со страниц args.enrich_egrul = True args.enrich_finance = True # → сотрудники + оборот по ООО (DaData) args.rescore = True args.export = True # Нормализация --city: PowerShell на Windows иногда съедает первую non-ASCII # букву ("Москва" → "осква"). Восстанавливаем по словарю известных городов. if args.city: from config import CITIES if args.city not in CITIES: # Ищем известный город который заканчивается на этот суффикс for known in CITIES: if known.lower().endswith(args.city.lower()) and len(known) > len(args.city): logger.warning( f"⚠️ --city '{args.city}' нормализуется к '{known}' " f"(PowerShell съел первую букву?)" ) args.city = known break # Инициализация БД init_db(config.DB_PATH) conn = get_connection(config.DB_PATH) try: # Только статистика — выходим сразу if args.stats: stats = get_stats(conn) print("\n📊 СТАТИСТИКА:") for k, v in stats.items(): print(f" {k}: {v}") return # Cleanup кривых ФИО директоров (одноразовая операция) if args.cleanup_directors: cleared = cleanup_bad_director_names(conn) logger.info(f"🧹 Очищено директоров с должностями вместо ФИО: {cleared}") # Не выходим — даём дальше pipeline отработать (например --enrich-egrul) # Одноразовая миграция категорий: lead.category = sources_log.query if args.fix_categories: fixed = fix_categories_from_runs(conn) logger.info(f"🏷️ Категории переписаны на поисковый запрос у {fixed} лидов.") # Сброс site_checked_at для перепрогона enricher на уже обогащённых лидах if args.rescan_sites: cursor = conn.execute( "UPDATE leads SET site_checked_at = NULL WHERE site_checked_at IS NOT NULL" ) conn.commit() logger.info(f"🔁 Сброшено site_checked_at у {cursor.rowcount} лидов — следующий --enrich пройдёт по ним заново.") # Если не указан ни один из шагов — показать help if not ( args.source or args.enrich or args.enrich_egrul or args.enrich_finance or args.rescore or args.export or args.cleanup_directors or args.fix_categories or args.rescan_sites or args.hh_enrich_websites or args.find_sites or args.export_master or args.export_run is not None ): parser.print_help() sys.exit(1) # Собираем run_id всех прогонов этой сессии — для auto-export CSV по прогонам collected_run_ids: list[int] = [] # ── Pipeline — шаги выполняются последовательно ───────────────── # Шаг 1: Парсинг (если указан --source) if args.source: if args.category: categories = [c.strip() for c in args.category.split(",") if c.strip()] else: categories = config.CATEGORIES if args.source in ("yandex", "all"): collected_run_ids.extend( run_yandex(conn, categories, args.city, args.limit, district=args.district) ) if args.source in ("hh", "all"): # Для HH "категории" — это signal-запросы (оператор / администратор / etc.) # Если пользователь явно указал --category — используем его список. # Иначе — стандартный config.HH_SIGNAL_QUERIES. hh_queries = categories if args.category else config.HH_SIGNAL_QUERIES collected_run_ids.extend( run_hh(conn, hh_queries, args.city, max_pages=args.limit if args.category else None) ) if args.source in ("2gis", "all"): logger.warning("⚠️ Парсер 2GIS пока не реализован (Phase 1.4)") if args.source in ("vk", "all"): logger.warning("⚠️ Парсер VK пока не реализован (Phase 1.5)") # --limit относится только к парсингу (карточек на категорию). # Enrichment всегда обрабатывает ВСЕХ непроверенных, чтобы не оставлять хвосты. # Шаг 1.5: HH employer-page enrichment (дозаполняет website у HH-лидов). # Должен идти ДО Tier 2 (--enrich), иначе у HH-лидов нет сайта и Tier 2 # их пропустит → email не соберутся. if args.hh_enrich_websites: run_hh_websites(conn, limit=args.hh_limit) # Шаг 1.6: Поиск сайтов через DDG (для лидов без website). # Должен идти ДО --enrich по той же причине что и hh_enrich_websites: # сначала найти сайт → потом website-analyzer вытащит email/phone. # # ВАЖНО: ограничиваем поиск только лидами ТЕКУЩЕЙ сессии (collected_run_ids). # Иначе пойдём по всей БД (2500+ лидов = час+ DDG-запросов). # Если --find-sites вызван БЕЗ --source (например, для добивки старых лидов), # тогда run_ids=None и идём по всем (медленно, но осознанно). if args.find_sites: run_find_sites( conn, limit=args.find_sites_limit, run_ids=collected_run_ids if collected_run_ids else None, source_filter=args.find_sites_source, ) # Шаг 2: Enrichment сайтов (если указан --enrich) if args.enrich: run_enrichment(conn, limit=None) # Шаг 3: Enrichment ЕГРЮЛ (если указан --enrich-egrul) if args.enrich_egrul: run_egrul_enrichment(conn, limit=None) # Шаг 3.5: Финансы — сотрудники + оборот по ООО (если указан --enrich-finance) if args.enrich_finance: run_financials(conn, limit=None) # Шаг 4: Rescore (если указан --rescore) if args.rescore: run_rescore(conn) # Финальная статистика stats = get_stats(conn) print("\n" + "═" * 60) print("📊 РЕЗУЛЬТАТ:") for k, v in stats.items(): print(f" {k}: {v}") print("═" * 60) # Шаг 5: Экспорт (если указан --export) # Новое поведение: если в сессии были прогоны парсера → отдельный CSV # каждого прогона в exports/YYYY-MM/ с шапкой-метаданными. # Старое поведение (backward compat): --export без парсинга → плоский CSV # по min_score в exports/YYYY-MM/leads_.csv. if args.export: if collected_run_ids: print(f"\n📤 Экспорт {len(collected_run_ids)} прогонов сессии:") for run_id in collected_run_ids: export_run(config.DB_PATH, run_id) # Сводный файл «всё, что собрано за день» в папку дня export_day(config.DB_PATH) else: export_to_csv(config.DB_PATH, min_score=args.min_score) # Шаг 6: master-snapshot всей БД (--export-master) if args.export_master: export_master(config.DB_PATH) # Шаг 7: пересоздать CSV конкретного прогона (--export-run N) if args.export_run is not None: export_run(config.DB_PATH, args.export_run) finally: conn.close() if __name__ == "__main__": main()