Чем полезна поддержка gRPC в Clickhouse и как ее реализовать: разбираем интерфейс удаленного вызова процедур на примере потоковой вставки событий пользовательского поведения из Kafka в таблицу колоночной базы данных со стриминговым выводом.
Поддержка gRPC в ClickHouse
ClickHouse поддерживает gRPC – фреймворк от Google и система удаленного вызова процедур с открытым исходным кодом, которая использует HTTP/2 и бинарный формат сериализации protobuf (Protocol Buffers) с встроенной схемой данных. Высокую скорость передачи данных в HTTP/2 обеспечивает мультиплексирование, эффективное сжатие заголовков и двунаправленный стриминг от клиента к серверу и наоборот. Строгая схема входных и выходных данных, задаваемая в proto-файле с определением исходных и результирующих сообщений, а также функций сервера, сокращает количество ошибок.
С точки зрения клиент-серверного взаимодействия gRPC предлагает четыре варианта:
- унарное в стиле запрос-ответ, когда клиент отправляет один запрос и получает один ответ;
- серверное потоковое, когда сервер начинает потоковую передачу данных в ответ на запрос клиента;
- клиентское потоковое, когда клиент отправляет поток запросов на сервер и получает от него один ответ;
- двунаправленное потоковое, когда клиент и сервер обмениваются друг с другом потоками данных в обоих направлениях.
Благодаря такому разнообразию вариантов взаимодействия gRPC позволяет реализовать множество интеграционных сценариев. Поэтому эта технология, поддерживаемая различными языками программирования, gRPC становится все более популярным для создания высокопроизводительных и масштабируемых распределённых систем.
Реализация gRPC в ClickHouse поддерживает: SSL-сертификат, аутентификацию, сеансы, сжатие, параллельные запросы через один и тот же канал, отмену запросов, получение логов и внешние таблицы.
Спецификация интерфейса описана в proto-файле clickhouse_grpc.proto:
syntax = "proto3"; package clickhouse.grpc; message NameAndType { string name = 1; string type = 2; } message ExternalTable { string name = 1; repeated NameAndType columns = 2; bytes data = 3; string format = 4; string compression_type = 6; map<string, string> settings = 5; } message ObsoleteTransportCompression { enum CompressionAlgorithm { NO_COMPRESSION = 0; DEFLATE = 1; GZIP = 2; STREAM_GZIP = 3; } enum CompressionLevel { COMPRESSION_NONE = 0; COMPRESSION_LOW = 1; COMPRESSION_MEDIUM = 2; COMPRESSION_HIGH = 3; } CompressionAlgorithm algorithm = 1; CompressionLevel level = 2; } message QueryInfo { string query = 1; string query_id = 2; map<string, string> settings = 3; string database = 4; bytes input_data = 5; bytes input_data_delimiter = 6; string output_format = 7; bool send_output_columns = 24; repeated ExternalTable external_tables = 8; string user_name = 9; string password = 10; string quota = 11; string jwt = 25; string session_id = 12; bool session_check = 13; uint32 session_timeout = 14; bool cancel = 15; bool next_query_info = 16; string input_compression_type = 20; string output_compression_type = 21; int32 output_compression_level = 19; string transport_compression_type = 22; int32 transport_compression_level = 23; ObsoleteTransportCompression obsolete_result_compression = 17; string obsolete_compression_type = 18; } enum LogsLevel { LOG_NONE = 0; LOG_FATAL = 1; LOG_CRITICAL = 2; LOG_ERROR = 3; LOG_WARNING = 4; LOG_NOTICE = 5; LOG_INFORMATION = 6; LOG_DEBUG = 7; LOG_TRACE = 8; } message LogEntry { uint32 time = 1; uint32 time_microseconds = 2; uint64 thread_id = 3; string query_id = 4; LogsLevel level = 5; string source = 6; string text = 7; } message Progress { uint64 read_rows = 1; uint64 read_bytes = 2; uint64 total_rows_to_read = 3; uint64 written_rows = 4; uint64 written_bytes = 5; } message Stats { uint64 rows = 1; uint64 blocks = 2; uint64 allocated_bytes = 3; bool applied_limit = 4; uint64 rows_before_limit = 5; bool applied_aggregation = 6; uint64 rows_before_aggregation = 7; } message Exception { int32 code = 1; string name = 2; string display_text = 3; string stack_trace = 4; } message Result { string query_id = 9; string time_zone = 10; string output_format = 11; repeated NameAndType output_columns = 12; bytes output = 1; bytes totals = 2; bytes extremes = 3; repeated LogEntry logs = 4; Progress progress = 5; Stats stats = 6; Exception exception = 7; bool cancelled = 8; } service ClickHouse { rpc ExecuteQuery(QueryInfo) returns (Result) {} rpc ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) {} rpc ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) {} rpc ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) {} }
Согласно этому описанию, сервис предоставляет всего метода:
- ExecuteQuery(QueryInfo) принимает одно сообщений типа QueryInfo и возвращает одно сообщение типа Result. Это самый простой способ выполнения запроса в рамках унарного взаимодействия: клиент отправляет один запрос и получает один результат. Такой способ подходит для одиночных запросов с небольшим объемом данных.
- ExecuteQueryWithStreamInput(stream QueryInfo) принимает одно сообщений типа QueryInfo и возвращает одно сообщение типа Result. Это подходит для сценариев, где клиент должен отправить большой объем данных или несколько сообщений, прежде чем получить результат. Например, вставка множества строк, что характерно для ClickHouse, поскольку данные в этой колоночной базе хранятся блоками и при каждом INSERT-запросе создается новый блок.
- ExecuteQueryWithStreamOutput(QueryInfo) принимает поток сообщений типа QueryInfo и возвращает поток сообщений типа Result. Это полезно, когда запрос должен вернуть большой объем данных, который надо обработать по частям, не дожидаясь полного завершения передачи. При этом следует управлять соединением клиента с ClickHouse на время передачи стриминга.
- ExecuteQueryWithStreamIO(stream QueryInfo) принимает поток сообщений типа QueryInfo и возвращает поток сообщений типа Result. Этот вариант подходит для сложных сценариев, где требуется двунаправленная потоковая обработка.
Пример использования
В качестве примера напишем Python-скрипт, когда надо выполнить массовую вставку потока данных из топика Kafka в таблицу событий пользовательского поведения (user_session_id, event_type, timestamp) в Clickhouse и возвращать количество событий каждого типа. Данные в Kafka публикуются в формате JSON.
Сперва создадим таблицу в ClickHouse, которая будет хранить данные о событиях пользовательского поведения:
CREATE TABLE user_behavior_events ( user_session_id String, event_type Enum8('click' = 1, 'download' = 2, 'scroll' = 3, 'submit' = 4), timestamp DateTime ) ENGINE = MergeTree() ORDER BY (user_session_id, timestamp);
В этом запросе уникальный идентификатор сессии пользователя user_session_id хранится как строка, тип события event_type объявляется как перечисление (Enum8), где каждому типу события соответствует числовое значение, отметка времени события timestamp хранит данные в формате DateTime. Таблица использует движок MergeTree, чтобы эффективно выполнять запросы с упорядочиванием данных по пользовательской сессии и отметке времени.
Чтобы передать поток данных из Kafka в ClickHouse, используя gRPC, можно использовать библиотеку confluent-kafka для работы с Kafka и пакет grpcio для выполнения запросов к ClickHouse. При этом скрипт будет возвращать количество событий каждого типа, используя меотд ExecuteQueryWithStreamIO(stream QueryInfo).
import grpc from confluent_kafka import Consumer, KafkaError from clickhouse_pb2 import QueryInfo, Result from clickhouse_pb2_grpc import ClickHouseStub # Конфигурация Kafka KAFKA_TOPIC = "your_kafka_topic" KAFKA_BOOTSTRAP_SERVER = "localhost:9092" KAFKA_GROUP_ID = "your_group_id" # Конфигурация ClickHouse gRPC CLICKHOUSE_GRPC_SERVER = "localhost:9100" def consume_kafka_messages(): consumer = Consumer({ 'bootstrap.servers': KAFKA_BOOTSTRAP_SERVER, 'group.id': KAFKA_GROUP_ID, 'auto.offset.reset': 'earliest' }) consumer.subscribe([KAFKA_TOPIC]) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print(msg.error()) break #сообщение в формате JSON event = msg.value().decode('utf-8') yield event finally: consumer.close() def insert_events_to_clickhouse(events): channel = grpc.insecure_channel(CLICKHOUSE_GRPC_SERVER) stub = ClickHouseStub(channel) for event in events: # Разбор события и подготовка данных для вставки user_session_id = event['user_session_id'] event_type = event['event_type'] timestamp = event['timestamp'] query = f"INSERT INTO user_behavior_events (user_session_id, event_type, timestamp) VALUES ('{user_session_id}', '{event_type}', '{timestamp}')" query_info = QueryInfo(query=query, database='default') # Выполнение запроса response_iterator = stub.ExecuteQueryWithStreamIO(iter([query_info])) # Обработка ответа for result in response_iterator: if result.exception.code != 0: print(f"Error: {result.exception.display_text}") else: print(f"Inserted event: {event}") def main(): events = consume_kafka_messages() insert_events_to_clickhouse(events) if __name__ == "__main__": main()
Разумеется, чтобы этот код работал, необходимо создать файлы clickhouse_pb2 и clickhouse_pb2_grpc, сгенерировав их из proto-файла определения gRPC-интерфейса с помощью компилятора protoc. В clickhouse_pb2 будут описаны классы входных и выходных данных для сериализации и десериализации сообщений, которые отправляются и принимаются через gRPC, а в clickhouse_pb2_grpc – определения абстрактных классов для сервера и методы для создания клиентов. При разработке на Python для файла clickhouse_pb2_grpc можно использовать встроенный Python-клиент, описанный в Github-репозитории ClickHouse.
Таким образом, использование gRPC может значительно улучшить производительность и надежность системы, которая обрабатывает потоки данных из Kafka и взаимодействует с Clickhouse для интерактивной аналитики больших данных.
Научиться работать с ClickHouse вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники