Потоковая агрегация и передача данных из Kafka в Redis через SQL-запросы в RisingWave

Потоковая агрегация и передача данных из Kafka в Redis через SQL-запросы в RisingWave

    Как SQL-запросами соединить потоки из разных топиков Apache Kafka и отправить результаты в Redis: демонстрация ETL-конвейера на материализованных представлениях в RisingWave.

    Постановка задачи и проектирование потоковой системы

    Продолжая недавний пример потоковой агрегации данных из разных топиков Kafka с помощью SQL-запросов, сегодня расширим потоковый конвейер в RisingWave, добавив приемник данных – key-value хранилище Redis. RisingWave – это распределенная реляционная СУБД, которая позволяет работать с потоками данных как с обычными таблицами через типовые SQL-запросы. SQL-запросы выполняются над постепенно обновляемым, согласованными материализованными представлениям, которые принимают данные из внешних источников. Результаты выполнения этих запросов можно передавать их во внешние хранилища с помощью коннекторов.

    Предположим, в один топик Kafka публикуются данные о пользователях, а в другой – о событиях их пользовательского поведения. Source-коннектор RisingWave потребляет данные из топиков Kafka, записывая их в потоковые источники, к которым создается SQL-запрос на соединение данных по идентификатору пользователя и сохраняется в материализованном представлении. Это материализованное представление агрегирует данные о пользователе и событиях его поведения на веб-страницах, сохраняя их как JSON-документы в полях этого табличного представления.

    Архитектура потоковой системы схематично будет выглядеть так:

    Архитектура потокового ETL-конвейера
    Архитектура потокового ETL-конвейера

    Скрипты публикации данных и схемы полезной нагрузки приведены здесь, а SQL-запросы потребления данных из Kafka и создания материализованных представлений по пользователям и событиям приведены в прошлой статье.

    Реализация потокового ETL-конвейера

    Для соединения данных из разных топиков Kafka и выполнения преобразований над ними в RisingWave надо создать материализованное представление:

    CREATE MATERIALIZED VIEW user_events_mv AS
    SELECT 
        u.user_id,
        JSONB_BUILD_OBJECT(
            'email', u.email,
            'name', u.name,
            'registrated', u.event_timestamp
        ) AS user,
        JSONB_AGG(
            JSONB_BUILD_OBJECT(
                'event_timestamp', e.event_timestamp,
                'event', e.event,
                'page', e.page
            )
        ) AS events
    FROM 
        kafka_users u
    LEFT JOIN 
        kafka_events e 
    ON 
        u.user_id = e.user_id
    GROUP BY 
        u.user_id, u.email, u.name, u.event_timestamp;

    Чтобы передавать во внешнюю систему данные о всех зарегистрированных пользователях, даже если они не совершали никаких событий, в этом запросе используется LEFT JOIN. Группировка данных по user_id, email, name и event_timestamp, нужна чтобы обеспечить правильное агрегирование событий для каждого пользователя. Результат SELECT-запроса к матпредставлению user_events_mv в RisingWave выглядит так:

    Агрегированные и преобразованные данные из разных топиков Kafka
    Агрегированные и преобразованные данные из разных топиков Kafka

    Чтобы передать данные в Redis, надо сперва создать приемник данных (sink), используя соответствующий коннектор, а также указав, что будет ключом, а что – значением. RisingWave передает данные в Redis в виде строк, хранящих пары ключ-значение в указанном формате (JSON или TEMPLATE), поэтому первичный ключ всегда должен быть указан. В моем случае логично использовать в качестве ключа идентификатор пользователя user_id, а вся остальная информация о пользователе и его событиях будет значением. И ключ и значение в redis будут являться строками. SQL-запрос на передачу данных в Redis из RisingWave выглядит так:

    CREATE SINK IF NOT EXISTS redis_sink
    FROM user_events_mv WITH (
        connector = 'redis',
        primary_key = 'user_id',
        redis.url= 'redis://your-redis:password@host:port'
    ) FORMAT PLAIN ENCODE TEMPLATE (
        force_append_only='true',
        key_format = '{user_id}',
        value_format = '{user},{events}'
    );

    После выполнения этой инструкции в Redis появятся потребленные из Kafka, соединенные и агрегированные данные:

    Отображение результатов в Redis
    Отображение результатов в Redis

    Граф потокового запроса в RisingWave состоит из 3-х фрагментов, каждый из которых представляет собой цепочку SQL-операторов.

    Граф потокового SQL-запроса в RisingWave
    Граф потокового SQL-запроса в RisingWave

    Информация о публикации и потреблении данных также отображается в интерфейсе Kafka.

    График публикации и потребления сообщений в Kafka
    График публикации и потребления сообщений в Kafka

    Таким образом, потоковые базы данных с широким набором коннекторов позволяют строить интересные ETL-конвейеры обработки данных, выполняя над ними различные преобразования. Однако, они не отличаются высокой надежностью и устойчивостью, по крайней мере, их бесплатные инстансы. В частности, в работе над этим конвейером я часто сталкивалась с отказом платформы RisingWave, потерей соединения с моим экземпляром и сложностями с подключением ко внешним система. Например, не удалось передать данные в Elasticsearch, хотя экземпляр этой документо-ориентированной БД отлично работает и доступен извне, что подтверждает подобная разработка на платформе Decodable, о которой я писала здесь.

    Тем не менее, подобные потоковые платформы, позволяющие оперировать с потоками данных как с таблицами через SQL-запросы, знакомые каждому аналитику, отлично позволяют этим специалистам проверить гипотезу или быстро получить какие-то данные, не отвлекая инженеров.

    Как делать это эффективно с учетом всех особенностей Apache Kafka, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    [elementor-template id="13619"]