Декораторы в Apache AirFlow

Airflow TaskFlow API декораторы, обучение Airflow , курсы Airflow , курсы дата-инженеров, Airflow примеры курсы обучение

Что такое Python-декораторы в Airflow, зачем они нужны, какие они бывают и чем полезны: ликбез по TaskFlow API на практическом примере DAG.

Что такое Python-декораторы в Airflow и какие они бывают

Будучи написанным на Python, Apache Airflow использует именно этот язык в качестве средства разработки дата-конвейеров. После определения функции в задаче или DAG с помощью ключевого слова def, ее можно расширить, изменив ее поведение через декоратор @. В Python декораторы — это функции, которые принимают другую функцию в качестве аргумента и расширяют ее поведение. Для Airflow декораторы расширяет поведение обычной функции Python, превращая ее в задачу, группу задач или DAG. На декораторах основан TaskFlow API, который упрощает разработку, позволяя писать задачи Python с декораторами, обрабатывает передачу данных между задачами с помощью XCom и автоматически выводит зависимости задач.

В настоящее время Airflow поддерживает следующие декораторы:

  • @dag() — создает DAG;
  • @task_group() — создает TaskGroup;
  • @task() — создает задачу Python;
  • @task.bash() — создает задачу BashOperator ;
  • @task.virtualenv() – запускает пользовательскую задачу Python в виртуальной среде;
  • @task.docker() — создает задачу DockerOperator ;
  • @task.short_circuit() — оценивает условие и пропускает последующие задачи, если условие ложно;
  • @task.branch() — создает ветвление в DAG на основе оцененного условия;
  • @task.branch_external_python — создает ветвление в DAG, выполняющее код Python в уже существующей виртуальной среде;
  • @task.branch_virtualenv – создает ветвление в DAG, выполняющее код Python в недавно созданной виртуальной среде, которую можно кэшировать с помощью venv_cache_path;
  • @task.kubernetes() — запускает задачу KubernetesPodOperator ;
  • @task.sensor() — превращает Python-функцию в датчик (сенсор) для непрерывного мониторинга какого-либо условия до тех пор, пока оно не будет выполнено;
  • @task.pyspark() – для работы с объектами Spark-приложения  – SparkSession и SparkContext.

Кроме того, начиная с версии 2.2, дата-инженер может создать в Airflow свой собственный пользовательский декоратор в интерфейс TaskFlow из пакета провайдеров. Причем даже есть возможность встроить отображение своего пользовательского декоратора в среду разработки, упрощая процесс написания кода. Декорированные задачи очень гибкие: можно повторно использовать декорированную задачу в нескольких DAG, переопределяя ее параметры, такие как task_id, queue, pool, и пр.

Далее сравним, чем отличается код DAG, написанный в стиле традиционного API операторов Airflow от декораторов TaskFlow.

Пример DAG

Чтобы показать разницу между традиционным API Airflow с операторами и использованием декораторов TaskFlow API, рассмотрим кейс из прошлой статьи, написав простой  DAG из 3-х задач генерации и обработки данных о пользовательских действиях на веб-страницах. DAG называется запускается ежедневно, начиная с текущей даты, без повтора выполнения задач за прошедший период, т.к. catchup=False. Первая задача генерирует фейковые данные о пользовательских действиях, вторая обрабатывает эти данные для получения статистики, а третья записывает результаты в CSV-файл. Код DAG выглядит следующим образом:

from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import EmptyOperator
from datetime import datetime, timedelta
import csv
import os

# Определяем путь к файлу
file_path = '/content/airflow/data/overall_statistics.csv'
log_file_path = '/content/airflow/data/dataset_errors.log'
output_file_path = '/content/airflow/data/user_statistics.csv'

# Определяем Dataset
data_file = Dataset(file_path)
processed_data_file = Dataset(output_file_path)

def create_dataset(**kwargs):
    # Проверяем, существует ли файл
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Файл {file_path} не найден")
    # Проверяем, не пустой ли файл
    if os.path.getsize(file_path) == 0:
        with open(log_file_path, 'a') as log_file:
            log_file.write(f"Dataset not found: {file_path} - {datetime.now()}")
        return
    print(f"Файл {file_path} существует и готов к использованию")

def use_dataset(**kwargs):
    user_statistics = {}
    
    with open(file_path, 'r') as f:
        reader = csv.DictReader(f, delimiter=',')
        for row in reader:
            user = row['user']
            if user not in user_statistics:
                user_statistics[user] = 0
            user_statistics[user] += int(row['quantity'])
    
    with open(output_file_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['user', 'events_quantity'])
        writer.writeheader()
        for user, count in user_statistics.items():
            writer.writerow({'user': user, 'events_quantity': count})

with DAG(
    dag_id='ANNA_DAG_Dataset',
    start_date=datetime.now() - timedelta(days=1),
    schedule_interval='@daily',
    catchup=False
) as dag:
    
    start_task = EmptyOperator(task_id='start_task')
    
    create_dataset_task = PythonOperator(
        task_id='create_dataset_task',
        python_callable=create_dataset,
        outlets=[data_file]
    )
    
    use_dataset_task = PythonOperator(
        task_id='use_dataset_task',
        python_callable=use_dataset,
        trigger_rule='none_failed_min_one_success',
        outlets=[processed_data_file]
    )
    
    end_task = EmptyOperator(task_id='end_task')
    
    # Задаем последовательность выполнения задач
    start_task >> create_dataset_task >> use_dataset_task >> end_task

При использовании TaskFlow API декоратор @dag используется для объявления самого DAG, а декоратор @task для определения задач внутри DAG:

from airflow import DAG, Dataset
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import csv
import os

# Определяем путь к файлу
file_path = '/content/airflow/data/overall_statistics.csv'
log_file_path = '/content/airflow/data/dataset_errors.log'
output_file_path = '/content/airflow/data/user_statistics.csv'

# Определяем Dataset
data_file = Dataset(file_path)
processed_data_file = Dataset(output_file_path)

@dag(dag_id='ANNA_DAG_Dataset', start_date=datetime.now() - timedelta(days=1), schedule='@daily', catchup=False)
def anna_dag():

    @task(outlets=[data_file])
    def create_dataset():
        # Проверяем, существует ли файл
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Файл {file_path} не найден")
        # Проверяем, не пустой ли файл
        if os.path.getsize(file_path) == 0:
            with open(log_file_path, 'a') as log_file:
                log_file.write(f"Dataset not found: {file_path} - {datetime.now()}")
            return
        print(f"Файл {file_path} существует и готов к использованию")

    @task(outlets=[processed_data_file], trigger_rule='none_failed_min_one_success')
    def use_dataset():
        user_statistics = {}
        
        with open(file_path, 'r') as f:
            reader = csv.DictReader(f, delimiter=',')
            for row in reader:
                user = row['user']
                if user not in user_statistics:
                    user_statistics[user] = 0
                user_statistics[user] += int(row['quantity'])
        
        with open(output_file_path, mode='w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=['user', 'events_quantity'])
            writer.writeheader()
            for user, count in user_statistics.items():
                writer.writerow({'user': user, 'events_quantity': count})

    start_task = EmptyOperator(task_id='start_task')
    end_task = EmptyOperator(task_id='end_task')

    # Определяем последовательность выполнения задач
    start_task >> create_dataset() >> use_dataset() >> end_task

# Инициализируем DAG
anna_dag_dag = anna_dag()

Код с декораторами выглядит лаконичнее и проще традиционного API.

Поскольку TaskFlow использует XCom для передачи переменных в каждую задачу, они должны быть сериализованы. Airflow изначально поддерживает все встроенные типы (int, str и пр.) и объекты, декорированные с помощью @dataclass или @attr.define. Если разработчик использует собственный тип данных, придется управлять сериализацией самостоятельно, добавив функцию сериализации serialize() и статический метод десериализации deserialize() в свой класс с параметрами, например,

deserialize(data: dict, version: int)

Можно также использовать набор данных (Dataset). Он автоматически регистрируется как inlet, если он используется как входной аргумент, или как outlet, если возвращаемое значение задачи равно Dataset или список наборов данных (list[Dataset]]). Подробнее про отличия обмена данными через Dataset от использования XCom мы писали здесь.

В заключение отметим, что декораторы упрощают код, но иногда их использование может немного снизить производительность из-за дополнительных оберток.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://www.astronomer.io/docs/learn/airflow-decorators
  2. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/decorators/index.html#module-airflow.decorators
  3. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту