Откройте для себя радость создания надежных веб-приложений без сложностей, связанных с громоздким фронтенд-стеком
Делиться

В части 1 мы показали, как использовать HTMX для добавления интерактивности к нашим HTML-элементам. Другими словами, Javascript без Javascript. Чтобы проиллюстрировать это, мы начали создавать простой чат, который возвращал бы смоделированный ответ LLM. В этой статье мы расширим возможности нашего чат-бота и добавим несколько функций, среди которых потоковая передача, что значительно улучшает пользовательский опыт по сравнению с ранее созданным синхронным чатом.
- ✅ Потоковая передача в реальном времени с помощью SSE
- ✅ Архитектура на основе сеансов для нескольких пользователей
- ✅ Асинхронная координация с помощью asyncio.Queue
- ✅ Чистые шаблоны HTMX со специальной обработкой SSE
- ✅ Поисковый агент Google для ответов на запросы с использованием свежих данных
- ✅ Почти без JavaScript
Вот что мы построим сегодня:

От синхронной связи к асинхронной
То, что мы создали ранее, использовало самые базовые веб-функции с использованием форм. Наше взаимодействие было синхронным, то есть мы ничего не получали, пока сервер не завершит работу. Мы отправляем запрос, ждём полного ответа и отображаем его. В промежутках между этими двумя действиями мы просто… ждём.
Но современные чат-боты работают иначе, предоставляя возможности асинхронного общения. Это достигается с помощью потоковой передачи: мы получаем обновления и частичные ответы, а не ждем полного ответа. Это особенно полезно, когда процесс ответа занимает много времени, что обычно характерно для LLM, когда ответ длинный.
SSE против Websockets
SSE (Server Side Events) и Websockets — это два протокола обмена данными в реальном времени между клиентом и сервером.
Websockets обеспечивает полнодуплексные соединения: это означает, что браузер и сервер могут одновременно отправлять и получать данные. Это обычно используется в онлайн-играх, чат-приложениях и инструментах для совместной работы (например, Google Таблицах).
SSE является однонаправленным и допускает только односторонний обмен данными — от сервера к клиенту. Это означает, что клиент не может ничего отправить на сервер по этому протоколу. Если веб-сокеты — это двусторонний телефонный разговор, где люди могут говорить и слушать одновременно, то SSE — это как прослушивание радио. SSE обычно используются для отправки уведомлений, обновления графиков в финансовых приложениях или новостных лент.
Итак, почему мы выбираем SSE? Потому что в нашем случае нам не нужен полнодуплексный протокол, и простой HTTP (который не соответствует принципам работы Websockets) вполне достаточен для нашего случая: мы отправляем данные и получаем их. SSE просто означает, что мы будем получать данные в потоке, больше ничего не нужно.
Что мы хотим сделать
- Пользователь вводит запрос
- Сервер получает запрос и отправляет его в LLM
- LLM начинает производить контент
- Для каждого фрагмента контента сервер возвращает его немедленно.
- Браузер добавляет эту информацию в DOM
Мы разделим нашу работу на бэкэнд и фронтэнд части.
Бэкэнд
Бэкэнд будет проходить в 2 этапа:
- Конечная точка POST, которая получит сообщение и ничего не вернет.
- Конечная точка GET, которая будет считывать очередь и создавать выходной поток.
В нашей демонстрации мы для начала создадим фальшивый ответ LLM, повторив пользовательский ввод, что означает, что слова потока будут точно такими же, как и пользовательский ввод.
Чтобы поддерживать порядок, нам нужно разделить потоки сообщений (очереди) по сеансам пользователей, иначе мы будем путать разговоры. Поэтому мы создадим словарь сеансов для размещения наших очередей.
Далее нам нужно указать бэкенду дождаться заполнения очереди, прежде чем отправлять наш ответ. Если мы этого не сделаем, возникнут проблемы с параллельным выполнением или синхронизацией: SSE запускается на стороне клиента, очередь пуста, SSE закрывается, пользователь вводит сообщение, но… уже слишком поздно!
Решение: асинхронные очереди! Использование асинхронных очередей имеет ряд преимуществ:
- Если в очереди есть данные: возвращается немедленно
- Если очередь пуста: приостанавливает выполнение до вызова queue.put().
- Несколько потребителей: каждый получает свои собственные данные
- Потокобезопасность: отсутствие условий гонки
Я знаю, что вам не терпится узнать больше, поэтому вот код ниже:
from fastapi import FastAPI, Request, Form from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, StreamingResponse import asyncio import time import uuid app = FastAPI() templates = Jinja2Templates(«templates») # Этот объект будет хранить идентификатор сеанса и соответствующее ему значение, асинхронную очередь. sessions = dict() @app.get(«/») async def root(request: Request): session_id = str(uuid.uuid4()) sessions[session_id] = asyncio.Queue() return templates.TemplateResponse(request, «index.html», context={«session_id»: session_id}) @app.post(«/chat») async def chat(request: Request, query: str=Form(…), session_id: str=Form(…)): «»» Отправка сообщения в очередь на основе сеансов «»» # Создание сеанса, если он не существует if session_id отсутствует в сеансах: sessions[session_id] = asyncio.Queue() # Помещение сообщения в очередь await sessions[session_id].put(query) return {«status»: «queued», «session_id»: session_id} @app.get(«/stream/{session_id}») async def stream(session_id: str): async def response_stream(): if session_id not in sessions: print(f»Сессия {session_id} не найдена!») return queue = sessions[session_id] # Это БЛОКИРУЕТСЯ, пока не поступят данные print(f»Ожидание сообщения в сессии {session_id}») data = await queue.get() print(f»Получено сообщение: {data}») message = «» await asyncio.sleep(1) for token in data.replace(«n», » «).split(» «): message += token + » » data = f»»»data:
nn»»» yield data await asyncio.sleep(0.03) queue.task_done() return StreamingResponse(response_stream(), media_type=»text/event-stream»)
Давайте поясним несколько ключевых понятий.
Изоляция сеанса
Важно, чтобы у каждого пользователя была своя очередь сообщений, чтобы не смешивать диалоги. Для этого используется словарь сессий. В реальных приложениях для хранения этой информации, вероятно, используется Redis. В коде ниже мы видим, что новый идентификатор сессии создаётся при загрузке страницы и сохраняется в словаре сессий. Перезагрузка страницы запускает новую сессию. Мы не сохраняем очереди сообщений, но можем это сделать, например, через базу данных. Эта тема рассматривается в части 3.
# Этот объект будет хранить идентификатор сеанса и соответствующее ему значение, асинхронную очередь. sessions = dict() @app.get(«/») async def root(request: Request): session_id = str(uuid.uuid4()) sessions[session_id] = asyncio.Queue() return templates.TemplateResponse(request, «index.html», context={«session_id»: session_id})
Блокирование координации
Нам необходимо контролировать порядок отправки SSE и получения пользовательских запросов. На стороне бэкенда порядок такой:
- Получить сообщение пользователя
- Создайте очередь сообщений и заполните ее
- Отправка сообщений из очереди в потоковом ответе
Невыполнение этого требования может привести к нежелательному поведению, например, к первоначальному чтению (пустой) очереди сообщений и последующему заполнению ее запросом пользователя.
Решение для управления порядком — использование asyncio.Queue. Этот объект будет использован дважды:
- Когда мы добавляем новые сообщения в очередь. Вставка сообщений «пробуждает» опрос в конечной точке SSE.
await sessions[session_id].put(query)
- Когда мы извлекаем сообщения из очереди. В этой строке код блокируется до тех пор, пока не поступит сигнал из очереди: «Эй, у меня новые данные!»:
данные = ожидание очереди.get()
Эта модель имеет ряд преимуществ:
- У каждого пользователя своя очередь
- Риска гоночных условий нет.
Потоковое моделирование
В этой статье мы смоделируем ответ LLM, разбив запрос пользователя на слова и возвращая их по одному. В части 3 мы подключим к этому настоящий LLM.
Потоковая передача данных осуществляется через объект StreamingResponse из FastAPI. Этот объект ожидает асинхронный генератор, который будет выдавать данные до завершения работы генератора. Необходимо использовать ключевое слово yield вместо return, иначе генератор просто остановится после первой итерации.
Давайте разложим нашу потоковую функцию:
Во-первых, нам нужно убедиться, что у нас есть очередь для текущего сеанса, из которой мы будем извлекать сообщения:
если session_id отсутствует в sessions: print(f»Сессия {session_id} не найдена!») return queue = sessions[session_id]
Далее, как только у нас появится очередь, мы извлечём из неё сообщения, если они там есть. В противном случае код приостанавливается и ждёт прибытия сообщений. Это самая важная часть нашей функции:
# Это блокирует, пока не поступят данные print(f»Ожидание сообщения в сеансе {session_id}») data = await queue.get() print(f»Получено сообщение: {data}»)
Для имитации потока мы теперь разделим сообщение на слова (называемые здесь токенами) и добавим несколько пауз для имитации процесса генерации текста из LLM (части asyncio.sleep). Обратите внимание, что данные, которые мы получаем, на самом деле представляют собой HTML-строки, инкапсулированные в строку, начинающуюся с «data:». Именно так отправляются сообщения SSE. Вы также можете пометить свои сообщения метаданными «event:». Например:
событие: my_custom_event данные:
Давайте посмотрим, как мы реализуем это в Python (для пуристов: используйте шаблоны Jinja для визуализации HTML вместо строки:) ):
message = «» # Первая пауза, чтобы браузер отобразил «Думаю, когда сообщение отправлено» await asyncio.sleep(1) # Имитация потоковой передачи путем разделения сообщения на слова for token in data.replace(«n», » «).split(» «): # Добавляем токены к сообщению message += token + » » # Оборачиваем сообщение в HTML-теги с метаданными «data» data = f»»»data:
nn»»» yield data # Пауза для имитации процесса генерации LLM await asyncio.sleep(0.03) queue.task_done()
Внешний интерфейс
У нашего фронтенда есть две задачи: отправлять запросы пользователей бэкенду и прослушивать сообщения SSE на определённом канале (session_id). Для этого мы применяем концепцию «Разделения концепций», то есть каждый элемент HTMX отвечает только за одну задачу.
- форма отправляет пользовательский ввод
- прослушиватель sse управляет потоковой передачей
- ul чат отображает сообщение
Для отправки сообщений мы будем использовать стандартное текстовое поле ввода в форме. Вот как работает HTMLX:
Если вы помните статью из части 1, у нас есть несколько атрибутов HTMX, которые заслуживают пояснений:
- hx-post: Конечная точка, в которой будут отправлены данные формы.
- hx-swap: Установите значение none, поскольку в нашем случае конечная точка не возвращает никаких данных.
- hx-trigger: указывает, какое событие вызовет запрос
- hx-on::before-request: Очень лёгкий фрагмент на JavaScript, добавляющий приложению живости. Мы добавим запрос пользователя в список в чате и покажем пользователю сообщение «В раздумьях», пока ожидаем поток сообщений SSE. Это приятнее, чем смотреть на пустую страницу.
Стоит отметить, что мы фактически передаем на бэкенд два параметра: данные, введенные пользователем, и идентификатор сессии. Таким образом, сообщение будет помещено в нужную очередь на бэкенде.
Затем мы определяем еще один компонент, который специально предназначен для прослушивания сообщений SSE.
Примечание: для замены отдельных элементов DOM можно было бы использовать другие методы, например, внеполосные (Out-of-band) замены. Они работают немного иначе, поскольку требуют определённого идентификатора для поиска в DOM. В нашем случае мы намеренно решили не назначать идентификаторы каждому записанному элементу списка.
Настоящий чат-бот с использованием Google Agent Development Kit
Сейчас самое время заменить нашу фиктивную конечную точку потоковой передачи на настоящую LLM. Для этого мы создадим агента с использованием Google ADK, оснащённого инструментами и памятью для извлечения информации и запоминания деталей разговора.
Очень краткое введение в агентов
Вы, вероятно, уже знаете, что такое степень магистра права (LLM), по крайней мере, я предполагаю. Главный недостаток LLM на сегодняшний день заключается в том, что только LLM не может получить доступ к информации в режиме реального времени: их знания сохраняются на момент обучения. Другой недостаток — невозможность доступа к информации, выходящей за рамки их обучения (например, к внутренним данным вашей компании).
Агенты — это тип приложений искусственного интеллекта, которые могут рассуждать, действовать и наблюдать. Рассуждением занимается LLM, «мозг». «Руки» агентов — это то, что мы называем «инструментами», и они могут принимать различные формы:
- функция Python, например, для получения API
- сервер MCP, который является стандартом, позволяющим агентам подключаться к API через стандартизированный интерфейс (например, получать доступ ко всем инструментам Gsuite без необходимости самостоятельно писать коннекторы API)
- другие агенты (в этом случае этот шаблон называется делегированием агента, где маршрутизатор или главные агенты контролируют различные подагенты)
В нашей демонстрации, чтобы максимально упростить задачу, мы будем использовать очень простой агент, который может использовать один инструмент: Google Поиск. Это позволит нам получать актуальную информацию и гарантировать её достоверность (по крайней мере, мы надеемся, что результаты Google Поиска…)
В мире Google ADK агентам нужна базовая информация:
- имя и описание, в основном для целей документирования
- инструкции: подсказка, определяющая поведение агента (используемые инструменты, формат вывода, шаги для выполнения и т. д.)
- инструменты: функции / серверы MCP / агенты, которые агент может использовать для выполнения своей задачи
Существуют также и другие концепции управления памятью и сеансами, но они выходят за рамки данной статьи.
Без лишних слов, давайте дадим определение нашему агенту!
Потоковый поисковый агент Google
из google.adk.agents импорт Agent из google.adk.agents.run_config импорт RunConfig, StreamingMode из google.adk.runners импорт Runner из google.adk.sessions импорт InMemorySessionService из google.genai импорт типов из google.adk.tools импорт google_search # Определите константы для агента APP_NAME = «default» # Приложение USER_ID = «default» # Пользователь SESSION = «default» # Сеанс MODEL_NAME = «gemini-2.5-flash-lite» # Шаг 1. Создайте агента LLM root_agent = Agent( model=MODEL_NAME, name=»text_chat_bot», description=»Текстовый чат-бот», instruction=»Вы полезный помощник. Ваша цель — отвечать на вопросы, основываясь на ваших знаниях. Используйте инструмент поиска Google, чтобы предоставлять актуальную и самую точную информацию», tools=[google_search] ) # Шаг 2. Настройте управление сеансами # InMemorySessionService хранит разговоры в оперативной памяти (временно) session_service = InMemorySessionService() # Шаг 3: Создание Runner runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
Объект `Runner` действует как посредник между вами и агентом.
Далее мы (пере)определяем нашу конечную точку `/stream`. Сначала мы проверяем наличие сеанса агента, в противном случае создаём его:
# Попытка создать новый сеанс или получить существующий try: session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) except: session = await session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id )
Затем мы берем запрос пользователя и передаем его агенту в асинхронном режиме, чтобы получить обратно поток:
# Преобразовать строку запроса в формат содержимого ADK query = types.Content(role=»user», parts=[types.Part(text=query)]) # Асинхронно передать ответ агента async for event in runner.run_async( user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE) ):
Далее есть один нюанс. При формировании ответа агент может вывести двойной перенос строки «nn». Это проблематично, поскольку события SSE заканчиваются этим символом. Таким образом, наличие двойного переноса строки в строке означает:
- ваше текущее сообщение будет сокращено
- Ваше следующее сообщение будет неправильно отформатировано, и поток SSE остановится.
Вы можете попробовать сделать это самостоятельно. Чтобы исправить это, мы воспользуемся небольшим хаком, а также другим небольшим хаком для форматирования элементов списка (я использую Tailwind CSS, который переопределяет некоторые CSS-правила). Хак такой:
если event.partial: message += event.content.parts[0].text # Хак здесь html_content = markdown.markdown(message, extensions=['fenced_code']).replace(«n», «
«).replace(«
- «, «
- AI{html_content}
- «, «
- «).replace(«
- «, «
- ИИ{html_content}
- «) full_html = f»»»data:
nn»»» yield full_html queue.task_done() return StreamingResponse(response_stream(), media_type=»text/event-stream»)
И всё! Вы сможете общаться со своим чатом!
Ниже я добавляю небольшой фрагмент CSS для форматирования блоков кода. Если вы хотите, чтобы ваш чат создавал фрагменты кода, вам нужно, чтобы они были правильно отформатированы. Вот HTML:
pre, code { background-color: black; color: lightgrey; padding: 1%; border-radius: 10px; white-space: pre-wrap; font-size: 0.8rem; letter-spacing: -1px; }
Теперь вы также можете генерировать фрагменты кода:

Разум = взорван
Обзор рабочего процесса
При менее чем 200 строках кода мы смогли написать чат со следующей последовательностью действий, передать ответ с сервера и очень красиво отобразить его, поэкспериментировав с SSE и HTMX.
Пользователь вводит «Hello World» → Отправить ├── 1. Добавить «Me: Hello World» в чат ├── 2. Добавить «AI: Thinking…» в чат ├── 3. Отправить сообщение в чат ├── 4. Сервер помещает сообщение в очередь ├── 5. Поток SSE выдает ответ LLM на основе запроса ├── 6. Поток «AI: This» (заменяет «Thinking…») ├── 7. Поток «AI: This is the answer …» └── 8. Завершить
Заключение
В этой серии статей мы показали, как легко разработать приложение чат-бота практически без JavaScript и тяжёлого JavaScript-фреймворка, используя только Python и HTML. Мы рассмотрели такие темы, как серверный рендеринг, события, отправленные сервером (SSE), асинхронная потоковая передача и агенты, с помощью волшебной библиотеки HTMX.
Основная цель этих статей — показать, что веб-приложения доступны разработчикам, не использующим JavaScript. На самом деле, существует очень веская и обоснованная причина не использовать JavaScript для веб-разработки каждый раз, и, хотя JavaScript — мощный язык, сегодня мне кажется, что его иногда используют слишком часто, заменяя более простые, но надёжные подходы. Споры о серверных и клиентских приложениях ведутся уже давно и ещё не закончены, но я надеюсь, что эти статьи открыли глаза некоторым из вас и, в конце концов, чему-то вас научили.
Следите за обновлениями!
Источник: towardsdatascience.com
- «) full_html = f»»»data:
nn»»» yield full_html
Таким образом мы гарантируем, что никакие двойные переносы строк не нарушат наш поток SSE.
Полный код маршрута приведен ниже:
@app.get(«/stream/{session_id}») async def stream(session_id: str): async def response_stream(): if session_id not in sessions: print(f»Session {session_id} not found!») return # Попытка создать новый сеанс или извлечь существующий try: session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) except: session = await session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) queue = sessions[session_id] # Это БЛОКИРУЕТ, пока не поступят данные print(f»Ожидание сообщения в сеансе {session_id}») query = await queue.get() print(f»Получено сообщение: {query}») message = «» # Преобразуем строку запроса в формат содержимого ADK query = types.Content(role=»user», parts=[types.Part(text=query)]) # Асинхронно передавать ответ агента async for event in runner.run_async( user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE) ): if event.partial: message += event.content.parts[0].text html_content = markdown.markdown(message, extensions=['fenced_code']).replace(«n», «
«).replace(«






















