Пример исследования методов максимизации кластеризации.
Делиться

Введение
Мы хотим спрогнозировать непрерывную переменную для четырех разных продуктов. Конвейер машинного обучения был создан в Databricks и состоит из двух основных компонентов.
- Подготовка объектов в SQL с использованием бессерверных вычислений.
- Вывод результатов на ансамбле из нескольких сотен моделей с использованием кластеров заданий для контроля над вычислительной мощностью.
В нашей первой попытке кластер из 420 ядер потратил почти 10 часов на обработку всего 18 разделов.
Цель состоит в том, чтобы настроить поток данных для максимального использования кластера и обеспечения масштабируемости. Вывод результатов осуществляется на четырех наборах моделей машинного обучения, по одному набору на каждый продукт. Однако мы сосредоточимся на том, как сохраняются данные, поскольку это покажет, какой уровень параллелизма мы можем использовать для вывода результатов. Мы не будем рассматривать внутренние механизмы самого вывода результатов.
Если разделов файла слишком мало, кластеру потребуется много времени для сканирования больших файлов, и в этом случае, если не будет произведено перераспределение разделов (что означает увеличение задержки в сети и перетасовку данных), вам, возможно, придется выполнять вычисления на большом количестве строк в каждом разделе. Это также приведет к увеличению времени выполнения.

Однако у бизнеса мало терпения для внедрения конвейеров машинного обучения, оказывающих непосредственное влияние на организацию. Поэтому тестирование ограничено.
В этой статье мы рассмотрим структуру наших данных признаков, затем представим обзор процесса вывода машинного обучения и обсудим результаты и производительность вывода на основе четырех сценариев обработки наборов данных:
- Разделенная таблица, без соли, без ограничения количества строк в разделах (без соли и с разделами).
- Разделенная таблица, с добавлением соли, с ограничением в 1 миллион строк (salty and Partitioned)
- Таблица с кластеризацией Liquid, без соли, без ограничения количества строк в разделах (без соли и с Liquid).
- Таблица с жидкими кластерами, с добавлением соли, с ограничением в 1 миллион строк (соленые и жидкие).
Ландшафт данных
Данный набор данных содержит признаки, которые используются моделями машинного обучения для вывода результатов. Он содержит около 550 миллионов строк и включает четыре продукта, идентифицированных по атрибуту ProductLine :
- Продукт А: ~10,45 млн. (1,9%)
- Продукт B: ~4,4 млн. (0,8%)
- Продукт C: ~100 млн. (17,6%)
- Продукт D: ~354 млн. (79,7%)
Затем следует еще один атрибут с низкой кардинальностью, attrB, который содержит только два различных значения и используется в качестве фильтра для извлечения подмножеств набора данных для каждой части системы машинного обучения.
Кроме того, RunDate записывает дату создания признаков. Данные добавляются только в конец файла. Наконец, набор данных считывается с помощью следующего запроса:
SELECT Id, ProductLine, AttrB, AttrC, RunDate, {model_features} FROM catalog.schema.FeatureStore WHERE ProductLine = :product AND AttrB = :attributeB AND RunDate = :RunDateВнедрение Salt
Здесь распределение данных происходит динамически. Его цель — распределить данные в соответствии с объемами. Это означает, что крупные продукты получают больше сегментов, а мелкие — меньше. Например, продукт D должен получить около 80% сегментов, учитывая пропорции в массиве данных.
Мы делаем это для того, чтобы обеспечить предсказуемое время выполнения вычислений и максимально эффективно использовать кластер.
# Calculate percentage of each (ProductLine, AttrB) based on row counts brand_cat_counts = df_demand_price_grid_load.groupBy( "ProductLine", "AttrB" ).count() total_count = df_demand_price_grid_load.count() brand_cat_percents = brand_cat_counts.withColumn( "percent", F.col("count") / F.lit(total_count) ) # Collect percentages as dicts with string keys (this will later determine # the number of salt buckets each product receives brand_cat_percent_dict = { f"{row['ProductLine']}|{row['AttrB']}": row['percent'] for row in brand_cat_percents.collect() } # Collect counts as dicts with string keys (this will help # to add an additional bucket if counts is not divisible by the number of # buckets for the product brand_cat_count_dict = { f"{row['ProductLine']}|{row['AttrB']}": row['count'] for row in brand_cat_percents.collect() } # Helper to flatten key-value pairs for create_map def dict_to_map_expr(d): expr = [] for k, v in d.items(): expr.append(F.lit(k)) expr.append(F.lit(v)) return expr percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict)) count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict)) # Add string key column in pyspark df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "product_cat_key", F.concat_ws("|", F.col("ProductLine"), F.col("AttrB")) ) df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "percent", percent_case.getItem(F.col("product_cat_key")) ).withColumn( "product_count", count_case.getItem(F.col("product_cat_key")) ) # Set min/max buckets min_buckets = 10 max_buckets = 1160 # Calculate buckets per row based on (BrandName, price_delta_cat) percentage df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "buckets_base", (F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int") ) # Add an extra bucket if brand_count is not divisible by buckets_base df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "buckets", F.when( (F.col("product_count") % F.col("buckets_base")) != 0, F.col("buckets_base") + 1 ).otherwise(F.col("buckets_base")) ) # Generate salt per row based on (ProductLine, AttrB) bucket count df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "salt", (F.rand(seed=42) * F.col("buckets")).cast("int") ) # Perform the repartition using the core attributes and the salt column df_demand_price_grid_load = df_demand_price_grid_load.repartition( 1200, "AttrB", "ProductLine", "salt" ).drop("product_cat_key", "percent", "brand_count", "buckets_base", "buckets", "salt")Наконец, мы сохраняем наш набор данных в таблицу признаков и добавляем максимальное количество строк в каждую секцию. Это необходимо для предотвращения создания Spark секций со слишком большим количеством строк, что может произойти даже после вычисления соли.
Почему мы устанавливаем ограничение в 1 миллион строк? Основное внимание уделяется времени выполнения модели, а не размеру файла. После нескольких тестов с 1, 1,5 и 2 миллионами строк, первый вариант показал наилучшие результаты в нашем случае. Опять же, бюджет и сроки этого проекта очень ограничены, поэтому нам нужно максимально эффективно использовать имеющиеся ресурсы.
df_demand_price_grid_load.write .mode("overwrite") .option("replaceWhere", f"RunDate = '{params['RunDate']}'") .option("maxRecordsPerFile", 1_000_000) .partitionBy("RunDate", "price_delta_cat", "BrandName") .saveAsTable(f"{params['catalog_revauto']}.{params['schema_revenueautomation']}.demand_features_price_grid")Почему бы просто не положиться на адаптивное выполнение запросов (AQE) в Spark?
Напомним, что основное внимание уделяется времени выполнения, а не измерениям, оптимизированным для обычных запросов Spark SQL, таким как размер файла. Использование только AQE было нашей первоначальной попыткой. Как вы увидите в результатах, время выполнения оказалось крайне нежелательным и не позволило максимально эффективно использовать кластер, учитывая объем наших данных.
Вывод машинного обучения
Существует конвейер, состоящий из 4 задач, по одной на каждый продукт. Каждая задача выполняет следующие общие шаги:
- Загружает функции соответствующего продукта.
- Загружает подмножество моделей машинного обучения для соответствующего продукта.
- Выполняет вывод в половине подмножества, выделенного с помощью
AttrB - Выполняет вывод в другой половине, разделенной по атрибуту
AttrB - Сохраняет данные в таблицу результатов.
Чтобы не перегружать статью цифрами, мы сосредоточимся на одном из этапов вывода, хотя другой этап очень похож по структуре и результатам. Кроме того, на рис. 2 вы можете увидеть направленный ациклический граф (DAG) для оцениваемого вывода.

На первый взгляд, все кажется очень простым, но время выполнения может варьироваться в зависимости от способа сохранения данных и размера кластера.
Конфигурация кластера
На этапе вывода, который мы анализируем, для каждого продукта используется один кластер, настроенный с учетом ограничений инфраструктуры проекта, а также распределения данных:
- Продукт A: 35 рабочих процессов (Standard_DS14v2, 420 ядер)
- Продукт B: 5 рабочих процессов (Standard_DS14v2, 70 ядер)
- Продукт C: 1 рабочий процесс (Standard_DS14v2, 14 ядер)
- Продукт D: 1 рабочий процесс (Standard_DS14v2, 14 ядер)
Кроме того, по умолчанию включена функция AdaptiveQueryExecution, которая позволит Spark самостоятельно определять наилучший способ сохранения данных с учетом предоставленного вами контекста.
Результаты и обсуждение
Для каждого сценария вы увидите отображение количества разделов файлов на продукт и среднего количества строк в каждом разделе, что даст вам представление о том, сколько строк система машинного обучения будет обрабатывать за одну задачу Spark. Кроме того, мы представим метрики Spark UI для наблюдения за производительностью во время выполнения и анализа распределения данных во время обработки. Мы рассмотрим только часть, касающуюся Spark UI, для продукта D, который является самым большим, чтобы не включать избыточную информацию. Кроме того, в зависимости от сценария обработка данных для продукта D становится узким местом во время выполнения. Это еще одна причина, по которой именно на этом аспекте был сделан основной акцент в результатах.
Несоленый и разделенный
На рис. 3 видно, что средний размер раздела файла составляет десятки миллионов строк, что означает значительное время выполнения для одного исполнителя. Наибольшее среднее количество строк в одном разделе приходится на продукт C — более 45 миллионов. Наименьшее — продукт B, содержащий в среднем около 12 миллионов строк.

На рис. 4 показано количество разделов для каждого продукта, всего их 26. При проверке продукта D количество разделов, равное 18, значительно меньше имеющихся 420 ядер, и в среднем каждый раздел будет выполнять вывод примерно на 40 миллионах строк.

Взгляните на рис. 5. В общей сложности кластер потратил 9,9 часов, и работа так и не была завершена, поскольку нам пришлось прервать задачу, так как она становилась слишком ресурсоемкой и блокировала тесты других пользователей.

Из сводной статистики на рис. 6 для задач, которые были завершены, видно, что в разделах для продукта D наблюдалось сильное неравномерное распределение. Максимальный размер входных данных составлял ~56 МБ, а время выполнения — 7,8 ч.

Несоленый и жидкий
В этом сценарии мы можем наблюдать очень похожие результаты с точки зрения среднего количества строк на раздел файла и количества разделов на продукт, как показано на рис. 7 и рис. 8 соответственно.

Продукт D имеет 19 файловых разделов, что по-прежнему очень мало для 420 ядер.

Уже сейчас понятно, что этот эксперимент будет очень дорогостоящим, поэтому я решил пропустить проверку выводов для этого сценария. Опять же, в идеальной ситуации мы бы продолжили его, но у меня накопилось много невыполненных задач.
Соленый и разделенный
После применения процесса добавления соли и перераспределения мы получаем в среднем около 2,5 млн записей на каждую секцию для продуктов A и B и около 1 млн для продуктов C и D, как показано на рис. 9.

Кроме того, на рис. 10 видно, что количество разделов файла увеличилось примерно до 860 для продукта D, что составляет 430 для каждого этапа вывода.

В результате время выполнения операции вывода продукта D с 360 задачами составляет 3 часа, как показано на рис. 11.

Анализ сводной статистики на рис. 12 показывает, что распределение выглядит сбалансированным, со средним временем выполнения около 1,7, но максимальная задача занимает 3 часа, что заслуживает дальнейшего изучения в будущем.

Одно из главных преимуществ заключается в том, что «соль» распределяет данные в соответствии с пропорциями продуктов. Если бы у нас было больше доступных ресурсов, мы могли бы увеличить количество разделов перемешивания в repartition() и добавить обработчиков в соответствии с пропорциями данных. Это гарантирует предсказуемое масштабирование нашего процесса.
Соленые и жидкие
В этом сценарии объединены два самых мощных рычага, которые мы исследовали до сих пор:
Использование «соли» для контроля размера файлов и параллелизма, а также кластеризация Liquid для размещения связанных данных в одном месте без жестких границ разделов.
После применения той же стратегии добавления соли и ограничения в 1 миллион строк на раздел, таблица, кластеризованная с использованием жидкой смеси, показывает очень похожий средний размер раздела на случай добавления соли и разделения на разделы, как показано на рис. 13. Продукты C и D остаются близкими к целевому значению в 1 миллион строк, в то время как продукты A и B немного превышают этот порог.

Однако основное различие заключается в том, как эти разделы распределяются и используются Spark. Как показано на рис. 14, продукт D снова достигает большого количества файловых разделов, обеспечивая достаточный параллелизм для насыщения доступных ядер во время вывода.

В отличие от секционированного подхода, кластеризация Liquid позволяет Spark адаптировать структуру файлов с течением времени, сохраняя при этом преимущества использования «соли». Это приводит к более равномерному распределению работы между исполнителями, с меньшим количеством экстремальных выбросов как по размеру входных данных, так и по продолжительности задач.
Из сводной статистики на рис. 15 видно, что большинство задач выполняется в течение короткого временного окна, а максимальная продолжительность выполнения задачи ниже, чем в сценарии с «солью» и разделением кластера. Это указывает на уменьшение неравномерности распределения нагрузки и лучшее балансирование нагрузки в кластере.


Важным побочным эффектом является то, что кластеризация Liquid сохраняет локальность данных для отфильтрованных столбцов без принудительного соблюдения строгих границ разделов. Это позволяет Spark по-прежнему использовать преимущества пропуска данных, в то время как соль гарантирует, что ни один исполнитель не будет перегружен десятками миллионов строк.
В целом, конфигурация с использованием salty и liquid оказывается наиболее надежной: она максимизирует параллелизм, минимизирует неравномерность распределения ресурсов и снижает операционные риски при увеличении нагрузки на вывод данных или изменении конфигурации кластера.
Основные выводы
- Масштабируемость вывода часто ограничивается структурой данных, а не сложностью модели. Недостаточно большие разделы файлов могут привести к простою сотен ядер, в то время как несколько исполнителей обрабатывают десятки миллионов строк.
- Одного лишь разделения данных недостаточно для крупномасштабного вывода. Без контроля размера файлов разделенные таблицы могут создавать огромные разделы, что приводит к длительным и неравномерным задачам.
- Добавление соли — эффективный инструмент для обеспечения параллелизма. Введение ключа соли и ограничение количества строк на раздел значительно увеличивает число выполняемых задач и стабилизирует время выполнения.
- Кластеризация Liquid дополняет функцию добавления соли, уменьшая неравномерность распределения данных без жестких границ. Она позволяет Spark адаптировать структуру файлов с течением времени, делая систему более отказоустойчивой по мере роста объема данных.
Гектор Мехия. Все материалы от Гектора Мехии.
Источник: towardsdatascience.com






















