Урок 7. Масштабирование Python-задач или как Airflow управляет Dask-кластером

Урок 7. Масштабирование Python-задач или как Airflow управляет Dask-кластером

 

В прошлых статьях мы выяснили: если задача тяжелая и требует Java (Spark), мы используем SparkSubmitOperator. Но что делать, если у вас «тяжелый» Python? Типичная ситуация когда вы написали отличный код на Pandas внутри PythonOperator. На тестовом файле в 100 Мб все летало. В продакшене пришел файл на 10 Гб. Как результат OOM Kill (Out Of Memory). Воркер Airflow падает, задача фейлится, соседние легкие задачи тоже умирают, потому что процесс был убит операционной системой.

Проблема архитектуры Airflow: PythonOperator выполняет код локально на том воркере, где он запущен. Это значит, что вы ограничены ресурсами одной машины. Пытаться наращивать RAM на воркере Airflow — это тупиковый путь.

Решение: Нам нужно вынести исполнение кода за пределы Airflow, оставив за ним только функцию «кнопки пуск» и контроля статуса. Для Python-задач идеальным «внешним процессором» является Dask. В этой статье мы научим Airflow делегировать тяжелые вычисления удаленному кластеру, не меняя при этом привычный Python-стек.

 

Архитектура делегирования — Как Airflow использует Dask для масштабирования

 

В этой связке Airflow выступает в роли заказчика.

  • Airflow Worker: Запускает задачу. Но вместо того чтобы грузить данные в свою память, он создает легкий объект-клиент.
  • Сетевой вызов: Этот клиент стучится по TCP к планировщику Dask (Scheduler).
  • Удаленное исполнение: Dask забирает инструкцию и данные (из S3), «перемалывает» их на своих мощностях.
  • Ожидание: Airflow-задача висит в ожидании ответа (или мониторит статус), потребляя минимум ресурсов.

Для Airflow это выглядит как обычный Python-скрипт, но физически нагрузка уходит на другие серверы.

 

Практическое применение Big Data аналитики для решения бизнес-задач

Код курса
PRUS
Ближайшая дата курса
20 апреля, 2026
Продолжительность
32 ак.часов
Стоимость обучения
102 400

 

Настройка инфраструктуры для масштабирования выполнения задач Airflow

Чтобы Airflow мог управлять Dask-ом, внутри контейнера Airflow должна стоять библиотека dask[distributed], s3fs и pandas  нужных версий. Убедитесь, что она есть в вашем Dockerfile или установите её.

обновляем dockerfile до:

FROM apache/airflow:2.8.1-python3.10
USER root
# 1. ОБЪЕДИНЕННАЯ УСТАНОВКА СИСТЕМНЫХ ПАКЕТОВ
# gcc, libkrb5-dev... — нужны для компиляции HDFS провайдера (Урок 5)
# default-jdk, procps, curl — нужны для работы Spark (Урок 6)
RUN apt-get update 
  && apt-get install -y --no-install-recommends 
         gcc 
         libkrb5-dev 
         krb5-user 
         libffi-dev 
         default-jdk 
         procps 
         curl 
  && apt-get autoremove -yqq --purge 
  && apt-get clean 
  && rm -rf /var/lib/apt/lists/*

# 2. НАСТРОЙКА JAVA (Урок 6)
ENV JAVA_HOME=/usr/lib/jvm/default-java

# 3. УСТАНОВКА SPARK-КЛИЕНТА (Урок 6)
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3
RUN curl -O https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz 
    && tar zxf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz -C /opt/ 
    && rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz 
    && ln -s /opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} /opt/spark

# 4. НАСТРОЙКА ПУТЕЙ SPARK
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:

USER airflow

#--- Добавляем ограничения на установку версий совместимых с Airflow и Python
ARG AIRFLOW_VERSION=2.8.1
ARG PYTHON_VERSION=3.10
ARG CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# 5. УСТАНОВКА ПРОВАЙДЕРОВ
# Ставим сразу оба: и для HDFS (чтобы работал DAG из 5 урока), и для Spark (Урок 6)
#--- ОБНОВЛЕНИЕ ЗДЕСЬ ---
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"\
    apache-airflow-providers-apache-hdfs \
    apache-airflow-providers-apache-spark \
    "dask==2023.12.1" \
    "distributed==2023.12.1" \
    s3fs \
    pandas

# ПОСЛЕДНЯЯ СТРОКА Добавляем dask, distributed и s3fs (для работы с Yandex S3) только нужной версии

Build-им снова docker image airflow-hdfs:2.8.1

как настроить интеграцию Dask c Apache Airflow для параллельных вычислений задач DAG

Но и это еще не все к сожалению многокомпонентные распределенные системы, часто конфликтуют из-за  совместимости и на стороне выполнения задач ( Dask worker и scheduler). Dockerfile, который мы подготовили, настроен для сборки Airflow  контейнеров, и если вы соберете сейчас имидж с помощью ниже описанного docker-compose.yaml файла, вылезут несовместимости по python библиотекам используемым на Dask узлах поэтому сделаем небольшую сборку для Dask:

#--- соберем кастомный имидж для установки Dask контейнеров с треьования по версионности компонент 
FROM daskdev/dask:2023.12.1

COPY requirements-dask.txt /tmp/requirements-dask.txt

RUN pip install --no-cache-dir -r /tmp/requirements-dask.txt \
 && python -c "import s3fs,fsspec,pandas; print('OK', s3fs.__version__, fsspec.__version__, pandas.__version__)"

Создадим requirements-dask.txt для сборки нового имиджа под Dask

#-- версии подходящие для dask:2023.12.1
fsspec==2023.12.1
s3fs==2023.12.1

aiobotocore==2.7.0
botocore==1.31.64
boto3==1.28.64

pandas<3

Соберем все воедино: docker build -t dask-s3fs:2023.12.1 -f dockerfile.dask-s3fs .

Custom версионированный имидж для Dask контейнера с нужными версиями Python и S3 компонент

Вот два имиджа которые мы будем использовать для запуска непротиворечивого интегрированного кластера Airflow c Dask.

Теперь нам нужно добавить в наш docker-compose.yaml «исполнительный цех» — кластер Dask c custom dask-s3fs:2023.12.1 имиджем. Для Airflow это просто внешний сервис, как Postgres или SMTP-сервер.

Добавьте в docker-compose.yaml:

# --- СЕРВИСЫ УРОКА 7 (DASK) ---
  
  # Планировщик Dask (раздает задачи)
  dask-scheduler:
    image: dask-s3fs:2023.12.1
    container_name: dask-scheduler
    hostname: dask-scheduler
    ports:
      - "8786:8786" # Порт для подключения Airflow (Client)
      - "8787:8787" # Красивый дэшборд мониторинга
    command: ["dask-scheduler"]

  # Воркер Dask (выполняет Pandas-код)
  dask-worker:
    image: dask-s3fs:2023.12.1
    container_name: dask-worker
    depends_on:
      - dask-scheduler
    command: ["dask-worker", "tcp://dask-scheduler:8786"]
    environment:
      # Настройки для доступа к Yandex Object Storage
      # Dask использует библиотеку s3fs, которая читает эти переменные
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} # Возьмет из вашего .env или окружения
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} # Возьмет из вашего .env или окружения
      - AWS_ENDPOINT_URL=https://storage.yandexcloud.net
      # Важно: чтобы s3fs не пытался стучаться в Amazon AWS
      - AWS_DEFAULT_REGION=ru-central1

вариант подключения к Yandex_s3 почти такой же ( возьмите параметры из Yandex_s3 коннектора и добавьте AWS_DEFAULT_REGION=ru-central1 чтобы s3fs не стучался в Amazon AWS) только лучше поместить их в .env и скрыть

Переменные окружения .env позволяют скрыть детали из docker compose yaml файла

AWS_ENDPOINT_URL https://storage.yandexcloud.net
AWS_ACCESS_KEY_ID ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY ${AWS_SECRET_ACCESS_KEY}
AWS_DEFAULT_REGION ru-central1

Проверяем код на правильность docker compose config и запустите docker compose up -d. Чтобы почувствовать мощь, можно набрать docker compose scale dask-worker=3, добавив мощностей.

Масштабирования потоков Dask для задач Python в DAG Airflow

 

Практика написания DAG — Паттерн «Remote Execution» для Yandex S3

 

Давайте напишем DAG. Наша цель — показать, как PythonOperator превращается в пульт управления удаленным кластером. В отличие от MinIO, где мы использовали http, здесь мы идем по https. Также Dask-воркеру нужно явно передать storage_options для подключения к Яндекс Облаку.

Задача:

  1. Соединиться с Dask.
  2. Отправить туда задачу на обработку файла из S3.
  3. Дождаться завершения и подтвердить успех в Airflow.

Сохраните этот код как dags/07_dask_yandex.py:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
from dask.distributed import Client, wait
import logging

# Настройки бакета
BUCKET_NAME = "airflow-course"  # Ваш бакет
S3_FILE_PATTERN = "users_export_*.csv" # Маска файлов, которые загружали ранее

def offload_to_dask():
    logging.info("Инициализация подключения к Dask Cluster...")
    
    # 1. Подключаемся к планировщику (имя сервиса из docker-compose)
    client = Client("tcp://dask-scheduler:8786")
    logging.info(f"Подключение успешно: {client}")

    # 2. Функция обработки, которая полетит на кластер
    # ВАЖНО: Внутри функции нужно импортировать библиотеки заново, 
    # так как она выполняется в другом процессе/контейнере
    def heavy_processing_task(bucket, pattern, aws_key, aws_secret):
        import dask.dataframe as dd
        
        # Настройки для Yandex Object Storage (передаем в s3fs)
        storage_opts = {
            "key": aws_key,
            "secret": aws_secret,
            "client_kwargs": {
                "endpoint_url": "https://storage.yandexcloud.net",
                "region_name": "ru-central1",
            },
            "config_kwargs": {
              "s3": {"addressing_style": "path"}
              },
        }
        
        # Читаем CSV прямо из S3 (ленивое чтение)
        s3_path = f"s3://{bucket}/{pattern}"
        ddf = dd.read_csv(s3_path, storage_options=storage_opts)
        
        # Пример тяжелой операции: группировка и подсчет
        # .compute() запускает реальные вычисления на воркере
        # result = ddf.groupby('date').size().compute()
        expr = ddf.groupby("date").size()
        future = client.compute(expr)
        result = future.result()
        
        # Сохраняем результат обратно в S3 (в формате CSV для наглядности)
        #output_path = f"s3://{bucket}/dask_results/report.csv"
        fs = s3fs.S3FileSystem(
        key=aws_key,
        secret=aws_secret,
        client_kwargs={"endpoint_url": endpoint, "region_name": region},
        config_kwargs={"s3": {"addressing_style": "path"}},
    )

    out_key = f"{BUCKET_NAME}/dask_results/report.csv"
    result = result.rename("count")
    with fs.open(out_key, "w") as f:
        result.to_csv(f)

    client.close()
    return result.to_dict()

        # Превращаем результат (Pandas Series) обратно в Dask DataFrame для записи
        dd.from_pandas(result, npartitions=1).to_csv(
            output_path, 
            storage_options=storage_opts,
            single_file=True
        )
        
        return output_path

    # 3. Получаем креды Яндекса (лучше брать из Airflow Connections, но для простоты - Variables или хардкод)
    # Предполагаем, что вы создали Variable в Admin -> Variables c именем 'yandex_creds'
    # В формате JSON: {"key": "...", "secret": "..."}
    # ИЛИ (для теста) впишите свои ключи ниже, если не хотите возиться с Variables
    # aws_key = "ВАШ_ACCESS_KEY"
    # aws_secret = "ВАШ_SECRET_KEY"
    
    # Чтобы не светить ключи в коде, попробуем достать из Environment (если прокинули в docker-compose)
    import os
    aws_key = os.getenv("AWS_ACCESS_KEY_ID", "ЗАМЕНИТЕ_НА_КЛЮЧ_ЕСЛИ_НЕТ_ENV")
    aws_secret = os.getenv("AWS_SECRET_ACCESS_KEY", "ЗАМЕНИТЕ_НА_СЕКРЕТ_ЕСЛИ_НЕТ_ENV")

    # 4. Отправляем задачу на кластер
    future = client.submit(heavy_processing_task, BUCKET_NAME, S3_FILE_PATTERN, aws_key, aws_secret)
    
    logging.info("Задача отправлена в Dask. Ждем...")
    
    # Ждем завершения
    wait(future)
    
    # Получаем результат (путь к файлу)
    try:
        result_path = future.result()
        logging.info(f"Успех! Данные сохранены в: {result_path}")
    except Exception as e:
        logging.error(f"Ошибка вычислений в Dask: {e}")
        raise e
        
    client.close()

with DAG(
    dag_id="07.dask_yandex_processing",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['dask', 'yandex']
) as dag:

    run_on_dask = PythonOperator(
        task_id="run_on_dask_cluster",
        python_callable=offload_to_dask
    )

Не забудьте прописать свои ключи в DAG файл ( конечно с точки зрения это не правильно, но наверняка вы не столкнетесь с кучей ошибок при попытке сохранить результаты из dask worker ов, а это уже совсем другая история которую мы расскажем вам на курсе  ниже по ссылке

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

А вот и наш результат

Результат отработки распределенной задачи DAG в Dask кластере

Хотя конечно мы все равно получили предупреждения о ращличиях в версиях компонент на Dask кластере, но не критичных для нашего DAG кода.

 

Главные ошибки при интеграции Dask (Airflow-Specific)

При такой схеме работы инженеры часто наступают на специфические грабли синхронизации.

Проблема синхронизации окружений (Dependency Hell) Airflow «сериализует» (упаковывает) вашу функцию и отправляет её в Dask.

  • Сценарий: Вы используете в функции библиотеку Dask версии 2023.5.0, которая стоит на Airflow. А на Dask-воркере стоит Dask 2026.1.2(latest).
  • Результат: Задача упадет с ошибкой десериализации.
  • Правило Airflow: Образы Docker для Airflow Worker и Dask Worker должны иметь идентичный набор Python-библиотек. Если обновляете requirements.txt в Airflow — обновляйте и в Dask.

Давайте поразмышляем  немного над этим. В нашем случае при старте нашего DAG мы сразу получаем ошибку и пытаемся разобраться в причинах

Как лечить ошибку Dependencies hell ( ад зависимостей) в компонентах DASK и Airflow

Присмотритесь над выделенным фрагментом — мы  увидим разные версии на клиенте и на сервере ( worker+scheduler) для dask[distributed] и Python. вспоминаем опции которые мы прописали в dockerfile, когда создавали имидж для поднятия Airflow с библиотеками Dask и имиджа «FROM apache/airflow:2.8.1», но версии утилит Dask и Python, да и других утилит мы не выбирали. Без споров на будущее и с уверенностью что изменение конфигурации имиджа не разрушит совместимость с нашими прошлыми уроками (1-6) и установленными компонентами (Hadoop,Spark) выбираем фиксированную версию исходного имиджа Airflow c Python 3.10  

Фиксированная версия Apache Airflow 2.8.1 и Python 3.10

В dockerfile вносим изменения вместо «FROM apache/airflow:2.8.1″ ->»FROM apache/airflow:2.8.1-python3.10″ и для установки пакетов Dask, Distributed тоже жестко фиксируем номера версий («dask==2023.12.1» «distributed==2023.12.1» «dask[distributed]==2023.12.1» s3fs pandas), чтобы и в docker-compose.yaml при конфигурации dask workers и schedulers использовал не «image: daskdev/dask:latest«, а версию максимально близкую и совместимую с клиентом допустим «image: daskdev/dask:2023.12.1«.

Изменение dockerfile для фиксированной версии dask airflow image

Пересобираем docker image и проверяем работу DAGа  ( займет чуть больше времени)

docker compose down
docker build --no-cache -t airflow-hdfs:2.8.1 .
docker compose up -d

 

 Ошибка «Вернуть данные в return» Новички часто пишут: return heavy_dataframe.

  • Что происходит: Dask возвращает гигабайты данных по сети обратно в Airflow Client. Airflow пытается записать это в XCom (мета-базу Postgres).
  • Результат: База зависает, Airflow падает.
  • Правило: Задачи на Dask должны читать из S3 и писать в S3. В Airflow возвращаем только пути к файлам или статус (True/False).

Сетевая доступность Если Airflow запущен локально, а Dask в Docker (или наоборот), Client(…) не сможет подключиться. В нашем примере все работает, потому что оба сервиса живут в одной сети docker-compose. В реальном проде Airflow и Dask могут быть в разных подсетях Kubernetes, и вам придется настраивать доступы.

Исправьте финальные варианты кода Dags  и  конфигурационных файлов и при необходимости сравните с нашими на GitHub где лежит код к Уроку 7.

Урок 7. Масштабирование Python-задач в Airflow from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="spark_submit_demo", start_date=datetime(2025, 1, 1), schedule="@daily", catchup=False ) as dag: run = BashOperator( task_id="run_job", bash_command="spark-submit app.py" ) GitHub code example Урок 7. Масштабирование Python-задач в Airflow

 

Роль Cursor в написании «оберток» для масштабирование Airflow c Dask

 

Код делегирования часто шаблонный. Cursor может сэкономить время.

Промпт:

Напиши функцию-обертку для PythonOperator Airflow. 
Функция должна принимать адрес Dask-планировщика и словарь с параметрами. 
Внутри она должна подключаться к кластеру, запускать переданную функцию processing_logic, ждать результата и логировать прогресс. 
Обработай возможные ошибки подключения (TimeOut).

Итог: Мы научились использовать PythonOperator не как исполнителя, а как контроллер. Теперь Airflow может запускать задачи любой тяжести, делегируя их Dask-кластеру. Мы решили проблему нехватки памяти (OOM) архитектурным способом, не меняя язык программирования.

Теперь, когда мы умеем работать с тяжелыми файлами, пришло время поговорить о скорости реакции. В следующей статье мы разберем Event-Driven архитектуру. Мы узнаем, как заставить Airflow запускать DAG не по расписанию, а мгновенно — как только в Kafka прилетело сообщение о событии.

Переходим к Kafka и событийной модели?

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

 

Использованные референсы и материалы

 

Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»

Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker

Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose

Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator

Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage

Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)

Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator

Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas

Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka

Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг

Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг

Изменение базового тарифа с 1 января 2026 года Подробнее