В этой статье представлены пять полезных и эффективных декораторов Python для создания и оптимизации высокопроизводительных конвейеров обработки данных.

Изображение предоставлено редактором.
# Введение
Конвейеры обработки данных в проектах по анализу данных и машинному обучению — это очень практичный и универсальный способ автоматизации рабочих процессов обработки данных. Но иногда наш код может добавлять дополнительную сложность к основной логике. Декораторы Python могут преодолеть эту распространенную проблему. В этой статье представлены пять полезных и эффективных декораторов Python для создания и оптимизации высокопроизводительных конвейеров обработки данных.
Этот вводный код предшествует примерам кода, сопровождающим пять декораторов для загрузки версии набора данных о жилищном рынке Калифорнии, который я предоставил вам в общедоступном репозитории GitHub:
import pandas as pd import numpy as np # Загрузка набора данных DATA_URL = «https://raw.githubusercontent.com/gakudo-ai/open-datasets/main/housing.csv» print(«Загрузка исходного файла конвейера данных…») df_pipeline = pd.read_csv(DATA_URL) print(f»Загружено {df_pipeline.shape[0]} строк и {df_pipeline.shape[1]} столбцов.»
# 1. JIT-компиляция
Хотя циклы в Python имеют сомнительную репутацию из-за своей невероятной медленности и создания узких мест при выполнении сложных операций, таких как математические преобразования, в рамках набора данных, существует быстрое решение. Оно называется @njit и представляет собой декоратор в библиотеке Numba, который преобразует функции Python в оптимизированный машинный код, похожий на код на C, во время выполнения. Для больших наборов данных и сложных конвейеров обработки данных это может означать значительное ускорение.
from numba import njit import time # Извлечение числового столбца в виде массива NumPy для быстрой обработки incomes = df_pipeline['median_income'].fillna(0).values @njit def compute_complex_metric(income_array): result = np.zeros_like(income_array) # В чистом Python такой цикл обычно затягивается for i in range(len(income_array)): result[i] = np.log1p(income_array[i] * 2.5) ** 1.5 return result start = time.time() df_pipeline['income_metric'] = compute_complex_metric(incomes) print(f»Обработан массив за {time.time() — start:.5f} секунд!»)
# 2. Промежуточное кэширование
Когда конвейеры обработки данных содержат ресурсоемкие операции агрегирования или объединения данных, выполнение которых может занимать от нескольких минут до нескольких часов, для сериализации выходных данных функций можно использовать memory.cache. В случае перезапуска скрипта или восстановления после сбоя этот декоратор может перезагрузить сериализованные данные массива с диска, пропуская ресурсоемкие вычисления и экономя не только ресурсы, но и время.
from joblib import Memory import time # Создание локального каталога кэша для артефактов конвейера memory = Memory(«.pipeline_cache», verbose=0) @memory.cache def expensive_aggregation(df): print(«Выполняется операция группировки с большим объемом данных…») time.sleep(1.5) # Моделирование длительных шагов конвейера # Группировка точек данных по ocean_proximity и вычисление средних значений на уровне атрибутов return df.groupby('ocean_proximity', as_index=False).mean(numeric_only=True) # Первый запуск выполняет код; второй обращается к диску для мгновенной загрузки agg_df = expensive_aggregation(df_pipeline) agg_df_cached = expensive_aggregation(df_pipeline)
# 3. Проверка схемы
Pandera — это библиотека статистической типизации (проверки схемы), разработанная для предотвращения постепенного и незаметного искажения аналитических моделей, таких как модели машинного обучения или панели мониторинга, из-за низкого качества данных. В приведенном ниже примере достаточно использовать ее в сочетании с библиотекой параллельной обработки Dask, чтобы проверить, соответствует ли исходный конвейер указанной схеме. В противном случае генерируется ошибка, помогающая выявить потенциальные проблемы на ранней стадии.
import pandera as pa import pandas as pd import numpy as np from dask import delayed, compute # Определяем схему для обеспечения соблюдения типов данных и допустимых диапазонов housing_schema = pa.DataFrameSchema({ «median_income»: pa.Column(float, pa.Check.greater_than(0)), «total_rooms»: pa.Column(float, pa.Check.gt(0)), «ocean_proximity»: pa.Column(str, pa.Check.isin(['NEAR BAY', '
Источник: www.kdnuggets.com





















