Урок 9. Аналитическая мощь ClickHouse как финальная точка DAG AirFlow

Урок 9. Аналитическая мощь ClickHouse как финальная точка DAG AirFlow

 

Если Postgres — это надежный банковский сейф, где каждая транзакция на вес золота, то ClickHouse — это промышленная мясорубка. Ему все равно, уникальны ли ваши записи (по умолчанию), он не поддерживает классические транзакции, но зато он умеет делать SELECT count(*) FROM hits по миллиарду строк за доли секунды.

Для инженера Airflow работа с ClickHouse кардинально отличается от работы с обычными реляционными базами. Главное правило ClickHouse: Никогда не вставляйте данные по одной строке. Если вы напишете цикл в Python, который делает INSERT INTO table VALUES (…) миллион раз, вы положите кластер. ClickHouse любит, когда в него вставляют данные большими кусками (батчами) по 10–100 тысяч строк за раз. И Airflow должен уметь это организовать.

 

Шаг 1. Добавляем ClickHouse в инфраструктуру ETL pipeline Airflow

 

Расширим наш docker-compose.yaml. ClickHouse очень экономен к ресурсам, поэтому для тестов нам хватит минимальной конфигурации.

Добавьте этот сервис:

clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123" # HTTP порт (для веб-клиентов и некоторых драйверов)
      - "9000:9000" # Нативный TCP порт (самый быстрый, для Python-драйвера)
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "localhost:8123/ping"]
      interval: 30s
      timeout: 10s
      retries: 3

Не забудьте docker-compose up -d. Проверить работу можно, открыв localhost:8123 в браузере (должен ответить «Ok»).

Также нам понадобится провайдер для Airflow. Добавьте в ваш Dockerfile: RUN pip install apache-airflow-providers-clickhouse clickhouse-driver И пересоберите образ.

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

Шаг 2. Настройка Connection: HTTP vs Native

 

В Airflow есть путаница с типами подключений к ClickHouse.

  • HTTP (порт 8123): Проще, работает через requests. Надежно, но чуть медленнее на огромных объемах.
  • Native (порт 9000): Работает через бинарный TCP-протокол. Это выбор чемпионов. Библиотека clickhouse-driver использует именно его.

Настроим соединение clickhouse_native. Admin -> Connections -> Add

Conn Id my_clickhouse
Conn Type ClickHouse (если провайдер установлен корректно)
Host clickhouse (имя сервиса Docker)
Login/Password default / (пусто), если не меняли настройки
Port 9000 (для нативного протокола)

 

Шаг 3. Практика: Загрузка данных из S3 в ClickHouse

 

У нас есть два пути загрузки данных, и выбор зависит от объема.

«Ленивый» (через движок S3)

ClickHouse настолько крут, что умеет сам ходить в S3 и забирать данные, вообще не нагружая Airflow. Airflow просто посылает команду: «Эй, ClickHouse, вот бакет, забери файлы». Это лучший способ для больших данных (ГБ и ТБ).

«Классический ETL» (Python Driver)

Airflow читает файл, преобразует его (например, меняет формат дат) и вставляет в ClickHouse. Этот способ мы разберем подробно, так как он учит работать с батчами и хуками.

Напишем DAG, который берет CSV из S3 (результат прошлых статей) и вставляет его в таблицу user_stats.

 

Подготовка таблицы (DDL)

 

Сначала создадим таблицу. Обратите внимание на движок MergeTree — это стандарт для аналитики.

 

CREATE TABLE IF NOT EXISTS user_stats (
    date Date,
    name String,
    count UInt32
) ENGINE = MergeTree()
ORDER BY date;

Код Airflow DAG: s3_to_clickhouse.py

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook # Или стандартный
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import io
import csv

def load_data_to_clickhouse(**context):
    # 1. Читаем данные из S3
    s3_hook = S3Hook(aws_conn_id="minio_s3")
    bucket = "airflow-bucket"
    key = "users_export_2023-01-01.csv" # В реальности используйте шаблоны {{ ds }}
    
    # Скачиваем файл в память (для больших файлов лучше стримить или качать на диск!)
    obj = s3_hook.get_key(key, bucket)
    file_content = obj.get()['Body'].read().decode('utf-8')
    
    # Парсим CSV в список кортежей
    # ClickHouse драйвер ждет список: [(date, name, count), (date, name, count)]
    data = []
    reader = csv.DictReader(io.StringIO(file_content))
    for row in reader:
        data.append((
            row['date'],
            row['name'],
            int(row.get('count', 1)) # Защита от пустых значений
        ))
    
    print(f"Подготовлено {len(data)} строк для вставки.")

    # 2. Вставляем в ClickHouse
    # Используем execute с параметром params для bulk-вставки
    ch_hook = ClickHouseHook(clickhouse_conn_id="my_clickhouse")
    
    sql = "INSERT INTO user_stats (date, name, count) VALUES"
    
    # Магия clickhouse-driver: мы передаем список данных вторым аргументом.
    # Драйвер сам разобьет это на блоки и отправит бинарным потоком.
    # Это В РАЗЫ быстрее, чем циклы INSERT.
    ch_hook.execute(sql, data)
    
    print("Вставка завершена.")

with DAG(
    dag_id="s3_to_clickhouse_loader",
 start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    # 0. Создаем таблицу (лучше вынести в отдельный скрипт миграций, но для теста сойдет)
    create_table = PythonOperator(
        task_id="init_table",
        python_callable=lambda: ClickHouseHook(clickhouse_conn_id="my_clickhouse").execute(
            "CREATE TABLE IF NOT EXISTS user_stats (date Date, name String, count UInt32) ENGINE = MergeTree() ORDER BY date"
        )
    )

    # 1. Грузим данные
    load_task = PythonOperator(
        task_id="load_from_s3",
        python_callable=load_data_to_clickhouse
    )

    create_table >> load_task

 

Построение DWH на ClickHouse

Код курса
CLICH
Ближайшая дата курса
18 мая, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

 

Тонкости и подводные камни

Работа с ClickHouse в Airflow полна нюансов, о которых не пишут в Quickstart-гайдах.

Проблема идемпотентности (Дубликаты)

ClickHouse не проверяет уникальность (Primary Key) при вставке в обычный MergeTree. Если вы запустите DAG два раза, у вас будет двойной объем данных.

  • Решение для новичков: Перед вставкой делать ALTER TABLE … DELETE WHERE date = ‘{{ ds }}’. Но в ClickHouse операции удаления (Mutation) — тяжелые и асинхронные.
  • Решение для профи: Использовать движок ReplacingMergeTree (он схлопывает дубликаты в фоне) или вставлять данные во временную таблицу, а потом делать EXCHANGE PARTITION (атомарная замена куска данных).

Типизация

Postgres простит вам, если вы передадите число как строку «123». ClickHouse при вставке через нативный протокол строг. Если колонка UInt32, а вы суете str, драйвер упадет. Всегда явно приводите типы в Python (как мы сделали int(row[‘count’])).

Таймауты

ClickHouse быстрый, но если вы попытаетесь вставить 10 ГБ одним запросом, соединение может разорваться.

  • Совет: Разбивайте данные на чанки (chunks) по 10–50 тысяч строк внутри Python-кода и делайте ch_hook.execute в цикле.

 

Альтернатива: ClickHouseOperator

 

В провайдере есть готовый ClickHouseOperator. Он удобен для простых SQL-команд (оптимизация, удаление, создание таблиц).

 

from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

optimize_table = ClickHouseOperator(
    task_id="optimize_user_stats",
    clickhouse_conn_id="my_clickhouse",
    sql="OPTIMIZE TABLE user_stats FINAL"
)

Используйте его для сервисных задач, а загрузку данных делайте через Python/Hooks, так как вам нужен контроль над форматом данных.

Исправьте финальные варианты кода Dags  и  конфигурационных файлов и при необходимости сравните с нашими на GitHub где лежит код к Уроку 9.

Урок 9. Интеграция ClickHouse аналитики с Airflow from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="spark_submit_demo", start_date=datetime(2025, 1, 1), schedule="@daily", catchup=False ) as dag: run = BashOperator( task_id="run_job", bash_command="spark-submit app.py" ) GitHub code example Урок 9. Интеграция ClickHouse аналитики с Airflow

 

Помощь Cursor: Генерация SQL и кода вставки

ClickHouse SQL (диалект) местами специфичен. Cursor поможет не лезть в документацию за синтаксисом движков.

Промпт 1 (DDL):

 

"Напиши SQL для создания таблицы ClickHouse events, которая хранит логи веб-сайта (timestamp, user_id, url). Используй движок MergeTree, партиционирование по месяцам и TTL (время жизни), чтобы удалять данные старше года."

 

 

Промпт 2 (Оптимизация вставки):

"Посмотри на этот Python-код вставки в ClickHouse. Перепиши его так, чтобы использовать генератор (generator) и вставлять данные батчами по 20 000 строк, чтобы не перегружать оперативную память."

 

Итог: Мы построили полный цикл: Данные -> Postgres -> S3 -> Обработка -> ClickHouse. Теперь в нашей базе лежат «золотые» данные, готовые к построению графиков в Grafana или Superset.

Но есть одна проблема, с которой вы столкнетесь, когда дагов станет 50 штук. Как не писать один и тот же код 50 раз? Как создавать DAG-и динамически, на основе конфигурационных файлов, а не копипасты? В финальной статье мы поговорим про Best Practices, динамическую генерацию DAG-ов и организацию «чистого кода» в Airflow.

Готовы к рефакторингу и высшему пилотажу?

 

 

Использованные референсы и материалы

 

Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»

Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker

Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose

Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator

Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage

Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)

Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator

Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas

Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka

Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг

Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг

Изменение базового тарифа с 1 января 2026 года Подробнее