Files
Aks f78f35fb3f init: Parser v1 — Lead Generation Engine
Парсер лидов МБ РФ: Яндекс.Карты + HH.ru + обогащение DaData/ЕГРЮЛ/Rusprofile + Streamlit CRM.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-09 12:56:06 +03:00

263 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Экспорт лидов из SQLite в CSV.
utf-8-sig — чтобы Excel не ломал кириллицу при двойном клике.
Три режима:
- export_run(db_path, run_id) → CSV одного прогона в exports/YYYY-MM/YYYY-MM-DD/ с шапкой-комментарием
- export_master(db_path) → snapshot всей БД в exports/_master/all_leads.csv (перезапись)
- export_to_csv(db_path, ...) → [старый универсальный] плоский CSV по min_score
"""
import re
import sqlite3
from datetime import datetime
from pathlib import Path
import pandas as pd
import config
# Колонки CSV в порядке появления в файле.
# Включает CRM-поля (comments / last_action / last_reaction / last_touched_at)
# и Tier 2/3 enrichment поля.
EXPORT_COLUMNS = [
# Идентичность
"id", "name", "inn", "ogrn", "director_name",
# Контакты
"phone_primary", "email_primary", "phones", "emails",
"website", "vk_url", "telegram_url", "instagram_url", "youtube_url",
# Гео
"address", "city", "region", "district",
# Бизнес
"category", "reviews_count", "reviews_avg",
# Анализ сайта (Tier 2)
"site_alive", "cms_type", "has_live_chat", "has_online_booking",
# ЕГРЮЛ (Tier 3)
"registration_date", "egrul_status",
# Скоринг
"score", "score_breakdown",
# CRM (ручной режим)
"outreach_status", "comments", "last_action", "last_reaction", "last_touched_at",
# Системные
"source", "source_url", "parsed_at",
]
# ───────────────────────────────────────────────────────────────────────
# Helpers
# ───────────────────────────────────────────────────────────────────────
def _safe_part(s: str | None) -> str:
"""Безопасная часть имени файла: убирает запрещённые символы FS,
схлопывает пробелы в _. Кириллицу оставляет.
"""
if not s:
return "unknown"
safe = re.sub(r'[/\\:*?"<>|]+', "", s.strip())
safe = re.sub(r"\s+", "_", safe)
return safe or "unknown"
def _day_dir() -> Path:
"""exports/YYYY-MM/YYYY-MM-DD/ — папка дня ВНУТРИ папки месяца (создаётся при первом обращении)."""
now = datetime.now()
path = Path(config.EXPORT_DIR) / now.strftime("%Y-%m") / now.strftime("%Y-%m-%d")
path.mkdir(parents=True, exist_ok=True)
return path
def _master_dir() -> Path:
"""exports/_master/ — создаётся при первом обращении."""
path = Path(config.EXPORT_DIR) / "_master"
path.mkdir(parents=True, exist_ok=True)
return path
def _write_with_header(path: Path, header_lines: list[str], df: pd.DataFrame) -> None:
"""Записать CSV: сначала шапка с префиксом '# ', потом таблица."""
with open(path, "w", encoding="utf-8-sig", newline="") as f:
for line in header_lines:
f.write(f"# {line}\n")
f.write("#\n") # разделитель между шапкой и таблицей
df.to_csv(f, index=False)
# ───────────────────────────────────────────────────────────────────────
# Режим 1: экспорт одного прогона (новый, основной)
# ───────────────────────────────────────────────────────────────────────
def export_run(db_path: str, run_id: int) -> str | None:
"""Экспортировать лидов одного прогона в exports/YYYY-MM/.
Берёт метаданные прогона из sources_log, лидов — через JOIN с lead_in_run.
Имя файла: leads_<source>_<query>_<city>_<timestamp>.csv
Возвращает путь к файлу, либо None если прогон не найден.
"""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
run = conn.execute(
"SELECT * FROM sources_log WHERE id = ?", (run_id,)
).fetchone()
if not run:
conn.close()
print(f"⚠ Прогон #{run_id} не найден в sources_log")
return None
run = dict(run)
# Лиды прогона через JOIN
cols = ", ".join(f"l.{c}" for c in EXPORT_COLUMNS)
query = f"""
SELECT {cols}, lir.role AS run_role
FROM leads l
INNER JOIN lead_in_run lir ON lir.lead_id = l.id
WHERE lir.run_id = ?
ORDER BY l.score DESC, l.id
"""
df = pd.read_sql_query(query, conn, params=(run_id,))
conn.close()
# Шапка
score_min = int(df["score"].min()) if len(df) else 0
score_max = int(df["score"].max()) if len(df) else 0
started_short = (run.get("started_at") or "")[:10]
n_inserted = int((df["run_role"] == "inserted").sum()) if len(df) else 0
n_merged = int((df["run_role"] == "merged").sum()) if len(df) else 0
header = [
f"Дата: {started_short or ''}",
f"Источник: {run.get('source') or ''}",
f"Запрос: {run.get('query') or ''}",
f"Город: {run.get('city') or ''}",
f"Найдено: {len(df)} лидов ({n_inserted} новых, {n_merged} обновлённых)",
f"Диапазон score: {score_min}{score_max}",
f"Прогон ID: {run_id}, статус: {run.get('status', '')}",
]
# Имя файла
ts = datetime.now().strftime("%Y%m%d_%H%M")
parts = [
"leads",
_safe_part(run.get("source")),
_safe_part(run.get("query")),
_safe_part(run.get("city")),
ts,
]
filename = "_".join(parts) + ".csv"
out_path = _day_dir() / filename
_write_with_header(out_path, header, df)
print(f"📤 Прогон #{run_id}: {len(df)} лидов → {out_path}")
return str(out_path)
# ───────────────────────────────────────────────────────────────────────
# Режим 2: master-snapshot всей БД
# ───────────────────────────────────────────────────────────────────────
def export_master(db_path: str) -> str:
"""Сохранить snapshot всей БД в exports/_master/all_leads.csv (перезапись).
Содержит ВСЕХ лидов на текущий момент со всеми CRM-статусами.
Каждый запуск переписывает файл — нужно для актуальности.
"""
conn = sqlite3.connect(db_path)
cols = ", ".join(EXPORT_COLUMNS)
df = pd.read_sql_query(
f"SELECT {cols} FROM leads ORDER BY score DESC, id",
conn,
)
conn.close()
hot_count = int((df["score"] >= config.HOT_LEAD_THRESHOLD).sum()) if len(df) else 0
header = [
f"Сгенерирован: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"Всего лидов: {len(df)}",
f"Hot (score ≥ {config.HOT_LEAD_THRESHOLD}): {hot_count}",
]
out_path = _master_dir() / "all_leads.csv"
_write_with_header(out_path, header, df)
print(f"📤 Master: {len(df)} лидов → {out_path}")
return str(out_path)
# ───────────────────────────────────────────────────────────────────────
# Сводка дня: всё, что собрано за сегодня — один CSV в папке дня
# ───────────────────────────────────────────────────────────────────────
def export_day(db_path: str) -> str | None:
"""Сводный CSV всех лидов, собранных/обновлённых за СЕГОДНЯ.
«Собрано за день» = лиды из прогонов с started_at = сегодня
(через lead_in_run + sources_log) — включая и новые, и обновлённые (merged).
Пишется в exports/YYYY-MM/YYYY-MM-DD/_сводка_дня_<date>.csv (перезапись).
Возвращает путь, либо None если за сегодня прогонов не было.
"""
today = datetime.now().strftime("%Y-%m-%d")
conn = sqlite3.connect(db_path)
cols = ", ".join(f"l.{c}" for c in EXPORT_COLUMNS)
query = f"""
SELECT DISTINCT {cols}
FROM leads l
JOIN lead_in_run lir ON lir.lead_id = l.id
JOIN sources_log sl ON sl.id = lir.run_id
WHERE substr(sl.started_at, 1, 10) = ?
ORDER BY l.score DESC, l.id
"""
df = pd.read_sql_query(query, conn, params=(today,))
conn.close()
if df.empty:
return None
header = [
f"Сводка за день: {today}",
f"Собрано за день: {len(df)} лидов",
f"Score: {int(df['score'].min())}{int(df['score'].max())}",
]
out_path = _day_dir() / f"_сводка_дня_{today}.csv"
_write_with_header(out_path, header, df)
print(f"📤 Сводка дня: {len(df)} лидов → {out_path}")
return str(out_path)
# ───────────────────────────────────────────────────────────────────────
# Режим 3: старый универсальный экспорт (плоский CSV по min_score) — backward compat
# ───────────────────────────────────────────────────────────────────────
def export_to_csv(
db_path: str = "leads.db",
output_path: str | None = None,
min_score: int = 0,
only_new: bool = False,
) -> str:
"""[Совместимость] Экспорт всех лидов с фильтром score >= min_score.
Используется когда вызвали `--export` без указания run_id.
Для новых сценариев предпочитай export_run() / export_master().
"""
Path(config.EXPORT_DIR).mkdir(parents=True, exist_ok=True)
if not output_path:
ts = datetime.now().strftime("%Y%m%d_%H%M")
output_path = str(_day_dir() / f"leads_{ts}.csv")
where_clauses = ["score >= ?"]
params: list = [min_score]
if only_new:
where_clauses.append("outreach_status IN ('new', 'inbox')")
where = " AND ".join(where_clauses)
cols = ", ".join(EXPORT_COLUMNS)
query = f"""
SELECT {cols}
FROM leads
WHERE {where}
ORDER BY score DESC, parsed_at DESC
"""
conn = sqlite3.connect(db_path)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
df.to_csv(output_path, index=False, encoding="utf-8-sig")
print(f"📤 Экспортировано {len(df)} записей → {output_path}")
return output_path