Архив рубрики ~Лента новостей~

Разработка пользовательского плагина GStreamer для NVIDIA DeepStream

Разработка пользовательского плагина GStreamer для NVIDIA DeepStream
Разработка пользовательского плагина GStreamer для NVIDIA DeepStream

Зачем нужна пользовательская обработка данных в DeepStream?

Делиться

Фото Джулиана Хохгесанга на Unsplash

NVIDIA DeepStream предоставляет готовый к использованию конвейер для многопотоковой видеоаналитики: аппаратное ускорение декодирования, отслеживание, отображение на экране и обмен сообщениями — все это объединено через GStreamer. Для стандартных моделей обнаружения, экспортируемых в TensorRT, nvinfer берет на себя все задачи.

Однако у этого типичного случая есть ограничения. Модели обработки изображений и языка, пользовательская постобработка, повернутые ограничивающие рамки или необходимость быстрой замены моделей во время выполнения — вот где предположения nvinfer перестают работать. Иногда у вас есть зрелый стек вывода PyTorch, тщательно настроенный вашей командой, и вы хотите, чтобы DeepStream вызывал его, а не переписывал его в конфигурационном файле.

Стоит отметить, что, если говорить конкретно о моделях семейства YOLO, то DeepStream-Yolo от Маркоса Лучано уже проделал отличную работу по реализации пользовательской постобработки на C++. Если C++ доступен, начните с него. В этой статье рассматривается другой подход: достижение того же результата полностью на Python с использованием пользовательского плагина GStreamer с pyservicemaker без ущерба для пропускной способности.

Ключевое преимущество этого подхода заключается в том, что нижестоящим элементам, таким как nvtracker , nvdsosd и nvmsgconv всё равно, какой именно элемент сгенерировал метаданные обнаружения. Если правильно записывать данные в структуру метаданных DeepStream, остальная часть экосистемы будет работать так, как будто nvinfer никогда и не был задействован.

Метаданные DeepStream

Каждый буфер, проходящий через конвейер DeepStream, содержит не только пиксельные данные. С момента прохождения кадров через nvstreammux к каждому GstBuffer прикрепляется структура NvDsBatchMeta . Иерархия проста и описана в официальной документации.

 NvDsBatchMeta ├── NvDsUserMeta (batch-level custom metadata) └── NvDsFrameMeta (one per source stream) ├── NvDsUserMeta (frame-level custom metadata) └── NvDsObjectMeta (one per detected object) ├── NvDsClassifierMeta └── NvDsUserMeta (object-level custom metadata)

NvDsBatchMeta описывает весь пакет данных. Каждый NvDsFrameMeta соответствует одному исходному потоку и содержит информацию на уровне кадра, такую как идентификатор источника и номер кадра. Каждый NvDsObjectMeta представляет собой отдельное обнаружение, то есть, когда наш плагин записывает обнаружения, мы будем записывать NvDsObjectMeta для каждого из них.

Крайне важно понимать, что ничто из этого не принадлежит nvinfer . Это общий контракт на использование данных. Любой элемент GStreamer в конвейере может читать из него, записывать в него или делать и то, и другое:

  • nvtracker считывает ограничивающие рамки объектов и записывает идентификаторы отслеживания.
  • nvdsosd считывает прямоугольники и подписи для отрисовки наложений.
  • Функция nvmsgconv считывает всю структуру для создания полезной нагрузки сообщения.

Наш пользовательский плагин просто будет записывать результаты обнаружения в эту структуру так же, как это делал бы nvinfer , и все последующие процессы будут их обрабатывать без изменений. Важно понимать одно ограничение, прежде чем писать какой-либо код: экземпляры NvDsObjectMeta нельзя создавать напрямую из Python . Попытка создания экземпляра класса вызовет ошибку » No constructor defined! во время выполнения.

Причина кроется в архитектуре. DeepStream управляет своими метаданными через пулы памяти — предварительно выделенные блоки, которые переиспользуются между кадрами, чтобы избежать накладных расходов на повторное выделение и освобождение памяти в высокопроизводительном конвейере. Эти пулы принадлежат NvDsBatchMeta и находятся на стороне C. Привязки Python предоставляют доступ к этим пулам, но намеренно не предоставляют конструктор на стороне Python, поскольку создание NvDsObjectMeta вне пула обошло бы управление жизненным циклом, которое обеспечивает предсказуемое использование памяти DeepStream. Правильный способ получить его — запросить его у пакета: batch_meta.acquire_object_meta() , который возвращает вам предварительно выделенный экземпляр из пула. Когда кадр завершится, DeepStream автоматически вернет его в пул.

Мост Python: pyservicemaker

Для взаимодействия с метаданными DeepStream из Python мы будем использовать pyservicemaker — текущий поддерживаемый NVIDIA SDK для Python для DeepStream. Официальная документация описывает основы конвейеров и потоков, но не показывает, как записывать и прикреплять метаданные из пользовательского элемента вывода. Именно этот пробел и восполняет данная статья.

Ключевой абстракцией является BatchMetadataOperator . Создание подкласса и реализация функции handle_metadata(batch_meta) дает доступ к полному NvDsBatchMeta для каждого буфера, проходящего через конвейер. Далее, итерация кадров сводится к использованию batch_meta.frame_items и прикреплению объекта обнаружения.

pyservicemaker также предоставляет обертку Buffer вокруг Gst.Buffer , которая напрямую предоставляет доступ к batch_meta и, что важно, к методу extract(batch_id) , возвращающему дескриптор DLPack для памяти GPU каждого кадра. Именно это делает возможным вывод без копирования, поскольку мы можем передавать кадр непосредственно в TensorRT, не покидая GPU.

Вместо использования BatchMetadataOperator отдельно через зонд, мы встроим тот же шаблон непосредственно в метод do_transform_ip нашего пользовательского плагина, что даст нам контроль над жизненным циклом элемента, его свойствами и согласованием ограничений, а также доступом к метаданным. Но сначала нам нужно собрать этот плагин.

Обнаруживаемый плагин Python для GStreamer

GStreamer обнаруживает плагины во время выполнения, сканируя каталоги, указанные в переменной GST_PLUGIN_PATH . В частности, для плагинов Python он ищет их внутри подкаталога python/ в каждом из этих путей. Это означает, что ваш плагин — это просто файл .py , помещенный в нужное место, без компиляции, без CMake, без разделяемой библиотеки. Компромисс заключается в том, что шаблон регистрации строгий, и ошибка приводит к скрытым сбоям, которые действительно трудно отладить.

 $GST_PLUGIN_PATH/ └── python/ └── gstexampleplugin.py # your plugin

Установите GST_PLUGIN_PATH так, чтобы она указывала на родительский каталог, и GStreamer автоматически найдет python/gstexampleplugin.py при следующем запуске конвейера.

Скелет плагина

Вот минимальная структура элемента сквозного вывода: он получает пакеты видеобуферов, выполняет вывод, добавляет метаданные и передает буфер дальше без изменений.

 import gi gi.require_version('Gst', '1.0') gi.require_version('GstBase', '1.0') from gi.repository import Gst, GstBase, GObject import torch from pyservicemaker import Buffer GST_PLUGIN_NAME = "gstexampleplugin" Gst.init(None) class GstExamplePlugin(GstBase.BaseTransform): __gstmetadata__ = ( 'GstExamplePlugin', # name 'Filter/Effect/Video', # classification 'Custom inference element', # description 'Your Name' # author ) src_format = Gst.Caps.from_string( "video/x-raw(memory:NVMM), format=RGB, " "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], " "framerate=(fraction)[ 0/1, 2147483647/1 ]" ) sink_format = Gst.Caps.from_string( "video/x-raw(memory:NVMM), format=RGB, " "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], " "framerate=(fraction)[ 0/1, 2147483647/1 ]" ) src_pad_template = Gst.PadTemplate.new( "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format ) sink_pad_template = Gst.PadTemplate.new( "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format ) __gsttemplates__ = (src_pad_template, sink_pad_template) __gproperties__ = { 'model-engine': ( str, 'TensorRT engine path', 'Path to the .engine file', '', GObject.ParamFlags.READWRITE ), 'confidence-threshold': ( float, 'Confidence threshold', 'Minimum confidence to attach a detection', 0.0, 1.0, 0.5, GObject.ParamFlags.READWRITE ), } def __init__(self): super().__init__() self.model_engine = '' self.confidence_threshold = 0.5 self.engine = None def do_get_property(self, prop): if prop.name == 'model-engine': return self.model_engine elif prop.name == 'confidence-threshold': return self.confidence_threshold def do_set_property(self, prop, value): if prop.name == 'model-engine': self.model_engine = value elif prop.name == 'confidence-threshold': self.confidence_threshold = value def do_start(self): # Load your TensorRT engine here self.engine = load_engine(self.model_engine) # This function should be implemented return True def do_transform_ip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn: """In-place transform: attach metadata, pass buffer unchanged.""" buffer = Buffer(gst_buffer) batch_meta = buffer.batch_meta frames = [] for frame_meta in batch_meta.frame_items: t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id)) frames.append(t) batch = torch.stack(frames, dim=0) # Run your model inference results = self.engine(batch) # Now we will need to iterate over the results for each frame # and attach it to the object_meta in case it is detection/segmentation # otherwise we can do it as user_meta # The following is pseudocode, which depends on your inference for frame_meta in batch_meta.frame_items: for det in results: obj = batch_meta.acquire_object_meta() # Fill the obj with each detection ... frame_meta.append(obj) return Gst.FlowReturn.OK # --- Registration --- GObject.type_register(GstExamplePlugin) __gstelementfactory__ = (GST_PLUGIN_NAME, Gst.Rank.NONE, GstExamplePlugin)

Несколько важных моментов, касающихся этого скелета:

GstBase.BaseTransform является подходящим базовым классом для фильтра, работающего на месте, который принимает буфер, изменяет его (путем добавления метаданных) и передает дальше. Мы переопределяем do_transform_ip вместо do_transform потому что не выделяем новый выходной буфер.

__gstmetadata__ и __gsttemplates__ не являются необязательными. GStreamer не зарегистрирует элемент без них. Строка caps video/x-raw(memory:NVMM) сообщает GStreamer, что этот элемент работает с памятью NVIDIA, что крайне важно для работы на графическом процессоре в конвейере DeepStream.

__gproperties__ предоставляет доступ к model-engine и confidence-threshold как к полноценным свойствам GStreamer, что означает, что вы можете устанавливать их из командной строки gst-launch или из кода конвейера Python, не затрагивая исходный код.

Последние две строки необходимы для регистрации: GObject.type_register сообщает системе типов GObject о классе, а __gstelementfactory__ сообщает GStreamer, какое имя элемента следует предоставить и какой класс следует создать.

Проверка плагина. После того, как файл будет установлен и кэш очищен, подтвердите регистрацию с помощью:

 GST_PLUGIN_PATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin

Вы должны увидеть метаданные элемента, шаблоны отступов и оба свойства в списке. Если вы их видите, значит, GStreamer знает о вашем плагине, и вы готовы добавить его в конвейер обработки.

Пример сквозного вывода данных с помощью Ultralytics

После создания базовой структуры плагина пришло время заполнить логику вывода. Полный рабочий код доступен в виде GitHub Gist. Как только он станет доступен, вы сможете изучить его, как мы делали раньше, или запустить конвейер. Вот простой пример, который просто выполняет вывод и отображает частоту кадров:

 gst-launch-1.0 -v  nvstreammux name=m width=1280 height=720 batch-size=1  batched-push-timeout=33000 !  nvvideoconvert nvbuf-memory-type=0 !  'video/x-raw(memory:NVMM), format=RGB' !  gstyoloplugin model-path=/path/to/yolo26s.engine !  fpsdisplaysink text-overlay=false silent=false sync=false  video-sink=fakesink  uridecodebin uri=file:///path/to/video.mp4 ! m.sink_0

Проверка кода

Проблема совместимости

Если вы читаете код, вы, возможно, заметили, что мы переопределяем объект tuple , но только внутри модуля ultralytics.nn.backends.tensorrt , поскольку именно там кроется проблема. Существует известный крайний случай совместимости между привязками Python к TensorRT и фреймворком-оберткой Python для GStreamer ( PyGObject ) , который приводит к сбою конвейера с печально известным сообщением «Segmentation fault (core dumped)». Именно поэтому нам потребовалось создать этот фрагмент кода, который помогает сохранить желаемое поведение:

 import ultralytics.nn.backends.tensorrt as trt_backend _original_tuple = tuple def safe_tuple(obj): if "tensorrt" in type(obj).__module__ and type(obj).__name__ == "Dims": return _original_tuple(obj[i] for i in range(len(obj))) return _original_tuple(obj) trt_backend.tuple = safe_tuple

Это заменяет ссылку tuple внутри пространства имен бэкенда Ultralytics во время выполнения на версию, которая использует доступ по индексу для объектов Dims , оставляя все остальное без изменений. Это не элегантное решение, но оно требует точной настройки и должно выполняться во время импорта, до создания экземпляра какой-либо модели.

Цикл вывода

Сам цикл вывода довольно прост:

  1. Извлеките кадры из буферов.
  2. Предварительная обработка + вывод
  3. Если нижестоящие элементы конвейера являются плагинами глубокого потока, результаты следует прикрепить к метаданным объекта каждого кадра.

Ниже приведён фрагмент кода для работы с нулевым копированием с использованием DLPack:

 frames = [] for frame_meta in batch_meta.frame_items: t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id)) frames.append(t) batch = torch.stack(frames, dim=0)

Предварительная обработка входных данных

Согласно документации, модели YOLO при обработке объекта torch.Tensor ожидают фиксированную форму входных данных (N, 3, 640, 640) . Однако кадры, поступающие из nvstreammux , будут иметь то разрешение, которое указано в исходном файле. Используемый подход — это масштабирование с черными полосами по бокам: кадр масштабируется так, чтобы соответствовать целевым размерам, сохраняя при этом соотношение сторон, а затем заполняется оставшееся пространство. Ключевая идея здесь заключается в том, что мы можем сделать это полностью на графическом процессоре, обрабатывая весь пакет данных одновременно, не затрагивая память центрального процессора.

Благодаря тому, что извлечение кадров, добавление черных полос по краям, вывод результатов и инверсия координат выполняются на графическом процессоре в рамках одного вызова do_transform_ip , плагин ведет себя точно так же, как nvinfer с точки зрения каждого последующего элемента, но с полной гибкостью стека вывода результатов на Python.

Далее в дело вступает остальная часть конвейера DeepStream: nvtracker присваивает идентификаторы, nvdsosd отрисовывает наложения, а nvmsgconv сериализует полезные данные.

Практические выводы и дальнейшие шаги

Если вы дочитали до этого момента, у вас есть рабочий шаблон для замены nvinfer собственным элементом вывода на Python, и, что более важно, вы понимаете, почему каждый элемент устроен именно так.

Данная модель является обобщающей. Все описанное здесь: каркас плагина, пакетная предварительная обработка и прикрепление метаданных — не зависит от модели. Замена Ultralytics YOLO на rfdetr от Roboflow не представляет сложности, а структура GStreamer и pyservicemaker остается идентичной. То же самое справедливо и для более экзотических архитектур: в собственном репозитории NVIDIA deepstream_reference_apps есть рабочий пример интеграции модели «зрение-язык» через vLLM с использованием именно этого подхода с плагинами, который стоит изучить, если вы стремитесь выйти за рамки обнаружения и перейти к пониманию видео.

Полный код плагина доступен в виде GitHub Gist. Если вы создадите на его основе что-нибудь: другую модель, многопотоковую конфигурацию или интеграцию с VLM, мне будет интересно узнать о результатах. Удачного кодирования!

Давид Редо Ньето Посмотреть все работы Давида Редо Ньето

Источник: towardsdatascience.com

Оцените материал:

Поделиться
Понравилась статья? Расскажите другим
ВКонтакте
Читайте также
Архив рубрики ~Коротко из Telegram~ Московский метрополитен начал масштабный перевод пассажирской инфраструктуры на российскую операционную… Архив рубрики ~Коротко из Telegram~ Кстати, если вы хотели сделать свою wiki-LLM, но было жалко… Архив рубрики ~Обо всем~ Искусственный интеллект отображает скрытые факторы, влияющие на выживаемость при раке во всем мире. Новости робототехники TechCrunch Mobility: Новая таблица показателей роботакси демонстрирует доминирование Китая. Новости робототехники Прорыв, благодаря которому лица роботов стали менее жуткими. Новости робототехники Роботизированная рука, созданная по образцу осьминога, использует тактильные датчики в присосках для автономного захвата под водой. Архив рубрики ~Обо всем~ Лучшие предложения Sam's Club, способные конкурировать с Prime Day 2026 (включая скидку 50% на членство). Архив рубрики ~Обо всем~ Наушники AirPods могут измерять частоту сердечных сокращений, но насколько точны эти измерения? Архив рубрики ~Обо всем~ Этот ИИ обнаруживает опасные клетки крови, которые врачи часто пропускают. Новости робототехники Как насчёт роботов с лейблом Made in Kazakhstan? Это не… Новости робототехники Будущий дизайн бионических рук. Ради собственного развития решил поэксперементировать, как… Архив рубрики ~Коротко из Telegram~ OpenAI забрала у Google одного из ключевых AI-ресёрчеров Ноам Шазир… Архив рубрики ~Коротко из Telegram~ Hyundai выкупил у холдинга SoftBank оставшуюся долю акций Boston Dynamics,… Архив рубрики ~Коротко из Telegram~ 🤖Perplexity — большое обновление Computer + новость которая удивит 🖥Deep… Архив рубрики ~Коротко из Telegram~ Московский метрополитен начал масштабный перевод пассажирской инфраструктуры на российскую операционную… Архив рубрики ~Коротко из Telegram~ Кстати, если вы хотели сделать свою wiki-LLM, но было жалко… Архив рубрики ~Обо всем~ Искусственный интеллект отображает скрытые факторы, влияющие на выживаемость при раке во всем мире. Новости робототехники TechCrunch Mobility: Новая таблица показателей роботакси демонстрирует доминирование Китая. Новости робототехники Прорыв, благодаря которому лица роботов стали менее жуткими. Новости робототехники Роботизированная рука, созданная по образцу осьминога, использует тактильные датчики в присосках для автономного захвата под водой. Архив рубрики ~Обо всем~ Лучшие предложения Sam's Club, способные конкурировать с Prime Day 2026 (включая скидку 50% на членство). Архив рубрики ~Обо всем~ Наушники AirPods могут измерять частоту сердечных сокращений, но насколько точны эти измерения? Архив рубрики ~Обо всем~ Этот ИИ обнаруживает опасные клетки крови, которые врачи часто пропускают. Новости робототехники Как насчёт роботов с лейблом Made in Kazakhstan? Это не… Новости робототехники Будущий дизайн бионических рук. Ради собственного развития решил поэксперементировать, как… Архив рубрики ~Коротко из Telegram~ OpenAI забрала у Google одного из ключевых AI-ресёрчеров Ноам Шазир… Архив рубрики ~Коротко из Telegram~ Hyundai выкупил у холдинга SoftBank оставшуюся долю акций Boston Dynamics,… Архив рубрики ~Коротко из Telegram~ 🤖Perplexity — большое обновление Computer + новость которая удивит 🖥Deep…

Оставить комментарий