Как публиковать в топик Kafka сообщения в формате Protobuf, используя реестр схем и библиотеку confluent-kafka. Пример Python-продюсера, конфигурационного YAML-файла для Docker-развертывания Kafka Confluent и тунелирование портов локального компьютера.
Подготовка инфраструктуры и определение схемы данных
Чтобы публиковать в свое Docker-развертывание Kafka Confluent данные, используя реестр схем, нужно сперва внести изменения в конфигурационный YAML-файл docker-compose.yml, последнюю версию которого я показывала в прошлой статье. Это необходимо сделать, поскольку в коде приложения-продюсера надо явно указывать веб-сервер Schema Registry. Для этого следует изменить порты, открыв порт 8085 на локальном хосте и отразив это в слушателях через конфигурацию SCHEMA_REGISTRY_LISTENERS.
После внесения изменений файл docker-compose.yml для запуска набора Docker-контейнеров с сервисами платформы Kafka Confluent в WSL выглядит так:
version: '3.6' volumes: zookeeper-data: driver: local zookeeper-log: driver: local kafka-data: driver: local services: akhq: # build: # context: . image: tchiotludo/akhq restart: unless-stopped environment: AKHQ_CONFIGURATION: | akhq: connections: docker-kafka-server: properties: bootstrap.servers: "kafka:9092" schema-registry: url: "http://schema-registry:8085" connect: - name: "connect" url: "http://connect:8083" ports: - 8080:8080 links: - kafka - schema-registry zookeeper: image: confluentinc/cp-zookeeper:7.5.0 restart: unless-stopped ports: - 2181:2181 volumes: - zookeeper-data:/var/lib/zookeeper/data:Z - zookeeper-log:/var/lib/zookeeper/log:Z environment: ZOOKEEPER_CLIENT_PORT: '2181' ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' kafka: image: confluentinc/cp-kafka:7.5.0 restart: unless-stopped volumes: - kafka-data:/var/lib/kafka/data:Z environment: KAFKA_BROKER_ID: '0' KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_NUM_PARTITIONS: '12' KAFKA_COMPRESSION_TYPE: 'gzip' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1' KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,PLAINTEXT_EXT://serveo.net:39092' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_EXT:PLAINTEXT' KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false' KAFKA_JMX_PORT: '9091' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' ports: - 39092:39092 links: - zookeeper schema-registry: image: confluentinc/cp-schema-registry:7.5.0 restart: unless-stopped depends_on: - kafka ports: - "8085:8085" environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092' SCHEMA_REGISTRY_HOST_NAME: 'schema-registry' SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085' SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO' SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*' SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS' connect: image: confluentinc/cp-kafka-connect:7.5.0 restart: unless-stopped depends_on: - kafka - schema-registry ports: - 8083:8083 environment: CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092' CONNECT_REST_PORT: '8083' CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083' CONNECT_REST_ADVERTISED_HOST_NAME: 'connect' CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config' CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets' CONNECT_STATUS_STORAGE_TOPIC: '__connect-status' CONNECT_GROUP_ID: 'kafka-connect' CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true' CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter' CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true' CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter' CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1' CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1' CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1' CONNECT_PLUGIN_PATH: ' /usr/share/java/' ksqldb: image: confluentinc/cp-ksqldb-server:7.5.0 restart: unless-stopped depends_on: - kafka - connect - schema-registry ports: - 8088:8088 environment: KSQL_BOOTSTRAP_SERVERS: 'kafka:9092' KSQL_LISTENERS: 'http://0.0.0.0:8088' KSQL_KSQL_SERVICE_ID: 'ksql' KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' KSQL_KSQL_CONNECT_URL: 'http://connect:8083' KSQL_KSQL_SINK_PARTITIONS: '1' KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1' test-data: image: gradle:8-jdk11 command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend" restart: unless-stopped working_dir: /app volumes: - ./:/app:z links: - kafka - schema-registry kafkacat: image: confluentinc/cp-kafkacat:7.1.14 restart: unless-stopped depends_on: - kafka command: - bash - -c - | kafkacat -P -b kafka:9092 -t json << EOF {"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"} {"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"} {"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"} {"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"} {"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"} {"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"} EOF kafkacat -P -b kafka:9092 -t csv << EOF 1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246 2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150 3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80 4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189 5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143 EOF kafkacat -b kafka:9092 -o beginning -G json-consumer json links: - kafka
Также необходимо открыть порты. Как в прошлый раз, сделаю это с помощью SSH-сервера Serveo. Необходимо открыть 3 порта локального хоста:
- 8080 для веб-интерфейса Kafka, в качестве которого используется AKHQ;
- 39093 для TCP-трафика, чтобы обеспечить подключение клиентов Kafka к брокеру;
- 8085 для веб-сервера реестра схем.
Создать схему в формате protobuf можно прямо в веб-интерфейсе AKHQ. Предположим, сперва в Kafka будут публиковаться данные о времени события, ФИО клиента и его email. Сообщение с этими полями данных в бинарном формате Protobuf будет выглядеть так:
syntax = "proto3"; message Application { string event_time = 2; string client = 3; string email = 4; }
Создадим эту схему в реестре схем Kafka через интерфейс AKHQ:
Предположим, во 2-ой версии этой схемы данных добавлены поля с телефоном и рейтингом клиента. Отразим это в эволюции схемы данных, просто изменив ее:
syntax = "proto3"; message ApplicationWithPhone { string event_time = 2; string client = 3; string email = 4; string phone = 5; int64 rating = 6; }
В Schema Registry есть несколько уровней совместимости (Compatibility Levels), выбор которых зависит от требований к изменениям в схемах. В рассматриваемом примере к существующей схеме данных добавляются новые поля (phone и rating). Эти изменения являются расширяющими (backward-compatible), то есть они не требуют изменений в существующих потребителях данных, которые могут просто игнорировать новые поля.
Для такой ситуации подойдут следующие уровни совместимости:
- Backward – новая схема совместима со старыми данными, т.е. потребители, работающие со старой схемой, смогут читать данные, созданные по новой схеме;
- Backward Transitive — то же самое, что и Backward, но применимо ко всем предыдущим версиям схемы, а не только к последней;
- Full — совместимость в обоих направлениях (backward и forward), т.е. новая схема может быть использована как для старых данных, так и для новых потребителей;
- Full Transitive — то же самое, что и Full, но применимо ко всем предыдущим версиям схемы.
Для полной универсальности надо выбирать Full или Full Transitive, но они требуют больше проверок, что может снижать производительность потоковой передачи. Подготовив инфраструктуру, далее напишем приложение-продюсер, которое будет публиковать данные в топик согласно представленным схемам.
Публикация сообщения в топик Kafka
Чтобы написать Python-код публикации данных в Kafka в IDE PyCharm, надо сперва установить необходимые библиотеки. В моем случае это confluent-kafka, protobuf, protoc-exe и Faker для генерации случайных данных. Сперва надо определить структуры данных в proto-файле. Это выглядит так:
syntax = "proto3"; message Application { string event_time = 2; string client = 3; string email = 4; } message ApplicationWithPhone { string event_time = 2; string client = 3; string email = 4; string phone = 5; int64 rating = 6; }
Далее нужно сгенерировать из proto-файла файл _pb2.py, необходимый для сериализации и десериализации данных в формате protobuf. Для этого в командной строке нужно запустить компилятор protoc, который входит в состав библиотеки protoc-exe. Для этого надо выполнить в терминале команду
protoc --python_out=. application.proto
Сгенерированный файл _pb2.py включает в себя Python-классы, соответствующие сообщениям, определённым в proto-файле. В нем определены методы для преобразования сообщений в байтовый формат и обратно. Это упрощает передачу данных между различными компонентами системы, даже если они написаны на разных языках программирования, обеспечивает обратную и прямую совместимость данных, а также компактное и быстрое кодирование данных.
Содержимое сгенерированного файла application_pb2.py следующее:
# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: application.proto """Generated protocol buffer code.""" from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11\x61pplication.proto\"@\n\x0b\x41pplication\x12\x12\n\nevent_time\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\"h\n\x14\x41pplicationWithPhone\x12\x12\n\nevent_time\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12\r\n\x05phone\x18\x05 \x01(\t\x12\x0e\n\x06rating\x18\x06 \x01(\x03\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'application_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None _globals['_APPLICATION']._serialized_start=21 _globals['_APPLICATION']._serialized_end=85 _globals['_APPLICATIONWITHPHONE']._serialized_start=87 _globals['_APPLICATIONWITHPHONE']._serialized_end=191 # @@protoc_insertion_point(module_scope)
Этот файл нельзя изменять вручную. Чтобы использовать структуры данных и функции из этого файла в коде своего приложения, их надо импортировать. Таким образом, Python-код моего приложения-продюсера, публикующего в Kafka сообщения определенной схемы данных будет следующим:
kafka_url="serveo.net:39092" schemaregistry_url="https://kafkaconnect.serveo.net" import random import time from confluent_kafka import Producer from faker import Faker import application_pb2 # импорт сгенерированных классов fake = Faker('ru_RU') # создание продюсера producer = Producer({ 'bootstrap.servers': kafka_url, }) id = 0 topic = 'top' # бесконечный цикл публикации данных. while True: schema = random.choice([1, 2]) start_time = time.time() producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time)) if schema == 1: application = application_pb2.Application( event_time=producer_publish_time, client=fake.name(), email=fake.free_email() ) value = application.SerializeToString() else: application = application_pb2.ApplicationWithPhone( event_time=producer_publish_time, client=fake.name(), email=fake.free_email(), phone=fake.phone_number(), rating=random.randint(1, 10) # пример рейтинга ) value = application.SerializeToString() producer.produce(topic=topic, value=value) print("Сообщение успешно опубликовано") print(f' [x] Содержимое заявки {application}') # повтор через 3 секунды. time.sleep(3)
Чтобы поработать с реестром схем, в котором есть 2 версии одной и той же схемы данных, в этом коде для отправляемых в Kafka сообщений случайным образом выбирается одна из двух возможных схем.
Просмотреть опубликованные сообщения можно в веб-интерфейсе AKHQ.
Таким образом, использование Protobuf-формата вместе с реестром схема Kafka обеспечивает эффективную сериализацию данных, централизованное управление схемами и проверку совместимости. Все это повышает производительность и надежность системы потоковой передачи событий.
Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: