YAML вместо Python: LowCode-разработка DAG в Apache AirFlow с DAG Factory

AirFlow DAG Factory , обучение AirFlow , курсы AirFlow, курсы дата-инженер разработка DAG AirFlow, Школа Больших Данных Учебный Центр Коммерсант

Как написать DAG в Apache AirFlow без программирования, определив его конфигурацию в YAML-файле, и автоматически получить пакетный конвейер обработки данных с помощью Python-библиотеки DAG Factory.

Демократизация разработки ETL-конвейеров или что такое DAG Factory в Apache AirFlow

Хотя Apache AirFlow и так считается довольно простым фреймворком для оркестрации пакетных процессов и реализации ETL-операций благодаря использованию Python, разработчики стараются сделать его еще проще. Это соответствует устойчивому тренду в инженерии данных с фокусированием на бизнес-логике обработки данных, а не фактическом программировании. Например, использование SQL-запросов для потоковых агрегаций и преобразований вместо разработки полноценного приложения потребителя, что я недавно показывала здесь и здесь. Таким образом, проектировать и реализовывать конвейеры обработки данных могут не только опытные дата-инженеры, но и сами потребители этих операций, т.е. аналитики.

В Apache AirFlow тоже реализуется такая тенденция демократизации инженерии данных. Поэтому в июле 2024 года компания Astronomer, которая активно развивает и продвигает этот open-source фреймворк, а также свою коммерческую платформу оркестрации на его основе, объявила об интеграции проекта DAG Factory в AirFlow. Этот проект с открытым исходным кодом, изначально созданный Адамом Боскарино, позволяет генерировать DAG-файлы из YAML-файлов. Это избавляет разработчика ETL-конвейера от необходимости программирования на Python, ведь YAML-формат знаком каждому аналитику по спецификациям OpenAPI.

Разумеется, YAML-файл с описанием пакетного конвейера трансформируется в DAG не сам по себе, а благодаря Python-пакету DAG Factory, который работает с версией языка от 3.8.0+ и Apache Airflow® 2.0+. Чтобы создать DAG из YAML-файла описывающего его конфигурацию, нужно один раз в этой же самой директории создать py-файл, в котором надо указать расположение файла конфигурации и вызвать у объекта dagfactory метод generate_dags(globals()). Как это сделать, я покажу далее на простом примере.

Практический пример генерации DAG из YAML-файла

Рассмотрим простой конвейер из 3-х задач: первая проверяет доступность внешнего веб-сайта, отправляя к нему GET-запрос, а две другие передают результат в Telegram-бот, в зависимости от успеха предыдущей задачи. Пример подобного конвейера я показывала здесь.

Схема бизнес-процесса проверки доступности сайта
Схема бизнес-процесса проверки доступности сайта

Реализовать эту простую бизнес-логику в AirFlow можно с помощью триггерных правил. По умолчанию в AirFlow задано правило триггера all_success, когда зависимая задача выполняется, если все вышестоящие задачи выполнены успешно. А правило триггера one_failed запустит нижестоящую задачу запускается при сбое хотя бы одной вышестоящей. Эти триггеры отлично подойдут для рассматриваемой задачи.

DAG проверки доступности сайта в Airflow
DAG проверки доступности сайта в Airflow

Если писать DAG Airflow на Python, он будет выглядеть так:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.providers.telegram.operators.telegram import TelegramOperator

default_args = {
    'owner': 'anna',
    'retries': 1,
    'start_date': datetime(2024, 7, 25),
}

with DAG(
    'site_check_tg',
    default_args=default_args,
    description='DAG проверки доступности сайта и отправки результата в TG',
    schedule_interval='0 5 * * *',
    catchup=False,
) as dag:

    get_request = HttpSensor(
        task_id='get_request',
        http_conn_id='site',
        endpoint='',
        request_params={},
        response_check=lambda response: response.status_code == 200,
        poke_interval=5,
        timeout=20,
        retries=1,
        mode='poke'
    )

    send_telegram_message_success = TelegramOperator(
        task_id='send_telegram_message_success',
        telegram_conn_id='telegram_default',
        token='your token',
        chat_id='your chat-id',
        text='Сайт доступен!',
        trigger_rule='all_success'
    )

    send_telegram_message_failure = TelegramOperator(
        task_id='send_telegram_message_failure',
        telegram_conn_id='telegram_default',
        token='your-token',
        chat_id='your chat-id',
        text='Сайт недоступен!',
        trigger_rule='one_failed'
    )

    get_request >> send_telegram_message_success
    get_request >> send_telegram_message_failure

Но с помощью DAG Factory можно представить конфигурацию этого конвейера в виде YAML-файла:

site_check_tg:
  default_args:
    owner: 'anna'
    retries: 1
    start_date: '2024-07-25'
  schedule_interval: '0 5 * * *'
  catchup: False
  description: 'DAG проверки доступности сайта и отправки результата в TG'
  tasks:
    get_request:
      operator: airflow.sensors.http_sensor.HttpSensor
      http_conn_id: 'site'
      endpoint: ''
      request_params: {}
      response_check_lambda: "lambda response: response.status_code == 200"
      poke_interval: 5
      timeout: 20
      retries: 1
      mode: 'poke'
    send_telegram_message:
      operator: airflow.providers.telegram.operators.telegram.TelegramOperator
      token: 'your-token'
      chat_id: 'your-chait_id'
      text: 'Сайт доступен!'
      dependencies: [get_request]
      trigger_rule: 'all_success'
    send_telegram_message_failure:
      operator: airflow.providers.telegram.operators.telegram.TelegramOperator
      token: 'your token'
      chat_id: 'your chat-id'
      text: 'Сайт недоступен!'
      dependencies: [get_request]
      trigger_rule: 'one_failed'

Чтобы Airflow сгенерировал DAG на основе этого YAML-файла конфигурации, надо в рабочую директорию с конвейерами положить следующий py-файл:

from airflow import DAG  
import dagfactory
from pathlib import Path

config_file = Path('/opt/airflow/dags/yaml/config_file.yml')
dag_factory = dagfactory.DagFactory(config_file)

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())

Функция clean_dags(globals()) очищает текущие DAG-и, удаляя или деактивируя те, которые больше не актуальны. Она может использовать глобальный контекст, что предоставляется через аргумент globals() для доступа к текущему состоянию и переменным. А функция generate_dags(globals()) отвечает за динамическое создание новых DAG-ов на основе определенных конфигураций в YAML-файлах. Она тоже использует глобальный контекст для доступа к данным и переменным. Сами YAML-файлы конфигурации рекомендуется хранить в отдельной папке, доступ к которой надо указать в py-файле.

Пример YAML-файла с конфигурацией DAG в редакторе VSCode
Пример YAML-файла с конфигурацией DAG в редакторе VSCode

Если ошибок в YAML-файле конфигурации нет, т.е. синтаксис верный и в качестве операторов используются те, которые установлены в среде Airflow, в GUI фреймворка появится DAG, сгенерированный по этому описанию.

Выполнение DAG, сгенерированного из YAML-файла
Выполнение DAG, сгенерированного из YAML-файла

Таким образом, задача проектирования и реализации конвейера в Apache AirFlow с DAG Factory стала еще проще. Кроме использования готовых операторов, эта Python-библиотека также поддерживает использование пользовательских операторов, позволяя задать путь к пользовательскому оператору в ключе operator в файле конфигурации и добавить любые дополнительные параметры.

Наконец, DAG Factory поддерживает планирование DAG с помощью наборов данных (датасетов). Для этого надо указать датасет в ключе outlets YAML-файла конфигурации, определив его местоположение. А в ключе schedule потребительского DAG можно задать датасет, который надо запланировать, чтобы запустить его, когда все наборы данных будут доступны. Если YAML-конфигурация DAG слишком большая, ее можно разделить на несколько файлов. Также можно динамически создавать разное количество задач, используя ключ mapped_tasks в YAML-файле конфигурации, который представляет собой список словарей, где каждый словарь – это задача. Как это сделать, я покажу в следующий раз.

Освойте администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://www.astronomer.io/blog/astronomer-adopts-dag-factory-democratize-writing-data-pipelines/
  2. https://www.astronomer.io/docs/learn/dag-factory
  3. https://github.com/astronomer/dag-factory
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту