Потоковая агрегация данных из Kafka на SQL в RisingWave: пример

Kafka курсы примеры обучение, Kafka для разработчика, Kafka SQL, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как соединить данные из разных топиков Apache Kafka с помощью пары SQL-запросов: коннекторы, материализованные представления и потоковая база данных вместо полноценного потребителя. Подробная демонстрация запросов в RisingWave.

Проектирование и реализация потоковой агрегации данных из Kafka в RisingWave

Вчера я показывала пример потоковой агрегации данных из разных топиков Kafka с помощью Python-приложения, которое потребляет данные о пользователях и событиях их поведения на сайтах из разных топиков Kafka и соединяет их по ключевому идентификатору. Сегодня рассмотрим, как решить эту задачу быстрее с помощью потоковой базы данных RisingWave и коннекторов к Kafka. RisingWave – это распределенная реляционная СУБД, которая позволяет работать с потоками данных как с обыкновенными таблицами с помощью SQL-запросов к постепенно обновляемым, согласованным материализованным представлениям. Результаты выполнения запросов можно сохранять в самой RisingWave или передавать их во внешние хранилища, также используя коннекторы. Как это сделать, я показываю в новой статье.

Постановка задачи и исходные данные аналогичны вчерашней демонстрации: в один топик Kafka публикуются данные о пользователях, а в другой – о событиях их пользовательского поведения. Скрипты публикации данных и схемы полезной нагрузки приведены здесь. Архитектура потоковой системы схематично будет выглядеть так:

Архитектура потоковой системы
Архитектура потоковой системы

Для реализации этой потоковой системы в RisingWave надо сперва создать источники данных. Это делается с помощью SQL-инструкций. Для потребления данных из Kafka ее следует объявить как источник данных, используя одноименный коннектор. При подключении к источнику задаются не только учетные данные, но и формат полезной нагрузки, в моем случае это JSON. Код в RisingWave для создания источника данных о пользователях, публикуемых в топик Kafka под названием CorpAppsTopic, выглядит так:

CREATE SOURCE IF NOT EXISTS kafka_users (
    event_timestamp TIMESTAMP,
    user_id VARCHAR,
    email VARCHAR,
    name VARCHAR
) WITH (
    connector='kafka',
    topic='CorpAppsTopic',
    properties.bootstrap.server='your-host:your-port',    
    scan.startup.mode='latest',
    properties.sasl.mechanism='SCRAM-SHA-256',
    properties.security.protocol='SASL_SSL',
    properties.sasl.username='your-username',
    properties.sasl.password='your-password'
) FORMAT PLAIN ENCODE JSON;

Аналогично с помощью SQL-запроса создадим источник данных о событиях пользовательского поведения, публикуемых в топик Kafka под названием test:

CREATE SOURCE IF NOT EXISTS kafka_events (
    event_timestamp TIMESTAMP,
    user_id VARCHAR,
    page VARCHAR,
    event VARCHAR
) WITH (
    connector='kafka',
    topic='test',
    properties.bootstrap.server='your-host:your-port',    
    scan.startup.mode='latest',
    properties.sasl.mechanism='SCRAM-SHA-256',
    properties.security.protocol='SASL_SSL',
    properties.sasl.username='your-username',
    properties.sasl.password='your-password'
) FORMAT PLAIN ENCODE JSON;

После этого можно создать материализованное представление для агрегации данных по видам событий из источника данных о событиях пользовательского поведения:

CREATE MATERIALIZED VIEW IF NOT EXISTS events_statisics_mv AS
SELECT event, COUNT(*)
FROM
   kafka_events
GROUP BY event;

Наконец, можно считать результаты агрегации:

SELECT * FROM events_statisics_mv;
Агрегация данных по видам событий
Агрегация данных по видам событий

Аналогично создадим материализованное представление для агрегации событий по веб-страницам, на которых пользователи совершают действия:

CREATE MATERIALIZED VIEW IF NOT EXISTS pages_statisics_mv AS
SELECT page, COUNT(*)
FROM
   kafka_events
GROUP BY page;

И прочитаем данные из этого материализованного представления:

SELECT * FROM pages_statisics_mv;
Агрегация событий по веб-страницам
Агрегация событий по веб-страницам

Наконец, выведем информацию о пользователях и количестве событий, которые они совершили. Для этого сперва создадим соответствующее материализованное представление, которое соединяет данные из двух источников по ключу идентификатора пользователя и группирует их по этому же user_id.

CREATE MATERIALIZED VIEW IF NOT EXISTS users_statisics_mv AS
SELECT kafka_events.user_id, email, name, COUNT(kafka_events.user_id) AS events_quantity 
FROM
   kafka_events
JOIN   
   kafka_users
ON kafka_users.user_id=kafka_events.user_id   
GROUP BY kafka_events.user_id, email, name;

Для просмотра результатов потоковой агрегации данных из разных топиков Kafka в RisingWave надо всего лишь сделать SQL-запрос на выборку к соответствующему материализованному представлению.

SELECT * FROM users_statisics_mv;
Потоковая агрегация и соединение данных по пользователям и их событиям
Потоковая агрегация и соединение данных по пользователям и их событиям

RisingWave распределяет свои вычисления по легковесным потокам, называемым «стриминговыми акторами», которые одновременно выполняются на ядрах ЦП. Распределяя потоковые данные по ядрам, RisingWave обеспечивает мощные параллельные вычисления с высокой производительностью, масштабируемостью и пропускной способностью. Выполняя потоковые задания обработки данных, RisingWave представляет их как SQL-запросы над непрерывно генерируемыми данными с помощью так называемых узлов. Вычислительные узлы отвечают за прием данных из систем-источников, анализ и выполнение SQL-запросов, а также доставку данных в системы-приемники. Узлы-уплотнители управляют хранением данных и извлечением из объектного хранилища, а также сжимают данные для оптимизации их хранения в БД.

В RisingWave при выполнении плана потокового запроса он делится на несколько независимых фрагментов для обеспечения параллельного выполнения. Каждый фрагмент представляет собой цепочку операторов SQL. Под капотом он выполняется параллельными акторами. Степень параллелизма между фрагментами может быть разной. Параллелизм относится к технике одновременного выполнения нескольких операций или запросов базы данных для повышения производительности и повышения эффективности. Он включает в себя разделение рабочей нагрузки базы данных на более мелкие задачи и их параллельное выполнение на нескольких процессорах или машинах. Просмотреть граф потокового запроса как визуализацию материализованного представления можно в GUI системы.

Граф потокового запроса как визуализация материализованного представления в RisingWave
Граф потокового запроса как визуализация материализованного представления в RisingWave

Если сравнить время на разработку приложения-потребителя, которое выполняет потоковую агрегацию и соединение данных из разных топиков, с использованием коннекторов и материализованных представлений,  они сильно отличаются. Использование RisingWave намного ускоряет потоковую обработку и позволяет получить результаты агрегации событий из Kafka намного быстрее и проще. Кроме того, выполнить подобные вычисления может даже непрограммирующий аналитик, просто знакомый с SQL. Поэтому неудивительно, что в последнее время потоковые базы данных, подобные RisingWave, становятся все более популярны. Подробно о таких системах я писала здесь. Впрочем, они имеют довольно ограниченные возможности, не позволяя работать со всеми источниками и приемниками данных, которые могут встретиться в реальной жизни. Таким образом, знакомство с подобными системами – полезный навык для дата-инженера, но он не исключает необходимости разрабатывать полноценные приложения-потребители.

Как делать это эффективно с учетом всех особенностей Apache Kafka, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту