От чего зависит задержка передачи данных из Apache Kafka в ClickHouse, как ее определить и ускорить интеграцию брокера сообщений с колоночной СУБД: настройки и лучшие практики.
Интеграция ClickHouse с Kafka
Чтобы связать ClickHouse с внешними системами, в этой колоночной СУБД есть специальные механизмы – интеграционные движки таблиц. Например, для взаимодействия ClickHouse с Apache Kafka есть одноименный интеграционный движок, который позволяет публиковать потоки данных в топики и подписываться на них, а также обрабатывать сообщения по мере появления. Сама по себе таблица ClickHouse с движком Kafka является лишь интерфейсом чтения данных из топика, но не хранит данные внутри ClickHouse. Kafka-движок ClickHouse ожидает, что данные будут приходить из самой Kafka, а не вставляться с помощью SQL-оператора INSERT.
При использовании этого интеграционного движка следует помнить некоторые правила его работы:
- полученные сообщения отслеживаются автоматически, поэтому из одной группы потребителей Kafka каждое сообщение считывается только один раз. Если необходимо получить данные дважды, придется создать в Clickhouse копию таблицы с другим именем группы. Группы потребителей пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере Clickhouse, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически.
- Если для потребления данных использовать оператор SELECT, каждое сообщения из Kafka может быть прочитано только один раз. Поэтому практичнее создавать потоки реального времени с помощью материализованного преставления, которое преобразует данные от Kafka-движка и помещает их в отдельную, заранее созданную таблицу ClickHouse с соответствующей схемой.
- Когда к Kafka-движку присоединяется материализованное представление, оно начинает в фоновом режиме непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT. Материализованных представлений у одной Kafka-таблицы ClickHouse может быть сколько угодно, т.к. они не считывают данные из Kafka-таблицы непосредственно, а лишь получают новые записи (блоками). Поэтому можно писать в несколько таблиц с разным уровнем детализации, например, с группировкой, агрегацией и без них.
- для улучшения производительности потребленные из Kafka сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
- Если необходимо изменить целевую таблицу с помощью команды ALTER TABLE, ее материализованное представление рекомендуется отключить, чтобы избежать рассогласования данных между целевой таблицей и представлением.
Все вышеперечисленные особенности характерны как для локального развертывания ClickHouse, так и для его облачной версии. При связи ClickHouse Cloud с Apache Kafka, помимо соответствующего интеграционного движка также используется коннектор и платформа Kafka Connect, которая обеспечивает взаимодействие сервиса колоночной СУБД и брокера сообщений. Приложения-продюсеры отправляют данные в топики Kafka, которые извлекаются коннектором Kafka Connect посредством опроса, передающим записи в ClickHouse. Практический пример такой интеграции Apache Kafka с ClickHouse Cloud я показывала здесь.
Хотя Kafka и ClickHouse сами по себе работают достаточно быстро, обеспечивая потоковую обработку данных в реальном времени, при их взаимной интеграции неизбежно возникают задержки. От чего они зависят и как их снизить, рассмотрим далее.
Как определить и устранить задержку передачи данных
Чтобы понять, есть ли проблема с задержкой в потоковом конвейере интеграции Kafka с ClickHouse, надо сперва измерить эту задержку. Проще всего сделать это, добавив значения временных меток к сообщениям в различных точках конвейера. Позже их можно сравнить и визуализировать, чтобы определить конкретные этапы, вызывающие задержку. Для этого можно использовать SMT-преобразования одиночных сообщений (SMT, Single Message Transform), которые применяются к сообщениям по мере их прохождения через Kafka Connect. SMT преобразуют входящие сообщения после того, как их создал коннектор источника, но до того, как они будут записаны в Kafka. SMT преобразуют исходящие сообщения перед их отправкой в коннектор-приемник.
Это преобразование берет несколько готовых полей (Смещение, Раздел, Метка времени и Топик) и добавляет их в объект, который коннектор передает в ClickHouse, без изменения продюсера. Это позволяет определить время выполнения операции на разных этапах интеграционного конвейера. Также можно точно понять, когда именно сообщение было вставлено в ClickHouse, используя временную метку вставки через выражение DEFAULT. Сравнив эту отметку времени с более ранними преобразованиями, можно сделать выводы о тенденции изменения задержки передачи данных.
Как уже было отмечено ранее, при использовании облачно ClickHouse Cloud, интеграция этой колоночной СУБД с Apache Kafka реализуется с использованием платформы Connect. Поэтому с увеличением звеньев интеграционной цепочки возрастает количество точек отказа и снижения эффективности передачи данных. В частности, задержка может возникнуть между Kafka и Kafka Connect. Записи извлекаются Kafka Connect независимо от коннектора. Настроить количество потребляемых данных можно с помощью конфигураций consumer.fetch.min.bytes и consumer.fetch.max.bytes. Частота выполнения poll-запросов регулируется параметром consumer.fetch.max.wait.ms, который также можно настроить.
В Kafka Connect задачи — это исполнители, которые фактически копируют данные. Поэтому есть рекомендация, что количество задач не должно быть меньше количества разделов топика. Однако, ClickHouse оптимизирован для больших объемов данных, а не для мелких частых вставок. Каждая вставка в ClickHouse обрабатывается отдельно для каждой задачи Kafka Connect, поэтому много разделов и задач не всегда оптимально. Количество разделов топика также влияет на объем вставок и размеры пакетов. Процесс опроса и выборки данных выполняется для каждой задачи, поэтому много разделов увеличивает время заполнения пакета и задержку передачи данных. Поэтому рекомендуется вставлять данные довольно большими пакетами, порядка нескольких тысяч строк за раз, от 10 000 до 100 000.
Также может возникнуть задержка между между Kafka Connect и ClickHouse Cloud. Одна из причин такой задержки – неактивный, т.е. приостановленный, экземпляр, поскольку облачная платформа переводит экземпляры в спящий режим, когда к сервису не выполняются запросы. Поэтому при наличии перерывов в активности продюсера, вставки и запросы тоже останавливаются. Если при этом продюсер снова начинает публиковать данные в Kafka, повторный запуск ClickHouse Cloud займет какое-то время. Если эта задержка неприемлема для бизнес-пользователей, можно настроить тайм-аут простоя службы в соответствии с рабочей нагрузкой, увеличив его так, чтобы более короткие периоды бездействия не приводили к простою.
По умолчанию коннектор Kafka Connect отправляет данные в ClickHouse Cloud посредством синхронных вставок, ожидая подтверждения об успешной записи в СУБД перед отправкой нового сообщения. Это надежно, но приводит к некоторой задержке, поскольку каждая вставка требует подтверждения. Если скорость важнее надежности, можно использовать асинхронные вставки, просто отправляя данные без ожидания подтверждения. Такое решение с асинхронным продюсером дает меньшие гарантии надежности, обеспечивая в лучшем случае семантику хотя бы один раз (at least once).
Про интеграцию ClickHouse с Apache AirFlow читайте в нашей новой статье. А освоить все тонкости использования ClickHouse и Apache Kafka для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники