Запуск зависимых графов Apache Airflow

Когда имеются графы (dags), зависимые от других, то лучше всего объединить их в один или использовать TaskGroup, о котором говорили в прошлой статье. Но если по каким-то причинам сделать это не удается, то Apache Airflow предоставляет различные способы запуска графа внутри другого. Одним из таких является TriggerDagRunOperator. В этой статье на примере разберем способы его задания, его особенности и параметры.

Пример запуска зависимого графа Apache Airflow

Допустим, есть два графа: граф А (dag A) и граф Б (dag B). После выполнения задач графа А должен запуститься граф Б (рис. ниже). Как в Apache Airflow запустить один граф следом за другим? На помощь в таком случае приходит TriggerDagRunOperator. Этот оператор представляет граф в виде задачи, которую можно запустить изнутри другого графа. Он достаточно удобен и не создает много проблем, как это происходит с SubDag’ами.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
27 ноября, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Итак, у нас есть граф А, который состоит из задач: parse (распарсить) и download (скачать). Также у нас есть универсальный граф Б, который мы постоянно используем (поэтому он выделен отдельно от остальных) и состоит из задач: transform (преобразовать) и download (загрузить). Граф Б должен запустить граф А. Как уже было сказано, мы оформим граф А в виде задачи оператора TriggerDagRunOperator.

Dag запускается внутри другого
Зависимый граф А, который запускается графом Б

Граф А, реализованный в Apache Airflow, выглядит следующим образом:

# dags/dagA.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = { 'owner': 'romank', }

with DAG(
    'dagA',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False
) as dag:

    download = DummyOperator(task_id='download')

    parse = DummyOperator(task_id='parse')

    download >> parse

Граф Б, который должен запустить граф А, вот так:

# dags/dagB.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

default_args = { 'owner': 'romank', }

with DAG(
    'dagB',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    start_date=datetime(2021, 1, 1)
) as dag:

    trigger_dagA = TriggerDagRunOperator(
        task_id='trigger_dagA',
        trigger_dag_id='dagA',
    )

    transform = DummyOperator(task_id='transform')

    load = DummyOperator(task_id='load')

    trigger_dagA >> transform >> load

Параметром trigger_dag_id был задан зависимый граф А, который в графе Б будет отражаться значением task_id. Запустив граф Б вы сможете увидеть, как запустился граф А, при условии что он не стоит на паузе. Можно заметить, что значения schedule_interval различаются, они могут быть и одинаковыми, ведь граф А не обязан запускаться напрямую. Чтобы узнать, как был запущен граф, можете навести курсор на сам граф во вкладке с древовидным видом. Если граф был запущен по расписанию там будет стоять Run id: scheduled, а если напрямую, то — Run id: manual.

Dag Dependencies (apache airflow)
Зависимости графа в Apache Airflow

В Airflow 2.1 появилось возможность посмотреть зависимости графов, который были реализованы через TriggerDagRunOperator (наш случай) или ExternalTaskSensor. Он находится во вкладке главного меню Browse -> DAG Dependencies. Читайте также о новшествах, появившихся в Apache Airflow 2.0 тут.

Параметры TriggerDagRunOperator

Если вы запустите графа Б ещё раз, то он провалится. Почему так? Потому что граф нельзя ставить на запуск в одну и ту же дату исполнения (execution date), а мы уже это сделали в первый раз. Если вам требуется перезапустить граф в ту же дату исполнения, то установите параметр reset_dag_run, равным True:

trigger_dagA = TriggerDagRunOperator(
    task_id='trigger_dagA',
    trigger_dag_id='dagA',
    reset_dag_run=True
)

Если вам требуется установить дату исполнения самостоятельно, то используйте параметр execution_date. Также можно передать параметра конфигурации, то делается это через conf (в ранних версиях использовался python_callable). Например, чтобы установить зависимому графу исполнения ту же, что и в вызывающем, то передайте ему макрос {{ ds }}:

trigger_dagA = TriggerDagRunOperator(
    task_id='trigger_dagA',
    trigger_dag_id='dagA',
    execution_date='{{ ds }}'
)

Ещё полезным параметром является wait_for_completion, который при флаге True будет ждать получения статуса success у зависимого графа. Попробуйте это сделать, поставив на паузу граф А, тогда вы увидите, что граф Б находится в статусе ожидания. Он будет опрашивать зависимый граф через промежутки, равные poke_interval (по умолчанию стоит 60 секунд, это значение также можно изменить). Прошу обратить внимание, если у вас исполнитель Sequеntial, то параметр wait_for_completion может “повесить” зависимый граф [1].

Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.

TriggerDagRunOperator может стоять и в середине, и внизу

Зависимый граф под оператором TriggerDagRunOperator можно вставлять и после выполнения задач. Никто не мешает выполнить TriggerDagRunOperator как после первой задачи (рис. ниже):

transform >> trigger_dagA >> load

— так и в конце:

transform >> load >> trigger_dagA 
Вставка графа Airflow в конец
Зависимый граф в середине

 

Если вам потребовалось запускать DAG после выполнения условия, то, скорее всего, лучше использовать BranchPythonOperator. А ещё больше подробностей о зависимых графах Apache Airflow, как ими управлять вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Смотреть расписание

Источники

  1. https://github.com/apache/airflow/issues/16157
Поиск по сайту