Airflow и TaskFlow: композиция операторов и задач с TaskGroup

Airflow и TaskFlow: композиция операторов и задач с TaskGroup

    В предыдущей статье мы рассмотрели TaskFlow API, появившийся в Apache Airflow 2.0. Сегодня поговорим о способах задания операторов, отличных от PythonOperator, а также о способе группировки задач TaskGroup. Читайте далее: как сформировать BashOperator, используя TaskFlow API, когда следует использовать TaskGroup, в чем преимущества TaskGroup перед SubDag.

    Используем Bash Operator в связке с TaskFlow API

    TaskFlow API очевидно использует операторы Python. Однако и внутри задекорированных функций можно создавать все доступные операторы. Покажем это на примере.

    Допустим, есть 2 задачи, которые возвращают небольшие строки, причем 2-я задача модифицирует строку 1-й задачи. После этого вызывается BashOperator, который выводит модифицированную строку в стандартный вывод. Первые две задачи сформируем в виде функции под декоратором task, а третью создадим внутри задекорированной функции dag. Итак, наш код на Python с композицией нескольких задач Airflow выглядит следующим образом:

    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    from airflow.operators.bash_operator import BashOperator
    
    default_args = {
        'owner': 'romank',
        'start_date': days_ago(0),
    }
    
    @task
    def t1():
        return 'first'
    
    @task
    def t2(data):
        return data + ', second'
    
    @dag(default_args=default_args, schedule_interval=None)
    def res():
        data1 = t1()
        data2 = t2(data1)
        bop = BashOperator(
            task_id='echo_task',
            bash_command=f'echo {data2}'
        )
        data2 >> bop
    
    dag_res = res()
    

    Как можно заметить, data2 является как возвращаемым значением функции t2, так и самой задачей. Поэтому, помимо того, что data2 входит в аргумент команды echo, она также является предыдущей задачей по отношению к BashOperator.

    Здесь оператор Bash был приведен в качестве примера, но его можно заменить на любой другой или использовать несколько. Таким образом, мы объединили Taskflow API, который использует явную кросс-коммуникацию (XCom), и традиционный способ задания операторов Airflow.

    Apache Airflow для инженеров данных

    Код курса
    AIRF
    Ближайшая дата курса
    1 декабря, 2025
    Продолжительность
    24 ак.часов
    Стоимость обучения
    72 000

    Объединяем графы с TaskGroup

    В Airflow 2.0 появилась возможность представлять графы в виде отдельных задач. Достигается это с помощью TaskGroup, который используется в качестве контекстного менеджера [1]. Зачем это нужно? Для соблюдения принципа модульности и для визуального объединения задач. Также оно помогает избежать повторения кода, если подграф используется несколько раз или вовсе экспортирован в другой граф.

    Расширим пример. Объединим задачи, которые модифицируют строки, в отдельные группы. Их результаты объединим в задаче merge_data, которая в свою очередь пойдет на вход оператору Bash. Код на Python для разбиения задач Airflow на группы выглядит так:

    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    from airflow.utils.task_group import TaskGroup
    
    default_args = { 'owner': 'romank', 'start_date': days_ago(0), }
    
    @task
    def t1():
        return 'first'
    
    @task
    def t2():
        return 'second'
    
    @task
    def transform_task_1(data):
        return data + ' changed'
    
    @task
    def transform_task_2(data):
        return data + ' modified'
    
    @task
    def merge_data(data_1, data_2):
        return data_1 + ', ' + data_2
    
    @dag(default_args=default_args, schedule_interval=None)
    def dag_with_taskgroup():
        with TaskGroup('section-1') as section_1:
            data = t1()
            data_1 = transform_task_1(data)
        with TaskGroup('section-2') as section_2:
            data = t2()
            data_2 = transform_task_2(data)
        result = merge_data(data_1, data_2)
        bop = BashOperator(
            task_id='echo_task',
            bash_command=f'echo {result}'
        )
        result >> bop
    
    dag = dag_with_taskgroup()
    

    Сам граф расположен на рисунке ниже, при нажатии на группу, она раскрывается. Задача merge_data зависит от групп section-1 и section-2, поскольку ожидает их результатов.

    Группировка задач TaskGroup
    Сгруппированные задачи

    Стоит ли пользоваться TaskGroup

    TaskGroup появился вместо SubDag. И он появился не зря. SubDag медлителен, нестабильный, сложен в использовании и приводит к странным последствиям; разработчики хотят его даже удалить в Airflow 3.0 [2]. Поэтому TaskGroup является альтернативой сложным манипуляциям с SubDag’ами. Группы задач из TaskGroup находятся внутри того же самого графа, отсюда все параметры и конфигурации также доступны через него.

    Несмотря на то, что группировка задач выглядит удобно, не стоит её применять в и так уже разросшимся DAG’е, увеличивая тем самым его сложность. Представьте себе, что у вас собираются данных с разных API, систем ERP, запущенных скриптов и проч., тогда вливать это всё в единый конвейер будет не самой разумной идеей. Ведь такого “монстра” сложно будет поддерживать. В данном случае лучше всего придерживаться принципа декомпозиции: разделяй и властвуй. Поэтому применять TaskGroup стоит с излишней осторожностью. Если кажется, что задачи можно скомпоновать в отдельный граф, сделайте это. Тогда такой граф можно тестировать отдельно, а после его можно импортировать в другие графы. В другой нашей статье вы также узнаете о лучших практиках создания Big Data конвейеров данных в Apache Airflow.

    О конструировании графов, создании групп задач в Apache Airflow вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

    1. https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags/#taskgroups
    2. https://lists.apache.org/thread//ra52746f9c8274469d343b5f0251199de776e75ab75ded6830886fb6a%40%3Cdev.airflow.apache.org%3E?noscript