Урок 10. Продвинутые техники AirFlow и Best Practices

Урок 10. Продвинутые техники AirFlow и Best Practices

 

Представьте, что вы работаете в e-commerce. У вас есть 50 таблиц в Postgres (заказы, товары, пользователи, отзывы…), и каждую из них нужно переливать в ClickHouse по одной и той же схеме: Скачать -> Очистить -> Загрузить.

Новичок создаст 50 файлов: dag_orders.py, dag_users.py, dag_items.py… В каждом файле будет одинаковый код, отличающийся только названием таблицы. Это антипаттерн. Это нарушает принцип DRY (Don’t Repeat Yourself).

Apache Airflow — это Python. А значит, мы можем использовать циклы, конфиги и метапрограммирование, чтобы Airflow сам создавал себе задачи.

Динамическая генерация DAG-ов в Apache Airflow

Концепция проста: мы создаем один файл конфигурации (JSON или YAML), где перечисляем наши таблицы, и один Python-скрипт, который в цикле читает этот конфиг и «печет» DAG-и как пирожки.

Шаг 1. Конфигурация (dags/config/tables.yaml)

 

orders:
  schedule: "@daily"
  source_table: "public.orders"
  dest_table: "analytics.orders"

users:
  schedule: "@weekly"
  source_table: "public.users"
  dest_table: "analytics.users"

 

 

Шаг 2. Генератор (dags/dynamic_dag_generator.py)

Тут есть нюанс. Airflow ищет объекты DAG в глобальном пространстве имен (globals()).

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import yaml
import os

# Функция-фабрика, которая создает DAG
def create_dag(dag_id, schedule, source, dest):
    
    # Обычная функция обработки
    def process_data():
        print(f"Переливаем данные из {source} в {dest}")
        # Тут может быть вызов вашего S3Hook или ClickHouseHook
   generated_dag = DAG(
        dag_id=dag_id,
        schedule_interval=schedule,
        start_date=datetime(2023, 1, 1),
        catchup=False
    )

    with generated_dag:
        t1 = PythonOperator(
            task_id="process_table",
            python_callable=process_data
        )

    return generated_dag

# Читаем конфиг
config_path = os.path.join(os.path.dirname(__file__), "config", "tables.yaml")
with open(config_path) as f:
    config = yaml.safe_load(f)
# Магия генерации
for table_name, params in config.items():
    dag_id = f"etl_{table_name}"
    
    # Создаем объект DAG
    dag_obj = create_dag(
        dag_id=dag_id,
        schedule=params['schedule'],
        source=params['source_table'],
        dest=params['dest_table']
    )
    
    # ВАЖНО: Кладем DAG в globals(), чтобы Airflow его увидел
    globals()[dag_id] = dag_obj

Теперь, если вы добавите новую таблицу в YAML-файл, Airflow автоматически создаст новый DAG при следующем сканировании папки. Никакой копипасты.

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

TaskFlow API — новый стиль написания кода в AirFlow

 

Если вы посмотрите на наши предыдущие статьи, мы везде использовали классический стиль: PythonOperator, op_kwargs, ручной xcom_pull. Это рабочий, но многословный подход (boilerplate code).

Начиная с версии 2.0, Airflow представил TaskFlow API. Это использование декораторов @task и @dag.

Было (Старый стиль):

def extract(**kwargs):
    return {"data": [1, 2, 3]}

def transform(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='extract')
    return [x * 2 for x in data['data']]

t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='transform', python_callable=transform)
t1 >> t2

 

 

Стало (TaskFlow API):

 

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def taskflow_etl():

    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform(data_list):
        # Airflow сам поймет, что data_list пришел из предыдущей задачи
        # и сам сделает xcom_pull под капотом
        return [x * 2 for x in data_list]

    @task
    def load(data_list):
        print(f"Loading: {data_list}")
  # Задаем порядок выполнения как вызов функций
    raw_data = extract()
    clean_data = transform(raw_data)
    load(clean_data)

# Инициализируем DAG
my_dag = taskflow_etl()

Это делает код чище, понятнее и больше похожим на обычный Python-скрипт. Airflow берет на себя всю «грязную работу» по передаче данных.

 

Мониторинг и Алертинг — что делать когда все сломалось

 

Никто не смотрит в монитор Airflow 24/7. Если DAG упал ночью, вы должны узнать об этом сразу. В Airflow есть механизм Callbacks (обратных вызовов). Мы можем повесить функцию на событие on_failure_callback.

Пример: Отправка уведомления в Telegram

Сначала напишем функцию отправки (понадобится pip install requests):

import requests

def send_telegram_alert(context):
    token = "ВАШ_БОТ_ТОКЕН"
    chat_id = "ВАШ_CHAT_ID"
    
    # context содержит информацию об упавшей задаче
    task_instance = context.get('task_instance')
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    log_url = task_instance.log_url
    exception = context.get('exception')

    msg = f"""
    🔴 **ALARM! Task Failed**
    **DAG:** {dag_id}
    **Task:** {task_id}
    **Error:** {str(exception)[:100]}...
    [See Logs]({log_url})
    """
    
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    requests.post(url, data={'chat_id': chat_id, 'text': msg, 'parse_mode': 'Markdown'})


Теперь подключим её ко всем задачам в DAG через default_args:
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'on_failure_callback': send_telegram_alert # <--- Магия здесь
}

with DAG(..., default_args=default_args) as dag:
    # Теперь, если любая задача упадет, Airflow вызовет функцию алерта
    ...

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Troubleshooting: Ошибки, которые убивают прод

 

Ошибка 1: Тяжелый код на верхнем уровне (Top-Level Code) Планировщик Airflow сканирует папку dags каждые 30 секунд (по умолчанию). Он открывает каждый файл и исполняет код, чтобы найти объекты DAG.

  • Плохо: Делать запросы к БД или API вне функций операторов.

 

<!-- end list -->
# НИКОГДА ТАК НЕ ДЕЛАЙТЕ
my_data = requests.get("https://api.example.com").json() # <-- Этот код будет выполняться каждые 30 сек планировщиком!

with DAG(...) as dag:
    ...
  • Последствия: Планировщик зависнет, CPU загрузится на 100%, новые задачи перестанут запускаться.
  • Решение: Весь тяжелый код только внутри def my_func(): или внутри операторов.

Ошибка 2: Передача Гигабайтов через XCom TaskFlow API делает передачу данных легкой (extract() >> transform()), и возникает соблазн передавать огромные датафреймы.

  • Проблема: XCom пишет данные в базу метаданных (Postgres). Она не предназначена для хранения блобов по 500 МБ.
  • Последствия: База Airflow распухает, интерфейс тормозит, задачи падают с ошибкой базы.
  • Решение: Передавайте через XCom только пути к файлам в S3 (s3://bucket/data.csv), а не сами данные.

Ошибка 3: Зомби-задачи Иногда задача помечена как Running, но воркер уже умер или был перезагружен.

  • Решение: Настройте параметры scheduler_zombie_task_threshold. Airflow периодически проверяет, жив ли процесс, и если нет — помечает задачу как Failed, чтобы сработал Retry.

 

Как Cursor помогает в рефакторинге

Миграция старого кода на новые рельсы — идеальная задача для AI.

Сценарий: У вас есть старый DAG на PythonOperator, и вы хотите переписать его на TaskFlow API. Промпт для Cursor: «Перепиши этот код DAG-а, используя современный стиль Airflow TaskFlow API (декораторы @task и @dag). Убедись, что передача данных между задачами происходит через возвращаемые значения, а не через явный xcom_pull.»

Сценарий: Генерация конфига для динамических дагов. Промпт: «У меня есть список из 10 таблиц: [table1, table2…]. Создай YAML-конфиг файл для генерации DAG-ов Airflow, где для каждой таблицы будет указано расписание и имя целевой таблицы. Также напиши Python-код генератора, который читает этот YAML.»

 

Финал цикла

Поздравляю! Мы прошли путь от «Hello World» в консоли до построения сложной, отказоустойчивой платформы данных.

Чему мы научились за 10 статей:

Что дальше? Дальше — только практика. Apache Airflow — это стандарт индустрии. Знание того, как он работает «под капотом» (о чем мы говорили в каждой статье), делает вас не просто пользователем инструмента, а инженером, способным проектировать надежные системы.

Спасибо, что были с нами в этом путешествии. Удачных пайплайнов и зеленых дагов! 🟢

Все финальные варианты кода Dags и конфигурационных файлов лежат в репозитории на GitHub как и обычно.

Бесплатный курс Apache Airflow для новичков from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="spark_submit_demo", start_date=datetime(2025, 1, 1), schedule="@daily", catchup=False ) as dag: run = BashOperator( task_id="run_job", bash_command="spark-submit app.py" ) GitHub code example Бесплатный курс Apache Airflow для новичков

 

Использованные референсы и материалы

 

Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»

Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker

Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose

Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator

Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage

Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)

Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator

Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas

Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka

Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг

Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг

Изменение базового тарифа с 1 января 2026 года Подробнее