Как оповестить дата-инженера о задержке и результате выполнения задачи или всего DAG пакетного конвейера обработки данных: варианты отправки уведомлений в Apache AirFlow и особенности их применения.
Варианты отправки уведомлений в Apache AirFlow
Даже когда конвейер обработки данных разработан и успешно протестирован, в ходе его эксплуатации в рабочей среде неизбежно возникают ошибки и внештатные ситуации, о которых надо знать дата-инженеру. Для такого информирования в популярном оркестраторе пакетных процессов Apache AirFlow есть механизм уведомлений. Вместо того, чтобы периодически проверять пользовательский интерфейс фреймворка, определяя статус DAG в визуальном режиме, можно воспользоваться следующими вариантами уведомлений:
- письма по электронной почте: большинство операторов Airflow имеют параметры для отправки email-сообщений. Это подходит для таких сценариев в производственных конвейерах, где сбои или повторные попытки выполнения задачи требуют немедленного внимания дата-инженера.
- обратные вызовы Airflow _callback, параметры которого есть как на уровне задачи, так и на уровне DAG. Можно передать любой вызываемый объект или встроенный уведомитель (notifier) этим параметрам, и фреймворк запустит их в случае определенных событий, таких как сбой задачи. Этот вариант гибко настраивается и часто используется в производственной среде для определения поведения конвейера обработки данных в случае случаев сбоев или успешного выполнения задач.
- уведомитель Airflow – пользовательский класс для обратных вызовов, который можно использовать повторно и стандартизировать. Уведомители могут быть предоставлены параметрам обратного вызова, чтобы определить условие отправки уведомления, т.е. какая задача или состояние DAG должны вызывать их выполнение. Чаще всего это применяется для стандартизации действий при сбоях задач в нескольких экземплярах Airflow.
- соглашения об уровне обслуживания Airflow (SLA, Service Level Agreement), которое определяет время ожидания для завершения определенной задачи. Если SLA пропущено, выполняется вызываемый или уведомляющий параметр sla_miss_callback. При настроенном SMTP-соединении, также будет отправлено электронное письмо. Поскольку несоблюдение SLA не останавливает выполнение задачи, этот тип уведомления используется чаще всего для информирования, когда необходимо сообщить ответственному дата-инженеру, что какая-то задача выполняется дольше, чем нужно. Задачи, превышающие SLA, не отменяются, а продолжают выполняться до своего завершения. Если нужно отменить задачу после достижения определенного времени выполнения, следует установить тайм-ауты . Чтобы установить SLA для задачи, надо передать объект datetime.timedelta в параметр задачи или оператора sla. Также можно сделать обратный вызов sla_miss_callback, который будет вызван при несоблюдении SLA, чтобы запустить собственную логику обработки этой задержки. Можно полностью отключить проверку SLA в конфигурации Airflow, установив свойство check_slas равным False.
Особенности использования
Для отправки уведомлений по email надо внести определенные изменения в конфигурацию фреймворка. В частности, настроить SMTP-соединение, чтобы использовать параметры задачи email, email_on_failure и email_on_retry. Чтобы разрешить Airflow отправлять электронные письма, надо определить параметры SMTP-сервера и учетной записи в конфигурационном файле airflow.cfg. Можно также задать эти значения с помощью переменных среды. В этом случае всем параметрам предшествует префикс AIRFLOW__SMTP__. Например, smtp_host можно указать, установив переменную AIRFLOW__SMTP__SMTP_HOST. По умолчанию уведомления по электронной почте отправляются в стандартном формате, который определен в методах класса TaskInstance: email_alert() и get_email_subject_content(). Можно настроить это содержимое, установив в конфигурационном файле airflow.cfg переменные subject_template и/или html_content_template в пути к файлам шаблонов Jinja для темы и содержимого соответственно. Также можно использовать функции обратного вызова Airflow для отправки уведомлений, связанных с событиями выполнения задачи, например, предоставить функцию электронной почты параметру on_success_callback. Также можно использовать другие обратные вызовы или определить действия, которые необходимо выполнить в зависимости от различных состояний DAG или задач с помощью параметра *_callback:
- on_success_callback – вызывается при успешном выполнении задачи или группы DAG;
- on_failure_callback – вызывается при сбое задачи или группы DAG;
- on_skipped_callback – вызывается при пропуске задачи. Добавленный в Airflow 2.9, этот обратный вызов существует только на уровне задачи и вызывается только при возникновении исключения AiflowSkipException, а не при пропуске задачи по другим причинам, например, из-за правила срабатывания.
- on_execute_callback – вызывается непосредственно перед началом выполнения задачи и существует только на уровне задачи;
- on_retry_callback – вызывается при повторной попытке задачи и существует только на уровне задачи;
- sla_miss_callback – вызывается, когда задача или DAG не соответствует определенному соглашению об уровне обслуживания. Этот обратный вызов определяется на уровне DAG для DAG с определенными SLA и будет применяться к каждой задаче.
Можно предоставить любой вызываемый Python-объект параметрам *_callback или уведомителям Airflow. Для выполнения нескольких функций можно предоставить несколько элементов обратного вызова одному и тому же параметру обратного вызова в списке. Уведомители можно передавать в соответствующий *_callback параметр DAG в зависимости от того, какое событие должно отправлять уведомление. Уведомители определяются в пакетах поставщиков или импортируются из папки include и могут использоваться в любом DAG. Пакеты провайдеров поставляют готовые уведомители, например, SlackNotifier и пр.
Также дата-инженер может написать собственный уведомитель, унаследовав его от класса BaseNotifier и определив действие, которое следует выполнить в случае использования уведомителя в его методе notify(). Чтобы определить пользовательское уведомление на уровне DAG, надо задать параметры соответствующего обратного вызова *_callback в экземпляре DAG. Уведомления уровня DAG будут запускать функции обратного вызова на основе состояния всего выполнения DAG. Чтобы применить обратный вызов на уровне задачи к каждой задаче DAG, надо передать функцию обратного вызова параметру default_args, перечислив необходимые элементы в этом словаре аргументов по умолчанию. Обратные вызовы, определенные на уровне отдельной задачи, переопределят обратные вызовы, переданные через default_args.
Соглашения об уровне обслуживания Airflow имеют некоторые уникальные особенности, которые следует учитывать перед их внедрением:
- SLA относятся к дате выполнения DAG, а не ко времени начала задачи;
- SLA будут оцениваться только при запланированных запусках DAG, но не отслеживаются, когда DAG запускается вручную;
- SLA можно установить на уровне задачи, если для каждой задачи требуется разное SLA;
- Для SLA невозможно отключить оповещения по электронной почте, если есть настроенный SMTP-сервер в Airflow и заданы параметры отправки электронного письма, оно будет отправлено на эти адреса для каждого запуска DAG с пропущенными SLA.
Большинство уведомлений можно задать на уровне как DAG, так и задачи. Установка параметра в словаре DAG default_args применит его ко всем задачам в DAG.
А чтобы отслеживать выполнение пользовательского кода на основе событий, происходящих в любом месте среды Airflow, например, при обновлении любого набора данных или сбое любого экземпляра задачи, можно использовать слушатели (listeners). API Listener предназначен для вызова во всех DAG и всех операторах: нельзя прослушивать события, сгенерированные определенными DAG. Журналы и вызовы print() будут обрабатываться как часть слушателей. Чтобы создать свой слушатель, дата-инженеру следует импортировать пакет airflow.listeners.hookimpl и реализовать hookimpls для событий, которые должны генерировать уведомления. Пользовательская реализация слушателя должна принимать те же именованные параметры, что определены в спецификации фреймворка hookspec. Иначе , Pluggy, ядро управления плагинами и вызова хуков в AirFlow выдаст ошибку при попытке применить плагин. Чтобы использовать слушатель в развертывании Airflow, его надо включить как часть плагина.
Как использовать рассмотренные варианты отправки уведомлений на практике, рассмотрим в следующий раз, а в заключение отметим, что их многообразие в Apache Airflow еще раз подтверждает надежность и гибкость этого ETL-оркестратора.
Освойте администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники