Сегодня я покажу, как проверить доступность веб-сайта с помощью http-хука в Apache AirFlow и отправить результаты проверки в Телеграм-бот.
Еще раз про хуки и соединения Apache AirFlow
Доступность системы является ключевым свойством информационной безопасности. Проверить, что веб-сервис доступен, можно по статусу HTTP-ответа на GET-запрос. Чтобы делать такую проверку периодически, т.е. по расписанию, можно использовать Apache AirFlow. Этот пакетный ETL-оркестратор отлично подходит для подобных сценариев. Для синхронного взаимодействия с HTTP-серверами в AirFlow есть класс HttpHook, а для асинхронного – HttpAsyncHook. Вообще хук (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки. Хуки также являются строительными блоками, из которых строятся Операторы.
Хуки в AirFlow интегрируются с подключениями (Connection) для сбора учетных данных – набор параметров: имя пользователя, пароль и хост, а также тип внешней системы. Также подключение имеет уникальное имя, называемое conn_id. Все эти параметры можно настроить программно, что я показывала здесь, или в пользовательском интерфейсе фреймворка. Например, я хочу проверять доступность нашего сайта, т.е. именно он будет выступать в роли внешней системы. Для этого надо задать его адрес в настройках подключения, чтобы использовать методы http-хуков.
Поскольку теперь я запускаю docker-контейнер с Apache AirFlow в режиме standalone на своей локальной машине, чтобы выйти в веб-интерфейс, мне достаточно обратиться к localhost и порту, на котором развернуто это приложение. Добавить новое подключение к внешней системе надо в разделе Admin/Connections.
Внешней системой является открытый веб-сайт, а не база данных и не частное приложение, поэтому указывать порт, логин и пароль нет необходимости. Достаточно только задать имя подключения, которое будет использоваться в коде Python-оператора, тип соединения и хост.
Пример реализации: код задачи и DAG
Код моего Python-файла под названием hook_task с функцией обращения к внешней системе достаточно прост:
from airflow.providers.http.hooks.http import HttpHook from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import logging def hook_function(**kwargs): api_url = "https://bigdataschool.ru/" hook = HttpHook(method="GET", http_conn_id="api_url") @retry( wait=wait_exponential(multiplier=1, max=10), stop=stop_after_attempt(3), retry=retry_if_exception_type(Exception) ) def run_with_retry(): try: response = hook.run(endpoint='', headers={}, extra_options={'data': {'url': api_url}}) # Возвращаем статус код ответа сервера return response.status_code except Exception as e: logging.error(f"Ошибка при выполнении запроса: {e}") # Возвращаем сообщение об ошибке вместе со статусом raise Exception(f'не удалось выполнить запрос, ошибка {e}') return run_with_retry()
Этот код использует декоратор @retry из модуля tenacity, чтобы повторять попытки выполнения функции при возникновении исключений. А модуль logging нужен для записи ошибок в лог. В функции hook_function определяется ранее созданное подключение, к которому выполняется HTTP-запрос, и создается экземпляр HttpHook для вызова метода GET. Вложенная функция run_with_retry аннотирована декоратором @retry, который настроен так, чтобы:
- использовать экспоненциальную задержку между попытками, начиная с 1 секунды и не более 10 секунд;
- остановить повторные попытки после 3 неудачных попыток;
- повторять попытки, если возникло исключение типа Exception;
Внутри функции run_with_retry() выполняется HTTP-запрос. Если возникает исключение, оно логируется и обрабатывается декоратором @retry. Если запрос выполнен успешно, возвращается статус код ответа сервера, иначе возвращается ошибка. Таким образом, функция hook_function() возвращает результат функции run_with_retry(), который представляет собой статус HTTP-ответа от сайта Школы Больших Данных https://bigdataschool.ru/ или ошибку, если все 3 попытки выполнения запроса не удались.
Функция hook_function() из hook_task.py импортируется в код DAG, который, помимо Python-оператора с вызовом этой функции также содержит Dummy-операторы в качестве стартовой и финишной точки конвейера, и Telegram-оператор для передачи результатов в месенджер. Расписание запуска установлено на первые 5 минут в начале каждого часа, что задано в параметре schedule_interval.
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.providers.telegram.operators.telegram import TelegramOperator from datetime import datetime, timedelta from hook_task import hook_function default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='web-hook_DAG', default_args=default_args, schedule_interval="5 * * * *" ) start_task = DummyOperator(task_id='start_task', dag=dag) hook_task = PythonOperator( task_id='hook_task', provide_context=True, python_callable=hook_function, dag=dag ) telegram_token = 'my-TG-token' telegram_chat_id = 'my-chat-ID' send_notification_task = TelegramOperator( task_id='send_notification_task', token=telegram_token, chat_id=telegram_chat_id, text='Сайт был проверен {{ execution_date.strftime("%m/%d/%Y, %H:%M:%S") }}, статус ответа {{ ti.xcom_pull(task_ids="hook_task") }}', dag=dag ) end_task = DummyOperator(task_id='end_task', dag=dag) start_task >> hook_task >> send_notification_task >> end_task
Задача hook_task передает результаты выполнения, т.е. статус HTTP-вызова к проверяемому сайту или ошибку, задаче send_notification_task для рассылки уведомлений в Телеграм, с помощью механизма XCom. Просмотреть значение переданных переменных можно также в веб-интерфейсе фреймворка, в разделе Admin/XComs.
Финальный результат, т.е. ТГ-оповещение о проверке доступности сайта, появляется в Телеграм, согласно заданному расписанию.
Чтобы выполнять представленный DAG более эффективно, можно использовать асинхронные операторы, которые экономят ресурсы при обращении к внешним системам, освобождая слоты для других задач на время ожидания. Читайте об этом в нашей новой статье про асинхронные операторы в Apache AirFlow. А здесь вы узнаете, как можно создать DAG без разработки на Python, просто описан его конфигурацию в YAML-файле.
Освоить на практике лучшие приемы использования этого оркестратора пакетных процессов в задачах реальной дата-инженерии вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники