Dataset vs XCom: что выбрать для обмена данными между задачами в Apache AirFlow

Airflow XCom Dataset, обучение Airflow , курсы Airflow , курсы дата-инженеров, Airflow примеры курсы обучение

Чем обмен данными через XCom отличается от использования Dataset и какой из механизмов выбирать для обмена данными между задачами Apache Airflow: разбираем на практическом примере.

Обмен данными через XCom

В Apache Airflow есть несколько механизмов для обмена данными между задачами: XCom и набор данных (Dataset). При общей цели они предназначены для разных сценариев и имеют свои особенности. XCom(Cross-Communication) позволяет задачам обмениваться небольшими сообщениями. Обычно XCom используется в следующих случаях:

  • когда нужно передать небольшие объемы данных между задачами;
  • когда данные должны быть доступны немедленно после выполнения задачи;
  • когда необходимо передать параметры конфигурации или метаданные.

Понятие Dataset появилось в Airflow с версии 2.4, о чем мы писали здесь. Набор данных помогает организовать зависимости между задачами на уровне данных. Dataset определяет, какие данные были обновлены и какие задачи должны быть запущены в ответ на эти изменения. Например, есть задача, которая обновляет таблицу в базе данных, и нужно, чтобы другие задачи запускались только после того, как данные будут обновлены. Dataset используется для отслеживания состояния данных, и задачи, зависящие от этого Dataset, будут выполнены автоматически.

Обычно набор данных используется в следующих ситуациях:

  • когда необходимо организовать зависимости задач на основе состояния данных;
  • когда одна задача может влиять на выполнение множества других задач в зависимости от обновленных данных;
  • когда данные обновляются асинхронно, и необходимо запускать задачи только после завершения этого процесса.

Таким образом, выбирая между XCom и Dataset, первый вариант подходит, когда нужно передать конкретные значения между задачами в рамках одного DAG, и эти данные являются частью логики выполнения задач. Dataset подойдет для более сложных зависимостей на уровне данных, особенно когда их обновление влияет на выполнение нескольких задач в разных DAG или важна отслеживаемость состояния данных.

Чтобы продемонстрировать, как это работает, напишем DAG из 3-х задач генерации и обработки данных о пользовательских действиях на веб-страницах. DAG называется запускается ежедневно, начиная с текущей даты, без повтора выполнения задач за прошедший период, т.к. catchup=False. 1-ая задача генерирует фейковые данные о пользовательских действиях, 2-ая обрабатывает эти данные для получения статистики, а 3-я логирует результаты в CSV-файл. Обмен данными между задачами организуем через XCom. В качестве стартовой и конечной точки добавим задачи с пустыми операторами. Код DAG-файла, написанный в стиле смешения традиционного и TaskFlow API, выглядит так:

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import EmptyOperator
from datetime import datetime, timedelta
from faker import Faker
import random
import csv
import os

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='ANNA_DAG_XCom',
    default_args=default_args,
    schedule_interval='@daily'
)

@task()
def generate_user_data(**kwargs):
    fake = Faker()
    
    # Генерация веб-страниц и пользователей
    num_pages = 10
    num_users = 10

    pages = [fake.url() for _ in range(num_pages)]
    users = [fake.ascii_free_email() for _ in range(num_users)]

    # События
    events = ['click', 'scroll', 'submit', 'download', 'focus']

    # Генерация пользовательских событий для 5 пользователей
    user_events_list = []

    for _ in range(5):  # Генерация данных для 5 пользователей
        num_user_pages = random.randint(1, 5)  # Количество страниц для пользователя
        user_events = {
            'user': random.choice(users),
            'date': datetime.now().strftime('%Y-%m-%d'),
            'actions': [
                {
                    'page': random.choice(pages),
                    'events': [
                        {
                            'type': random.choice(events),
                            'quantity': fake.random_int(min=0, max=10)
                        } for _ in range(random.randint(1, 5))  # Количество событий на странице
                    ]
                } for _ in range(num_user_pages)  # Генерация действий на страницах
            ]
        }
        user_events_list.append(user_events)

    kwargs['ti'].xcom_push(key='user_data', value=user_events_list)

@task()
def calculate_statistics(**kwargs):
    user_data = kwargs['ti'].xcom_pull(key='user_data', task_ids='generate_user_data')
    overall_statistics = []

    for entry in user_data:
        user = entry['user']
        date = entry['date']

        for action in entry['actions']:
            page = action['page']

            for event in action['events']:
                event_type = event['type']
                quantity = event['quantity']

                # Общая статистика
                overall_statistics.append({
                    'date': date,
                    'user': user,
                    'page': page,
                    'event': event_type,
                    'quantity': quantity
                })

    kwargs['ti'].xcom_push(key='overall_statistics', value=overall_statistics)

@task()
def log_statistics(**kwargs):
    # Логирование общей статистики
    overall_statistics = kwargs['ti'].xcom_pull(key='overall_statistics', task_ids='calculate_statistics')

    with open('/content/airflow/data/overall_statistics.csv', mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['date', 'user', 'page', 'event', 'quantity'])
        writer.writeheader()
        for entry in overall_statistics:
            writer.writerow(entry)

with DAG(dag_id='ANNA_DAG_XCom', start_date=datetime.now() - timedelta(days=1), schedule='@daily', catchup=False) as dag:
    start_task = EmptyOperator(task_id='start_task')
    generate_user_data_task = generate_user_data()
    calculate_statistics_task = calculate_statistics()
    log_statistics_task=log_statistics()
    end_task = EmptyOperator(task_id='end_task')

    # Задаем последовательность выполнения задач
    start_task >> generate_user_data_task >> calculate_statistics_task >> log_statistics_task >> end_task

Запустим этот DAG в AirFlow.

Выполнение DAG в AirFlow
Выполнение DAG в AirFlow

XCom-объекты отображаются в интерфейсе фреймворка.

Сохраненные XCom-объекты для обмена данными между задачами
Сохраненные XCom-объекты для обмена данными между задачами

Сгенерированный файл статистики пользовательского поведения сохранен в директории Colab, где я запускала AirFlow. Этот CSV-файл можно посмотреть и скачать.

Созданный файл с данными
Созданный файл с данными

Обмен данными через Dataset в Airflow

Рассмотрим аналогичный пример с Dataset для обмена данными. Dataset предоставляет возможность обмениваться данными между DAG-ами, но его реализация требует некоторой настройки среды. Например, я буду использовать ранее сгенерированный файл в качестве источника данных. Сперва определим два набора данных: исходный файл данных data_file и файл с обработанными данными processed_data_file. Задача create_dataset проверяет существование исходного файла. Если файл не найден или пуст, записывает сообщение об ошибке в лог-файл. Задача use_dataset считывает данные из CSV-файла и подсчитывает количество событий для каждого пользователя, а также записывает результаты в новый CSV-файл user_statistics.csv. Чтобы выполнять эту задачу, если ни одна из предыдущих задач не завершилась с ошибкой и хотя бы одна завершилась успешно, зададим ей правило триггера none_failed_min_one_success. Аналогично предыдущему примеру в качестве стартовой и конечной точки добавим задачи с пустыми операторами. Код DAG-файла выглядит так:

from airflow import DAG, Dataset
from airflow.decorators import task
from airflow.operators.dummy import EmptyOperator
from datetime import datetime, timedelta
import csv
import os

# Определяем путь к файлу
file_path = '/content/airflow/data/overall_statistics.csv'
log_file_path = '/content/airflow/data/dataset_errors.log'
output_file_path = '/content/airflow/data/user_statistics.csv'

# Определяем Dataset
data_file = Dataset(file_path)
processed_data_file = Dataset(output_file_path)

@task(outlets=[data_file])
def create_dataset():
    # Проверяем, существует ли файл
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Файл {file_path} не найден")

    # Проверяем, не пустой ли файл
    if os.path.getsize(file_path) == 0:
        with open(log_file_path, 'a') as log_file:
            log_file.write(f"Dataset not found: {file_path} - {datetime.now()}")
        return

    print(f"Файл {file_path} существует и готов к использованию")

@task(outlets=[processed_data_file], trigger_rule='none_failed_min_one_success')
def use_dataset():
    user_statistics = {}
    
    with open(file_path, 'r') as f:
        reader = csv.DictReader(f, delimiter=',')
        for row in reader:
            user = row['user']
            if user not in user_statistics:
                user_statistics[user] = 0
            user_statistics[user] += int(row['quantity'])
    
    with open(output_file_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['user', 'events_quantity'])
        writer.writeheader()
        for user, count in user_statistics.items():
            writer.writerow({'user': user, 'events_quantity': count})

with DAG(dag_id='ANNA_DAG_Dataset', start_date=datetime.now() - timedelta(days=1), schedule='@daily', catchup=False) as dag:
    start_task = EmptyOperator(task_id='start_task')
    create_dataset_task = create_dataset()
    use_dataset_task = use_dataset()
    end_task = EmptyOperator(task_id='end_task')

    # Задаем последовательность выполнения задач
    start_task >> create_dataset_task >> use_dataset_task >> end_task

Запустим этот DAG в AirFlow.

Выполнение DAG в AirFlow
Выполнение DAG в AirFlow

Сгенерированные датасеты отображаются в веб-интерфейсе фреймворка.

Созданные датасеты для обмена данными между задачами
Созданные датасеты для обмена данными между задачами

Как и в предыдущем случае, сгенерированный CSV-файл статистики пользовательского поведения сохранен в директории Colab, где запущен AirFlow.

Файл с результатами аналитики данных
Файл с результатами аналитики данных

Обмен данными через XCom-объекты мне показался немного проще, чем через наборы данных. Однако, поскольку XCom имеет ограниченный размер и по умолчанию сохраняется в базе данных метаданных, он подходит только для простых кейсов. Поэтому дата-инженеру полезно знать про возможности Dataset, который может быть подгружен из любого, в т.ч. внешнего источника, и уметь пользоваться этим механизмом.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту