Подробный анализ узких мест в передаче данных, их выявление и устранение с помощью систем NVIDIA Nsight™ – часть 2.
Делиться

Это продолжение статьи «Оптимизация передачи данных в рабочих нагрузках ИИ/машинного обучения», где мы продемонстрировали использование NVIDIA Nsight™ Systems (nsys) для изучения и решения распространенной проблемы узкого места при загрузке данных — ситуаций, когда графический процессор простаивает, ожидая входных данных от центрального процессора. В этой статье мы сосредоточимся на передаче данных в обратном направлении, от графического процессора к центральному процессору. Более конкретно, мы рассмотрим рабочие нагрузки вывода ИИ/машинного обучения, где размер выходных данных, возвращаемых моделью, относительно велик. Типичные примеры включают: 1) запуск модели сегментации сцены (попиксельная разметка) на пакетах изображений высокого разрешения и 2) захват многомерных векторных представлений признаков входных последовательностей с использованием модели кодировщика (например, для создания векторной базы данных). Оба примера включают выполнение модели на входном пакете, а затем копирование выходного тензора с графического процессора на центральный процессор для дополнительной обработки, хранения и/или передачи по сети.
Копирование выходных данных модели из памяти графического процессора в память центрального процессора обычно получает гораздо меньше внимания в руководствах по оптимизации, чем копирование данных из центрального процессора в графический процессор, которые используются для передачи модели (например, см. здесь). Но их потенциальное влияние на эффективность модели и затраты на выполнение может быть столь же пагубным. Более того, хотя оптимизации загрузки данных из центрального процессора в графический процессор хорошо документированы и легко реализуемы, оптимизация копирования данных в обратном направлении требует немного больше ручной работы.
В этом посте мы применим ту же стратегию, что и в предыдущем: мы определим тестовую модель и используем профилировщик nsys для выявления и устранения узких мест в производительности. Мы будем проводить эксперименты на экземпляре Amazon EC2 g6e.2xlarge (с графическим процессором NVIDIA L40S), работающем под управлением образа AMI AWS Deep Learning (Ubuntu 24.04) с PyTorch (2.8), профилировщиком nsys-cli (версия 2025.6.1) и библиотекой NVIDIA Tools Extension (NVTX).
Отказ от ответственности
Представленный код предназначен для демонстрационных целей; пожалуйста, не полагайтесь на его корректность или оптимальность. Не следует воспринимать использование нами каких-либо библиотек, инструментов или платформ как одобрение их использования. Влияние рассматриваемых оптимизаций может значительно варьироваться в зависимости от особенностей модели и среды выполнения. Перед внедрением обязательно оцените их влияние на ваш собственный сценарий использования.
Большое спасибо Ицхаку Леви и Гиладу Вассерману за их вклад в эту статью.
Игрушечная модель PyTorch
Мы представляем скрипт пакетного вывода, который выполняет сегментацию изображений на синтетическом наборе данных с использованием модели DeepLabV3 с архитектурой ResNet-50. Выходные данные модели копируются на ЦП для постобработки и хранения. Мы дополняем различные этапы вывода цветовыми аннотациями nvtx:
import time, torch, nvtx from torch.utils.data import Dataset, DataLoader from torch.cuda import profiler from torchvision.models.segmentation import deeplabv3_resnet50 DEVICE = «cuda» WARMUP_STEPS = 10 PROFILE_STEPS = 3 COOLDOWN_STEPS = 1 TOTAL_STEPS = WARMUP_STEPS + PROFILE_STEPS + COOLDOWN_STEPS BATCH_SIZE = 64 TOTAL_SAMPLES = TOTAL_STEPS * BATCH_SIZE IMG_SIZE = 512 N_CLASSES = 21 NUM_WORKERS = 8 ASYNC_DATALOAD = True # Синтетический набор данных со случайными изображениями class FakeDataset(Dataset): def __len__(self): return TOTAL_SAMPLES def __getitem__(self, index): img = torch.randn((3, IMG_SIZE, IMG_SIZE)) return img # вспомогательный класс для предварительной выборки данных на GPU class DataPrefetcher: def __init__(self, loader): self.loader = iter(loader) self.stream = torch.cuda.Stream() self.next_batch = None self.preload() def preload(self): try: data = next(self.loader) with torch.cuda.stream(self.stream): next_data = data.to(DEVICE, non_blocking=ASYNC_DATALOAD) self.next_batch = next_data except: self.next_batch = None def __iter__(self): return self def __next__(self): torch.cuda.current_stream().wait_stream(self.stream) data = self.next_batch self.preload() return data model = deeplabv3_resnet50(weights_backbone=None).to(DEVICE).eval() data_loader = DataLoader( FakeDataset(), batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=ASYNC_DATALOAD ) data_iter = DataPrefetcher(data_loader) def synchronize_all(): torch.cuda.synchronize() def to_cpu(output): return output.cpu() def process_output(batch_id, logits): # выполнить постобработку выходных данных with open('/dev/null', 'wb') as f: f.write(logits.numpy().tobytes()) with torch.inference_mode(): for i in range(TOTAL_STEPS): if i == WARMUP_STEPS: synchronize_all() start_time = time.perf_counter() profiler.start() elif i == WARMUP_STEPS + PROFILE_STEPS: synchronize_all() profiler.stop() end_time = time.perf_counter() with nvtx.annotate(f»Пакет {i}», color=»blue»): with nvtx.annotate(«Получить пакет», color=»red»): batch = next(data_iter) with nvtx.annotate(«Вычислить», color=»green»): output = model(batch) with nvtx.annotate(«Копировать в ЦП», color=»yellow»): output_cpu = to_cpu(output['out']) with nvtx.annotate(«Обработать вывод», color=»cyan»): process_output(i, output_cpu) total_time = end_time — start_time throughput = PROFILE_STEPS / total_time print(f»Пропускная способность: {throughput:.2f} шагов/сек»)
Обратите внимание на включение всех оптимизаций загрузки данных с ЦП на ГП, о которых мы говорили в нашей предыдущей публикации.
Для получения трассировки профилирования nsys мы выполняем следующую команду:
nsys profile —capture-range=cudaProfilerApi —trace=cuda,nvtx,osrt —output=baseline python batch_infer.py
В результате получается файл трассировки baseline.nsys-rep, который мы копируем на нашу машину для разработки для анализа.
Для измерения пропускной способности процесса вывода мы увеличиваем количество шагов до 100. Средняя пропускная способность нашего базового эксперимента составляет 0,45 шагов в секунду. В следующих разделах мы будем использовать трассировки профилирования nsys для постепенного улучшения этого результата.
Базовый анализ производительности
На изображении ниже показана трассировка профиля nsys нашего базового эксперимента:

В разделе, посвященном графическому процессору, мы видим следующую повторяющуюся закономерность:
- Блок вычислений ядра (выделен светло-голубым цветом), выполняющийся примерно 520 миллисекунд.
- Небольшой блок копирования памяти с хоста на устройство (выделен зеленым цветом), выполняющийся параллельно с вычислениями ядра. Такая параллельность была достигнута с помощью оптимизаций, описанных в нашей предыдущей публикации.
- Блок копирования данных из памяти устройства в память хоста (выделен красным), выполняющийся в течение примерно 750 миллисекунд.
- Между каждыми двумя шагами наблюдается длительный период простоя графического процессора (~940 миллисекунд) (пустые промежутки времени).
Глядя на полосу NVTX в разделе ЦП, мы видим, что пробелы идеально совпадают с блоком «вывод процесса» (голубого цвета). В нашей первоначальной реализации и выполнение модели, и функция сохранения выходных данных выполняются в одном и том же процессе последовательно. Это приводит к значительному времени простоя на графическом процессоре, поскольку ЦП ожидает возврата результата функции сохранения, прежде чем передать графическому процессору следующую партию данных.
Оптимизация 1: Многопроцессная обработка выходных данных
Первым шагом является запуск функции сохранения выходных данных в параллельных рабочих процессах. Аналогичный шаг мы предприняли в нашей предыдущей публикации, когда перенесли последовательность подготовки входных пакетов на выделенные рабочие процессы. Однако, если там нам удалось автоматизировать многопроцессную загрузку данных, просто установив аргумент num_workers класса DataLoader в ненулевое значение, то применение многопроцессной обработки выходных данных требует ручной реализации. Здесь мы выбираем простое решение для демонстрационных целей. Его следует адаптировать под ваши потребности и предпочтения в проектировании.
Многопроцессорная обработка PyTorch
Мы реализуем стратегию «производитель-потребитель», используя встроенный в PyTorch пакет multiprocessing, torch.multiprocessing. Мы определяем очередь для хранения выходных пакетов и несколько рабочих процессов-потребителей, которые обрабатывают пакеты из очереди. Мы модифицируем наш цикл вывода, чтобы помещать выходные буферы в выходную очередь. Мы также обновляем утилиту synchronize_all(), чтобы она очищала очередь и добавляла последовательность очистки в конце скрипта.
Следующий блок кода содержит нашу первоначальную реализацию. Как мы увидим в следующих разделах, для достижения максимальной производительности потребуется некоторая оптимизация.
import torch.multiprocessing as mp POSTPROC_WORKERS = 8 # Настройка для оптимальной пропускной способности output_queue = mp.JoinableQueue(maxsize=POSTPROC_WORKERS) def output_worker(in_q): while True: item = in_q.get() if item is None: break # Сигнал к завершению batch_id, batch_preds = item process_output(batch_id, batch_preds) in_q.task_done() processes = [] for _ in range(POSTPROC_WORKERS): p = mp.Process(target=output_worker, args=(output_queue,)) p.start() processes.append(p) def synchronize_all(): torch.cuda.synchronize() output_queue.join() # Очистка очереди с помощью torch.inference_mode(): for i in range(TOTAL_STEPS): if i == WARMUP_STEPS: synchronize_all() start_time = time.perf_counter() profiler.start() elif i == WARMUP_STEPS + PROFILE_STEPS: synchronize_all() profiler.stop() end_time = time.perf_counter() with nvtx.annotate(f»Batch {i}», color=»blue»): with nvtx.annotate(«get batch», color=»red»): batch = next(data_iter) with nvtx.annotate(«compute», color=»green»): output = model(batch) with nvtx.annotate(«copy to CPU», color=»yellow»): output_cpu = to_cpu(output['out']) with nvtx.annotate(«queue output», color=»cyan»): output_queue.put((i, output_cpu)) total_time = end_time — start_time throughput = PROFILE_STEPS / total_time print(f»Пропускная способность: {throughput:.2f} шагов/сек») # очистка for _ in range(POSTPROC_WORKERS): output_queue.put(None)
Оптимизация обработки выходных данных с участием нескольких работников приводит к производительности в 0,71 шага в секунду — это на 58% больше, чем в базовом варианте.
Повторный запуск команды nsys приводит к следующему результату трассировки профилирования:

Мы видим, что размер блока пробелов значительно уменьшился (с ~940 миллисекунд до ~50). Если бы мы увеличили масштаб оставшегося пробела, мы бы обнаружили, что он выровнен по операции «munmap». В нашей предыдущей публикации это же открытие легло в основу оптимизации асинхронного копирования данных. Но на этот раз мы используем промежуточный этап оптимизации памяти в виде предварительно выделенного пула буферов.
Оптимизация 2: Предварительное выделение буферного пула
Чтобы уменьшить накладные расходы на выделение и управление новым тензором на ЦП на каждой итерации, мы инициализируем пул тензоров, предварительно выделенных в общей памяти, и определяем вторую очередь для управления их использованием.
Обновленный код представлен ниже:
shape = (BATCH_SIZE, N_CLASSES, IMG_SIZE, IMG_SIZE) buffer_pool = [torch.empty(shape).share_memory_() for _ in range(POSTPROC_WORKERS)] buf_queue = mp.Queue() for i in range(POSTPROC_WORKERS): buf_queue.put(i) def output_worker(buffer_pool, in_q, buf_q): while True: item = in_q.get() if item is None: break # сигнал к завершению batch_id, buf_id = item process_output(batch_id, buffer_pool[buf_id]) buf_q.put(buf_id) in_q.task_done() processes = [] for _ in range(POSTPROC_WORKERS): p = mp.Process(target=output_worker, args=(buffer_pool,output_queue,buf_queue)) p.start() processes.append(p) def to_cpu(output): buf_id = buf_queue.get() output_cpu = buffer_pool[buf_id] output_cpu.copy_(output) return output_cpu, buf_id with torch.inference_mode(): for i in range(TOTAL_STEPS): if i == WARMUP_STEPS: synchronize_all() start_time = time.perf_counter() profiler.start() elif i == WARMUP_STEPS + PROFILE_STEPS: synchronize_all() profiler.stop() end_time = time.perf_counter() with nvtx.annotate(f»Batch {i}», color=»blue»): with nvtx.annotate(«get batch», color=»red»): batch = next(data_iter) with nvtx.annotate(«compute», color=»green»): output = model(batch) with nvtx.annotate(«copy to CPU», color=»yellow»): output_cpu, buf_id = to_cpu(output['out']) with nvtx.annotate(«queue output», color=»cyan»): output_queue.put((i, buf_id))
После этих изменений пропускная способность вывода возрастает до 1,51 — более чем в 2 раза по сравнению с нашим предыдущим результатом.
Ниже представлен новый трассировочный профиль:

Мало того, что пробелы практически исчезли, так ещё и время выполнения операции CUDA DtoH (выделено красным) сократилось с ~750 миллисекунд до ~110. Предположительно, большое копирование данных с GPU на CPU требовало значительных накладных расходов на управление памятью, которые мы устранили, внедрив выделенный буферный пул.
Несмотря на значительное улучшение, если мы увеличим масштаб, то обнаружим, что остается около ~0,5 миллисекунды пустого пространства, вызванного синхронностью команды копирования с графического процессора на центральный процессор — пока копирование не завершится, центральный процессор не запустит вычисления ядра для следующей партии данных.
Оптимизация 3: Асинхронное копирование данных
Наша третья оптимизация заключается в изменении способа копирования данных с устройства на хост на асинхронный. Как и прежде, мы обнаружим, что реализация этого изменения сложнее, чем при копировании с ЦП на ГП.
Первый шаг — передать параметр non_blocking=True команде копирования с графического процессора на центральный процессор.
def to_cpu(output): buf_id = buf_queue.get() output_cpu = buffer_pool[buf_id] output_cpu.copy_(output, non_blocking=True) return output_cpu, buf_id
Однако, как мы видели в нашем предыдущем посте, это изменение не окажет существенного влияния, если мы не изменим наши тензоры, чтобы они использовали закрепленную память:
форма = (BATCH_SIZE, N_CLASSES, IMG_SIZE, IMG_SIZE) buffer_pool = [torch.empty(shape, pin_memory=True).share_memory_() for _ in range(POSTPROC_WORKERS)]
Крайне важно отметить, что если мы внесем в наш скрипт только эти два изменения, пропускная способность увеличится, но выходные данные могут быть искажены (например, см. здесь). Нам необходим механизм, основанный на событиях, для идентификации каждого момента завершения копирования с графического процессора на центральный процессор, чтобы мы могли продолжить обработку выходных данных. (Обратите внимание, что это не требовалось при асинхронном копировании с центрального процессора на графический. Поскольку один поток данных на графическом процессоре обрабатывает команды последовательно, вычисления ядра начинаются только после завершения копирования. Синхронизация требовалась только при введении второго потока.)
Для реализации механизма уведомлений мы определяем пул событий CUDA и дополнительную очередь для управления их использованием. Кроме того, мы определяем поток-слушатель для мониторинга состояния событий в очереди и заполнения выходной очереди после завершения копирования.
import threading, queue event_pool = [torch.cuda.Event() for _ in range(POSTPROC_WORKERS)] event_queue = queue.Queue() def event_monitor(event_pool, event_queue, output_queue): while True: item = event_queue.get() if item is None: break batch_id, buf_idx = item event_pool[buf_idx].synchronize() output_queue.put((batch_id, buf_idx)) event_queue.task_done() monitor = threading.Thread(target=event_monitor, args=(event_pool, event_queue, output_queue)) monitor.start()
Обновленная последовательность вывода состоит из следующих шагов:
- Получите входной пакет, предварительно загруженный на графический процессор.
- Выполните модель на входном пакете данных, чтобы получить выходной тензор на графическом процессоре.
- Запросите свободный буфер ЦП из очереди буферов и используйте его для запуска асинхронного копирования данных. Настройте событие, которое будет срабатывать по завершении копирования, и добавьте это событие в очередь событий.
- Поток мониторинга ожидает срабатывания события, а затем помещает выходной тензор в очередь вывода для обработки.
- Рабочий поток извлекает выходной тензор из очереди и сохраняет его на диск. Затем он возвращает буфер обратно в очередь буферов.
Обновленный код представлен ниже.
def synchronize_all(): torch.cuda.synchronize() event_queue.join() output_queue.join() with torch.inference_mode(): for i in range(TOTAL_STEPS): if i == WARMUP_STEPS: synchronize_all() start_time = time.perf_counter() profiler.start() elif i == WARMUP_STEPS + PROFILE_STEPS: synchronize_all() profiler.stop() end_time = time.perf_counter() with nvtx.annotate(f»Batch {i}», color=»blue»): with nvtx.annotate(«get batch», color=»red»): batch = next(data_iter) with nvtx.annotate(«compute», color=»green»): output = model(batch) with nvtx.annotate(«copy to CPU», color=»yellow»): output_cpu, buf_id = to_cpu(output['out']) with nvtx.annotate(«queue CUDA event», color=»cyan»): event_pool[buf_id].record() event_queue.put((i, buf_id)) total_time = end_time — start_time throughput = PROFILE_STEPS / total_time print(f»Throughput: {throughput:.2f} steps/sec») # cleanup event_queue.put(None) for _ in range(POSTPROC_WORKERS): output_queue.put(None)
В результате пропускная способность составляет 1,55 шагов в секунду.
Ниже представлен новый трассировочный профиль:

В строке NVTX раздела CPU мы видим, что все операции цикла вывода сгруппированы слева — это означает, что все они выполнялись немедленно и асинхронно. Мы также видим вызовы синхронизации событий (светло-зеленым цветом), работающие в выделенном потоке мониторинга. В разделе GPU мы видим, что вычисления ядра начинаются сразу после завершения копирования с устройства на хост.
Наша заключительная оптимизация будет сосредоточена на улучшении распараллеливания операций ядра и памяти на графическом процессоре.
Оптимизация 4: Конвейерная обработка с использованием потоков CUDA
Как и в нашем предыдущем посте, мы хотим использовать преимущества независимых механизмов копирования памяти (DMA) и вычислений ядра (SM). Мы делаем это, назначая копирование памяти выделенному потоку CUDA:
egress_stream = torch.cuda.Stream() with torch.inference_mode(): for i in range(TOTAL_STEPS): if i == WARMUP_STEPS: synchronize_all() start_time = time.perf_counter() profiler.start() elif i == WARMUP_STEPS + PROFILE_STEPS: synchronize_all() profiler.stop() end_time = time.perf_counter() with nvtx.annotate(f»Batch {i}», color=»blue»): with nvtx.annotate(«get batch», color=»red»): batch = next(data_iter) with nvtx.annotate(«compute», color=»green»): output = model(batch) # on separate stream with torch.cuda.stream(egress_stream): # wait for default stream to complete compute egress_stream.wait_stream(torch.cuda.default_stream()) with nvtx.annotate(«copy to CPU», color=»yellow»): output_cpu, buf_id = to_cpu(output['out']) with nvtx.annotate(«queue CUDA event», color=»cyan»): event_pool[buf_id].record(egress_stream) event_queue.put((i, buf_id))
В результате достигается пропускная способность в 1,85 шагов в секунду — дополнительное улучшение на 19,3% по сравнению с нашим предыдущим экспериментом.
Итоговый результат трассировки профиля представлен ниже:

В разделе GPU мы видим непрерывный блок вычислений ядра (светло-голубым цветом), при этом параллельно выполняются вычисления между хостом и устройством (светло-зеленым цветом) и устройством и хостом (фиолетовым цветом). Наш цикл вывода теперь ограничен вычислительными ресурсами, что означает, что мы исчерпали все практические возможности для оптимизации передачи данных.
Результаты
Результаты нашего исследования представлены в следующей таблице:

Благодаря использованию профилировщика nsys нам удалось повысить эффективность более чем в 4 раза. Естественно, влияние обсуждаемых нами оптимизаций будет варьироваться в зависимости от особенностей модели и среды выполнения.
Краткое содержание
На этом завершается вторая часть нашей серии статей по теме оптимизации передачи данных в задачах искусственного интеллекта/машинного обучения. Первая часть была посвящена копированию данных с хоста на устройство, а вторая — с устройства на хост. При наивной реализации передача данных в любом направлении может привести к значительным узким местам в производительности, вызывая нехватку ресурсов графического процессора и увеличение затрат времени выполнения. Используя профилировщик Nsight Systems, мы продемонстрировали, как выявлять и устранять эти узкие места и повышать эффективность во время выполнения.
Хотя оптимизация в обоих направлениях включала схожие шаги, детали реализации существенно различались. Оптимизация передачи данных с ЦП на ГП хорошо поддерживается API загрузки данных PyTorch и требует относительно небольших изменений в цикле выполнения, тогда как оптимизация передачи данных с ГП на ЦП потребовала немного больше работы по разработке программного обеспечения. Важно отметить, что решения, представленные в этом посте, выбраны в демонстрационных целях. Ваше собственное решение может значительно отличаться в зависимости от потребностей вашего проекта и предпочтений в проектировании.
Рассмотрев копирование данных между ЦП и ГП, а также между ГП и ЦП, мы переходим к транзакциям между ГП: следите за обновлениями — в будущем мы опубликуем статью об оптимизации передачи данных между ГП в распределенных задачах обучения.
Источник: towardsdatascience.com



























