В этой статье для разработчиков Data Flow, инженеров данных и администраторов Apache AirFlow рассмотрим, как организовать мониторинг этого batch-оркестратора через популярный корпоративный мессенджер Slack. Хотя по умолчанию Airflow имеет встроенную возможность отправлять оповещения по электронной почте, это не самый оперативный способ сообщить о критичной проблеме, к примеру, когда DAG с важными бизнес-задачами вышел из строя. Эффективнее получать оповещения там, где за ними следит вся команда дата-инженеров, например, в Slack.
Зачем следить за Airflow через Slack и как это организовать
Если Slack уже используется в качестве корпоративного мессенджера и средства командного взаимодействия, можно организовать в его каналах мониторинг конвейеров обработки данных на Apache AirFlow, независимо от их количества. Для этого требуется совсем немного пререквизитов:
- запущенный Airflow и базовые знания о том, как написать DAG с помощью PythonOperator;
- рабочая область Slack с правами администратора или хотя бы возможность запрашивать разрешения для создания веб-перехватчиков в мессенджере;
- выделенный канал мессенджера.
Теперь рассмотрим по шагам, как организовать мониторинг рабочих процессов в AirFlow через Slack, выполнив такую последовательность действий:
- создать приложение Slack и активировать веб-хуки или перехватчики;
- подключить Slack Webhook к Airflow;
- включить в DAG AirFlow код для отправки сообщений на канал Slack.
Каждый из этих шагов мы подробнее рассмотрим далее.
Начало работы в Slack API и веб-хуки
В Slack входящие веб-перехватчики или хуки (hooks) — это простой способ по желанию публиковать сообщения из приложений в канал мессенджера, используя определенный URL-адрес. Создание входящего веб-перехватчика дает уникальный URL-адрес, куда можно отправлять полезную нагрузку JSON с текстом сообщения и некоторыми параметрами. Это полезно, когда действия, которые могут привести к публикации сообщения, происходят в удаленном сервисе. Например, в системе отслеживания проблем нужно опубликовать сообщение в канале о создании или устранении ошибки. Для этого пригодится входящий веб-перехватчик, вызываемый из самой системы отслеживания проблем.
Создать новое приложение Slack можно прямо в его веб-API по адресу: https://api.slack.com/apps?new_app=1. Задав в соответствующих полях имя приложения и рабочее пространство Slack, можно создать приложение и перейти к веб-хукам. Их следует активировать, т.е. разрешить использование входящих перехватчиков в новом только что созданном приложении и добавить новый веб-перехватчик в рабочую область. Это может делать только пользователь с правами администратора выбранного рабочего пространства Slack. Поэтому следует сперва запросить разрешение на данную операцию. Также нужно указать канал, на который Airflow будет отправлять оповещения и сообщения. Далее следует скопировать отображаемый URL-адрес Webhook и перейти в пользовательский интерфейс администратора Airflow.
Подключение Slack Webhook к Airflow
В пользовательском интерфейсе администратора Airflow нужно создать новое подключение и с типом Slack Webhook. Идентификатор подключение (Conn Id) может быть назван любым именем, а в поле Хост следует вставить URL-адрес Webhook. Выбранный тип подключения Slack включает интеграцию AirFlow с этим мессенджером. При этом можно обеспечить безопасный доступ, выполнив аутентификацию в мессенджер с помощью токена Slack API. При указании подключения в переменной среды следует указывать его с использованием синтаксиса URI, но все компоненты URI должны быть закодированы в URL. Например,
export AIRFLOW_CONN_SLACK_DEFAULT=’slack://:token@’
Включение в DAG AirFlow кода для отправки сообщений на канал Slack
Напомним, направленный ациклический граф (DAG) в Airflow представляет собой цепочку задач, которая отражает их зависимости и отношения. DAG определены в Python-скриптах и помогают планировщику определить, какие задания выполнять. Есть два варианта мониторинга задач Airflow через Slack:
- оповещения о задачах, когда в мессенджер отправляются сообщение при сбое задачи;
- Slack Message Tasks, когда в мессенджер отправляются сообщения о ходе выполнения batch-конвейера. Это может быть в начале или в конце DAG, а также между входящими в него задачами.
Оба варианта используют SlackWebhookOperator, который обычно применяется для создания отчетов и предупреждений путем планирования входящих сообщений в каналы Slack при выполнении некоторого условия срабатывания. Этот оператор позволяет отправлять сообщения в мессенджер с помощью входящих веб-перехватчиков. Оператор принимает как токен веб-перехватчика Slack напрямую, так и соединение с его токеном. Если оба указаны, http_conn_id будет использоваться как base_url, а webhook_token будет использоваться как конечная точка, относительный путь URL-адреса. Каждый токен веб-перехватчика Slack можно предварительно настроить для использования определенного канала, имени пользователя и значка. Можно переопределить эти значения, задав следующие параметры:
- http_conn_id (str) — подключение, где в дополнительном поле указан токен Slack webhook;
- webhook_token (str) – токен веб-хука Slack;
- message (str) — сообщение, которое следует отправить в корпоративный мессенджер;
- attachments (list) — вложения для отправки в Slack в виде списка словарей;
- blocks (list) — блоки для отправки в мессенджер в виде списка словарей;
- channel (str) – канал мессенджера, куда будет отправляться сообщение;
- username (str) — имя пользователя для публикации сообщения в мессенджере;
- icon_emoji (str) — эмоджи в качестве значка для пользователя, публикующего сообщения;
- icon_url (str) — строка URL-адреса изображения значка вместо иконки по умолчанию;
- link_names (bool) — логическая переменная, которая показывает необходимость находить и связывать каналы и имена пользователей в сообщении;
- proxy (str) — прокси-сервер для вызова Slack webhook;
- extra_options (dict) – дополнительные параметры для http-хука
По умолчанию SlackWebhookOperator имеет следующие поля:
template_fields = [‘webhook_token’, ‘message’, ‘attachments’, ‘blocks’, ‘channel’, ‘username’, ‘proxy’, ‘extra_options’]
Код курса
ADH-AIR
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Больше деталей про администрирование и эксплуатацию Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- https://medium.com/geekculture/airflow-monitoring-via-slack-c692cd6ee85a
- https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/connections/slack.html
- https://api.slack.com/messaging/webhooks
- https://www.mikulskibartosz.name/send-cusomized-slack-notification-when-airflow-task-fails/
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html