PySpark для начинающих: за пределами основ
Сделайте следующий шаг к созданию реальных рабочих процессов с помощью Spark на вашем ноутбуке.
Делиться

Если вы читали мою первую статью из этой серии, «PySpark для начинающих: освоение основ», то вы уже понимаете суть Spark: распределенные данные, DataFrames и ленивое выполнение. Вы установили PySpark, запустили SparkSession, прочитали CSV-файл и выполнили простые манипуляции с данными в DataFrame. Ссылку на эту статью я оставлю в конце.
Стоит повторить то, что я часто использую термины PySpark и Spark как синонимы, но, строго говоря, Spark — это общая платформа для распределенных вычислений (написанная на Scala), а PySpark — это специализированный API для Spark на Python.
Помимо основ
Однако, когда вы преодолеваете начальный этап, происходит нечто интересное. Вы быстро понимаете, что ваш второй проект на PySpark требует несколько иного подхода:
- Вы хотите читать/записывать данные более безопасным, быстрым и предсказуемым способом.
- Вы хотите объединить наборы данных, не испытывая при этом неуверенности в правильности операций объединения.
- Вы хотите понять, почему Spark ведёт себя именно так, и как мягко направить его в нужное русло.
Эта статья проведет вас через следующие шаги. Она намеренно написана в неспешном и практичном стиле. Никаких сложных внутренних деталей. Никакой настройки кластера. Никаких сложных оптимизаций Spark. Только то, что нужно знать настоящим новичкам, когда они переходят от простых примеров к небольшим проектам в реальном мире.
Мы используем Spark с открытым исходным кодом, запускаем его локально, как и раньше.
1. Следующий шаг: правильное считывание данных.
В моей первой статье мы использовали максимально простой загрузчик CSV-файлов:
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
Это работает — и подходит для первых экспериментов — но за этим скрывается тонкая проблема.
Spark угадывает типы ваших данных.
При использовании директивы inferSchema=True Spark анализирует небольшой фрагмент вашего файла и использует эту информацию, чтобы определить, является ли столбец целым числом, строкой, логическим значением или числом с плавающей запятой. Это означает:
- Если 99 строк содержат числовые данные, а 100-я строка пуста, Spark может интерпретировать столбец как строку .
- Если кто-то отредактирует файл на следующей неделе и случайно добавит 23,50 фунта стерлингов вместо 23,50, Spark может обработать весь столбец по-другому.
- Если ваш файл большой, используемая Spark выборка не будет представлять весь набор данных.
Это может привести к загадочному поведению в дальнейшем, к тем самым ошибкам, которые новичкам сложнее всего диагностировать.
Полезная привычка для начинающих: определите схему для своих данных.
Представьте себе схему как аналог чертежа для чтения данных в Spark. Прежде чем что-либо создавать, вы сообщаете Spark примерно следующее:
Названия столбцов
Каким типом данных они должны быть?
Указывает, является ли значение столбца необязательным.
Вот как это выглядит в нашем примере с данными о продажах. Напомним, что данные выглядели следующим образом:
transaction_id,customer_name,net_amount,tax_amount, is_member 101,Alice,250.50,25.05,true 102,Bob,120.00,6.00, false 103,Charlie,450.75,25.07,true 104,David,89.99,5.73,false
Чтобы указать типы указанных выше полей в Spark, мы определяем нашу схему с помощью кода, подобного следующему.
from pyspark.sql import types as T schema = T.StructType([ T.StructField("transaction_id", T.IntegerType(), False), T.StructField("customer_name", T.StringType(), False), T.StructField("net_amount", T.DoubleType(), True), T.StructField("tax_amount", T.DoubleType(), True), T.StructField("is_member", T.BooleanType(), True), ])
Названия столбцов и параметры типов говорят сами за себя. Параметр True[False] указывает, что в столбце не может быть значений NULL. Обратите внимание, что флаг допустимости значений NULL (True/False) в основном является метаданными схемы и информацией для оптимизации. Он не всегда строго соблюдается для каждого источника данных, как это происходит с ограничением NOT NULL в базе данных.
Дополнительные полезные опции при чтении данных в формате CSV.
Существует множество удобных опций для чтения CSV-файлов, которые можно комбинировать с директивой схемы, что делает загрузку CSV-данных еще более надежной.
К наиболее распространенным вариантам относятся:
- mode=”PERMISSIVE”: сохраняет некорректные строки насколько это возможно
- mode=”DROPMALFORMED”: удаляет некорректные строки
- режим=”FAILFAST”: ошибки возникают немедленно
- header= True[False]: Содержит ли файл [или нет] запись заголовка
- nullValue: какой текст должен заменять значения null во входных данных?
- dateFormat / timestampFormat
Теперь мы можем загрузить данные о продажах в DataFrame следующим образом:
df = ( spark.read .option("header", True) # Other modes: "PERMISSIVE" and "DROPMALFORMED". .option("mode", "FAILFAST") .option("nullValue", "N/A") .schema(schema) .csv("sales_data.csv") )
Почему это важно для начинающих?
- Вы должны знать, какие существуют типы данных, прежде чем начать работу.
- Если это указано, Spark будет отклонять некорректные строки, а не интерпретировать их без уведомления.
- Ваши преобразования становятся более предсказуемыми .
- Если объединить два набора данных позже, несоответствия типов вас не удивят.
2. Понимание преобразований данных
Напомним, в моей предыдущей статье, на первых этапах работы с датафреймами с помощью PySpark, мы добавили в наш датафрейм дополнительный производный столбец, используя следующий код:
df2 = df.withColumn("gross_amount", df.net_amount + df.tax_amount)
Я объяснил, что эта строка пока ничего не вычисляет. Она просто добавляет шаг к внутреннему плану Spark:
1. Read the CSV 2. Add a new column (gross_amount = net + tax)
Тогда вы можете добавить еще несколько шагов, подобных этому:
df3 = df2.withColumn("tax_percentage", df2.tax_amount / df2.gross_amount * 100)
Тем не менее, никаких вычислений не происходит. Только когда вы выполняете такое действие , как…
df3.show()
… говорит Спарк:
«Хорошо, теперь мне нужно выполнить все эти шаги».
Вот что означает «ленивое исполнение», но для новичков важно не само название, а эффект, и он означает следующее:
- Вы можете выполнять множество преобразований последовательно, не «платя» за них, до тех пор, пока вам не понадобится результат.
- Spark может внутренне изменять порядок выполнения операций для обеспечения их эффективной работы.
- Не стоит тратить время на промежуточные этапы обработки данных, которые впоследствии могут быть отфильтрованы.
Представьте это как обычную повседневную задачу, например, приготовление бутерброда:
- Вы собираете все ингредиенты.
- Вы собираете это в своем уме.
- На самом деле, нарезать и готовить нужно только тогда, когда вы точно знаете, что будете готовить.
3. Очистка данных до того, как они вызовут проблемы.
Реальные данные обычно неструктурированы и часто содержат пропущенные значения, пустые строки, повторяющиеся записи или значения-заполнители, такие как «N/A» и «unknown».
В PySpark цель состоит в том, чтобы выявлять и устранять очевидные проблемы на ранних стадиях, чтобы остальная часть вашего рабочего процесса вела себя предсказуемо. PySpark обладает рядом полезных функций, которые позволяют это сделать.
Удаление строк с пропущенными значениями
Простейшая функция очистки — dropna().
df_clean = df.dropna()
Это удаляет все строки, содержащие нулевое значение в любом столбце. Это может быть полезно, но часто это слишком агрессивно.
Чаще всего удаляют только те строки, в которых отсутствуют важные столбцы:
df_clean = df.dropna(subset=["net_amount", "tax_amount"])
Это означает:
Сохраняйте строку до тех пор, пока в ней присутствуют значения net_amount и tax_amount.
В других столбцах могут по-прежнему содержаться значения NULL, и это может быть нормально.
Заполнение пропущенных значений
Иногда вам не нужно удалять строки. Вам просто нужно заменить пропущенные значения чем-то осмысленным.
Вот тут-то и пригодится функция fillna().
df_clean = df.fillna({"city": "Unknown"})
Также можно заполнять числовые столбцы:
df_clean = df.fillna({"tax_amount": 0.0})
Это полезно, когда пропущенное значение имеет чёткое значение. Например, пропущенная сумма скидки вполне может быть заменена на 0,0. Но будьте осторожны. Заполнение пропущенных значений может изменить смысл ваших данных, если вы выберете неправильное значение по умолчанию.
Изменение типов столбцов с помощью функции приведения типов (cast()).
Иногда Spark ошибочно считывает тип столбца, особенно при работе с CSV-файлами. В этом случае можно преобразовать тип столбца с помощью оператора приведения типов (cast()):
from pyspark.sql import functions as F df_clean = df.withColumn("net_amount",F.col("net_amount").cast("double") )
Это особенно часто происходит, когда даты, числа или логические значения считываются как строки.
Удаление повторяющихся строк
Повторяющиеся строки могут появляться при многократном экспорте файлов, неправильном объединении или смешивании данных из нескольких источников. Удалить точные дубликаты можно следующим образом:
df_clean = df.dropDuplicates()
Или удалите дубликаты на основе одного или нескольких выбранных столбцов.
df_clean = df.dropDuplicates(["transaction_id"])
Второй вариант зачастую полезнее, потому что в нём говорится:
Каждый идентификатор транзакции должен отображаться только один раз.
Небольшой пример очистки данных
Объединив эти идеи:
from pyspark.sql import functions as F df_clean = ( df # Remove transactions missing required values. .dropna(subset=["transaction_id", "net_amount"]) # Supply defaults for optional values. .fillna( { "city": "Unknown", "tax_amount": 0.0, } ) # Apply the expected numeric types. .withColumn( "net_amount", F.col("net_amount").cast("double"), ) .withColumn( "tax_amount", F.col("tax_amount").cast("double"), ) # Keep one row for each transaction. .dropDuplicates(["transaction_id"]) )
4. Объединение наборов данных в PySpark без лишних сложностей.
Если вы раньше работали с базами данных, вы, вероятно, писали SQL-запросы для объединения двух или более таблиц. Объединения в Spark работают аналогично, но с использованием DataFrame.
Что такое объединение (join)?
Если вам незнакомо понятие объединения (join), то это способ сопоставления строк из одного DataFrame с соответствующими строками из другого DataFrame. Другими словами, это отвечает на вопрос типа:
«Какие строки в этом DataFrame соответствуют строкам в том DataFrame?»
В этом и заключается основная идея каждого объединения в PySpark. Как только эта часть станет понятной, синтаксис и типы объединений станут намного проще для понимания.
Если у вас есть два таких DataFrame:
sales_data.csv
transaction_id, customer_name, net_amount, tax_amount 101, Alice, 250.50, 25.05 102, Bob, 120.00, 6.00
customers.csv
customer_name, city, loyalty_level Alice, New York, Gold Bob, London, Silver
Вы можете объединить их по общему полю customer_name следующим образом:
df_sales = spark.read.csv("sales_data.csv", header=True) df_customers = spark.read.csv("customers.csv", header=True) df_joined = df_sales.join(df_customers, on="customer_name", how="inner") df_joined.show() # Output +-------------+--------------+----------+----------+--------+-------------+ |customer_name|transaction_id|net_amount|tax_amount|city |loyalty_level| +-------------+--------------+----------+----------+--------+-------------+ |Alice |101 |250.50 |25.05 |New York|Gold | |Bob |102 |120.00 |6.00 |London |Silver | +-------------+--------------+----------+----------+--------+-------------+
Какой тип объединения (join) следует использовать новичкам?
В Spark доступно несколько различных типов объединений (joins). В 99% случаев использования для начинающих вы будете использовать один из следующих вариантов:
- внутренний — показывать только совпадающие строки
- слева — показать все данные из левой таблицы, а также совпадения.
- внешний — показать все строки из обеих таблиц
Из всех типов соединений внутреннее соединение будет, безусловно, самым распространенным типом соединения, который вы будете использовать в своей повседневной работе.
Пока не стоит беспокоиться о таких сложных стратегиях объединения, как «широковещательная рассылка», «сортировка-слияние», «перемешивание-хеширование» или любых других. По мере приобретения опыта работы со Spark вы сможете изучить их подробнее в удобное для вас время.
Просто помните:
Объединение таблиц (Join) требует больших вычислительных затрат, чем простые операции над столбцами, поэтому используйте его при необходимости, но не бездумно.
5. Чтение и запись данных в стиле Spark: Parquet
Большинство новичков предпочитают CSV, потому что он им знаком. Но CSV медленный, негибкий и не поддерживает различные типы данных, а в реальной жизни Parquet — это собственный формат данных Spark. Parquet — это столбцовый, сжатый формат данных, идеально подходящий для анализа данных, составления отчетов и задач с интенсивным чтением.
Когда Spark считывает набор данных в формате Parquet:
- Загружаются только те столбцы, которые вам действительно необходимы.
- Оно понимает все типы данных.
- Загрузка происходит значительно быстрее, чем в формате CSV.
Содержимое DataFrame записывается в файлы формата Parquet следующим образом:
df_joined.write.mode("overwrite").parquet("output/enriched_sales")
Затем вы сможете мгновенно прочитать это вот так:
df_fast = spark.read.parquet("output/enriched_sales") df_fast.show()
Примечание: Использование формата Parquet для ввода и вывода файлов — это самый простой способ повысить производительность для любого начинающего пользователя Spark.
6. Мышление в рамках рабочих процессов PySpark.
Как только вы поймете, как считывать данные, очищать их, преобразовывать, объединять и записывать обратно, следующим шагом будет изучение того, как организовать эти действия в простой рабочий процесс. Проект PySpark для начинающих обычно проходит в следующей последовательности:
Read data -> check and clean it -> add useful columns -> combine with other data -> write the result
Это может показаться очевидным, но это важный сдвиг. Вы больше не просто экспериментируете с одним DataFrame за раз. Вы выстраиваете повторяемый процесс.
Каждый этап должен быть простым.
Полезная привычка для начинающих — четко определять цель каждого этапа рабочего процесса. Например:
df_raw = spark.read.schema(schema).csv("sales_data.csv", header=True) df_clean = df_raw.dropna(subset=["net_amount", "tax_amount"]) df_enriched = df_clean.withColumn( "gross_amount", F.col("net_amount") + F.col("tax_amount") ) df_final = df_enriched.join(df_customers, on="customer_name", how="left") df_final.write.mode("overwrite").parquet("output/final_dataset")
Этот стиль немного многословнее, чем объединение всего в одно длинное выражение, но его гораздо легче читать на этапе обучения.
Каждое имя DataFrame указывает на ваш текущий этап рабочего процесса:
df_raw -> the data as it arrived df_clean -> the data after basic cleaning df_enriched -> the data after adding new meaning df_final -> the dataset ready to save
Почему это важно
В случае возникновения проблем такая структура значительно упрощает отладку.
Вы можете проанализировать каждый этап, изучив данные:
df_raw.show() df_clean.show() df_enriched.show()
Вы можете проверить количество строк:
df_raw.count() df_clean.count() df_final.count()
Это помогает ответить на такие полезные вопросы, как:
Did rows disappear unexpectedly during cleaning? Did the join create more rows than expected? Did a calculated column produce nulls?
Простая мысленная модель: Входные данные → подготовка → комбинация → выходные данные — позволит вам удивительно далеко продвинуться в изучении PySpark.
7. Краткое введение в пользовательский интерфейс Spark.
Spark имеет удобный веб-интерфейс, который активируется при выполнении таких действий, как .count() или .write() . Когда ваша задача Spark выполняется локально, перейдите по следующей ссылке:
http://localhost:4040
Вы должны увидеть примерно следующее на экране.

На первый взгляд это может показаться немного сложным, но вам не нужно разбираться во всех вкладках. На данном этапе вам достаточно знать, что этот пользовательский интерфейс существует и для чего он полезен. А полезен он потому, что позволяет видеть, какие задания Spark уже выполнены или выполняются в данный момент.
По мере того, как вы будете осваивать Spark, пользовательский интерфейс поможет вам понять, почему задания завершаются с ошибкой или выполняются дольше, чем ожидалось. Но это придет гораздо позже. А пока относитесь к пользовательскому интерфейсу Spark как к приборной панели вашего автомобиля — вам не нужно разбираться в двигателе, чтобы заметить, когда что-то выглядит странно.
Итог: Теперь вы готовы к своему первому настоящему проекту на PySpark.
На этом этапе вы перешли от «Я могу запустить Spark» к «Я могу создать простой и понятный конвейер обработки Spark».
Теперь вы знаете, как:
- безопасно считывать данные.
- очистить и подготовить его.
- обогатить его новыми столбцами,
- объединить несколько наборов данных,
- эффективно сохранить результат.
- и наблюдайте за Spark ровно столько, чтобы сохранять уверенность.
В этой статье не требовалось кластерное подключение. Не требовалась сложная настройка. Именно так начинается множество реальных проектов на PySpark.
Когда у вас появится больше опыта, вы, возможно, захотите расширить свои знания, изучив некоторые из этих тем.
- чтение планов выполнения
- понимание перетасовок
- управление разделами
- другие типы соединений
- простая настройка производительности
Это лишь некоторые из тем, которые я надеюсь осветить в будущей статье, но на данный момент вы освоили следующий важный этап и можете создать что-то значимое и полезное с помощью PySpark.
Кстати, вот ссылка на первую статью из этой серии:
PySpark для начинающих: освоение основ, о котором я упоминал в начале.
Томас Рид. Все материалы от Томаса Рида.
Источник: towardsdatascience.com
Похожие записи
Похожие записи
Устройте вечеринку с бабушкой и предотвратите ее убийство в игре Apple Crumble, которая выходит в этом году.
07.06.2026
Древняя керамика — самое раннее свидетельство того, что люди занимались математикой
31.03.2026
«Без цензуры — только танками давить»: правда ли свободная пресса уничтожила СССР?
12.06.2025Подписка на рассылку
Получайте свежие новости и идеи на почту. Без спама — только самое интересное.
Нажимая «Подписаться», вы соглашаетесь с политикой конфиденциальности.
