Как соединить данные из разных топиков Apache Kafka с помощью пары SQL-запросов: коннекторы, материализованные представления и потоковая база данных вместо полноценного потребителя. Подробная демонстрация запросов в RisingWave.
Проектирование и реализация потоковой агрегации данных из Kafka в RisingWave
Вчера я показывала пример потоковой агрегации данных из разных топиков Kafka с помощью Python-приложения, которое потребляет данные о пользователях и событиях их поведения на сайтах из разных топиков Kafka и соединяет их по ключевому идентификатору. Сегодня рассмотрим, как решить эту задачу быстрее с помощью потоковой базы данных RisingWave и коннекторов к Kafka. RisingWave – это распределенная реляционная СУБД, которая позволяет работать с потоками данных как с обыкновенными таблицами с помощью SQL-запросов к постепенно обновляемым, согласованным материализованным представлениям. Результаты выполнения запросов можно сохранять в самой RisingWave или передавать их во внешние хранилища, также используя коннекторы. Как это сделать, я показываю в новой статье.
Постановка задачи и исходные данные аналогичны вчерашней демонстрации: в один топик Kafka публикуются данные о пользователях, а в другой – о событиях их пользовательского поведения. Скрипты публикации данных и схемы полезной нагрузки приведены здесь. Архитектура потоковой системы схематично будет выглядеть так:
Для реализации этой потоковой системы в RisingWave надо сперва создать источники данных. Это делается с помощью SQL-инструкций. Для потребления данных из Kafka ее следует объявить как источник данных, используя одноименный коннектор. При подключении к источнику задаются не только учетные данные, но и формат полезной нагрузки, в моем случае это JSON. Код в RisingWave для создания источника данных о пользователях, публикуемых в топик Kafka под названием CorpAppsTopic, выглядит так:
CREATE SOURCE IF NOT EXISTS kafka_users ( event_timestamp TIMESTAMP, user_id VARCHAR, email VARCHAR, name VARCHAR ) WITH ( connector='kafka', topic='CorpAppsTopic', properties.bootstrap.server='your-host:your-port', scan.startup.mode='latest', properties.sasl.mechanism='SCRAM-SHA-256', properties.security.protocol='SASL_SSL', properties.sasl.username='your-username', properties.sasl.password='your-password' ) FORMAT PLAIN ENCODE JSON;
Аналогично с помощью SQL-запроса создадим источник данных о событиях пользовательского поведения, публикуемых в топик Kafka под названием test:
CREATE SOURCE IF NOT EXISTS kafka_events ( event_timestamp TIMESTAMP, user_id VARCHAR, page VARCHAR, event VARCHAR ) WITH ( connector='kafka', topic='test', properties.bootstrap.server='your-host:your-port', scan.startup.mode='latest', properties.sasl.mechanism='SCRAM-SHA-256', properties.security.protocol='SASL_SSL', properties.sasl.username='your-username', properties.sasl.password='your-password' ) FORMAT PLAIN ENCODE JSON;
После этого можно создать материализованное представление для агрегации данных по видам событий из источника данных о событиях пользовательского поведения:
CREATE MATERIALIZED VIEW IF NOT EXISTS events_statisics_mv AS SELECT event, COUNT(*) FROM kafka_events GROUP BY event;
Наконец, можно считать результаты агрегации:
SELECT * FROM events_statisics_mv;
Аналогично создадим материализованное представление для агрегации событий по веб-страницам, на которых пользователи совершают действия:
CREATE MATERIALIZED VIEW IF NOT EXISTS pages_statisics_mv AS SELECT page, COUNT(*) FROM kafka_events GROUP BY page;
И прочитаем данные из этого материализованного представления:
SELECT * FROM pages_statisics_mv;
Наконец, выведем информацию о пользователях и количестве событий, которые они совершили. Для этого сперва создадим соответствующее материализованное представление, которое соединяет данные из двух источников по ключу идентификатора пользователя и группирует их по этому же user_id.
CREATE MATERIALIZED VIEW IF NOT EXISTS users_statisics_mv AS SELECT kafka_events.user_id, email, name, COUNT(kafka_events.user_id) AS events_quantity FROM kafka_events JOIN kafka_users ON kafka_users.user_id=kafka_events.user_id GROUP BY kafka_events.user_id, email, name;
Для просмотра результатов потоковой агрегации данных из разных топиков Kafka в RisingWave надо всего лишь сделать SQL-запрос на выборку к соответствующему материализованному представлению.
SELECT * FROM users_statisics_mv;
RisingWave распределяет свои вычисления по легковесным потокам, называемым «стриминговыми акторами», которые одновременно выполняются на ядрах ЦП. Распределяя потоковые данные по ядрам, RisingWave обеспечивает мощные параллельные вычисления с высокой производительностью, масштабируемостью и пропускной способностью. Выполняя потоковые задания обработки данных, RisingWave представляет их как SQL-запросы над непрерывно генерируемыми данными с помощью так называемых узлов. Вычислительные узлы отвечают за прием данных из систем-источников, анализ и выполнение SQL-запросов, а также доставку данных в системы-приемники. Узлы-уплотнители управляют хранением данных и извлечением из объектного хранилища, а также сжимают данные для оптимизации их хранения в БД.
В RisingWave при выполнении плана потокового запроса он делится на несколько независимых фрагментов для обеспечения параллельного выполнения. Каждый фрагмент представляет собой цепочку операторов SQL. Под капотом он выполняется параллельными акторами. Степень параллелизма между фрагментами может быть разной. Параллелизм относится к технике одновременного выполнения нескольких операций или запросов базы данных для повышения производительности и повышения эффективности. Он включает в себя разделение рабочей нагрузки базы данных на более мелкие задачи и их параллельное выполнение на нескольких процессорах или машинах. Просмотреть граф потокового запроса как визуализацию материализованного представления можно в GUI системы.
Если сравнить время на разработку приложения-потребителя, которое выполняет потоковую агрегацию и соединение данных из разных топиков, с использованием коннекторов и материализованных представлений, они сильно отличаются. Использование RisingWave намного ускоряет потоковую обработку и позволяет получить результаты агрегации событий из Kafka намного быстрее и проще. Кроме того, выполнить подобные вычисления может даже непрограммирующий аналитик, просто знакомый с SQL. Поэтому неудивительно, что в последнее время потоковые базы данных, подобные RisingWave, становятся все более популярны. Подробно о таких системах я писала здесь. Впрочем, они имеют довольно ограниченные возможности, не позволяя работать со всеми источниками и приемниками данных, которые могут встретиться в реальной жизни. Таким образом, знакомство с подобными системами – полезный навык для дата-инженера, но он не исключает необходимости разрабатывать полноценные приложения-потребители.
Как делать это эффективно с учетом всех особенностей Apache Kafka, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: