Бизнес-логика в DAG Apache AirFlow c ShortCircuitOperator

DAG Apache AirFlow ShortCircuitOperator примеры курсы обучение, обучение дата-инженер AirFlow, Apache Airflow для дата-инженера примеры курсы обучение, Школа Больших Данных Учебный Центр Коммерсант

Как реализовать условную логику выполнения задач в DAG-конвейере Apache AirFlow, используя оператор ShortCircuitOperator. А также зачем использовать декоратор и при чем здесь правило триггера.

Что такое ShortCircuitOperator в Apache AirFlow и как он работает

Мы уже писали здесь и здесь, что с помощью операторов, существующих в Apache AirFlow, дата-инженер может составить не просто линейный конвейер обработки данных из последовательно выполняющихся задач, а пропускать или выполнять определенные задачи в этом DAG. Одним из таких полезных операторов является оператор ShortCircuitOperator, который может предотвратить выполнение нижестоящих задач, если вышестоящие задачи не завершились успешно. Это позволяет избежать проблем с неполными данными, а также сэкономить время и ресурсы. 

ShortCircuitOperator представляет собой логический оператор, который оценивает первый операнд и, в зависимости от результата, может оценивать второй или пропускать его, фактически реализуя бизнес-логику условных операций.

В AirFlow операторы ShortCircuitOperator могут использоваться в DAG для пропуска ненужных задач. Когда задача зависит от нескольких вышестоящих, каждая из них должна быть завершена, прежде чем можно будет запустить зависимую задачу. Но если одна из задач завершается сбоем или пропускается, то зависимая задача также может быть пропущена.

ShortCircuitOperator принимает вызываемый объект Python, возвращая True или False в зависимости от логики. Если возвращено значение True, DAG продолжит работу, а если возвращено значение False, все нижестоящие задачи будут пропущены. Этот оператор лучше всего использовать, когда известно, что часть DAG запускается лишь время от времени. Например, DAG может выполняться ежедневно, но некоторые задачи должны выполняться только по воскресеньям. Или DAG управляет моделью машинного обучения, и задачи, которые публикуют модель, должны выполняться только в том случае, если после обучения достигается определенная точность. Такая условная логика также может быть реализована с помощью BranchPythonOperator, который требует возврата идентификатора задачи. ShortCircuitOperator больше подходит для случаев, когда условная логика эквивалентна «запускать или нет», а не «запускать одно или другое».

Таким образом, ShortCircuitOperator является производным от PythonOperator и позволяет продолжить конвейер на основе результата python_callable. Если возвращенный результат равен False, нисходящие задачи будут пропущены (skipped) в зависимости от настроенного режима оператора. Если возвращаемый результат равен True, он передается механизму взаимодействия между задачами XCom, чтобы нижестоящие задачи выполнялись как обычно.

Оператор ShortCircuitOperator можно настроить так, чтобы он учитывал или игнорировал набор триггерных правил (trigger_rule) для нижестоящих задач. Для этого есть параметр ignore_downstream_trigger_rules. Если для этого параметра установлено значение True (по умолчанию), все подчиненные задачи пропускаются без учета правила правил триггера, определенного для задач. Но если для этого параметра задано значение False, прямые нижестоящие задачи пропускаются, хотя заданное правило триггера для других последующих нижестоящих задач соблюдается. В этом режиме оператор предполагает, что намеренно должны быть пропущены только прямые последующие задачи, но не другие.

Рекомендуется использовать декоратор @task.short_circuit, вместо классического ShortCircuitOperator для конвейеров через вызываемые объекты Python, чтобы контролировать, будет ли конвейер продолжаться, если условие выполнено, т.е. получено значение True. Оценка условия и его значения выполняется через выходные данные декорированной функции. Если декорированная функция возвращает True, конвейер продолжает работу, отправляя XCom результат вывода. Если вывод имеет значение False, конвейер пропускает следующую задачу.

Декорированной декоратором @task.short_circuit Python-функции можно передать дополнительные аргументы, как обычной (без декоратора). Аналогично PythonOperator, можно использовать шаблоны Jinja.

Чтоб наглядно проиллюстрировать работу оператора ShortCircuitOperator, рассмотрим пример. Задачи, следующие за задачей condition_is_True, будут выполняться, а задачи, следующие за задачей condition_is_False, будут пропущены.

DAG Apache AirFlow ShortCircuitOperat
DAG в Apache AirFlow c ShortCircuitOperator

Python-код, демонстрирующий работу ShortCircuitOperator-оператора Apache AirFlow, выглядит следующим образом:

from __future__ import annotations

import pendulum

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="example_short_circuit_operator",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    cond_true = ShortCircuitOperator(
        task_id="condition_is_True",
        python_callable=lambda: True,
    )

    cond_false = ShortCircuitOperator(
        task_id="condition_is_False",
        python_callable=lambda: False,
    )

    ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]]
    ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]]

    chain(cond_true, *ds_true)
    chain(cond_false, *ds_false)

    [task_1, task_2, task_3, task_4, task_5, task_6] = [
        EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
    ]

    task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

    short_circuit = ShortCircuitOperator(
        task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
    )

    chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

В этом примере задача short_circuit настроена на соблюдение правил триггера нисходящего потока. Это означает, что хотя задачи, которые следуют за задачей short_circuit, будут пропущены, поскольку декорированная функция возвращает значение False. Но , «задача_7» по-прежнему будет выполняться, поскольку она настроена на выполнение после завершения выполнения вышестоящих задач независимо от состояния, т. е. правила триггера TriggerRule.ALL_DONE.

Освойте все возможности Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://datageeks.medium.com/shortcircuitoperator-in-airflow-fd21b5989266
  2. https://registry.astronomer.io/providers/apache-airflow/modules/shortcircuitoperator/#docs
  3. https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#howto-operator-shortcircuitoperator
Поиск по сайту