Иногда возникает потребность в повторном чтении данных из Apache Kafka с определенного момента времени. Сегодня рассмотрим, как это сделать, написав простенький Python-скрипт потребления из раздела топика.
Публикация данных в Kafka
В качестве примера возьмем ранее рассмотренный в этой статье кейс приема потока обращений в интернет-магазин. Обращения могут представлять собой заявки от частных лиц и корпоративных клиентов на покупку товаров, а также вопросы по работе магазина, доставке или оплате. Все входящие обращения будут публиковаться в топик, который состоит из 3-х разделов. В раздел с номером 0 будут попадать все вопросы, в раздел с номером 1 – все корпоративные заявки, а в раздел 2- частные. Как обычно, я создала топик Kafka на инстансе, развернутом в облачной serverless-платформе Upstash. Топик называется InputsTopic и имеет 3 раздела.
Код продюсера, отправляющего клиентские обращения, написан на Python с использованием библиотеки kafka-python. В этом коде с использованием библиотеки Faker случайно формируются сообщения от частных или корпоративных клиентов и публикуются в топик каждые 3 секунды. Разделение сообщений по разделам реализуется с помощью явного указания раздела в методе продюсера send(). В этом случае ключ партиционирования игнорируется.
Сперва установим библиотеки и импортируем необходимые для работы модули:
####################################ячейка в Google Colab №1 - установка и импорт библиотек########################################### #установка библиотек !pip install kafka-python !pip install faker #импорт модулей import json import random from datetime import datetime import time from time import sleep from kafka import KafkaProducer # Импорт модуля faker from faker import Faker
Затем напишем сам скрипт формирования и публикации заявок:
####################################ячейка в Google Colab №2 - публикация сообщений в Kafka########################################### # объявление продюсера Kafka producer = KafkaProducer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, value_serializer=lambda v: json.dumps(v).encode('utf-8'), ) # Создание объекта Faker с использованием провайдера адресов для России fake = Faker() #списки полей в заявке products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal'] questions = ['payment', 'delivery', 'discount', 'vip', 'staff'] #бесконечный цикл публикации данных while True: #подготовка данных для публикации в JSON-формате now=datetime.now() id=now.strftime("%m/%d/%Y %H:%M:%S") content = '' theme = '' corp = 0 part = 0 #подготовка списка возможных ключей маршрутизации (routing keys) corp = random.choice([1,0]) if corp==1 : #name = random.choice(names_corp) name=fake.company() routing_keys = ['app' + '.company.' + name, 'question'] else: #name = random.choice(names_fiz) name=fake.name() routing_keys = ['app', 'question'] #случайный выбор одного из ключей маршрутизации (из routing keys) subject=random.choice(routing_keys) #добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки if subject=='question': theme = random.choice(questions) content = 'Hello, I have a question about ' + theme part=0 #все вопросы записывать в раздел 0 else : theme ='app' content = random.choice(products) + ' ' + str(random.randint(0,100)) if corp==1 : part=1 #все корпоративные заявки записывать в раздел 1 else: part=2 #заявки от частных лиц записывать в раздел 2 #задаем ключ сообщения для Kafka mes_key = str.encode(subject+name) #создаем полезную нагрузку в JSON data = {'id': id, 'name': name, 'subject': subject, 'content': content} #публикуем данные в Kafka future = producer.send('InputsTopic', value=data, partition=part, key=mes_key) record_metadata = future.get(timeout=60) print(f' [x] Sent {record_metadata}') print(f' [x] Corp = {corp}') print(f' [x] Payload {data}') # сериализуем данные в формат JSON и вычисляем размер сообщения message_size = len(json.dumps(data).encode('utf-8')) # выводим размер сообщения на консоль print(f"Message size: {message_size} bytes") #повтор через 3 секунды time.sleep(3)
Запустив этот скрипт в Google Colab, получаем сообщения, опубликованные в Kafka согласно ранее заданной логике распределения по разделам:
Проверить, что сообщения опубликованы, можно в GUI-интерфейсе платформы Upstash:
Предположим, в приложении-потребителе, которое обрабатывает заявки от частных лиц, случился сбой и ему необходимо повторно считать данные с момента 17:30. Как это сделать, рассмотрим далее.
Повторное потребление с определенного момента
Несмотря на наличие поля отметки времени (timestamp), которое проставляется автоматически при публикации сообщения в Kafka, хронологический порядок потребления сообщения привязан не к этому параметру, а к смещению (offset). Поэтому в коде приложения-потребителя следует указать нужную позицию начала чтения сообщений из раздела, используя метод потребителя seek(), в параметрах которого указывается раздел и смещение. Например, считать с позиции смещения 300:
offset=300 # задание нужного смещения topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел consumer.seek(topic_partition, offset)
Однако, на практике мы обычно знаем примерное время, но не точное смещение сообщений. Поэтому необходимо выполнить небольшое преобразование от момента, заданного в привычной людям форме, к смещению. Например, нужно считать данные по заявкам частных лиц, которые были опубликованы в Kafka после 17:30 МСК 11 июня 2023 года. За это отвечает следующий участок кода:
topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел # определяем момент времени, с которого начнется чтение timestamp = datetime(2023, 6, 11, 17, 30, 0).timestamp() * 1000 # переводим в миллисекунды # определяем смещение для каждой партиции в топике offsets = consumer.offsets_for_times({topic_partition: timestamp}) # устанавливаем позицию для каждой партиции в топике if offsets[topic_partition] is not None: consumer.seek(topic_partition, offsets[topic_partition].offset)
Весь код приложения-потребителя, который считывает данные по заявкам частных лиц и заносит в определенный лист Google-таблицы, выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек########################################### #установка библиотек !pip install kafka-python #импорт модулей from google.colab import auth auth.authenticate_user() import gspread from google.auth import default creds, _ = default() import json import random from datetime import datetime from kafka import KafkaConsumer from json import loads from kafka.structs import TopicPartition ####################################ячейка в Google Colab №2 - потребление из Kafka########################################### #объявление потребителя Kafka consumer = KafkaConsumer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, group_id='gr1', auto_offset_reset='earliest', enable_auto_commit=True ) topic='InputsTopic' #Google Sheets Autentificate gc = gspread.authorize(creds) # подписка потребителя на определенный раздел topic partition #все вопросы лежат в разделе 0 #все корпоративные заявки лежат в разделе 1 #заявки от частных лиц лежат в разделе 2 part=2 #задание раздела topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел # определяем момент времени, с которого начнется чтение timestamp = datetime(2023, 6, 11, 17, 30, 0).timestamp() * 1000 # переводим в миллисекунды # определяем смещение для каждой партиции в топике offsets = consumer.offsets_for_times({topic_partition: timestamp}) # устанавливаем позицию для каждой партиции в топике if offsets[topic_partition] is not None: consumer.seek(topic_partition, offsets[topic_partition].offset) #Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM) sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM') wks = sh.worksheet("Partition_"+ str(part)) #в какой лист гугл-таблиц будем записывать #начальный номер строки для записи данных x=1 #считывание из топика Kafka for message in consumer: payload=message.value.decode("utf-8") data=json.loads(payload) print(f"Offset: {message.offset}, Value: {message.value}") print(consumer.position(topic_partition)) # выводим текущее смещение print(f"Timestamp: {message.timestamp}, Value: {message.value}")# выводим текущую метку времени timestamp = message.timestamp / 1000.0 # переводим метку времени в секунды datetime_object = datetime.fromtimestamp(timestamp) # преобразуем метку времени в объект datetime formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f') # форматируем вывод метки времени print(f"Timestamp: {formatted_timestamp}, Value: {message.value}") #вывод исходных данных в консоль Goggle Colab print(data) #парсинг JSON-сообщения полезной нагрузки id=data['id'] name=data['name'] subject = data['subject'] content = data['content'] #вывод распарсенных данных в консоль Google Colab print('Заявка № ', id, ', клиент ', name, ', тема: ", subject, ', 'содержимое: ', content) #обновление данных в в Гугл-таблице global x #переход на следующую строку в гугл-таблицах x = x + 1 #запись данных в ячейки гугл-таблицы wks.update_cell(x,1,id) wks.update_cell(x,2,name) wks.update_cell(x,3,content)
Запуск этого скрипта в Google Colab выводит результаты считывания на консоль:
В гугл-таблице потребленные данные также отображаются:
Таким образом, используя всего несколько дополнительных методов библиотеки kafka-python, можно считать данные из Kafka повторно с нужного смещения или даже заданного момента времени.
Еще больше полезных приемов администрирования и эксплуатации Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Администрирование кластера Kafka
- Apache Kafka для инженеров данных
- Администрирование Arenadata Streaming Kafka