Версионирование схемы сообщений в формате AVRO с использованием реестра схем Apache Kafka и библиотеки confluent_kafka: практический пример на Python в Google Colab.
Публикация сообщений в Kafka с использованием реестра схем
Недавно я показывала пример использования реестра схем (Schema Registry) Apache Kafka при публикации сообщений. Сегодня рассмотрим версионирование схемы данных в формате AVRO, а также потребление сообщений. Реестр схем – это модуль Confluent для Apache Kafka, который позволяет централизовано управлять схемами данных полезной нагрузки сообщений в топиках. Определив схемы данных для полезной нагрузки и зарегистрировав ее в реестре, ее можно использовать повторно, облегчая код приложения-потребитель. Это обеспечивается благодаря отсутствию необходимости проверять структуру и формат потребленных данных. Также реестр схем снижает накладные расходы на сетевую передачу, передавая идентификатор схемы полезной нагрузки сообщения вместо ее полного определения. Подробный разбор того, как это работает, читайте в новой статье.
Как обычно, мой экземпляр Kafka развернут в облачной платформе Upstash, которая в начале 2024 года включила поддержку реестра схем. Чтобы работать с ним для Python-клиентов необходимо использовать библиотеки kafka-schema-registry и confluent_kafka. Для публикации необходимо указать сериализатор, а для потребления сообщений – десериализатор, соответствующий формату. Для публикации сообщений используем сериализатор AvroSerializer для формата AVRO – линейно-ориентированный (строчный) формат хранения файлов, который сохраняет схему в независимом от реализации текстовом формате JSON. Соответственно, для потребления сообщений нужен десериализатор AvroDeserializer.
Установим эти библиотеки и импортируем модули:
#установка библиотек !pip install confluent_kafka !pip install kafka-schema-registry !pip install faker #импорт модулей import random from datetime import datetime import time from time import sleep from confluent_kafka import Producer from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer # Импорт модуля faker from faker import Faker
Предположим, каждый 3 секунды приложение-продюсер может опубликовать сообщение с данными об имени и email клиента, а также id и отметкой времени согласно следующей схеме:
{ "type": "record", "name": "application", "namespace": "com.upstash", "fields": [ { "name": "id", "type": "long" }, { "name": "event_time", "type": "string" }, { "name": "client", "type": "string" }, { "name": "email", "type": "string" } ] }
Или сообщение также может содержать сведения о номере телефона клиента и иметь другую схему данных:
{ "type": "record", "name": "application", "namespace": "com.upstash", "fields": [ { "name": "id", "type": "long" }, { "name": "event_time", "type": "string" }, { "name": "client", "type": "string" }, { "name": "email", "type": "string" }, { "name": "phone", "type": "string" } ] }
Код Python-приложения продюсера будет таким:
# Определение схемы данных schema='' schema_1 = """{ "type": "record", "name": "application", "namespace": "com.upstash", "fields": [ {"name": "id", "type": "long"}, {"name": "event_time", "type": "string"}, {"name": "client", "type": "string"}, {"name": "email", "type": "string"} ] } """ schema_2 = """{ "type": "record", "name": "application", "namespace": "com.upstash", "fields": [ {"name": "id", "type": "long"}, {"name": "event_time", "type": "string"}, {"name": "client", "type": "string"}, {"name": "email", "type": "string"}, {"name": "phone", "type": "string"} ] } """ topic='test' fake=Faker('ru_RU') #Создание продюсера producer = Producer({ 'bootstrap.servers': kafka_url, 'sasl.mechanism': 'SCRAM-SHA-256', 'security.protocol': 'SASL_SSL', 'sasl.username': username, 'sasl.password': password, }) schema_registry_client = SchemaRegistryClient({ 'url': schemaregistry_url, 'basic.auth.user.info': auths }) string_serializer = StringSerializer('utf_8') id=0 # Бесконечный цикл публикации данных. while True: schema=random.choice([schema_1, schema_2]) avro_serializer = AvroSerializer(schema_registry_client, schema) start_time = time.time() id=id+1 producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time)) if schema==schema_1: application = {"id":id, "event_time":producer_publish_time, "client": fake.name(), "email": fake.free_email()} else: application = {"id":id, "event_time":producer_publish_time, "client": fake.name(), "email": fake.free_email(), "phone": fake.phone_number()} value = avro_serializer(application, SerializationContext(topic, MessageField.VALUE)) future = producer.produce(topic=topic, value=value) print("Message sent successfully") print(f' [x] Payload {application }') # Повтор через 3 секунды. time.sleep(3)
После запуска этого скрипта в Google Colab в реестре схем Kafka создается соответствующая схема значения, т.е. полезной нагрузки сообщения. Поскольку приложение-продюсер публикует данные с разными версиями схемы, в самом реестре схем сгенерируется новая версия схемы, что отобразится в GUI платформы Upstash. Чтобы потребители, использующие все ранее зарегистрированные схемы, могут читать данные, опубликованные продюсерами с использованием новой схемы, я изменила тип совместимости схемы на FORWARD_TRANSITIVE. Подробнее про типы совместимости схем данных читайте в прошлой статье.
Теперь посмотрим, как работает потребитель сообщений с использованием реестра схемы.
Потребление сообщений из топика Kafka с реестром схем
Чтобы понять, как работают вышеперечисленные пакеты, рассмотрим простой пример потребления AVRO-сообщений из Kafka, их парсинг и запись в столбцы Google-таблицы. Сперва необходимо установить библиотеки и импортировать модули. В интерактивной среде Google Colab это будет выглядеть так:
#Установка библиотек и импорт модулей !pip install confluent_kafka !pip install kafka-schema-registry from confluent_kafka import Consumer from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroDeserializer #импорт модулей import json from google.colab import auth auth.authenticate_user() import gspread from google.auth import default creds, _ = default()
Далее следует настроить десериализатор:
#потребление AVRO с реестром схем avro_deserializer = AvroDeserializer(SchemaRegistryClient( { 'url': schemaregistry_url, 'basic.auth.user.info': auth_info }))
Затем надо объявить потребитель Kafka и подписать его на топик:
#объявление потребителя Kafka consumer = Consumer({ 'bootstrap.servers': kafka_url, 'sasl.mechanism': 'SCRAM-SHA-256', 'security.protocol': 'SASL_SSL', 'sasl.username': username, 'sasl.password': password, 'group.id': 'gr_3', 'auto.offset.reset': 'latest' }) topic='test' consumer.subscribe([topic])
Определим таблицу и лист Google Sheets для записи потребленных данных:
#Google Sheets Autentificate gc = gspread.authorize(creds) #Открытие заранее созданного файла Гугл-таблицы по идентификатору sh = gc.open_by_key(идентификатор гугл-таблицы') wks = sh.worksheet("test") #в какой лист гугл-таблиц будем записывать #начальный номер строки для записи данных x=1
Наконец, запускаем бесконечный цикл потребления данных:
try: while True: message = consumer.poll(1.0) if message is None: continue if message.error(): print(message.error()) continue # Создание контекста десериализации context = SerializationContext(message.topic(), MessageField.VALUE) try: # Десериализация сообщения deserialized_value = avro_deserializer(message.value(), context) if deserialized_value is not None: print(f"Key: {message.key()}, Value: {deserialized_value} \n") # Парсинг сообщения fields = list(deserialized_value.keys()) # Получение ключей из словаря и преобразование их в список # обновление данных в Google Sheets x += 1 a=0 for i in range(len(fields)): #range() для создания итерируемого диапазона индексов a=i+1 value = deserialized_value[str(fields[i])] #использование квадратных скобок для доступа к значениям словаря wks.update_cell(x, a, value) except Exception as e: # логика обработки ошибок print(f"Error: {str(e)}") finally: consumer.close()
В этом коде потребитель с помощью метода poll() ожидает новые сообщения из Kafka. При получении сообщения проверяет наличие ошибок и десериализует сообщение с помощью десериализатора AvroDeserializer и парсит десериализованное сообщение, извлекая из него все поля. Извлеченные данные записываются в Google Sheets, начиная с первой строки и увеличивая номер строки после каждой записи.
Если в процессе возникают ошибки, они отлавливаются блоком except и показываются в области вывода. После завершения работы или при возникновении исключения, потребитель закрывается с помощью метода close().
Результаты потребления записываются в Google-таблицу:
В GUI Upstash отображается график публикации и потребления сообщений.
Узнайте больше про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka