Практическая демонстрация потоковой агрегации событий пользовательского поведений из Apache Kafka с записью результатов в Redis на платформе RisingWave: примеры Python-кода и конвейера из SQL-инструкций.
Постановка задачи
Одной из ярких тенденций в современном стеке Big Data сегодня стали платформы данных, которые позволяют интегрировать разные системы между собой, поддерживая как пакетную, так и потоковую передачу. Еще одним достоинство таких платформ является демократизация обработки данных: благодаря множеству готовых коннекторов они позволяют получить и передать данных, а также выполнить операции преобразования без разработки сложного кода. Основным инструментом пользователя в этом случае является SQL, который знаком каждому аналитику. Таким образом, строить пакетные и потоковые конвейеры обработки данных могут даже исследователи и аналитики без профессионального владения Java/Scala и других языков промышленной разработки.
Одной из таких платформ данных является RisingWave – распределенная потоковая реляционная СУБД, которая обеспечивает простую, эффективную и надежную обработку потоковых данных. RisingWave позволяет оперировать результатами потоковой обработки, создавая постепенно обновляемые, согласованные материализованные представления. Это очень упрощает создание приложений потоковой обработки, позволяя выражать сложную логику потоковой обработки в виде каскадных материализованных представлений. Также можно сохранять данные в самой RisingWave, устраняя необходимость передачи результатов во внешние приемники. Впрочем, сегодня я покажу, как передать результаты потоковой обработки во внешнюю базу данных.
В качестве практической задачи рассмотрим обработку событий пользовательского поведения. Предположим, в Apache Kafka сохраняются результаты действий пользователя на веб-страницах:
- время события (event_timestamp);
- логин пользователя (user);
- адрес веб-страницы (page);
- событие пользовательского поведения (event), такое как клик, прокрутка, заполнение и отправка формы ввода, загрузка файла, фокус на каком-либо элементе GUI.
Для генерации событий, как обычно, пишу небольшой Python-скрипт и запускаю его в Google Colab:
#установка библиотек
!pip install kafka-python
!pip install faker
#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer
# Импорт модуля faker
from faker import Faker
#################################### публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
bootstrap_servers=[kafka_url],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
#batch_size=300
)
topic='test'
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()
#списки веб-страниц
k=100 #количество веб-страниц
pages = [] # Инициализация списка для элементов заказа
for i in range(k):
wpage=fake.url()
pages.append(wpage)
#списки пользователей
u=1000 #количество пользователей
users=[]
for i in range(u):
username=fake.ascii_free_email()
users.append(username)
#списки событий
events=['click', 'scroll', 'submit', 'download', 'focus']
#бесконечный цикл публикации данных
while True:
#подготовка данных для публикации в JSON-формате
now=datetime.now()
event_timestamp=now.strftime("%Y-%m-%d %H:%M:%S")
user=random.choice(users)
page=random.choice(pages)
event=random.choice(events)
# Создаем полезную нагрузку в JSON
data = {"event_timestamp": event_timestamp, "user": user, "page": page, "event": event}
#публикуем данные в Kafka
future = producer.send(topic, value=data)
record_metadata = future.get(timeout=60)
print(f' [x] Sent {record_metadata}')
print(f' [x] Payload {data}')
#повтор через 3 секунды
time.sleep(3)

Предположим, необходимо в режиме онлайн знать актуальную информацию о количестве событий каждого вида: сколько кликов было сделано, сколько загрузок и т.д. Эту агрегацию по полю event будем вычислять с помощью SQL-запроса в RisingWave и передавать в NoSQL-хранилище типа ключ-значение Redis.

Реализация потокового конвейера с Apache Kafka и Redis
В отличие от других подобных платформ, например, Decodable, о работе с которой я писала здесь, в RisingWave источники и приемники данных нужно создавать с помощью SQL-инструкций, а не в GUI. Для чтения данных из Kafka необходимо объявить ее как источник данных, используя одноименный коннектор. Поскольку данные в Kafka записываются в формате JSON, это тоже необходимо указать при подключении к источнику, которое позволит обращаться к потребленным данным как к записям таблицы user_events. Код в RisingWave для этого выглядит так:
CREATE SOURCE IF NOT EXISTS user_events (
event_timestamp TIMESTAMP,
user VARCHAR,
page VARCHAR,
event VARCHAR
) WITH (
connector='kafka',
topic='test',
properties.bootstrap.server='your-host:your-port',
scan.startup.mode='latest',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_SSL',
properties.sasl.username='your-username',
properties.sasl.password='your-password'
) FORMAT PLAIN ENCODE JSON;
Поскольку RisingWave оперирует материализованными представлениями, создадим материализованное представление event_statistics_mv, сразу выполнив количественную агрегацию по событиям пользовательского поведения:
CREATE MATERIALIZED VIEW IF NOT EXISTS events_statisics_mv AS
SELECT event, COUNT(*)
FROM
user_events
GROUP BY event;
Далее подключимся к Redis, объявив ключи и значения как строки:
CREATE SINK IF NOT EXISTS redis_sink
FROM events_statisics_mv WITH (
connector = 'redis',
primary_key = 'event',
redis.url= 'redis://your-username:your-password@your-host:your-host'
) FORMAT PLAIN ENCODE TEMPLATE (
force_append_only='true',
key_format = '{event}',
value_format = '{count}'
);
После этого выведем агрегированную информацию о количестве событий каждого типа:
SELECT * FROM events_statisics_mv;
Запуск всех этих инструкции в RisingWave приводит к созданию источника, приемника и материализованного представления:

Также в интерфейсе платформы можно просмотреть визуализацию агрегированных метрик.

Агрегированные данные успешно передаются в Redis, что также можно увидеть в GUI этой СУБД, которая, как и Kafka, у меня развернута на serverless-платформе Upstash.

В следующий раз я покажу другой пример потоковой обработки событий из Kafka с помощью SQL-инструкций на платформе RisingWave, выполнив соединение нескольких потоков.
Узнайте больше про применение Apache Kafka в проектировании надежных конвейеров потоковой обработки данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:


