init: Parser v1 — Lead Generation Engine

Парсер лидов МБ РФ: Яндекс.Карты + HH.ru + обогащение DaData/ЕГРЮЛ/Rusprofile + Streamlit CRM.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aks
2026-06-09 12:56:06 +03:00
commit f78f35fb3f
33 changed files with 9198 additions and 0 deletions
View File
+201
View File
@@ -0,0 +1,201 @@
"""Blacklist крупных компаний которым outreach бесполезен.
Эти компании НЕ наши клиенты для предложения сайта/автоматизации:
• У них собственные IT-команды и маркетинг
• Они закрытые холдинги / госструктуры
• Сетки которые мы не сможем "пробить" холодным письмом
При парсинге HH такие компании отсекаем сразу (не вставляем в БД).
Существующих в БД помечаем outreach_status='excluded'.
Стратегия проверки:
1. Точное совпадение очищенного имени со списком EXACT_NAMES
2. Подстрока — содержит ли имя одно из KEYWORD_FRAGMENTS
3. Признак крупного юр.лица (ПАО / Госкорпорация / ФГУП / ФГАОУ)
"""
import re
# Точные имена (lowercase, без юр.формы) — известные крупные сети
EXACT_NAMES = {
# Банки и финансы
"газпромбанк", "альфа-банк", "альфа банк", "втб", "сбер", "сбербанк",
"совкомбанк", "мкб", "райффайзен", "тинькофф", "т-банк", "tinkoff",
"россельхозбанк", "дом.рф", "дом рф", "точка", "тинькофф банк",
"отп банк", "псб", "промсвязьбанк", "банк псб", "норвик банк", "морской банк",
"энергогарант", "согаз-мед", "согаз", "ренессанс страхование",
"финансовый дом солид",
# Телеком
"мтс", "билайн", "мегафон", "tele2", "теле2", "ростелеком",
"ростелеком контакт-центр",
# Ритейл / сети
"вкусвилл", "x5", "икс 5", "перекрёсток", "перекресток", "пятерочка",
"пятёрочка", "магнит", "лента", "ашан", "ашан ритейл россия",
"азбука вкуса", "metro", "лемана про", "lamoda", "ozon", "озон",
"дикси", "красное белое", "красное & белое", "красное и белое",
"бристоль", "fix price", "фикс прайс", "светофор", "верный",
"мираторг", "магнолия", "вкусвилл",
"wildberries", "вайлдберриз", "rwb", "сбермаркет", "яндекс еда",
"яндекс.еда", "яндекс крауд", "яндекс крауд: поддержка",
"яндекс крауд: ai-тренеры", "яндекс команда для бизнеса",
"сбер для экспертов", "сбер. it", "сбер тех", "сберпр", "сберправо",
"ситилинк", "ситилинк: магазины", "merlion", "mts", "т-банк", "тинькофф",
"теремок", "кари", "карі",
# IT-холдинги
"yandex", "mail", "mail.ru", "vk", "rambler", "rambler&co", "kaspersky",
# Маркетплейсы и крупные шопы
"wildberries", "bork", "dns", "dns shop", "сеть магазинов цифровой и бытовой техники dns",
"м.видео", "эльдорадо", "rendez-vous",
# Госструктуры
"правительство москвы", "минстрой", "минздрав", "мфц",
"гбу мфц города москвы мои документы", "грчц",
"росатом", "ростех", "газпром", "роснефть", "лукойл", "транснефть",
"роскосмос", "ржд", "ао росгеология",
# Кадровые / HR крупные (не наши клиенты)
"world class", "encore fitness", "xfit", "ddx fitness", "сити фитнес",
"fitness one",
# Известные сети ресторанов / общепита
"шоколадница", "коффемания", "il патио", "иль патио", "ginza project",
"white rabbit family", "kuxnja", "тануки",
# Сети общепита / фастфуд (расширено 2026-06-05)
"му-му", "му му",
"вкусно и точка", "вкусно — и точка", "вкусно -и точка",
"додо пицца", "додо pizza", "dodo pizza",
"крошка картошка", "крошка-картошка",
"якитория", "стардогс", "стардог", "stardogs",
"грабли", "кофе хауз", "coffee house", "правда кофе",
"даблби", "double b", "хлеб насущный", "буханка",
"чайхона №1", "чайхона номер 1", "две палочки",
"планета суши", "росинтер", "ростикс", "rostic's", "rostics",
"бургер кинг", "сабвей", "subway", "крошка-картошка",
"братья караваевы", "кулинарная лавка братьев караваевых",
"прайм стар", "prime star", "кофемания",
# Девелоперы / стройка
"пик", "лср", "ск самолёт", "эталон", "гк эталон", "гк эталон москва",
"стройтрансгаз", "группа самолёт", "самолёт",
# IT-аутсорс / гиганты услуг
"merlion", "softline", "ланит", "крок", "ит-такт", "консист бизнес групп",
"datapro", "datasoft",
# Strategy / consulting big4
"б1", "b1", "kpmg", "deloitte", "ey", "pwc", "ernst & young",
# Кофейни / международные бренды
"starbucks", "burger king", "kfc", "mcdonalds", "макдоналдс",
# Прочие массовые
"лента", "о'кей", "ашан", "global village", "x5 retail group",
"x5 управляющая компания", "оборонстрой", "стройэлектромонтаж",
}
# Подстроки — если в имени встречаются, скорее всего это крупная компания
# или госструктура (не наша целевая аудитория)
KEYWORD_FRAGMENTS = (
# Юр.формы крупных компаний
" пао ", "пао ", " ао ", # ПАО почти всегда крупные публичные компании
"акционерное общество",
"публичное акционерное",
# Госструктуры
"фгуп", "фгаоу", "фгбу", "фгкоу", "гбуз", "гбу ", "гбоу",
"минздрав", "минобр", "минстрой", "мфц", "правительств",
"госкорпорация", "гос. корп", "ао росгеология", "ао росат",
# Сетевые маркеры
"ритейл россия", "торговая сеть", "розничная сеть",
"холдинг", "корпорация", "концерн", "группа компаний",
# Сети ресторанов / общепита (общие маркеры)
" ресторанная группа ", " ресторанная группа", "ресторанная группа ",
"группа ресторанов", "ресторанный холдинг",
# Крупные банки (универсальные)
" банк ", "банк ", "банка ", "банком ",
# Крупные международные
" moscow ", " corp.", " ltd.", " llc",
# Государственные / некоммерческие
"ао аккую нуклеар", " росатом", " ростех",
# Маркеры HR-аутсорса крупных компаний — это сами кадровые агентства, не клиенты
" кадровое агентство", " кадровый центр", "кадровый центр ",
"рекрутмент",
# Билеты, бронирования, логисты-гиганты
"почта россии", "сдек", "boxberry", "dpd", "деловые линии", "транспортная компания",
)
# Префиксы которые мы убираем для нормализации перед проверкой
HR_HEADERS = (
"пао ", "оао ", "ао ", "ооо ", "ип ", "зао ", "нко ", "гк ",
"сеть ", "сети ", "группа ", "группа компаний ",
"ресторан ", "кафе ", "бар ", "магазин ", "клиника ",
"салон красоты ", "салон ", "студия красоты ", "студия ",
"имидж-лаборатория ", "барбершоп ", "пиццерия ", "столовая ",
"автосервис ", "автосалон ",
)
def _normalize(name: str) -> str:
"""Нормализовать имя для сравнения: lowercase, без юр.формы, без скобочного суффикса."""
if not name:
return ""
s = name.lower().strip()
# Убираем скобочный суффикс «(ИП ФИО)» / «(ООО ХХХ)»
s = re.sub(r"\s*\([^)]*\)\s*", " ", s)
# Убираем кавычки
s = re.sub(r"[«»\"'`'.,]", " ", s)
# Убираем юр.формы и HR-префиксы
for pref in sorted(HR_HEADERS, key=len, reverse=True):
if s.startswith(pref):
s = s[len(pref):].strip()
break
return re.sub(r"\s+", " ", s).strip()
def is_blacklisted(name: str) -> tuple[bool, str | None]:
"""Проверить попадает ли компания в blacklist.
Возвращает (True, reason) если в blacklist, или (False, None).
reason — что именно сматчилось (для лога).
"""
if not name:
return False, None
name_lower = name.lower()
normalized = _normalize(name)
# 1. Точное совпадение нормализованного имени
if normalized in EXACT_NAMES:
return True, f"exact: {normalized!r}"
# 1b. Префикс — «вкусвилл даркстор» начинается с «вкусвилл», «альфа-банк
# центральный офис» — с «альфа-банк». Берём самое длинное совпадение.
for known in sorted(EXACT_NAMES, key=len, reverse=True):
if len(known) >= 5 and normalized.startswith(known + " "):
return True, f"prefix: {known!r}"
# 2. Подстрока (KEYWORD_FRAGMENTS) — должна быть в исходном имени lowercase
for frag in KEYWORD_FRAGMENTS:
if frag in name_lower:
return True, f"keyword: {frag!r}"
# 3. Имя начинается с ПАО / Публичное акционерное — точно крупное публичное АО
if (name_lower.startswith("пао ") or name_lower.startswith("публичное акционерное")
or " пао " in name_lower):
return True, "ПАО"
return False, None
if __name__ == "__main__":
tests = [
("Газпромбанк", True),
("ПАО Совкомбанк. Центральный офис.", True),
("Альфа-Банк. Центральный офис", True),
("АШАН Ритейл Россия, Работа в магазине", True),
("Правительство Москвы", True),
("ВкусВилл. Даркстор", True),
("Сбер для экспертов", True),
("ООО Гранд Пирог", False),
("Кафе Пушкинъ", False),
("ИП Иванов Иван", False),
("Студия маникюра BLOOM", False),
("ФГАОУ ВО РНИМУ", True), # фгаоу
("Гос. корп. ГАУЗ ...", True),
]
print("=== is_blacklisted smoke-test ===")
for name, expected in tests:
got, reason = is_blacklisted(name)
mark = "" if got == expected else ""
print(f" {mark} {name[:50]:50}{got} ({reason or ''}) expected={expected}")
+361
View File
@@ -0,0 +1,361 @@
"""Поиск сайта компании по имени/ИНН — для лидов у которых нет website
(например, HH-вакансии без employer-страницы, Я.Карты без сайта).
ЕГРЮЛ юридически НЕ содержит сайта/телефона/email. Их нужно искать в
интернете. Самый надёжный путь без капчи — DuckDuckGo HTML endpoint
(https://html.duckduckgo.com/html/?q=...), он работает без авторизации
и без JS, отдаёт обычный HTML с органическими результатами.
После того как сайт найден — обычный website-analyzer вытащит email/phone
со страницы «Контакты».
Поиск идёт по двум запросам подряд:
1. "{очищенное_название} ИНН {ИНН} сайт" — если ИНН известен
2. "{очищенное_название} официальный сайт"
Очищаем название от юр.формы (ООО / ИП / АО / ...) и от кавычек —
вьюшка качественнее.
Игнорируем агрегаторы (rusprofile, wb, ozon, hh, ya.maps, 2gis и пр.) —
они не являются сайтом самой компании.
"""
import logging
import random
import re
import time
from urllib.parse import quote, unquote, urlparse
import requests
import urllib3
from fake_useragent import UserAgent
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
_ua = UserAgent()
# DuckDuckGo HTML endpoint — без JS / капчи, отдаёт сразу результаты
SEARCH_URL = "https://html.duckduckgo.com/html/?q={query}"
# Домены которые ТОЧНО не являются сайтом самой компании.
# Расширенный список — после теста 2026-05-19 куда пролезли rbc.ru, saby.ru,
# find-org.net, focus.kontur.ru и пр. агрегаторы.
BLOCKED_DOMAINS = {
# Российские агрегаторы ЕГРЮЛ / каталоги / отчётность
"rusprofile.ru", "zachestnyibiznes.ru", "checko.ru", "list-org.com",
"spark-interfax.ru", "kartoteka.ru", "vbankcenter.ru", "audit-it.ru",
"find-org.net", "find-org.com", "k-agent.ru", "kontragent.vbr.ru",
"kontur.ru", "focus.kontur.ru", "kontur-extern.ru",
"saby.ru", "sbis.ru", "tensor.ru",
"check.tochka.com", "tochka.com",
"b2b.house", "b2bhouse.ru",
"reputation.ru", "rb.ru", "rbc.ru", "companies.rbc.ru",
"kartaslov.ru", "sezinnopolis.ru",
"vyborg.spravker.ru", "spravker.ru",
"datanewton.ru", "vsledu.ru", "1cnalog.ru",
"spark.interfax.ru", "myseldon.com", "seldon.basis.ru",
"casebook.ru", "ru-arbitr.ru", "kad.arbitr.ru",
# Тематические агрегаторы (медицина / автосервис и т.п.)
"doctu.ru", "prodoctorov.ru", "docdoc.ru", "napopravku.ru",
"doc.ru", "krasotaimedicina.ru", "stomatologclub.ru",
"fitauto.ru", "drom.ru", "auto.ru", "carmoney.ru",
"apteka.ru", "asna.ru", "rigla.ru",
"xfirm.ru", "1cinfo.ru", "1c.ru", "1cms.ru",
"finjobs.ru", "nerab.ru",
# Маркетплейсы
"wildberries.ru", "wb.ru", "ozon.ru", "market.yandex.ru", "sbermegamarket.ru",
"kazanexpress.ru", "lamoda.ru", "kupivip.ru", "aliexpress.ru",
"store.steampowered.com", # явно мусор
# Работа / объявления
"hh.ru", "rabota.ru", "superjob.ru", "trudvsem.ru", "avito.ru",
"youla.ru",
# Карты / поисковики / соцсети
"yandex.ru", "yandex.com", "ya.ru", "google.com", "google.ru",
"duckduckgo.com", "bing.com", "mail.ru",
"vk.com", "vk.ru", "vkontakte.ru", "ok.ru", "instagram.com",
"facebook.com", "twitter.com", "x.com", "youtube.com",
"t.me", "telegram.org", "telegram.me",
"wikipedia.org", "habr.com", "pikabu.ru", "dzen.ru",
"ru.wikipedia.org", "en.wikipedia.org",
# 2ГИС / справочники
"2gis.ru", "2gis.com", "spravochnik.org", "spravka.ru",
# ФНС / гос
"nalog.ru", "egrul.nalog.ru", "gosuslugi.ru", "service.nalog.ru",
# Видео / прочее
"rutube.ru",
# Глобальные новости (не сайт компании)
"globalcosmeticsnews.com", "naturestudio.com",
# Китайские / международные маркетплейсы
"alibaba.com", "aliexpress.com", "made-in-china.com", "1688.com",
"ebay.com", "amazon.com", "amazon.de", "etsy.com",
"shopify.com", "redmart.com",
# Сервисы проверки деклараций / сертификации (мимо от поиска по ИНН)
"dip-world.com", "декларации-соответствия.рус",
"che-cko.ru", # ещё один rusprofile-клон
"xn----8sbnaarbafefe1bc6dh3a4bf.xn--rus", # punycode декларации-соответствия.рус
"rosakkredit.ru", "fsa.gov.ru", "rst.gov.ru",
"sertifikatonline.ru", "novotest.ru", "sertifikatik.ru",
"decl-tr.com", "tr-cu.com", "tr-cu.ru",
# App stores / LinkedIn / прочие глобальные сервисы
"apps.apple.com", "play.google.com", "linkedin.com",
# Похожие на rusprofile (но другие — тоже агрегаторы)
"rusprofiles.com", "rusprofiles.ru", "egrul.itsoft.ru", "itsoft.ru",
"egrul-info.ru", "egrul.online", "egrul.io",
# Контрагент-проверки
"vbankcenter.ru", "rosfirm.info", "rosfirm.ru",
"centrinform.ru", "buhonline.ru",
# Радио / СМИ
"echofm.online", "radio1.ru", "kommersant.ru", "rbc.ru",
# Блог-платформы (не сайт компании)
"blogspot.com", "wordpress.com", "wix.com", "tilda.ws",
"tilda.cc", "medium.com", "livejournal.com",
}
def _clean_company_name(name: str) -> str:
"""Убрать ООО / ИП / АО / кавычки — оставить чистое имя бренда."""
if not name:
return ""
cleaned = re.sub(
r"^(?:Общество\s+с\s+ограниченной\s+ответственностью|"
r"Индивидуальный(?:\s+П|\s+п)редприниматель|"
r"Акционерное\s+общество|"
r"Публичное\s+акционерное\s+общество|"
r"ООО|ИП|АО|ПАО|ЗАО|НКО|ОАО)\b\s*",
"", name, flags=re.IGNORECASE,
)
# Убираем кавычки и точки в начале/конце
cleaned = cleaned.strip(" «»\"'.,").strip()
return cleaned
def _is_company_site(url: str) -> bool:
"""True если URL похож на сайт самой компании (не агрегатор и не соцсеть)."""
try:
host = (urlparse(url).hostname or "").lower()
except Exception:
return False
if not host:
return False
# Убираем www. префикс
host = host[4:] if host.startswith("www.") else host
for blocked in BLOCKED_DOMAINS:
if host == blocked or host.endswith("." + blocked):
return False
return True
def _parse_ddg_results(html: str) -> list[str]:
"""Извлечь органические URL из DuckDuckGo HTML результата.
DDG: <a class="result__a" href="//duckduckgo.com/l/?uddg=URL_ENCODED">
Декодируем uddg= и возвращаем реальный URL.
"""
urls: list[str] = []
# Главный паттерн (с uddg-обёрткой)
for m in re.finditer(
r'<a[^>]+class="result__a"[^>]+href="([^"]+)"', html,
):
href = m.group(1)
m2 = re.search(r"uddg=([^&]+)", href)
if m2:
try:
real = unquote(m2.group(1))
except Exception:
continue
else:
real = href
if real.startswith("http"):
urls.append(real)
# Запасной паттерн — без класса (на случай если DDG поменял разметку)
if not urls:
for m in re.finditer(r'<a[^>]+href="(https?://[^"]+)"[^>]*>', html):
urls.append(m.group(1))
return urls
def _verify_site_belongs_to_company(
url: str,
name_cleaned: str,
inn: str | None,
timeout: float = 6.0,
) -> bool:
"""Проверить что найденный сайт реально принадлежит этой компании.
ЖЁСТКИЕ ПРАВИЛА (после фейлов с kamaz≠АМАЗ, fscosmetics≠Cosmetics):
1. Если у лида есть ИНН — он ОБЯЗАН быть на сайте дословно. Иначе
отказ. Это сильнейший сигнал, нет ИНН на сайте = не их сайт.
2. Если ИНН нет — проверяем только полное совпадение названия бренда
по WORD-BOUNDARIES (не как подстрока). 'АМАЗ' внутри 'kamaz' не
считается. Иначе отказ.
Возвращает True только при уверенном совпадении.
"""
try:
r = requests.get(
url, headers={"User-Agent": _ua.random}, timeout=timeout, verify=False,
allow_redirects=True,
)
except Exception:
return False
if r.status_code != 200:
return False
html = r.text[:200_000]
# Сильнейший сигнал — ИНН в HTML страницы. Если есть — сразу True.
if inn and inn in html:
return True
# ИНН на главной странице есть не всегда (часто только в /контакты).
# Поэтому отсутствие ИНН — НЕ приговор. Дальше проверяем имя бренда.
if not name_cleaned or len(name_cleaned) < 4:
return False
# Список слишком общих слов — без ИНН они не помогают
GENERIC = {
"home", "shop", "store", "market", "company", "group",
"trade", "service", "services", "center", "centre",
"cosmetics", "beauty", "office", "studio", "moscow",
"russia", "online", "global", "international",
"магазин", "товары", "сервис", "центр", "офис",
"доставка", "красота", "офис-менеджер",
}
# Слова бренда (≥4 символа, не из generic-списка)
words = [
w for w in re.split(r"[\s«»\"'\-,.&]+", name_cleaned)
if len(w) >= 4 and w.lower() not in GENERIC
]
if not words:
# Все слова имени — слишком общие. Без ИНН — пропускаем.
return False
html_lower = html.lower()
# Ищем по word boundaries — слово в окружении не-буквенно-цифровых символов
for w in words:
# \b в Python re работает на ASCII по умолчанию. Для кириллицы
# эмулируем через look-around: до и после слова должен быть НЕ-буква.
pattern = (
r"(?:^|[^a-zA-Zа-яА-ЯёЁ0-9])"
+ re.escape(w.lower())
+ r"(?:$|[^a-zA-Zа-яА-ЯёЁ0-9])"
)
if re.search(pattern, html_lower):
return True
return False
def find_company_website(
name: str,
inn: str | None = None,
timeout: float = 8.0,
verify: bool = True,
) -> str | None:
"""Поиск сайта компании в DuckDuckGo + верификация по содержимому.
Этапы:
1. DDG-запросы от специфичного к общему (с ИНН, потом без)
2. Для каждого кандидата (≤8 за все запросы) — фильтр по blocklist'у
3. Скачиваем главную страницу → проверяем что на ней упомянут ИНН
ИЛИ имя бренда. Без этого опасно: для "ИП Иванов" DDG отдаст
Forbes или Омбудсмен — будут чужие email/phone.
Возвращает только верифицированный URL (или None).
Если имя слишком короткое/общее (1-2 слова без ИНН) — возвращает None
сразу, чтобы не сосать чужие данные.
"""
cleaned = _clean_company_name(name)
if not cleaned:
return None
# Защита от ложных совпадений: для "ИП Иванов Иван" без ИНН — отказ.
# Слишком общее ФИО + отсутствие ИНН = гарантированно подсунут чужой сайт.
if not inn:
words_in_name = [w for w in re.split(r"[\s«»\"'-]+", cleaned) if w]
# Эвристика: если все слова это похожи на ФИО (3 слова с большой буквы)
# → отказ. Реальный бренд бы имел уникальное название.
if 2 <= len(words_in_name) <= 4 and all(
w[:1].isupper() and w[1:].islower() for w in words_in_name if len(w) > 2
):
logger.debug(f" Skip (ФИО без ИНН): {name}")
return None
# Список запросов от самого специфичного к общему
queries = []
if inn:
queries.append(f'"{cleaned}" ИНН {inn} сайт')
queries.append(f"{cleaned} {inn}")
queries.append(f"{cleaned} официальный сайт")
if not inn:
# Без ИНН только специфичный запрос — без fallback на общий cleaned
pass
else:
queries.append(cleaned)
headers_base = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "ru-RU,ru;q=0.9,en;q=0.8",
}
seen: set[str] = set()
max_candidates = 6 # проверим не больше 6 разных сайтов
for query in queries:
try:
url = SEARCH_URL.format(query=quote(query))
headers = {**headers_base, "User-Agent": _ua.random}
r = requests.get(url, headers=headers, timeout=timeout, verify=False)
except Exception as e:
logger.debug(f" DDG fetch failed for {query!r}: {e}")
continue
if r.status_code != 200:
continue
for found_url in _parse_ddg_results(r.text):
host = (urlparse(found_url).hostname or "").lower()
if host in seen:
continue
seen.add(host)
if not _is_company_site(found_url):
continue
if len(seen) > max_candidates:
break
root = f"{urlparse(found_url).scheme}://{host}"
if not verify:
return root
# Верифицируем что сайт реально про эту компанию
if _verify_site_belongs_to_company(root, cleaned, inn):
logger.debug(f" ✓ Верифицирован: {root} (по {query!r})")
return root
else:
logger.debug(f" ✗ Не верифицирован: {root}")
# Пауза между запросами — вежливость к DDG
time.sleep(random.uniform(0.6, 1.2))
return None
if __name__ == "__main__":
# Smoke-test — передай "имя_компании ИНН" в аргументах
import sys
logging.basicConfig(level=logging.INFO, format="%(message)s")
cases = [
("Кафе Пушкинъ", None),
("ООО Тануки", None),
]
if len(sys.argv) > 1:
cases = [(sys.argv[1], sys.argv[2] if len(sys.argv) > 2 else None)]
for name, inn in cases:
print(f"\n{name[:50]} (ИНН {inn})")
result = find_company_website(name, inn=inn)
print(f" site: {result}")
+351
View File
@@ -0,0 +1,351 @@
"""DaData enricher — поиск компании по имени через DaData Suggestions API.
DaData (https://dadata.ru) — российский сервис подсказок. Бесплатный тариф
«Подсказки» — 10 000 запросов/день. Возвращает данные из ЕГРЮЛ:
• ИНН, ОГРН, КПП
• Полное и краткое название (юр.форма + бренд)
• Юр.адрес (нормализованный, с координатами)
• ФИО директора и должность
• ОКВЭД основной + дополнительные
• Дата регистрации
• Статус (действующее / ликвидировано)
Зачем нужен (в дополнение к Rusprofile):
• Rusprofile часто не индексирует бренды («Шоколадница» зарегистрирована
как «ООО ХХХ», без слова «Шоколадница» в названии).
• DaData умеет искать по бренду, синонимам, частичным совпадениям.
• Без капчи, без подсунутых страниц.
НЕ выдаёт телефон, email, сайт — это не часть ЕГРЮЛ. Их собираем
отдельно через Я.Карты / website_analyzer.
Использование:
info = enrich_via_dadata("ВкусВилл", city="Москва")
if info["egrul_status"] == "found":
# info["inn"], info["director_name"], info["address"], ...
"""
import logging
import os
import re
from datetime import datetime
from typing import Optional
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
SUGGEST_URL = "https://suggestions.dadata.ru/suggestions/api/4_1/rs/suggest/party"
FIND_BY_ID_URL = "https://suggestions.dadata.ru/suggestions/api/4_1/rs/findById/party"
def _api_key() -> Optional[str]:
"""Получить API-ключ из переменной окружения DADATA_API_KEY."""
key = os.environ.get("DADATA_API_KEY")
if not key:
# Pытаемся подгрузить из .env при первом вызове
try:
from dotenv import load_dotenv
load_dotenv()
key = os.environ.get("DADATA_API_KEY")
except ImportError:
pass
return key
def _empty_result() -> dict:
return {
"inn": None,
"ogrn": None,
"kpp": None,
"full_name": None,
"short_name": None,
"director_name": None,
"director_post": None,
"address": None,
"registration_date": None,
"okved": None,
"status": None,
"type": None, # LEGAL / INDIVIDUAL
# Финансы (ФНС через DaData, D20)
"employee_count": None, # среднесписочная численность
"revenue": None, # доходы за год (≈ оборот), руб
"expense": None, # расходы за год, руб
"finance_year": None, # год отчётности
"egrul_checked_at": datetime.now().isoformat(timespec="seconds"),
"egrul_status": "error",
}
def _parse_party(data: dict) -> dict:
"""Преобразовать DaData-объект party → наш dict."""
result = _empty_result()
result["inn"] = data.get("inn")
result["ogrn"] = data.get("ogrn")
result["kpp"] = data.get("kpp")
result["type"] = data.get("type") # LEGAL / INDIVIDUAL
name = data.get("name") or {}
result["full_name"] = name.get("full_with_opf") or name.get("full")
result["short_name"] = name.get("short_with_opf") or name.get("short")
mgmt = data.get("management") or {}
director = mgmt.get("name")
if director:
# DaData выдаёт ФИО заглавными буквами ("ИВАНОВ ИВАН ИВАНОВИЧ").
# Приводим к Title Case ("Иванов Иван Иванович").
result["director_name"] = director.title()
result["director_post"] = (mgmt.get("post") or "").title() or None
addr = data.get("address") or {}
result["address"] = addr.get("value") or addr.get("unrestricted_value")
state = data.get("state") or {}
result["status"] = state.get("status") # ACTIVE / LIQUIDATING / LIQUIDATED
reg_ts = state.get("registration_date")
if reg_ts:
# DaData возвращает timestamp в миллисекундах (epoch ms)
try:
dt = datetime.fromtimestamp(int(reg_ts) / 1000)
result["registration_date"] = dt.strftime("%Y-%m-%d")
except (ValueError, TypeError, OSError):
pass
# ОКВЭД основной
result["okved"] = data.get("okved")
# ─── Финансы (ФНС через DaData, D20) ──────────────────────────────────
result["employee_count"] = data.get("employee_count")
fin = data.get("finance") or {}
result["revenue"] = fin.get("income") # доходы за год (≈ оборот), руб
result["expense"] = fin.get("expense") # расходы за год, руб
result["finance_year"] = fin.get("year") # год отчётности (если отдан)
result["egrul_status"] = "found"
return result
# Префиксы которые часто добавляются к HR-имени работодателя но НЕ являются
# частью юр.названия. Убираем их перед запросом в DaData.
HR_PREFIXES = (
"ресторан", "кафе", "бар", "столовая", "пиццерия", "кофейня", "пекарня",
"салон красоты", "салон", "студия красоты", "студия", "парикмахерская",
"барбершоп", "имидж-лаборатория", "клиника", "медицинский центр", "стоматология",
"сеть кофеен", "сеть ресторанов", "сеть магазинов", "сеть",
"группа компаний", "группа", "холдинг", "магазин", "ателье",
"центр массажа", "центр", "академия", "школа", "детский сад",
"автосалон", "автосервис", "автомойка",
"юридическая компания", "юридический центр", "адвокатское бюро",
"консалтинговая компания", "ит-компания", "it компания",
"ип ", # «ИП Иванов Иван» → «Иванов Иван»
)
def _clean_for_dadata(name: str) -> list[str]:
"""Очистить имя HR-работодателя в кандидаты для DaData.
HH часто называет работодателя как «Ресторан The Бык (ИП Межлумова И.Ю.)».
DaData ищет по точному юр.названию, поэтому таких записей не находит.
Возвращает список кандидатов — пробуем по очереди:
1. Оригинал
2. Без префиксов «Ресторан / Кафе / ИП / Сеть / Студия...»
3. Без скобочного суффикса «(ИП ФИО)»
4. Содержимое скобок «(ИП Иванов И.И.)» как отдельный кандидат
"""
if not name:
return []
candidates = [name]
# Извлечь содержимое скобок и сам префикс-до-скобки
m_paren = re.search(r"^(.+?)\s*\(([^)]+)\)\s*$", name)
if m_paren:
head = m_paren.group(1).strip()
inner = m_paren.group(2).strip()
if head and head not in candidates:
candidates.append(head)
if inner and inner not in candidates:
candidates.append(inner)
# Снять HR-префиксы (case-insensitive). Сортируем по длине DESC чтобы
# длинные префиксы ('барбершоп') проверялись раньше коротких ('бар') —
# иначе 'Барбершоп BRITVA' → 'бершоп BRITVA'.
prefixes_sorted = sorted(HR_PREFIXES, key=len, reverse=True)
for base in list(candidates):
lower = base.lower()
for pref in prefixes_sorted:
# Проверяем что префикс — отдельное слово (за ним пробел или конец строки)
if lower.startswith(pref + " ") or lower == pref:
stripped = base[len(pref):].lstrip(" -—:").strip()
if stripped and stripped not in candidates:
candidates.append(stripped)
break
# Заменить подчёркивания на пробел (Meat_Coin → Meat Coin)
for base in list(candidates):
if "_" in base:
alt = base.replace("_", " ").strip()
if alt and alt not in candidates:
candidates.append(alt)
return candidates
def enrich_via_dadata(
name: str,
city: str | None = None,
timeout: float = 10.0,
) -> dict:
"""Поиск компании по имени через DaData Suggestions API.
name: название компании (бренд или юр.название)
city: опционально — для приоритизации московских результатов
Стратегия: пробуем несколько кандидатов очищенного имени (см. _clean_for_dadata):
оригинал → без HR-префикса → без скобочного суффикса → содержимое скобок.
Останавливаемся на первом найденном результате.
"""
result = _empty_result()
if not name:
result["egrul_status"] = "not_found"
return result
api_key = _api_key()
if not api_key:
logger.warning("DADATA_API_KEY не задан в .env — DaData enricher disabled")
return result
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Token {api_key}",
}
candidates = _clean_for_dadata(name)
suggestions = []
for query in candidates:
body: dict = {"query": query, "count": 10}
# Приоритезация по городу
if city:
if "москва" in city.lower():
body["locations_boost"] = [{"kladr_id": "77"}]
elif "санкт-петербург" in city.lower() or "спб" in city.lower():
body["locations_boost"] = [{"kladr_id": "78"}]
try:
r = requests.post(SUGGEST_URL, json=body, headers=headers, timeout=timeout)
except requests.exceptions.RequestException as e:
logger.warning(f" DaData request failed for '{query}': {e}")
continue
if r.status_code == 403:
logger.error("DaData: 403 Forbidden — проверь DADATA_API_KEY")
return result
if r.status_code != 200:
continue
try:
data = r.json()
except ValueError:
continue
suggestions = data.get("suggestions") or []
if suggestions:
if query != name:
logger.debug(f" DaData: '{name!r}' → cleaned '{query}'{len(suggestions)} рез.")
break
if not suggestions:
result["egrul_status"] = "not_found"
return result
# Выбираем лучшего кандидата.
# Приоритет: ACTIVE + указанный город в адресе > просто ACTIVE > любой первый
best = None
if city:
city_low = city.lower()
for s in suggestions:
d = s.get("data") or {}
state = d.get("state") or {}
addr = (d.get("address") or {}).get("value") or ""
if state.get("status") == "ACTIVE" and city_low in addr.lower():
best = d
break
if not best:
for s in suggestions:
d = s.get("data") or {}
state = d.get("state") or {}
if state.get("status") == "ACTIVE":
best = d
break
if not best:
# Все ликвидированы — берём первого, не записываем в БД
best = suggestions[0].get("data") or {}
parsed = _parse_party(best)
return parsed
def enrich_via_dadata_by_inn(inn: str, timeout: float = 10.0) -> dict:
"""Поиск компании по ИНН через DaData findById API."""
result = _empty_result()
if not inn:
result["egrul_status"] = "not_found"
return result
api_key = _api_key()
if not api_key:
return result
body = {"query": inn}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Token {api_key}",
}
try:
r = requests.post(FIND_BY_ID_URL, json=body, headers=headers, timeout=timeout)
except requests.exceptions.RequestException as e:
logger.warning(f" DaData findById failed for INN {inn}: {e}")
return result
if r.status_code != 200:
return result
try:
data = r.json()
except ValueError:
return result
suggestions = data.get("suggestions") or []
if not suggestions:
result["egrul_status"] = "not_found"
return result
return _parse_party(suggestions[0].get("data") or {})
if __name__ == "__main__":
# Smoke-test
logging.basicConfig(level=logging.INFO, format="%(message)s")
cases = [
("ВкусВилл", "Москва"),
("Шоколадница", "Москва"),
("Гранд Пирог", "Москва"),
("Кулинарная лавка братьев Караваевых", "Москва"),
("ПАО Совкомбанк", "Москва"),
]
for name, city in cases:
info = enrich_via_dadata(name, city=city)
print(f"\n{name} ({city})")
print(f" status: {info['egrul_status']}")
if info["egrul_status"] == "found":
print(f" {info['full_name']}")
print(f" ИНН={info['inn']} ОГРН={info['ogrn']} КПП={info['kpp']}")
print(f" директор: {info['director_name']} ({info['director_post']})")
print(f" адрес: {info['address']}")
print(f" регистрация: {info['registration_date']}, статус: {info['status']}")
+699
View File
@@ -0,0 +1,699 @@
"""ЕГРЮЛ-обогащение лидов.
Источник: Rusprofile.ru — публичная база, без авторизации, бесплатно.
Парсим страницу поиска → переходим на детальную → достаём ИНН, ОГРН,
директора, дату регистрации, юр.адрес.
Стратегия запросов:
- Случайный User-Agent
- timeout 10 сек
- При 403/429 — пропускаем (логируем как 'error')
- Пауза между лидами в run_egrul_enrichment, не здесь
Ограничения:
- Поиск идёт по названию → могут быть промахи (в Rusprofile компания
может быть зарегистрирована под другим юр.названием).
- Для повышения точности можно фильтровать по городу через city
параметр (опционально).
"""
import logging
import re
from datetime import datetime
from typing import Optional
from urllib.parse import quote
import requests
import urllib3
from fake_useragent import UserAgent
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
_ua = UserAgent()
BASE_URL = "https://www.rusprofile.ru"
SEARCH_URL = BASE_URL + "/search?query={query}&type=ul"
# Регекспы для парсинга детальной страницы Rusprofile
PATTERNS = {
# ИНН — 10 цифр (юр. лицо) или 12 цифр (ИП).
# Стратегия: ищем рядом со словом "ИНН" в окне до 200 символов.
"inn": [
# Прямое совпадение с itemprop / meta
r'itemprop="taxID?"[^>]*>(\d{10,12})',
r'itemprop="taxID?"[^>]*content="(\d{10,12})"',
# В meta og:title или title часто есть полное название с ИНН
r'<meta[^>]+property="og:title"[^>]+content="[^"]*ИНН\s*(\d{10,12})',
r'<title>[^<]*ИНН\s*(\d{10,12})',
# В meta description
r'<meta[^>]+name="description"[^>]+content="[^"]*ИНН\s*(\d{10,12})',
# Generic после слова ИНН в окне 200 символов (но не КПП!)
r'\bИНН\b(?![^>]*КПП)[\s\S]{1,200}?>(\d{10,12})<',
r'\bИНН\b[\s\S]{1,80}?(\d{10,12})',
],
"ogrn": [
r'itemprop="vatID"[^>]*>(\d{13,15})',
r'<meta[^>]+content="[^"]*ОГРН\s*(\d{13,15})',
r'\bОГРН\b[\s\S]{1,200}?>(\d{13,15})<',
r'\bОГРН\b[\s\S]{1,80}?(\d{13,15})',
],
"director_name": [
# Структурированные данные (legacy — Rusprofile убрал к 2026)
r'itemprop="ceoName"[^>]*>([^<]{5,100})',
r'itemprop="employee"[\s\S]{1,200}?itemprop="name"[^>]*>([^<]{5,100})',
# 2026: Rusprofile показывает директора в AI-генерируемом описании:
# "Генеральным директором ... является ... — Наталия Юрьевна Нестерова"
# либо "Руководителем ... является ... Иванов Иван Иванович"
r'(?:Генеральным\s+директором|Руководителем)[\s\S]{1,200}?[—–-]\s*([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+)?)\b',
r'(?:Генеральный\s+директор|Руководитель)[\s:]*[—–-]?\s*([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+)?)\b',
# Текстовый поиск через >...< (legacy с structured разметкой)
r'(?:Генеральный директор|Директор|Руководитель)[\s\S]{1,400}?>([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+)?)<',
],
"registration_date": [
# ISO datetime атрибут (предпочтительно)
r'itemprop="foundingDate"[^>]*content="(\d{4}-\d{2}-\d{2})',
r'itemprop="foundingDate"[^>]*>(\d{4}-\d{2}-\d{2})',
r'datetime="(\d{4}-\d{2}-\d{2})"[^>]*itemprop="foundingDate"',
# Текстовый формат "12 марта 2018"
r'(?:Дата регистрации|Зарегистрирован[аои]?)[\s\S]{1,200}?(\d{1,2}\s+(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})',
# Формат "12.03.2018"
r'(?:Дата регистрации|Зарегистрирован[аои]?)[\s\S]{1,200}?(\d{1,2}\.\d{1,2}\.\d{4})',
],
"address_legal": [
# Структурированный адрес
r'itemprop="address"[\s\S]{1,500}?itemprop="streetAddress"[^>]*>([^<]{10,300})',
r'itemprop="address"[^>]*>([^<]{10,300})',
# Текстовый — после "Юридический адрес"
r'(?:Юридический адрес|Адрес юр\.?|Адрес\s+регистрации)[\s\S]{1,300}?>([А-ЯЁ][^<]{15,300})<',
],
"website": [
# Структурированный URL компании
r'itemprop="url"[^>]+href="(https?://[^"]{4,200})"',
r'itemprop="url"[^>]*>(https?://[^<]{4,200})',
# Текстовый — "Сайт компании: URL"
r'(?:Сайт\s+компании|Веб-сайт|Сайт)[\s:]*[^<]*?<a[^>]+href="(https?://[^"]{4,200})"',
# data-атрибут
r'data-website="(https?://[^"]{4,200})"',
],
"phone": [
# Структурированный телефон
r'itemprop="telephone"[^>]*>([^<]{5,30})',
r'itemprop="telephone"[^>]+content="([^"]{5,30})"',
# tel: ссылки
r'href="tel:(\+?\d[\d\-\s\(\)]{7,20})"',
],
}
# Месяцы для парсинга русских дат
RU_MONTHS = {
"января": 1, "февраля": 2, "марта": 3, "апреля": 4,
"мая": 5, "июня": 6, "июля": 7, "августа": 8,
"сентября": 9, "октября": 10, "ноября": 11, "декабря": 12,
}
def _headers() -> dict:
return {
"User-Agent": _ua.random,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "ru-RU,ru;q=0.9",
}
def _find_first(patterns: list[str], html: str) -> Optional[str]:
"""Перебор regex-паттернов, возврат первого совпадения."""
for p in patterns:
m = re.search(p, html, re.IGNORECASE | re.DOTALL)
if m:
return m.group(1).strip()
return None
def _parse_ru_date(raw: str) -> Optional[str]:
"""'12 марта 2018' / '12.03.2018''2018-03-12' (ISO)."""
if not raw:
return None
raw = raw.strip()
# Формат "12.03.2018"
m = re.match(r"(\d{1,2})\.(\d{1,2})\.(\d{4})", raw)
if m:
d, mo, y = m.groups()
return f"{y}-{int(mo):02d}-{int(d):02d}"
# Формат "12 марта 2018"
m = re.match(r"(\d{1,2})\s+(\w+)\s+(\d{4})", raw)
if m:
d, mon_ru, y = m.groups()
mo = RU_MONTHS.get(mon_ru.lower())
if mo:
return f"{y}-{mo:02d}-{int(d):02d}"
return None
def _strip_html(text: str) -> str:
"""Удалить HTML-теги и лишние пробелы из строки."""
if not text:
return text
text = re.sub(r"<[^>]+>", "", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
# Должности и не-имена — отсеиваем чтобы не попадали в director_name
NON_NAME_TOKENS = {
# Должности
"генеральный", "директор", "руководитель", "управляющий",
"председатель", "совет", "совета", "учредитель",
"конкурсный", "временный", "ликвидатор", "управление",
"производство", "штукатурных", "работ", "услуги",
# Юр.формы
"общество", "ограниченной", "ответственностью",
"акционерное", "акционерного", "публичное", "непубличное",
"товарищество", "кооператив", "товарищества",
# Слова которые ЯВНО входят в названия компаний, но не в ФИО
"инвест", "групп", "групп.", "групп,", "холдинг",
"трейд", "трэйд", "системс", "технологии", "сервис",
"хаус", "плюс", "плаза", "лтд", "медиа", "капитал",
"финанс", "финансы", "консалт", "консалтинг",
"проджект", "проджектс", "девелопмент", "пром",
# Англо-буквы тоже встречаются в названиях
"ай", "ти", "би", "ви", "энд",
}
def _validate_director_name(
raw: str | None,
company_name: str | None = None,
) -> str | None:
"""Проверить что это похоже на ФИО (Фамилия Имя [Отчество]).
Отсекает:
• Должности и обрывки ('Генеральный директор', 'Конкурсный управляющий')
• Слова входящие в название компании (защита от «Ти Инвест» как
director_name когда компания — «ООО Инвест Ай Ти»). Если хотя бы
2 слова из кандидата встречаются в company_name — это название.
"""
if not raw:
return None
cleaned = _strip_html(raw)
if not cleaned:
return None
words = cleaned.split()
if not (2 <= len(words) <= 4):
return None
# Все слова должны начинаться с прописной русской буквы и быть длиннее 2 символов
for w in words:
if len(w) < 2:
return None
if not re.match(r"^[А-ЯЁ][а-яё-]+$", w):
return None
if w.lower() in NON_NAME_TOKENS:
return None
# Защита: если слова из «директора» входят в название компании →
# это не ФИО, а кусок названия (кейс: ООО «Инвест Ай Ти» → «Ти Инвест»).
if company_name:
# Нормализуем company_name — берём только русские слова длиннее 3 символов
company_words = {
w.lower() for w in re.findall(r"[А-ЯЁа-яё]{4,}", company_name)
if w.lower() not in NON_NAME_TOKENS
}
if company_words:
# Считаем сколько слов кандидата встречаются в названии
overlap = sum(1 for w in words if w.lower() in company_words)
# Если ≥2 слов входят в название → это кусок названия, не ФИО
if overlap >= 2:
return None
# Если кандидат 2 слова и 1 из них в названии → подозрительно, отказ
if len(words) == 2 and overlap >= 1:
return None
return cleaned
def _find_first_company_url(search_html: str) -> Optional[str]:
"""Найти URL первой компании/ИП в результатах поиска Rusprofile.
/id/N — юр.лица (ООО, АО, ...)
/ip/N — индивидуальные предприниматели
"""
m = re.search(r'<a[^>]+href="(/(?:id|ip)/\d+)"[^>]*>', search_html)
if m:
return BASE_URL + m.group(1)
return None
def _empty_result() -> dict:
"""Базовая структура результата с пустыми полями."""
return {
"inn": None,
"ogrn": None,
"director_name": None,
"registration_date": None,
"address": None,
"website": None,
"phone_primary": None,
"egrul_checked_at": datetime.now().isoformat(timespec="seconds"),
"egrul_status": "error",
}
def _parse_company_detail(
html: str,
result: dict,
company_name_query: str | None = None,
) -> dict:
"""Распарсить детальную страницу Rusprofile (/id/N или /ip/N) —
извлечь ИНН/ОГРН/директора (или ФИО ИП)/дату/адрес.
company_name_query — оригинальное имя по которому искали (для anti-false-positive
в validate_director_name: если ФИО содержит слова из названия компании, это
кусок названия а не ФИО).
"""
# Ограничиваем HTML для regex (защита от backtracking на больших страницах)
if len(html) > 500_000:
html = html[:500_000]
# Распознаём — это страница ИП или юр.лица
is_ip_page = bool(re.search(r"<title>\s*ИП\s+[А-ЯЁ]", html))
inn = _find_first(PATTERNS["inn"], html)
ogrn = _find_first(PATTERNS["ogrn"], html)
director = None
if is_ip_page:
# Для ИП — извлекаем ФИО предпринимателя из title:
# "<title>ИП Симонян Асмик Вардановна, село Угловая (ИНН ...)</title>"
m = re.search(
r"<title>\s*ИП\s+([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+)?)",
html,
)
if m:
director = _validate_director_name(m.group(1), company_name_query) or None
# Если в title не нашли — пробуем h1
if not director:
m = re.search(
r"<h1[^>]*>\s*ИП\s+([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+)?)",
html,
)
if m:
director = _validate_director_name(m.group(1), company_name_query) or None
else:
# Для ООО — стандартный перебор паттернов «Генеральным директором ... — ФИО»
for p in PATTERNS["director_name"]:
for match in re.finditer(p, html, re.IGNORECASE | re.DOTALL):
candidate = match.group(1).strip()
validated = _validate_director_name(candidate, company_name_query)
if validated:
director = validated
break
if director:
break
reg_date_raw = _find_first(PATTERNS["registration_date"], html)
address_raw = _find_first(PATTERNS["address_legal"], html)
website_raw = _find_first(PATTERNS["website"], html)
phone_raw = _find_first(PATTERNS["phone"], html)
if inn:
result["inn"] = inn
if ogrn:
result["ogrn"] = ogrn
if director:
result["director_name"] = director
if reg_date_raw:
result["registration_date"] = _parse_ru_date(reg_date_raw)
if address_raw:
result["address"] = _strip_html(address_raw)
if website_raw:
ws = website_raw.strip()
# Фильтр шумных доменов (rusprofile сам себя, схемы аналитики)
if not any(b in ws.lower() for b in ("rusprofile.ru", "yandex.ru/maps", "search?")):
result["website"] = ws
if phone_raw:
# Нормализуем телефон до цифр + ведущей "+"
digits = re.sub(r"[^\d+]", "", phone_raw)
if 10 <= len(re.sub(r"\D", "", digits)) <= 15:
result["phone_primary"] = digits
if any(result[k] for k in ("inn", "ogrn", "director_name", "registration_date")):
result["egrul_status"] = "found"
else:
result["egrul_status"] = "not_found"
return result
def enrich_egrul_by_inn(
inn: str,
timeout: float = 10.0,
company_name: str | None = None,
) -> dict:
"""Поиск в Rusprofile по уже известному ИНН (для WB/HH-лидов с ИНН).
Стратегия: /search?query={INN} → если Rusprofile уверен в совпадении,
делает редирект на /id/N (ООО) или /ip/N (ИП).
Если НЕ редиректит (остаётся на /search) — это значит ИНН неоднозначен,
и доверять первой попавшейся ссылке нельзя (может быть чужая компания
с похожими цифрами). В этом случае возвращаем not_found.
Дополнительно: ВЕРИФИЦИРУЕМ что на детальной странице действительно
наш ИНН (защита от ложных редиректов).
Возвращает тот же dict что enrich_egrul.
"""
result = _empty_result()
if not inn:
result["egrul_status"] = "not_found"
return result
try:
# ИНН в search → ожидаем редирект на /id/N или /ip/N
search_url = SEARCH_URL.format(query=quote(inn))
resp = requests.get(
search_url, headers=_headers(), timeout=timeout, verify=False,
allow_redirects=True,
)
if resp.status_code in (403, 429):
logger.warning(f" Rusprofile blocked us ({resp.status_code}) для ИНН {inn}")
return result
if resp.status_code != 200:
return result
# Если остались на /search → ищем РОВНО ОДНУ ссылку /id/ или /ip/.
# Если ноль или >1 ссылок — ИНН неоднозначен, отказываемся (safety).
# Был кейс: ИП Гильмизянов 166023395678 → Rusprofile показал страницу
# results со ссылками на чужие компании → подсунули чужого директора.
target_url: str | None = None
if "/id/" not in resp.url and "/ip/" not in resp.url:
ip_links = re.findall(r'href="(/ip/\d+)"', resp.text)
id_links = re.findall(r'href="(/id/\d+)"', resp.text)
ip_uniq = list(dict.fromkeys(ip_links))
id_uniq = list(dict.fromkeys(id_links))
# Однозначное совпадение: ровно 1 ссылка одного типа, ни одной другого
if len(ip_uniq) == 1 and len(id_uniq) == 0:
target_url = BASE_URL + ip_uniq[0]
elif len(id_uniq) == 1 and len(ip_uniq) == 0:
target_url = BASE_URL + id_uniq[0]
else:
result["egrul_status"] = "not_found"
return result
# Открываем единственную найденную ссылку
resp = requests.get(target_url, headers=_headers(), timeout=timeout, verify=False)
if resp.status_code != 200:
return result
# Прямой редирект ИЛИ переход по единственной ссылке — парсим
# и обязательно верифицируем ИНН (защита от чужого редиректа,
# как случай Хошафовой 616821187962 → /ip/319619600193563 (Андреев)).
if inn not in resp.text:
logger.debug(
f" ИНН {inn} не найден в HTML страницы — ложный редирект, skip"
)
result["egrul_status"] = "not_found"
return result
return _parse_company_detail(resp.text, result, company_name_query=company_name)
except requests.exceptions.Timeout:
logger.debug(f" Timeout на Rusprofile для ИНН {inn}")
except requests.exceptions.RequestException as e:
logger.debug(f" Ошибка Rusprofile для ИНН {inn}: {e}")
except Exception as e:
logger.exception(f" Неожиданная ошибка для ИНН {inn}: {e}")
return result
def _normalize_for_match(s: str) -> str:
"""Нормализовать строку для fuzzy-сравнения названий компаний.
Удаляем юр.формы, кавычки, знаки препинания, lowercase.
«ООО "Ромашка"» → «ромашка»
«Общество с ограниченной ответственностью «Альфа Бета»» → «альфа бета»
"""
if not s:
return ""
s = s.lower()
# Убираем юр.формы и шумные слова
s = re.sub(
r"\b(?:общество\s+с\s+ограниченной\s+ответственностью|"
r"индивидуальный\s+предприниматель|"
r"акционерное\s+общество|"
r"публичное\s+акционерное\s+общество|"
r"непубличное\s+акционерное\s+общество|"
r"товарищество\s+на\s+вере|"
r"ооо|ип|ао|пао|зао|нко|оао|гк|кб|нпф|пкф|тд)\b",
"",
s,
)
# Убираем кавычки/пунктуацию
s = re.sub(r"[«»\"'`'.,()]", " ", s)
s = re.sub(r"\s+", " ", s).strip()
return s
def _strip_hh_suffixes(name: str) -> str:
"""Срезать HR-локационные суффиксы из HH-имён.
HH добавляет к названию работодателя типичные суффиксы вроде
«. Центральный офис», «, Работа в магазине», «Бизнес и инфраструктура».
Они НЕ часть юр.названия и портят fuzzy-match с Rusprofile.
Примеры:
'ПАО Совкомбанк. Центральный офис.''ПАО Совкомбанк'
'АШАН Ритейл Россия, Работа в магазине''АШАН Ритейл Россия'
'ПАО Банк ПСБ, Бизнес и инфраструктура''ПАО Банк ПСБ'
'ОАО Концерн Радиоэлектронные технологии, ОАО, УК''ОАО Концерн Радиоэлектронные технологии'
'Перекресток. Кафе Select''Перекресток'
"""
if not name:
return name
# Известные HR-суффиксы (после первой запятой/точки/слэша). Сравниваем case-insensitive.
HR_MARKERS = [
"центральный офис", "центральный офис", "головной офис",
"работа в магазине", "работа в офисе", "работа в ресторане",
"бизнес и инфраструктура", "офис продаж",
"кафе select",
"представительство", "филиал",
"ук", "управляющая компания",
]
parts = re.split(r"[.,/]", name, maxsplit=1)
if len(parts) == 2:
head, tail = parts[0].strip(), parts[1].strip().lower()
# Если хвост содержит маркер локации — отбрасываем хвост
for m in HR_MARKERS:
if m in tail:
return head
# Если хвост короткий и не похож на ключевую часть — тоже отбрасываем
# (например "Перекресток. Кафе Select" — хвост "Кафе Select" не специфичен)
if len(tail) <= 30 and any(w in tail for w in ("офис", "точка", "магазин", "ресторан")):
return head
return name
def _name_match_score(found: str, query: str) -> float:
"""0.0-1.0: насколько найденное название похоже на искомое.
Считаем долю слов из query (длиной ≥3) которые встречаются в found.
Если в query одно слово — ищем по подстроке.
"""
f = _normalize_for_match(found)
q = _normalize_for_match(query)
if not f or not q:
return 0.0
q_words = [w for w in q.split() if len(w) >= 3]
if not q_words:
return 0.0
if len(q_words) == 1:
return 1.0 if q_words[0] in f else 0.0
matched = sum(1 for w in q_words if w in f)
return matched / len(q_words)
def _extract_company_title(html: str) -> str:
"""Извлечь название компании со страницы Rusprofile.
КРИТИЧНО (исправлено 2026-05-21): на странице /id/N может быть несколько
<h1>, причём первый — рекламный блок другой компании. Реальное имя
компании всегда в <title>. Поэтому приоритет:
1. <title> — главный источник (Rusprofile генерирует его из карточки)
2. <h1> — fallback если title пустой
Пример title: «АО "Вкусвилл" Черноголовка (ИНН 7734443270) адрес и реквизиты»
→ возвращаем 'АО "Вкусвилл" Черноголовка'
"""
# 1. <title> — приоритет (главный)
m = re.search(r"<title>([^<]+)</title>", html, re.IGNORECASE)
if m:
title = m.group(1)
# Срезаем хвосты: "(ИНН ...) адрес", ", г.Москва", "адрес и реквизиты"
title = re.split(r"[(,]|(?:\sадрес\s|\sИНН\s)", title, maxsplit=1)[0]
title = title.strip()
if title and len(title) >= 3:
return title
# 2. <h1> — fallback
m = re.search(r"<h1[^>]*>([^<]{3,300})</h1>", html, re.IGNORECASE)
if m:
return _strip_html(m.group(1))
return ""
def _looks_too_generic(name: str) -> bool:
"""True если имя слишком общее/короткое — Rusprofile подсунет случайную компанию."""
cleaned = _normalize_for_match(name)
if not cleaned:
return True
words = cleaned.split()
# 1 слово короче 6 символов → точно общее
if len(words) == 1 and len(words[0]) < 6:
return True
# 2+ коротких слов в имени из 3 букв — общее
if len(words) >= 2 and all(len(w) <= 4 for w in words):
return True
return False
def enrich_egrul(
name: str,
city: str | None = None,
timeout: float = 10.0,
debug_dump_html: str | None = None,
) -> dict:
"""Поиск компании в Rusprofile по названию С УСИЛЕННОЙ ВАЛИДАЦИЕЙ.
Стратегия:
1. /search?query={name} → ждём редирект на /id/N или /ip/N
2. Если редирект — fuzzy-сравнение найденного title с искомым name.
Если совпадение слабое (<50%) — отказ, чтобы не подсунуть чужого.
3. Если /search НЕ редиректит — ищем строго одну ссылку /id/ или /ip/
(без множественных кандидатов) И только если имя достаточно
специфичное (не «Банкирро» / «Флант»).
Цель: лучше not_found чем выдать чужого директора.
"""
result = _empty_result()
if not name:
result["egrul_status"] = "not_found"
return result
# Срезаем HR-суффиксы из HH-имён («ПАО Совкомбанк. Центральный офис.» →
# «ПАО Совкомбанк»). Иначе fuzzy-match не пройдёт — Rusprofile не знает
# «Центральный офис» как часть юр.названия.
name_for_search = _strip_hh_suffixes(name)
if name_for_search != name:
logger.debug(f" HR-suffix stripped: {name!r}{name_for_search!r}")
# Слишком общие имена («Флант», «MOOD», «4hands») — Rusprofile в 99%
# подсунет случайную совпавшую компанию. Лучше сразу отказ.
if _looks_too_generic(name_for_search):
logger.debug(f" enrich_egrul: '{name_for_search}' слишком общее → skip")
result["egrul_status"] = "not_found"
return result
# Уточняем поиск городом если есть
query = name_for_search
if city and city.lower() != "москва":
query = f"{name_for_search} {city}"
try:
# 1. Поиск
search_url = SEARCH_URL.format(query=quote(query))
resp = requests.get(
search_url, headers=_headers(), timeout=timeout, verify=False,
allow_redirects=True,
)
if resp.status_code in (403, 429):
logger.warning(f" Rusprofile blocked us ({resp.status_code}) для '{name}'")
return result
if resp.status_code != 200:
logger.debug(f" Rusprofile вернул {resp.status_code} для '{name}'")
return result
# Если редирект на /id/N или /ip/N — Rusprofile уверен в матче.
# Если на /search — кандидатов много, доверять опасно.
if "/id/" in resp.url or "/ip/" in resp.url:
html = resp.text
# Fuzzy-проверка названия: то ли это что мы искали?
found_title = _extract_company_title(html)
score = _name_match_score(found_title, name)
if score < 0.3:
logger.debug(
f" enrich_egrul: '{name}' → найдено '{found_title[:60]}' "
f"(name_match={score:.2f}) — не совпадает, skip"
)
result["egrul_status"] = "not_found"
return result
else:
# /search не редиректит → берём ПЕРВУЮ ссылку (Rusprofile сортирует
# по релевантности). Защита от подсунутой чужой компании —
# fuzzy-сравнение title на следующем шаге.
company_url = _find_first_company_url(resp.text)
if not company_url:
logger.debug(f" enrich_egrul: '{name}' — ни одной /id/ или /ip/ ссылки")
result["egrul_status"] = "not_found"
return result
resp = requests.get(company_url, headers=_headers(), timeout=timeout, verify=False)
if resp.status_code != 200:
return result
html = resp.text
# Главная защита: title найденной компании должен быть похож на искомое имя
found_title = _extract_company_title(html)
score = _name_match_score(found_title, name)
if score < 0.3:
logger.debug(
f" enrich_egrul: '{name}''{found_title[:60]}' "
f"(name_match={score:.2f}) — не совпадает, skip"
)
result["egrul_status"] = "not_found"
return result
# Debug: сохранить HTML для отладки regex'ов
if debug_dump_html:
try:
with open(debug_dump_html, "w", encoding="utf-8") as f:
f.write(html)
logger.info(f" [debug] HTML сохранён в {debug_dump_html}")
except Exception as e:
logger.warning(f" [debug] Не удалось сохранить HTML: {e}")
# Извлекаем все поля (один общий парсер для by_name и by_inn).
# Передаём company_name=name_for_search (без HR-суффиксов) чтобы
# валидатор директора отсекал слова из названия.
return _parse_company_detail(html, result, company_name_query=name_for_search)
except requests.exceptions.Timeout:
logger.debug(f" Timeout на Rusprofile для '{name}'")
except requests.exceptions.RequestException as e:
logger.debug(f" Ошибка Rusprofile для '{name}': {e}")
except Exception as e:
logger.exception(f" Неожиданная ошибка ЕГРЮЛ для '{name}': {e}")
return result
if __name__ == "__main__":
# Smoke-тест на реальных названиях из БД.
# У первой компании сохраняем HTML для отладки regex'ов → debug_rusprofile.html
logging.basicConfig(level=logging.INFO, format="%(message)s")
test_names = [
("Гвидон", "Москва"),
("Кафе Пушкинъ", "Москва"),
("Тануки", "Москва"),
]
for idx, (name, city) in enumerate(test_names):
print(f"\n→ Поиск: {name} ({city})")
# Для первой компании сохраняем HTML
debug = "debug_rusprofile.html" if idx == 0 else None
info = enrich_egrul(name, city, debug_dump_html=debug)
for k, v in info.items():
print(f" {k}: {v}")
+450
View File
@@ -0,0 +1,450 @@
"""Tier 2 enrichment — анализ сайта компании.
Делает 1 HTTP-запрос → ищет в HTML маркеры:
- CMS / конструктор сайта (tilda, wix, wordpress, bitrix, ...)
- Live-чат (jivo, talk-me, ...)
- Онлайн-запись / онлайн-бронирование (yclients, dikidi, ...)
- Аналитика (Я.Метрика, Google Analytics, GTM)
- Email на странице → корпоративный или бесплатный домен
Все детекторы — по text-маркерам в HTML.
False positives возможны (например, упоминание "yclients" в блоге),
но для скоринга сигнал достаточный.
"""
import logging
import re
from datetime import datetime
from typing import Optional
import requests
import urllib3
from normalization import extract_phones_from_text, normalize_domain, is_valid_inn, is_valid_ogrn
# Глушим спам InsecureRequestWarning — verify=False нужен из-за множества старых сайтов
# с просроченными SSL-сертификатами. Это безопасно, т.к. мы только читаем HTML, не передаём данные.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
# ───────────────────────────────────────────────────────────────────────
# Сигнатуры — что искать в HTML (всё в нижнем регистре)
# ───────────────────────────────────────────────────────────────────────
CMS_SIGNATURES: dict[str, list[str]] = {
"tilda": ["tilda.cc", "tildacdn", "tilda-blocks", "t-rec"],
"wix": ["wixstatic.com", "wix.com/script", "_wixcssmodules"],
"wordpress": ["wp-content", "wp-includes", "wp-json"],
"bitrix": ["bitrix/", "bx-core", "/bitrix/js"],
"modx": ["modx.com", "/manager/modx", "modxcms"],
"joomla": ["joomla!", "/components/com_", "/media/jui/"],
"drupal": ["drupal.settings", "/sites/all/", "/sites/default/"],
"webflow": ["webflow.io", "webflow.com", "wf-tabs"],
"squarespace":["squarespace.com", "static1.squarespace"],
"insales": ["insales.ru", "insales-cdn"],
"shopify": ["cdn.shopify.com", "shopify.com/s/files"],
"opencart": ["catalog/view/theme", "route=common", "route=product"],
"1c-bitrix-sites": ["sites.bitrix24", "bitrix24.site"],
"readymag": ["readymag.com"],
"craftum": ["craftum.com", "craftumusercontent"],
}
# Конструкторы / авто-визитки — определяются по ДОМЕНУ (надёжнее HTML-сигнатур).
# Я.Бизнес (clients.site / business.site) — авто-сайт из Яндекс.Бизнеса,
# слабейшее веб-присутствие; его движок НЕ детектится как CMS по HTML → ловим по URL.
BUILDER_DOMAINS = {
"clients.site": "yandex_business",
"business.site": "yandex_business",
"tilda.ws": "tilda",
"wixsite.com": "wix",
"nethouse.": "nethouse",
"taplink.": "taplink",
".ucoz.": "ucoz",
}
LIVE_CHAT_SIGNATURES = [
"jivosite", "jivo.ru", "jivochat",
"talk-me", "talkme.ru",
"carrotquest", "carrot-quest",
"usedesk",
"verbox.ru",
"redhelper",
"chat2desk",
"webim.ru",
"tawk.to", "embed.tawk",
"crisp.chat",
"intercom.io", "widget.intercom",
"livechatinc.com",
"callbackhunter",
"callback24",
]
ONLINE_BOOKING_SIGNATURES = [
"yclients.com", "n.yclients",
"dikidi.ru", "dikidi.net",
"ucalendar",
"altegio",
"sonline.su",
"gbooking",
"tickt.ee",
"reservepad",
"bookform",
"rezgo",
# Текстовые маркеры (русский интерфейс)
"онлайн-запис", "онлайн запис",
"забронировать столик", "забронировать стол",
]
ANALYTICS_SIGNATURES = [
# Яндекс.Метрика
"mc.yandex.ru/metrika", "yandex_metrika", "ym(",
# Google Analytics + GTM
"google-analytics.com", "googletagmanager.com",
"gtag(", "ga('send'", "ga('create'",
# Mail.ru top
"top.mail.ru", "top-fwz1.mail.ru",
]
# Домены бесплатной почты — если email на них, у компании нет своего email-сервера.
FREE_EMAIL_DOMAINS = {
"gmail.com", "googlemail.com",
"mail.ru", "list.ru", "bk.ru", "inbox.ru",
"yandex.ru", "ya.ru", "yandex.com",
"rambler.ru", "lenta.ru", "myrambler.ru",
"hotmail.com", "outlook.com", "live.com",
"yahoo.com",
"icloud.com", "me.com",
"protonmail.com",
"qq.com", "163.com",
}
# Ограничения длины по RFC 5321: local-part ≤ 64, domain ≤ 253, TLD ≤ 24.
# Это защищает от catastrophic backtracking на патологически длинных входах.
EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9._%+-]{1,64}@[a-zA-Z0-9.-]{1,253}\.[a-zA-Z]{2,24}")
# ───────────────────────────────────────────────────────────────────────
# Детекторы (atomic — каждая делает одну вещь)
# ───────────────────────────────────────────────────────────────────────
def detect_cms(html_lower: str) -> Optional[str]:
"""Возвращает ключ из CMS_SIGNATURES либо 'custom' если ничего не нашли."""
for cms_name, signatures in CMS_SIGNATURES.items():
if any(sig in html_lower for sig in signatures):
return cms_name
return "custom"
def detect_cms_by_url(url: Optional[str]) -> Optional[str]:
"""Определить конструктор по домену (Я.Бизнес и пр.). None если не конструктор."""
u = (url or "").lower()
for frag, cms in BUILDER_DOMAINS.items():
if frag in u:
return cms
return None
def detect_any(html_lower: str, signatures: list[str]) -> bool:
return any(sig in html_lower for sig in signatures)
def extract_emails(html: str) -> list[str]:
"""Все email со страницы (с сохранением порядка, без дублей)."""
raw = EMAIL_PATTERN.findall(html)
seen: set[str] = set()
result: list[str] = []
for email in raw:
e = email.lower()
if e not in seen:
seen.add(e)
result.append(e)
return result
def classify_email_domain(email: str) -> str:
"""'corporate' если домен email — собственный, 'free' если из FREE_EMAIL_DOMAINS."""
if "@" not in email:
return "free"
domain = email.split("@", 1)[1].lower()
return "free" if domain in FREE_EMAIL_DOMAINS else "corporate"
# ───────────────────────────────────────────────────────────────────────
# Главная функция
# ───────────────────────────────────────────────────────────────────────
def analyze_website(url: str, timeout: float = 6.0) -> dict:
"""Запрашивает сайт, возвращает все Tier 2 поля.
Все поля под None = не удалось определить (например, сайт мёртвый).
"""
result: dict = {
"site_alive": None,
"site_status_code": None,
"cms_type": None,
"has_live_chat": None,
"has_online_booking": None,
"has_analytics": None,
"email_domain_type": None,
"site_checked_at": datetime.now().isoformat(timespec="seconds"),
# Найденные контакты со страницы сайта — для слияния в лида через update_lead_contacts.
# Эти поля НЕ входят в ENRICHMENT_FIELDS (не пишутся через update_enrichment).
"emails_found": [],
"phones_found": [],
# ИНН/ОГРН/КПП из footer сайта (152-ФЗ disclosure)
"inn": None,
"ogrn": None,
"kpp": None,
}
if not url:
return result
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/121.0 Safari/537.36"
),
"Accept-Language": "ru-RU,ru;q=0.9",
}
try:
# timeout=(connect, read) — раздельные таймауты, чтобы сайт не мог
# повесить парсер бесконечно если соединение установилось но сервер
# отдаёт данные по чуть-чуть.
resp = requests.get(
url, headers=headers,
timeout=(timeout, timeout),
allow_redirects=True, verify=False, # многие старые сайты с битым SSL
stream=False,
)
result["site_status_code"] = resp.status_code
result["site_alive"] = 1 if 200 <= resp.status_code < 400 else 0
except requests.exceptions.SSLError:
result["site_alive"] = 0
result["site_status_code"] = -1 # маркер SSL-ошибки
return result
except requests.exceptions.Timeout:
result["site_alive"] = 0
result["site_status_code"] = -2 # маркер таймаута
return result
except requests.exceptions.RequestException as e:
logger.debug(f" Не удалось получить {url}: {e}")
result["site_alive"] = 0
result["site_status_code"] = -3 # маркер общей ошибки
return result
if not result["site_alive"]:
return result
# Анализ HTML.
# Ограничение размера: некоторые сайты отдают 5-10MB HTML
# (раздутые JS-бандлы, JSON-LD, embedded data), на которых regex
# email/phone начинает страдать catastrophic backtracking и виснет на
# минуты. Все полезные сигналы (CMS, чат, запись, аналитика, email,
# телефон) обычно в первых ~200KB страницы. Обрезаем агрессивно.
MAX_HTML_BYTES = 500_000 # 500KB
html = resp.text
if len(html) > MAX_HTML_BYTES:
logger.debug(f" HTML обрезан с {len(html)} до {MAX_HTML_BYTES} байт")
html = html[:MAX_HTML_BYTES]
html_lower = html.lower()
# Сначала по домену (Я.Бизнес и пр. авто-визитки), потом по HTML-сигнатурам.
result["cms_type"] = detect_cms_by_url(resp.url) or detect_cms(html_lower)
result["has_live_chat"] = 1 if detect_any(html_lower, LIVE_CHAT_SIGNATURES) else 0
result["has_online_booking"] = 1 if detect_any(html_lower, ONLINE_BOOKING_SIGNATURES) else 0
result["has_analytics"] = 1 if detect_any(html_lower, ANALYTICS_SIGNATURES) else 0
# ─── ИНН/ОГРН/КПП из footer сайта ────────────────────────────────────
# По 152-ФЗ юр.лица обязаны публиковать реквизиты на сайте. Часто это
# не на главной, а на /contacts/ или /o-kompanii/. Поэтому:
# 1. Ищем на главной (текущий html)
# 2. Если нет — пробуем 4 типичных contact-страницы (1-2 сек каждая)
inn_val, ogrn_val, kpp_val = _extract_inn_ogrn_kpp(html)
if not inn_val:
# Стучимся на типичные contact/about-страницы
from urllib.parse import urljoin
for path in ("/contacts/", "/contact/", "/kontakty/", "/about/",
"/o-kompanii/", "/rekvizity/", "/info/"):
sub_url = urljoin(url, path)
try:
sub_resp = requests.get(
sub_url, headers=headers,
timeout=(timeout, timeout), verify=False, stream=False,
)
if sub_resp.status_code != 200:
continue
sub_html = sub_resp.text[:MAX_HTML_BYTES]
sub_inn, sub_ogrn, sub_kpp = _extract_inn_ogrn_kpp(sub_html)
if sub_inn:
inn_val = sub_inn
ogrn_val = ogrn_val or sub_ogrn
kpp_val = kpp_val or sub_kpp
logger.debug(f" ИНН найден на {path}: {inn_val}")
break
except Exception:
continue
if inn_val:
result["inn"] = inn_val
if ogrn_val:
result["ogrn"] = ogrn_val
if kpp_val:
result["kpp"] = kpp_val
# ─── Email — приоритет MAILTO-ссылок ─────────────────────────────────
# Любой email на странице может быть шумом (партнёр, mailchimp, sentry,
# пример в документации). Реальный контакт компании — обычно в footer'е
# как <a href="mailto:info@example.com">. Извлекаем приоритетно ИХ.
mailto_emails = re.findall(
r'href=["\']?mailto:([a-zA-Z0-9._%+-]{1,64}@[a-zA-Z0-9.-]{1,253}\.[a-zA-Z]{2,24})',
html,
)
if mailto_emails:
# Чистим, уникализируем, фильтруем шумные домены
clean = _filter_quality_emails(mailto_emails)
result["emails_found"] = clean[:5] # max 5 — больше уже нерелевантно
if clean:
result["email_domain_type"] = classify_email_domain(clean[0])
else:
# Fallback — текстовый поиск, но берём топ-3 по частоте и только те
# которые встречаются ≥1 раз (можно даже фильтровать ≥2 раза для шума).
all_emails = extract_emails(html)
if all_emails:
clean = _filter_quality_emails(all_emails)
# Сортируем по частоте — самые упоминаемые сверху
from collections import Counter
cnt = Counter(clean)
top = [e for e, _ in cnt.most_common(3)]
result["emails_found"] = top
if top:
result["email_domain_type"] = classify_email_domain(top[0])
# ─── Принадлежность email компании ───────────────────────────────────
# Оставляем только письма, чей домен совпадает с доменом сайта ИЛИ это
# бесплатный почтовик (компания сама опубликовала его у себя на сайте).
# Чужой корпоративный домен = разработчик темы / партнёр / отель → выкид.
result["emails_found"] = _belonging_emails(result["emails_found"], url)
result["email_domain_type"] = (
classify_email_domain(result["emails_found"][0])
if result["emails_found"] else None
)
# ─── Телефоны — приоритет TEL-ссылок ──────────────────────────────────
# Аналогично email: на странице может быть 78 совпадений regex'а с цифрами,
# это не контакты компании. Реальные контакты — в <a href="tel:...">.
# D19: берём ТОЛЬКО явные <a href="tel:..."> ссылки. Текстовый fallback
# (скан всех чисел страницы) УБРАН — он грёб партнёрские/шаблонные/иногородние
# номера, главный источник «чужих» телефонов. Достоверный телефон — с Я.Карт;
# эти — «к проверке» (пишутся в phones_extra через update_lead_contacts).
tel_phones_raw = re.findall(r'href=["\']?tel:([+0-9\-\s\(\)]{7,30})', html)
if tel_phones_raw:
tel_phones = extract_phones_from_text("\n".join(tel_phones_raw))
result["phones_found"] = _uniq(tel_phones)[:2]
return result
def _extract_inn_ogrn_kpp(html: str) -> tuple[str | None, str | None, str | None]:
"""Извлечь ИНН (10/12 цифр), ОГРН/ОГРНИП (13/15), КПП (9) из HTML страницы.
Используется для парсинга footer'ов сайтов компаний и страниц /contacts/.
Юр.лица по 152-ФЗ обязаны публиковать ИНН на сайте.
"""
# Разделитель между меткой и числом: пробелы, двоеточие или HTML-entity &nbsp;
# ПРИМ.: раньше было [\s:&nbsp;] — это класс из символов {пробел,:,&,n,b,s,p,;},
# а не entity. Заменено на корректную группу (?:[\s:]|&nbsp;)+.
sep = r"(?:[\s:]|&nbsp;)+"
inn_m = re.search(rf"\bИНН{sep}(\d{{10,12}})\b", html, re.IGNORECASE)
ogrn_m = re.search(rf"\bОГРН[ИП]{{0,2}}{sep}(\d{{13,15}})\b", html, re.IGNORECASE)
kpp_m = re.search(rf"\bКПП{sep}(\d{{9}})\b", html, re.IGNORECASE)
inn = inn_m.group(1) if inn_m and len(inn_m.group(1)) in (10, 12) else None
# Контрольная сумма: отсекаем фейк-ИНН с footer'ов (напр. 888800000099),
# которые проходят по длине, но не по контрольным цифрам РФ.
if inn and not is_valid_inn(inn):
inn = None
ogrn = ogrn_m.group(1) if ogrn_m and len(ogrn_m.group(1)) in (13, 15) else None
if ogrn and not is_valid_ogrn(ogrn):
ogrn = None
kpp = kpp_m.group(1) if kpp_m else None
return inn, ogrn, kpp
def _uniq(items: list[str]) -> list[str]:
"""Уникализация с сохранением порядка."""
seen: set[str] = set()
out: list[str] = []
for x in items:
if x and x not in seen:
seen.add(x)
out.append(x)
return out
# Подстроки в email которые говорят что это технический / служебный адрес
_BAD_EMAIL_SUBSTRINGS = (
"no-reply", "noreply", "mailer-daemon", "postmaster", "donotreply",
"@sentry.", "@example.", "@test.", "@localhost", "@email.com",
"@tilda.cc", "@wix.com", "@wordpress.com",
"u003e", "u003c", # JSON-escaped мусор из inline JS
".png", ".jpg", ".gif", # email в имени файла = ложное совпадение
)
def _filter_quality_emails(emails: list[str]) -> list[str]:
"""Отфильтровать технические/мусорные email + lowercase + uniq."""
out: list[str] = []
seen: set[str] = set()
for e in emails:
el = e.lower().strip()
if not el or el in seen:
continue
if any(bad in el for bad in _BAD_EMAIL_SUBSTRINGS):
continue
# Срезаем явно сломанные хвосты (типа "info@example.compng")
if re.search(r"\.(pn|jp|gi|cs|js|html?)g?$", el):
continue
seen.add(el)
out.append(el)
return out
def _belonging_emails(emails: list[str], site_url: str | None) -> list[str]:
"""Оставить только email, принадлежащие компании этого сайта.
Правило precision: домен письма == домен сайта (по 2 последним лейблам)
ИЛИ бесплатный почтовик (его компания сама опубликовала у себя на сайте).
Чужой корпоративный домен — почти всегда разработчик темы, партнёр,
агрегатор или соседний бренд → отбрасываем.
"""
def _reg(d: str | None) -> str | None:
if not d:
return None
parts = d.split(".")
return ".".join(parts[-2:]) if len(parts) >= 2 else d
site_reg = _reg(normalize_domain(site_url)) if site_url else None
out: list[str] = []
for e in emails:
dom = (e.split("@")[-1] or "").lower() if "@" in e else ""
if not dom:
continue
if dom in FREE_EMAIL_DOMAINS or (site_reg and _reg(dom) == site_reg):
out.append(e)
return out
if __name__ == "__main__":
# Smoke-тест на одном из реальных сайтов из БД
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
test_sites = [
"https://gvidon.wrf.su/", # Гвидон (из нашей БД)
"https://karavaevi.ru/", # Караваевы
"https://cafe-pushkin.ru/", # Кафе Пушкинъ
]
for site in test_sites:
print(f"\n{site}")
info = analyze_website(site)
for k, v in info.items():
print(f" {k}: {v}")