Объединение глобального и локального поиска для получения максимально точного ответа
Делиться

Реализация GraphRAG от Microsoft была одной из первых систем GraphRAG и представила множество инновационных функций. Она сочетает в себе этап индексации, на котором извлекаются и суммируются сущности, связи и иерархические сообщества, с расширенными возможностями обработки запросов. Такой подход позволяет системе отвечать на общие тематические вопросы, используя предварительно вычисленные сводки сущностей, связей и сообществ, выходя за рамки традиционных ограничений стандартных систем RAG, связанных с поиском документов.

Я уже рассматривал этап индексации, а также механизмы глобального и локального поиска в предыдущих публикациях блога (здесь и здесь), поэтому мы пропустим эти подробности в данном обсуждении. Однако мы ещё не рассматривали поиск DRIFT, которому и будет посвящена эта публикация. DRIFT — это новый подход, сочетающий в себе характеристики методов глобального и локального поиска. Метод начинается с использования информации сообщества посредством векторного поиска для определения общей отправной точки для запросов, а затем использует эти знания сообщества для уточнения исходного вопроса и перевода его в подробные последующие запросы. Это позволяет DRIFT динамически обходить граф знаний для получения конкретной информации о сущностях, связях и других локальных данных, обеспечивая баланс между вычислительной эффективностью и всеобъемлющим качеством ответа.

Реализация использует рабочие процессы LlamaIndex для организации процесса поиска DRIFT, состоящего из нескольких ключевых этапов. Процесс начинается с генерации HyDE , которая формирует гипотетический ответ на основе образца отчёта сообщества для улучшения репрезентативности запроса.
Затем на этапе поиска по сообществу используется векторное сходство для выявления наиболее релевантных сообщений сообщества, что обеспечивает общий контекст для запроса. Система анализирует эти результаты, чтобы сформировать начальный промежуточный ответ и набор дополнительных запросов для более глубокого исследования.
Эти последующие запросы выполняются параллельно на этапе локального поиска , извлекая целевую информацию, включая фрагменты текста, сущности, взаимосвязи и дополнительные отчёты сообщества из графа знаний. Этот процесс может повторяться до максимальной глубины, при этом каждый раунд потенциально порождает новые последующие запросы.
Наконец, этап генерации ответов синтезирует все промежуточные ответы, собранные в ходе процесса, объединяя общие данные на уровне сообщества с подробными локальными результатами для получения комплексного ответа. Этот подход обеспечивает баланс между широтой и глубиной, начиная с общего контекста сообщества и постепенно углубляясь в детали.
Это моя реализация поиска DRIFT, адаптированная для рабочих процессов LlamaIndex и Neo4j. Я провёл обратную разработку этого подхода, изучив код Microsoft GraphRAG, поэтому возможны некоторые отличия от оригинальной реализации.
Код доступен на GitHub.
Набор данных
Для этой публикации мы будем использовать «Приключения Алисы в Стране чудес» Льюиса Кэрролла — классический текст, доступный бесплатно на сайте Project Gutenberg. Этот богатый повествовательный набор данных с взаимосвязанными персонажами, местами и событиями делает его отличным выбором для демонстрации возможностей GraphRAG.
Проглатывание
Для процесса приема данных мы повторно используем реализацию индексации Microsoft GraphRAG, которую я разработал для предыдущей записи в блоге, адаптировав ее в рабочий процесс LlamaIndex.

Конвейер приема данных следует стандартному подходу GraphRAG и состоит из трех основных этапов:
класс MSGraphRAGIngestion(Workflow): @step async def entity_extraction(self, ev: StartEvent) -> EntitySummarization: фрагменты = splitter.split_text(ev.text) await ms_graph.extract_nodes_and_rels(chunks, ev.allowed_entities) return EntitySummarization() @step async def entity_summarization( self, ev: EntitySummarization ) -> CommunitySummarization: await ms_graph.summarize_nodes_and_rels() return CommunitySummarization() @step async def community_summarization( self, ev: CommunitySummarization ) -> CommunityEmbeddings: await ms_graph.summarize_communities() return CommunityEmbeddings()
Рабочий процесс извлекает сущности и связи из текстовых фрагментов, генерирует сводки как для узлов, так и для связей, а затем создает иерархические сводки сообщества.
После суммирования мы генерируем векторные представления для сообществ и сущностей, чтобы обеспечить поиск по сходству. Вот этап создания представления для сообщества:
@step async def community_embeddings(self, ev: CommunityEmbeddings) -> EntityEmbeddings: # Извлечь все сообщества из базы данных графа community = ms_graph.query( «»» MATCH (c:__Community__) WHERE c.summary IS NOT NULL AND c.rating > $min_community_rating RETURN coalesce(c.title, «») + » » + c.summary AS community_description, c.id AS community_id «»», params={«min_community_rating»: MIN_COMMUNITY_RATING}, ) if community: # Сгенерировать векторные вложения из описаний сообществ response = await client.embeddings.create( input=[c[«community_description»] for c in community], model=TEXT_EMBEDDING_MODEL, ) # Сохранить вложения в графе и создать векторный индекс embeds = [ { «community_id»: community[«community_id»], «embedding»: embedding.embedding, } для сообщества, встраивание в zip(communities, response.data) ] ms_graph.query( «»»РАСКРЫТЬ $data как строку MATCH (c:__Community__ {id: row.community_id}) CALL db.create.setNodeVectorProperty(c, 'embedding', row.embedding)»»», params={«data»: embeds}, ) ms_graph.query( «СОЗДАТЬ ВЕКТОРНЫЙ ИНДЕКС community IF NOT EXISTS FOR (c:__Community__) ON c.embedding» ) return EntityEmbeddings()
Тот же процесс применяется к внедрению сущностей, создавая векторные индексы, необходимые для поиска DRIFT на основе сходства.
Поиск ДРИФТА
Поиск DRIFT — это интуитивно понятный подход к поиску информации: сначала нужно понять общую картину, а затем при необходимости углубляться в детали. Вместо того, чтобы сразу искать точные совпадения на уровне документа или сущности, DRIFT сначала обращается к сводкам сообщества — общим обзорам, охватывающим основные темы и разделы в графе знаний.
После того, как DRIFT обнаруживает релевантную информацию более высокого уровня, он интеллектуально генерирует последующие запросы для получения точной информации о конкретных сущностях, связях и исходных документах. Этот двухэтапный подход отражает естественный процесс поиска информации человеком: сначала мы ориентируемся на основе общего обзора, а затем задаём конкретные вопросы для уточнения деталей. Сочетая всеобъемлющий охват глобального поиска с точностью локального поиска, DRIFT обеспечивает как широту, так и глубину поиска без вычислительных затрат на обработку каждого отчёта или документа сообщества.
Давайте рассмотрим каждый этап внедрения.
Код доступен на GitHub.
Поиск по сообществу
DRIFT использует HyDE (гипотетические встраивания документов) для повышения точности векторного поиска. Вместо того, чтобы напрямую встраивать запрос пользователя, HyDE сначала генерирует гипотетический ответ, а затем использует его для поиска по сходству. Это работает, поскольку гипотетические ответы семантически ближе к реальным сводкам сообщества, чем исходные запросы.
@step async def hyde_generation(self, ev: StartEvent) -> CommunitySearch: # Извлекаем случайный отчет сообщества для использования в качестве шаблона для генерации HyDE random_community_report = driver.execute_query( «»» MATCH (c:__Community__) WHERE c.summary IS NOT NULL RETURN coalesce(c.title, «») + » » + c.summary AS community_description»»», result_transformer_=lambda r: r.data(), ) # Сгенерируем гипотетический ответ для улучшения представления запроса hyde = HYDE_PROMPT.format( query=ev.query, template=random_community_report[0][«community_description»] ) hyde_response = await client.responses.create( model=»gpt-5-mini», input=[{«role»: «user», «content»: hyde}], reasoning={«effort»: «low»}, ) return CommunitySearch(query=ev.query, hyde_query=hyde_response.output_text)
Затем мы встраиваем запрос HyDE и извлекаем 5 наиболее релевантных отчётов сообщества с помощью векторного сходства. Затем он предлагает LLM сгенерировать промежуточный ответ на основе этих отчётов и определить последующие запросы для более глубокого исследования. Промежуточный ответ сохраняется, и все последующие запросы отправляются параллельно для фазы локального поиска.
@step async def community_search(self, ctx: Context, ev: CommunitySearch) -> LocalSearch: # Создать встраивание из запроса, улучшенного HyDE embedding_response = await client.embeddings.create( input=ev.hyde_query, model=TEXT_EMBEDDING_MODEL ) embedding = embedding_response.data[0].embedding # Найти 5 самых релевантных отчетов сообщества с помощью векторного сходства community_reports = driver.execute_query( «»» CALL db.index.vector.queryNodes('community', 5, $embedding) YIELD node, score RETURN 'community-' + node.id AS source_id, node.summary AS community_summary «»», result_transformer_=lambda r: r.data(), embedding=embedding, ) # Сгенерировать первоначальный ответ и определите, какая дополнительная информация необходима initial_prompt = DRIFT_PRIMER_PROMPT.format( query=ev.query, community_reports=community_reports ) initial_response = await client.responses.create( model=»gpt-5-mini», input=[{«role»: «user», «content»: initial_prompt}], reasoning={«effort»: «low»}, ) response_json = json_repair.loads(initial_response.output_text) print(f»Initial intermediate response: {response_json['intermediate_answer']}») # Сохраните первоначальный ответ и подготовьте его к параллельному локальному поиску асинхронно с ctx.store.edit_state() как ctx_state: ctx_state[«intermediate_answers»] = [ { «intermediate_answer»: response_json[«intermediate_answer»], «score»: response_json[«score»], } ] ctx_state[«local_search_num»] = len(response_json[«follow_up_queries»]) # Отправка последующих запросов для параллельного выполнения для local_query в response_json[«follow_up_queries»]: ctx.send_event(LocalSearch(query=ev.query, local_query=local_query)) return None
Это определяет основной подход DRIFT: начать с широкого поиска по сообществу, улучшенного с помощью HyDE, а затем углубить его с помощью целевых последующих запросов.
Локальный поиск
На этапе локального поиска параллельно выполняются последующие запросы для детализации конкретных данных. Каждый запрос извлекает целевой контекст с помощью векторного поиска по сущностям, затем генерирует промежуточный ответ и, возможно, дополнительные последующие запросы.
@step(num_workers=5) async def local_search(self, ev: LocalSearch) -> LocalSearchResults: print(f»Running local query: {ev.local_query}») # Create embedding for local query response = await client.embeddings.create( input=ev.local_query, model=TEXT_EMBEDDING_MODEL ) embedding = response.data[0].embedding # Извлечь соответствующие сущности и собрать их связанный контекст: # — Текстовые фрагменты, в которых упоминаются сущности # — Сообщения сообщества, к которым принадлежат сущности # — Связи между извлеченными сущностями # — Описания сущностей local_reports = driver.execute_query( «»» CALL db.index.vector.queryNodes('entity', 5, $embedding) YIELD node, score WITH collect(node) AS nodes WITH collect { UNWIND nodes as n MATCH (n)<-[:MENTIONS]->(c:__Chunk__) WITH c, count(distinct n) as freq RETURN {chunkText: c.text, source_id: 'chunk-' + c.id} ORDER BY freq DESC LIMIT 3 } AS text_mapping, collect { UNWIND nodes as n MATCH (n)-[:IN_COMMUNITY*]->(c:__Community__) WHERE c.summary IS NOT NULL WITH c, c.rating as rank RETURN {summary: c.summary, source_id: 'community-' + c.id} ORDER BY rank DESC LIMIT 3 } AS report_mapping, collect { UNWIND nodes as n MATCH (n)-[r:SUMMARIZED_RELATIONSHIP]-(m) WHERE m IN nodes RETURN {descriptionText: r.summary, source_id: 'relationship-' + n.name + '-' + m.name} LIMIT 3 } as insideRels, collect { UNWIND nodes as n RETURN {descriptionText: n.summary, source_id: 'node-' + n.name} } as entitys RETURN {Chunks: text_mapping, Reports: report_mapping, Relationships: insideRels, Entities: entitys} AS output «»», result_transformer_=lambda r: r.data(), embedding=embedding, ) # Сгенерировать ответ на основе полученного контекста local_prompt = DRIFT_LOCAL_SYSTEM_PROMPT.format( response_type=DEFAULT_RESPONSE_TYPE, context_data=local_reports, global_query=ev.query, ) local_response = await client.responses.create( model=»gpt-5-mini», input=[{«role»: «user», «content»: local_prompt}], reasoning={«effort»: «low»}, ) response_json = json_repair.loads(local_response.output_text) # Ограничьте количество последующих запросов, чтобы предотвратить экспоненциальный рост response_json[«follow_up_queries»] = response_json[«follow_up_queries»][:LOCAL_TOP_K] return LocalSearchResults(results=response_json, query=ev.query)
Следующий шаг организует итеративный процесс углубления. Он ожидает завершения всех параллельных поисков с помощью collect_events, а затем решает, продолжать ли углубление. Если текущая глубина не достигла максимума (мы используем максимальную глубину = 2), он извлекает последующие запросы из всех результатов, сохраняет промежуточные ответы и запускает следующий раунд параллельных поисков.
@step async def local_search_results( self, ctx: Context, ev: LocalSearchResults ) -> LocalSearch | FinalAnswer: local_search_num = await ctx.store.get(«local_search_num») # Дождитесь завершения всех параллельных поисков results = ctx.collect_events(ev, [LocalSearchResults] * local_search_num) if results is None: return None intermediate_results = [ { «intermediate_answer»: event.results[«response»], «score»: event.results[«score»], } for event in results ] current_thought = await ctx.store.get(«local_search_thought», default=1) query = [ev.query for ev in results][0] # Продолжаем детализацию, если не достигли максимальной глубины if current_thought < MAX_LOCAL_SEARCH_DEPTH: await ctx.store.set("local_search_thought", current_thought + 1) follow_up_queries = [ query for event in results for query in event.results["follow_up_queries"] ] # Сохраняем промежуточные ответы и асинхронно отправляем следующий раунд поиска с помощью ctx.store.edit_state() как ctx_state: ctx_state["intermediate_answers"].extend(intermediate_results) ctx_state["local_search_num"] = len(follow_up_queries) for local_query in follow_up_queries: ctx.send_event(LocalSearch(query=query, local_query=local_query)) return None else: return FinalAnswer(query=query)
Это создаёт итеративный цикл уточнения, где каждый уровень глубины основывается на предыдущих результатах. По достижении максимальной глубины запускается генерация окончательного ответа.
Окончательный ответ
На последнем этапе все промежуточные ответы, собранные в ходе поиска DRIFT, синтезируются в комплексный ответ. Он включает в себя первоначальный ответ, полученный в ходе поиска в сообществе, и все ответы, полученные в ходе итераций локального поиска.
@step async def final_answer_generation(self, ctx: Context, ev: FinalAnswer) -> StopEvent: # Извлечь все промежуточные ответы, собранные в ходе процесса поиска intermediate_answers = await ctx.store.get(«intermediate_answers») # Обобщить все результаты в полный окончательный ответ answer_prompt = DRIFT_REDUCE_PROMPT.format( response_type=DEFAULT_RESPONSE_TYPE, context_data=intermediate_answers, global_query=ev.query, ) answer_response = await client.responses.create( model=»gpt-5-mini», input=[ {«role»: «developer», «content»: answer_prompt}, {«role»: «user», «content»: ev.query}, ], reasoning={«effort»: «low»}, ) return StopEvent(result=answer_response.output_text)
Краткое содержание
Поиск DRIFT представляет собой интересную стратегию, позволяющую сбалансировать широту глобального поиска с точностью локального. Начиная с контекста на уровне сообщества и постепенно углубляясь в итеративные запросы, он позволяет избежать вычислительных затрат, связанных с обработкой всех отчётов сообщества, при этом сохраняя полный охват.
Однако есть возможности для некоторых улучшений. Текущая реализация обрабатывает все промежуточные ответы одинаково, но фильтрация на основе их степени уверенности может улучшить качество окончательного ответа и снизить уровень шума. Аналогичным образом, последующие запросы можно ранжировать по релевантности или потенциальному объёму информации перед выполнением, обеспечивая первоочередное рассмотрение наиболее перспективных лидов.
Другим многообещающим улучшением станет внедрение этапа уточнения запросов, который использует LLM для анализа всех сгенерированных последующих запросов, группируя похожие запросы для предотвращения избыточных поисков и отфильтровывая запросы, которые вряд ли принесут полезную информацию. Это может значительно сократить количество локальных поисков, сохраняя при этом качество ответов.
Полная реализация доступна на GitHub для тех, кто заинтересован в экспериментах с этими улучшениями или адаптации DRIFT для собственных случаев использования.
Источник: towardsdatascience.com



























