Содержание
Когда имеются графы (dags), зависимые от других, то лучше всего объединить их в один или использовать TaskGroup, о котором говорили в прошлой статье. Но если по каким-то причинам сделать это не удается, то Apache Airflow предоставляет различные способы запуска графа внутри другого. Одним из таких является TriggerDagRunOperator. В этой статье на примере разберем способы его задания, его особенности и параметры.
Пример запуска зависимого графа Apache Airflow
Допустим, есть два графа: граф А (dag A) и граф Б (dag B). После выполнения задач графа А должен запуститься граф Б (рис. ниже). Как в Apache Airflow запустить один граф следом за другим? На помощь в таком случае приходит TriggerDagRunOperator. Этот оператор представляет граф в виде задачи, которую можно запустить изнутри другого графа. Он достаточно удобен и не создает много проблем, как это происходит с SubDag’ами.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
1 декабря, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000
Итак, у нас есть граф А, который состоит из задач: parse (распарсить) и download (скачать). Также у нас есть универсальный граф Б, который мы постоянно используем (поэтому он выделен отдельно от остальных) и состоит из задач: transform (преобразовать) и download (загрузить). Граф Б должен запустить граф А. Как уже было сказано, мы оформим граф А в виде задачи оператора TriggerDagRunOperator.

Граф А, реализованный в Apache Airflow, выглядит следующим образом:
# dags/dagA.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
default_args = { 'owner': 'romank', }
with DAG(
    'dagA',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False
) as dag:
    download = DummyOperator(task_id='download')
    parse = DummyOperator(task_id='parse')
    download >> parse
Граф Б, который должен запустить граф А, вот так:
# dags/dagB.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
default_args = { 'owner': 'romank', }
with DAG(
    'dagB',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    start_date=datetime(2021, 1, 1)
) as dag:
    trigger_dagA = TriggerDagRunOperator(
        task_id='trigger_dagA',
        trigger_dag_id='dagA',
    )
    transform = DummyOperator(task_id='transform')
    load = DummyOperator(task_id='load')
    trigger_dagA >> transform >> load
Параметром trigger_dag_id был задан зависимый граф А, который в графе Б будет отражаться значением task_id. Запустив граф Б вы сможете увидеть, как запустился граф А, при условии что он не стоит на паузе. Можно заметить, что значения schedule_interval различаются, они могут быть и одинаковыми, ведь граф А не обязан запускаться напрямую. Чтобы узнать, как был запущен граф, можете навести курсор на сам граф во вкладке с древовидным видом. Если граф был запущен по расписанию там будет стоять Run id: scheduled, а если напрямую, то — Run id: manual.

В Airflow 2.1 появилось возможность посмотреть зависимости графов, который были реализованы через TriggerDagRunOperator (наш случай) или ExternalTaskSensor. Он находится во вкладке главного меню Browse -> DAG Dependencies. Читайте также о новшествах, появившихся в Apache Airflow 2.0 тут.
Параметры TriggerDagRunOperator
Если вы запустите графа Б ещё раз, то он провалится. Почему так? Потому что граф нельзя ставить на запуск в одну и ту же дату исполнения (execution date), а мы уже это сделали в первый раз. Если вам требуется перезапустить граф в ту же дату исполнения, то установите параметр reset_dag_run, равным True:
trigger_dagA = TriggerDagRunOperator(
    task_id='trigger_dagA',
    trigger_dag_id='dagA',
    reset_dag_run=True
)
Если вам требуется установить дату исполнения самостоятельно, то используйте параметр execution_date. Также можно передать параметра конфигурации, то делается это через conf (в ранних версиях использовался python_callable). Например, чтобы установить зависимому графу исполнения ту же, что и в вызывающем, то передайте ему макрос {{ ds }}:
trigger_dagA = TriggerDagRunOperator(
    task_id='trigger_dagA',
    trigger_dag_id='dagA',
    execution_date='{{ ds }}'
)
Ещё полезным параметром является wait_for_completion, который при флаге True будет ждать получения статуса success у зависимого графа. Попробуйте это сделать, поставив на паузу граф А, тогда вы увидите, что граф Б находится в статусе ожидания. Он будет опрашивать зависимый граф через промежутки, равные poke_interval (по умолчанию стоит 60 секунд, это значение также можно изменить). Прошу обратить внимание, если у вас исполнитель Sequеntial, то параметр wait_for_completion может “повесить” зависимый граф [1].
TriggerDagRunOperator может стоять и в середине, и внизу
Зависимый граф под оператором TriggerDagRunOperator можно вставлять и после выполнения задач. Никто не мешает выполнить TriggerDagRunOperator как после первой задачи (рис. ниже):
transform >> trigger_dagA >> load
— так и в конце:
transform >> load >> trigger_dagA

Если вам потребовалось запускать DAG после выполнения условия, то, скорее всего, лучше использовать BranchPythonOperator. А ещё больше подробностей о зависимых графах Apache Airflow, как ими управлять вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
Источники

			
			
			
			

