Тестирование доступности веб-сайта с помощью http-хуков Apache AirFlow

Apache AirFlow HTTPHook, хуки AirFlow, обучение AirFlow, курсы AirFlow, курсы дата-инженеров, обучение инженеров данных, Школа Больших Данных Учебный Центр Коммерсант

Сегодня я покажу, как проверить доступность веб-сайта с помощью http-хука в Apache AirFlow и отправить результаты проверки в Телеграм-бот.

Еще раз про хуки и соединения Apache AirFlow

Доступность системы является ключевым свойством информационной безопасности. Проверить, что веб-сервис доступен, можно по статусу HTTP-ответа на GET-запрос. Чтобы делать такую проверку периодически, т.е. по расписанию, можно использовать Apache AirFlow. Этот пакетный ETL-оркестратор отлично подходит для подобных сценариев. Для синхронного взаимодействия с HTTP-серверами в AirFlow есть класс HttpHook, а для асинхронного – HttpAsyncHook. Вообще хук (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки. Хуки также являются строительными блоками, из которых строятся Операторы.

Хуки в AirFlow интегрируются с подключениями (Connection) для сбора учетных данных – набор параметров: имя пользователя, пароль и хост, а также тип внешней системы. Также подключение имеет уникальное имя, называемое conn_id. Все эти параметры можно настроить программно, что я показывала здесь, или в пользовательском интерфейсе фреймворка. Например, я хочу проверять доступность нашего сайта, т.е. именно он будет выступать в роли внешней системы. Для этого надо задать его адрес в настройках подключения, чтобы использовать методы http-хуков.

Поскольку теперь я запускаю docker-контейнер с Apache AirFlow в режиме standalone на своей локальной машине, чтобы выйти в веб-интерфейс, мне достаточно обратиться к localhost и порту, на котором развернуто это приложение. Добавить новое подключение к внешней системе надо в разделе Admin/Connections.

GUI веб-сервера AirFlow
GUI веб-сервера AirFlow

Внешней системой является открытый веб-сайт, а не база данных и не частное приложение, поэтому указывать порт, логин и пароль нет необходимости. Достаточно только задать имя подключения, которое будет использоваться в коде Python-оператора, тип соединения и хост.

Создание нового подключения в AirFlow
Создание нового подключения в AirFlow

Пример реализации: код задачи и DAG

Код моего Python-файла под названием hook_task с функцией обращения к внешней системе достаточно прост:

from airflow.providers.http.hooks.http import HttpHook
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging

def hook_function(**kwargs):
    api_url = "https://bigdataschool.ru/"
    hook = HttpHook(method="GET", http_conn_id="api_url")

    @retry(
        wait=wait_exponential(multiplier=1, max=10), 
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type(Exception)
    )
    def run_with_retry():
        try:
            response = hook.run(endpoint='', headers={}, extra_options={'data': {'url': api_url}})
            # Возвращаем статус код ответа сервера
            return response.status_code
        except Exception as e:
            logging.error(f"Ошибка при выполнении запроса: {e}")
            # Возвращаем сообщение об ошибке вместе со статусом
            raise Exception(f'не удалось выполнить запрос, ошибка {e}')

    return run_with_retry()

Этот код использует декоратор @retry из модуля tenacity, чтобы повторять попытки выполнения функции при возникновении исключений. А модуль logging нужен для записи ошибок в лог. В функции hook_function определяется ранее созданное подключение, к которому выполняется HTTP-запрос, и создается экземпляр HttpHook для вызова метода GET. Вложенная функция run_with_retry аннотирована декоратором @retry, который настроен так, чтобы:

  • использовать экспоненциальную задержку между попытками, начиная с 1 секунды и не более 10 секунд;
  • остановить повторные попытки после 3 неудачных попыток;
  • повторять попытки, если возникло исключение типа Exception;

Внутри функции run_with_retry() выполняется HTTP-запрос. Если возникает исключение, оно логируется и обрабатывается декоратором @retry. Если запрос выполнен успешно, возвращается статус код ответа сервера, иначе возвращается ошибка. Таким образом, функция hook_function() возвращает результат функции run_with_retry(), который представляет собой статус HTTP-ответа от сайта Школы Больших Данных https://bigdataschool.ru/ или ошибку, если все 3 попытки выполнения запроса не удались.

Функция hook_function() из hook_task.py импортируется в код DAG, который, помимо Python-оператора с вызовом этой функции также содержит Dummy-операторы в качестве стартовой и финишной точки конвейера, и Telegram-оператор для передачи результатов в месенджер. Расписание запуска установлено на первые 5 минут в начале каждого часа, что задано в параметре schedule_interval.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
from datetime import datetime, timedelta
from hook_task import hook_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='web-hook_DAG',
    default_args=default_args,
    schedule_interval="5 * * * *"
)

start_task = DummyOperator(task_id='start_task', dag=dag)

hook_task = PythonOperator(
    task_id='hook_task',
    provide_context=True,
    python_callable=hook_function,
    dag=dag
)

telegram_token = 'my-TG-token'
telegram_chat_id = 'my-chat-ID'

send_notification_task = TelegramOperator(
    task_id='send_notification_task',
    token=telegram_token,
    chat_id=telegram_chat_id,
    text='Сайт был проверен {{ execution_date.strftime("%m/%d/%Y, %H:%M:%S") }}, статус ответа {{ ti.xcom_pull(task_ids="hook_task") }}',
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> hook_task >> send_notification_task >> end_task
Выполнение DAG AirFlow
Выполнение DAG AirFlow

Задача hook_task передает результаты выполнения, т.е. статус HTTP-вызова к проверяемому сайту или ошибку, задаче send_notification_task для рассылки уведомлений в Телеграм, с помощью механизма XCom. Просмотреть значение переданных переменных можно также в веб-интерфейсе фреймворка, в разделе Admin/XComs.

Значения переменных XCom
Значения переменных XCom

Финальный результат, т.е. ТГ-оповещение о проверке доступности сайта, появляется в Телеграм, согласно заданному расписанию.

Оповещение о доступности сайта в ТГ
Оповещение о доступности сайта в ТГ

Чтобы выполнять представленный DAG более эффективно, можно использовать асинхронные операторы, которые экономят ресурсы при обращении к внешним системам, освобождая слоты для других задач на время ожидания. Читайте об этом в нашей новой статье про асинхронные операторы в Apache AirFlow. А здесь вы узнаете, как можно создать DAG без разработки на Python, просто описан его конфигурацию в YAML-файле.

Освоить на практике лучшие приемы использования этого оркестратора пакетных процессов в задачах реальной дата-инженерии вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow-providers-http/stable/_api/airflow/providers/http/hooks/http/index.html
  2. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/connections.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту