Чем BranchPythonOperator отличается от ShortCircuitOperator, что и когда выбирать для ветвления DAG в Apache Airflow: принципы работы и примеры использования.
Ветвления DAG в Apache AirFlow с помощью операторов
Чтобы поддерживать реализацию сложных конвейеров обработки данных, в Apache Airflow есть соответствующие механизмы ветвления графа задач, т.е. DAG (Directed Acyclic Graph). По умолчанию в DAG каждая следующая задача запускается только тогда, когда все предыдущие, от которых она зависит, выполнены успешно. Изменить это можно следующими способами:
- ветвление — выбор задачи, которую надо запустить, на основе одного или нескольких условий;
- правила триггеров — установка условий запуска задачи, о чем мы подробно писали здесь и здесь;
- задачи настройки и демонтажа;
- внутренние зависимости текущей задачи от ее предыдущего выполнения.
Пожалуй, самым простым способом реализации ветвления в Airflow является BranchPythonOperator. Он принимает любую функцию Python в качестве входных данных, если она возвращает список допустимых идентификаторов для задач, которые надо выполнить в DAG после завершения функции. При этом остальные задачи в DAG будут пропущены. Этот оператор или его декорированную версию @task.branch можно использовать в следующих случаях:
- когда надо выбрать один или несколько путей выполнения на основе условий;
- когда нужно выбрать конкретные задачи для выполнения из набора задач.
С учетом роста популярности TaskFlow API, в DAG рекомендуется использовать именно декоратор @task.branch вместо прямого создания экземпляра оператора BranchPythonOperator. Впрочем, для создания пользовательского оператора по-прежнему придется работать с традиционным API.
Начиная с Airflow 2.10, можно задать целую группу задач как непосредственный нисходящий элемент задачи ветвления, вернув ее task_group_id в Python-функцию или вместо идентификатора задачи task_id.
Как мы уже отмечали здесь, в Airflow есть несколько специфических реализаций операторов ветвления, которые работают аналогично BranchPythonOperator, но для более конкретных контекстов, например, BranchSQLOperator, BranchDayOfWeekOperator и BranchDateTimeOperator. Начиная с версии 2.7 появился ExternalBranchPythonOperator для работы с ExternalPythonOperator в уже существующей виртуальной среде, а с версии 2.8 – BranchPythonVirtualenvOperator для работы с PythonVirtualenvOperator во вновь создаваемой виртуальной среде. Все эти операторы принимают параметры follow_task_ids_if_true и follow_task_ids_if_false, предоставляя список задач для ветвления на основе логики, возвращаемой оператором.
Если есть нижестоящие задачи, которые должны выполняться независимо от выбранной ветви, придется обновить правило триггера. По умолчанию в Airflow для каждой задачи работает правило триггера all_success, т.е. зависимая задача будет выполняться, только если все вышестоящие (родительские) задачи выполнены успешно. Если задачи вышестоящего уровня пропущены, то нижестоящая задача не будет запущена.
Еще одним вариантом реализации условной логики в DAG является оператор короткого замыкания ShortCircuitOperator или его декорированная версия @task.short_circuit. Этот оператор принимает функцию Python, которая возвращает True или False. В случае возврата True выполнение DAG продолжается, а при возврате False все нижестоящие задачи пропускаются. Этот оператор полезен, если некоторые задачи в DAG должны запускаться только изредка. Например, DAG запускается ежедневно, но его отдельные задачи выполняются только по воскресеньям. Или в системе машинного обучения, надо запускать задачи публикации ML-модели, когда достигается определенная точность обучения. Таким образом, ShortCircuitOperator используется для прекращения выполнения последующих задач на основе условий. Если условие не выполнено, задачи, идущие после ShortCircuitOperator, будут пропущены. Это полезно, когда нужно полностью остановить выполнение последующих задач, если условие не выполнено, или когда выполнение всего поддерева задач зависит от результатов выполнения одного условия. Это происходит, когда ShortCircuitOperator настроен на игнорирование триггерных правил для нижестоящих задач, т.е. параметр ignore_downstream_trigger_rules установлен в значение True. Если задать для этого параметра значение False, будут пропущены только прямые нижестоящие задачи, но те, которые идут после них. Пример того, как это работает мы разбирали здесь.
Таким образом, BranchPythonOperator (или @task.branch) подходит, когда есть несколько путей выполнения, и требуется выбрать один или несколько из них. А ShortCircuitOperator (или @task.short_circuit) следует выбирать, когда необходимо остановить выполнение задач, если условие не выполнено, и нужно избежать выполнения всех последующих задач, чтобы сэкономить ресурсы AirFlow.
Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники