Files
Aks 98309dcc96 fix: устранены все найденные аудитом баги и тихие падения
- SQL injection паттерн → параметризованные запросы во всех местах
- except: pass/continue → logger.warning() везде, ничего не тонет молча
- WAL mode + индекс domain_dedup_key в database.py
- try/finally для conn в main.py, утечка соединения устранена
- backoff 30с при 403/429 от Rusprofile/ЕГРЮЛ
- ликвидированные компании → egrul_status="liquidated"
- max_candidates в contacts_finder считает только реальных кандидатов
- DB_PATH абсолютный (Path(__file__).parent), HH_PAUSE_BETWEEN_QUERIES в config
- HH_SIGNAL_QUERIES дубль убран из launcher.py → импорт из config
- path traversal защита в egrul_enricher debug_dump_html

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-09 13:19:52 +03:00

356 lines
15 KiB
Python

"""DaData enricher — поиск компании по имени через DaData Suggestions API.
DaData (https://dadata.ru) — российский сервис подсказок. Бесплатный тариф
«Подсказки» — 10 000 запросов/день. Возвращает данные из ЕГРЮЛ:
• ИНН, ОГРН, КПП
• Полное и краткое название (юр.форма + бренд)
• Юр.адрес (нормализованный, с координатами)
• ФИО директора и должность
• ОКВЭД основной + дополнительные
• Дата регистрации
• Статус (действующее / ликвидировано)
Зачем нужен (в дополнение к Rusprofile):
• Rusprofile часто не индексирует бренды («Шоколадница» зарегистрирована
как «ООО ХХХ», без слова «Шоколадница» в названии).
• DaData умеет искать по бренду, синонимам, частичным совпадениям.
• Без капчи, без подсунутых страниц.
НЕ выдаёт телефон, email, сайт — это не часть ЕГРЮЛ. Их собираем
отдельно через Я.Карты / website_analyzer.
Использование:
info = enrich_via_dadata("ВкусВилл", city="Москва")
if info["egrul_status"] == "found":
# info["inn"], info["director_name"], info["address"], ...
"""
import logging
import os
import re
from datetime import datetime
from typing import Optional
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
SUGGEST_URL = "https://suggestions.dadata.ru/suggestions/api/4_1/rs/suggest/party"
FIND_BY_ID_URL = "https://suggestions.dadata.ru/suggestions/api/4_1/rs/findById/party"
def _api_key() -> Optional[str]:
"""Получить API-ключ из переменной окружения DADATA_API_KEY."""
key = os.environ.get("DADATA_API_KEY")
if not key:
# Pытаемся подгрузить из .env при первом вызове
try:
from dotenv import load_dotenv
load_dotenv()
key = os.environ.get("DADATA_API_KEY")
except ImportError:
logger.warning("[dadata] python-dotenv не установлен, .env не загружен")
return key
def _empty_result() -> dict:
return {
"inn": None,
"ogrn": None,
"kpp": None,
"full_name": None,
"short_name": None,
"director_name": None,
"director_post": None,
"address": None,
"registration_date": None,
"okved": None,
"status": None,
"type": None, # LEGAL / INDIVIDUAL
# Финансы (ФНС через DaData, D20)
"employee_count": None, # среднесписочная численность
"revenue": None, # доходы за год (≈ оборот), руб
"expense": None, # расходы за год, руб
"finance_year": None, # год отчётности
"egrul_checked_at": datetime.now().isoformat(timespec="seconds"),
"egrul_status": "error",
}
def _parse_party(data: dict) -> dict:
"""Преобразовать DaData-объект party → наш dict."""
result = _empty_result()
result["inn"] = data.get("inn")
result["ogrn"] = data.get("ogrn")
result["kpp"] = data.get("kpp")
result["type"] = data.get("type") # LEGAL / INDIVIDUAL
name = data.get("name") or {}
result["full_name"] = name.get("full_with_opf") or name.get("full")
result["short_name"] = name.get("short_with_opf") or name.get("short")
mgmt = data.get("management") or {}
director = mgmt.get("name")
if director:
# DaData выдаёт ФИО заглавными буквами ("ИВАНОВ ИВАН ИВАНОВИЧ").
# Приводим к Title Case ("Иванов Иван Иванович").
result["director_name"] = director.title()
result["director_post"] = (mgmt.get("post") or "").title() or None
addr = data.get("address") or {}
result["address"] = addr.get("value") or addr.get("unrestricted_value")
state = data.get("state") or {}
result["status"] = state.get("status") # ACTIVE / LIQUIDATING / LIQUIDATED
reg_ts = state.get("registration_date")
if reg_ts:
# DaData возвращает timestamp в миллисекундах (epoch ms)
try:
dt = datetime.fromtimestamp(int(reg_ts) / 1000)
result["registration_date"] = dt.strftime("%Y-%m-%d")
except (ValueError, TypeError, OSError):
pass
# ОКВЭД основной
result["okved"] = data.get("okved")
# ─── Финансы (ФНС через DaData, D20) ──────────────────────────────────
result["employee_count"] = data.get("employee_count")
fin = data.get("finance") or {}
result["revenue"] = fin.get("income") # доходы за год (≈ оборот), руб
result["expense"] = fin.get("expense") # расходы за год, руб
result["finance_year"] = fin.get("year") # год отчётности (если отдан)
result["egrul_status"] = "found"
return result
# Префиксы которые часто добавляются к HR-имени работодателя но НЕ являются
# частью юр.названия. Убираем их перед запросом в DaData.
HR_PREFIXES = (
"ресторан", "кафе", "бар", "столовая", "пиццерия", "кофейня", "пекарня",
"салон красоты", "салон", "студия красоты", "студия", "парикмахерская",
"барбершоп", "имидж-лаборатория", "клиника", "медицинский центр", "стоматология",
"сеть кофеен", "сеть ресторанов", "сеть магазинов", "сеть",
"группа компаний", "группа", "холдинг", "магазин", "ателье",
"центр массажа", "центр", "академия", "школа", "детский сад",
"автосалон", "автосервис", "автомойка",
"юридическая компания", "юридический центр", "адвокатское бюро",
"консалтинговая компания", "ит-компания", "it компания",
"ип ", # «ИП Иванов Иван» → «Иванов Иван»
)
def _clean_for_dadata(name: str) -> list[str]:
"""Очистить имя HR-работодателя в кандидаты для DaData.
HH часто называет работодателя как «Ресторан The Бык (ИП Межлумова И.Ю.)».
DaData ищет по точному юр.названию, поэтому таких записей не находит.
Возвращает список кандидатов — пробуем по очереди:
1. Оригинал
2. Без префиксов «Ресторан / Кафе / ИП / Сеть / Студия...»
3. Без скобочного суффикса «(ИП ФИО)»
4. Содержимое скобок «(ИП Иванов И.И.)» как отдельный кандидат
"""
if not name:
return []
candidates = [name]
# Извлечь содержимое скобок и сам префикс-до-скобки
m_paren = re.search(r"^(.+?)\s*\(([^)]+)\)\s*$", name)
if m_paren:
head = m_paren.group(1).strip()
inner = m_paren.group(2).strip()
if head and head not in candidates:
candidates.append(head)
if inner and inner not in candidates:
candidates.append(inner)
# Снять HR-префиксы (case-insensitive). Сортируем по длине DESC чтобы
# длинные префиксы ('барбершоп') проверялись раньше коротких ('бар') —
# иначе 'Барбершоп BRITVA' → 'бершоп BRITVA'.
prefixes_sorted = sorted(HR_PREFIXES, key=len, reverse=True)
for base in list(candidates):
lower = base.lower()
for pref in prefixes_sorted:
# Проверяем что префикс — отдельное слово (за ним пробел или конец строки)
if lower.startswith(pref + " ") or lower == pref:
stripped = base[len(pref):].lstrip(" -—:").strip()
if stripped and stripped not in candidates:
candidates.append(stripped)
break
# Заменить подчёркивания на пробел (Meat_Coin → Meat Coin)
for base in list(candidates):
if "_" in base:
alt = base.replace("_", " ").strip()
if alt and alt not in candidates:
candidates.append(alt)
return candidates
def enrich_via_dadata(
name: str,
city: str | None = None,
timeout: float = 10.0,
) -> dict:
"""Поиск компании по имени через DaData Suggestions API.
name: название компании (бренд или юр.название)
city: опционально — для приоритизации московских результатов
Стратегия: пробуем несколько кандидатов очищенного имени (см. _clean_for_dadata):
оригинал → без HR-префикса → без скобочного суффикса → содержимое скобок.
Останавливаемся на первом найденном результате.
"""
result = _empty_result()
if not name:
result["egrul_status"] = "not_found"
return result
api_key = _api_key()
if not api_key:
logger.warning("DADATA_API_KEY не задан в .env — DaData enricher disabled")
return result
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Token {api_key}",
}
candidates = _clean_for_dadata(name)
suggestions = []
for query in candidates:
body: dict = {"query": query, "count": 10}
# Приоритезация по городу
if city:
if "москва" in city.lower():
body["locations_boost"] = [{"kladr_id": "77"}]
elif "санкт-петербург" in city.lower() or "спб" in city.lower():
body["locations_boost"] = [{"kladr_id": "78"}]
try:
r = requests.post(SUGGEST_URL, json=body, headers=headers, timeout=timeout)
except requests.exceptions.RequestException as e:
logger.warning(f" DaData request failed for '{query}': {e}")
continue
if r.status_code == 403:
logger.error("DaData: 403 Forbidden — проверь DADATA_API_KEY")
return result
if r.status_code != 200:
continue
try:
data = r.json()
except ValueError:
continue
suggestions = data.get("suggestions") or []
if suggestions:
if query != name:
logger.debug(f" DaData: '{name!r}' → cleaned '{query}'{len(suggestions)} рез.")
break
if not suggestions:
result["egrul_status"] = "not_found"
return result
# Выбираем лучшего кандидата.
# Приоритет: ACTIVE + указанный город в адресе > просто ACTIVE > любой первый
best = None
if city:
city_low = city.lower()
for s in suggestions:
d = s.get("data") or {}
state = d.get("state") or {}
addr = (d.get("address") or {}).get("value") or ""
if state.get("status") == "ACTIVE" and city_low in addr.lower():
best = d
break
if not best:
for s in suggestions:
d = s.get("data") or {}
state = d.get("state") or {}
if state.get("status") == "ACTIVE":
best = d
break
if not best:
# Все ликвидированы — берём первого, логируем, выставляем статус
logger.warning(f"[dadata] Все совпадения ликвидированы для '{name}'")
best = suggestions[0].get("data") or {}
parsed = _parse_party(best)
parsed["egrul_status"] = "liquidated"
return parsed
parsed = _parse_party(best)
return parsed
def enrich_via_dadata_by_inn(inn: str, timeout: float = 10.0) -> dict:
"""Поиск компании по ИНН через DaData findById API."""
result = _empty_result()
if not inn:
result["egrul_status"] = "not_found"
return result
api_key = _api_key()
if not api_key:
return result
body = {"query": inn}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Token {api_key}",
}
try:
r = requests.post(FIND_BY_ID_URL, json=body, headers=headers, timeout=timeout)
except requests.exceptions.RequestException as e:
logger.warning(f" DaData findById failed for INN {inn}: {e}")
return result
if r.status_code != 200:
return result
try:
data = r.json()
except ValueError:
return result
suggestions = data.get("suggestions") or []
if not suggestions:
result["egrul_status"] = "not_found"
return result
return _parse_party(suggestions[0].get("data") or {})
if __name__ == "__main__":
# Smoke-test
logging.basicConfig(level=logging.INFO, format="%(message)s")
cases = [
("ВкусВилл", "Москва"),
("Шоколадница", "Москва"),
("Гранд Пирог", "Москва"),
("Кулинарная лавка братьев Караваевых", "Москва"),
("ПАО Совкомбанк", "Москва"),
]
for name, city in cases:
info = enrich_via_dadata(name, city=city)
print(f"\n{name} ({city})")
print(f" status: {info['egrul_status']}")
if info["egrul_status"] == "found":
print(f" {info['full_name']}")
print(f" ИНН={info['inn']} ОГРН={info['ogrn']} КПП={info['kpp']}")
print(f" директор: {info['director_name']} ({info['director_post']})")
print(f" адрес: {info['address']}")
print(f" регистрация: {info['registration_date']}, статус: {info['status']}")