Сахар в деревянной миске на деревянной поверхности, рассыпанные кристаллы сахара.

Масштабирование результатов машинного обучения на Databricks: Liquid или Partitioned? С солью или без?

Пример исследования методов максимизации кластеризации.

Делиться

846507fadb92540a64807bb92735a866
Фото Фаран Рауфи на Unsplash

Введение

Мы хотим спрогнозировать непрерывную переменную для четырех разных продуктов. Конвейер машинного обучения был создан в Databricks и состоит из двух основных компонентов.

  1. Подготовка объектов в SQL с использованием бессерверных вычислений.
  2. Вывод результатов на ансамбле из нескольких сотен моделей с использованием кластеров заданий для контроля над вычислительной мощностью.

В нашей первой попытке кластер из 420 ядер потратил почти 10 часов на обработку всего 18 разделов.

Цель состоит в том, чтобы настроить поток данных для максимального использования кластера и обеспечения масштабируемости. Вывод результатов осуществляется на четырех наборах моделей машинного обучения, по одному набору на каждый продукт. Однако мы сосредоточимся на том, как сохраняются данные, поскольку это покажет, какой уровень параллелизма мы можем использовать для вывода результатов. Мы не будем рассматривать внутренние механизмы самого вывода результатов.

Если разделов файла слишком мало, кластеру потребуется много времени для сканирования больших файлов, и в этом случае, если не будет произведено перераспределение разделов (что означает увеличение задержки в сети и перетасовку данных), вам, возможно, придется выполнять вычисления на большом количестве строк в каждом разделе. Это также приведет к увеличению времени выполнения.

b133e76973b49bd9f16cd30ace46d61e
Рис. 1. Не бойтесь добавить немного соли к своим данным, если это необходимо. Фотография Фарана Рауфи на Unsplash.

Однако у бизнеса мало терпения для внедрения конвейеров машинного обучения, оказывающих непосредственное влияние на организацию. Поэтому тестирование ограничено.

В этой статье мы рассмотрим структуру наших данных признаков, затем представим обзор процесса вывода машинного обучения и обсудим результаты и производительность вывода на основе четырех сценариев обработки наборов данных:

  1. Разделенная таблица, без соли, без ограничения количества строк в разделах (без соли и с разделами).
  2. Разделенная таблица, с добавлением соли, с ограничением в 1 миллион строк (salty and Partitioned)
  3. Таблица с кластеризацией Liquid, без соли, без ограничения количества строк в разделах (без соли и с Liquid).
  4. Таблица с жидкими кластерами, с добавлением соли, с ограничением в 1 миллион строк (соленые и жидкие).

Ландшафт данных

Данный набор данных содержит признаки, которые используются моделями машинного обучения для вывода результатов. Он содержит около 550 миллионов строк и включает четыре продукта, идентифицированных по атрибуту ProductLine :

  • Продукт А: ~10,45 млн. (1,9%)
  • Продукт B: ~4,4 млн. (0,8%)
  • Продукт C: ~100 млн. (17,6%)
  • Продукт D: ~354 млн. (79,7%)

Затем следует еще один атрибут с низкой кардинальностью, attrB, который содержит только два различных значения и используется в качестве фильтра для извлечения подмножеств набора данных для каждой части системы машинного обучения.

Кроме того, RunDate записывает дату создания признаков. Данные добавляются только в конец файла. Наконец, набор данных считывается с помощью следующего запроса:

 SELECT Id, ProductLine, AttrB, AttrC, RunDate, {model_features} FROM catalog.schema.FeatureStore WHERE ProductLine = :product AND AttrB = :attributeB AND RunDate = :RunDate

Внедрение Salt

Здесь распределение данных происходит динамически. Его цель — распределить данные в соответствии с объемами. Это означает, что крупные продукты получают больше сегментов, а мелкие — меньше. Например, продукт D должен получить около 80% сегментов, учитывая пропорции в массиве данных.

Мы делаем это для того, чтобы обеспечить предсказуемое время выполнения вычислений и максимально эффективно использовать кластер.

 # Calculate percentage of each (ProductLine, AttrB) based on row counts brand_cat_counts = df_demand_price_grid_load.groupBy( "ProductLine", "AttrB" ).count() total_count = df_demand_price_grid_load.count() brand_cat_percents = brand_cat_counts.withColumn( "percent", F.col("count") / F.lit(total_count) ) # Collect percentages as dicts with string keys (this will later determine # the number of salt buckets each product receives brand_cat_percent_dict = { f"{row['ProductLine']}|{row['AttrB']}": row['percent'] for row in brand_cat_percents.collect() } # Collect counts as dicts with string keys (this will help # to add an additional bucket if counts is not divisible by the number of # buckets for the product brand_cat_count_dict = { f"{row['ProductLine']}|{row['AttrB']}": row['count'] for row in brand_cat_percents.collect() } # Helper to flatten key-value pairs for create_map def dict_to_map_expr(d): expr = [] for k, v in d.items(): expr.append(F.lit(k)) expr.append(F.lit(v)) return expr percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict)) count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict)) # Add string key column in pyspark df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "product_cat_key", F.concat_ws("|", F.col("ProductLine"), F.col("AttrB")) ) df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "percent", percent_case.getItem(F.col("product_cat_key")) ).withColumn( "product_count", count_case.getItem(F.col("product_cat_key")) ) # Set min/max buckets min_buckets = 10 max_buckets = 1160 # Calculate buckets per row based on (BrandName, price_delta_cat) percentage df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "buckets_base", (F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int") ) # Add an extra bucket if brand_count is not divisible by buckets_base df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "buckets", F.when( (F.col("product_count") % F.col("buckets_base")) != 0, F.col("buckets_base") + 1 ).otherwise(F.col("buckets_base")) ) # Generate salt per row based on (ProductLine, AttrB) bucket count df_demand_price_grid_load = df_demand_price_grid_load.withColumn( "salt", (F.rand(seed=42) * F.col("buckets")).cast("int") ) # Perform the repartition using the core attributes and the salt column df_demand_price_grid_load = df_demand_price_grid_load.repartition( 1200, "AttrB", "ProductLine", "salt" ).drop("product_cat_key", "percent", "brand_count", "buckets_base", "buckets", "salt")

Наконец, мы сохраняем наш набор данных в таблицу признаков и добавляем максимальное количество строк в каждую секцию. Это необходимо для предотвращения создания Spark секций со слишком большим количеством строк, что может произойти даже после вычисления соли.

Почему мы устанавливаем ограничение в 1 миллион строк? Основное внимание уделяется времени выполнения модели, а не размеру файла. После нескольких тестов с 1, 1,5 и 2 миллионами строк, первый вариант показал наилучшие результаты в нашем случае. Опять же, бюджет и сроки этого проекта очень ограничены, поэтому нам нужно максимально эффективно использовать имеющиеся ресурсы.

 df_demand_price_grid_load.write .mode("overwrite") .option("replaceWhere", f"RunDate = '{params['RunDate']}'") .option("maxRecordsPerFile", 1_000_000)  .partitionBy("RunDate", "price_delta_cat", "BrandName")  .saveAsTable(f"{params['catalog_revauto']}.{params['schema_revenueautomation']}.demand_features_price_grid")

Почему бы просто не положиться на адаптивное выполнение запросов (AQE) в Spark?

Напомним, что основное внимание уделяется времени выполнения, а не измерениям, оптимизированным для обычных запросов Spark SQL, таким как размер файла. Использование только AQE было нашей первоначальной попыткой. Как вы увидите в результатах, время выполнения оказалось крайне нежелательным и не позволило максимально эффективно использовать кластер, учитывая объем наших данных.

Вывод машинного обучения

Существует конвейер, состоящий из 4 задач, по одной на каждый продукт. Каждая задача выполняет следующие общие шаги:

  • Загружает функции соответствующего продукта.
  • Загружает подмножество моделей машинного обучения для соответствующего продукта.
  • Выполняет вывод в половине подмножества, выделенного с помощью AttrB
  • Выполняет вывод в другой половине, разделенной по атрибуту AttrB
  • Сохраняет данные в таблицу результатов.

Чтобы не перегружать статью цифрами, мы сосредоточимся на одном из этапов вывода, хотя другой этап очень похож по структуре и результатам. Кроме того, на рис. 2 вы можете увидеть направленный ациклический граф (DAG) для оцениваемого вывода.

a5db07178c222a918483b8b65bce2b4d
Рис. 2. Ациклический граф для этапа вывода машинного обучения с помощью Spark. Авторство принадлежит автору.

На первый взгляд, все кажется очень простым, но время выполнения может варьироваться в зависимости от способа сохранения данных и размера кластера.

Конфигурация кластера

На этапе вывода, который мы анализируем, для каждого продукта используется один кластер, настроенный с учетом ограничений инфраструктуры проекта, а также распределения данных:

  • Продукт A: 35 рабочих процессов (Standard_DS14v2, 420 ядер)
  • Продукт B: 5 рабочих процессов (Standard_DS14v2, 70 ядер)
  • Продукт C: 1 рабочий процесс (Standard_DS14v2, 14 ядер)
  • Продукт D: 1 рабочий процесс (Standard_DS14v2, 14 ядер)

Кроме того, по умолчанию включена функция AdaptiveQueryExecution, которая позволит Spark самостоятельно определять наилучший способ сохранения данных с учетом предоставленного вами контекста.

Результаты и обсуждение

Для каждого сценария вы увидите отображение количества разделов файлов на продукт и среднего количества строк в каждом разделе, что даст вам представление о том, сколько строк система машинного обучения будет обрабатывать за одну задачу Spark. Кроме того, мы представим метрики Spark UI для наблюдения за производительностью во время выполнения и анализа распределения данных во время обработки. Мы рассмотрим только часть, касающуюся Spark UI, для продукта D, который является самым большим, чтобы не включать избыточную информацию. Кроме того, в зависимости от сценария обработка данных для продукта D становится узким местом во время выполнения. Это еще одна причина, по которой именно на этом аспекте был сделан основной акцент в результатах.

Несоленый и разделенный

На рис. 3 видно, что средний размер раздела файла составляет десятки миллионов строк, что означает значительное время выполнения для одного исполнителя. Наибольшее среднее количество строк в одном разделе приходится на продукт C — более 45 миллионов. Наименьшее — продукт B, содержащий в среднем около 12 миллионов строк.

90821618c4ee564fe0174e07963d65f9
Рис. 3. Среднее количество строк в разделе в зависимости от количества продуктов.

На рис. 4 показано количество разделов для каждого продукта, всего их 26. При проверке продукта D количество разделов, равное 18, значительно меньше имеющихся 420 ядер, и в среднем каждый раздел будет выполнять вывод примерно на 40 миллионах строк.

1842e52d49f8068d7aef56d7e4fc2c06
Рис. 4. Общее количество файловых разделов на один продукт.

Взгляните на рис. 5. В общей сложности кластер потратил 9,9 часов, и работа так и не была завершена, поскольку нам пришлось прервать задачу, так как она становилась слишком ресурсоемкой и блокировала тесты других пользователей.

5510bdd83d12833c3d2846009183ae96
Рис. 5. Сводка этапа вывода для разделенного набора данных без добавления соли для продукта D.

Из сводной статистики на рис. 6 для задач, которые были завершены, видно, что в разделах для продукта D наблюдалось сильное неравномерное распределение. Максимальный размер входных данных составлял ~56 МБ, а время выполнения — 7,8 ч.

2aef3ffe5c2bb1d2b379b64b0dd5ec49
Рис. 6. Сводная статистика выводов исполнителей на разделенном и несоленом наборе данных.

Несоленый и жидкий

В этом сценарии мы можем наблюдать очень похожие результаты с точки зрения среднего количества строк на раздел файла и количества разделов на продукт, как показано на рис. 7 и рис. 8 соответственно.

e102903fab4060df30d1bcea834d5a30
Рис. 7. Среднее количество строк в разделе в зависимости от количества продуктов.

Продукт D имеет 19 файловых разделов, что по-прежнему очень мало для 420 ядер.

d64e1a56a97578d265104fc89f5c1a24
Рис. 8. Общее количество файловых разделов на один продукт.

Уже сейчас понятно, что этот эксперимент будет очень дорогостоящим, поэтому я решил пропустить проверку выводов для этого сценария. Опять же, в идеальной ситуации мы бы продолжили его, но у меня накопилось много невыполненных задач.

Соленый и разделенный

После применения процесса добавления соли и перераспределения мы получаем в среднем около 2,5 млн записей на каждую секцию для продуктов A и B и около 1 млн для продуктов C и D, как показано на рис. 9.

1c50fc3de4082ea58e5d095e4cf7721f
Рис. 9. Среднее количество строк в разделе в зависимости от количества продуктов.

Кроме того, на рис. 10 видно, что количество разделов файла увеличилось примерно до 860 для продукта D, что составляет 430 для каждого этапа вывода.

bacd8463f7c90df246ec3f860f3a9fc6
Рис. 10. Общее количество файловых разделов на один продукт.

В результате время выполнения операции вывода продукта D с 360 задачами составляет 3 часа, как показано на рис. 11.

d182764d010b0aadd0f37c268b976761
Рис. 11. Сводка по этапу вывода для разделенного и «соленого» набора данных.

Анализ сводной статистики на рис. 12 показывает, что распределение выглядит сбалансированным, со средним временем выполнения около 1,7, но максимальная задача занимает 3 часа, что заслуживает дальнейшего изучения в будущем.

38db3f28b827ceb537bc3d84b9fdf6af
Рис. 12. Сводная статистика выводов исполнителей на основе разделенного и помеченного солью набора данных.

Одно из главных преимуществ заключается в том, что «соль» распределяет данные в соответствии с пропорциями продуктов. Если бы у нас было больше доступных ресурсов, мы могли бы увеличить количество разделов перемешивания в repartition() и добавить обработчиков в соответствии с пропорциями данных. Это гарантирует предсказуемое масштабирование нашего процесса.

Соленые и жидкие

В этом сценарии объединены два самых мощных рычага, которые мы исследовали до сих пор:

Использование «соли» для контроля размера файлов и параллелизма, а также кластеризация Liquid для размещения связанных данных в одном месте без жестких границ разделов.

После применения той же стратегии добавления соли и ограничения в 1 миллион строк на раздел, таблица, кластеризованная с использованием жидкой смеси, показывает очень похожий средний размер раздела на случай добавления соли и разделения на разделы, как показано на рис. 13. Продукты C и D остаются близкими к целевому значению в 1 миллион строк, в то время как продукты A и B немного превышают этот порог.

4dd2c4a26dcc52fcf2c17892c9f28e64
Рис. 13. Среднее количество строк в разделе в зависимости от количества продуктов.

Однако основное различие заключается в том, как эти разделы распределяются и используются Spark. Как показано на рис. 14, продукт D снова достигает большого количества файловых разделов, обеспечивая достаточный параллелизм для насыщения доступных ядер во время вывода.

e5992589b7b3b00bad216f463b4db859
Рис. 14. Общее количество файловых разделов на один продукт.

В отличие от секционированного подхода, кластеризация Liquid позволяет Spark адаптировать структуру файлов с течением времени, сохраняя при этом преимущества использования «соли». Это приводит к более равномерному распределению работы между исполнителями, с меньшим количеством экстремальных выбросов как по размеру входных данных, так и по продолжительности задач.

Из сводной статистики на рис. 15 видно, что большинство задач выполняется в течение короткого временного окна, а максимальная продолжительность выполнения задачи ниже, чем в сценарии с «солью» и разделением кластера. Это указывает на уменьшение неравномерности распределения нагрузки и лучшее балансирование нагрузки в кластере.

12c0d5561e0d1a2b6ed6d973aade4497
Рис. 15. Сводка по этапу вывода результатов для кластеризованного набора данных с жидкостями и набором данных с солью.
5b6bea1e8bf7764b903ee2cb6cf527c2
Рис. 16. Сводная статистика выводов исполнителей на основе кластеризованного и соленого набора данных.

Важным побочным эффектом является то, что кластеризация Liquid сохраняет локальность данных для отфильтрованных столбцов без принудительного соблюдения строгих границ разделов. Это позволяет Spark по-прежнему использовать преимущества пропуска данных, в то время как соль гарантирует, что ни один исполнитель не будет перегружен десятками миллионов строк.

В целом, конфигурация с использованием salty и liquid оказывается наиболее надежной: она максимизирует параллелизм, минимизирует неравномерность распределения ресурсов и снижает операционные риски при увеличении нагрузки на вывод данных или изменении конфигурации кластера.

Основные выводы

  • Масштабируемость вывода часто ограничивается структурой данных, а не сложностью модели. Недостаточно большие разделы файлов могут привести к простою сотен ядер, в то время как несколько исполнителей обрабатывают десятки миллионов строк.
  • Одного лишь разделения данных недостаточно для крупномасштабного вывода. Без контроля размера файлов разделенные таблицы могут создавать огромные разделы, что приводит к длительным и неравномерным задачам.
  • Добавление соли — эффективный инструмент для обеспечения параллелизма. Введение ключа соли и ограничение количества строк на раздел значительно увеличивает число выполняемых задач и стабилизирует время выполнения.
  • Кластеризация Liquid дополняет функцию добавления соли, уменьшая неравномерность распределения данных без жестких границ. Она позволяет Spark адаптировать структуру файлов с течением времени, делая систему более отказоустойчивой по мере роста объема данных.

Гектор Мехия. Все материалы от Гектора Мехии.

Источник: towardsdatascience.com

✅ Найденные теги: Databricks, Liquid, Partitioned, Масштабирование, машинное обучение, новости, Соль

ОСТАВЬТЕ СВОЙ КОММЕНТАРИЙ

Каталог бесплатных опенсорс-решений, которые можно развернуть локально и забыть о подписках

галерея

Схема взаимодействия пользователя и LLM через построитель контекста и внешние ресурсы.
Яркая полоса от падающего метеора в ночном небе над городом, звезды и огни на фоне.
Эволюция Вселенной: инфографика расширения и структуры космоса после Большого взрыва.
Компания Philips выпустила систему компьютерной томографии Rembra для диагностики неотложных состояний.
ideipro logotyp
Руководители обсуждают, как искусственный интеллект меняет структуру рабочей силы в здравоохранении, часть 1 | MobiHealthNews
ideipro logotyp
Манекен со штрихкодом на лице смотрит на смартфон в темноте.
Текст "oh, wow." на розовом фоне, минималистичный дизайн.
Image Not Found
Схема взаимодействия пользователя и LLM через построитель контекста и внешние ресурсы.

Контекстная инженерия как ваше конкурентное преимущество

Если вы обладаете уникальными экспертными знаниями в своей области и знаете, как применить их в своих системах искусственного интеллекта, вас будет трудно превзойти. Делиться В течение последних трех лет я постоянно возвращаюсь к одному и тому же…

Мар 7, 2026
Яркая полоса от падающего метеора в ночном небе над городом, звезды и огни на фоне.

Загрязнение атмосферы космическим мусором может стать огромной проблемой.

После того, как ступень ракеты Falcon 9 сгорела в атмосфере, испарения лития и других металлов распространились над Европой. Этот растущий вид загрязнения может разрушить озоновый слой и привести к образованию облаков, способствующих изменению климата. 30-секундная выдержка, показывающая…

Мар 7, 2026
Эволюция Вселенной: инфографика расширения и структуры космоса после Большого взрыва.

Слабое космическое гудение могло бы разгадать тайну расширения Вселенной.

Изображение эволюции Вселенной за 13,77 миллиарда лет. Крайний левый угол показывает самый ранний момент, который мы можем исследовать в настоящее время, когда период «инфляции» вызвал всплеск экспоненциального роста Вселенной. (Размеры показаны вертикальной протяженностью сетки на этом графике.)…

Мар 7, 2026
Компания Philips выпустила систему компьютерной томографии Rembra для диагностики неотложных состояний.

Компания Philips выпустила систему компьютерной томографии Rembra для диагностики неотложных состояний.

Технология Rembra сочетает в себе передовые детекторы с ультрабыстрой скоростью сканирования и реконструкции. Фото: Royal Philips / GlobeNewswire. Компания Philips представила свою систему компьютерной томографии (КТ) нового поколения для радиологии, Rembra, призванную решить проблемы, возникающие в условиях…

Мар 7, 2026

Впишите свой почтовый адрес и мы будем присылать вам на почту самые свежие новости в числе самых первых