Сегодня я покажу простую демонстрацию потоковой агрегации данных из разных топиков Apache Kafka на примере Python-приложений для соединения событий пользовательского поведения с информацией о самом пользователе.
Постановка задачи
Рассмотрим примере кликстрима, т.е. потокового поступления данных о событиях пользовательского поведения на страницах сайта. Предположим, данные о самом пользователе: его идентификаторе, электронном адресе и имени попадают в топик под названием CorpAppsTopic. JSON-схема полезной нагрузки выглядит так:
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Generated schema for Root", "type": "object", "properties": { "event_timestamp": { "type": "string" }, "user_id": { "type": "string" }, "email": { "type": "string" }, "name": { "type": "string" } }, "required": [ "event_timestamp", "user_id", "email", "name" ] }
Данные о непосредственного событиях пользовательского поведения, т.е. на какой странице сайта он что-то скачал, кликнул, просмотрел и пр., публикуются в топик test. Они тоже представлены в виде JSON-документов следующей структурой:
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Generated schema for Root", "type": "object", "properties": { "event_timestamp": { "type": "string" }, "user_id": { "type": "string" }, "page": { "type": "string" }, "event": { "type": "string" } }, "required": [ "event_timestamp", "user_id", "page", "event" ] }
Данные в оба топика поступают непрерывным потоком, причем один и тот же пользователь может совершить от 1 до 10 событий на любых веб-страницах. Необходимо получить агрегированные данные о том, сколько событий совершил каждый пользователь и сколько видов событий совершено вообще.
Чтобы реализовать эту задачу, прежде всего надо написать код приложений-продюсеров, которые будут публиковать данные в Kafka. А, поскольку, данные о пользователе используются для генерации событий пользовательского поведения, одно и то же приложение будет одновременно и потреблять данные из одного топика Kafka, и публиковать их в другой. Схематично это будет выглядеть так:
Скрипт PlantUML для отрисовки диаграммы:
@startuml !include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Container.puml LAYOUT_LEFT_RIGHT() title Потоковая агрегация событий пользовательского поведения из Kafka Container(app1C, "App1", "Python", "Публикация данных о пользователе") Container(app2C, "App2", "Python", "Потребление данных о пользователе и публикация данных о событиях пользовательского поведения") Container(app3C, "App3", "Python", "Потребление и потоковая агрегация данных о событиях пользовательского поведения") Container_Boundary(Kafka, "Apache Kafka") { ContainerQueue(kafkaCorpAppsTopic, "CorpAppsTopic", "Kafka", "Топик для регистрации пользователей") ContainerQueue(kafkaTest, "test", "Kafka", "Топик для регистрации событий пользовательского поведения") } Rel(app1C, kafkaCorpAppsTopic, "Публикация данных о пользователе: user_id, email, name") Rel(kafkaCorpAppsTopic, app2C, "Потребление\nданных") Rel(app2C, kafkaTest, "Публикация данных: user_id, page, event") Rel(kafkaTest, app3C, "Потребление данных") Rel(kafkaCorpAppsTopic, app3C, "Потребление данных") @enduml
Разобравшись с топологией потоковой системы, далее реализуем код для публикации и потребления данных.
Публикация и потребление данных в Kafka
Как обычно, экземпляр Kafka у меня развернут в облачной платформе Upstash, а писать приложения я буду на Python, используя библиотеку kafka-python. Код приложения-продюсера, которое каждые 3 секунды генерирует фейковые данные о пользователях, выглядит так:
#установка библиотек !pip install kafka-python !pip install faker #импорт модулей import json import random from datetime import datetime import time from time import sleep from kafka import KafkaProducer # Импорт модуля faker from faker import Faker from faker.providers.person.ru_RU import Provider # объявление продюсера Kafka producer = KafkaProducer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, value_serializer=lambda v: json.dumps(v).encode('utf-8'), #batch_size=300 ) topic='CorpAppsTopic' # создание объекта Faker с локализацией для России fake = Faker('ru_RU') fake.add_provider(Provider) #бесконечный цикл публикации данных while True: #подготовка данных для публикации в JSON-формате now=datetime.now() event_timestamp=now.strftime("%Y-%m-%d %H:%M:%S") email=fake.ascii_free_email() name=fake.name() user_id=str(hash(email)) # Создаем полезную нагрузку в JSON data = {"event_timestamp": event_timestamp, "user_id": user_id, "email": email, "name": name} #публикуем данные в Kafka future = producer.send(topic, value=data) print(f' [x] Опубликовано {data}') #повтор через 3 секунды time.sleep(3)
Идентификатор пользователя вычисляется как строка от результата применения хэш-функции к его емейлу.
Код приложения, которое потребляет эти данные о пользователе и случайным генерирует события на веб-страницах, выглядит так:
#установка библиотек !pip install kafka-python from kafka import KafkaProducer, KafkaConsumer !pip install faker #импорт модулей import json import random from datetime import datetime import time from time import sleep # Импорт модуля faker from faker import Faker # объявление продюсера Kafka producer = KafkaProducer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, value_serializer=lambda v: json.dumps(v).encode('utf-8'), #batch_size=300 ) topic_to='test' # Создание объекта Faker с использованием провайдера адресов для России fake = Faker() #списки веб-страниц k=100 #количество веб-страниц pages = [] # Инициализация списка для элементов заказа us=[] for i in range(k): wpage=fake.url() pages.append(wpage) #объявление потребителя Kafka для чтения данных пользователей consumer = KafkaConsumer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, group_id='a', auto_offset_reset='earliest', enable_auto_commit=True ) topic_from='CorpAppsTopic' #списки событий events=['click', 'scroll', 'submit', 'download', 'focus'] consumer.subscribe([topic_from]) #бесконечный цикл потребления и публикации данных for message in consumer: try: # распаковка сообщения payload = message.value.decode("utf-8") data_consumed = json.loads(payload) print(f' [x] Получено {data_consumed}') # парсинг сообщения user_id = data_consumed['user_id'] email = data_consumed['email'] name = data_consumed['name'] x=random.randint(1,10) #случайное количество событий for i in range(x): #подготовка данных для публикации в JSON-формате now=datetime.now() event_timestamp=now.strftime("%Y-%m-%d %H:%M:%S") page=random.choice(pages) event=random.choice(events) # Создаем полезную нагрузку в JSON data_publish = {"event_timestamp": event_timestamp, "user_id": user_id, "page": page, "event": event} #публикуем данные в Kafka future = producer.send(topic_to, value=data_publish) print(f' [x] Опубликовано {data_publish}') except Exception as e: # запись ошибок в лог-файл на Google Диске error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n" with open("dlq.txt", "a") as f: f.write(error_str) print(f"Error: {str(e)}")
Для потоковой агрегации данных из 2-х топиков надо написать еще одно приложение. Потреблять данные оно будет одновременно из разных топиков. Его код выглядит так:
# Установим необходимые библиотеки !pip install kafka-python from kafka import KafkaConsumer !pip install kafka-python pandas import json import pandas as pd # Объявление потребителя Kafka для чтения данных пользователей и событий consumer = KafkaConsumer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, group_id='yx', auto_offset_reset='earliest', enable_auto_commit=True ) # Подписываемся на топики topic_from = 'CorpAppsTopic' topic_to = 'test' consumer.subscribe([topic_from, topic_to]) # Инициализация списков для хранения сообщений messages_from = [] messages_to = [] # Бесконечный цикл потребления данных for message in consumer: try: # Распаковка сообщения payload = message.value.decode("utf-8") data = json.loads(payload) # Сортировка сообщений по топикам if message.topic == topic_from: messages_from.append(data) elif message.topic == topic_to: messages_to.append(data) # Преобразуем сообщения в DataFrame df_users = pd.DataFrame(messages_from) df_events = pd.DataFrame(messages_to) # Проверяем наличие столбцов 'id' и 'user_id' if 'user_id' in df_users.columns and 'user_id' in df_events.columns: # Объединяем данные по пользователям и событиям merged_df = pd.merge(df_events, df_users, left_on='user_id', right_on='user_id') # Агрегация по пользователям user_event_counts = merged_df.groupby(['user_id', 'name', 'email']).size().reset_index(name='event_count') # Агрегация по видам событий event_counts = df_events.groupby('event').size().reset_index(name='count') # Вывод результатов print("\nСобытия по пользователям:") print(user_event_counts) print("\nКоличество видов событий:") print(event_counts) except Exception as e: # Запись ошибок в лог-файл на Google Диске error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n" with open("dlq.txt", "a") as f: f.write(error_str) print(f"Error: {str(e)}")
Чтобы вычислить, сколько событий совершил каждый пользователь, надо соединить данные из топика CorpAppsTopic с данными из топика test по ключу user_id. А чтобы понять, сколько событий каждого вида совершено, нужно сделать агрегацию данных из топика test с группировкой по полю event.
Разумеется, все события публикации и потребления данных можно посмотреть в GUI платформы Upstash, на которой развернут экземпляр Kafka.
Таким образом, чтобы реализовать эту довольно простую с точки зрения бизнес-постановки задачу, пришлось писать полноценный потребитель. Вместо этого можно использовать коннекторы потоковой базы данных RisingWave, о которой я писала здесь. Как это сделать, покажу завтра в новой статье.
Освойте все тонкости работы с Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: