Интеграция ClickHouse с Apache AirFlow

интеграция ClickHouse с Apache AirFlow, курсы ClickHouse для дата-инженера, курсы Apache AirFlow, инженерия данных примеры курсы обучение, ETL с ClickHouse и Apache AirFlow, Школа Больших Данных Учебный Центр Коммерсант

Чем полезна интеграция ClickHouse с Apache Airflow и как ее реализовать: операторы в пакете провайдера и плагине на основе Python-драйвера. Принципы работы и примеры использования.

2 способа интеграции ClickHouse с AirFlow

Продолжая разговор про интеграцию ClickHouse с другими системами, сегодня рассмотрим, как связать эту колоночную СУБД с мощным ETL-движком Apache AirFlow. Помимо интеграционных табличных движков, которые входят в ядро ClickHouse и позволяют связать эту СУБД с внешними системами, также есть партнерские решения и разработки сообщества, которые создаются и поддерживаются разработчиками-энтузиастами, членами профессионального сообщества. Хотя они не предоставляют прямой поддержки, кроме общедоступных репозиториев GitHub и каналов сообщества Slack, среди этих решений много полезного для инженерии данных. Например, интеграция ClickHouse с Apache Airflow.

Будучи мощным ETL-фреймворком, Airflow отлично подходит для регулярного перемещения данных в ClickHouse и из нее, включая автоматизацию рутинных задач по извлечению, преобразованию и загрузке данных, а также генерацию отчетов. Организовать такое интеграционное взаимодействие между ClickHouse и Apache Airflow можно с помощью следующих инструментов:

  • пакет провайдера apache-airflow-providers-clickhouse, который содержит все необходимые операторы и хуки для взаимодействия с ClickHouse. Этот провайдер упрощает процесс написания DAG, которые выполняют запросы ClickHouse или управляют передачей данных.
  • плагин Apache Airflow для ClickHouse на основе Python-драйвера ClickHouse с поддержкой нативного TCP-интерфейса. Этот плагин предоставляет 2 способа связи с сервером (чистый клиента и API БД) и имеет 2 семейства операторов: расширенные и стандартизированные. Операторы предлагают полную функциональность clickhouse-driver и рекомендуются для опытных дата-инженеров.

Разумеется, каждый из этих способов имеет свои достоинства и недостатки, которые мы рассмотрим далее.

Использование пакета провайдеров

Провайдер apache-airflow-providers-clickhouse разрешает подключение Apache Airflow к базе данных Clickhouse и выполняет запросы с помощью ClickhouseOperator. Чтобы использовать его, сперва надо установить этот пакет с помощью менеджера пакетов pip:

pip install apache-airflow-providers-clickhouse

После успешной установки пакет появится в списке провайдеров, что можно увидеть в веб-GUI Airflow

Установленные пакеты провайдеров AirFlow
Установленные пакеты провайдеров AirFlow

Затем надо создать новое подключение к ClickHouse в пользовательском интерфейсе Airflow, например, как здесь.  При этом надо задать уникальный идентификатор подключения, тип подключения ClickHouse, хоста сервера ClickHouse, имя базы данных в параметре схема, логин и пароль пользователя для аутентификации, а также порт сервера ClickHouse (по умолчанию — 8123).

Создание подключения к ClickHouse в Apache AirFlow
Создание подключения к ClickHouse в Apache AirFlow

Далее следует проверить соединение, чтобы убедиться, что Airflow может взаимодействовать с ClickHouse. Наконец, можно использовать операторы ClickHouse в DAG AirFlow для выполнения таких задач, как запрос данных, вставка и передача. Например, следующий код настраивает автоматический рабочий процесс для создания таблицы в базе данных ClickHouse, который будет выполняться каждый день, начиная со вчерашнего дня по времени запуска скрипта:

from airflow import DAG
from airflow.providers.clickhouse.operators.clickhouse import ClickHouseOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

dag = DAG(
    'clickhouse_workflow',
    default_args=default_args,
    description='A simple ClickHouse workflow',
    schedule_interval='@daily',
    start_date=days_ago(1),
    tags=['example'],
)

create_table = ClickHouseOperator(
    task_id='create_clickhouse_table',
    clickhouse_conn_id='clickhouse_default',
    sql='CREATE TABLE my_table ...',
    dag=dag,
)

А этот пример DAG будет автоматически выполнять задачу query_task каждые 5 минут, отправляя SQL-запрос в ClickHouse и получая результаты:

from airflow.providers.clickhouse.operators.clickhouse import ClickHouseOperator
dag = DAG('clickhouse_query', default_args=default_args, schedule_interval=timedelta(minutes=5))

query_task = ClickHouseOperator(
    task_id='perform_query',
    clickhouse_conn_id='clickhouse_default',
    sql='SELECT * FROM my_table',
    dag=dag
)

Чтобы использовать этот интеграционный плагин эффективно, рекомендуется управлять подключениями, поскольку это ограниченный ресурс Airflow. Для этого можно реализовать пул подключений, например, с помощью PGBouncer – программы управления пулом соединений PostgreSQL. Это решение поможет снизить нагрузку на ClickHouse, но оно подходит только для PostgreSQL-интерфейса ClickHouse, например, когда надо подключить к ClickHouse клиентское приложение PostgreSQL, которое еще не поддерживается колоночной СУБД напрямую. Для MySQL можно применить обработку соединений на основе потоков, что актуально при использовании табличного движка MySQL в Clickhouse, который позволяет выполнять запросы SELECT и INSERT над данными в удаленной БД MySQL.

Для повышения производительности интеграционных процессов следует контролировать использование ЦП в AirFlow с помощью параметра scheduler__min_file_process_interval. Эта конфигурация задает количество секунд, после которых анализируется файл DAG. Чем меньше это число, тем выше нагрузка на ЦП.  Также рекомендуется настроить другие конфигурации масштабирования AirFlow, чтобы по максимуму использовать производительность ClickHouse для параллельной обработки и крупномасштабной аналитики больших данных.

Наконец, для эффективной передачи данных между задачами AirFlow ка обычно, рекомендуется использовать API TaskFlow и равномерно распределять задачи между рабочими процессами, чтобы сбалансировать нагрузку. Для управления сложными структурами DAG следует применять SubDAGs и TaskGroups, а также внедрять универсальные практики дата-инженерии с AirFlow. Например, описывать каждую задачу и запросы к БД в отдельных файлах, а также предварительно и централизованно настроить подключения к внешним системам, отделив это от кода самого DAG.

Использование плагина

Как уже было отмечено ранее, плагин Apache Airflow для ClickHouse основан на Clickhouse-драйвере, который предназначен для связи с сервером ClickHouse из Python по собственному протоколу. В отличие от HTTP-протокола для связи с сервером ClickHouse, собственный TCP-протокол более гибкий и имеет больше настроек. Он позволяет быстро передавать двоичные данные, из которых можно быстрее создавать типы Python. Сжатие LZ4 в TCP-протоколе быстрее, чем gzip в протоколе HTTP. Также в собственном протоколе доступна информация о профиле запроса, позволяя читать строки перед предельной метрикой. Драйвер clickhouse использует собственный протокол (порт 9000) и, помимо синхронной,  также и имеет асинхронную оболочку aioch.

Плагин имеет 2 семейства операторов: расширенные clickhouse_driver.Client.execute и стандартизированные, совместимые с Python DB API 2.0. Оба семейства операторов полностью поддерживаются и тестируются для разных версий Airflow и Python. Операторы clickhouse-драйвера (ClickHouseOperator, ClickHouseHook, ClickHouseSensor) основаны на методе Client.execute() Python-драйвера ClickHouse. Они предлагают полную функциональность clickhouse-driver, реализуя следующие функции:

  • шаблонное создание SQL-запросов и других параметров;
  • выполнение нескольких запросов SQL в одном файле ClickHouseOperator. Результат последнего запроса передается в XCom, что настраивается с помощью свойства do_xcom_push. Если свойство do_xcom_push равно False , результат не передается в XCom.
  • логирование — выполненные запросы протоколируются в наглядном формате, что упрощает их отслеживание и отладку;
  • пользовательские параметры подключения к ClickHouse, включая таймауты, сжатие и пр. через свойство extra в объекте подключения Airflow.

Чтобы использовать это семейство операторов плагина, его надо сперва установить с помощью менеджера пакетов pip:

pip install -U airflow-clickhouse-plugin

Этому семейству операторов плагина необходимы только 2 зависимости: сам apache-airflow и clickhouse-driver.

Также плагин Apache Airflow для ClickHouse поддерживает семейство операторов Python DB API 2.0: ClickHouseSQLExecuteQueryOperator, ClickHouseSQLColumnCheckOperator, ClickHouseSQLTableCheckOperator, ClickHouseSQLCheckOperator, ClickHouseSQLValueCheckOperator, ClickHouseSQLIntervalCheckOperator, ClickHouseSQLThresholdCheckOperator, ClickHouseBranchSQLOperator, хуки БД ClickHouseDbApiHook и сенсор ClickHouseSqlSensor. Эти операторы пакета apache-airflow-providers-common-sql сочетаются с clickhouse_driver.dbapi. Хотя они имеют ограниченную функциональность по сравнению с Client.execute, т.к. поддерживают не все аргументы, они предоставляют стандартизированный интерфейс. Это полезно при переносе конвейеров Airflow в ClickHouse от другого поставщика SQL, поддерживаемого пакетом common.sql, например MySQL, Postgres, BigQuery и др. Набор функций этой версии полностью основан на пакете common.sql провайдера Airflow. Поэтому при установке плагина необходимо установить зависимость common.sql, чтобы включить операторы DB API 2.0:

pip install -U airflow-clickhouse-plugin[common.sql]

Зависимости apache-airflow-providers-common-sql, обычно предварительно упакованные с помощью Airflow, должны быть установлены в дополнение к самому apache-airflow и Python-драйверу Clickhouse.

Узнайте больше про ClickHouse и Apache Airflow для аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://clickhouse.com/docs/en/integrations
  2. https://www.restack.io/docs/airflow-knowledge-apache-providers-clickhouse
  3. https://github.com/bryzgaloff-pypi/apache-airflow-providers-clickhouse
  4. https://github.com/mymarilyn/clickhouse-driver
  5. https://pypi.org/project/airflow-clickhouse-plugin/
Поиск по сайту