Сегодня я покажу, как проверить доступность веб-сайта с помощью http-хука в Apache AirFlow и отправить результаты проверки в Телеграм-бот.
Еще раз про хуки и соединения Apache AirFlow
Доступность системы является ключевым свойством информационной безопасности. Проверить, что веб-сервис доступен, можно по статусу HTTP-ответа на GET-запрос. Чтобы делать такую проверку периодически, т.е. по расписанию, можно использовать Apache AirFlow. Этот пакетный ETL-оркестратор отлично подходит для подобных сценариев. Для синхронного взаимодействия с HTTP-серверами в AirFlow есть класс HttpHook, а для асинхронного – HttpAsyncHook. Вообще хук (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки. Хуки также являются строительными блоками, из которых строятся Операторы.
Хуки в AirFlow интегрируются с подключениями (Connection) для сбора учетных данных – набор параметров: имя пользователя, пароль и хост, а также тип внешней системы. Также подключение имеет уникальное имя, называемое conn_id. Все эти параметры можно настроить программно, что я показывала здесь, или в пользовательском интерфейсе фреймворка. Например, я хочу проверять доступность нашего сайта, т.е. именно он будет выступать в роли внешней системы. Для этого надо задать его адрес в настройках подключения, чтобы использовать методы http-хуков.
Поскольку теперь я запускаю docker-контейнер с Apache AirFlow в режиме standalone на своей локальной машине, чтобы выйти в веб-интерфейс, мне достаточно обратиться к localhost и порту, на котором развернуто это приложение. Добавить новое подключение к внешней системе надо в разделе Admin/Connections.
Внешней системой является открытый веб-сайт, а не база данных и не частное приложение, поэтому указывать порт, логин и пароль нет необходимости. Достаточно только задать имя подключения, которое будет использоваться в коде Python-оператора, тип соединения и хост.
Пример реализации: код задачи и DAG
Код моего Python-файла под названием hook_task с функцией обращения к внешней системе достаточно прост:
from airflow.providers.http.hooks.http import HttpHook from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import logging def hook_function(**kwargs): api_url = "https://bigdataschool.ru/" hook = HttpHook(method="GET", http_conn_id="api_url") @retry( wait=wait_exponential(multiplier=1, max=10), stop=stop_after_attempt(3), retry=retry_if_exception_type(Exception) ) def run_with_retry(): try: response = hook.run(endpoint='', headers={}, extra_options={'data': {'url': api_url}}) # Возвращаем статус код ответа сервера return response.status_code except Exception as e: logging.error(f"Ошибка при выполнении запроса: {e}") # Возвращаем сообщение об ошибке вместе со статусом raise Exception(f'не удалось выполнить запрос, ошибка {e}') return run_with_retry()
Этот код использует декоратор @retry из модуля tenacity, чтобы повторять попытки выполнения функции при возникновении исключений. А модуль logging нужен для записи ошибок в лог. В функции hook_function определяется ранее созданное подключение, к которому выполняется HTTP-запрос, и создается экземпляр HttpHook для вызова метода GET. Вложенная функция run_with_retry аннотирована декоратором @retry, который настроен так, чтобы:
- использовать экспоненциальную задержку между попытками, начиная с 1 секунды и не более 10 секунд;
- остановить повторные попытки после 3 неудачных попыток;
- повторять попытки, если возникло исключение типа Exception;
Внутри функции run_with_retry() выполняется HTTP-запрос. Если возникает исключение, оно логируется и обрабатывается декоратором @retry. Если запрос выполнен успешно, возвращается статус код ответа сервера, иначе возвращается ошибка. Таким образом, функция hook_function() возвращает результат функции run_with_retry(), который представляет собой статус HTTP-ответа от сайта Школы Больших Данных https://bigdataschool.ru/ или ошибку, если все 3 попытки выполнения запроса не удались.
Функция hook_function() из hook_task.py импортируется в код DAG, который, помимо Python-оператора с вызовом этой функции также содержит Dummy-операторы в качестве стартовой и финишной точки конвейера, и Telegram-оператор для передачи результатов в месенджер. Расписание запуска установлено на первые 5 минут в начале каждого часа, что задано в параметре schedule_interval.
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.providers.telegram.operators.telegram import TelegramOperator from datetime import datetime, timedelta from hook_task import hook_function default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='web-hook_DAG', default_args=default_args, schedule_interval="5 * * * *" ) start_task = DummyOperator(task_id='start_task', dag=dag) hook_task = PythonOperator( task_id='hook_task', provide_context=True, python_callable=hook_function, dag=dag ) telegram_token = 'my-TG-token' telegram_chat_id = 'my-chat-ID' send_notification_task = TelegramOperator( task_id='send_notification_task', token=telegram_token, chat_id=telegram_chat_id, text='Сайт был проверен {{ execution_date.strftime("%m/%d/%Y, %H:%M:%S") }}, статус ответа {{ ti.xcom_pull(task_ids="hook_task") }}', dag=dag ) end_task = DummyOperator(task_id='end_task', dag=dag) start_task >> hook_task >> send_notification_task >> end_task
Задача hook_task передает результаты выполнения, т.е. статус HTTP-вызова к проверяемому сайту или ошибку, задаче send_notification_task для рассылки уведомлений в Телеграм, с помощью механизма XCom. Просмотреть значение переданных переменных можно также в веб-интерфейсе фреймворка, в разделе Admin/XComs.
Финальный результат, т.е. ТГ-оповещение о проверке доступности сайта, появляется в Телеграм, согласно заданному расписанию.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
22 мая, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освоить на практике лучшие приемы использования этого оркестратора пакетных процессов в задачах реальной дата-инженерии вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники