Что делать, когда память становится новым узким отображением при обработке данных?
Как разбиение данных на блоки в Pandas, Dask и Polar обрабатывает миллионы записей, когда добавление вычислительных ресурсов невозможно.
Делиться
В условиях искусственного интеллекта память, в настоящее время, стала основным ресурсом. Поскольку спрос на память и инфраструктуру хранения данных, вызванный бумом ИИ, достиг исторически высоких показателей, такие компании, как Micron Technology и Sandisk, привлекли беспрецедентное внимание и резко повысили цены на свою продукцию, используя свою ценовую власть. Но это беда для компаний, которые создают приложения, интенсивно используют данные, рассчитывают на более высокие хранилища для обучения ИИ, внедряют крупномасштабную аналитику или работают с низкой рентабельностью в облачных сервисах.
Для инженеров данных это не просто рыночные новости. Это повседневная проблема. Когда оперативная память и флэш-память становятся все лучше, старый рефлекс «увеличить емкость» перестает работать. Бюджеты не масштабируются с учетом данных, а счет за облачные услуги находится под пристальным вниманием. Что же делать инженерам данных, когда наш набор данных удваивается, кластер остается прежним?Мы должны включить изобретательность.
В этой статье я начну с задачи определения ETL, требующей преобразования данных более чем 6 миллионов сообщений в социальных сетях со смешанными типами данных в условиях ограниченных вычислительных мощностей. Я рассматриваю несколько решений — как классических, так и современных — для поддержания работоспособности вашего ETL-конвейера без обновления оборудования или облачных сервисов.
Проблема: набор данных из 6,2 миллионов строк не сохраняется в памяти.
История начинается с нового ETL-конвейера, который вы собираетесь создать. Исходные данные — это 6,2 миллиона постов с платформы социальных сетей. Набор данных, извлекаемых из социальных сетей API и после преобразования в формат JSON, становится более 200 столбцов, и самая проблемная часть заключается в том, что большое количество полей данных имеет смешанные типы данных. class=»wp-block-prismatic-blocks»> { "reaction_count": 1250 } { "reaction_count": "1250" } { "reaction_count": null }
{ "hashtags": ["AI", "Python"] } { "hashtags": "AI" } { "hashtags": null }
, поскольку PySpark требует схемы согласования, а схема API социальных сетей меняется, для решения проблем столбцов со смешанными типами данных можно использовать Панды. В отличие от PySpark, Pandas по умолчанию хранит эти столбцы как object и не требует, чтобы строка алгоритма была одной и той же шаблоном.
import pandas as pd defnormalize_mixed_columns(df, mix_columns): """ Преобразование столбцов смешанного типа в строки. """ clean_df = df.copy() для столбца в mix_columns: clean_df[column] = ( clean_df[column] .where(cleaned_df[column].isna(), clean_df[column].astype(str)) return clean_dfsocial_posts_clean =normalize_mixed_columns(social_posts_df,mixed_columns)
Просто и понятно. Однако при включении кода выполнение этого процесса было прервано, поскольку объем используемой памяти превысил доступные ресурсы. Задание завершилось ошибкой.
Классическое решение: снижение пиковых параметров на память с помощью обработки по блокам.
Узким отображением является 6,2 миллиона строк. Размер набора данных составляет примерно 30 ГБ, что соответствует стандартному объему памяти, доступному для облачных рабочих процессов. Он включает доступную память рабочего процесса во время промежуточных преобразований датовых рамок. Поэтому вместо выполнения преобразования типа данных для всего столбца, который заставляет Pandas материализовать большие временные объекты в памяти, используется метод разбиения на блоки (фрагментирование), который разделяет каждый столбец на части. Таким образом, Pandas необходимо обработать только 250 000 строк за раз, уменьшить память и перейти к следующему блоку.import gc defnormalize_mixed_columns_chunked( df, mix_columns, chunk_size=250000 ): clean_df = df.copy() для столбца в mix_columns: col_idx = clean_df.columns.get_loc(column) для начала в диапазоне (0, len(cleaned_df), chunk_size): end = min(start + chunk_size, len(cleaned_df)) chunk = clean_df.iloc[start:end, col_idx] маска = chunk.notna() if Mask.any(): chunk = chunk.astype(object) chunk.loc[mask] = ( chunk.loc[mask].astype(str) .values ) clean_df.iloc[start:end, col_idx] = chunk.values del chunk del Mask gc.collect() return clean_df Social_posts_clean =normalize_mixed_columns_chunked(social_posts_df, mix_columns)
После того, как пиковый объем памяти значительно сокращается, преобразование данных успешно завершается, конвейер стабилизируется. Однако время выполнения значительно увеличивается. Это неудивительно, поскольку метод разбиения блоков по сути представляет собой компромисс. Это жертвует скоростью выполнения ради надежности конвейера.
От ручной разбивки блоков допартийного параллельного выполнения
Помимо ручного разбивки частей в Pandas, Dask автоматически разбивает DataFrame на несколько меньших частей и использует свою память во время преобразования данных. Однако выполнение его внутреннего механизма отличается от разбивки на части. Когда я устанавливаю chunk_size в Pandas, он считывает одну часть, выполняет на ней мой код, удаляет ее из оперативной памяти, а затем переходит к следующему. Он использует один источник ЦП за раз, поэтому для облачных сервисов, предоставляющих несколько ядер ЦП, он не использует их ресурсы в полной мере. Кроме того, мне приходится вручную писать цикл для агрегирования результатов, что делает код сложным и объемным.
Dask автоматически разбивает набор данных на фрагменты. Dask строит графические задачи и планирует распределение данных по доступным ядрам ЦП, что значительно сокращает время выполнения. Поскольку DataFrame Dask состоит из нескольких разделов DataFrame Pandas, при чтении файлов CSV или JSON Dask определяет типы данных на основе выборки данных. Если столбец содержит несогласованные значения, например, пустые строки («»), None, целые числа и строки, Dask, скорее всего, выдаст ошибку ValueError, TypeError или ошибку определения метаданных. Это происходит потому, что Dask определяет тип столбца данных на основе первоначального выбора данных и предполагает, что столбец является целым числом. Но если он встретит текст в одном из следующих фрагментов, Dask выдаст ошибку.
Для решения этой проблемы необходимо указать логические типы данных столбцов, вместо того, чтобы опираться на случайное определение. Приведенный ниже код использует Dask для преобразования данных в столбцы смешанных типов.
import dask.dataframe as dd df = dd.read_parquet( "social_posts.parquet", engine = "pyarrow") mix_columns = [ "hashtags", "mentions", "location", "reaction_count", ] для столбца в mix_columns: df[column] = df[column].map(str, Meta=(column, 'str')) df.to_parquet("social_posts_clean/", engine="pyarrow")
Dask не так гибок, как фрагментирование Pandas, при обработке обработки столбцов со смешанными типами данных, поскольку требуются указания, какие столбцы необходимо преобразовать. Кроме того, он по-прежнему выполняет множество операций Pandas в каждом разделе, поэтому рабочая нагрузка, преобладание столбцов с объектами Python, может оставаться ресурсоемкими и замедленными при обработке наборов данных, содержащих миллионы строк. Существует ли какое-либо другое решение, эффективно использующее кэш ЦП?
Более сильная альтернатива — полярники
Вы можете спросить, существует ли подход, позволяющий сбалансировать скорость и эффективность использования памяти. Ответ — Polars, библиотека DataFrame, реализованная на Rust Engine. По сравнению с Python, Rust высокооптимизированный машинный код и предлагает отличное управление памятью. Он сводит к минимуму выделение памяти и затраты на сбор мусора. Однако у Rust есть и свои недостатки. Скорость разработки намного ниже, чем у Python, из-за строгих проверок компилятора, а кривая обучение чрезвычайно крутая. Именно поэтому он гораздо менее популярен, чем Python, с момента своего создания в 2010 году. Означает ли это, что инженеры данных не могут использовать этот метод, если они не знакомы с Rust?
Polars — это невероятно быстрая библиотека DataFrame, созданная на основе движка Rust и доступная через API Python. Она была запущена в 2020 году и создана для обработки гигантских наборов данных гораздо быстрее, чем собственная библиотека Pandas на Python. Она сохраняет преимущества движка Rust, но позволяет пользователям Python импортировать данные из библиотеки Python.
Polars использует форму столбцовых данных Apache Arrow, хранящуюся в памяти, которая разработана для минимизации копирования данных в память и максимизации эффективности использования кэша ЦП. Он выполнил операцию .cast(pl.String) непосредственно в коде Rust. Эти особенности Polar позволяют работать в несколько раз быстрее, чем Python, и использовать лишь малую часть памяти. Эти механизмы ограничивают ненужное использование памяти. Поэтому при обработке наборов данных, превышающих доступный объем памяти, Polars может обрабатывать данные в потоковом режиме и разрешать одновременную загрузку всего набора данных в оперативную память.
импортировать поляры как pl df = pl.read_parquet("social_posts.parquet") mix_columns = [ "hashtags", "mentions", "location", "reaction_count", ] df = df.with_columns([ pl.col(col).cast(pl.String) для столбца в mix_columns]) df.write_parquet( "social_posts_clean.parquet" )
Polars превосходит два предыдущих решения по управлению памятью, поскольку он изначально разработан для более эффективного использования памяти, в то время как Pandas Chunking и Dask создают увеличенную нагрузку на память во время выполнение.
Однако Поляры — это не панацея. У него есть свои недостатки. Во-первых, Polars создает собственный API для работы с DataFrame, хотя и использует Python. Часто приходится переписывать стандартные операции Pandas, такие как apply() , индексирование и группировка. Во-вторых, многие внешние библиотеки по-прежнему построены на основе Pandas, поэтому интеграция с Polars всё ещё требует преобразования между форматами DataFrame.
Заключительные мысли
Значит, Pandas устаревает? Не обязательно.
Каждый из трех подходов решает свою задачу. Наилучший выбор зависит от физического отключения, а не от новейших технологий.
Если у вас действительно ограниченные вычислительные ресурсы и вы используете алгоритмические схемы обработки данных, разбиение блоков в Pandas по-прежнему остается примитивным решением. Оно значительно снизило пиковое потребление памяти. Компромисс заключается во времени выполнения. Но во многих производственных средах более медленный, но более стабильный конвейер гораздо дороже, чем более быстрый, который постоянно дает сбои.
Если ваша рабочая нагрузка уже превышает возможности одной машины и вы используете несколько ядер ЦП, Dask — лучший вариант. Он автоматизирует разбиение на разделы и параллельное выполнение. Однако следует обратить внимание на схемы согласованности и типы данных, особенно при работе с полуструктурированными данными.
Polar вы выбираете, когда производительность имеет решающее значение, и вы освоили новый API DataFrame. Polar обычно считается наиболее эффективным решением благодаря его движку на Rust, формированию памяти Apache Arrow и оптимизатору запросов. Все эти функции позволяют Polar обрабатывать большие наборы данных со значительно меньшим потреблением памяти и гораздо более высокой производительностью. Как и в случае с Dask, необходимо решить проблемы, вызванные смешанными типами данных, и обеспечить согласованность схем.
В заключение оптимизация памяти — это не поиск единственного наилучшего решения. Речь идет об обеспечении ограничения вашего проекта и выборе подходящего инструмента. В эпоху искусственного интеллекта умение оптимизировать конвейеры обработки данных в условиях ограниченного объема памяти становится ценным навыком для инженеров данных.
Надежный ETL-конвейер требует не только эффективного использования памяти. Это также зависит от тестируемости, удобства сопровождения и надежности развертывания.
Эта статья является частью моей серии практических материалов по проектированию данных. Если вас интересует создание готовых к использованию в производственной среде ETL-конвейеров, выходящих за рамки оптимизации производительности, вы также можете выбрать статью «Ваша первая задача в качестве инженера данных в новой компании? Сделать ETL-конвейер тестируемым», в которой разработана среда настройки, технологическое развитие и разработка с использованием ИИ.
за внимание!
Цзяян Инь: посмотреть все в Цзяян Инь
Источник: towardsdatascience.com
Похожие записи
Оцените материал:
Похожие записи
Присоединяйтесь и подпишитесь на рассылку самых свежих новостей по Email
Получайте свежие новости и идеи на почту. Без спама — только самое интересное.
Нажимая «Подписаться», вы соглашаетесь с политикой конфиденциальности.
