Разработка пользовательского плагина GStreamer для NVIDIA DeepStream
Зачем нужна пользовательская обработка данных в DeepStream?
Делиться
NVIDIA DeepStream предоставляет готовый к использованию конвейер для многопотоковой видеоаналитики: аппаратное ускорение декодирования, отслеживание, отображение на экране и обмен сообщениями — все это объединено через GStreamer. Для стандартных моделей обнаружения, экспортируемых в TensorRT, nvinfer берет на себя все задачи.
Однако у этого типичного случая есть ограничения. Модели обработки изображений и языка, пользовательская постобработка, повернутые ограничивающие рамки или необходимость быстрой замены моделей во время выполнения — вот где предположения nvinfer перестают работать. Иногда у вас есть зрелый стек вывода PyTorch, тщательно настроенный вашей командой, и вы хотите, чтобы DeepStream вызывал его, а не переписывал его в конфигурационном файле.
Стоит отметить, что, если говорить конкретно о моделях семейства YOLO, то DeepStream-Yolo от Маркоса Лучано уже проделал отличную работу по реализации пользовательской постобработки на C++. Если C++ доступен, начните с него. В этой статье рассматривается другой подход: достижение того же результата полностью на Python с использованием пользовательского плагина GStreamer с pyservicemaker без ущерба для пропускной способности.
Ключевое преимущество этого подхода заключается в том, что нижестоящим элементам, таким как nvtracker , nvdsosd и nvmsgconv всё равно, какой именно элемент сгенерировал метаданные обнаружения. Если правильно записывать данные в структуру метаданных DeepStream, остальная часть экосистемы будет работать так, как будто nvinfer никогда и не был задействован.
Метаданные DeepStream
Каждый буфер, проходящий через конвейер DeepStream, содержит не только пиксельные данные. С момента прохождения кадров через nvstreammux к каждому GstBuffer прикрепляется структура NvDsBatchMeta . Иерархия проста и описана в официальной документации.
NvDsBatchMeta ├── NvDsUserMeta (batch-level custom metadata) └── NvDsFrameMeta (one per source stream) ├── NvDsUserMeta (frame-level custom metadata) └── NvDsObjectMeta (one per detected object) ├── NvDsClassifierMeta └── NvDsUserMeta (object-level custom metadata)
NvDsBatchMeta описывает весь пакет данных. Каждый NvDsFrameMeta соответствует одному исходному потоку и содержит информацию на уровне кадра, такую как идентификатор источника и номер кадра. Каждый NvDsObjectMeta представляет собой отдельное обнаружение, то есть, когда наш плагин записывает обнаружения, мы будем записывать NvDsObjectMeta для каждого из них.
Крайне важно понимать, что ничто из этого не принадлежит nvinfer . Это общий контракт на использование данных. Любой элемент GStreamer в конвейере может читать из него, записывать в него или делать и то, и другое:
-
nvtrackerсчитывает ограничивающие рамки объектов и записывает идентификаторы отслеживания. -
nvdsosdсчитывает прямоугольники и подписи для отрисовки наложений. - Функция
nvmsgconvсчитывает всю структуру для создания полезной нагрузки сообщения.
Наш пользовательский плагин просто будет записывать результаты обнаружения в эту структуру так же, как это делал бы nvinfer , и все последующие процессы будут их обрабатывать без изменений. Важно понимать одно ограничение, прежде чем писать какой-либо код: экземпляры NvDsObjectMeta нельзя создавать напрямую из Python . Попытка создания экземпляра класса вызовет ошибку » No constructor defined! во время выполнения.
Причина кроется в архитектуре. DeepStream управляет своими метаданными через пулы памяти — предварительно выделенные блоки, которые переиспользуются между кадрами, чтобы избежать накладных расходов на повторное выделение и освобождение памяти в высокопроизводительном конвейере. Эти пулы принадлежат NvDsBatchMeta и находятся на стороне C. Привязки Python предоставляют доступ к этим пулам, но намеренно не предоставляют конструктор на стороне Python, поскольку создание NvDsObjectMeta вне пула обошло бы управление жизненным циклом, которое обеспечивает предсказуемое использование памяти DeepStream. Правильный способ получить его — запросить его у пакета: batch_meta.acquire_object_meta() , который возвращает вам предварительно выделенный экземпляр из пула. Когда кадр завершится, DeepStream автоматически вернет его в пул.
Мост Python: pyservicemaker
Для взаимодействия с метаданными DeepStream из Python мы будем использовать pyservicemaker — текущий поддерживаемый NVIDIA SDK для Python для DeepStream. Официальная документация описывает основы конвейеров и потоков, но не показывает, как записывать и прикреплять метаданные из пользовательского элемента вывода. Именно этот пробел и восполняет данная статья.
Ключевой абстракцией является BatchMetadataOperator . Создание подкласса и реализация функции handle_metadata(batch_meta) дает доступ к полному NvDsBatchMeta для каждого буфера, проходящего через конвейер. Далее, итерация кадров сводится к использованию batch_meta.frame_items и прикреплению объекта обнаружения.
pyservicemaker также предоставляет обертку Buffer вокруг Gst.Buffer , которая напрямую предоставляет доступ к batch_meta и, что важно, к методу extract(batch_id) , возвращающему дескриптор DLPack для памяти GPU каждого кадра. Именно это делает возможным вывод без копирования, поскольку мы можем передавать кадр непосредственно в TensorRT, не покидая GPU.
Вместо использования BatchMetadataOperator отдельно через зонд, мы встроим тот же шаблон непосредственно в метод do_transform_ip нашего пользовательского плагина, что даст нам контроль над жизненным циклом элемента, его свойствами и согласованием ограничений, а также доступом к метаданным. Но сначала нам нужно собрать этот плагин.
Обнаруживаемый плагин Python для GStreamer
GStreamer обнаруживает плагины во время выполнения, сканируя каталоги, указанные в переменной GST_PLUGIN_PATH . В частности, для плагинов Python он ищет их внутри подкаталога python/ в каждом из этих путей. Это означает, что ваш плагин — это просто файл .py , помещенный в нужное место, без компиляции, без CMake, без разделяемой библиотеки. Компромисс заключается в том, что шаблон регистрации строгий, и ошибка приводит к скрытым сбоям, которые действительно трудно отладить.
$GST_PLUGIN_PATH/ └── python/ └── gstexampleplugin.py # your plugin
Установите GST_PLUGIN_PATH так, чтобы она указывала на родительский каталог, и GStreamer автоматически найдет python/gstexampleplugin.py при следующем запуске конвейера.
Скелет плагина
Вот минимальная структура элемента сквозного вывода: он получает пакеты видеобуферов, выполняет вывод, добавляет метаданные и передает буфер дальше без изменений.
import gi gi.require_version('Gst', '1.0') gi.require_version('GstBase', '1.0') from gi.repository import Gst, GstBase, GObject import torch from pyservicemaker import Buffer GST_PLUGIN_NAME = "gstexampleplugin" Gst.init(None) class GstExamplePlugin(GstBase.BaseTransform): __gstmetadata__ = ( 'GstExamplePlugin', # name 'Filter/Effect/Video', # classification 'Custom inference element', # description 'Your Name' # author ) src_format = Gst.Caps.from_string( "video/x-raw(memory:NVMM), format=RGB, " "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], " "framerate=(fraction)[ 0/1, 2147483647/1 ]" ) sink_format = Gst.Caps.from_string( "video/x-raw(memory:NVMM), format=RGB, " "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], " "framerate=(fraction)[ 0/1, 2147483647/1 ]" ) src_pad_template = Gst.PadTemplate.new( "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format ) sink_pad_template = Gst.PadTemplate.new( "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format ) __gsttemplates__ = (src_pad_template, sink_pad_template) __gproperties__ = { 'model-engine': ( str, 'TensorRT engine path', 'Path to the .engine file', '', GObject.ParamFlags.READWRITE ), 'confidence-threshold': ( float, 'Confidence threshold', 'Minimum confidence to attach a detection', 0.0, 1.0, 0.5, GObject.ParamFlags.READWRITE ), } def __init__(self): super().__init__() self.model_engine = '' self.confidence_threshold = 0.5 self.engine = None def do_get_property(self, prop): if prop.name == 'model-engine': return self.model_engine elif prop.name == 'confidence-threshold': return self.confidence_threshold def do_set_property(self, prop, value): if prop.name == 'model-engine': self.model_engine = value elif prop.name == 'confidence-threshold': self.confidence_threshold = value def do_start(self): # Load your TensorRT engine here self.engine = load_engine(self.model_engine) # This function should be implemented return True def do_transform_ip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn: """In-place transform: attach metadata, pass buffer unchanged.""" buffer = Buffer(gst_buffer) batch_meta = buffer.batch_meta frames = [] for frame_meta in batch_meta.frame_items: t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id)) frames.append(t) batch = torch.stack(frames, dim=0) # Run your model inference results = self.engine(batch) # Now we will need to iterate over the results for each frame # and attach it to the object_meta in case it is detection/segmentation # otherwise we can do it as user_meta # The following is pseudocode, which depends on your inference for frame_meta in batch_meta.frame_items: for det in results: obj = batch_meta.acquire_object_meta() # Fill the obj with each detection ... frame_meta.append(obj) return Gst.FlowReturn.OK # --- Registration --- GObject.type_register(GstExamplePlugin) __gstelementfactory__ = (GST_PLUGIN_NAME, Gst.Rank.NONE, GstExamplePlugin)
Несколько важных моментов, касающихся этого скелета:
GstBase.BaseTransform является подходящим базовым классом для фильтра, работающего на месте, который принимает буфер, изменяет его (путем добавления метаданных) и передает дальше. Мы переопределяем do_transform_ip вместо do_transform потому что не выделяем новый выходной буфер.
__gstmetadata__ и __gsttemplates__ не являются необязательными. GStreamer не зарегистрирует элемент без них. Строка caps video/x-raw(memory:NVMM) сообщает GStreamer, что этот элемент работает с памятью NVIDIA, что крайне важно для работы на графическом процессоре в конвейере DeepStream.
__gproperties__ предоставляет доступ к model-engine и confidence-threshold как к полноценным свойствам GStreamer, что означает, что вы можете устанавливать их из командной строки gst-launch или из кода конвейера Python, не затрагивая исходный код.
Последние две строки необходимы для регистрации: GObject.type_register сообщает системе типов GObject о классе, а __gstelementfactory__ сообщает GStreamer, какое имя элемента следует предоставить и какой класс следует создать.
Проверка плагина. После того, как файл будет установлен и кэш очищен, подтвердите регистрацию с помощью:
GST_PLUGIN_PATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin
Вы должны увидеть метаданные элемента, шаблоны отступов и оба свойства в списке. Если вы их видите, значит, GStreamer знает о вашем плагине, и вы готовы добавить его в конвейер обработки.
Пример сквозного вывода данных с помощью Ultralytics
После создания базовой структуры плагина пришло время заполнить логику вывода. Полный рабочий код доступен в виде GitHub Gist. Как только он станет доступен, вы сможете изучить его, как мы делали раньше, или запустить конвейер. Вот простой пример, который просто выполняет вывод и отображает частоту кадров:
gst-launch-1.0 -v nvstreammux name=m width=1280 height=720 batch-size=1 batched-push-timeout=33000 ! nvvideoconvert nvbuf-memory-type=0 ! 'video/x-raw(memory:NVMM), format=RGB' ! gstyoloplugin model-path=/path/to/yolo26s.engine ! fpsdisplaysink text-overlay=false silent=false sync=false video-sink=fakesink uridecodebin uri=file:///path/to/video.mp4 ! m.sink_0
Проверка кода
Проблема совместимости
Если вы читаете код, вы, возможно, заметили, что мы переопределяем объект tuple , но только внутри модуля ultralytics.nn.backends.tensorrt , поскольку именно там кроется проблема. Существует известный крайний случай совместимости между привязками Python к TensorRT и фреймворком-оберткой Python для GStreamer ( PyGObject ) , который приводит к сбою конвейера с печально известным сообщением «Segmentation fault (core dumped)». Именно поэтому нам потребовалось создать этот фрагмент кода, который помогает сохранить желаемое поведение:
import ultralytics.nn.backends.tensorrt as trt_backend _original_tuple = tuple def safe_tuple(obj): if "tensorrt" in type(obj).__module__ and type(obj).__name__ == "Dims": return _original_tuple(obj[i] for i in range(len(obj))) return _original_tuple(obj) trt_backend.tuple = safe_tuple
Это заменяет ссылку tuple внутри пространства имен бэкенда Ultralytics во время выполнения на версию, которая использует доступ по индексу для объектов Dims , оставляя все остальное без изменений. Это не элегантное решение, но оно требует точной настройки и должно выполняться во время импорта, до создания экземпляра какой-либо модели.
Цикл вывода
Сам цикл вывода довольно прост:
- Извлеките кадры из буферов.
- Предварительная обработка + вывод
- Если нижестоящие элементы конвейера являются плагинами глубокого потока, результаты следует прикрепить к метаданным объекта каждого кадра.
Ниже приведён фрагмент кода для работы с нулевым копированием с использованием DLPack:
frames = [] for frame_meta in batch_meta.frame_items: t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id)) frames.append(t) batch = torch.stack(frames, dim=0)
Предварительная обработка входных данных
Согласно документации, модели YOLO при обработке объекта torch.Tensor ожидают фиксированную форму входных данных (N, 3, 640, 640) . Однако кадры, поступающие из nvstreammux , будут иметь то разрешение, которое указано в исходном файле. Используемый подход — это масштабирование с черными полосами по бокам: кадр масштабируется так, чтобы соответствовать целевым размерам, сохраняя при этом соотношение сторон, а затем заполняется оставшееся пространство. Ключевая идея здесь заключается в том, что мы можем сделать это полностью на графическом процессоре, обрабатывая весь пакет данных одновременно, не затрагивая память центрального процессора.
Благодаря тому, что извлечение кадров, добавление черных полос по краям, вывод результатов и инверсия координат выполняются на графическом процессоре в рамках одного вызова do_transform_ip , плагин ведет себя точно так же, как nvinfer с точки зрения каждого последующего элемента, но с полной гибкостью стека вывода результатов на Python.
Далее в дело вступает остальная часть конвейера DeepStream: nvtracker присваивает идентификаторы, nvdsosd отрисовывает наложения, а nvmsgconv сериализует полезные данные.
Практические выводы и дальнейшие шаги
Если вы дочитали до этого момента, у вас есть рабочий шаблон для замены nvinfer собственным элементом вывода на Python, и, что более важно, вы понимаете, почему каждый элемент устроен именно так.
Данная модель является обобщающей. Все описанное здесь: каркас плагина, пакетная предварительная обработка и прикрепление метаданных — не зависит от модели. Замена Ultralytics YOLO на rfdetr от Roboflow не представляет сложности, а структура GStreamer и pyservicemaker остается идентичной. То же самое справедливо и для более экзотических архитектур: в собственном репозитории NVIDIA deepstream_reference_apps есть рабочий пример интеграции модели «зрение-язык» через vLLM с использованием именно этого подхода с плагинами, который стоит изучить, если вы стремитесь выйти за рамки обнаружения и перейти к пониманию видео.
Полный код плагина доступен в виде GitHub Gist. Если вы создадите на его основе что-нибудь: другую модель, многопотоковую конфигурацию или интеграцию с VLM, мне будет интересно узнать о результатах. Удачного кодирования!
Давид Редо Ньето Посмотреть все работы Давида Редо Ньето
Источник: towardsdatascience.com
Похожие записи
Оцените материал:
Присоединяйтесь и подпишитесь на рассылку самых свежих новостей по Email
Получайте свежие новости и идеи на почту. Без спама — только самое интересное.
Нажимая «Подписаться», вы соглашаетесь с политикой конфиденциальности.
