Публикация и потребление AVRO-сообщений с реестром схем Apache Kafka: пример на Python

Публикация и потребление AVRO-сообщений с реестром схем Apache Kafka: пример на Python

    Версионирование схемы сообщений в формате AVRO с использованием реестра схем Apache Kafka и библиотеки confluent_kafka: практический пример на Python в Google Colab.

    Публикация сообщений в Kafka с использованием реестра схем

    Недавно я показывала пример использования реестра схем (Schema Registry) Apache Kafka при публикации сообщений. Сегодня рассмотрим версионирование схемы данных в формате AVRO, а также потребление сообщений. Реестр схем – это модуль Confluent для Apache Kafka, который позволяет централизовано управлять схемами данных полезной нагрузки сообщений в топиках. Определив схемы данных для полезной нагрузки и зарегистрировав ее в реестре, ее можно использовать повторно, облегчая код приложения-потребитель. Это обеспечивается благодаря отсутствию необходимости проверять структуру и формат потребленных данных. Также реестр схем снижает накладные расходы на сетевую передачу, передавая идентификатор схемы полезной нагрузки сообщения вместо ее полного определения. Подробный разбор того, как это работает, читайте в новой статье.

    Как обычно, мой экземпляр Kafka развернут в облачной платформе Upstash, которая в начале 2024 года включила поддержку реестра схем. Чтобы работать с ним для Python-клиентов необходимо использовать библиотеки kafka-schema-registry и confluent_kafka. Для публикации необходимо указать сериализатор, а для потребления сообщений – десериализатор, соответствующий формату. Для публикации сообщений используем сериализатор AvroSerializer для формата AVRO – линейно-ориентированный (строчный) формат хранения файлов, который сохраняет схему в независимом от реализации текстовом формате JSON. Соответственно, для потребления сообщений нужен десериализатор AvroDeserializer.

    Установим эти библиотеки и импортируем модули:

    #установка библиотек
    !pip install confluent_kafka
    !pip install kafka-schema-registry
    !pip install faker
    
    #импорт модулей
    import random
    from datetime import datetime
    import time
    from time import sleep
    
    from confluent_kafka import Producer
    from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroSerializer
    
    # Импорт модуля faker
    from faker import Faker

    Предположим, каждый 3 секунды приложение-продюсер может опубликовать сообщение с данными об имени и email клиента, а также id и отметкой времени согласно следующей схеме:

    {
      "type": "record",
      "name": "application",
      "namespace": "com.upstash",
      "fields": [
        {
          "name": "id",
          "type": "long"
        },
        {
          "name": "event_time",
          "type": "string"
        },
        {
          "name": "client",
          "type": "string"
        },
        {
          "name": "email",
          "type": "string"
        }
      ]
    }

    Или сообщение также может содержать сведения о номере телефона клиента и иметь другую схему данных:

    {
      "type": "record",
      "name": "application",
      "namespace": "com.upstash",
      "fields": [
        {
          "name": "id",
          "type": "long"
        },
        {
          "name": "event_time",
          "type": "string"
        },
        {
          "name": "client",
          "type": "string"
        },
        {
          "name": "email",
          "type": "string"
        },
        {
          "name": "phone",
          "type": "string"
        }
    
      ]
    }

    Код Python-приложения продюсера будет таким:

    # Определение схемы данных
    schema=''
    
    schema_1 = """{
    "type": "record",
    "name": "application",
    "namespace": "com.upstash",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "event_time", "type": "string"},
        {"name": "client", "type": "string"},
        {"name": "email", "type": "string"}
    ]
    }
    """
    
    schema_2 = """{
    "type": "record",
    "name": "application",
    "namespace": "com.upstash",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "event_time", "type": "string"},
        {"name": "client", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "phone", "type": "string"}
    ]
    }
    """
    
    topic='test'
    
    fake=Faker('ru_RU')
    
    #Создание продюсера
    producer = Producer({
        'bootstrap.servers': kafka_url,
        'sasl.mechanism': 'SCRAM-SHA-256',
        'security.protocol': 'SASL_SSL',
        'sasl.username': username,
        'sasl.password': password,
    })
    
    schema_registry_client = SchemaRegistryClient({
        'url': schemaregistry_url,
        'basic.auth.user.info': auths
    })
    
    string_serializer = StringSerializer('utf_8')
    
    id=0
    # Бесконечный цикл публикации данных.
    while True:
        
        schema=random.choice([schema_1, schema_2])
        avro_serializer = AvroSerializer(schema_registry_client, schema)
    
        start_time = time.time()
        id=id+1
        producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))
        if schema==schema_1:
         application = {"id":id, "event_time":producer_publish_time, "client": fake.name(), "email": fake.free_email()}
        else:
         application = {"id":id, "event_time":producer_publish_time, "client": fake.name(), "email": fake.free_email(), "phone": fake.phone_number()}
    
        value = avro_serializer(application, SerializationContext(topic, MessageField.VALUE))
        future = producer.produce(topic=topic, value=value)
        print("Message sent successfully")
        print(f' [x] Payload {application }')
    
        # Повтор через 3 секунды.
        time.sleep(3)

    После запуска этого скрипта в Google Colab в реестре схем Kafka создается соответствующая схема значения, т.е. полезной нагрузки сообщения. Поскольку приложение-продюсер публикует данные с разными версиями схемы, в самом реестре схем сгенерируется новая версия схемы, что отобразится в GUI платформы Upstash. Чтобы потребители, использующие все ранее зарегистрированные схемы, могут читать данные, опубликованные продюсерами с использованием новой схемы, я изменила тип совместимости схемы на FORWARD_TRANSITIVE. Подробнее  про типы совместимости схем данных читайте в прошлой статье.

    Версионирование схемы данных в реестре схем Kafka
    Версионирование схемы данных в реестре схем Kafka

    Теперь посмотрим, как работает потребитель сообщений с использованием реестра схемы.

    Потребление сообщений из топика Kafka с реестром схем

    Чтобы понять, как работают вышеперечисленные пакеты, рассмотрим простой пример потребления AVRO-сообщений из Kafka, их парсинг и запись в столбцы Google-таблицы. Сперва необходимо установить библиотеки и импортировать модули. В интерактивной среде Google Colab это будет выглядеть так:

    #Установка библиотек и импорт модулей
    !pip install confluent_kafka
    !pip install kafka-schema-registry
    from confluent_kafka import Consumer
    
    from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer
    
    #импорт модулей
    import json
    from google.colab import auth
    auth.authenticate_user()
    import gspread
    from google.auth import default
    creds, _ = default()

    Далее следует настроить десериализатор:

    #потребление AVRO с реестром схем
    avro_deserializer = AvroDeserializer(SchemaRegistryClient(
      {
        'url': schemaregistry_url,
        'basic.auth.user.info': auth_info
    }))

    Затем надо объявить потребитель Kafka и подписать его на топик:

    #объявление потребителя Kafka
    consumer = Consumer({
        'bootstrap.servers': kafka_url,
        'sasl.mechanism': 'SCRAM-SHA-256',
        'security.protocol': 'SASL_SSL',
        'sasl.username': username,
        'sasl.password': password,
        'group.id':  'gr_3',
        'auto.offset.reset': 'latest'
    })
    
    topic='test'
    
    consumer.subscribe([topic])

    Определим таблицу и лист Google Sheets для записи потребленных данных:

    #Google Sheets Autentificate
    gc = gspread.authorize(creds)
    
    #Открытие заранее созданного файла Гугл-таблицы по идентификатору 
    sh = gc.open_by_key(идентификатор гугл-таблицы')
    wks = sh.worksheet("test") #в какой лист гугл-таблиц будем записывать
    
    #начальный номер строки для записи данных
    x=1

    Наконец, запускаем бесконечный цикл потребления данных:

    try:
        while True:
            message = consumer.poll(1.0)
            if message is None:
                continue
            if message.error():
                print(message.error())
                continue
    
            # Создание контекста десериализации
            context = SerializationContext(message.topic(), MessageField.VALUE)
            try:
                # Десериализация сообщения
                deserialized_value = avro_deserializer(message.value(), context)
                if deserialized_value is not None:
                    print(f"Key: {message.key()}, Value: {deserialized_value} \n")
    
                    # Парсинг сообщения
                    fields = list(deserialized_value.keys()) # Получение ключей из словаря и преобразование их в список
                    
                    # обновление данных в Google Sheets
                    x += 1
                    a=0
                    for i in range(len(fields)):  #range() для создания итерируемого диапазона индексов
                      a=i+1
                      value = deserialized_value[str(fields[i])]  #использование квадратных скобок для доступа к значениям словаря
                      wks.update_cell(x, a,  value)
            except Exception as e:
                # логика обработки ошибок
                print(f"Error: {str(e)}")
    finally:
        consumer.close()

    В этом коде  потребитель с помощью метода poll() ожидает новые сообщения из Kafka. При получении сообщения проверяет наличие ошибок и десериализует сообщение с помощью десериализатора AvroDeserializer и парсит десериализованное сообщение, извлекая из него все поля. Извлеченные данные записываются в Google Sheets, начиная с первой строки и увеличивая номер строки после каждой записи.

    Если в процессе возникают ошибки, они отлавливаются блоком except и показываются в области вывода. После завершения работы или при возникновении исключения, потребитель закрывается с помощью метода close().

    Результаты потребления записываются в Google-таблицу:

    Результаты потребления данных с разными схемами
    Результаты потребления данных с разными схемами

    В GUI Upstash отображается график публикации и потребления сообщений.

    График публикации и потребления сообщений в GUI Upstash
    График публикации и потребления сообщений в GUI Upstash

    Узнайте больше про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: