Пошаговое руководство по преобразованию данных из API погоды в панель мониторинга на платформе Databricks.
Делиться

В этом году Databricks снова встряхнула рынок данных. Компания запустила бесплатную версию платформы Databricks со всеми включенными функциями. Это, как минимум, отличный ресурс для обучения и тестирования.
Именно поэтому я создал комплексный проект, призванный помочь вам освоить основы работы с основными ресурсами Databricks.
Этот проект демонстрирует полный рабочий процесс извлечения, преобразования и загрузки данных (ETL) в Databricks. Он интегрирует API OpenWeatherMap для получения данных и модель OpenAI GPT-4o-mini для предоставления персонализированных рекомендаций по выбору одежды в зависимости от погоды.
Давайте узнаем об этом подробнее.
Проект
В рамках проекта реализуется полноценный конвейер обработки данных в Databricks, включающий следующие этапы.
- Выдержка : Получает текущие данные о погоде в Нью-Йорке через API OpenWeatherMap [1].
- Transform : Преобразует метки времени UTC в местное время Нью-Йорка и использует GPT-4o-mini от OpenAI [2] для генерации персонализированных рекомендаций по выбору одежды на основе температуры.
- Загрузка : Сохраняет данные в каталоге Databricks Unity как в виде необработанных JSON-файлов, так и в виде структурированной таблицы Delta (Silver Layer).
- Оркестрация : Блокнот с этим ETL-кодом добавляется в задание и планируется к выполнению каждый час в Databricks.
- Аналитика : Серебряный слой используется для отображения информации о погоде на панели мониторинга Databricks, а также рекомендаций LLM.
Вот архитектура.

Отлично. Теперь, когда мы понимаем, что нам нужно делать, перейдем к практической части этого урока.
Примечание : если у вас еще нет учетной записи в Databricks, перейдите на страницу бесплатной версии Databricks [3], нажмите «Зарегистрироваться для бесплатной версии» и следуйте инструкциям на экране, чтобы получить бесплатный доступ.
Выдержка: Интеграция API и Databricks
Как я обычно говорю, для начала любого проекта, связанного с данными, нужны данные, верно? Поэтому наша задача здесь — интегрировать API OpenWeatherMap для прямой загрузки данных в блокнот PySpark в Databricks. На первый взгляд эта задача может показаться сложной, но поверьте, это не так.
На главной странице Databricks создайте новый блокнот, используя кнопку «+Новый» , затем выберите «Блокнот» .

Для части, отвечающей за извлечение данных , нам потребуется:
1. Ключ API из API OpenWeatherMap.
Чтобы получить его, перейдите на страницу регистрации API и пройдите бесплатную регистрацию. После входа в панель управления нажмите на вкладку «Ключ API», где вы сможете его увидеть.
2. Импорт пакетов
# Импорт запросов на импорт импорт json
Далее мы создадим класс Python, чтобы упорядочить наш код и сделать его готовым к использованию в производственной среде.
- Этот класс получает только что созданный нами API_KEY, а также город и страну для получения данных о погоде.
- Возвращает ответ в формате JSON.
# Создание класса для модульной организации кода class Weather: # Определение конструктора def __init__(self, API_KEY): self.API_KEY = API_KEY # Определение метода для получения данных о погоде def get_weather(self, city, country, units='imperial'): self.city = city self.country = country self.units = units # Выполнение GET-запроса к конечной точке API, возвращающей данные в формате JSON url = f»https://api.openweathermap.org/data/2.5/weather?q={city},{country}&APPID={w.API_KEY}&units={units}» response = requests.get(url) # Использование метода .json() для разбора текста ответа и возврата if response.status_code != 200: raise Exception(f»Error: {response.status_code} — {response.text}») return response.json()
Отлично. Теперь мы можем запустить этот класс. Обратите внимание, что мы используем dbutils.widgets.get(). Эта команда проверяет параметры запланированного задания, о чем мы поговорим позже в этой статье. Рекомендуется хранить секреты в безопасности.
# Получаем ключ API OpenWeatherMap API_KEY = dbutils.widgets.get('API_KEY') # Создаем экземпляр класса w = Weather(API_KEY=API_KEY) # Получаем данные о погоде nyc = w.get_weather(city='New York', country='US') nyc
Вот ответ.
{'coord': {'lon': -74.006, 'lat': 40.7143}, 'weather': [{'id': 804, 'main': 'Clouds', 'description': 'overcast clouds', 'icon': '04d'}], 'base': 'stations', 'main': {'temp': 54.14, 'feels_like': 53.44, 'temp_min': 51.76, 'temp_max': 56.26, 'pressure': 992, 'humidity': 89, 'sea_level': 992, 'grnd_level': 993}, 'visibility': 10000, 'wind': {'speed': 21.85, {'градус': 270, 'порыв ветра': 37.98}, 'облака': {'все': 100}, 'dt': 1766161441, 'sys': {'тип': 1, 'id': 4610, 'страна': 'США', 'восход': 1766146541, 'закат': 1766179850}, 'часовой пояс': -18000, 'id': 5128581, 'имя': 'Нью-Йорк', 'код': 200}
Получив этот ответ, мы можем перейти к этапу преобразования данных, где мы проведем их очистку и преобразование.
Преобразование: форматирование данных
В этом разделе мы рассмотрим задачи очистки и преобразования исходных данных. Начнем с выбора необходимых данных для нашей панели мониторинга. Это просто получение данных из словаря (или JSON).
# Получение информации id = nyc['id'] timestamp = nyc['dt'] weather = nyc['weather'][0]['main'] temp = nyc['main']['temp'] tmin = nyc['main']['temp_min'] tmax = nyc['main']['temp_max'] country = nyc['sys']['country'] city = nyc['name'] sunrise = nyc['sys']['sunrise'] sunset = nyc['sys']['sunset']
Далее, давайте преобразуем метки времени в нью-йоркский часовой пояс, поскольку он соответствует гринвичскому времени.
# Преобразование времени восхода и захода солнца в дату и время в часовом поясе Нью-Йорка from datetime import datetime, timezone from zoneinfo import ZoneInfo import time # Преобразование метки времени, времени восхода и захода солнца в часовой пояс Нью-Йорка target_timezone = ZoneInfo(«America/New_York») dt_utc = datetime.fromtimestamp(sunrise, tz=timezone.utc) sunrise_nyc = str(dt_utc.astimezone(target_timezone).time()) # получаем только время восхода солнца dt_utc = datetime.fromtimestamp(sunset, tz=timezone.utc) sunset_nyc = str(dt_utc.astimezone(target_timezone).time()) # получаем только время захода солнца dt_utc = datetime.fromtimestamp(timestamp, tz=timezone.utc) time_nyc = str(dt_utc.astimezone(target_timezone))
Наконец, мы преобразуем его в датафрейм Spark.
# Создание датафрейма из переменных df = spark.createDataFrame([[id, time_nyc, weather, temp, tmin, tmax, country, city, sunrise_nyc, sunset_nyc]], schema=['id', 'timestamp','weather', 'temp', 'tmin', 'tmax', 'country', 'city', 'sunrise', 'sunset'])

Последний шаг в этом разделе — добавление предложения от модели LLM. На этом этапе мы выберем некоторые данные, полученные из API, и передадим их модели, попросив ее вернуть предложение о том, как человек может одеться, чтобы подготовиться к погоде.
- Вам потребуется ключ API OpenAI.
- Учитывайте погодные условия, максимальную и минимальную температуру (weather, tmax, tmin).
- Попросите преподавателя магистратуры дать рекомендации по выбору одежды в зависимости от погоды.
- Добавьте предложенное решение в итоговый фрейм данных.
%pip install openai —quiet from openai import OpenAI import pyspark.sql.functions as F from pyspark.sql.functions import col # Получение ключа OpenAI OPENAI_API_KEY = dbutils.widgets.get('OPENAI_API_KEY') client = OpenAI( # Это значение по умолчанию, его можно опустить api_key=OPENAI_API_KEY ) response = client.responses.create( model=»gpt-4o-mini», instructions=»Вы синоптик, дающий советы о том, как одеться в зависимости от погоды. Ответьте одним предложением.»», input=f»Погода: {weather}, максимальная температура: {tmax}, минимальная температура: {tmin}. Как мне одеться?» ) suggestion = response.output_text # Добавление предложения в df df = df.withColumn('suggestion', F.lit(suggestion)) display(df)
Отлично. Мы почти закончили ETL. Теперь осталось только загрузить данные. Это следующий раздел.
Загрузка: Сохранение данных и создание серебряного слоя.
Последний этап ETL-процесса — загрузка данных. Мы загрузим их двумя разными способами.
- Сохранение исходных файлов в томе каталога Unity.
- Преобразованный датафрейм сохраняется непосредственно в слой Silver, который представляет собой таблицу Delta Table, готовую для использования на панели мониторинга.
Давайте создадим каталог, в котором будут храниться все данные о погоде, полученные из API.
— Создание каталога CREATE CATALOG IF NOT EXISTS pipeline_weather COMMENT 'Это каталог для конвейера обработки метеорологических данных';
Далее мы создадим схему для Lakehouse. В ней будет храниться том с полученными необработанными JSON-файлами.
— Создание схемы CREATE SCHEMA IF NOT EXISTS pipeline_weather.lakehouse COMMENT 'Это схема для конвейера обработки данных о погоде';
Теперь мы создадим том для исходных файлов.
— Давайте создадим том CREATE VOLUME IF NOT EXISTS pipeline_weather.lakehouse.raw_data COMMENT 'Это том необработанных данных для конвейера обработки погоды';
Мы также создаём ещё одну схему для хранения таблицы изменений серебряного слоя.
—Создание схемы для хранения преобразованных данных CREATE SCHEMA IF NOT EXISTS pipeline_weather.silver COMMENT 'Это схема для конвейера обработки погоды';
После того, как мы всё настроим, наш каталог будет выглядеть вот так.

Теперь давайте сохраним необработанный JSON-ответ в наш том Raw Volume. Чтобы все было упорядочено и предотвратилось перезапись, мы добавим уникальную метку времени к каждому имени файла.
Добавляя эти файлы к тому, а не просто перезаписывая их, мы создаём надёжный «журнал аудита». Это служит своего рода страховочной сеткой, означающей, что если последующий процесс завершится с ошибкой или мы столкнёмся с потерей данных, мы всегда сможем вернуться к источнику и повторно обработать исходные данные, когда это потребуется.
# Получение временной метки stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') # Путь для сохранения json_path = f'/Volumes/pipeline_weather/lakehouse/raw_data/weather_{stamp}.json' # Сохранение данных в файл JSON df.write.mode('append').json(json_path)
Хотя мы сохраняем исходный JSON в качестве «источника истины», настоящая магия происходит при сохранении очищенных данных в таблицу Delta Table на уровне Silver. Используя .mode(“append”) и формат Delta, мы гарантируем, что наши данные структурированы, соответствуют схеме и готовы к высокоскоростной аналитике или инструментам бизнес-аналитики. Этот уровень преобразует неструктурированные ответы API в надежную, доступную для запросов таблицу, которая растет с каждым запуском конвейера.
# Сохраняем преобразованные данные в таблицу (схему) ( df .write .format('delta') .mode(«append») .saveAsTable('pipeline_weather.silver.weather') )
Прекрасно! Теперь, когда всё готово, давайте посмотрим, как выглядит наш стол.

Давайте начнём автоматизировать этот процесс прямо сейчас.
Оркестрация: планирование автоматического запуска блокнота.
Продолжая работу над проектом, пришло время настроить этот конвейер на автономную работу с минимальным контролем. Для этого в Databricks есть вкладка «Задания и конвейеры», где легко запланировать выполнение заданий.
- В левой панели нажмите вкладку «Задания и конвейеры».
- Найдите кнопку «Создать» и выберите «Задание».
- Нажмите на значок блокнота, чтобы добавить его в задание.
- Настройте параметры в соответствии с приведенными ниже данными.
- Добавьте ключи API в параметры.
- Нажмите «Создать задачу» .
- Нажмите «Запустить сейчас» , чтобы проверить, работает ли программа.


После нажатия кнопки «Запустить сейчас» должен начаться запуск блокнота, и отобразится сообщение «Успешно».

Если задача выполняется без проблем, пора настроить её на автоматическое выполнение.
- Нажмите кнопку «Добавить триггер» в правой части экрана, прямо под разделом «Расписания и триггеры».
- Тип триггера = Запланированный.
- Тип расписания: выберите «Расширенное».
- Выберите «Каждый 1 час» из выпадающих списков.
- Сохрани это.
Отлично. Наш конвейер обработки данных теперь работает в автоматическом режиме! Каждый час система будет обращаться к API OpenWeatherMap, получать актуальную информацию о погоде в Нью-Йорке и сохранять её в нашей таблице Silver Layer Table.
Аналитика: создание панели мониторинга для принятия решений на основе данных.
Последним элементом этой головоломки является создание аналитического отчета, который будет отображать информацию о погоде и предоставлять пользователю полезные рекомендации о том, как одеваться по погоде на улице.
- Нажмите на вкладку «Панели мониторинга» на левой боковой панели.
- Нажмите кнопку «Создать панель управления».
- Это откроет перед нами чистый холст, на котором мы сможем работать.

Теперь панели мониторинга работают на основе данных, получаемых из SQL-запросов. Поэтому, прежде чем добавлять текст и графику на холст, нам необходимо сначала создать несколько метрик, которые будут переменными для ввода данных в карточки панели мониторинга и графику.
Итак, нажмите кнопку «+Создать из SQL» , чтобы запустить метрику. Дайте ей имя. Например, Location, чтобы получить последнее полученное название города, я должен использовать следующий запрос.
— Получить последнее полученное название города SELECT city FROM pipeline_weather.silver.weather ORDER BY timestamp DESC LIMIT 1
И нам нужно создать по одному SQL-запросу для каждой метрики. Все они доступны в репозитории GitHub [ ].
Далее мы переходим на вкладку «Панель управления» и начинаем перетаскивать элементы на холст.

После нажатия на «Текст» вы можете вставить прямоугольник на холст и отредактировать текст. При нажатии на графический элемент вставляется место для графического изображения, и открывается меню справа для выбора переменных и настроек.

Хорошо. После добавления всех элементов панель управления будет выглядеть так.

Как здорово! На этом наш проект завершается.
Прежде чем уйти
Этот проект легко можно повторить примерно за час, в зависимости от вашего опыта работы с экосистемой Databricks. Несмотря на быстроту разработки, он позволит вам применить множество ключевых инженерных навыков:
- Архитектурное проектирование : Вы научитесь создавать современное пространство для отдыха на озере с нуля.
- Бесшовная интеграция данных : Вы обеспечите связь между внешними веб-API и платформой Databricks для приема данных в режиме реального времени.
- Чистый, модульный код : Мы выходим за рамки простых скриптов, используя классы и функции Python для организации и удобства сопровождения кода.
- Автоматизация и оркестровка : Вы получите практический опыт планирования задач, чтобы обеспечить надежную работу вашего проекта в автоматическом режиме.
- Предоставление реальной ценности : цель состоит не просто в передаче данных, а в предоставлении ценности. Преобразуя необработанные метеорологические показатели в практические рекомендации по выбору одежды с помощью ИИ, мы превращаем «холодные данные» в полезный сервис для конечного пользователя.
Если вам понравился этот материал, мои контактные данные и дополнительную информацию обо мне вы найдете на моем сайте.
https://gustavorsantos.me
Репозиторий GitHub
Здесь находится репозиторий этого проекта.
https://github.com/gurezende/Databricks-Weather-Pipeline
Ссылки
[1. API OpenWeatherMap] (https://openweathermap.org/)
[2. Открытая платформа ИИ] (https://platform.openai.com/)
[3. Бесплатная версия Databricks] (https://www.databricks.com/learn/free-edition)
[4. Репозиторий GitHub] (https://github.com/gurezende/Databricks-Weather-Pipeline)
Источник: towardsdatascience.com



























