Когда имеются графы (dags), зависимые от других, то лучше всего объединить их в один или использовать TaskGroup, о котором говорили в прошлой статье. Но если по каким-то причинам сделать это не удается, то Apache Airflow предоставляет различные способы запуска графа внутри другого. Одним из таких является TriggerDagRunOperator. В этой статье на примере разберем способы его задания, его особенности и параметры.
Пример запуска зависимого графа Apache Airflow
Допустим, есть два графа: граф А (dag A
) и граф Б (dag B
). После выполнения задач графа А должен запуститься граф Б (рис. ниже). Как в Apache Airflow запустить один граф следом за другим? На помощь в таком случае приходит TriggerDagRunOperator. Этот оператор представляет граф в виде задачи, которую можно запустить изнутри другого графа. Он достаточно удобен и не создает много проблем, как это происходит с SubDag’ами.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
27 ноября, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Итак, у нас есть граф А, который состоит из задач: parse (распарсить) и download (скачать). Также у нас есть универсальный граф Б, который мы постоянно используем (поэтому он выделен отдельно от остальных) и состоит из задач: transform (преобразовать) и download (загрузить). Граф Б должен запустить граф А. Как уже было сказано, мы оформим граф А в виде задачи оператора TriggerDagRunOperator.
Граф А, реализованный в Apache Airflow, выглядит следующим образом:
# dags/dagA.py from airflow import DAG from airflow.operators.dummy import DummyOperator from datetime import datetime default_args = { 'owner': 'romank', } with DAG( 'dagA', default_args=default_args, schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False ) as dag: download = DummyOperator(task_id='download') parse = DummyOperator(task_id='parse') download >> parse
Граф Б, который должен запустить граф А, вот так:
# dags/dagB.py from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from datetime import datetime default_args = { 'owner': 'romank', } with DAG( 'dagB', default_args=default_args, schedule_interval='@daily', catchup=False, start_date=datetime(2021, 1, 1) ) as dag: trigger_dagA = TriggerDagRunOperator( task_id='trigger_dagA', trigger_dag_id='dagA', ) transform = DummyOperator(task_id='transform') load = DummyOperator(task_id='load') trigger_dagA >> transform >> load
Параметром trigger_dag_id
был задан зависимый граф А, который в графе Б будет отражаться значением task_id
. Запустив граф Б вы сможете увидеть, как запустился граф А, при условии что он не стоит на паузе. Можно заметить, что значения schedule_interval
различаются, они могут быть и одинаковыми, ведь граф А не обязан запускаться напрямую. Чтобы узнать, как был запущен граф, можете навести курсор на сам граф во вкладке с древовидным видом. Если граф был запущен по расписанию там будет стоять Run id: scheduled
, а если напрямую, то — Run id: manual
.
В Airflow 2.1 появилось возможность посмотреть зависимости графов, который были реализованы через TriggerDagRunOperator (наш случай) или ExternalTaskSensor. Он находится во вкладке главного меню Browse -> DAG Dependencies. Читайте также о новшествах, появившихся в Apache Airflow 2.0 тут.
Параметры TriggerDagRunOperator
Если вы запустите графа Б ещё раз, то он провалится. Почему так? Потому что граф нельзя ставить на запуск в одну и ту же дату исполнения (execution date), а мы уже это сделали в первый раз. Если вам требуется перезапустить граф в ту же дату исполнения, то установите параметр reset_dag_run
, равным True
:
trigger_dagA = TriggerDagRunOperator( task_id='trigger_dagA', trigger_dag_id='dagA', reset_dag_run=True )
Если вам требуется установить дату исполнения самостоятельно, то используйте параметр execution_date
. Также можно передать параметра конфигурации, то делается это через conf
(в ранних версиях использовался python_callable
). Например, чтобы установить зависимому графу исполнения ту же, что и в вызывающем, то передайте ему макрос {{ ds }}
:
trigger_dagA = TriggerDagRunOperator( task_id='trigger_dagA', trigger_dag_id='dagA', execution_date='{{ ds }}' )
Ещё полезным параметром является wait_for_completion
, который при флаге True
будет ждать получения статуса success у зависимого графа. Попробуйте это сделать, поставив на паузу граф А, тогда вы увидите, что граф Б находится в статусе ожидания. Он будет опрашивать зависимый граф через промежутки, равные poke_interval
(по умолчанию стоит 60 секунд, это значение также можно изменить). Прошу обратить внимание, если у вас исполнитель Sequеntial, то параметр wait_for_completion
может “повесить” зависимый граф [1].
Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
TriggerDagRunOperator может стоять и в середине, и внизу
Зависимый граф под оператором TriggerDagRunOperator можно вставлять и после выполнения задач. Никто не мешает выполнить TriggerDagRunOperator как после первой задачи (рис. ниже):
transform >> trigger_dagA >> load
— так и в конце:
transform >> load >> trigger_dagA
Если вам потребовалось запускать DAG после выполнения условия, то, скорее всего, лучше использовать BranchPythonOperator. А ещё больше подробностей о зависимых графах Apache Airflow, как ими управлять вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
Источники