Потоковая агрегация событий из Apache Kafka в RisingWave

архитектура распределенных систем паттерны примеры курсы обучение, архитектор Big Data, Kafka курсы примеры обучение, NoSQL обучение примеры курсы, архитектура данных с Kafka, проектирование потокового конвейера примеры курсы обучение, интеграция Kafka и Redis, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Практическая демонстрация потоковой агрегации событий пользовательского поведений из Apache Kafka с записью результатов в Redis на платформе RisingWave: примеры Python-кода и конвейера из SQL-инструкций.

Постановка задачи

Одной из ярких тенденций в современном стеке Big Data сегодня стали платформы данных, которые позволяют интегрировать разные системы между собой, поддерживая как пакетную, так и потоковую передачу. Еще одним достоинство таких платформ является демократизация обработки данных: благодаря множеству готовых коннекторов они позволяют получить и передать данных, а также выполнить операции преобразования без разработки сложного кода. Основным инструментом пользователя в этом случае является SQL, который знаком каждому аналитику. Таким образом, строить пакетные и потоковые конвейеры обработки данных могут даже исследователи и аналитики без профессионального владения Java/Scala и других языков промышленной разработки.

Одной из таких платформ данных является RisingWave – распределенная потоковая реляционная СУБД, которая обеспечивает простую, эффективную и надежную обработку потоковых данных. RisingWave позволяет оперировать результатами потоковой обработки, создавая постепенно обновляемые, согласованные материализованные представления. Это очень упрощает создание приложений потоковой обработки, позволяя выражать сложную логику потоковой обработки в виде каскадных материализованных представлений. Также можно сохранять данные в самой RisingWave, устраняя необходимость передачи результатов во внешние приемники. Впрочем, сегодня я покажу, как передать результаты потоковой обработки во внешнюю базу данных.

В качестве практической задачи рассмотрим обработку событий пользовательского поведения. Предположим, в Apache Kafka сохраняются результаты действий пользователя на веб-страницах:

  • время события (event_timestamp);
  • логин пользователя (user);
  • адрес веб-страницы (page);
  • событие пользовательского поведения (event), такое как клик, прокрутка, заполнение и отправка формы ввода, загрузка файла, фокус на каком-либо элементе GUI.

Для генерации событий, как обычно, пишу небольшой Python-скрипт и запускаю его в Google Colab:

#установка библиотек
!pip install kafka-python
!pip install faker

#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

# Импорт модуля faker
from faker import Faker
#################################### публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  #batch_size=300
)
topic='test'
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()

#списки веб-страниц
k=100 #количество веб-страниц
pages = [] # Инициализация списка для элементов заказа

for i in range(k):
 wpage=fake.url()
 pages.append(wpage)

#списки пользователей
u=1000 #количество пользователей
users=[]
for i in range(u):
 username=fake.ascii_free_email()
 users.append(username)

#списки событий
events=['click', 'scroll', 'submit', 'download', 'focus']

#бесконечный цикл публикации данных
while True:
  #подготовка данных для публикации в JSON-формате
  now=datetime.now()
  event_timestamp=now.strftime("%Y-%m-%d %H:%M:%S")

  user=random.choice(users)
  page=random.choice(pages)
  event=random.choice(events)

  # Создаем полезную нагрузку в JSON
  data = {"event_timestamp": event_timestamp, "user": user, "page": page, "event": event}

  #публикуем данные в Kafka
  future = producer.send(topic, value=data)
  record_metadata = future.get(timeout=60)
  print(f' [x] Sent {record_metadata}')
  print(f' [x] Payload {data}')

  #повтор через 3 секунды
  time.sleep(3)
Публикация событий пользовательского поведения в Apache Kafka
Публикация событий пользовательского поведения в Apache Kafka

Предположим, необходимо в режиме онлайн знать актуальную информацию о количестве событий каждого вида: сколько кликов было сделано, сколько загрузок и т.д. Эту агрегацию по полю event будем вычислять с помощью SQL-запроса в RisingWave и передавать в NoSQL-хранилище типа ключ-значение Redis.

Архитектура потокового конвейера
Архитектура потокового конвейера

Реализация потокового конвейера с Apache Kafka и Redis

В отличие от других подобных платформ, например, Decodable, о работе с которой я писала здесь, в RisingWave источники и приемники данных нужно создавать с помощью SQL-инструкций, а не в GUI. Для чтения данных из Kafka необходимо объявить ее как источник данных, используя одноименный коннектор. Поскольку данные в Kafka записываются в формате JSON, это тоже необходимо указать при подключении к источнику, которое позволит обращаться к потребленным данным как к записям таблицы user_events. Код в RisingWave для этого выглядит так:

CREATE SOURCE IF NOT EXISTS user_events (
    event_timestamp TIMESTAMP,
    user VARCHAR,
    page VARCHAR,
    event VARCHAR
) WITH (
    connector='kafka',
    topic='test',
    properties.bootstrap.server='your-host:your-port',
    scan.startup.mode='latest',
    properties.sasl.mechanism='SCRAM-SHA-256',
    properties.security.protocol='SASL_SSL',
    properties.sasl.username='your-username',
    properties.sasl.password='your-password'
) FORMAT PLAIN ENCODE JSON;

Поскольку RisingWave оперирует материализованными представлениями, создадим материализованное представление event_statistics_mv, сразу выполнив количественную агрегацию по событиям пользовательского поведения:

CREATE MATERIALIZED VIEW IF NOT EXISTS events_statisics_mv AS
SELECT event, COUNT(*)
FROM
    user_events
GROUP BY event;

Далее подключимся к Redis, объявив ключи и значения как строки:

CREATE SINK IF NOT EXISTS redis_sink
FROM events_statisics_mv WITH (
    connector = 'redis',
    primary_key = 'event',
    redis.url= 'redis://your-username:your-password@your-host:your-host'
) FORMAT PLAIN ENCODE TEMPLATE (
    force_append_only='true',
    key_format = '{event}',
    value_format = '{count}'
);

После этого выведем агрегированную информацию о количестве событий каждого типа:

SELECT * FROM events_statisics_mv;

Запуск всех этих инструкции в RisingWave  приводит к созданию источника, приемника и материализованного представления:

Реализация потоковой агрегации в RisingWave
Реализация потоковой агрегации в RisingWave

Также в интерфейсе платформы можно просмотреть визуализацию агрегированных метрик.

Визуализация метрик
Визуализация метрик

Агрегированные данные успешно передаются в Redis, что также можно увидеть в GUI этой СУБД, которая, как и Kafka, у меня развернута на serverless-платформе Upstash.

NoSQL Redis
Отображение записанных результатов агрегации в NoSQL-хранилище Redis

В следующий раз я покажу другой пример потоковой обработки событий из Kafka с помощью SQL-инструкций на платформе RisingWave, выполнив соединение нескольких потоков.

Узнайте больше про применение Apache Kafka в проектировании надежных конвейеров потоковой обработки данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту