Обработка ошибок в потребителе Kafka с try-except: пример

потребитель Kafka обработка исключений примеры курсы обучение, DLQ Kafka, очередь недоставленных сообщений в Kafka примеры курсы обучение, курсы по Apache Kafka, Kafka для разработчиков, обучение Kafka, Apache Kafka Dead Letter Queue, обучение большим данным, курсы Big Data, Школа Больших Данных Учебный Центр Коммерсант

Самый простой способ организовать обработку и логирование ошибок в приложении-потребителе, чтобы продолжать считывание из Apache Kafka, даже если продюсер изменил структуру полезной нагрузки сообщения.

Публикация данных в Kafka

Напомним, Apache Kafka, в отличие от RabbitMQ, не позволяет организовать очередь недоставленных сообщений (DLQ, Dead Letter Queue) средствами самой платформы, о чем мы писали здесь и здесь. Проверка корректности сообщения реализуется в коде приложения-потребителе с помощью блока try-cath для обработки ожидаемых или непредвиденных исключений. Рассмотрим как это сделать на примере простого  Python-приложения. Предположим, продюсер отправляет в Kafka заявки клиентов интернет-магазина с данными в JSON-документе следующей структуры:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "subject": {
      "type": "string"
    },
    "content": {
      "type": "string"
    }
  },
  "required": [
    "id",
    "name",
    "subject",
    "content"
  ]
}

Расшифруем каждое поле полезной нагрузки сообщения, т.е. ключ JSON-документа в таблице

Ключ JSON

Смысл

Пример значения

id

Идентификатор заявки

07/01/2023 10:54:46

name

Имя клиента

Anna Vichigova

subject

Тема заявки

question – для вопросов

app — для заявок на покупку товара

content

Содержание заявки: вопрос по работе магазина или желание купить товар

Hello, I have a question about delivery – пример вопроса по доставке

water 47 – пример заявки на покупку товара вода (water) в количестве 47 единиц

Код приложения-продюсера, которое каждые 3 секунды отправляет подобные сообщения в топик Kafka выглядит так:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
!pip install faker

#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

# Импорт модуля faker
from faker import Faker

####################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  #batch_size=300
)

# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()
#fake.add_provider(Provider)

#списки полей в заявке
products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal']
questions = ['payment', 'delivery', 'discount', 'vip', 'staff']

#бесконечный цикл публикации данных
while True:
  #подготовка данных для публикации в JSON-формате
  now=datetime.now()
  id=now.strftime("%m/%d/%Y %H:%M:%S")

  content = ''
  theme = ''
  corp = 0
  part = 0

  #подготовка списка возможных ключей маршрутизации (routing keys)
  corp = random.choice([1,0])

  if corp==1 :
    #name = random.choice(names_corp)
    name=fake.company()
    routing_keys = ['app' + '.company.' + name, 'question']
  else:
    #name = random.choice(names_fiz)
    name=fake.name()
    routing_keys = ['app', 'question']

  #случайный выбор одного из ключей маршрутизации (из routing keys)
  subject=random.choice(routing_keys)

  #добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
  if subject=='question':
    theme = random.choice(questions)
    content = 'Hello, I have a question about ' + theme
    part=0 #все вопросы записывать в раздел 0
  else :
    theme ='app'
    content = random.choice(products) + ' ' + str(random.randint(0,100))
    if corp==1 :
      part=1 #все корпоративные заявки записывать в раздел 1
    else:
      part=2 #заявки от частных лиц записывать в раздел 2

  #задаем ключ сообщения для Kafka
  mes_key = str.encode(subject+name)

  #создаем полезную нагрузку в JSON
  data = {'id': id, 'name': name, 'subject': subject, 'content': content}

  #публикуем данные в Kafka
  future = producer.send('InputsTopic', value=data, partition=part)
  record_metadata = future.get(timeout=60)
  print(f' [x] Sent {record_metadata}')
  print(f' [x] Corp =  {corp}')
  print(f' [x] Payload {data}')

  # сериализуем данные в формат JSON и вычисляем размер сообщения
  message_size = len(json.dumps(data).encode('utf-8'))

  # выводим размер сообщения на консоль
  print(f"Message size: {message_size} bytes")

  #повтор через 3 секунды
  time.sleep(3)
####################################ячейка в Google Colab №3 - закрытие соединения###########################################
#Закрываем соединения
producer.close()

Как обычно, мой экземпляр Kafka развернут в облачной платформе Upstash, в GUI которой можно посмотреть публикуемые сообщения:

Kafka Upstash GUI опубликованные сообщения
Просмотр сообщений, опубликованных в Kafka, развернутой на платформе Upstash

Потребление данных

Приложение-потребитель считывает данные и заносит их в Google-таблицу. Распределение сообщений по разделам топика обусловлена бизнес-логикой: все вопросы должны попадать в раздел 0, заявки на покупку товара от корпоративных клиентов – в раздел 1, а от частных лиц – в раздел 2. Предположим, приложение-потребитель должно заносить корпоративные заявки на покупку товара на 2-ой лист Google-таблицы.

Kafka потребление
Запись потребленных из Kafka сообщений в Google-таблицу

Код приложения-потребителя выглядит следующим образом:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python

#импорт модулей
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()

import json
import random
from datetime import datetime

from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition

####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
#объявление потребителя Kafka
consumer = KafkaConsumer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  group_id='gr1',
  auto_offset_reset='earliest',
  enable_auto_commit=True
)
topic='InputsTopic'
#consumer.unsubscribe()

#Google Sheets Autentificate
gc = gspread.authorize(creds)

# подписка потребителя на определенный раздел topic partition
#все вопросы лежат в разделе 0
#все корпоративные заявки лежат в разделе 1
#заявки от частных лиц лежат в разделе 2
part=1 #задание раздела
topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел

#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("Partition_"+ str(part)) #в какой лист гугл-таблиц будем записывать

#начальный номер строки для записи данных
x=1

# обработка сообщений из Kafka
for message in consumer:
    try:
        # распаковка сообщения
        payload = message.value.decode("utf-8")
        data = json.loads(payload)

        # вывод сообщения в консоль
        print(f"Offset: {message.offset}, Value: {message.value}")
        print(consumer.position(topic_partition))
        print(f"Timestamp: {message.timestamp}, Value: {message.value}")
        timestamp = message.timestamp / 1000.0
        datetime_object = datetime.fromtimestamp(timestamp)
        formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")
        print(data)

        # парсинг сообщения
        id = data['id']
        name = data['name']
        subject = data['subject']
        content = data['content']

        # вывод распарсенных данных в консоль
        print(f'Заявка № {id}, клиент {name}, тема: {subject}, содержимое: {content}')

        # обновление данных в Google Sheets
        x += 1
        wks.update_cell(x, 1, id)
        wks.update_cell(x, 2, name)
        wks.update_cell(x, 3, content)
    except Exception as e:
        # запись ошибок в лог-файл на Google Диске
        error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n"
        with open("dlq.txt", "a") as f:
            f.write(error_str)
        print(f"Error: {str(e)}")
###################################ячейка в Google Colab №3 - закрытие соединения###########################################
#отписываем потребителя и закрываем соединение
consumer.unsubscribe()
consumer.close()

В этом Python-коде в бесконечном цикле потребления данных добавлена обработка исключений, если структура данных полезной нагрузки сообщения будет отличаться от ожидаемой. Например, в GUI платформы Upstash я вручную опубликовала новое сообщение совершенно другой структуры, задав ключ распределения так, чтобы оно попало в раздел 1, куда публикуются заявки корпоративных клиентов:

GUI Kafka Upstash
Публикация сообщений в топик Kafka вручную через GUI платформы Upstash

Благодаря блоку try-except в коде приложения-продюсера он продолжил работать, а не остановился из-за возникшего исключения, когда структура данных не совпала с ожидаемой.

Google Colab Kafka Python consumer
Вывод потребленных из Kafka данных в Google Colab

Сообщения, которые потребитель не смог обработать, т.е. те, на которые возникло исключение, записываются в лог-файл под названием dlq.txt.

try-catch Kafka consumer exception
Запись некорректных сообщений в лог-файл

Позже можно просмотреть этот лог-файл с некорректными сообщениями, чтобы обработать его вручную или написать скрипт для автоматической обработки.

Таким образом, простое включение конструкции try-catch в цикл потребления сообщений позволяет избежать остановки конвейера потоковой обработки данных. Читайте в нашей новой статье, какая обработка исключений включена в релиз 3.5.0 для библиотеки Kafka Streams. А здесь вы узнаете, зачем ограничивать пропускную способность клиента Kafka и как это сделать с помощью механизма квотирования. 

Про спецификацию AsyncAPI для этих Python-приложений я рассказываю в этом материале.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту