Содержание
- Почему LocalExecutor - это тупик для роста
- Новые игроки в команде: роль Redis и Celery
- Развертывание через Docker Compose - собираем конструктор
- Практика. Запускаем параллельные вычисления в Apache AirFlow
- Тонкости настройки и типичные ошибки Docker-окружения
- Проблема 1: "Permission denied" и файлы логов
- Проблема 2: Воркер не видит код
- Проблема 3: Redis Connection Error
- Помощь Cursor в документировании инфраструктуры
Статья 2. Архитектура для продакшена: учим Airflow работать в команде с Redis и Celery
В предыдущей статье мы запустили Airflow в режиме «все в одном». Это когда и планировщик, и исполнитель задач живут внутри одного процесса. Для обучения это подходит идеально, но в реальной жизни такая схема умирает первой. Представьте, что вам нужно запустить десять тяжелых SQL-запросов к базе данных и параллельно обработать пять гигабайт логов на Python. Если использовать LocalExecutor (локальный исполнитель), ресурсы вашего сервера — процессор и оперативная память — закончатся мгновенно. Интерфейс начнет тормозить, а новые задачи просто встанут в бесконечную очередь, ожидая, пока освободится хоть один байт памяти.
Важное примечание: Обработка 5 ГБ данных внутри Airflow — это на самом деле плохая практика (анти-паттерн). Airflow — это оркестратор, а не движок обработки. Но мы приводим этот пример, потому что в реальности такие ошибки случаются, и ваша архитектура должна уметь их переживать, не роняя весь продакшен. Правильные способы обработки таких объемов (через Spark и Dask) мы разберем в статьях 6 и 7.
Чтобы система не упала под нагрузкой, нам нужно разделить обязанности. В этой статье мы перейдем на промышленный стандарт архитектуры Airflow — CeleryExecutor. Мы разнесем «мозги» (планирование) и «мускулы» (выполнение) по разным углам, а связующим звеном между ними станет Redis.
Почему LocalExecutor — это тупик для роста
Главная проблема локального запуска — это так называемое вертикальное масштабирование. Когда задач становится больше, единственный способ выжить — это добавить больше оперативной памяти и ядер в ваш единственный сервер. Но это дорого и имеет физический предел.
В продакшене используется горизонтальное масштабирование. Мы хотим иметь возможность просто добавить еще один, два или десять серверов-воркеров, если нагрузка выросла, и отключить их, когда нагрузка спала. LocalExecutor так не умеет. Он жестко привязан к той машине, где запущен планировщик (Scheduler). Если этот сервер упадет, встанет вообще все. Нам нужна архитектура, где падение одного исполнителя не влияет на общую картину, и где планировщик занимается только управлением, не тратя силы на тяжелые вычисления.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Новые игроки в команде: роль Redis и Celery
Чтобы разорвать жесткую связь между постановкой задачи и ее выполнением, нам нужен посредник. В экосистеме Python стандартом для таких задач является связка Celery (фреймворк для распределенных задач) и Redis (брокер сообщений).
Давайте представим работу современного таксопарка или агрегатора такси.
- Scheduler (Планировщик) — это центральная диспетчерская служба или алгоритм распределения заказов. Диспетчер знает расписание: в 8:00 нужно подать машину к офису (запустить ETL процесс), а в 9:00 — забрать отчет. Но сам диспетчер за руль не садится. Его задача — сформировать «заказ-наряд».
- Redis — это эфир или общая цифровая база заказов. Диспетчер отправляет туда заявку: «Заказ №123, маршрут от S3 до Postgres, оплата по тарифу High Memory». Эта заявка висит в базе и ждет.
- Celery Workers (Воркеры) — это таксисты на линии. Их может быть много. Свободный водитель смотрит в приложение (Redis), видит новый заказ, нажимает «Принять» и едет выполнять маршрут. Как только он освободился, он снова готов брать заказы.
- Flower — это панель администратора таксопарка. На ней видно, сколько машин сейчас на линии, кто стоит в пробке (зависшая задача), а кто сломался.
Почему именно Redis? Потому что это база данных, работающая целиком в оперативной памяти. Операции записи и чтения там происходят за микросекунды. Для Airflow, который может генерировать тысячи «заказов» в минуту, скорость передачи информации от диспетчера к водителю критически важна.
Развертывание через Docker Compose — собираем конструктор
Теперь самое интересное. Наш файл docker-compose.yaml станет сложнее. Нам нужно добавить сервис Redis, сервис для воркера и сервис мониторинга Flower.
Обратите внимание: все сервисы, которые работают с кодом (scheduler, webserver, worker), должны иметь доступ к одной и той же папке dags. Это критически важно. Если вы положите файл с DAG-ом в папку планировщика, но забудете пробросить ее воркеру, повар просто не поймет, по какому рецепту готовить, и выдаст ошибку.
Вот полная конфигурация для распределенного режима. Сохраните это в docker-compose.yaml (заменив прошлый вариант):
version: '3.8'
# Общие настройки для всех Airflow-сервисов, чтобы не дублировать код
x-airflow-common:
&airflow-common
image: apache/airflow:2.8.1
environment:
# Подключение к базе данных метаданных (Postgres)
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
# Говорим Airflow использовать CeleryExecutor
- AIRFLOW__CORE__EXECUTOR=CeleryExecutor
# Указываем, где искать наш "почтовый ящик" (Redis)
- AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
# Где хранить результаты выполнения задач (в базе данных)
- AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
user: "${AIRFLOW_UID:-5000}:0"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
healthcheck:
test: ["CMD-SHELL", "pg_isready -U airflow"]
interval: 5s
timeout: 5s
retries: 5
# Наш новый компонент - Redis
redis:
image: redis:latest
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
scheduler:
<<: *airflow-common
command: scheduler
# Воркер - тот, кто реально делает работу
worker:
<<: *airflow-common
command: celery worker
# Интерфейс мониторинга очереди
flower:
<<: *airflow-common
command: celery flower
ports:
- "5555:5555"
Вам также понадобится создать файл .env в той же папке и прописать там ваш ID пользователя, чтобы не было проблем с правами доступа (об этом ниже):
AIRFLOW_UID=5000 # для Windows 5000 для Linux/MacOS 1000
(В Linux/Mac узнать свой ID можно командой id -u, в Windows оставьте 5000).
Детальный разбор настройки Redis
Давайте посмотрим на строку подключения, которую мы добавили: AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
Без этой строки CeleryExecutor не запустится, так как он не будет знать, куда складывать сообщения.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Практика. Запускаем параллельные вычисления в Apache AirFlow
Теперь запустим нашу ферму командой docker-compose up -d. Подождите пару минут, пока поднимутся все контейнеры. Теперь у вас доступны два интерфейса:
- Airflow: localhost:8080
- Flower: localhost:5555
Давайте напишем DAG, который наглядно покажет работу распределенной системы. Мы создадим задачу, которая имитирует долгую обработку данных (просто «спит» 30 секунд), и запустим 5 таких задач параллельно. В старой схеме с LocalExecutor (если бы у нас был 1 слот) они шли бы друг за другом 2.5 минуты. Здесь они должны выполниться одновременно за 30 секунд.
Создайте файл dags/parallel_test.py:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import time
import os
def heavy_computation(task_number):
print(f"Воркер начал задачу {task_number}")
# Имитируем бурную деятельность
time.sleep(30)
print(f"Воркер закончил задачу {task_number}")
return f"Done {task_number}"
with DAG(
dag_id="celery_parallel_demo",
start_date=datetime(2023, 1, 1),
schedule=None,
catchup=False
) as dag:
for i in range(1, 6):
PythonOperator(
task_id=f"processing_chunk_{i}",
python_callable=heavy_computation,
op_kwargs={"task_number": i}
)
Зайдите в Airflow, включите DAG и нажмите кнопку запуска (Trigger). Сразу после этого переключитесь в Flower (localhost:5555). Вы увидите магию: на вкладке Tasks появятся 5 задач со статусом STARTED. Это значит, что воркер подхватил их все одновременно. Redis отработал как диспетчер, мгновенно раздав поручения.
А теперь представьте, что задач стало 50. Один воркер захлебнется. В Docker Compose вы можете сделать финт ушами и добавить мощности одной командой:
docker-compose up -d --scale worker=3
Эта команда запустит еще два контейнера с воркерами. Теперь у вас три «повара» на кухне, и они разберут очередь в три раза быстрее. LocalExecutor о таком мог только мечтать.
Тонкости настройки и типичные ошибки Docker-окружения
Переход на распределенную архитектуру часто сопровождается болью настройки. Разберем главные грабли.
Проблема 1: «Permission denied» и файлы логов
Docker-контейнеры по умолчанию любят работать от имени root. Если воркер создаст файл лога от root, а вы попытаетесь открыть его с хост-машины (или другой контейнер попробует прочитать его), будет ошибка доступа. Именно для этого мы добавили переменную user: «${AIRFLOW_UID:-5000}:0». Она заставляет процессы внутри контейнера работать с тем же ID пользователя, что и у вас в системе. Это критически важно для Linux-пользователей. Если видите ошибки доступа к папке logs — проверьте .env файл.
Проблема 2: Воркер не видит код
Новички часто меняют код DAG-а и не понимают, почему воркер выполняет старую версию. Причина: воркер загружает код в память при старте процесса. В Airflow есть настройка, как часто сканировать папку, но иногда проще перезапустить воркер. В Docker Compose это делается так: docker-compose restart worker
Проблема 3: Redis Connection Error
Если в логах воркера вы видите ConnectionRefusedError по адресу redis, проверьте две вещи:
- Запущен ли контейнер Redis (docker ps).
- Правильно ли указано имя хоста в AIRFLOW__CELERY__BROKER_URL. Оно должно совпадать с именем сервиса в docker-compose.yaml.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Помощь Cursor в документировании инфраструктуры
Когда ваш docker-compose.yaml разрастается, в нем легко запутаться. Здесь Cursor может стать вашим техническим писателем.
Попробуйте выделить весь файл конфигурации и отправить в чат Cursor такой промпт:
«Проанализируй этот docker-compose файл. Создай таблицу всех сервисов, объяснив для каждого его роль в кластере Airflow и укажи, от каких других сервисов он зависит. Также выдели все переменные окружения, которые отвечают за настройку Celery и Redis, и объясни, что они делают.»
Результат можно сразу копировать в README.md вашего проекта. Это спасет вас через полгода, когда вы забудете, зачем добавили flower или почему используется порт 5555.
Также можно попросить Cursor сгенерировать скрипт очистки (Это полезно при экспериментах, чтобы сбросить базу данных):
«Напиши bash-скрипт, который останавливает все контейнеры, удаляет их и очищает Docker volumes для этого проекта, чтобы я мог начать установку с чистого листа.»
В этой статье мы построили настоящий завод по переработке данных. Теперь у нас есть очередь задач, масштабируемые воркеры и мониторинг. Но пока что наши данные лежат либо в локальных файлах, либо в игрушечной базе.
В следующей статье мы выйдем во внешний мир: подключимся к объектному хранилищу S3 (MinIO). Мы научимся писать собственные Хуки (Hooks), чтобы Airflow мог забирать файлы из облака и класть их обратно, и разберем, как безопасно хранить пароли и ключи доступа, не светя их в коде.




