Самый простой способ организовать обработку и логирование ошибок в приложении-потребителе, чтобы продолжать считывание из Apache Kafka, даже если продюсер изменил структуру полезной нагрузки сообщения.
Публикация данных в Kafka
Напомним, Apache Kafka, в отличие от RabbitMQ, не позволяет организовать очередь недоставленных сообщений (DLQ, Dead Letter Queue) средствами самой платформы, о чем мы писали здесь и здесь. Проверка корректности сообщения реализуется в коде приложения-потребителе с помощью блока try-cath для обработки ожидаемых или непредвиденных исключений. Рассмотрим как это сделать на примере простого Python-приложения. Предположим, продюсер отправляет в Kafka заявки клиентов интернет-магазина с данными в JSON-документе следующей структуры:
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"subject": {
"type": "string"
},
"content": {
"type": "string"
}
},
"required": [
"id",
"name",
"subject",
"content"
]
}
Расшифруем каждое поле полезной нагрузки сообщения, т.е. ключ JSON-документа в таблице
|
Ключ JSON |
Смысл |
Пример значения |
|
id |
Идентификатор заявки |
07/01/2023 10:54:46 |
|
name |
Имя клиента |
Anna Vichigova |
|
subject |
Тема заявки |
question – для вопросов app — для заявок на покупку товара |
|
content |
Содержание заявки: вопрос по работе магазина или желание купить товар |
Hello, I have a question about delivery – пример вопроса по доставке water 47 – пример заявки на покупку товара вода (water) в количестве 47 единиц |
Код приложения-продюсера, которое каждые 3 секунды отправляет подобные сообщения в топик Kafka выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
!pip install faker
#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer
# Импорт модуля faker
from faker import Faker
####################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
bootstrap_servers=[kafka_url],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
#batch_size=300
)
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()
#fake.add_provider(Provider)
#списки полей в заявке
products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal']
questions = ['payment', 'delivery', 'discount', 'vip', 'staff']
#бесконечный цикл публикации данных
while True:
#подготовка данных для публикации в JSON-формате
now=datetime.now()
id=now.strftime("%m/%d/%Y %H:%M:%S")
content = ''
theme = ''
corp = 0
part = 0
#подготовка списка возможных ключей маршрутизации (routing keys)
corp = random.choice([1,0])
if corp==1 :
#name = random.choice(names_corp)
name=fake.company()
routing_keys = ['app' + '.company.' + name, 'question']
else:
#name = random.choice(names_fiz)
name=fake.name()
routing_keys = ['app', 'question']
#случайный выбор одного из ключей маршрутизации (из routing keys)
subject=random.choice(routing_keys)
#добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
if subject=='question':
theme = random.choice(questions)
content = 'Hello, I have a question about ' + theme
part=0 #все вопросы записывать в раздел 0
else :
theme ='app'
content = random.choice(products) + ' ' + str(random.randint(0,100))
if corp==1 :
part=1 #все корпоративные заявки записывать в раздел 1
else:
part=2 #заявки от частных лиц записывать в раздел 2
#задаем ключ сообщения для Kafka
mes_key = str.encode(subject+name)
#создаем полезную нагрузку в JSON
data = {'id': id, 'name': name, 'subject': subject, 'content': content}
#публикуем данные в Kafka
future = producer.send('InputsTopic', value=data, partition=part)
record_metadata = future.get(timeout=60)
print(f' [x] Sent {record_metadata}')
print(f' [x] Corp = {corp}')
print(f' [x] Payload {data}')
# сериализуем данные в формат JSON и вычисляем размер сообщения
message_size = len(json.dumps(data).encode('utf-8'))
# выводим размер сообщения на консоль
print(f"Message size: {message_size} bytes")
#повтор через 3 секунды
time.sleep(3)
####################################ячейка в Google Colab №3 - закрытие соединения###########################################
#Закрываем соединения
producer.close()
Как обычно, мой экземпляр Kafka развернут в облачной платформе Upstash, в GUI которой можно посмотреть публикуемые сообщения:

Потребление данных
Приложение-потребитель считывает данные и заносит их в Google-таблицу. Распределение сообщений по разделам топика обусловлена бизнес-логикой: все вопросы должны попадать в раздел 0, заявки на покупку товара от корпоративных клиентов – в раздел 1, а от частных лиц – в раздел 2. Предположим, приложение-потребитель должно заносить корпоративные заявки на покупку товара на 2-ой лист Google-таблицы.

Код приложения-потребителя выглядит следующим образом:
####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
#импорт модулей
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()
import json
import random
from datetime import datetime
from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition
####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
#объявление потребителя Kafka
consumer = KafkaConsumer(
bootstrap_servers=[kafka_url],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password,
group_id='gr1',
auto_offset_reset='earliest',
enable_auto_commit=True
)
topic='InputsTopic'
#consumer.unsubscribe()
#Google Sheets Autentificate
gc = gspread.authorize(creds)
# подписка потребителя на определенный раздел topic partition
#все вопросы лежат в разделе 0
#все корпоративные заявки лежат в разделе 1
#заявки от частных лиц лежат в разделе 2
part=1 #задание раздела
topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел
#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("Partition_"+ str(part)) #в какой лист гугл-таблиц будем записывать
#начальный номер строки для записи данных
x=1
# обработка сообщений из Kafka
for message in consumer:
try:
# распаковка сообщения
payload = message.value.decode("utf-8")
data = json.loads(payload)
# вывод сообщения в консоль
print(f"Offset: {message.offset}, Value: {message.value}")
print(consumer.position(topic_partition))
print(f"Timestamp: {message.timestamp}, Value: {message.value}")
timestamp = message.timestamp / 1000.0
datetime_object = datetime.fromtimestamp(timestamp)
formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f')
print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")
print(data)
# парсинг сообщения
id = data['id']
name = data['name']
subject = data['subject']
content = data['content']
# вывод распарсенных данных в консоль
print(f'Заявка № {id}, клиент {name}, тема: {subject}, содержимое: {content}')
# обновление данных в Google Sheets
x += 1
wks.update_cell(x, 1, id)
wks.update_cell(x, 2, name)
wks.update_cell(x, 3, content)
except Exception as e:
# запись ошибок в лог-файл на Google Диске
error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n"
with open("dlq.txt", "a") as f:
f.write(error_str)
print(f"Error: {str(e)}")
###################################ячейка в Google Colab №3 - закрытие соединения###########################################
#отписываем потребителя и закрываем соединение
consumer.unsubscribe()
consumer.close()
В этом Python-коде в бесконечном цикле потребления данных добавлена обработка исключений, если структура данных полезной нагрузки сообщения будет отличаться от ожидаемой. Например, в GUI платформы Upstash я вручную опубликовала новое сообщение совершенно другой структуры, задав ключ распределения так, чтобы оно попало в раздел 1, куда публикуются заявки корпоративных клиентов:

Благодаря блоку try-except в коде приложения-продюсера он продолжил работать, а не остановился из-за возникшего исключения, когда структура данных не совпала с ожидаемой.

Сообщения, которые потребитель не смог обработать, т.е. те, на которые возникло исключение, записываются в лог-файл под названием dlq.txt.

Позже можно просмотреть этот лог-файл с некорректными сообщениями, чтобы обработать его вручную или написать скрипт для автоматической обработки.
Таким образом, простое включение конструкции try-catch в цикл потребления сообщений позволяет избежать остановки конвейера потоковой обработки данных. Читайте в нашей новой статье, какая обработка исключений включена в релиз 3.5.0 для библиотеки Kafka Streams. А здесь вы узнаете, зачем ограничивать пропускную способность клиента Kafka и как это сделать с помощью механизма квотирования.
Про спецификацию AsyncAPI для этих Python-приложений я рассказываю в этом материале.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka
[elementor-template id=»13619″]


