TaskFlow API и традиционные операторы Apache AirFlow: совместное использование

Apache AirFlow API TaskFlow, AirFlow для дата-инженера, обучение AirFlow, курсы AirFlow, курсы дата-инженеров, обучение инженеров данных, Школа Больших Данных Учебный Центр Коммерсант

Чем API TaskFlow отличается от традиционных операторов Apache Airflow, можно ли их использовать вместе и как это сделать для более эффективной передачи данных между задачами DAG с помощью механизма XCom: несколько примеров.

Что такое API TaskFlow в Apache Airflow

Чтобы реализовать конвейер обработки данных в Apache AirFlow, можно использовать традиционные операторы типа BashOperator, PythonOperator и пр. или выбрать более современный TaskFlow API, доступный с релиза 2.0. Этот способ разработки DAG особенно нравится дата-инженерам, которые используют простой Python-код, а не операторы. Обозначив задачу с помощью декораторов @task, API TaskFlow сам создает PythonOperator и передает переменную XCom, чтобы передавать данные между задачами. Также этот API автоматически рассчитывает зависимости: при вызове функции TaskFlow в файле DAG без ее фактического выполнения можно получить XCom-объект для результата (XComArg), чтобы использовать ее в качестве входных данных для последующих задач или операторов.

При работе с традиционными операторами для передачи данных между задачами DAG используется механизм push-and-pull в XCom, например:

def push_function():
  xcom_push("key", "value ")

def pull_function():
  data = xcom_pull("key")

В TaskFlow API можно передавать данные, просто установив зависимости задач:

@task
def push_task():
  return "value_to_pass"

@task
def pull_task(data):

Декорированные задачи являются гибкими: можно повторно использовать декорированную задачу в нескольких DAG, переопределяя ее параметры (task_id, queue, pool и пр.). Чтобы избавиться от ада зависимостей, характерного для Python-код, можно использовать API TaskFlow с виртуальной средой Python (с версии 2.0.2), контейнером Docker (с версии 2.2.0), ExternalPythonOperator (с версии 2.4.0) или KubernetesPodOperator (с версии 2.4.0). Это устраняет ограничения пакетов и системных библиотек рабочего процесса Airflow. При этом следует убедиться, что функции сериализуемы и что они используют локальный импорт только для дополнительных зависимостей, которые используются. Эти импортированные дополнительные библиотеки должны быть доступны в целевой среде, но не обязательно в основной среде Airflow. Какой из операторов выбрать, зависит от нескольких факторов: используется ли Docker или Kubernetes, допустимы ли накладные расходы на динамическое создание виртуальной среды с новыми зависимостями или нужно развернуть неизменяемую среду Python для всех компонентов Airflow.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
27 ноября, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Особенности использования

Примечательно, что API TaskFlow можно использовать вместе с традиционными операторами, сочетая возможности обоих способов для построения эффективных DAG. Хотя API TaskFlow упрощает передачу данных за счет прямой передачи параметров от функции к функции, в некоторых случаях явный характер XCom в традиционных операторах будет более полезен для отладки. Для этого надо передавать данные между традиционными операторами и задачами API TaskFlow. При этом следует помнить, что API TaskFlow по своей сути является оболочкой существующих традиционных функций обработки XCom, которая устраняет необходимость в стандартном коде push/pull. Это означает, что можно извлечь значение из задачи TaskFlowAPI, используя тот же традиционный синтаксис ti.xcom_pull, даже явно не передавав значение из задачи push_data. Это связано с тем, что API TaskFlow берет возвращаемое значение и сохраняет его как Xcom, аналогично использованию функции xcom_push на выходе. Однако, поскольку API TaskFlow использует отложенный метод доступа, возвращаемое значение не будет сохранено как XCom-объект, если оно нигде не используется.

Например, в следующем участке кода декоратор @dag применяется для функции mixed_dag, которая создает DAG Airflow, который должен запускаться ежедневно, начиная с 1 января 2024 года, что указано в параметрах schedule_interval=»@daily» и start_date=datetime(2024, 1, 1).

@dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1))
def mixed_dag():
    @task
    def push_data():
        return "TaskFlow_data"
        TaskFlow_data = push_data()

        def pull_from_traditional(**kwargs):
            ti = kwargs["ti"]
            received_data = ti.xcom_pull(task_ids="push_data")
            print(received_data)
            traditional_task = PythonOperator(
                task_id="pull_from_traditional",
                python_callable=pull_from_traditional,
                provide_context=True,
            )

mixed_dag_instance = mixed_dag()

Внутри функции mixed_dag определена вложенная функция push_data, декорированная декоратором @task. Эта функция выполняется как отдельная задача в рамках DAG и возвращает строку TaskFlow_data. После определения функции push_data следует строка TaskFlow_data = push_data(), которая пытается вызвать эту функцию внутри определения DAG. Справедливости ради стоит отметить, что в рабочем процессе Airflow задачи должны быть связаны друг с другом, а не вызываться напрямую во время определения графа.

Еще одна функция pull_from_traditional не декорирована и предназначена для извлечения данных из предыдущей задачи с помощью метода xcom_pull. Этот традиционный метод получает данные, переданные из задачи push_data. Внутри функции pull_from_traditional создается объект traditional_task класса PythonOperator. В конце создается экземпляр DAG с помощью вызова mixed_dag_instance = mixed_dag().

Помимо совместного использования API TaskFlow с традиционными операторами, также можно передать данные в задачу TaskFlow из традиционной задачи, чтобы использовать их. Хотя по-прежнему придется использовать **kwargs в качестве параметра в задаче push, возвращая значение или использовать .xcom_push, можно получить доступ к значению, вызвав taskname.output и сохранив выходные данные. Это позволит использовать выходные данные этого традиционного оператора так же, как если бы задача была создана с помощью API TaskFlow. В следующем примере результаты задачи traditional_push_task.output сохраняются как данные, а затем используются в качестве параметра для задачи pull_data, чтобы передать значение traditional_data в задачу pull_data.

@dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1))
def another_mixed_dag():
    def push_from_traditional(**kwargs):
        return "Traditional_data"
        traditional_push_task = PythonOperator(
            task_id="push_from_traditional",
            python_callable=push_from_traditional,
            provide_context=True,
        )

@task
def pull_data(received_data):
    print(received_data)
    pull_data(traditional_push_task.output)
    another_mixed_dag_instance = another_mixed_dag()

С помощью метода output() можно передавать обработанные данные от традиционного оператора в задачу TaskFlow для проверки, просто ссылаясь на process_task.output. Преобразование стандартной Python-функции в задачу TaskFlow полезно, когда надо модифицировать код без фактического переписывания традиционных функций в задачи TaskFlow. Это также позволяет объявлять одну и ту же задачу с разными входными данными, заменив образец и создав экземпляр параллельной задачи с другим набором данных.

Если надо установить расширенные конфигурации или использовать методы оператора, доступные для традиционных задач, можно извлечь базовую традиционную задачу из задачи TaskFlow, которая представляет собой оболочку традиционной задачи. Это означает, что под декоратором Taskflow существует традиционный объект задачи со всеми его методами и атрибутами, доступ к которым можно получить. В следующем примере после строки traditional_task_version = my_taskflow_task.task переменная traditional_task_version содержит традиционное представление оператора PythonOperator задачи Taskflow. Можно взаимодействовать с этой традиционной задачей, получая доступ к ее методам, атрибутам и конфигурациям.

@task
def my_taskflow_task():
    return "Hello from Taskflow!"


traditional_task_version = my_taskflow_task.task

print(
    type(traditional_task_version)
)  # This would print something like: <class 'airflow.operators.python.PythonOperator'>

return my_taskflow_task()

Таким образом, комбинируя оба подхода, можно повысить эффективность разработки DAG в Apache AirFlow, сочетая гибкость API TaskFlow с наглядностью традиционных операторов.

Научитесь использовать Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.astronomer.io/blog/apache-airflow-taskflow-api-vs-traditional-operators/
  2. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html
Поиск по сайту