Я думал, что разработка данных — это просто написание скриптов. Я ошибался.
Я пытался подготовить свой ETL-конвейер к работе в производственной среде. Три вещи сломались. Каждая из них научила меня тому, чему одно лишь написание скриптов никогда бы не научило.
Делиться
После того, как я создал свой первый ETL-конвейер, я подумал, что довольно хорошо понимаю, что такое инженерия данных. Вы извлекаете данные откуда-то, очищаете их, а затем загружаете в нужное место. ETL. Достаточно просто.
Для контекста: я аналитик данных, пытающийся перейти в область инженерии данных. Я публично документирую этот процесс, начиная с составленного мной в начале этого года плана самообучения на 12 месяцев. Последним шагом на этом пути стало создание моего первого ETL-конвейера с нуля с использованием API GitHub, о котором я писал здесь, на TDS. Этот конвейер работал. Он извлекал данные, очищал их и сохранял в CSV-файл. Я был им доволен.
Поэтому я решил довести дело до конца, сделать его более «готовым к производству», как любят говорить в интернете. То, что произошло дальше, меня по-настоящему удивило. Не потому, что что-то сломалось, а потому, что эти поломки выявили многое.
Первоначальный трубопровод
Первоначальный конвейер обработки данных был простым, что меня устраивало, ведь в этом и заключалась суть. Извлечь данные из API GitHub, немного их очистить, сохранить всё в CSV-файл. Для своих целей он работал идеально: как учебное задание. Но CSV-файл и одноразовый скрипт — это не то, как работает инженерия данных в реальном мире. Я хотел выяснить, что на самом деле означает «реальный мир», поэтому решил усовершенствовать конвейер и посмотреть, что получится.
Вот полный оригинальный алгоритм для тех, кто не читал предыдущую статью:
import requests from datetime import datetime, timedelta url = "https://api.github.com/search/repositories" params = { "q": "language:python created:>2025-04-22", "sort": "stars", "order": "desc", "per_page": 30 } response = requests.get(url, params=params) data = response.json() import pandas as pd repos = [] for repo in data['items']: repos.append({ "name": repo['name'], "owner": repo['owner']['login'], "stars": repo['stargazers_count'], "forks": repo['forks_count'], "language": repo['language'], "description": repo['description'], "url": repo['html_url'], "created_at": repo['created_at'] }) df = pd.DataFrame(repos) df_clean = df.dropna(subset=['description']) df_clean = df_clean.copy() df_clean['viral'] = df_clean['stars'].apply(lambda x: 'Yes' if x > 50000 else 'No') df_clean = df_clean.sort_values('stars', ascending=False).reset_index(drop=True) df_clean.to_csv('github_trending_repos.csv', index=False) print("Pipeline complete. File saved.")
Простой, понятный и работающий. Но как только вы пытаетесь запустить его несколько раз или возвращаетесь к нему на следующий день, начинают проявляться недостатки.
Стена первая: У трубопровода не было памяти.
Первое обновление прошло без проблем. Вместо сохранения в CSV-файл я загрузил данные в базу данных SQLite. SQLite по-прежнему представляет собой всего лишь один файл, но ведёт себя как настоящая база данных. В неё можно делать запросы, проверять её содержимое и корректно её расширять. Казалось, это небольшое изменение. Но это было не так.
Я запустил конвейер один раз и получил 22 репозитория. Затем я запустил его второй раз, ничего не меняя, и проверил базу данных.
Total rows: 44 Unique repos: 22 Duplicates: 22
Честно говоря, я этого не ожидал. Я предполагал, что это может произойти, но никогда по-настоящему не думал, что это случится. Но я рад, что это произошло, потому что это был первый раз, когда я увидел, как мой конвейер обработки данных сломался. И то, что это показало, было простым, но важным: у скрипта не было памяти. Каждый раз при запуске он начинался с нуля и вслепую добавлял все, что находил. Никаких предупреждений, никаких ошибок. Просто «Конвейер завершен», как будто все в порядке.
Именно здесь я наткнулся на понятие, называемое идемпотентностью.
Идемпотентность — это замысловатое слово, обозначающее простую идею. Если что-то уже произошло, это не должно повториться. В контексте конвейера обработки данных это означает, что запуск конвейера один раз или десять раз всегда должен давать один и тот же результат. Никаких лишних строк, никаких дубликатов, никакого скрытого повреждения данных.
Решение оказалось довольно простым. Теперь, прежде чем вставлять что-либо в базу данных, конвейер проверяет, существует ли эта запись уже. Если да, то сначала удаляет её, а затем вставляет новую версию. Небольшое изменение в подходе, но оно полностью меняет надёжность вашего конвейера.
И вот что мне запомнилось: простой скрипт никогда не подумает об этом сам по себе. Это нужно встраивать целенаправленно. Это уже не скриптинг, это инженерия.
Стена вторая: Данные исчезли за одну ночь
Вторая стена была менее техничной и более тревожной.
Когда я закрыл Colab на ночь и вернулся на следующий день, меня охватило тревожное чувство. Мне пришлось всё запускать заново с нуля, надеясь, что ничего не сломается, хотя накануне всё работало идеально. Тщательно созданная мной база данных просто исчезла. И я вспомнил, что до этого проекта мне с трудом удавалось найти свой исходный файл ETL-конвейера. Я потратил много времени на его поиски, прежде чем наконец его нашёл. Это чувство, когда ты чуть не теряешь свою работу, остаётся с тобой навсегда.
Я знал, что должен быть лучший способ. Реальный конвейер обработки данных не может зависеть от того, что кто-то будет запускать его каждое утро. Данные должны храниться где-то, где они сохранятся и после завершения сессии.
Решение заключалось в том, чтобы напрямую подключить Google Drive к Colab и указать путь к базе данных там, а не во временной среде Colab. Изменена всего одна строка кода:
conn = sqlite3.connect('/content/drive/MyDrive/github_repos.db')
Теперь база данных находится в Google Drive. Закройте сессию, перезапустите среду выполнения, откройте совершенно новый блокнот. Данные по-прежнему там и ждут своего часа.
Но это исправление выявило нечто большее. Если сохранение данных уже требует размышлений о том, где они хранятся и как сохраняются, что произойдет, если вам потребуется, чтобы конвейер обработки данных запускался автоматически каждый день без вашего участия?
Третья стена: Никто не может вечно нажимать кнопку «Бег».
Больше всего меня удивило то, что «третья стена».
Даже после решения проблемы с дубликатами и безопасного хранения базы данных в Google Drive, кому-то все равно приходится открывать блокнот и нажимать кнопку «Запустить». Colab — это не сервер. Это интерактивная среда. Она не просыпается в 3 часа ночи, не получает свежие данные из API GitHub и не засыпает снова. Она создана не для этого.
И когда я об этом подумал с точки зрения реального мира, меня это сразу же осенило. В реальной компании никто не сидит в полночь, настороженно ожидая запуска скрипта. Конвейер должен работать сам по себе. По расписанию. Надежно. Независимо от того, наблюдает за нами кто-то или нет.
Вот тут-то и пригодятся такие инструменты, как Apache Airflow, Prefect и облачные cron-задания. Это не скрипты на Python. Это системы, которые работают на серверах, управляют расписаниями, обрабатывают сбои, отправляют оповещения при возникновении проблем и хранят историю каждого запуска.
Честно говоря, именно планирование — это концепция, которую мне больше всего интересно изучить в дальнейшем, потому что именно здесь работа с данными начинает ощущаться как настоящая работа с инфраструктурой.
Обзор ключевых изменений
Позвольте мне подробно рассказать о трех изменениях, которые я внес в конвейер обработки данных, и о том, что делает каждое из них.
1. Замена CSV на SQLite.
# Before df_clean.to_csv('github_trending_repos.csv', index=False) # After conn = sqlite3.connect('github_repos.db') df_clean.to_sql('repos', conn, if_exists='append', index=False) conn.close()
Сохранение в CSV-файл подходит для разового анализа. Но CSV — это всего лишь текстовый файл. Вы не сможете легко выполнять запросы к нему, и он плохо масштабируется по мере роста объёма данных. SQLite — это полноценная база данных, а это значит, что вы можете выполнять SQL-запросы, проверять уже имеющиеся в ней данные и корректно развивать её. Та же простота, но гораздо больше возможностей.
2. Исправление проблемы с дубликатами
cursor.execute(''' DELETE FROM repos WHERE url IN (SELECT url FROM repos_temp) ''') cursor.execute(''' INSERT INTO repos SELECT * FROM repos_temp ''')
Это решение проблемы идемпотентности. Перед добавлением чего-либо конвейер проверяет, существует ли этот репозиторий уже в базе данных, используя его URL-адрес в качестве уникального идентификатора. Если существует, он сначала удаляет его, а затем вставляет новую версию. Таким образом, независимо от того, сколько раз запускается конвейер, вы всегда получаете чистые данные без дубликатов.
3. Сохранение данных в Google Диск
# Before conn = sqlite3.connect('github_repos.db') # After conn = sqlite3.connect('/content/drive/MyDrive/github_repos.db')
Это всего одна строка, но она меняет всё. Вместо сохранения базы данных во временной среде Colab, где она исчезает после закрытия сессии, она сохраняется напрямую в Google Диск. Закройте блокнот, перезапустите среду выполнения, вернитесь завтра. Ваши данные всё ещё будут ждать вас там.
Вот полный модернизированный конвейер, объединяющий все три изменения:
import requests import pandas as pd import sqlite3 from datetime import datetime, timedelta # Extract yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') url = "https://api.github.com/search/repositories" params = { "q": f"language:python created:>{yesterday}", "sort": "stars", "order": "desc", "per_page": 30 } response = requests.get(url, params=params) data = response.json() # Transform repos = [] for repo in data['items']: repos.append({ "name": repo['name'], "owner": repo['owner']['login'], "stars": repo['stargazers_count'], "forks": repo['forks_count'], "language": repo['language'], "description": repo['description'], "url": repo['html_url'], "created_at": repo['created_at'] }) df = pd.DataFrame(repos) df_clean = df.dropna(subset=['description']) df_clean = df_clean.copy() df_clean['viral'] = df_clean['stars'].apply(lambda x: 'Yes' if x > 50000 else 'No') df_clean = df_clean.sort_values('stars', ascending=False).reset_index(drop=True) # Load conn = sqlite3.connect('/content/drive/MyDrive/github_repos.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS repos ( name TEXT, owner TEXT, stars INTEGER, forks INTEGER, language TEXT, description TEXT, url TEXT, created_at TEXT, viral TEXT, loaded_at TEXT ) ''') df_clean['loaded_at'] = datetime.now().strftime('%Y-%m-%d') df_clean.to_sql('repos_temp', conn, if_exists='replace', index=False) cursor.execute(''' DELETE FROM repos WHERE url IN (SELECT url FROM repos_temp) ''') cursor.execute(''' INSERT INTO repos SELECT * FROM repos_temp ''') conn.commit() conn.close() print("Pipeline complete. Duplicates handled.")
Так что же такое инженерия данных на самом деле?
Если бы вы спросили меня, что такое инженерия данных после того, как я создал свой первый ETL-конвейер, я бы ответил, что это в основном написание скриптов. Извлечение, преобразование, загрузка. Повторение. Именно так это выглядело со стороны.
Но после того, как я продвинул этот конвейер дальше и увидел, как он ломается тремя разными способами, я теперь смотрю на это по-другому. Инженерия данных — это создание надежных систем, а не просто скриптов, которые запускаются. Разница есть. Скрипт делает то, что вы ему говорите, один раз и тогда, когда вы ему это говорите. Система же обрабатывает сбои, запоминает, что она уже сделала, сохраняет данные после одной сессии и работает по расписанию без чьего-либо контроля.
Идемпотентность, персистентность, планирование. Ни одно из этих понятий не проявилось, когда я однажды запускал свой конвейер в ноутбуке. Они открылись только тогда, когда я попытался заставить его работать как нечто реальное.
В реальной компании вы не можете позволить себе ошибиться в этом. Данные, которые генерирует ваш конвейер обработки данных, используются для принятия решений. Если он полон дубликатов, исчезает за ночь или запускается только тогда, когда кто-то вспоминает нажать кнопку, это не конвейер обработки данных. Это уязвимость.
Мне еще многому предстоит научиться. Следующим препятствием на моем пути станет планирование. Но я иду к нему, понимая, что разработка данных никогда не сводилась только к написанию скриптов. Мне просто пришлось кое-что сломать, чтобы это понять.
Это продолжающаяся серия статей по проектированию данных. Следите за моими публикациями, где я буду документировать каждый этап этого пути, включая те моменты, которые проходят не совсем гладко.
Свяжитесь со мной в LinkedIn, YouTube и Twitter.
Ибрагим Салами. Все материалы от Ибрагима Салами.
Источник: towardsdatascience.com
Похожие записи
- Компания Google подала в суд на китайскую киберпреступную организацию, которая использовала искусственный интеллект для обмана «сотни тысяч жертв».
- Dreaming: Улучшенная память для более эффективного использования ChatGPT | OpenAI
- Исследование того, как ИИ может помочь пользователям понять состояние кожи.
