Как публиковать в топик Kafka сообщения в формате Protobuf, используя реестр схем и библиотеку confluent-kafka. Пример Python-продюсера, конфигурационного YAML-файла для Docker-развертывания Kafka Confluent и тунелирование портов локального компьютера.
Подготовка инфраструктуры и определение схемы данных
Чтобы публиковать в свое Docker-развертывание Kafka Confluent данные, используя реестр схем, нужно сперва внести изменения в конфигурационный YAML-файл docker-compose.yml, последнюю версию которого я показывала в прошлой статье. Это необходимо сделать, поскольку в коде приложения-продюсера надо явно указывать веб-сервер Schema Registry. Для этого следует изменить порты, открыв порт 8085 на локальном хосте и отразив это в слушателях через конфигурацию SCHEMA_REGISTRY_LISTENERS.
После внесения изменений файл docker-compose.yml для запуска набора Docker-контейнеров с сервисами платформы Kafka Confluent в WSL выглядит так:
version: '3.6'
volumes:
zookeeper-data:
driver: local
zookeeper-log:
driver: local
kafka-data:
driver: local
services:
akhq:
# build:
# context: .
image: tchiotludo/akhq
restart: unless-stopped
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:9092"
schema-registry:
url: "http://schema-registry:8085"
connect:
- name: "connect"
url: "http://connect:8083"
ports:
- 8080:8080
links:
- kafka
- schema-registry
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
restart: unless-stopped
ports:
- 2181:2181
volumes:
- zookeeper-data:/var/lib/zookeeper/data:Z
- zookeeper-log:/var/lib/zookeeper/log:Z
environment:
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'
kafka:
image: confluentinc/cp-kafka:7.5.0
restart: unless-stopped
volumes:
- kafka-data:/var/lib/kafka/data:Z
environment:
KAFKA_BROKER_ID: '0'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_NUM_PARTITIONS: '12'
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,PLAINTEXT_EXT://serveo.net:39092'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_EXT:PLAINTEXT'
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: '9091'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
ports:
- 39092:39092
links:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
restart: unless-stopped
depends_on:
- kafka
ports:
- "8085:8085"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
connect:
image: confluentinc/cp-kafka-connect:7.5.0
restart: unless-stopped
depends_on:
- kafka
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
CONNECT_REST_PORT: '8083'
CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083'
CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
CONNECT_GROUP_ID: 'kafka-connect'
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true'
CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true'
CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_PLUGIN_PATH: ' /usr/share/java/'
ksqldb:
image: confluentinc/cp-ksqldb-server:7.5.0
restart: unless-stopped
depends_on:
- kafka
- connect
- schema-registry
ports:
- 8088:8088
environment:
KSQL_BOOTSTRAP_SERVERS: 'kafka:9092'
KSQL_LISTENERS: 'http://0.0.0.0:8088'
KSQL_KSQL_SERVICE_ID: 'ksql'
KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
KSQL_KSQL_CONNECT_URL: 'http://connect:8083'
KSQL_KSQL_SINK_PARTITIONS: '1'
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1'
test-data:
image: gradle:8-jdk11
command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend"
restart: unless-stopped
working_dir: /app
volumes:
- ./:/app:z
links:
- kafka
- schema-registry
kafkacat:
image: confluentinc/cp-kafkacat:7.1.14
restart: unless-stopped
depends_on:
- kafka
command:
- bash
- -c
- |
kafkacat -P -b kafka:9092 -t json << EOF
{"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"}
{"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"}
{"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"}
{"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"}
{"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"}
{"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"}
EOF
kafkacat -P -b kafka:9092 -t csv << EOF
1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246
2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150
3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80
4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189
5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143
EOF
kafkacat -b kafka:9092 -o beginning -G json-consumer json
links:
- kafka
Также необходимо открыть порты. Как в прошлый раз, сделаю это с помощью SSH-сервера Serveo. Необходимо открыть 3 порта локального хоста:
- 8080 для веб-интерфейса Kafka, в качестве которого используется AKHQ;
- 39093 для TCP-трафика, чтобы обеспечить подключение клиентов Kafka к брокеру;
- 8085 для веб-сервера реестра схем.

Создать схему в формате protobuf можно прямо в веб-интерфейсе AKHQ. Предположим, сперва в Kafka будут публиковаться данные о времени события, ФИО клиента и его email. Сообщение с этими полями данных в бинарном формате Protobuf будет выглядеть так:
syntax = "proto3";
message Application {
string event_time = 2;
string client = 3;
string email = 4;
}
Создадим эту схему в реестре схем Kafka через интерфейс AKHQ:

Предположим, во 2-ой версии этой схемы данных добавлены поля с телефоном и рейтингом клиента. Отразим это в эволюции схемы данных, просто изменив ее:
syntax = "proto3";
message ApplicationWithPhone {
string event_time = 2;
string client = 3;
string email = 4;
string phone = 5;
int64 rating = 6;
}

В Schema Registry есть несколько уровней совместимости (Compatibility Levels), выбор которых зависит от требований к изменениям в схемах. В рассматриваемом примере к существующей схеме данных добавляются новые поля (phone и rating). Эти изменения являются расширяющими (backward-compatible), то есть они не требуют изменений в существующих потребителях данных, которые могут просто игнорировать новые поля.
Для такой ситуации подойдут следующие уровни совместимости:
- Backward – новая схема совместима со старыми данными, т.е. потребители, работающие со старой схемой, смогут читать данные, созданные по новой схеме;
- Backward Transitive — то же самое, что и Backward, но применимо ко всем предыдущим версиям схемы, а не только к последней;
- Full — совместимость в обоих направлениях (backward и forward), т.е. новая схема может быть использована как для старых данных, так и для новых потребителей;
- Full Transitive — то же самое, что и Full, но применимо ко всем предыдущим версиям схемы.
Для полной универсальности надо выбирать Full или Full Transitive, но они требуют больше проверок, что может снижать производительность потоковой передачи. Подготовив инфраструктуру, далее напишем приложение-продюсер, которое будет публиковать данные в топик согласно представленным схемам.
Публикация сообщения в топик Kafka
Чтобы написать Python-код публикации данных в Kafka в IDE PyCharm, надо сперва установить необходимые библиотеки. В моем случае это confluent-kafka, protobuf, protoc-exe и Faker для генерации случайных данных. Сперва надо определить структуры данных в proto-файле. Это выглядит так:
syntax = "proto3";
message Application {
string event_time = 2;
string client = 3;
string email = 4;
}
message ApplicationWithPhone {
string event_time = 2;
string client = 3;
string email = 4;
string phone = 5;
int64 rating = 6;
}
Далее нужно сгенерировать из proto-файла файл _pb2.py, необходимый для сериализации и десериализации данных в формате protobuf. Для этого в командной строке нужно запустить компилятор protoc, который входит в состав библиотеки protoc-exe. Для этого надо выполнить в терминале команду
protoc --python_out=. application.proto
Сгенерированный файл _pb2.py включает в себя Python-классы, соответствующие сообщениям, определённым в proto-файле. В нем определены методы для преобразования сообщений в байтовый формат и обратно. Это упрощает передачу данных между различными компонентами системы, даже если они написаны на разных языках программирования, обеспечивает обратную и прямую совместимость данных, а также компактное и быстрое кодирование данных.

Содержимое сгенерированного файла application_pb2.py следующее:
# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: application.proto """Generated protocol buffer code.""" from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11\x61pplication.proto\"@\n\x0b\x41pplication\x12\x12\n\nevent_time\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\"h\n\x14\x41pplicationWithPhone\x12\x12\n\nevent_time\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12\r\n\x05phone\x18\x05 \x01(\t\x12\x0e\n\x06rating\x18\x06 \x01(\x03\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'application_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None _globals['_APPLICATION']._serialized_start=21 _globals['_APPLICATION']._serialized_end=85 _globals['_APPLICATIONWITHPHONE']._serialized_start=87 _globals['_APPLICATIONWITHPHONE']._serialized_end=191 # @@protoc_insertion_point(module_scope)
Этот файл нельзя изменять вручную. Чтобы использовать структуры данных и функции из этого файла в коде своего приложения, их надо импортировать. Таким образом, Python-код моего приложения-продюсера, публикующего в Kafka сообщения определенной схемы данных будет следующим:
kafka_url="serveo.net:39092"
schemaregistry_url="https://kafkaconnect.serveo.net"
import random
import time
from confluent_kafka import Producer
from faker import Faker
import application_pb2 # импорт сгенерированных классов
fake = Faker('ru_RU')
# создание продюсера
producer = Producer({
'bootstrap.servers': kafka_url,
})
id = 0
topic = 'top'
# бесконечный цикл публикации данных.
while True:
schema = random.choice([1, 2])
start_time = time.time()
producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))
if schema == 1:
application = application_pb2.Application(
event_time=producer_publish_time,
client=fake.name(),
email=fake.free_email()
)
value = application.SerializeToString()
else:
application = application_pb2.ApplicationWithPhone(
event_time=producer_publish_time,
client=fake.name(),
email=fake.free_email(),
phone=fake.phone_number(),
rating=random.randint(1, 10) # пример рейтинга
)
value = application.SerializeToString()
producer.produce(topic=topic, value=value)
print("Сообщение успешно опубликовано")
print(f' [x] Содержимое заявки {application}')
# повтор через 3 секунды.
time.sleep(3)
Чтобы поработать с реестром схем, в котором есть 2 версии одной и той же схемы данных, в этом коде для отправляемых в Kafka сообщений случайным образом выбирается одна из двух возможных схем.

Просмотреть опубликованные сообщения можно в веб-интерфейсе AKHQ.

Таким образом, использование Protobuf-формата вместе с реестром схема Kafka обеспечивает эффективную сериализацию данных, централизованное управление схемами и проверку совместимости. Все это повышает производительность и надежность системы потоковой передачи событий.
Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:


