Публикация protobuf-сообщений и использование реестра схем Kafka

Kafka курсы примеры обучение, Kafka для разработчика, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как публиковать в топик Kafka сообщения в формате Protobuf, используя реестр схем и библиотеку confluent-kafka. Пример Python-продюсера, конфигурационного YAML-файла для Docker-развертывания Kafka Confluent и тунелирование портов локального компьютера.

Подготовка инфраструктуры и определение схемы данных

Чтобы публиковать в свое Docker-развертывание Kafka Confluent данные, используя реестр схем, нужно сперва внести изменения в конфигурационный YAML-файл docker-compose.yml, последнюю версию которого я показывала в прошлой статье. Это необходимо сделать, поскольку в коде приложения-продюсера надо явно указывать веб-сервер Schema Registry. Для этого следует изменить порты, открыв порт 8085 на локальном хосте и отразив это в слушателях через конфигурацию SCHEMA_REGISTRY_LISTENERS.

После внесения изменений файл docker-compose.yml для запуска набора Docker-контейнеров с сервисами платформы Kafka Confluent в WSL выглядит так:

version: '3.6'

volumes:
  zookeeper-data:
    driver: local
  zookeeper-log:
    driver: local
  kafka-data:
    driver: local

services:
  akhq:
    # build:
    #   context: .
    image: tchiotludo/akhq
    restart: unless-stopped
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
              schema-registry:
                url: "http://schema-registry:8085"
              connect:
                - name: "connect"
                  url: "http://connect:8083"
    ports:
      - 8080:8080
    links:
      - kafka
      - schema-registry

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    restart: unless-stopped
    ports:
      - 2181:2181
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data:Z
      - zookeeper-log:/var/lib/zookeeper/log:Z
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    restart: unless-stopped
    volumes:
      - kafka-data:/var/lib/kafka/data:Z
    environment:
      KAFKA_BROKER_ID: '0'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_NUM_PARTITIONS: '12'
      KAFKA_COMPRESSION_TYPE: 'gzip'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,PLAINTEXT_EXT://serveo.net:39092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_EXT:PLAINTEXT'
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_JMX_PORT: '9091'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
    ports:
      - 39092:39092
    links:
      - zookeeper

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    restart: unless-stopped
    depends_on:
        - kafka
    ports:
        - "8085:8085"
    environment:
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
        SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
        SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
        SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'
        SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
        SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'

  connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_PORT: '8083'
      CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
      CONNECT_GROUP_ID: 'kafka-connect'
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: ' /usr/share/java/'

  ksqldb:
    image: confluentinc/cp-ksqldb-server:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
      - connect
      - schema-registry
    ports:
      - 8088:8088
    environment:
      KSQL_BOOTSTRAP_SERVERS: 'kafka:9092'
      KSQL_LISTENERS: 'http://0.0.0.0:8088'
      KSQL_KSQL_SERVICE_ID: 'ksql'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      KSQL_KSQL_CONNECT_URL: 'http://connect:8083'
      KSQL_KSQL_SINK_PARTITIONS: '1'
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1'

  test-data:
    image: gradle:8-jdk11
    command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend"
    restart: unless-stopped
    working_dir: /app
    volumes:
      - ./:/app:z
    links:
      - kafka
      - schema-registry

  kafkacat:
    image: confluentinc/cp-kafkacat:7.1.14
    restart: unless-stopped
    depends_on:
      - kafka
    command:
      - bash
      - -c
      - |
        kafkacat -P -b kafka:9092 -t json << EOF
        {"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"}
        {"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"}
        {"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"}
        {"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"}
        {"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"}
        {"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"}
        EOF

        kafkacat -P -b kafka:9092 -t csv << EOF
        1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246
        2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150
        3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80
        4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189
        5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143
        EOF

        kafkacat -b kafka:9092 -o beginning -G json-consumer json
    links:
      - kafka

Также необходимо открыть порты. Как в прошлый раз, сделаю это с помощью SSH-сервера Serveo. Необходимо открыть 3 порта локального хоста:

  • 8080 для веб-интерфейса Kafka, в качестве которого используется AKHQ;
  • 39093 для TCP-трафика, чтобы обеспечить подключение клиентов Kafka к брокеру;
  • 8085 для веб-сервера реестра схем.
Тунелирование портов локального хоста
Тунелирование портов локального хоста

Создать схему в формате protobuf можно прямо в веб-интерфейсе AKHQ. Предположим, сперва в Kafka будут публиковаться данные о времени события, ФИО клиента и его email. Сообщение с этими полями данных в бинарном формате Protobuf будет выглядеть так:

syntax = "proto3";
message Application {
    string event_time = 2;
    string client = 3;
    string email = 4;
}

Создадим эту схему в реестре схем Kafka через интерфейс AKHQ:

Создание схемы данных в формате protobuf в реестре схем Kafka
Создание схемы данных в формате protobuf в реестре схем Kafka

Предположим, во 2-ой версии этой схемы данных добавлены поля с телефоном и рейтингом клиента. Отразим это в эволюции схемы данных, просто изменив ее:

syntax = "proto3";

message ApplicationWithPhone {
    string event_time = 2;
    string client = 3;
    string email = 4;
    string phone = 5;
    int64 rating = 6;
}
Создание схемы данных в формате protobuf в реестре схем Kafka
Создание схемы данных в формате protobuf в реестре схем Kafka

В Schema Registry есть несколько уровней совместимости (Compatibility Levels), выбор которых зависит от требований к изменениям в схемах. В рассматриваемом примере к существующей схеме данных добавляются новые поля (phone и rating). Эти изменения являются расширяющими (backward-compatible), то есть они не требуют изменений в существующих потребителях данных, которые могут просто игнорировать новые поля.

Для такой ситуации подойдут следующие уровни совместимости:

  • Backward – новая схема совместима со старыми данными, т.е. потребители, работающие со старой схемой, смогут читать данные, созданные по новой схеме;
  • Backward Transitive — то же самое, что и Backward, но применимо ко всем предыдущим версиям схемы, а не только к последней;
  • Full — совместимость в обоих направлениях (backward и forward), т.е. новая схема может быть использована как для старых данных, так и для новых потребителей;
  • Full Transitive — то же самое, что и Full, но применимо ко всем предыдущим версиям схемы.

Для полной универсальности надо выбирать Full или Full Transitive, но они требуют больше проверок, что может снижать производительность потоковой передачи. Подготовив инфраструктуру, далее напишем приложение-продюсер, которое будет публиковать данные в топик согласно представленным схемам.

Публикация сообщения в топик Kafka

Чтобы написать Python-код публикации данных в Kafka в IDE PyCharm, надо сперва установить необходимые библиотеки. В моем случае это confluent-kafka, protobuf, protoc-exe и Faker для генерации случайных данных. Сперва надо определить структуры данных в proto-файле. Это выглядит так:

syntax = "proto3";

message Application {
    string event_time = 2;
    string client = 3;
    string email = 4;
}

message ApplicationWithPhone {
    string event_time = 2;
    string client = 3;
    string email = 4;
    string phone = 5;
    int64 rating = 6;
}

Далее нужно сгенерировать из proto-файла файл _pb2.py, необходимый для сериализации и десериализации данных в формате protobuf. Для этого в командной строке нужно запустить компилятор protoc, который входит в состав библиотеки protoc-exe. Для этого надо выполнить в терминале команду

protoc --python_out=. application.proto

Сгенерированный файл _pb2.py включает в себя Python-классы, соответствующие сообщениям, определённым в proto-файле. В нем определены методы для преобразования сообщений в байтовый формат и обратно. Это упрощает передачу данных между различными компонентами системы, даже если они написаны на разных языках программирования, обеспечивает обратную и прямую совместимость данных, а также компактное и быстрое кодирование данных.

Генерация _pb2.py из proto-файла
Генерация _pb2.py из proto-файла

Содержимое сгенерированного файла application_pb2.py следующее:

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: application.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11\x61pplication.proto\"@\n\x0b\x41pplication\x12\x12\n\nevent_time\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\"h\n\x14\x41pplicationWithPhone\x12\x12\n\nevent_time\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12\r\n\x05phone\x18\x05 \x01(\t\x12\x0e\n\x06rating\x18\x06 \x01(\x03\x62\x06proto3')

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'application_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  _globals['_APPLICATION']._serialized_start=21
  _globals['_APPLICATION']._serialized_end=85
  _globals['_APPLICATIONWITHPHONE']._serialized_start=87
  _globals['_APPLICATIONWITHPHONE']._serialized_end=191
# @@protoc_insertion_point(module_scope)

Этот файл нельзя изменять вручную. Чтобы использовать структуры данных и функции из этого файла в коде своего приложения, их надо импортировать. Таким образом, Python-код моего приложения-продюсера, публикующего в Kafka сообщения определенной схемы данных будет следующим:

kafka_url="serveo.net:39092"

schemaregistry_url="https://kafkaconnect.serveo.net"

import random
import time
from confluent_kafka import Producer
from faker import Faker
import application_pb2  # импорт сгенерированных классов

fake = Faker('ru_RU')

# создание продюсера
producer = Producer({
    'bootstrap.servers': kafka_url,
})

id = 0
topic = 'top'

# бесконечный цикл публикации данных.
while True:
    schema = random.choice([1, 2])

    start_time = time.time()
    producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))

    if schema == 1:
        application = application_pb2.Application(
            event_time=producer_publish_time,
            client=fake.name(),
            email=fake.free_email()
        )
        value = application.SerializeToString()
    else:
        application = application_pb2.ApplicationWithPhone(
            event_time=producer_publish_time,
            client=fake.name(),
            email=fake.free_email(),
            phone=fake.phone_number(),
            rating=random.randint(1, 10)  # пример рейтинга
        )
        value = application.SerializeToString()

    producer.produce(topic=topic, value=value)
    print("Сообщение успешно опубликовано")
    print(f' [x] Содержимое заявки {application}')

    # повтор через 3 секунды.
    time.sleep(3)

Чтобы поработать с реестром схем, в котором есть 2 версии одной и той же схемы данных, в этом коде для отправляемых в Kafka сообщений случайным образом выбирается одна из двух возможных схем.

Публикация данных в Kafka из PyCharm
Публикация данных в Kafka из PyCharm

Просмотреть опубликованные сообщения можно в веб-интерфейсе AKHQ.

Просмотр сообщений в топике в интерфейсе AKHQ
Просмотр сообщений в топике в интерфейсе AKHQ

Таким образом, использование Protobuf-формата вместе с реестром схема Kafka обеспечивает эффективную сериализацию данных, централизованное управление схемами и проверку совместимости. Все это повышает производительность и надежность системы потоковой передачи событий.

Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту