Урок 6. Тяжелая артиллерия — запуск Spark-jobs через Apache Airflow

Урок 6. Тяжелая артиллерия — запуск Spark-jobs через Apache Airflow

 

Мы построили пайплайн, где данные забираются из базы и бережно складываются в HDFS. Теперь они лежат там мертвым грузом. Чтобы превратить сырые CSV в полезные отчеты, их нужно обработать: отфильтровать, агрегировать, джойнить. Делать это внутри самого Airflow (через PythonOperator и Pandas) — плохая идея если:

  • Память: Если файл весит 100 ГБ, ваш воркер Airflow просто лопнет (OOM Kill).
  • Скорость: Pandas работает на одном ядре. Spark распределяет задачу на сотни ядер.

Airflow здесь выступает только как кнопка «Пуск». Он запускает задачу на кластере Spark и смиренно ждет, пока «большой брат» закончит работу. Для этого используется SparkSubmitOperator.

 

Главная проблема — Airflow не умеет заходить в Spark «из коробки»

Здесь новички ломают копья. SparkSubmitOperator — это, по сути, обертка над консольной командой spark-submit. Чтобы этот оператор сработал, на машине (или в контейнере), где крутится воркер Airflow, должны быть физически установлены:

  • Java (OpenJDK) — потому что Spark работает на JVM.
  • Клиент Spark — набор бинарных файлов, чтобы отправить команду кластеру.

Стандартный образ apache/airflow, который мы использовали до этого, не содержит Java. Если вы попробуете запустить Spark-задачу сейчас, вы получите ошибку JAVA_HOME not set или spark-submit not found. Помните как мы с вами костылизировали с Apache Hadoop и AirFlow на прошлом уроке?

 

Анализ данных с помощью современного Apache Spark

Код курса
SPARK
Ближайшая дата курса
6 апреля, 2026
Продолжительность
32 ак.часов
Стоимость обучения
102 400

 

 

Шаг 1. Модернизируем Docker-образ для поддержки Java и Apache Spark

 

Нам придется снова немного «запачкать руки» и пересоздать свой образ Airflow, который мы с вами использовали на прошлом уроке ( поддержка Hadoop provider) и добавить туда Spark с поддержкой Java. Не пугайтесь, это стандартная практика.

Создайте(или отредактируйте существующий)  файл Dockerfile в корне проекта (рядом с docker-compose):

FROM apache/airflow:2.8.1

USER root

# 1. ОБЪЕДИНЕННАЯ УСТАНОВКА СИСТЕМНЫХ ПАКЕТОВ
# gcc, libkrb5-dev... — нужны для компиляции HDFS провайдера (Урок 5)
# default-jdk, procps, curl — нужны для работы Spark (Урок 6)
RUN apt-get update \
  && apt-get install -y --no-install-recommends \
         gcc \
         libkrb5-dev \
         krb5-user \
         libffi-dev \
         default-jdk \
         procps \
         curl \
  && apt-get autoremove -yqq --purge \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*

# 2. НАСТРОЙКА JAVA (Урок 6)
ENV JAVA_HOME=/usr/lib/jvm/default-java
export JAVA_HOME

# 3. УСТАНОВКА SPARK-КЛИЕНТА (Урок 6)
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3
RUN curl -O https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz \
    && tar zxf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz -C /opt/ \
    && rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz \
    && ln -s /opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} /opt/spark

# 4. НАСТРОЙКА ПУТЕЙ SPARK
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin

USER airflow

# 5. УСТАНОВКА ПРОВАЙДЕРОВ
# Ставим сразу оба: и для HDFS (чтобы работал DAG из 5 урока), и для Spark
RUN pip install --no-cache-dir \
    apache-airflow-providers-apache-hdfs \
    apache-airflow-providers-apache-spark

Пересоздаем наш образ «франкенштейн» состоящий теперь из бинарников Hadoop, Spark и провайдеров для них в Airflow вместе с Java — такова цена интеграции.

#--- и пересобираем его с нуля для уверенности - процесс может занять 5-10 минут
docker build --no-cache -t airflow-hdfs:2.8.1 .

#--- почистим старые images
docker image ls             #для проверки
docker image prune
docker image ls             #для проверки

#--- Уже запущенные контейнеры не обновят автоматически используемый image без опции --build

docker compose up -d --build

как пересоздать имиджа для запуска контейнеров с Airflow и провайдерами Spark, Hadoop и Java

 

Теперь нужно сказать docker-compose, чтобы он использовал вновь созданный файл image ‘airflow-hdfs:2.8.1‘, а не готовый образ из интернета. Измените секцию x-airflow-common в docker-compose.yaml добавив новые переменные и volumes для Spark Jobs:

x-airflow-common:
  &airflow-common
  image: airflow-hdfs:2.8.1      #   Изменяем используемый имидж
  # build: .                      
  environment:
    # ... старые переменные ...
    JAVA_HOME: /usr/lib/jvm/default-java      #   Добавляем новые переменные для Spark
    SPARK_HOME: /opt/spark
volumes:

    # --- НОВЫЙ ТОМ ДЛЯ УРОКА 6 (Скрипты Spark) ---
    - ${AIRFLOW_PROJ_DIR:-.}/jobs:/opt/airflow/jobs

 

Шаг 2. Добавляем Spark-кластер

 

Нам нужен сам Spark, который будет выполнять работу. Добавим мастер и один воркер в docker-compose.yaml (в секцию services):

# Урок 6 Spark jobs  - кластер для Airflow tasks
 spark-master:
   image: apache/spark:3.5.1
   container_name: spark-master
   hostname: spark-master
   profiles:
     - spark
   environment:
     - SPARK_NO_DAEMONIZE=true
     - SPARK_MASTER_HOST=spark-master
   ports:
     - "9090:8080" # Web UI (сместили на 9090, чтобы не конфликтовал с Airflow)
     - "7077:7077" # Мастер-порт
   command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master

 spark-worker:
   image: apache/spark:3.5.1
   container_name: spark-worker
   profiles:
     - spark
   environment:
     - SPARK_NO_DAEMONIZE=true
     - SPARK_WORKER_CORES=1
     - SPARK_WORKER_MEMORY=1G
   depends_on:
     - spark-master
   command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077

Не забудьте включить hadoop & datanode из прошлого примера или применить —profile spark.  Теперь выполняем команду пересборки и запуска: docker-compose up -d —build

Как установить Spark и hdfs provider в Apache Airflow

Зайдите на localhost:9090. Вы должны увидеть интерфейс Spark Master с одним живым воркером (Alive Workers: 1).

Проверка доступности Spark Master и Worker для интеграции Airflow SparkSubmit

 

Шаг 3. Настраиваем Connection

Airflow должен знать, где живет Spark Master.

  • Admin -> Connections.
  • Новое соединение:
Conn Id my_spark_conn
Conn Type Spark
Host spark://spark-master
Port 7077
Extra ( или Deploy mode) {«deploy-mode»: «client»} или cluster, но для Docker проще client, чтобы видеть логи сразу

 

Практика: PySpark Job для обработки данных

 

В прошлой статье мы положили файл users_{ds}.csv в HDFS. Давайте напишем скрипт на PySpark, который читает этот файл, считает распределение пользователей по датам и сохраняет отчет обратно в HDFS (или выводит в консоль).

Создайте папку jobs рядом с dags и положите туда скрипт user_analytics.py:

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

def main(input_path, output_path):
    # Создаем сессию
    spark = SparkSession.builder \
        .appName("AirflowUserAnalytics") \
        .getOrCreate()

    print(f"Reading from: {input_path}")
    
    # Читаем CSV. Так как хедер мы писали сами, указываем header=True
    df = spark.read.option("header", "true").csv(input_path)

    # Простая аналитика: сколько регистраций в каждую дату
    report = df.groupBy("date").agg(count("*").alias("total_users"))

    report.show()

    # Сохраняем результат (в формате Parquet, это стандарт для Big Data)
    # report.write.mode("overwrite").parquet(output_path)
    
    spark.stop()

if __name__ == "__main__":
    # Аргументы передаются из Airflow
    if len(sys.argv) != 3:
        print("Usage: user_analytics.py <input_path> <output_path>")
        sys.exit(-1)
        
    main(sys.argv[1], sys.argv[2])

 

Шаг 4. AirFlow DAG со SparkSubmitOperator 

 

Теперь самое главное — связать всё вместе. Код DAG-а (dags/process_spark.py):

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

# Путь к HDFS, куда мы писали в прошлой статье
# Обратите внимание: spark внутри Docker-сети может обращаться к namenode
HDFS_INPUT = "hdfs://namenode:8020/user/airflow/backup/users_{{ ds }}.csv"
HDFS_OUTPUT = "hdfs://namenode:8020/user/airflow/reports/daily_{{ ds }}"

with DAG(
    dag_id="06.spark_processing",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    process_task = SparkSubmitOperator(
        task_id="run_spark_job",
        conn_id="my_spark_conn",
 # Путь к скрипту внутри контейнера AIRFLOW
        application="/opt/airflow/jobs/user_analytics.py", 
        
        # Аргументы для скрипта (input и output)
        application_args=[HDFS_INPUT, HDFS_OUTPUT],
        
        # Конфигурация ресурсов (сколько отдать Спарку)
        conf={
            "spark.driver.memory": "512m",
            "spark.executor.memory": "512m"
        },
        
        # Важно! Указываем пакеты, если нужно работать с S3 или другими системами
        # packages="org.apache.hadoop:hadoop-aws:3.2.0",
        
        verbose=True
    )

Как это работает — объяснялки

  • Airflow (Worker) берет параметры из оператора
  • Он формирует длинную команду: /opt/spark/bin/spark-submit —master spark://spark-master:7077 … user_analytics.py ….
  • Эта команда запускается в контейнере Airflow
  • Клиент Spark связывается с мастером, передает ему код
  • Мастер распределяет задачу на spark-worker
  • Воркер читает данные из HDFS, считает и пишет результат
  • Airflow видит, что процесс sparksubmit завершился с кодом 0, и красит задачу в зеленый цвет

Результаты отработки задачи Spark в логах выполнения DAG Airflow

 

Давайте попробуем?

 

Результаты процессинга данных DAG Airflow c использованием SparkSubmit

Конечно проверить финальный результат из карманного Hadoop и в паркетном формате  тот еще дополнительный квест, но он того стоил ( через docker exec -it скопировать данные с hdfs на локальную систему namenode, потом через docker cp скопировать все с namenode локально и уже финально установив pandas и fastparquet прочитать все с использованием простого питон скрипта).

 

По традиции все финальные версии файлов вы сможете найти у нас на GitHub

Урок 6. Запуск Spark-jobs через Apache Airflow from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="spark_submit_demo", start_date=datetime(2025, 1, 1), schedule="@daily", catchup=False ) as dag: run = BashOperator( task_id="run_job", bash_command="spark-submit app.py" ) GitHub code example Урок 6. Запуск Spark-jobs через Apache Airflow

 

Troubleshooting: Почему Spark падает?

Запуск Spark в контейнерах — это минное поле.

Ошибка 1: JAVA_HOME is not set

  • Причина: Вы не пересобрали Docker-образ или забыли ENV JAVA_HOME в Dockerfile.
  • Лечение: Проверьте docker exec -it <airflow_container> java -version. Если команды нет — пересобирайте образ.

Ошибка 2: Connection refused к HDFS внутри Spark-джобы

  • Причина: Spark Worker (отдельный контейнер) не может достучаться до Namenode.
  • Лечение: Убедитесь, что все контейнеры (spark-worker, namenode) находятся в одной сети default (docker-compose делает это автоматически, но если вы запускали их разными файлами — будут проблемы).

Ошибка 3: Driver runs, but Executor fails

  • Причина: Часто это нехватка памяти. Spark по умолчанию может просить 1ГБ на экзекьютор, а выделили вы Докеру всего 2ГБ на всё.
  • Лечение: Явно занижайте память в параметре conf: «spark.executor.memory«: «512m».

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

 

Помощь Cursor: Пишем PySpark код для DAG Airflow

 

Если вы не помните синтаксис DataFrame API, Cursor сделает это за вас.

"Напиши скрипт на PySpark, который читает Parquet-файлы из папки /data/input, фильтрует пользователей старше 18 лет, группирует их по городам и сортирует по убыванию количества. Результат сохрани в CSV."

 

Промпт для дебага:

"Вот лог ошибки SparkSubmitOperator: [текст ошибки]. Объясни, почему Airflow не может найти класс org.apache.hadoop.fs.s3a.S3AFileSystem и какой пакет нужно добавить в spark-submit."

Итог: Мы преодолели самый сложный барьер интеграции — настроили запуск Java-приложений из Python-оркестратора. Теперь Airflow управляет мощнейшим кластером обработки данных Apache Spark.

Но Spark — это тяжело, долго и требует много памяти. Всегда ли нам нужен такой монстр если тем более у Вас его еще нет и не кому его приручить? В следующей статье мы рассмотрим легкую, питоническую альтернативу — Dask. Мы узнаем, как масштабировать Pandas-код на несколько серверов без установки Java и мучений с spark-submit.

Переходим к Dask?

 

Практическое применение Big Data аналитики для решения бизнес-задач

Код курса
PRUS
Ближайшая дата курса
20 апреля, 2026
Продолжительность
32 ак.часов
Стоимость обучения
102 400

 

Использованные референсы и материалы

 

Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»

Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker

Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose

Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator

Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage

Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)

Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator

Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas

Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka

Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг

Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг

Изменение базового тарифа с 1 января 2026 года Подробнее