Потоковая агрегация и передача данных из Kafka в Redis через SQL-запросы в RisingWave

Kafka курсы примеры обучение, Kafka для разработчика, Kafka SQL, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как SQL-запросами соединить потоки из разных топиков Apache Kafka и отправить результаты в Redis: демонстрация ETL-конвейера на материализованных представлениях в RisingWave.

Постановка задачи и проектирование потоковой системы

Продолжая недавний пример потоковой агрегации данных из разных топиков Kafka с помощью SQL-запросов, сегодня расширим потоковый конвейер в RisingWave, добавив приемник данных – key-value хранилище Redis. RisingWave – это распределенная реляционная СУБД, которая позволяет работать с потоками данных как с обычными таблицами через типовые SQL-запросы. SQL-запросы выполняются над постепенно обновляемым, согласованными материализованными представлениям, которые принимают данные из внешних источников. Результаты выполнения этих запросов можно передавать их во внешние хранилища с помощью коннекторов.

Предположим, в один топик Kafka публикуются данные о пользователях, а в другой – о событиях их пользовательского поведения. Source-коннектор RisingWave потребляет данные из топиков Kafka, записывая их в потоковые источники, к которым создается SQL-запрос на соединение данных по идентификатору пользователя и сохраняется в материализованном представлении. Это материализованное представление агрегирует данные о пользователе и событиях его поведения на веб-страницах, сохраняя их как JSON-документы в полях этого табличного представления.

Архитектура потоковой системы схематично будет выглядеть так:

Архитектура потокового ETL-конвейера
Архитектура потокового ETL-конвейера

Скрипты публикации данных и схемы полезной нагрузки приведены здесь, а SQL-запросы потребления данных из Kafka и создания материализованных представлений по пользователям и событиям приведены в прошлой статье.

Реализация потокового ETL-конвейера

Для соединения данных из разных топиков Kafka и выполнения преобразований над ними в RisingWave надо создать материализованное представление:

CREATE MATERIALIZED VIEW user_events_mv AS
SELECT 
    u.user_id,
    JSONB_BUILD_OBJECT(
        'email', u.email,
        'name', u.name,
        'registrated', u.event_timestamp
    ) AS user,
    JSONB_AGG(
        JSONB_BUILD_OBJECT(
            'event_timestamp', e.event_timestamp,
            'event', e.event,
            'page', e.page
        )
    ) AS events
FROM 
    kafka_users u
LEFT JOIN 
    kafka_events e 
ON 
    u.user_id = e.user_id
GROUP BY 
    u.user_id, u.email, u.name, u.event_timestamp;

Чтобы передавать во внешнюю систему данные о всех зарегистрированных пользователях, даже если они не совершали никаких событий, в этом запросе используется LEFT JOIN. Группировка данных по user_id, email, name и event_timestamp, нужна чтобы обеспечить правильное агрегирование событий для каждого пользователя. Результат SELECT-запроса к матпредставлению user_events_mv в RisingWave выглядит так:

Агрегированные и преобразованные данные из разных топиков Kafka
Агрегированные и преобразованные данные из разных топиков Kafka

Чтобы передать данные в Redis, надо сперва создать приемник данных (sink), используя соответствующий коннектор, а также указав, что будет ключом, а что – значением. RisingWave передает данные в Redis в виде строк, хранящих пары ключ-значение в указанном формате (JSON или TEMPLATE), поэтому первичный ключ всегда должен быть указан. В моем случае логично использовать в качестве ключа идентификатор пользователя user_id, а вся остальная информация о пользователе и его событиях будет значением. И ключ и значение в redis будут являться строками. SQL-запрос на передачу данных в Redis из RisingWave выглядит так:

CREATE SINK IF NOT EXISTS redis_sink
FROM user_events_mv WITH (
    connector = 'redis',
    primary_key = 'user_id',
    redis.url= 'redis://your-redis:password@host:port'
) FORMAT PLAIN ENCODE TEMPLATE (
    force_append_only='true',
    key_format = '{user_id}',
    value_format = '{user},{events}'
);

После выполнения этой инструкции в Redis появятся потребленные из Kafka, соединенные и агрегированные данные:

Отображение результатов в Redis
Отображение результатов в Redis

Граф потокового запроса в RisingWave состоит из 3-х фрагментов, каждый из которых представляет собой цепочку SQL-операторов.

Граф потокового SQL-запроса в RisingWave
Граф потокового SQL-запроса в RisingWave

Информация о публикации и потреблении данных также отображается в интерфейсе Kafka.

График публикации и потребления сообщений в Kafka
График публикации и потребления сообщений в Kafka

Таким образом, потоковые базы данных с широким набором коннекторов позволяют строить интересные ETL-конвейеры обработки данных, выполняя над ними различные преобразования. Однако, они не отличаются высокой надежностью и устойчивостью, по крайней мере, их бесплатные инстансы. В частности, в работе над этим конвейером я часто сталкивалась с отказом платформы RisingWave, потерей соединения с моим экземпляром и сложностями с подключением ко внешним система. Например, не удалось передать данные в Elasticsearch, хотя экземпляр этой документо-ориентированной БД отлично работает и доступен извне, что подтверждает подобная разработка на платформе Decodable, о которой я писала здесь.

Тем не менее, подобные потоковые платформы, позволяющие оперировать с потоками данных как с таблицами через SQL-запросы, знакомые каждому аналитику, отлично позволяют этим специалистам проверить гипотезу или быстро получить какие-то данные, не отвлекая инженеров.

Как делать это эффективно с учетом всех особенностей Apache Kafka, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту