Робот рассказывает о прогнозе погоды, температура 15°C, облачно с дождем.

Как создать конвейер ETL для обработки метеорологических данных с использованием ИИ, Databricks и GPT-4o: от API до панели мониторинга

Пошаговое руководство по преобразованию данных из API погоды в панель мониторинга на платформе Databricks.

Делиться

3de2ad26c0edbce1fc1120636b7b0e2f

В этом году Databricks снова встряхнула рынок данных. Компания запустила бесплатную версию платформы Databricks со всеми включенными функциями. Это, как минимум, отличный ресурс для обучения и тестирования.

Именно поэтому я создал комплексный проект, призванный помочь вам освоить основы работы с основными ресурсами Databricks.

Этот проект демонстрирует полный рабочий процесс извлечения, преобразования и загрузки данных (ETL) в Databricks. Он интегрирует API OpenWeatherMap для получения данных и модель OpenAI GPT-4o-mini для предоставления персонализированных рекомендаций по выбору одежды в зависимости от погоды.

Давайте узнаем об этом подробнее.

Проект

В рамках проекта реализуется полноценный конвейер обработки данных в Databricks, включающий следующие этапы.

  1. Выдержка : Получает текущие данные о погоде в Нью-Йорке через API OpenWeatherMap [1].
  2. Transform : Преобразует метки времени UTC в местное время Нью-Йорка и использует GPT-4o-mini от OpenAI [2] для генерации персонализированных рекомендаций по выбору одежды на основе температуры.
  3. Загрузка : Сохраняет данные в каталоге Databricks Unity как в виде необработанных JSON-файлов, так и в виде структурированной таблицы Delta (Silver Layer).
  4. Оркестрация : Блокнот с этим ETL-кодом добавляется в задание и планируется к выполнению каждый час в Databricks.
  5. Аналитика : Серебряный слой используется для отображения информации о погоде на панели мониторинга Databricks, а также рекомендаций LLM.

Вот архитектура.

4174e690f9799ac956206b1936612115

Отлично. Теперь, когда мы понимаем, что нам нужно делать, перейдем к практической части этого урока.

Примечание : если у вас еще нет учетной записи в Databricks, перейдите на страницу бесплатной версии Databricks [3], нажмите «Зарегистрироваться для бесплатной версии» и следуйте инструкциям на экране, чтобы получить бесплатный доступ.

Выдержка: Интеграция API и Databricks

Как я обычно говорю, для начала любого проекта, связанного с данными, нужны данные, верно? Поэтому наша задача здесь — интегрировать API OpenWeatherMap для прямой загрузки данных в блокнот PySpark в Databricks. На первый взгляд эта задача может показаться сложной, но поверьте, это не так.

На главной странице Databricks создайте новый блокнот, используя кнопку «+Новый» , затем выберите «Блокнот» .

4b767f1259919e8e8574618094e8e433

Для части, отвечающей за извлечение данных , нам потребуется:

1. Ключ API из API OpenWeatherMap.

Чтобы получить его, перейдите на страницу регистрации API и пройдите бесплатную регистрацию. После входа в панель управления нажмите на вкладку «Ключ API», где вы сможете его увидеть.

2. Импорт пакетов

# Импорт запросов на импорт импорт json

Далее мы создадим класс Python, чтобы упорядочить наш код и сделать его готовым к использованию в производственной среде.

  • Этот класс получает только что созданный нами API_KEY, а также город и страну для получения данных о погоде.
  • Возвращает ответ в формате JSON.

# Создание класса для модульной организации кода class Weather: # Определение конструктора def __init__(self, API_KEY): self.API_KEY = API_KEY # Определение метода для получения данных о погоде def get_weather(self, city, country, units='imperial'): self.city = city self.country = country self.units = units # Выполнение GET-запроса к конечной точке API, возвращающей данные в формате JSON url = f»https://api.openweathermap.org/data/2.5/weather?q={city},{country}&APPID={w.API_KEY}&units={units}» response = requests.get(url) # Использование метода .json() для разбора текста ответа и возврата if response.status_code != 200: raise Exception(f»Error: {response.status_code} — {response.text}») return response.json()

Отлично. Теперь мы можем запустить этот класс. Обратите внимание, что мы используем dbutils.widgets.get(). Эта команда проверяет параметры запланированного задания, о чем мы поговорим позже в этой статье. Рекомендуется хранить секреты в безопасности.

# Получаем ключ API OpenWeatherMap API_KEY = dbutils.widgets.get('API_KEY') # Создаем экземпляр класса w = Weather(API_KEY=API_KEY) # Получаем данные о погоде nyc = w.get_weather(city='New York', country='US') nyc

Вот ответ.

{'coord': {'lon': -74.006, 'lat': 40.7143}, 'weather': [{'id': 804, 'main': 'Clouds', 'description': 'overcast clouds', 'icon': '04d'}], 'base': 'stations', 'main': {'temp': 54.14, 'feels_like': 53.44, 'temp_min': 51.76, 'temp_max': 56.26, 'pressure': 992, 'humidity': 89, 'sea_level': 992, 'grnd_level': 993}, 'visibility': 10000, 'wind': {'speed': 21.85, {'градус': 270, 'порыв ветра': 37.98}, 'облака': {'все': 100}, 'dt': 1766161441, 'sys': {'тип': 1, 'id': 4610, 'страна': 'США', 'восход': 1766146541, 'закат': 1766179850}, 'часовой пояс': -18000, 'id': 5128581, 'имя': 'Нью-Йорк', 'код': 200}

Получив этот ответ, мы можем перейти к этапу преобразования данных, где мы проведем их очистку и преобразование.

Преобразование: форматирование данных

В этом разделе мы рассмотрим задачи очистки и преобразования исходных данных. Начнем с выбора необходимых данных для нашей панели мониторинга. Это просто получение данных из словаря (или JSON).

# Получение информации id = nyc['id'] timestamp = nyc['dt'] weather = nyc['weather'][0]['main'] temp = nyc['main']['temp'] tmin = nyc['main']['temp_min'] tmax = nyc['main']['temp_max'] country = nyc['sys']['country'] city = nyc['name'] sunrise = nyc['sys']['sunrise'] sunset = nyc['sys']['sunset']

Далее, давайте преобразуем метки времени в нью-йоркский часовой пояс, поскольку он соответствует гринвичскому времени.

# Преобразование времени восхода и захода солнца в дату и время в часовом поясе Нью-Йорка from datetime import datetime, timezone from zoneinfo import ZoneInfo import time # Преобразование метки времени, времени восхода и захода солнца в часовой пояс Нью-Йорка target_timezone = ZoneInfo(«America/New_York») dt_utc = datetime.fromtimestamp(sunrise, tz=timezone.utc) sunrise_nyc = str(dt_utc.astimezone(target_timezone).time()) # получаем только время восхода солнца dt_utc = datetime.fromtimestamp(sunset, tz=timezone.utc) sunset_nyc = str(dt_utc.astimezone(target_timezone).time()) # получаем только время захода солнца dt_utc = datetime.fromtimestamp(timestamp, tz=timezone.utc) time_nyc = str(dt_utc.astimezone(target_timezone))

Наконец, мы преобразуем его в датафрейм Spark.

# Создание датафрейма из переменных df = spark.createDataFrame([[id, time_nyc, weather, temp, tmin, tmax, country, city, sunrise_nyc, sunset_nyc]], schema=['id', 'timestamp','weather', 'temp', 'tmin', 'tmax', 'country', 'city', 'sunrise', 'sunset'])

57c90c0757b661e1ded9a0cef524caee

Последний шаг в этом разделе — добавление предложения от модели LLM. На этом этапе мы выберем некоторые данные, полученные из API, и передадим их модели, попросив ее вернуть предложение о том, как человек может одеться, чтобы подготовиться к погоде.

  • Вам потребуется ключ API OpenAI.
  • Учитывайте погодные условия, максимальную и минимальную температуру (weather, tmax, tmin).
  • Попросите преподавателя магистратуры дать рекомендации по выбору одежды в зависимости от погоды.
  • Добавьте предложенное решение в итоговый фрейм данных.

%pip install openai —quiet from openai import OpenAI import pyspark.sql.functions as F from pyspark.sql.functions import col # Получение ключа OpenAI OPENAI_API_KEY = dbutils.widgets.get('OPENAI_API_KEY') client = OpenAI( # Это значение по умолчанию, его можно опустить api_key=OPENAI_API_KEY ) response = client.responses.create( model=»gpt-4o-mini», instructions=»Вы синоптик, дающий советы о том, как одеться в зависимости от погоды. Ответьте одним предложением.»», input=f»Погода: {weather}, максимальная температура: {tmax}, минимальная температура: {tmin}. Как мне одеться?» ) suggestion = response.output_text # Добавление предложения в df df = df.withColumn('suggestion', F.lit(suggestion)) display(df)

Отлично. Мы почти закончили ETL. Теперь осталось только загрузить данные. Это следующий раздел.

Загрузка: Сохранение данных и создание серебряного слоя.

Последний этап ETL-процесса — загрузка данных. Мы загрузим их двумя разными способами.

  1. Сохранение исходных файлов в томе каталога Unity.
  2. Преобразованный датафрейм сохраняется непосредственно в слой Silver, который представляет собой таблицу Delta Table, готовую для использования на панели мониторинга.

Давайте создадим каталог, в котором будут храниться все данные о погоде, полученные из API.

— Создание каталога CREATE CATALOG IF NOT EXISTS pipeline_weather COMMENT 'Это каталог для конвейера обработки метеорологических данных';

Далее мы создадим схему для Lakehouse. В ней будет храниться том с полученными необработанными JSON-файлами.

— Создание схемы CREATE SCHEMA IF NOT EXISTS pipeline_weather.lakehouse COMMENT 'Это схема для конвейера обработки данных о погоде';

Теперь мы создадим том для исходных файлов.

— Давайте создадим том CREATE VOLUME IF NOT EXISTS pipeline_weather.lakehouse.raw_data COMMENT 'Это том необработанных данных для конвейера обработки погоды';

Мы также создаём ещё одну схему для хранения таблицы изменений серебряного слоя.

—Создание схемы для хранения преобразованных данных CREATE SCHEMA IF NOT EXISTS pipeline_weather.silver COMMENT 'Это схема для конвейера обработки погоды';

После того, как мы всё настроим, наш каталог будет выглядеть вот так.

5bf0ed2b7444c642fd8740bd1770daff

Теперь давайте сохраним необработанный JSON-ответ в наш том Raw Volume. Чтобы все было упорядочено и предотвратилось перезапись, мы добавим уникальную метку времени к каждому имени файла.

Добавляя эти файлы к тому, а не просто перезаписывая их, мы создаём надёжный «журнал аудита». Это служит своего рода страховочной сеткой, означающей, что если последующий процесс завершится с ошибкой или мы столкнёмся с потерей данных, мы всегда сможем вернуться к источнику и повторно обработать исходные данные, когда это потребуется.

# Получение временной метки stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') # Путь для сохранения json_path = f'/Volumes/pipeline_weather/lakehouse/raw_data/weather_{stamp}.json' # Сохранение данных в файл JSON df.write.mode('append').json(json_path)

Хотя мы сохраняем исходный JSON в качестве «источника истины», настоящая магия происходит при сохранении очищенных данных в таблицу Delta Table на уровне Silver. Используя .mode(“append”) и формат Delta, мы гарантируем, что наши данные структурированы, соответствуют схеме и готовы к высокоскоростной аналитике или инструментам бизнес-аналитики. Этот уровень преобразует неструктурированные ответы API в надежную, доступную для запросов таблицу, которая растет с каждым запуском конвейера.

# Сохраняем преобразованные данные в таблицу (схему) ( df .write .format('delta') .mode(«append») .saveAsTable('pipeline_weather.silver.weather') )

Прекрасно! Теперь, когда всё готово, давайте посмотрим, как выглядит наш стол.

2e4aa99c3bd9f702f89624374ed635dd

Давайте начнём автоматизировать этот процесс прямо сейчас.

Оркестрация: планирование автоматического запуска блокнота.

Продолжая работу над проектом, пришло время настроить этот конвейер на автономную работу с минимальным контролем. Для этого в Databricks есть вкладка «Задания и конвейеры», где легко запланировать выполнение заданий.

  1. В левой панели нажмите вкладку «Задания и конвейеры».
  2. Найдите кнопку «Создать» и выберите «Задание».
  3. Нажмите на значок блокнота, чтобы добавить его в задание.
  4. Настройте параметры в соответствии с приведенными ниже данными.
  5. Добавьте ключи API в параметры.
  6. Нажмите «Создать задачу» .
  7. Нажмите «Запустить сейчас» , чтобы проверить, работает ли программа.
af40f467bf753fe311bb4fba2fcbd80e
2e8e8e39c2045ad50c5b8d3202d82a1b

После нажатия кнопки «Запустить сейчас» должен начаться запуск блокнота, и отобразится сообщение «Успешно».

aec249f3149fc10c74a02ba9b49177a9

Если задача выполняется без проблем, пора настроить её на автоматическое выполнение.

  1. Нажмите кнопку «Добавить триггер» в правой части экрана, прямо под разделом «Расписания и триггеры».
  2. Тип триггера = Запланированный.
  3. Тип расписания: выберите «Расширенное».
  4. Выберите «Каждый 1 час» из выпадающих списков.
  5. Сохрани это.

Отлично. Наш конвейер обработки данных теперь работает в автоматическом режиме! Каждый час система будет обращаться к API OpenWeatherMap, получать актуальную информацию о погоде в Нью-Йорке и сохранять её в нашей таблице Silver Layer Table.

Аналитика: создание панели мониторинга для принятия решений на основе данных.

Последним элементом этой головоломки является создание аналитического отчета, который будет отображать информацию о погоде и предоставлять пользователю полезные рекомендации о том, как одеваться по погоде на улице.

  1. Нажмите на вкладку «Панели мониторинга» на левой боковой панели.
  2. Нажмите кнопку «Создать панель управления».
  3. Это откроет перед нами чистый холст, на котором мы сможем работать.
48ba586960b3099be82a90305ae23c7b

Теперь панели мониторинга работают на основе данных, получаемых из SQL-запросов. Поэтому, прежде чем добавлять текст и графику на холст, нам необходимо сначала создать несколько метрик, которые будут переменными для ввода данных в карточки панели мониторинга и графику.

Итак, нажмите кнопку «+Создать из SQL» , чтобы запустить метрику. Дайте ей имя. Например, Location, чтобы получить последнее полученное название города, я должен использовать следующий запрос.

— Получить последнее полученное название города SELECT city FROM pipeline_weather.silver.weather ORDER BY timestamp DESC LIMIT 1

И нам нужно создать по одному SQL-запросу для каждой метрики. Все они доступны в репозитории GitHub [ ].

Далее мы переходим на вкладку «Панель управления» и начинаем перетаскивать элементы на холст.

a54db731c938864f5a1c0339c4beca05

После нажатия на «Текст» вы можете вставить прямоугольник на холст и отредактировать текст. При нажатии на графический элемент вставляется место для графического изображения, и открывается меню справа для выбора переменных и настроек.

618a9d83112a3aee6c5723d23b33bab7

Хорошо. После добавления всех элементов панель управления будет выглядеть так.

fc6f22c8aa9f54782855686b941fdfad

Как здорово! На этом наш проект завершается.

Прежде чем уйти

Этот проект легко можно повторить примерно за час, в зависимости от вашего опыта работы с экосистемой Databricks. Несмотря на быстроту разработки, он позволит вам применить множество ключевых инженерных навыков:

  • Архитектурное проектирование : Вы научитесь создавать современное пространство для отдыха на озере с нуля.
  • Бесшовная интеграция данных : Вы обеспечите связь между внешними веб-API и платформой Databricks для приема данных в режиме реального времени.
  • Чистый, модульный код : Мы выходим за рамки простых скриптов, используя классы и функции Python для организации и удобства сопровождения кода.
  • Автоматизация и оркестровка : Вы получите практический опыт планирования задач, чтобы обеспечить надежную работу вашего проекта в автоматическом режиме.
  • Предоставление реальной ценности : цель состоит не просто в передаче данных, а в предоставлении ценности. Преобразуя необработанные метеорологические показатели в практические рекомендации по выбору одежды с помощью ИИ, мы превращаем «холодные данные» в полезный сервис для конечного пользователя.

Если вам понравился этот материал, мои контактные данные и дополнительную информацию обо мне вы найдете на моем сайте.

https://gustavorsantos.me

Репозиторий GitHub

Здесь находится репозиторий этого проекта.

https://github.com/gurezende/Databricks-Weather-Pipeline

Ссылки

[1. API OpenWeatherMap] (https://openweathermap.org/)

[2. Открытая платформа ИИ] (https://platform.openai.com/)

[3. Бесплатная версия Databricks] (https://www.databricks.com/learn/free-edition)

[4. Репозиторий GitHub] (https://github.com/gurezende/Databricks-Weather-Pipeline)

Источник: towardsdatascience.com

✅ Найденные теги: Databricks, GPT-4o, ИИ, Как, Конвейер ETL, Метеорологические Данные, новости

ОСТАВЬТЕ СВОЙ КОММЕНТАРИЙ

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Каталог бесплатных опенсорс-решений, которые можно развернуть локально и забыть о подписках

галерея

Фото сгенерированных лиц: исследование показывает, что люди не могут отличить настоящие лица от сгенерированных
Нейросети построили капитализм за трое суток: 100 агентов Claude заперли…
Скетч: цифровой осьминог и виртуальный мир внутри компьютера с человечком.
Сцена с жестами пальцами, где один жест символизирует "VPN", а другой "KHP".
‼️Paramount купила Warner Bros. Discovery — сумма сделки составила безумные…
Скриншот репозитория GitHub "Claude Scientific Skills" AI для научных исследований.
Структура эффективного запроса Claude с элементами задачи, контекста и референса.
Эскиз и готовая веб-страница платформы для AI-дизайна в современном темном режиме.
ideipro logotyp
Image Not Found
Звёздное небо с галактиками и туманностями, космос, Вселенная, астрофотография.

Система оповещения обсерватории Рубина отправила 800 000 сигналов в первую ночь наблюдений.

Астрономы будут получать оповещения о небесных явлениях в течение нескольких минут после их обнаружения. Теренс О'Брайен, редактор раздела «Выходные». Публикации этого автора будут добавляться в вашу ежедневную рассылку по электронной почте и в ленту новостей на главной…

Мар 2, 2026
Женщина с длинными тёмными волосами в синем свете, нейтральный фон.

Расследование в отношении 61-фунтовой машины, которая «пожирает» пластик и выплевывает кирпичи.

Обзор компактного пресса для мягкого пластика Clear Drop — и что будет дальше. Шон Холлистер, старший редактор Публикации этого автора будут добавляться в вашу ежедневную рассылку по электронной почте и в ленту новостей на главной странице вашего…

Мар 2, 2026
Черный углеродное волокно с текстурой плетения, отражающий свет.

Материал будущего: как работает «бессмертный» композит

Учёные из Университета штата Северная Каролина представили композит нового поколения, способный самостоятельно восстанавливаться после серьёзных повреждений.  Речь идёт о модифицированном армированном волокном полимере (FRP), который не просто сохраняет прочность при малом весе, но и способен «залечивать» внутренние…

Мар 2, 2026
Круглый экран с изображением замка и горы, рядом электронная плата.

Круглый дисплей Waveshare для креативных проектов

Круглый 7-дюймовый сенсорный дисплей от Waveshare создан для разработчиков и дизайнеров, которым нужен нестандартный экран.  Это IPS-панель с разрешением 1 080×1 080 пикселей, поддержкой 10-точечного ёмкостного сенсора, оптической склейкой и защитным закалённым стеклом, выполненная в круглом форм-факторе.…

Мар 2, 2026

Впишите свой почтовый адрес и мы будем присылать вам на почту самые свежие новости в числе самых первых