Files
parser-v1/enricher/website_analyzer.py
T
Aks 98309dcc96 fix: устранены все найденные аудитом баги и тихие падения
- SQL injection паттерн → параметризованные запросы во всех местах
- except: pass/continue → logger.warning() везде, ничего не тонет молча
- WAL mode + индекс domain_dedup_key в database.py
- try/finally для conn в main.py, утечка соединения устранена
- backoff 30с при 403/429 от Rusprofile/ЕГРЮЛ
- ликвидированные компании → egrul_status="liquidated"
- max_candidates в contacts_finder считает только реальных кандидатов
- DB_PATH абсолютный (Path(__file__).parent), HH_PAUSE_BETWEEN_QUERIES в config
- HH_SIGNAL_QUERIES дубль убран из launcher.py → импорт из config
- path traversal защита в egrul_enricher debug_dump_html

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-09 13:19:52 +03:00

456 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tier 2 enrichment — анализ сайта компании.
Делает 1 HTTP-запрос → ищет в HTML маркеры:
- CMS / конструктор сайта (tilda, wix, wordpress, bitrix, ...)
- Live-чат (jivo, talk-me, ...)
- Онлайн-запись / онлайн-бронирование (yclients, dikidi, ...)
- Аналитика (Я.Метрика, Google Analytics, GTM)
- Email на странице → корпоративный или бесплатный домен
Все детекторы — по text-маркерам в HTML.
False positives возможны (например, упоминание "yclients" в блоге),
но для скоринга сигнал достаточный.
"""
import logging
import re
import time
from datetime import datetime
from typing import Optional
import requests
import urllib3
from normalization import extract_phones_from_text, normalize_domain, is_valid_inn, is_valid_ogrn
# Глушим спам InsecureRequestWarning — verify=False нужен из-за множества старых сайтов
# с просроченными SSL-сертификатами. Это безопасно, т.к. мы только читаем HTML, не передаём данные.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
# ───────────────────────────────────────────────────────────────────────
# Сигнатуры — что искать в HTML (всё в нижнем регистре)
# ───────────────────────────────────────────────────────────────────────
CMS_SIGNATURES: dict[str, list[str]] = {
"tilda": ["tilda.cc", "tildacdn", "tilda-blocks", "t-rec"],
"wix": ["wixstatic.com", "wix.com/script", "_wixcssmodules"],
"wordpress": ["wp-content", "wp-includes", "wp-json"],
"bitrix": ["bitrix/", "bx-core", "/bitrix/js"],
"modx": ["modx.com", "/manager/modx", "modxcms"],
"joomla": ["joomla!", "/components/com_", "/media/jui/"],
"drupal": ["drupal.settings", "/sites/all/", "/sites/default/"],
"webflow": ["webflow.io", "webflow.com", "wf-tabs"],
"squarespace":["squarespace.com", "static1.squarespace"],
"insales": ["insales.ru", "insales-cdn"],
"shopify": ["cdn.shopify.com", "shopify.com/s/files"],
"opencart": ["catalog/view/theme", "route=common", "route=product"],
"1c-bitrix-sites": ["sites.bitrix24", "bitrix24.site"],
"readymag": ["readymag.com"],
"craftum": ["craftum.com", "craftumusercontent"],
}
# Конструкторы / авто-визитки — определяются по ДОМЕНУ (надёжнее HTML-сигнатур).
# Я.Бизнес (clients.site / business.site) — авто-сайт из Яндекс.Бизнеса,
# слабейшее веб-присутствие; его движок НЕ детектится как CMS по HTML → ловим по URL.
BUILDER_DOMAINS = {
"clients.site": "yandex_business",
"business.site": "yandex_business",
"tilda.ws": "tilda",
"wixsite.com": "wix",
"nethouse.": "nethouse",
"taplink.": "taplink",
".ucoz.": "ucoz",
}
LIVE_CHAT_SIGNATURES = [
"jivosite", "jivo.ru", "jivochat",
"talk-me", "talkme.ru",
"carrotquest", "carrot-quest",
"usedesk",
"verbox.ru",
"redhelper",
"chat2desk",
"webim.ru",
"tawk.to", "embed.tawk",
"crisp.chat",
"intercom.io", "widget.intercom",
"livechatinc.com",
"callbackhunter",
"callback24",
]
ONLINE_BOOKING_SIGNATURES = [
"yclients.com", "n.yclients",
"dikidi.ru", "dikidi.net",
"ucalendar",
"altegio",
"sonline.su",
"gbooking",
"tickt.ee",
"reservepad",
"bookform",
"rezgo",
# Текстовые маркеры (русский интерфейс)
"онлайн-запис", "онлайн запис",
"забронировать столик", "забронировать стол",
]
ANALYTICS_SIGNATURES = [
# Яндекс.Метрика
"mc.yandex.ru/metrika", "yandex_metrika", "ym(",
# Google Analytics + GTM
"google-analytics.com", "googletagmanager.com",
"gtag(", "ga('send'", "ga('create'",
# Mail.ru top
"top.mail.ru", "top-fwz1.mail.ru",
]
# Домены бесплатной почты — если email на них, у компании нет своего email-сервера.
FREE_EMAIL_DOMAINS = {
"gmail.com", "googlemail.com",
"mail.ru", "list.ru", "bk.ru", "inbox.ru",
"yandex.ru", "ya.ru", "yandex.com",
"rambler.ru", "lenta.ru", "myrambler.ru",
"hotmail.com", "outlook.com", "live.com",
"yahoo.com",
"icloud.com", "me.com",
"protonmail.com",
"qq.com", "163.com",
}
# Ограничения длины по RFC 5321: local-part ≤ 64, domain ≤ 253, TLD ≤ 24.
# Это защищает от catastrophic backtracking на патологически длинных входах.
EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9._%+-]{1,64}@[a-zA-Z0-9.-]{1,253}\.[a-zA-Z]{2,24}")
# ───────────────────────────────────────────────────────────────────────
# Детекторы (atomic — каждая делает одну вещь)
# ───────────────────────────────────────────────────────────────────────
def detect_cms(html_lower: str) -> Optional[str]:
"""Возвращает ключ из CMS_SIGNATURES либо 'custom' если ничего не нашли."""
for cms_name, signatures in CMS_SIGNATURES.items():
if any(sig in html_lower for sig in signatures):
return cms_name
return "custom"
def detect_cms_by_url(url: Optional[str]) -> Optional[str]:
"""Определить конструктор по домену (Я.Бизнес и пр.). None если не конструктор."""
u = (url or "").lower()
for frag, cms in BUILDER_DOMAINS.items():
if frag in u:
return cms
return None
def detect_any(html_lower: str, signatures: list[str]) -> bool:
return any(sig in html_lower for sig in signatures)
def extract_emails(html: str) -> list[str]:
"""Все email со страницы (с сохранением порядка, без дублей)."""
raw = EMAIL_PATTERN.findall(html)
seen: set[str] = set()
result: list[str] = []
for email in raw:
e = email.lower()
if e not in seen:
seen.add(e)
result.append(e)
return result
def classify_email_domain(email: str) -> str:
"""'corporate' если домен email — собственный, 'free' если из FREE_EMAIL_DOMAINS."""
if "@" not in email:
return "free"
domain = email.split("@", 1)[1].lower()
return "free" if domain in FREE_EMAIL_DOMAINS else "corporate"
# ───────────────────────────────────────────────────────────────────────
# Главная функция
# ───────────────────────────────────────────────────────────────────────
def analyze_website(url: str, timeout: float = 6.0) -> dict:
"""Запрашивает сайт, возвращает все Tier 2 поля.
Все поля под None = не удалось определить (например, сайт мёртвый).
"""
result: dict = {
"site_alive": None,
"site_status_code": None,
"cms_type": None,
"has_live_chat": None,
"has_online_booking": None,
"has_analytics": None,
"email_domain_type": None,
"site_checked_at": datetime.now().isoformat(timespec="seconds"),
# Найденные контакты со страницы сайта — для слияния в лида через update_lead_contacts.
# Эти поля НЕ входят в ENRICHMENT_FIELDS (не пишутся через update_enrichment).
"emails_found": [],
"phones_found": [],
# ИНН/ОГРН/КПП из footer сайта (152-ФЗ disclosure)
"inn": None,
"ogrn": None,
"kpp": None,
}
if not url:
return result
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/121.0 Safari/537.36"
),
"Accept-Language": "ru-RU,ru;q=0.9",
}
try:
# timeout=(connect, read) — раздельные таймауты, чтобы сайт не мог
# повесить парсер бесконечно если соединение установилось но сервер
# отдаёт данные по чуть-чуть.
resp = requests.get(
url, headers=headers,
timeout=(timeout, timeout),
allow_redirects=True, verify=False, # многие старые сайты с битым SSL
stream=False,
)
result["site_status_code"] = resp.status_code
result["site_alive"] = 1 if 200 <= resp.status_code < 400 else 0
except requests.exceptions.SSLError:
result["site_alive"] = 0
result["site_status_code"] = -1 # маркер SSL-ошибки
return result
except requests.exceptions.Timeout:
result["site_alive"] = 0
result["site_status_code"] = -2 # маркер таймаута
return result
except requests.exceptions.RequestException as e:
logger.debug(f" Не удалось получить {url}: {e}")
result["site_alive"] = 0
result["site_status_code"] = -3 # маркер общей ошибки
return result
if not result["site_alive"]:
return result
# Анализ HTML.
# Ограничение размера: некоторые сайты отдают 5-10MB HTML
# (раздутые JS-бандлы, JSON-LD, embedded data), на которых regex
# email/phone начинает страдать catastrophic backtracking и виснет на
# минуты. Все полезные сигналы (CMS, чат, запись, аналитика, email,
# телефон) обычно в первых ~200KB страницы. Обрезаем агрессивно.
MAX_HTML_BYTES = 500_000 # 500KB
html = resp.text
if len(html) > MAX_HTML_BYTES:
logger.debug(f" HTML обрезан с {len(html)} до {MAX_HTML_BYTES} байт")
html = html[:MAX_HTML_BYTES]
html_lower = html.lower()
# Сначала по домену (Я.Бизнес и пр. авто-визитки), потом по HTML-сигнатурам.
result["cms_type"] = detect_cms_by_url(resp.url) or detect_cms(html_lower)
result["has_live_chat"] = 1 if detect_any(html_lower, LIVE_CHAT_SIGNATURES) else 0
result["has_online_booking"] = 1 if detect_any(html_lower, ONLINE_BOOKING_SIGNATURES) else 0
result["has_analytics"] = 1 if detect_any(html_lower, ANALYTICS_SIGNATURES) else 0
# ─── ИНН/ОГРН/КПП из footer сайта ────────────────────────────────────
# По 152-ФЗ юр.лица обязаны публиковать реквизиты на сайте. Часто это
# не на главной, а на /contacts/ или /o-kompanii/. Поэтому:
# 1. Ищем на главной (текущий html)
# 2. Если нет — пробуем 4 типичных contact-страницы (1-2 сек каждая)
inn_val, ogrn_val, kpp_val = _extract_inn_ogrn_kpp(html)
if not inn_val:
# Стучимся на типичные contact/about-страницы
from urllib.parse import urljoin
for path in ("/contacts/", "/contact/", "/kontakty/", "/about/",
"/o-kompanii/", "/rekvizity/", "/info/"):
sub_url = urljoin(url, path)
try:
sub_resp = requests.get(
sub_url, headers=headers,
timeout=(timeout, timeout), verify=False, stream=False,
)
if sub_resp.status_code != 200:
time.sleep(0.5)
continue
sub_html = sub_resp.text[:MAX_HTML_BYTES]
sub_inn, sub_ogrn, sub_kpp = _extract_inn_ogrn_kpp(sub_html)
if sub_inn:
inn_val = sub_inn
ogrn_val = ogrn_val or sub_ogrn
kpp_val = kpp_val or sub_kpp
logger.debug(f" ИНН найден на {path}: {inn_val}")
time.sleep(0.5)
break
time.sleep(0.5)
except Exception as e:
logger.warning(f"[website_analyzer] подстраница {path} сайта {url}: {e}")
continue
if inn_val:
result["inn"] = inn_val
if ogrn_val:
result["ogrn"] = ogrn_val
if kpp_val:
result["kpp"] = kpp_val
# ─── Email — приоритет MAILTO-ссылок ─────────────────────────────────
# Любой email на странице может быть шумом (партнёр, mailchimp, sentry,
# пример в документации). Реальный контакт компании — обычно в footer'е
# как <a href="mailto:info@example.com">. Извлекаем приоритетно ИХ.
mailto_emails = re.findall(
r'href=["\']?mailto:([a-zA-Z0-9._%+-]{1,64}@[a-zA-Z0-9.-]{1,253}\.[a-zA-Z]{2,24})',
html,
)
if mailto_emails:
# Чистим, уникализируем, фильтруем шумные домены
clean = _filter_quality_emails(mailto_emails)
result["emails_found"] = clean[:5] # max 5 — больше уже нерелевантно
if clean:
result["email_domain_type"] = classify_email_domain(clean[0])
else:
# Fallback — текстовый поиск, но берём топ-3 по частоте и только те
# которые встречаются ≥1 раз (можно даже фильтровать ≥2 раза для шума).
all_emails = extract_emails(html)
if all_emails:
clean = _filter_quality_emails(all_emails)
# Сортируем по частоте — самые упоминаемые сверху
from collections import Counter
cnt = Counter(clean)
top = [e for e, _ in cnt.most_common(3)]
result["emails_found"] = top
if top:
result["email_domain_type"] = classify_email_domain(top[0])
# ─── Принадлежность email компании ───────────────────────────────────
# Оставляем только письма, чей домен совпадает с доменом сайта ИЛИ это
# бесплатный почтовик (компания сама опубликовала его у себя на сайте).
# Чужой корпоративный домен = разработчик темы / партнёр / отель → выкид.
result["emails_found"] = _belonging_emails(result["emails_found"], url)
result["email_domain_type"] = (
classify_email_domain(result["emails_found"][0])
if result["emails_found"] else None
)
# ─── Телефоны — приоритет TEL-ссылок ──────────────────────────────────
# Аналогично email: на странице может быть 78 совпадений regex'а с цифрами,
# это не контакты компании. Реальные контакты — в <a href="tel:...">.
# D19: берём ТОЛЬКО явные <a href="tel:..."> ссылки. Текстовый fallback
# (скан всех чисел страницы) УБРАН — он грёб партнёрские/шаблонные/иногородние
# номера, главный источник «чужих» телефонов. Достоверный телефон — с Я.Карт;
# эти — «к проверке» (пишутся в phones_extra через update_lead_contacts).
tel_phones_raw = re.findall(r'href=["\']?tel:([+0-9\-\s\(\)]{7,30})', html)
if tel_phones_raw:
tel_phones = extract_phones_from_text("\n".join(tel_phones_raw))
result["phones_found"] = _uniq(tel_phones)[:2]
return result
def _extract_inn_ogrn_kpp(html: str) -> tuple[str | None, str | None, str | None]:
"""Извлечь ИНН (10/12 цифр), ОГРН/ОГРНИП (13/15), КПП (9) из HTML страницы.
Используется для парсинга footer'ов сайтов компаний и страниц /contacts/.
Юр.лица по 152-ФЗ обязаны публиковать ИНН на сайте.
"""
# Разделитель между меткой и числом: пробелы, двоеточие или HTML-entity &nbsp;
# ПРИМ.: раньше было [\s:&nbsp;] — это класс из символов {пробел,:,&,n,b,s,p,;},
# а не entity. Заменено на корректную группу (?:[\s:]|&nbsp;)+.
sep = r"(?:[\s:]|&nbsp;)+"
inn_m = re.search(rf"\bИНН{sep}(\d{{10,12}})\b", html, re.IGNORECASE)
ogrn_m = re.search(rf"\bОГРН[ИП]{{0,2}}{sep}(\d{{13,15}})\b", html, re.IGNORECASE)
kpp_m = re.search(rf"\bКПП{sep}(\d{{9}})\b", html, re.IGNORECASE)
inn = inn_m.group(1) if inn_m and len(inn_m.group(1)) in (10, 12) else None
# Контрольная сумма: отсекаем фейк-ИНН с footer'ов (напр. 888800000099),
# которые проходят по длине, но не по контрольным цифрам РФ.
if inn and not is_valid_inn(inn):
inn = None
ogrn = ogrn_m.group(1) if ogrn_m and len(ogrn_m.group(1)) in (13, 15) else None
if ogrn and not is_valid_ogrn(ogrn):
ogrn = None
kpp = kpp_m.group(1) if kpp_m else None
return inn, ogrn, kpp
def _uniq(items: list[str]) -> list[str]:
"""Уникализация с сохранением порядка."""
seen: set[str] = set()
out: list[str] = []
for x in items:
if x and x not in seen:
seen.add(x)
out.append(x)
return out
# Подстроки в email которые говорят что это технический / служебный адрес
_BAD_EMAIL_SUBSTRINGS = (
"no-reply", "noreply", "mailer-daemon", "postmaster", "donotreply",
"@sentry.", "@example.", "@test.", "@localhost", "@email.com",
"@tilda.cc", "@wix.com", "@wordpress.com",
"u003e", "u003c", # JSON-escaped мусор из inline JS
".png", ".jpg", ".gif", # email в имени файла = ложное совпадение
)
def _filter_quality_emails(emails: list[str]) -> list[str]:
"""Отфильтровать технические/мусорные email + lowercase + uniq."""
out: list[str] = []
seen: set[str] = set()
for e in emails:
el = e.lower().strip()
if not el or el in seen:
continue
if any(bad in el for bad in _BAD_EMAIL_SUBSTRINGS):
continue
# Срезаем явно сломанные хвосты (типа "info@example.compng")
if re.search(r"\.(pn|jp|gi|cs|js|html?)g?$", el):
continue
seen.add(el)
out.append(el)
return out
def _belonging_emails(emails: list[str], site_url: str | None) -> list[str]:
"""Оставить только email, принадлежащие компании этого сайта.
Правило precision: домен письма == домен сайта (по 2 последним лейблам)
ИЛИ бесплатный почтовик (его компания сама опубликовала у себя на сайте).
Чужой корпоративный домен — почти всегда разработчик темы, партнёр,
агрегатор или соседний бренд → отбрасываем.
"""
def _reg(d: str | None) -> str | None:
if not d:
return None
parts = d.split(".")
return ".".join(parts[-2:]) if len(parts) >= 2 else d
site_reg = _reg(normalize_domain(site_url)) if site_url else None
out: list[str] = []
for e in emails:
dom = (e.split("@")[-1] or "").lower() if "@" in e else ""
if not dom:
continue
if dom in FREE_EMAIL_DOMAINS or (site_reg and _reg(dom) == site_reg):
out.append(e)
return out
if __name__ == "__main__":
# Smoke-тест на одном из реальных сайтов из БД
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
test_sites = [
"https://gvidon.wrf.su/", # Гвидон (из нашей БД)
"https://karavaevi.ru/", # Караваевы
"https://cafe-pushkin.ru/", # Кафе Пушкинъ
]
for site in test_sites:
print(f"\n{site}")
info = analyze_website(site)
for k, v in info.items():
print(f" {k}: {v}")