Содержание
- Шаг 1. Добавляем ClickHouse в инфраструктуру ETL pipeline Airflow
- Шаг 2. Настройка Connection: HTTP vs Native
- Шаг 3. Практика: Загрузка данных из S3 в ClickHouse
- "Ленивый" (через движок S3)
- "Классический ETL" (Python Driver)
- Подготовка таблицы (DDL)
- Код Airflow DAG: s3_to_clickhouse.py
- Тонкости и подводные камни
- Альтернатива: ClickHouseOperator
- Помощь Cursor: Генерация SQL и кода вставки
- Использованные референсы и материалы
- Полный перечень статей Бесплатного курса "Apache Airflow для начинающих"
Если Postgres — это надежный банковский сейф, где каждая транзакция на вес золота, то ClickHouse — это промышленная мясорубка. Ему все равно, уникальны ли ваши записи (по умолчанию), он не поддерживает классические транзакции, но зато он умеет делать SELECT count(*) FROM hits по миллиарду строк за доли секунды.
Для инженера Airflow работа с ClickHouse кардинально отличается от работы с обычными реляционными базами. Главное правило ClickHouse: Никогда не вставляйте данные по одной строке. Если вы напишете цикл в Python, который делает INSERT INTO table VALUES (…) миллион раз, вы положите кластер. ClickHouse любит, когда в него вставляют данные большими кусками (батчами) по 10–100 тысяч строк за раз. И Airflow должен уметь это организовать.
Шаг 1. Добавляем ClickHouse в инфраструктуру ETL pipeline Airflow
Расширим наш docker-compose.yaml. ClickHouse очень экономен к ресурсам, поэтому для тестов нам хватит минимальной конфигурации.
Добавьте этот сервис:
clickhouse:
image: clickhouse/clickhouse-server:latest
ports:
- "8123:8123" # HTTP порт (для веб-клиентов и некоторых драйверов)
- "9000:9000" # Нативный TCP порт (самый быстрый, для Python-драйвера)
ulimits:
nofile:
soft: 262144
hard: 262144
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "localhost:8123/ping"]
interval: 30s
timeout: 10s
retries: 3
Не забудьте docker-compose up -d. Проверить работу можно, открыв localhost:8123 в браузере (должен ответить «Ok»).
Также нам понадобится провайдер для Airflow. Добавьте в ваш Dockerfile: RUN pip install apache-airflow-providers-clickhouse clickhouse-driver И пересоберите образ.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
23 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Шаг 2. Настройка Connection: HTTP vs Native
В Airflow есть путаница с типами подключений к ClickHouse.
- HTTP (порт 8123): Проще, работает через requests. Надежно, но чуть медленнее на огромных объемах.
- Native (порт 9000): Работает через бинарный TCP-протокол. Это выбор чемпионов. Библиотека clickhouse-driver использует именно его.
Настроим соединение clickhouse_native. Admin -> Connections -> Add
| Conn Id | my_clickhouse | |
| Conn Type | ClickHouse | (если провайдер установлен корректно) |
| Host | clickhouse | (имя сервиса Docker) |
| Login/Password | default | / (пусто), если не меняли настройки |
| Port | 9000 | (для нативного протокола) |
Шаг 3. Практика: Загрузка данных из S3 в ClickHouse
У нас есть два пути загрузки данных, и выбор зависит от объема.
«Ленивый» (через движок S3)
ClickHouse настолько крут, что умеет сам ходить в S3 и забирать данные, вообще не нагружая Airflow. Airflow просто посылает команду: «Эй, ClickHouse, вот бакет, забери файлы». Это лучший способ для больших данных (ГБ и ТБ).
«Классический ETL» (Python Driver)
Airflow читает файл, преобразует его (например, меняет формат дат) и вставляет в ClickHouse. Этот способ мы разберем подробно, так как он учит работать с батчами и хуками.
Напишем DAG, который берет CSV из S3 (результат прошлых статей) и вставляет его в таблицу user_stats.
Подготовка таблицы (DDL)
Сначала создадим таблицу. Обратите внимание на движок MergeTree — это стандарт для аналитики.
CREATE TABLE IF NOT EXISTS user_stats (
date Date,
name String,
count UInt32
) ENGINE = MergeTree()
ORDER BY date;
Код Airflow DAG: s3_to_clickhouse.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook # Или стандартный
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import io
import csv
def load_data_to_clickhouse(**context):
# 1. Читаем данные из S3
s3_hook = S3Hook(aws_conn_id="minio_s3")
bucket = "airflow-bucket"
key = "users_export_2023-01-01.csv" # В реальности используйте шаблоны {{ ds }}
# Скачиваем файл в память (для больших файлов лучше стримить или качать на диск!)
obj = s3_hook.get_key(key, bucket)
file_content = obj.get()['Body'].read().decode('utf-8')
# Парсим CSV в список кортежей
# ClickHouse драйвер ждет список: [(date, name, count), (date, name, count)]
data = []
reader = csv.DictReader(io.StringIO(file_content))
for row in reader:
data.append((
row['date'],
row['name'],
int(row.get('count', 1)) # Защита от пустых значений
))
print(f"Подготовлено {len(data)} строк для вставки.")
# 2. Вставляем в ClickHouse
# Используем execute с параметром params для bulk-вставки
ch_hook = ClickHouseHook(clickhouse_conn_id="my_clickhouse")
sql = "INSERT INTO user_stats (date, name, count) VALUES"
# Магия clickhouse-driver: мы передаем список данных вторым аргументом.
# Драйвер сам разобьет это на блоки и отправит бинарным потоком.
# Это В РАЗЫ быстрее, чем циклы INSERT.
ch_hook.execute(sql, data)
print("Вставка завершена.")
with DAG(
dag_id="s3_to_clickhouse_loader",
start_date=datetime(2023, 1, 1),
schedule=None,
catchup=False
) as dag:
# 0. Создаем таблицу (лучше вынести в отдельный скрипт миграций, но для теста сойдет)
create_table = PythonOperator(
task_id="init_table",
python_callable=lambda: ClickHouseHook(clickhouse_conn_id="my_clickhouse").execute(
"CREATE TABLE IF NOT EXISTS user_stats (date Date, name String, count UInt32) ENGINE = MergeTree() ORDER BY date"
)
)
# 1. Грузим данные
load_task = PythonOperator(
task_id="load_from_s3",
python_callable=load_data_to_clickhouse
)
create_table >> load_task
Построение DWH на ClickHouse
Код курса
CLICH
Ближайшая дата курса
18 мая, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Тонкости и подводные камни
Работа с ClickHouse в Airflow полна нюансов, о которых не пишут в Quickstart-гайдах.
Проблема идемпотентности (Дубликаты)
ClickHouse не проверяет уникальность (Primary Key) при вставке в обычный MergeTree. Если вы запустите DAG два раза, у вас будет двойной объем данных.
- Решение для новичков: Перед вставкой делать ALTER TABLE … DELETE WHERE date = ‘{{ ds }}’. Но в ClickHouse операции удаления (Mutation) — тяжелые и асинхронные.
- Решение для профи: Использовать движок ReplacingMergeTree (он схлопывает дубликаты в фоне) или вставлять данные во временную таблицу, а потом делать EXCHANGE PARTITION (атомарная замена куска данных).
Типизация
Postgres простит вам, если вы передадите число как строку «123». ClickHouse при вставке через нативный протокол строг. Если колонка UInt32, а вы суете str, драйвер упадет. Всегда явно приводите типы в Python (как мы сделали int(row[‘count’])).
Таймауты
ClickHouse быстрый, но если вы попытаетесь вставить 10 ГБ одним запросом, соединение может разорваться.
- Совет: Разбивайте данные на чанки (chunks) по 10–50 тысяч строк внутри Python-кода и делайте ch_hook.execute в цикле.
Альтернатива: ClickHouseOperator
В провайдере есть готовый ClickHouseOperator. Он удобен для простых SQL-команд (оптимизация, удаление, создание таблиц).
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
optimize_table = ClickHouseOperator(
task_id="optimize_user_stats",
clickhouse_conn_id="my_clickhouse",
sql="OPTIMIZE TABLE user_stats FINAL"
)
Используйте его для сервисных задач, а загрузку данных делайте через Python/Hooks, так как вам нужен контроль над форматом данных.
Исправьте финальные варианты кода Dags и конфигурационных файлов и при необходимости сравните с нашими на GitHub где лежит код к Уроку 9.
Помощь Cursor: Генерация SQL и кода вставки
ClickHouse SQL (диалект) местами специфичен. Cursor поможет не лезть в документацию за синтаксисом движков.
Промпт 1 (DDL):
"Напиши SQL для создания таблицы ClickHouse events, которая хранит логи веб-сайта (timestamp, user_id, url). Используй движок MergeTree, партиционирование по месяцам и TTL (время жизни), чтобы удалять данные старше года."
Промпт 2 (Оптимизация вставки):
"Посмотри на этот Python-код вставки в ClickHouse. Перепиши его так, чтобы использовать генератор (generator) и вставлять данные батчами по 20 000 строк, чтобы не перегружать оперативную память."
Итог: Мы построили полный цикл: Данные -> Postgres -> S3 -> Обработка -> ClickHouse. Теперь в нашей базе лежат «золотые» данные, готовые к построению графиков в Grafana или Superset.
Но есть одна проблема, с которой вы столкнетесь, когда дагов станет 50 штук. Как не писать один и тот же код 50 раз? Как создавать DAG-и динамически, на основе конфигурационных файлов, а не копипасты? В финальной статье мы поговорим про Best Practices, динамическую генерацию DAG-ов и организацию «чистого кода» в Airflow.
Готовы к рефакторингу и высшему пилотажу?
Использованные референсы и материалы
- ClickHouse Python Driver Documentation
https://clickhouse-driver.readthedocs.io/en/latest/
Как работать с нативным TCP-протоколом ClickHouse из Python. - Optimizing Bulk Inserts
https://clickhouse.com/docs/en/optimize/bulk-inserts/
Почему в ClickHouse нельзя вставлять данные построчно, и зачем нам нужны батчи. - MergeTree Table Engine
https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/
Как устроен основной движок таблиц и почему важен ключ сортировки.
Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»
Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker
Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose
Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator
Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage
Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)
Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator
Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas
Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka
Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг
Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг


