Тестирование доступности веб-сайта с помощью http-хуков Apache AirFlow

Тестирование доступности веб-сайта с помощью http-хуков Apache AirFlow

    Сегодня я покажу, как проверить доступность веб-сайта с помощью http-хука в Apache AirFlow и отправить результаты проверки в Телеграм-бот.

    Еще раз про хуки и соединения Apache AirFlow

    Доступность системы является ключевым свойством информационной безопасности. Проверить, что веб-сервис доступен, можно по статусу HTTP-ответа на GET-запрос. Чтобы делать такую проверку периодически, т.е. по расписанию, можно использовать Apache AirFlow. Этот пакетный ETL-оркестратор отлично подходит для подобных сценариев. Для синхронного взаимодействия с HTTP-серверами в AirFlow есть класс HttpHook, а для асинхронного – HttpAsyncHook. Вообще хук (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки. Хуки также являются строительными блоками, из которых строятся Операторы.

    Хуки в AirFlow интегрируются с подключениями (Connection) для сбора учетных данных – набор параметров: имя пользователя, пароль и хост, а также тип внешней системы. Также подключение имеет уникальное имя, называемое conn_id. Все эти параметры можно настроить программно, что я показывала здесь, или в пользовательском интерфейсе фреймворка. Например, я хочу проверять доступность нашего сайта, т.е. именно он будет выступать в роли внешней системы. Для этого надо задать его адрес в настройках подключения, чтобы использовать методы http-хуков.

    Поскольку теперь я запускаю docker-контейнер с Apache AirFlow в режиме standalone на своей локальной машине, чтобы выйти в веб-интерфейс, мне достаточно обратиться к localhost и порту, на котором развернуто это приложение. Добавить новое подключение к внешней системе надо в разделе Admin/Connections.

    GUI веб-сервера AirFlow
    GUI веб-сервера AirFlow

    Внешней системой является открытый веб-сайт, а не база данных и не частное приложение, поэтому указывать порт, логин и пароль нет необходимости. Достаточно только задать имя подключения, которое будет использоваться в коде Python-оператора, тип соединения и хост.

    Создание нового подключения в AirFlow
    Создание нового подключения в AirFlow

    Пример реализации: код задачи и DAG

    Код моего Python-файла под названием hook_task с функцией обращения к внешней системе достаточно прост:

    from airflow.providers.http.hooks.http import HttpHook
    from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
    import logging
    
    def hook_function(**kwargs):
        api_url = "https://bigdataschool.ru/"
        hook = HttpHook(method="GET", http_conn_id="api_url")
    
        @retry(
            wait=wait_exponential(multiplier=1, max=10), 
            stop=stop_after_attempt(3),
            retry=retry_if_exception_type(Exception)
        )
        def run_with_retry():
            try:
                response = hook.run(endpoint='', headers={}, extra_options={'data': {'url': api_url}})
                # Возвращаем статус код ответа сервера
                return response.status_code
            except Exception as e:
                logging.error(f"Ошибка при выполнении запроса: {e}")
                # Возвращаем сообщение об ошибке вместе со статусом
                raise Exception(f'не удалось выполнить запрос, ошибка {e}')
    
        return run_with_retry()

    Этот код использует декоратор @retry из модуля tenacity, чтобы повторять попытки выполнения функции при возникновении исключений. А модуль logging нужен для записи ошибок в лог. В функции hook_function определяется ранее созданное подключение, к которому выполняется HTTP-запрос, и создается экземпляр HttpHook для вызова метода GET. Вложенная функция run_with_retry аннотирована декоратором @retry, который настроен так, чтобы:

    • использовать экспоненциальную задержку между попытками, начиная с 1 секунды и не более 10 секунд;
    • остановить повторные попытки после 3 неудачных попыток;
    • повторять попытки, если возникло исключение типа Exception;

    Внутри функции run_with_retry() выполняется HTTP-запрос. Если возникает исключение, оно логируется и обрабатывается декоратором @retry. Если запрос выполнен успешно, возвращается статус код ответа сервера, иначе возвращается ошибка. Таким образом, функция hook_function() возвращает результат функции run_with_retry(), который представляет собой статус HTTP-ответа от сайта Школы Больших Данных https://bigdataschool.ru/ или ошибку, если все 3 попытки выполнения запроса не удались.

    Функция hook_function() из hook_task.py импортируется в код DAG, который, помимо Python-оператора с вызовом этой функции также содержит Dummy-операторы в качестве стартовой и финишной точки конвейера, и Telegram-оператор для передачи результатов в месенджер. Расписание запуска установлено на первые 5 минут в начале каждого часа, что задано в параметре schedule_interval.

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.providers.telegram.operators.telegram import TelegramOperator
    from datetime import datetime, timedelta
    from hook_task import hook_function
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime.now() - timedelta(days=1),
        'retries': 1
    }
    
    dag = DAG(
        dag_id='web-hook_DAG',
        default_args=default_args,
        schedule_interval="5 * * * *"
    )
    
    start_task = DummyOperator(task_id='start_task', dag=dag)
    
    hook_task = PythonOperator(
        task_id='hook_task',
        provide_context=True,
        python_callable=hook_function,
        dag=dag
    )
    
    telegram_token = 'my-TG-token'
    telegram_chat_id = 'my-chat-ID'
    
    send_notification_task = TelegramOperator(
        task_id='send_notification_task',
        token=telegram_token,
        chat_id=telegram_chat_id,
        text='Сайт был проверен {{ execution_date.strftime("%m/%d/%Y, %H:%M:%S") }}, статус ответа {{ ti.xcom_pull(task_ids="hook_task") }}',
        dag=dag
    )
    
    end_task = DummyOperator(task_id='end_task', dag=dag)
    
    start_task >> hook_task >> send_notification_task >> end_task
    Выполнение DAG AirFlow
    Выполнение DAG AirFlow

    Задача hook_task передает результаты выполнения, т.е. статус HTTP-вызова к проверяемому сайту или ошибку, задаче send_notification_task для рассылки уведомлений в Телеграм, с помощью механизма XCom. Просмотреть значение переданных переменных можно также в веб-интерфейсе фреймворка, в разделе Admin/XComs.

    Значения переменных XCom
    Значения переменных XCom

    Финальный результат, т.е. ТГ-оповещение о проверке доступности сайта, появляется в Телеграм, согласно заданному расписанию.

    Оповещение о доступности сайта в ТГ
    Оповещение о доступности сайта в ТГ

    Чтобы выполнять представленный DAG более эффективно, можно использовать асинхронные операторы, которые экономят ресурсы при обращении к внешним системам, освобождая слоты для других задач на время ожидания. Читайте об этом в нашей новой статье про асинхронные операторы в Apache AirFlow. А здесь вы узнаете, как можно создать DAG без разработки на Python, просто описан его конфигурацию в YAML-файле.

    Освоить на практике лучшие приемы использования этого оркестратора пакетных процессов в задачах реальной дата-инженерии вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    Источники

    1. https://airflow.apache.org/docs/apache-airflow-providers-http/stable/_api/airflow/providers/http/hooks/http/index.html
    2. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/connections.html
    [elementor-template id="13619"]