Содержание
Как написать DAG в Apache AirFlow без программирования, определив его конфигурацию в YAML-файле, и автоматически получить пакетный конвейер обработки данных с помощью Python-библиотеки DAG Factory.
Демократизация разработки ETL-конвейеров или что такое DAG Factory в Apache AirFlow
Хотя Apache AirFlow и так считается довольно простым фреймворком для оркестрации пакетных процессов и реализации ETL-операций благодаря использованию Python, разработчики стараются сделать его еще проще. Это соответствует устойчивому тренду в инженерии данных с фокусированием на бизнес-логике обработки данных, а не фактическом программировании. Например, использование SQL-запросов для потоковых агрегаций и преобразований вместо разработки полноценного приложения потребителя, что я недавно показывала здесь и здесь. Таким образом, проектировать и реализовывать конвейеры обработки данных могут не только опытные дата-инженеры, но и сами потребители этих операций, т.е. аналитики.
В Apache AirFlow тоже реализуется такая тенденция демократизации инженерии данных. Поэтому в июле 2024 года компания Astronomer, которая активно развивает и продвигает этот open-source фреймворк, а также свою коммерческую платформу оркестрации на его основе, объявила об интеграции проекта DAG Factory в AirFlow. Этот проект с открытым исходным кодом, изначально созданный Адамом Боскарино, позволяет генерировать DAG-файлы из YAML-файлов. Это избавляет разработчика ETL-конвейера от необходимости программирования на Python, ведь YAML-формат знаком каждому аналитику по спецификациям OpenAPI.
Разумеется, YAML-файл с описанием пакетного конвейера трансформируется в DAG не сам по себе, а благодаря Python-пакету DAG Factory, который работает с версией языка от 3.8.0+ и Apache Airflow® 2.0+. Чтобы создать DAG из YAML-файла описывающего его конфигурацию, нужно один раз в этой же самой директории создать py-файл, в котором надо указать расположение файла конфигурации и вызвать у объекта dagfactory метод generate_dags(globals()). Как это сделать, я покажу далее на простом примере.
Практический пример генерации DAG из YAML-файла
Рассмотрим простой конвейер из 3-х задач: первая проверяет доступность внешнего веб-сайта, отправляя к нему GET-запрос, а две другие передают результат в Telegram-бот, в зависимости от успеха предыдущей задачи. Пример подобного конвейера я показывала здесь.

Реализовать эту простую бизнес-логику в AirFlow можно с помощью триггерных правил. По умолчанию в AirFlow задано правило триггера all_success, когда зависимая задача выполняется, если все вышестоящие задачи выполнены успешно. А правило триггера one_failed запустит нижестоящую задачу запускается при сбое хотя бы одной вышестоящей. Эти триггеры отлично подойдут для рассматриваемой задачи.

Если писать DAG Airflow на Python, он будет выглядеть так:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.providers.telegram.operators.telegram import TelegramOperator
default_args = {
'owner': 'anna',
'retries': 1,
'start_date': datetime(2024, 7, 25),
}
with DAG(
'site_check_tg',
default_args=default_args,
description='DAG проверки доступности сайта и отправки результата в TG',
schedule_interval='0 5 * * *',
catchup=False,
) as dag:
get_request = HttpSensor(
task_id='get_request',
http_conn_id='site',
endpoint='',
request_params={},
response_check=lambda response: response.status_code == 200,
poke_interval=5,
timeout=20,
retries=1,
mode='poke'
)
send_telegram_message_success = TelegramOperator(
task_id='send_telegram_message_success',
telegram_conn_id='telegram_default',
token='your token',
chat_id='your chat-id',
text='Сайт доступен!',
trigger_rule='all_success'
)
send_telegram_message_failure = TelegramOperator(
task_id='send_telegram_message_failure',
telegram_conn_id='telegram_default',
token='your-token',
chat_id='your chat-id',
text='Сайт недоступен!',
trigger_rule='one_failed'
)
get_request >> send_telegram_message_success
get_request >> send_telegram_message_failure
Но с помощью DAG Factory можно представить конфигурацию этого конвейера в виде YAML-файла:
site_check_tg:
default_args:
owner: 'anna'
retries: 1
start_date: '2024-07-25'
schedule_interval: '0 5 * * *'
catchup: False
description: 'DAG проверки доступности сайта и отправки результата в TG'
tasks:
get_request:
operator: airflow.sensors.http_sensor.HttpSensor
http_conn_id: 'site'
endpoint: ''
request_params: {}
response_check_lambda: "lambda response: response.status_code == 200"
poke_interval: 5
timeout: 20
retries: 1
mode: 'poke'
send_telegram_message:
operator: airflow.providers.telegram.operators.telegram.TelegramOperator
token: 'your-token'
chat_id: 'your-chait_id'
text: 'Сайт доступен!'
dependencies: [get_request]
trigger_rule: 'all_success'
send_telegram_message_failure:
operator: airflow.providers.telegram.operators.telegram.TelegramOperator
token: 'your token'
chat_id: 'your chat-id'
text: 'Сайт недоступен!'
dependencies: [get_request]
trigger_rule: 'one_failed'
Чтобы Airflow сгенерировал DAG на основе этого YAML-файла конфигурации, надо в рабочую директорию с конвейерами положить следующий py-файл:
from airflow import DAG
import dagfactory
from pathlib import Path
config_file = Path('/opt/airflow/dags/yaml/config_file.yml')
dag_factory = dagfactory.DagFactory(config_file)
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
Функция clean_dags(globals()) очищает текущие DAG-и, удаляя или деактивируя те, которые больше не актуальны. Она может использовать глобальный контекст, что предоставляется через аргумент globals() для доступа к текущему состоянию и переменным. А функция generate_dags(globals()) отвечает за динамическое создание новых DAG-ов на основе определенных конфигураций в YAML-файлах. Она тоже использует глобальный контекст для доступа к данным и переменным. Сами YAML-файлы конфигурации рекомендуется хранить в отдельной папке, доступ к которой надо указать в py-файле.

Если ошибок в YAML-файле конфигурации нет, т.е. синтаксис верный и в качестве операторов используются те, которые установлены в среде Airflow, в GUI фреймворка появится DAG, сгенерированный по этому описанию.

Таким образом, задача проектирования и реализации конвейера в Apache AirFlow с DAG Factory стала еще проще. Кроме использования готовых операторов, эта Python-библиотека также поддерживает использование пользовательских операторов, позволяя задать путь к пользовательскому оператору в ключе operator в файле конфигурации и добавить любые дополнительные параметры.
Наконец, DAG Factory поддерживает планирование DAG с помощью наборов данных (датасетов). Для этого надо указать датасет в ключе outlets YAML-файла конфигурации, определив его местоположение. А в ключе schedule потребительского DAG можно задать датасет, который надо запланировать, чтобы запустить его, когда все наборы данных будут доступны. Если YAML-конфигурация DAG слишком большая, ее можно разделить на несколько файлов. Также можно динамически создавать разное количество задач, используя ключ mapped_tasks в YAML-файле конфигурации, который представляет собой список словарей, где каждый словарь – это задача. Как это сделать, я покажу в следующий раз.
Освойте администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники


