Тестирование доступности веб-сайта с помощью http-хуков Apache AirFlow

Apache AirFlow HTTPHook, хуки AirFlow, обучение AirFlow, курсы AirFlow, курсы дата-инженеров, обучение инженеров данных, Школа Больших Данных Учебный Центр Коммерсант

Сегодня я покажу, как проверить доступность веб-сайта с помощью http-хука в Apache AirFlow и отправить результаты проверки в Телеграм-бот.

Еще раз про хуки и соединения Apache AirFlow

Доступность системы является ключевым свойством информационной безопасности. Проверить, что веб-сервис доступен, можно по статусу HTTP-ответа на GET-запрос. Чтобы делать такую проверку периодически, т.е. по расписанию, можно использовать Apache AirFlow. Этот пакетный ETL-оркестратор отлично подходит для подобных сценариев. Для синхронного взаимодействия с HTTP-серверами в AirFlow есть класс HttpHook, а для асинхронного – HttpAsyncHook. Вообще хук (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки. Хуки также являются строительными блоками, из которых строятся Операторы.

Хуки в AirFlow интегрируются с подключениями (Connection) для сбора учетных данных – набор параметров: имя пользователя, пароль и хост, а также тип внешней системы. Также подключение имеет уникальное имя, называемое conn_id. Все эти параметры можно настроить программно, что я показывала здесь, или в пользовательском интерфейсе фреймворка. Например, я хочу проверять доступность нашего сайта, т.е. именно он будет выступать в роли внешней системы. Для этого надо задать его адрес в настройках подключения, чтобы использовать методы http-хуков.

Поскольку теперь я запускаю docker-контейнер с Apache AirFlow в режиме standalone на своей локальной машине, чтобы выйти в веб-интерфейс, мне достаточно обратиться к localhost и порту, на котором развернуто это приложение. Добавить новое подключение к внешней системе надо в разделе Admin/Connections.

GUI веб-сервера AirFlow
GUI веб-сервера AirFlow

Внешней системой является открытый веб-сайт, а не база данных и не частное приложение, поэтому указывать порт, логин и пароль нет необходимости. Достаточно только задать имя подключения, которое будет использоваться в коде Python-оператора, тип соединения и хост.

Создание нового подключения в AirFlow
Создание нового подключения в AirFlow

Пример реализации: код задачи и DAG

Код моего Python-файла под названием hook_task с функцией обращения к внешней системе достаточно прост:

from airflow.providers.http.hooks.http import HttpHook
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging

def hook_function(**kwargs):
    api_url = "https://bigdataschool.ru/"
    hook = HttpHook(method="GET", http_conn_id="api_url")

    @retry(
        wait=wait_exponential(multiplier=1, max=10), 
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type(Exception)
    )
    def run_with_retry():
        try:
            response = hook.run(endpoint='', headers={}, extra_options={'data': {'url': api_url}})
            # Возвращаем статус код ответа сервера
            return response.status_code
        except Exception as e:
            logging.error(f"Ошибка при выполнении запроса: {e}")
            # Возвращаем сообщение об ошибке вместе со статусом
            raise Exception(f'не удалось выполнить запрос, ошибка {e}')

    return run_with_retry()

Этот код использует декоратор @retry из модуля tenacity, чтобы повторять попытки выполнения функции при возникновении исключений. А модуль logging нужен для записи ошибок в лог. В функции hook_function определяется ранее созданное подключение, к которому выполняется HTTP-запрос, и создается экземпляр HttpHook для вызова метода GET. Вложенная функция run_with_retry аннотирована декоратором @retry, который настроен так, чтобы:

  • использовать экспоненциальную задержку между попытками, начиная с 1 секунды и не более 10 секунд;
  • остановить повторные попытки после 3 неудачных попыток;
  • повторять попытки, если возникло исключение типа Exception;

Внутри функции run_with_retry() выполняется HTTP-запрос. Если возникает исключение, оно логируется и обрабатывается декоратором @retry. Если запрос выполнен успешно, возвращается статус код ответа сервера, иначе возвращается ошибка. Таким образом, функция hook_function() возвращает результат функции run_with_retry(), который представляет собой статус HTTP-ответа от сайта Школы Больших Данных https://bigdataschool.ru/ или ошибку, если все 3 попытки выполнения запроса не удались.

Функция hook_function() из hook_task.py импортируется в код DAG, который, помимо Python-оператора с вызовом этой функции также содержит Dummy-операторы в качестве стартовой и финишной точки конвейера, и Telegram-оператор для передачи результатов в месенджер. Расписание запуска установлено на первые 5 минут в начале каждого часа, что задано в параметре schedule_interval.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
from datetime import datetime, timedelta
from hook_task import hook_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='web-hook_DAG',
    default_args=default_args,
    schedule_interval="5 * * * *"
)

start_task = DummyOperator(task_id='start_task', dag=dag)

hook_task = PythonOperator(
    task_id='hook_task',
    provide_context=True,
    python_callable=hook_function,
    dag=dag
)

telegram_token = 'my-TG-token'
telegram_chat_id = 'my-chat-ID'

send_notification_task = TelegramOperator(
    task_id='send_notification_task',
    token=telegram_token,
    chat_id=telegram_chat_id,
    text='Сайт был проверен {{ execution_date.strftime("%m/%d/%Y, %H:%M:%S") }}, статус ответа {{ ti.xcom_pull(task_ids="hook_task") }}',
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> hook_task >> send_notification_task >> end_task
Выполнение DAG AirFlow
Выполнение DAG AirFlow

Задача hook_task передает результаты выполнения, т.е. статус HTTP-вызова к проверяемому сайту или ошибку, задаче send_notification_task для рассылки уведомлений в Телеграм, с помощью механизма XCom. Просмотреть значение переданных переменных можно также в веб-интерфейсе фреймворка, в разделе Admin/XComs.

Значения переменных XCom
Значения переменных XCom

Финальный результат, т.е. ТГ-оповещение о проверке доступности сайта, появляется в Телеграм, согласно заданному расписанию.

Оповещение о доступности сайта в ТГ
Оповещение о доступности сайта в ТГ

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
22 мая, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Освоить на практике лучшие приемы использования этого оркестратора пакетных процессов в задачах реальной дата-инженерии вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://airflow.apache.org/docs/apache-airflow-providers-http/stable/_api/airflow/providers/http/hooks/http/index.html
  2. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/connections.html
Поиск по сайту