Содержание
- Динамическая генерация DAG-ов в Apache Airflow
- Шаг 1. Конфигурация (dags/config/tables.yaml)
- Шаг 2. Генератор (dags/dynamic_dag_generator.py)
- TaskFlow API - новый стиль написания кода в AirFlow
- Мониторинг и Алертинг - что делать когда все сломалось
- Troubleshooting: Ошибки, которые убивают прод
- Как Cursor помогает в рефакторинге
- Финал цикла
- Использованные референсы и материалы
- Полный перечень статей Бесплатного курса "Apache Airflow для начинающих"
Представьте, что вы работаете в e-commerce. У вас есть 50 таблиц в Postgres (заказы, товары, пользователи, отзывы…), и каждую из них нужно переливать в ClickHouse по одной и той же схеме: Скачать -> Очистить -> Загрузить.
Новичок создаст 50 файлов: dag_orders.py, dag_users.py, dag_items.py… В каждом файле будет одинаковый код, отличающийся только названием таблицы. Это антипаттерн. Это нарушает принцип DRY (Don’t Repeat Yourself).
Apache Airflow — это Python. А значит, мы можем использовать циклы, конфиги и метапрограммирование, чтобы Airflow сам создавал себе задачи.
Динамическая генерация DAG-ов в Apache Airflow
Концепция проста: мы создаем один файл конфигурации (JSON или YAML), где перечисляем наши таблицы, и один Python-скрипт, который в цикле читает этот конфиг и «печет» DAG-и как пирожки.
Шаг 1. Конфигурация (dags/config/tables.yaml)
orders: schedule: "@daily" source_table: "public.orders" dest_table: "analytics.orders" users: schedule: "@weekly" source_table: "public.users" dest_table: "analytics.users"
Шаг 2. Генератор (dags/dynamic_dag_generator.py)
Тут есть нюанс. Airflow ищет объекты DAG в глобальном пространстве имен (globals()).
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import yaml
import os
# Функция-фабрика, которая создает DAG
def create_dag(dag_id, schedule, source, dest):
# Обычная функция обработки
def process_data():
print(f"Переливаем данные из {source} в {dest}")
# Тут может быть вызов вашего S3Hook или ClickHouseHook
generated_dag = DAG(
dag_id=dag_id,
schedule_interval=schedule,
start_date=datetime(2023, 1, 1),
catchup=False
)
with generated_dag:
t1 = PythonOperator(
task_id="process_table",
python_callable=process_data
)
return generated_dag
# Читаем конфиг
config_path = os.path.join(os.path.dirname(__file__), "config", "tables.yaml")
with open(config_path) as f:
config = yaml.safe_load(f)
# Магия генерации
for table_name, params in config.items():
dag_id = f"etl_{table_name}"
# Создаем объект DAG
dag_obj = create_dag(
dag_id=dag_id,
schedule=params['schedule'],
source=params['source_table'],
dest=params['dest_table']
)
# ВАЖНО: Кладем DAG в globals(), чтобы Airflow его увидел
globals()[dag_id] = dag_obj
Теперь, если вы добавите новую таблицу в YAML-файл, Airflow автоматически создаст новый DAG при следующем сканировании папки. Никакой копипасты.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
TaskFlow API — новый стиль написания кода в AirFlow
Если вы посмотрите на наши предыдущие статьи, мы везде использовали классический стиль: PythonOperator, op_kwargs, ручной xcom_pull. Это рабочий, но многословный подход (boilerplate code).
Начиная с версии 2.0, Airflow представил TaskFlow API. Это использование декораторов @task и @dag.
Было (Старый стиль):
def extract(**kwargs):
return {"data": [1, 2, 3]}
def transform(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='extract')
return [x * 2 for x in data['data']]
t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='transform', python_callable=transform)
t1 >> t2
Стало (TaskFlow API):
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def taskflow_etl():
@task
def extract():
return [1, 2, 3]
@task
def transform(data_list):
# Airflow сам поймет, что data_list пришел из предыдущей задачи
# и сам сделает xcom_pull под капотом
return [x * 2 for x in data_list]
@task
def load(data_list):
print(f"Loading: {data_list}")
# Задаем порядок выполнения как вызов функций
raw_data = extract()
clean_data = transform(raw_data)
load(clean_data)
# Инициализируем DAG
my_dag = taskflow_etl()
Это делает код чище, понятнее и больше похожим на обычный Python-скрипт. Airflow берет на себя всю «грязную работу» по передаче данных.
Мониторинг и Алертинг — что делать когда все сломалось
Никто не смотрит в монитор Airflow 24/7. Если DAG упал ночью, вы должны узнать об этом сразу. В Airflow есть механизм Callbacks (обратных вызовов). Мы можем повесить функцию на событие on_failure_callback.
Пример: Отправка уведомления в Telegram
Сначала напишем функцию отправки (понадобится pip install requests):
import requests
def send_telegram_alert(context):
token = "ВАШ_БОТ_ТОКЕН"
chat_id = "ВАШ_CHAT_ID"
# context содержит информацию об упавшей задаче
task_instance = context.get('task_instance')
dag_id = task_instance.dag_id
task_id = task_instance.task_id
log_url = task_instance.log_url
exception = context.get('exception')
msg = f"""
🔴 **ALARM! Task Failed**
**DAG:** {dag_id}
**Task:** {task_id}
**Error:** {str(exception)[:100]}...
[See Logs]({log_url})
"""
url = f"https://api.telegram.org/bot{token}/sendMessage"
requests.post(url, data={'chat_id': chat_id, 'text': msg, 'parse_mode': 'Markdown'})
Теперь подключим её ко всем задачам в DAG через default_args:
default_args = {
'owner': 'airflow',
'retries': 1,
'on_failure_callback': send_telegram_alert # <--- Магия здесь
}
with DAG(..., default_args=default_args) as dag:
# Теперь, если любая задача упадет, Airflow вызовет функцию алерта
...
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Troubleshooting: Ошибки, которые убивают прод
Ошибка 1: Тяжелый код на верхнем уровне (Top-Level Code) Планировщик Airflow сканирует папку dags каждые 30 секунд (по умолчанию). Он открывает каждый файл и исполняет код, чтобы найти объекты DAG.
- Плохо: Делать запросы к БД или API вне функций операторов.
<!-- end list -->
# НИКОГДА ТАК НЕ ДЕЛАЙТЕ
my_data = requests.get("https://api.example.com").json() # <-- Этот код будет выполняться каждые 30 сек планировщиком!
with DAG(...) as dag:
...
- Последствия: Планировщик зависнет, CPU загрузится на 100%, новые задачи перестанут запускаться.
- Решение: Весь тяжелый код только внутри def my_func(): или внутри операторов.
Ошибка 2: Передача Гигабайтов через XCom TaskFlow API делает передачу данных легкой (extract() >> transform()), и возникает соблазн передавать огромные датафреймы.
- Проблема: XCom пишет данные в базу метаданных (Postgres). Она не предназначена для хранения блобов по 500 МБ.
- Последствия: База Airflow распухает, интерфейс тормозит, задачи падают с ошибкой базы.
- Решение: Передавайте через XCom только пути к файлам в S3 (s3://bucket/data.csv), а не сами данные.
Ошибка 3: Зомби-задачи Иногда задача помечена как Running, но воркер уже умер или был перезагружен.
- Решение: Настройте параметры scheduler_zombie_task_threshold. Airflow периодически проверяет, жив ли процесс, и если нет — помечает задачу как Failed, чтобы сработал Retry.
Как Cursor помогает в рефакторинге
Миграция старого кода на новые рельсы — идеальная задача для AI.
Сценарий: У вас есть старый DAG на PythonOperator, и вы хотите переписать его на TaskFlow API. Промпт для Cursor: «Перепиши этот код DAG-а, используя современный стиль Airflow TaskFlow API (декораторы @task и @dag). Убедись, что передача данных между задачами происходит через возвращаемые значения, а не через явный xcom_pull.»
Сценарий: Генерация конфига для динамических дагов. Промпт: «У меня есть список из 10 таблиц: [table1, table2…]. Создай YAML-конфиг файл для генерации DAG-ов Airflow, где для каждой таблицы будет указано расписание и имя целевой таблицы. Также напиши Python-код генератора, который читает этот YAML.»
Финал цикла
Поздравляю! Мы прошли путь от «Hello World» в консоли до построения сложной, отказоустойчивой платформы данных.
Чему мы научились за 10 статей:
- Поняли архитектуру (Scheduler, Webserver, Worker).
- Запустили Airflow в Docker.
- Подружили его с Postgres и научились прятать пароли в Connections.
- Построили Data Lake на S3 (MinIO/Yandex).
- Освоили HDFS и работу с файловыми системами.
- Запустили тяжелые вычисления на Spark.
- Масштабировали Python-код через Dask.
- Сделали систему реактивной с Kafka.
- Загрузили данные в аналитическую ClickHouse.
- Освоили генерацию DAG-ов и мониторинг.
Что дальше? Дальше — только практика. Apache Airflow — это стандарт индустрии. Знание того, как он работает «под капотом» (о чем мы говорили в каждой статье), делает вас не просто пользователем инструмента, а инженером, способным проектировать надежные системы.
Спасибо, что были с нами в этом путешествии. Удачных пайплайнов и зеленых дагов! 🟢
Все финальные варианты кода Dags и конфигурационных файлов лежат в репозитории на GitHub как и обычно.
Использованные референсы и материалы
- Airflow Best Practices Guide
https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html
«Библия» инженера: что делать, а чего категорически избегать в продакшене. - Tutorial on the TaskFlow API
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
Как писать современные и чистые DAG-и через декораторы @task. - Logging and Monitoring Architecture
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html
Как настроить алерты и коллбэки (on_failure_callback) для отслеживания ошибок.
Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»
Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker
Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose
Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator
Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage
Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)
Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator
Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas
Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka
Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг
Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг


