В предыдущей статье мы рассмотрели TaskFlow API, появившийся в Apache Airflow 2.0. Сегодня поговорим о способах задания операторов, отличных от PythonOperator, а также о способе группировки задач TaskGroup. Читайте далее: как сформировать BashOperator, используя TaskFlow API, когда следует использовать TaskGroup, в чем преимущества TaskGroup перед SubDag.
Используем Bash Operator в связке с TaskFlow API
TaskFlow API очевидно использует операторы Python. Однако и внутри задекорированных функций можно создавать все доступные операторы. Покажем это на примере.
Допустим, есть 2 задачи, которые возвращают небольшие строки, причем 2-я задача модифицирует строку 1-й задачи. После этого вызывается BashOperator
, который выводит модифицированную строку в стандартный вывод. Первые две задачи сформируем в виде функции под декоратором task
, а третью создадим внутри задекорированной функции dag
. Итак, наш код на Python с композицией нескольких задач Airflow выглядит следующим образом:
from airflow.decorators import dag, task from airflow.utils.dates import days_ago from airflow.operators.bash_operator import BashOperator default_args = { 'owner': 'romank', 'start_date': days_ago(0), } @task def t1(): return 'first' @task def t2(data): return data + ', second' @dag(default_args=default_args, schedule_interval=None) def res(): data1 = t1() data2 = t2(data1) bop = BashOperator( task_id='echo_task', bash_command=f'echo {data2}' ) data2 >> bop dag_res = res()
Как можно заметить, data2
является как возвращаемым значением функции t2
, так и самой задачей. Поэтому, помимо того, что data2
входит в аргумент команды echo
, она также является предыдущей задачей по отношению к BashOperator
.
Здесь оператор Bash был приведен в качестве примера, но его можно заменить на любой другой или использовать несколько. Таким образом, мы объединили Taskflow API, который использует явную кросс-коммуникацию (XCom), и традиционный способ задания операторов Airflow.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
19 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Объединяем графы с TaskGroup
В Airflow 2.0 появилась возможность представлять графы в виде отдельных задач. Достигается это с помощью TaskGroup, который используется в качестве контекстного менеджера [1]. Зачем это нужно? Для соблюдения принципа модульности и для визуального объединения задач. Также оно помогает избежать повторения кода, если подграф используется несколько раз или вовсе экспортирован в другой граф.
Расширим пример. Объединим задачи, которые модифицируют строки, в отдельные группы. Их результаты объединим в задаче merge_data
, которая в свою очередь пойдет на вход оператору Bash. Код на Python для разбиения задач Airflow на группы выглядит так:
from airflow.decorators import dag, task from airflow.utils.dates import days_ago from airflow.utils.task_group import TaskGroup default_args = { 'owner': 'romank', 'start_date': days_ago(0), } @task def t1(): return 'first' @task def t2(): return 'second' @task def transform_task_1(data): return data + ' changed' @task def transform_task_2(data): return data + ' modified' @task def merge_data(data_1, data_2): return data_1 + ', ' + data_2 @dag(default_args=default_args, schedule_interval=None) def dag_with_taskgroup(): with TaskGroup('section-1') as section_1: data = t1() data_1 = transform_task_1(data) with TaskGroup('section-2') as section_2: data = t2() data_2 = transform_task_2(data) result = merge_data(data_1, data_2) bop = BashOperator( task_id='echo_task', bash_command=f'echo {result}' ) result >> bop dag = dag_with_taskgroup()
Сам граф расположен на рисунке ниже, при нажатии на группу, она раскрывается. Задача merge_data
зависит от групп section-1
и section-2
, поскольку ожидает их результатов.
Стоит ли пользоваться TaskGroup
TaskGroup появился вместо SubDag. И он появился не зря. SubDag медлителен, нестабильный, сложен в использовании и приводит к странным последствиям; разработчики хотят его даже удалить в Airflow 3.0 [2]. Поэтому TaskGroup является альтернативой сложным манипуляциям с SubDag’ами. Группы задач из TaskGroup находятся внутри того же самого графа, отсюда все параметры и конфигурации также доступны через него.
Несмотря на то, что группировка задач выглядит удобно, не стоит её применять в и так уже разросшимся DAG’е, увеличивая тем самым его сложность. Представьте себе, что у вас собираются данных с разных API, систем ERP, запущенных скриптов и проч., тогда вливать это всё в единый конвейер будет не самой разумной идеей. Ведь такого “монстра” сложно будет поддерживать. В данном случае лучше всего придерживаться принципа декомпозиции: разделяй и властвуй. Поэтому применять TaskGroup стоит с излишней осторожностью. Если кажется, что задачи можно скомпоновать в отдельный граф, сделайте это. Тогда такой граф можно тестировать отдельно, а после его можно импортировать в другие графы. В другой нашей статье вы также узнаете о лучших практиках создания Big Data конвейеров данных в Apache Airflow.
Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
О конструировании графов, создании групп задач в Apache Airflow вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве: