Отладка конвейеров Apache AirFlow с on_failure_callback()

отладка и мониторинг DAG задачи Airflow, обратные вызовы Airflow, Airflow для дата-инженера, конвейеры обработки данных Airflow, регистрация и мониторинг событий Airflow, on_failure_callback Airflow, курсы Airflow, Airflow для дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как использовать функции обратного вызова для отладки конвейера обработки данных в Apache AirFlow, а также отправки оповещений об ошибках. Полезные примеры регистрации и мониторинга сбоев на уровне задачи и всего DAG с on_failure_callback().

Польза обратных вызовов Apache AirFlow на примере on_failure_callback

По мере роста и усложнения конвейеров данных, построенных с помощью Apache AirFlow, возникает потребность в мониторинге и оповещении о событиях ошибок. Ошибки можно обрабатывать на уровне DAG с использованием обратных вызовов, во всех задачах путем определения аргументов по умолчанию (default_args) и в отдельно взятых задачах с использованием обратных вызовов на уровне задачи. Из этих способов более предпочтителен первый, поскольку возможность писать собственный код в обратных вызовах позволяет интегрироваться со сторонними инструментами. Обратный вызов в AirFlow — это простая функция Python , которая вызывается на основе изменения состояния задачи, о чем  мы писали здесь. Обратные вызовы можно использовать, чтобы настроить оповещения об ошибках конвейера обработки данных по электронной почте или в месендежере. Для этого используется on_failure_callback() – один из обратных вызов AirFlow, которых всего 5: on_execute_callback(), on_success_callback(), on_retry_callback(), on_failure_callback() и sla_miss_callback().

Обратный вызов on_failure_callback() вызывается, когда состояние задачи в DAG изменится на failed. Функции обратного вызова вызываются только тогда, когда состояние задачи изменяется из-за выполнения рабочим процессом. Поэтому изменения задачи, сделанные через CLI или GUI-интерфейсы, не выполняют функции обратного вызова.

Чтобы использовать это для отладки конвейера обработки данных, сперва следует определить функцию для обработки сбоев в отдельном python-файле. Далее эта функция будет использоваться при обработке обработки обратного вызова on_failure_callback, в аргументах по умолчанию (default_args), которые применяются ко всем задачам в DAG. Это означает, что любая задача, которая завершается сбоем в этой DAG, автоматически вызывает функцию Python, определенную в обратном вызове on_failure_callback().

Если переопределить эту функцию обратного вызова, можно обрабатывать выборочные сбои для задач, используя параметр параметр on_failure_callback():

@task(on_failure_callback=create_incident)
    def check_variable():
        credentials = Variable.get('myvar')

Аналогичным образом, с помощью параметров можно настроить отправку оповещений об ошибках, указав в default_args параметр email и email_on_failure. Разумеется, перед этим следует настроить конфигурации почтового сервера в AirFlow.

Обратные вызовы пригодятся не только в обработке ошибок, но и для реализации повторных попыток выполнить какое-либо действие в конвейере обработки данных. Это удобно, если возникают эпизодические проблемы с сетевым подключением и надо повторить попытку, попутно отправив дата-инженеру уведомление об этом. В таком случае можно использовать комбинацию retries и on_retry_callback в default_args:

   default_args={
        "email_on_failure": False,
        "retries": 1,
        "on_retry_callback": create_incident,
    }

При использовании функции обратного вызова on_failure_callback() необходимо передать ей аргумент контекста context, который позволяет извлечь информацию о выполнении DAG и экземпляре задачи для определения точки отказа. При параллельной обработке нескольких задач в DAG обратный вызов on_failure_callback() будет вызываться при каждом сбое, если это определено в default_args или во всех параллельных задачах, которые потерпели сбой. Можно иметь общую терминальную задачу для них, в которой определить обратный вызов вместе с правилом триггера trigger_rule=all_done. О том, что такое правила триггеров и как с ними работать, мы писали здесь. Также можно использовать on_failure_callback() на уровне DAG  для отправки только одного сообщения об ошибке. Для этого следует вместо @task указать @dag:

@dag(
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
    schedule_interval="@daily",
    default_args={
        "email_on_failure": False,
        "retries": 0,
    },
    catchup=False,
    on_failure_callback=create_incident,
    tags = ['dag_callbacks', 'taskflow']
)

Метод обратного вызова принимает только один аргумент для передачи контекста, который можно определить следующим образом:

def create_incident(context):
    ti = context['task_instance']
    email = ti.xcom_pull(key='notification_params', task_ids='init')

Чтобы передавать несколько аргументов в параметрах обратного вызова, придется использовать XCom для обмена данными между задачами. Также можно создать отдельную функцию обработки ошибок, которая принимает нужное количество параметров и вызывать ее с помощью callback-метода, например:

import error_handler
def  create_incident (context):
    dag_id = context[ 'dag' ].dag_id
    task_id = context[ 'task' ].task_id
    error_handler(dag_id, task_id, 'mine@email.org' )

В заключение отметим, что обратные вызовы на уровне задачи выполняются реальной задачей, и логируются в журнале задач, тогда как обратные вызовы уровня DAG выполняются планировщиком, который логирует события в файле DAG_FILE.py.log. Этот файл журнала располагается в директории $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/.

Узнайте больше про использование Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@manmeetkaur.rangoola/airflow-logging-and-monitoring-fc3cf32c4d50
  2. https://medium.com/nerd-for-tech/airflow-features-callback-trigger-clsuter-policy-cc7f8022e7d3
Поиск по сайту