Самый простой способ организовать обработку и логирование ошибок в приложении-потребителе, чтобы продолжать считывание из Apache Kafka, даже если продюсер изменил структуру полезной нагрузки сообщения.
Публикация данных в Kafka
Напомним, Apache Kafka, в отличие от RabbitMQ, не позволяет организовать очередь недоставленных сообщений (DLQ, Dead Letter Queue) средствами самой платформы, о чем мы писали здесь и здесь. Проверка корректности сообщения реализуется в коде приложения-потребителе с помощью блока try-cath для обработки ожидаемых или непредвиденных исключений. Рассмотрим как это сделать на примере простого Python-приложения. Предположим, продюсер отправляет в Kafka заявки клиентов интернет-магазина с данными в JSON-документе следующей структуры:
{ "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { "id": { "type": "string" }, "name": { "type": "string" }, "subject": { "type": "string" }, "content": { "type": "string" } }, "required": [ "id", "name", "subject", "content" ] }
Расшифруем каждое поле полезной нагрузки сообщения, т.е. ключ JSON-документа в таблице
Ключ JSON |
Смысл |
Пример значения |
id |
Идентификатор заявки |
07/01/2023 10:54:46 |
name |
Имя клиента |
Anna Vichigova |
subject |
Тема заявки |
question – для вопросов app — для заявок на покупку товара |
content |
Содержание заявки: вопрос по работе магазина или желание купить товар |
Hello, I have a question about delivery – пример вопроса по доставке water 47 – пример заявки на покупку товара вода (water) в количестве 47 единиц |
Код приложения-продюсера, которое каждые 3 секунды отправляет подобные сообщения в топик Kafka выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек########################################### #установка библиотек !pip install kafka-python !pip install faker #импорт модулей import json import random from datetime import datetime import time from time import sleep from kafka import KafkaProducer # Импорт модуля faker from faker import Faker ####################################ячейка в Google Colab №2 - публикация сообщений в Kafka########################################### # объявление продюсера Kafka producer = KafkaProducer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, value_serializer=lambda v: json.dumps(v).encode('utf-8'), #batch_size=300 ) # Создание объекта Faker с использованием провайдера адресов для России fake = Faker() #fake.add_provider(Provider) #списки полей в заявке products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal'] questions = ['payment', 'delivery', 'discount', 'vip', 'staff'] #бесконечный цикл публикации данных while True: #подготовка данных для публикации в JSON-формате now=datetime.now() id=now.strftime("%m/%d/%Y %H:%M:%S") content = '' theme = '' corp = 0 part = 0 #подготовка списка возможных ключей маршрутизации (routing keys) corp = random.choice([1,0]) if corp==1 : #name = random.choice(names_corp) name=fake.company() routing_keys = ['app' + '.company.' + name, 'question'] else: #name = random.choice(names_fiz) name=fake.name() routing_keys = ['app', 'question'] #случайный выбор одного из ключей маршрутизации (из routing keys) subject=random.choice(routing_keys) #добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки if subject=='question': theme = random.choice(questions) content = 'Hello, I have a question about ' + theme part=0 #все вопросы записывать в раздел 0 else : theme ='app' content = random.choice(products) + ' ' + str(random.randint(0,100)) if corp==1 : part=1 #все корпоративные заявки записывать в раздел 1 else: part=2 #заявки от частных лиц записывать в раздел 2 #задаем ключ сообщения для Kafka mes_key = str.encode(subject+name) #создаем полезную нагрузку в JSON data = {'id': id, 'name': name, 'subject': subject, 'content': content} #публикуем данные в Kafka future = producer.send('InputsTopic', value=data, partition=part) record_metadata = future.get(timeout=60) print(f' [x] Sent {record_metadata}') print(f' [x] Corp = {corp}') print(f' [x] Payload {data}') # сериализуем данные в формат JSON и вычисляем размер сообщения message_size = len(json.dumps(data).encode('utf-8')) # выводим размер сообщения на консоль print(f"Message size: {message_size} bytes") #повтор через 3 секунды time.sleep(3) ####################################ячейка в Google Colab №3 - закрытие соединения########################################### #Закрываем соединения producer.close()
Как обычно, мой экземпляр Kafka развернут в облачной платформе Upstash, в GUI которой можно посмотреть публикуемые сообщения:
Потребление данных
Приложение-потребитель считывает данные и заносит их в Google-таблицу. Распределение сообщений по разделам топика обусловлена бизнес-логикой: все вопросы должны попадать в раздел 0, заявки на покупку товара от корпоративных клиентов – в раздел 1, а от частных лиц – в раздел 2. Предположим, приложение-потребитель должно заносить корпоративные заявки на покупку товара на 2-ой лист Google-таблицы.
Код приложения-потребителя выглядит следующим образом:
####################################ячейка в Google Colab №1 - установка и импорт библиотек########################################### #установка библиотек !pip install kafka-python #импорт модулей from google.colab import auth auth.authenticate_user() import gspread from google.auth import default creds, _ = default() import json import random from datetime import datetime from kafka import KafkaConsumer from json import loads from kafka.structs import TopicPartition ####################################ячейка в Google Colab №2 - потребление из Kafka########################################### #объявление потребителя Kafka consumer = KafkaConsumer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, group_id='gr1', auto_offset_reset='earliest', enable_auto_commit=True ) topic='InputsTopic' #consumer.unsubscribe() #Google Sheets Autentificate gc = gspread.authorize(creds) # подписка потребителя на определенный раздел topic partition #все вопросы лежат в разделе 0 #все корпоративные заявки лежат в разделе 1 #заявки от частных лиц лежат в разделе 2 part=1 #задание раздела topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел #Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM) sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM') wks = sh.worksheet("Partition_"+ str(part)) #в какой лист гугл-таблиц будем записывать #начальный номер строки для записи данных x=1 # обработка сообщений из Kafka for message in consumer: try: # распаковка сообщения payload = message.value.decode("utf-8") data = json.loads(payload) # вывод сообщения в консоль print(f"Offset: {message.offset}, Value: {message.value}") print(consumer.position(topic_partition)) print(f"Timestamp: {message.timestamp}, Value: {message.value}") timestamp = message.timestamp / 1000.0 datetime_object = datetime.fromtimestamp(timestamp) formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f') print(f"Timestamp: {formatted_timestamp}, Value: {message.value}") print(data) # парсинг сообщения id = data['id'] name = data['name'] subject = data['subject'] content = data['content'] # вывод распарсенных данных в консоль print(f'Заявка № {id}, клиент {name}, тема: {subject}, содержимое: {content}') # обновление данных в Google Sheets x += 1 wks.update_cell(x, 1, id) wks.update_cell(x, 2, name) wks.update_cell(x, 3, content) except Exception as e: # запись ошибок в лог-файл на Google Диске error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n" with open("dlq.txt", "a") as f: f.write(error_str) print(f"Error: {str(e)}") ###################################ячейка в Google Colab №3 - закрытие соединения########################################### #отписываем потребителя и закрываем соединение consumer.unsubscribe() consumer.close()
В этом Python-коде в бесконечном цикле потребления данных добавлена обработка исключений, если структура данных полезной нагрузки сообщения будет отличаться от ожидаемой. Например, в GUI платформы Upstash я вручную опубликовала новое сообщение совершенно другой структуры, задав ключ распределения так, чтобы оно попало в раздел 1, куда публикуются заявки корпоративных клиентов:
Благодаря блоку try-except в коде приложения-продюсера он продолжил работать, а не остановился из-за возникшего исключения, когда структура данных не совпала с ожидаемой.
Сообщения, которые потребитель не смог обработать, т.е. те, на которые возникло исключение, записываются в лог-файл под названием dlq.txt.
Позже можно просмотреть этот лог-файл с некорректными сообщениями, чтобы обработать его вручную или написать скрипт для автоматической обработки.
Таким образом, простое включение конструкции try-catch в цикл потребления сообщений позволяет избежать остановки конвейера потоковой обработки данных. Читайте в нашей новой статье, какая обработка исключений включена в релиз 3.5.0 для библиотеки Kafka Streams. А здесь вы узнаете, зачем ограничивать пропускную способность клиента Kafka и как это сделать с помощью механизма квотирования.
Про спецификацию AsyncAPI для этих Python-приложений я рассказываю в этом материале.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka