В этой статье узнаем, как создать систему мониторинга новостей из Telegram-каналов и чатов с интеллектуальной фильтрацией и отправкой в целевой канал. Прототип мы реализуем на примере анализа экономических новостей.
Статья является логическим продолжением статей «Парсинг Телеграм-каналов, групп и чатов с обработкой в LLM» и «Парсинг pdf-отчётов публичных компаний для получения трейдерских инсайтов».
Мы рассмотрим, как агрегировать новости из каналов в Telegram, убирать дубликаты и автоматически детектировать те новости, которые могут повлиять на котировки публичных компаний. Это полезно, если есть желание сыграть на скорости распространения информации и «опередить рынок» в принятии инвестиционных решений на пару минут.
Разработанный шаблон можно применить и для других сценариев. Таких, как отслеживание новостей и акций конкурентов, отслеживание упоминаний компании/темы/услуги содержащие запрос «помогите найти электрика в ЖК…» и еще множестве подобных сценариев, где нужна автоматическая обработка множества сообщений из открытых чатов и каналов.
Приступим к реализации решения
Наш проект состоит из одного основного скрипта, который выполняет Мониторинг Telegram-каналов в реальном времени, раннюю фильтрацию по ключевым словам для отсева заведомо нерелевантных сообщений, векторизацию текста с помощью OpenAI Embeddings, поиск дубликатов в векторной базе Qdrant, интеллектуальную классификацию с помощью LLM Amvera для точного определения тематики и сохранение уникальных сообщений с отправкой в целевой канал.
Проведем подготовку и подготовим списки переменных и зависимостей.
Используем следующие зависимости:
telethon: Асинхронный клиент для Telegram API
qdrant-client: Клиент для работы с векторной базой данных Qdrant
openai: Для создания векторных представлений текста (эмбеддингов)
httpx: Для асинхронных HTTP-запросов к LLM Amvera
python-dotenv:Для загрузки переменных окружения
Мы будем использовать несколько как обязательных, так и опциональных переменных окружения. Вот их список.
Обязательные переменные:
API_ID = int(os.getenv(«API_ID», «0»)) ID приложения Telegram* API_HASH = os.getenv(«API_HASH») Хэш приложения Telegram* OPENAI_API_KEY = os.getenv(«OPENAI_API_KEY») Ключ для OpenAI Embeddings* QDRANT_URL = os.getenv(«QDRANT_URL») URL базы данных Qdrant* QDRANT_API_KEY = os.getenv(«QDRANT_API_KEY») Ключ API Qdrant* LLM_API_KEY = os.getenv(«LLM_API_KEY») Ключ для LLM Amvera*
Опциональные переменные:
SOURCE_CHANNEL = os.getenv(«SOURCE_CHANNEL») Канал-источник для мониторинга* DEST_CHANNEL = os.getenv(«DEST_CHANNEL») Целевой канал для результатов* COLLECTION_NAME = os.getenv(«COLLECTION_NAME», «telegram_news») SIMILARITY_THRESHOLD = float(os.getenv(«SIMILARITY_THRESHOLD», «0.80»)) EMBEDDING_MODEL = os.getenv(«EMBEDDING_MODEL», «text-embedding-3-small»)
А теперь перейдем к самому важному и напишем код нашего сервиса
Скрипт начинает работу с инициализации всех необходимых клиентов и проверки их доступности:
async def init_clients(): global openai_client, qdrant_client loop = asyncio.get_event_loop() # OpenAI embeddings client if OPENAI_API_KEY and OpenAI is not None: try: openai_client = OpenAI(api_key=OPENAI_API_KEY) logger.info(«OpenAI client initialized») except Exception as e: logger.exception(f»Failed to init OpenAI client: {e}») openai_client = None else: if OPENAI_API_KEY and OpenAI is None: logger.warning(«OpenAI SDK not available — install openai package or adapt code.») else: logger.warning(«OPENAI_API_KEY not set — embeddings disabled.») openai_client = None
Перед началом работы система выполняет «preflight checks» — проверяет доступность всех внешних сервисов и при необходимости создает коллекцию в Qdrant.
Проведем предварительную фильтрация по ключевым словам.
Если мы будем полностью обрабатывать все сообщения, мы разоримся на API LLM и неприемлемо повысим нагрузку на сервис. Рациональнее руками задать слова, которые будут характеризовать потенциально интересные нам сообщения и обрабатывать только их.
В будущем можно сделать классификацию на основе ML, но пока просто зададим набор слов, которые можно менять под свои цели.
KEYWORD_HINTS = [ «акци», «бирж», «курс», «доллар», «евро», «рубл», «цен», «инфляц», «процент», «ставк», «рынок», «фонд», «индекс», «nasdaq», «sp», «s&p», «дивиден», «финанс», «эконом», «рецесс», «ввп», «облигац», «доходн», «отчет», «отчетность», «баланс», «выручк», «прибыл», «речь», «ставк»]
Чтобы адаптировать скрипт под свои нужды, просто отредактируйте список KEYWORD_HINTS, добавив термины из вашей предметной области. Например, для мониторинга криптовалют добавьте: «биткоин», «блокчейн», «эфириум».
Выполним векторизацию и поиск дубликатов.
Каждое сообщение преобразуется в векторное представление с помощью OpenAI:
# ========== Embeddings / Qdrant ops ========== async def embed_text(text: str): if not openai_client: raise RuntimeError(«OpenAI client not configured») if not text: return [] loop = asyncio.get_event_loop() try: resp = await loop.run_in_executor(None, lambda: openai_client.embeddings.create(model=EMBEDDING_MODEL, input=text)) emb = resp.data[0].embedding return emb except Exception as e: logger.exception(f»Embedding failed: {e}») raise
Затем система ищет похожие сообщения в базе Qdrant:
Qdrant это специальная векторная база данных. Вы можете использовать альтернативы в виде расширения PostgreSQL — pgVector или ChromaDB.
async def is_similar_to_existing(vec): if qdrant_client is None: return False, 0.0, None loop = asyncio.get_event_loop() try: results = await loop.run_in_executor( None, lambda: qdrant_client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=5, with_payload=True, with_vectors=True) ) except TypeError: try: results = await loop.run_in_executor( None, lambda: qdrant_client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=5, with_payload=True) ) except Exception as e: logger.warning(f»Qdrant search failed (fallback): {e}») return False, 0.0, None except Exception as e: logger.warning(f»Qdrant search failed: {e}») return False, 0.0, None best_sim = 0.0 best_hit = None for r in results: candidate_vector = getattr(r, «vector», None) if candidate_vector is None and getattr(r, «payload», None): candidate_vector = r.payload.get(«_vector») or r.payload.get(«vector») if candidate_vector: try: sim = cosine_sim(vec, candidate_vector) if sim > best_sim: best_sim = sim best_hit = r except Exception: logger.debug(«Skipping candidate vector due to mismatch») return (best_sim >= SIMILARITY_THRESHOLD), best_sim, best_hit
Настроем чувствительность срабатывания
Наши сообщения могут быть на одну тему, но отличаться словами. Важно задать такую степень близости, чтобы избежать дублирования срабатывания, но и не пропустить важные новые сообщения, семантически похожие на прошлые инфоповоды, но отличные по сути.
Параметр SIMILARITY_THRESHOLD — это один из самых важных параметров в системе, который позволяет тонко настраивать баланс между обнаружением дубликатов и пропуском уникального контента.
Регулируя его, мы сможем эксперементально подобрать нужный нам порог сходства текстов.
4. Интеллектуальная классификация с помощью LLM
Это ядро системы. Языковая модель анализирует текст и определяет его релевантность:
async def classify_with_llm(text: str) -> Tuple[bool, str]: «»» Возвращает (is_relevant, reason). Использует LLM_BASE_URL + LLM_API_KEY. Если LLM недоступен или ответ не распарсить — делаем keyword fallback. «»» text = text.strip() if not text: return False, «empty_text» if not LLM_BASE_URL or not LLM_API_KEY: return keyword_fallback(text) prompt = ( «Вы — точный классификатор. Определите, относится ли следующее сообщение к финансово-экономической » «тематике: акции (фондовый рынок), движение индексов, курсы валют, процентные ставки, » «макроэкономические показатели (ВВП, инфляция и т.п.), финансовая отчётность компаний, облигации, » «доходности, банковские/финансовые новости и другие экономические события.nn» «ОТВЕЧАЙТЕ ТОЛЬКО ОДНИМ КОРРЕКТНЫМ JSON-ОБЪЕКТОМ И НИЧЕМ БОЛЕЕ, В ТАКОМ ВИДЕ:n» ‘{«relevant»: true|false, «reason»: «короткое объяснение на русском», «labels»: [«акции»,»курс_валют»,»инфляция»]}nn’ «Поле ‘relevant’ — true если сообщение релевантно, false — если нет. » «В ‘labels’ перечислите короткие метки (например: «акции», «курс_валют», «инфляция», «фин_отчётность», «облигации»). » «В ‘reason’ дайте краткое пояснение (1–2 коротких фразы).nn» f»Сообщение для анализа:nn{text}nn» «Возвращайте ровно один JSON-объект и ничего больше.» )
Не меняя код, вы можете полностью изменить тематику фильтрации, просто отредактировав промпт. Например, для мониторинга ИТ-новостей измените описание тематики в промпте.
Сохранение и пересылка сообщений
Уникальные релевантные сообщения сохраняются в Qdrant и отправляются в целевой канал:
@events.register(events.NewMessage) async def global_handler(event): try: if SOURCE_CHANNEL and str(event.chat_id) not in (str(SOURCE_CHANNEL), SOURCE_CHANNEL): return msg = event.message text = (msg.message or «»).strip() if not text: logger.info(«Empty message — skip») return # Ранняя фильтрация по ключевым словам: если нет ни одного хинта — skip if not has_keyword_hint(text): logger.info(«Early keyword filter: no keyword hints found — skip») return logger.info(f»New message (id={msg.id}) — passed keyword filter, embedding…») try: emb = await embed_text(text) except Exception: logger.exception(«Embedding error — skipping message») return if not emb: logger.warning(«Empty embedding — skip») return # check similarity similar, sim_score, hit = await is_similar_to_existing(emb) logger.info(f»Similarity: {sim_score:.4f} (threshold={SIMILARITY_THRESHOLD})») if similar: logger.info(«Message is similar to existing — skip sending») return # If unique, check topic relevance via LLM logger.info(«Classifying message topic with LLM…») is_relevant, reason = await classify_with_llm(text) logger.info(f»LLM relevance: {is_relevant}, reason: {reason}») if not is_relevant: logger.info(«Message not relevant to finance/economics — skip and DO NOT upsert») return # Relevant => upsert then send normalized = normalize_text_for_uuid(text) point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, normalized)) payload = { «text»: text, «chat_id»: str(event.chat_id), «message_id»: msg.id, «date»: str(msg.date.isoformat() if getattr(msg, «date», None) else time.time()), «llm_relevance_reason»: reason, } try: await upsert_point(point_id, emb, payload) logger.info(«Upserted point into Qdrant») except Exception: logger.exception(«Upsert failed — not sending to avoid duplicates») return # Send to destination channel if DEST_CHANNEL: try: await tg.send_message(DEST_CHANNEL, text) logger.info(f»Sent to destination {DEST_CHANNEL}») except ValueError as ve: logger.error(«Cannot get entity for DEST_CHANNEL — ensure bot/user is member and has rights») logger.exception(ve) except Exception as e: logger.exception(f»Failed to send message to DEST_CHANNEL: {e}») else: logger.warning(«DEST_CHANNEL not set — message not sent») except Exception as e: logger.exception(f»Unhandled error in handler: {e}»)
Как адаптировать проект под свои нужды
1. Меняем ключевые слова и промты
*# Пример для мониторинга ИТ-новостей* prompt = ( «Вы — точный классификатор. Определите, относится ли следующее сообщение к ИТ-тематике: » «программирование, искусственный интеллект, кибербезопасность, облачные технологии, » «стартапы, инвестиции в технологии, новые гаджеты и устройства.nn» )
2. Настраиваем источники
Укажите каналы для мониторинга и целевой канал в переменных окружения:
SOURCE_CHANNEL: username или ID канала-источника DEST_CHANNEL: username или ID канала для результатов
3. Регулируем параметры фильтрации
SIMILARITY_THRESHOLD=0.90 — очень строгая фильтрация дубликатов SIMILARITY_THRESHOLD=0.70 — более мягкая фильтрация Изменение KEYWORD_HINTS — регулировка раннего отсева
4. Выбираем модели эмбеддингов
Можете выбрать свою модель, что позволит ее проще интегрировать в наш проект. Как пример:
text-embedding-3-small — быстрее и дешевле.
text-embedding-3-large — точнее, но дороже.
Проведем тестирование системы фильтрации
Посмотрим, как код отличает дубликаты от уникальных новостей. Для этого проведем серию тестов для проверки эффективности системы мониторинга и фильтрации. Вот результаты двух ключевых тестов, демонстрирующих работу алгоритма в реальных условиях.
Тест 1: Фильтрация точных дубликатов и противоположных новостей
Входные данные:

Результат фильтрации:

Рекламное сообщение про автоподушки отсеяно на этапе ранней фильтрации по ключевым словам. Система не обнаружила в тексте финансово-экономических терминов из списка KEYWORD_HINTS.
Точный дубликат новости про рост акций Oracle отфильтрован системой векторного поиска. Косинусное сходство составило ~0.98, что значительно выше порога SIMILARITY_THRESHOLD=0.80.
Фильтрацию прошла первоначальная новость о росте акций Oracle. Она распознана как релевантная финансово-экономическая информация. И контрастная новость о банкротстве Oracle. Несмотря на упоминание той же компании, содержание кардинально отличается. Векторное представление этого сообщения значительно отличается от оригинала (сходство ~0.35), поэтому система правильно классифицировала его как уникальную информацию.
Тест 2: Интеллектуальное различение похожих формулировок
Входные данные:

Результат фильтрации:

Новость про Oracle прошла фильтрацию, но не показана в результатах, так как тест фокусировался на сообщениях про МТС.
Сообщение «акции МТС торгуются на 5% больше» отфильтровано как семантический дубликат. Хотя формулировка отличается, LLM-классификатор определил, что оба сообщения передают одинаковую смысловую нагрузку о росте акций МТС на 5%. Векторное сходство составило ~0.82.
Фильтрацию прошло оригинальное сообщение о росте акций МТС на 5%. Что мы и ожидали в результате работы нашего скрипта.
Заключение
Мы создали универсальную систему мониторинга и фильтрации контента из Telegram. Такой подход можно адаптировать для таких задач, как мониторинг новостей по конкретной тематике, отслеживание упоминаний брендов, сбор рыночной аналитики и создание дайджестов контента из различных источников.
Полный код проекта доступен на GitHub.
Источник: habr.com



























