Архив рубрики ~Обо всем~

Я думал, что разработка данных — это просто написание скриптов. Я ошибался.

Я думал, что разработка данных — это просто написание скриптов. Я ошибался.
Я думал, что разработка данных — это просто написание скриптов. Я ошибался.

Я пытался подготовить свой ETL-конвейер к работе в производственной среде. Три вещи сломались. Каждая из них научила меня тому, чему одно лишь написание скриптов никогда бы не научило.

Делиться

Создано с помощью Gemini AI

После того, как я создал свой первый ETL-конвейер, я подумал, что довольно хорошо понимаю, что такое инженерия данных. Вы извлекаете данные откуда-то, очищаете их, а затем загружаете в нужное место. ETL. Достаточно просто.

Для контекста: я аналитик данных, пытающийся перейти в область инженерии данных. Я публично документирую этот процесс, начиная с составленного мной в начале этого года плана самообучения на 12 месяцев. Последним шагом на этом пути стало создание моего первого ETL-конвейера с нуля с использованием API GitHub, о котором я писал здесь, на TDS. Этот конвейер работал. Он извлекал данные, очищал их и сохранял в CSV-файл. Я был им доволен.

Поэтому я решил довести дело до конца, сделать его более «готовым к производству», как любят говорить в интернете. То, что произошло дальше, меня по-настоящему удивило. Не потому, что что-то сломалось, а потому, что эти поломки выявили многое.

Первоначальный трубопровод

Первоначальный конвейер обработки данных был простым, что меня устраивало, ведь в этом и заключалась суть. Извлечь данные из API GitHub, немного их очистить, сохранить всё в CSV-файл. Для своих целей он работал идеально: как учебное задание. Но CSV-файл и одноразовый скрипт — это не то, как работает инженерия данных в реальном мире. Я хотел выяснить, что на самом деле означает «реальный мир», поэтому решил усовершенствовать конвейер и посмотреть, что получится.

Вот полный оригинальный алгоритм для тех, кто не читал предыдущую статью:

 import requests from datetime import datetime, timedelta url = "https://api.github.com/search/repositories" params = { "q": "language:python created:>2025-04-22", "sort": "stars", "order": "desc", "per_page": 30 } response = requests.get(url, params=params) data = response.json() import pandas as pd repos = [] for repo in data['items']: repos.append({ "name": repo['name'], "owner": repo['owner']['login'], "stars": repo['stargazers_count'], "forks": repo['forks_count'], "language": repo['language'], "description": repo['description'], "url": repo['html_url'], "created_at": repo['created_at'] }) df = pd.DataFrame(repos) df_clean = df.dropna(subset=['description']) df_clean = df_clean.copy() df_clean['viral'] = df_clean['stars'].apply(lambda x: 'Yes' if x > 50000 else 'No') df_clean = df_clean.sort_values('stars', ascending=False).reset_index(drop=True) df_clean.to_csv('github_trending_repos.csv', index=False) print("Pipeline complete. File saved.")

Простой, понятный и работающий. Но как только вы пытаетесь запустить его несколько раз или возвращаетесь к нему на следующий день, начинают проявляться недостатки.

Стена первая: У трубопровода не было памяти.

Первое обновление прошло без проблем. Вместо сохранения в CSV-файл я загрузил данные в базу данных SQLite. SQLite по-прежнему представляет собой всего лишь один файл, но ведёт себя как настоящая база данных. В неё можно делать запросы, проверять её содержимое и корректно её расширять. Казалось, это небольшое изменение. Но это было не так.

Я запустил конвейер один раз и получил 22 репозитория. Затем я запустил его второй раз, ничего не меняя, и проверил базу данных.

 Total rows: 44 Unique repos: 22 Duplicates: 22

Честно говоря, я этого не ожидал. Я предполагал, что это может произойти, но никогда по-настоящему не думал, что это случится. Но я рад, что это произошло, потому что это был первый раз, когда я увидел, как мой конвейер обработки данных сломался. И то, что это показало, было простым, но важным: у скрипта не было памяти. Каждый раз при запуске он начинался с нуля и вслепую добавлял все, что находил. Никаких предупреждений, никаких ошибок. Просто «Конвейер завершен», как будто все в порядке.

Именно здесь я наткнулся на понятие, называемое идемпотентностью.
Идемпотентность — это замысловатое слово, обозначающее простую идею. Если что-то уже произошло, это не должно повториться. В контексте конвейера обработки данных это означает, что запуск конвейера один раз или десять раз всегда должен давать один и тот же результат. Никаких лишних строк, никаких дубликатов, никакого скрытого повреждения данных.

Решение оказалось довольно простым. Теперь, прежде чем вставлять что-либо в базу данных, конвейер проверяет, существует ли эта запись уже. Если да, то сначала удаляет её, а затем вставляет новую версию. Небольшое изменение в подходе, но оно полностью меняет надёжность вашего конвейера.

И вот что мне запомнилось: простой скрипт никогда не подумает об этом сам по себе. Это нужно встраивать целенаправленно. Это уже не скриптинг, это инженерия.

Стена вторая: Данные исчезли за одну ночь

Вторая стена была менее техничной и более тревожной.

Когда я закрыл Colab на ночь и вернулся на следующий день, меня охватило тревожное чувство. Мне пришлось всё запускать заново с нуля, надеясь, что ничего не сломается, хотя накануне всё работало идеально. Тщательно созданная мной база данных просто исчезла. И я вспомнил, что до этого проекта мне с трудом удавалось найти свой исходный файл ETL-конвейера. Я потратил много времени на его поиски, прежде чем наконец его нашёл. Это чувство, когда ты чуть не теряешь свою работу, остаётся с тобой навсегда.

Я знал, что должен быть лучший способ. Реальный конвейер обработки данных не может зависеть от того, что кто-то будет запускать его каждое утро. Данные должны храниться где-то, где они сохранятся и после завершения сессии.

Решение заключалось в том, чтобы напрямую подключить Google Drive к Colab и указать путь к базе данных там, а не во временной среде Colab. Изменена всего одна строка кода:

 conn = sqlite3.connect('/content/drive/MyDrive/github_repos.db')

Теперь база данных находится в Google Drive. Закройте сессию, перезапустите среду выполнения, откройте совершенно новый блокнот. Данные по-прежнему там и ждут своего часа.

Но это исправление выявило нечто большее. Если сохранение данных уже требует размышлений о том, где они хранятся и как сохраняются, что произойдет, если вам потребуется, чтобы конвейер обработки данных запускался автоматически каждый день без вашего участия?

Третья стена: Никто не может вечно нажимать кнопку «Бег».

Больше всего меня удивило то, что «третья стена».

Даже после решения проблемы с дубликатами и безопасного хранения базы данных в Google Drive, кому-то все равно приходится открывать блокнот и нажимать кнопку «Запустить». Colab — это не сервер. Это интерактивная среда. Она не просыпается в 3 часа ночи, не получает свежие данные из API GitHub и не засыпает снова. Она создана не для этого.

И когда я об этом подумал с точки зрения реального мира, меня это сразу же осенило. В реальной компании никто не сидит в полночь, настороженно ожидая запуска скрипта. Конвейер должен работать сам по себе. По расписанию. Надежно. Независимо от того, наблюдает за нами кто-то или нет.

Вот тут-то и пригодятся такие инструменты, как Apache Airflow, Prefect и облачные cron-задания. Это не скрипты на Python. Это системы, которые работают на серверах, управляют расписаниями, обрабатывают сбои, отправляют оповещения при возникновении проблем и хранят историю каждого запуска.

Честно говоря, именно планирование — это концепция, которую мне больше всего интересно изучить в дальнейшем, потому что именно здесь работа с данными начинает ощущаться как настоящая работа с инфраструктурой.

Обзор ключевых изменений

Позвольте мне подробно рассказать о трех изменениях, которые я внес в конвейер обработки данных, и о том, что делает каждое из них.

1. Замена CSV на SQLite.

 # Before df_clean.to_csv('github_trending_repos.csv', index=False) # After conn = sqlite3.connect('github_repos.db') df_clean.to_sql('repos', conn, if_exists='append', index=False) conn.close()

Сохранение в CSV-файл подходит для разового анализа. Но CSV — это всего лишь текстовый файл. Вы не сможете легко выполнять запросы к нему, и он плохо масштабируется по мере роста объёма данных. SQLite — это полноценная база данных, а это значит, что вы можете выполнять SQL-запросы, проверять уже имеющиеся в ней данные и корректно развивать её. Та же простота, но гораздо больше возможностей.

2. Исправление проблемы с дубликатами

 cursor.execute(''' DELETE FROM repos WHERE url IN (SELECT url FROM repos_temp) ''') cursor.execute(''' INSERT INTO repos SELECT * FROM repos_temp ''')

Это решение проблемы идемпотентности. Перед добавлением чего-либо конвейер проверяет, существует ли этот репозиторий уже в базе данных, используя его URL-адрес в качестве уникального идентификатора. Если существует, он сначала удаляет его, а затем вставляет новую версию. Таким образом, независимо от того, сколько раз запускается конвейер, вы всегда получаете чистые данные без дубликатов.

3. Сохранение данных в Google Диск

 # Before conn = sqlite3.connect('github_repos.db') # After conn = sqlite3.connect('/content/drive/MyDrive/github_repos.db')

Это всего одна строка, но она меняет всё. Вместо сохранения базы данных во временной среде Colab, где она исчезает после закрытия сессии, она сохраняется напрямую в Google Диск. Закройте блокнот, перезапустите среду выполнения, вернитесь завтра. Ваши данные всё ещё будут ждать вас там.

Вот полный модернизированный конвейер, объединяющий все три изменения:

 import requests import pandas as pd import sqlite3 from datetime import datetime, timedelta # Extract yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') url = "https://api.github.com/search/repositories" params = { "q": f"language:python created:>{yesterday}", "sort": "stars", "order": "desc", "per_page": 30 } response = requests.get(url, params=params) data = response.json() # Transform repos = [] for repo in data['items']: repos.append({ "name": repo['name'], "owner": repo['owner']['login'], "stars": repo['stargazers_count'], "forks": repo['forks_count'], "language": repo['language'], "description": repo['description'], "url": repo['html_url'], "created_at": repo['created_at'] }) df = pd.DataFrame(repos) df_clean = df.dropna(subset=['description']) df_clean = df_clean.copy() df_clean['viral'] = df_clean['stars'].apply(lambda x: 'Yes' if x > 50000 else 'No') df_clean = df_clean.sort_values('stars', ascending=False).reset_index(drop=True) # Load conn = sqlite3.connect('/content/drive/MyDrive/github_repos.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS repos ( name TEXT, owner TEXT, stars INTEGER, forks INTEGER, language TEXT, description TEXT, url TEXT, created_at TEXT, viral TEXT, loaded_at TEXT ) ''') df_clean['loaded_at'] = datetime.now().strftime('%Y-%m-%d') df_clean.to_sql('repos_temp', conn, if_exists='replace', index=False) cursor.execute(''' DELETE FROM repos WHERE url IN (SELECT url FROM repos_temp) ''') cursor.execute(''' INSERT INTO repos SELECT * FROM repos_temp ''') conn.commit() conn.close() print("Pipeline complete. Duplicates handled.")

Так что же такое инженерия данных на самом деле?

Если бы вы спросили меня, что такое инженерия данных после того, как я создал свой первый ETL-конвейер, я бы ответил, что это в основном написание скриптов. Извлечение, преобразование, загрузка. Повторение. Именно так это выглядело со стороны.

Но после того, как я продвинул этот конвейер дальше и увидел, как он ломается тремя разными способами, я теперь смотрю на это по-другому. Инженерия данных — это создание надежных систем, а не просто скриптов, которые запускаются. Разница есть. Скрипт делает то, что вы ему говорите, один раз и тогда, когда вы ему это говорите. Система же обрабатывает сбои, запоминает, что она уже сделала, сохраняет данные после одной сессии и работает по расписанию без чьего-либо контроля.

Идемпотентность, персистентность, планирование. Ни одно из этих понятий не проявилось, когда я однажды запускал свой конвейер в ноутбуке. Они открылись только тогда, когда я попытался заставить его работать как нечто реальное.

В реальной компании вы не можете позволить себе ошибиться в этом. Данные, которые генерирует ваш конвейер обработки данных, используются для принятия решений. Если он полон дубликатов, исчезает за ночь или запускается только тогда, когда кто-то вспоминает нажать кнопку, это не конвейер обработки данных. Это уязвимость.

Мне еще многому предстоит научиться. Следующим препятствием на моем пути станет планирование. Но я иду к нему, понимая, что разработка данных никогда не сводилась только к написанию скриптов. Мне просто пришлось кое-что сломать, чтобы это понять.

Это продолжающаяся серия статей по проектированию данных. Следите за моими публикациями, где я буду документировать каждый этап этого пути, включая те моменты, которые проходят не совсем гладко.

Свяжитесь со мной в LinkedIn, YouTube и Twitter.

Ибрагим Салами. Все материалы от Ибрагима Салами.

Источник: towardsdatascience.com

✅ Найденные теги: «Я, Данных, Думал, Написание, новости, Просто, Разработка
Читайте также
Архив рубрики ~Лента новостей~ Наследники Дуоса: как работает современный двухсимочный смартфон Архив рубрики ~Лента новостей~ Все эти IPO масштаба галактики сопряжены с риском экономического кризиса Архив рубрики ~Лента новостей~ Французский Deezer запустил бесплатный инструмент для распознавания сгенерированной музыки на сторонних стримингах Архив рубрики ~Лента новостей~ Я попросила Claude Fable 5 сделать игру одним промптом. Получился симулятор админа ИИ-канала Архив рубрики ~Лента новостей~ Ногти поведали о тяжелой болезни потомка основателя империи Сун. Его состояние резко ухудшилось за десять недель до смерти Архив рубрики ~Лента новостей~ Команды Формулы-1 тратят миллионы на свои симуляторы — чем они отличаются от других? Архив рубрики ~Лента новостей~ Ученые нашли экологичную замену химикатам для защиты томатов от опасного грибка: Сельское хозяйство Архив рубрики ~Лента новостей~ Военная SPAC-компания Quantum Space пытается перехватить волну IPO SpaceX. Архив рубрики ~Лента новостей~ Вместо ускорителей NVIDIA собирается поставлять в Китай центральные процессоры Архив рубрики ~Лента новостей~ 3 хитрости NumPy для повышения производительности численных вычислений Архив рубрики ~Лента новостей~ IPO SpaceX: все необходимые обновления в режиме реального времени. Архив рубрики ~Лента новостей~ Представляем новые возможности GPT-Rosalind | OpenAI Архив рубрики ~Лента новостей~ Экологически чистая вычислительная платформа на базе ваших старых телефонов. Архив рубрики ~Полезное~ Midjourney для чайников за пару минут Архив рубрики ~Лента новостей~ Наследники Дуоса: как работает современный двухсимочный смартфон Архив рубрики ~Лента новостей~ Все эти IPO масштаба галактики сопряжены с риском экономического кризиса Архив рубрики ~Лента новостей~ Французский Deezer запустил бесплатный инструмент для распознавания сгенерированной музыки на сторонних стримингах Архив рубрики ~Лента новостей~ Я попросила Claude Fable 5 сделать игру одним промптом. Получился симулятор админа ИИ-канала Архив рубрики ~Лента новостей~ Ногти поведали о тяжелой болезни потомка основателя империи Сун. Его состояние резко ухудшилось за десять недель до смерти Архив рубрики ~Лента новостей~ Команды Формулы-1 тратят миллионы на свои симуляторы — чем они отличаются от других? Архив рубрики ~Лента новостей~ Ученые нашли экологичную замену химикатам для защиты томатов от опасного грибка: Сельское хозяйство Архив рубрики ~Лента новостей~ Военная SPAC-компания Quantum Space пытается перехватить волну IPO SpaceX. Архив рубрики ~Лента новостей~ Вместо ускорителей NVIDIA собирается поставлять в Китай центральные процессоры Архив рубрики ~Лента новостей~ 3 хитрости NumPy для повышения производительности численных вычислений Архив рубрики ~Лента новостей~ IPO SpaceX: все необходимые обновления в режиме реального времени. Архив рубрики ~Лента новостей~ Представляем новые возможности GPT-Rosalind | OpenAI Архив рубрики ~Лента новостей~ Экологически чистая вычислительная платформа на базе ваших старых телефонов. Архив рубрики ~Полезное~ Midjourney для чайников за пару минут

Оставить комментарий