Потоковая обработка данных из PostgreSQL с Flink SQL на платформе Ververica Cloud

Apache Flink примеры курсы обучение, разработка Flink -приложений, Flink для инженеров данных и разработчиков, Школа Больших Данных Учебный Центр Коммерсант

Как с помощью Flink SQL организовать потоковую агрегацию данных из таблицы PostgreSQL: знакомство с API таблиц в Ververica Cloud на практическом примере.

API таблиц Ververica Cloud:  создаем внешние источники и приемники данных

Как я недавно рассказывала, немецкая фирма Ververica создала высокопроизводительный облачный сервис для обработки данных в реальном времени на базе Apache Flink. Этот полностью управляемый облачный сервис позволяет запускать приложения Apache Flink, в т.ч. на бесплатном тарифе. Чтобы познакомиться с возможностями этой платформы поближе, сегодня рассмотрим пример потоковой агрегации данных в одной из таблиц PostgreSQL. Поскольку источником данных является реляционная СУБД, логично из всех API фреймворка выбрать именно табличный, т.е. Flink SQL. Он позволяет подключиться к внешним системам для чтения и записи как пакетных, так и потоковых таблиц. Таблица-источник обеспечивает доступ к данным, которые хранятся во внешних системах (таких как база данных, хранилище значений ключей, очередь сообщений или файловая система). Таблица-приемник передает таблицу во внешнюю систему хранения. В зависимости от типа источника и приемника API таблиц фреймворка может работать с различными форматами: JSON, CSV, AVRO, Parquet, ORC и пр.

Прежде всего, необходимо создать схему источника данных с помощью source-коннектора. В Ververica Cloud это делается в разделе Catalogs.

Создание таблицы Flink SQL на основе коннектора
Создание таблицы Flink SQL на основе коннектора

Затем следует задать схему таблицы и учетные данные подключения. Я буду работать с таблицей jwts, где сохраняются JWT-токены, генерируемые для авторизации менеджеров интернет-магазина. О проектировании и реализации этой системы я писала в блоге нашей Школы прикладного бизнес-анализа здесь и здесь. Токен имеет ограниченный срок действия, и записывается в cookie-файл заголовка HTTP-запроса, который клиент отправляет на сервер. Сам сохраняется в таблице jwts базы данных PostgreSQL, которая развернута в облачной платформе Neon.

PostgreSQL как источник данных
PostgreSQL как источник данных

Выполняем запросы Flink SQL

Запрос Flink SQL на создание таблицы-источника данных выглядит так:

CREATE TABLE `system`.`default`.`jwts` (
  `id` INT,
  `published` VARCHAR(2147483647),
  `token` VARCHAR(2147483647),
  `sysuser` INT
)
COMMENT 'jwts'
WITH (
  'connector' = 'postgres-cdc',
  'database-name' = 'neondb',
  'debezium.plugin.name' = 'pgoutput',
  'hostname' = 'my-host',
  'password' = 'my-pass',
  'port' = '5432',
  'schema-name' = 'public',
  'slot.name' = 'flink',
  'table-name' = 'jwts',
  'username' = 'my-user'
)

После его выполнения будет создана таблица источника данных.

Метаданные и схема созданной таблицы Flink SQL
Метаданные и схема созданной таблицы Flink SQL

Чтобы посмотреть ее содержимое средствами Flink SQL, надо задать следующий запрос уже в разделе SQL Editor:

CREATE TABLE print_table( 
  `id` INT,
  `published` STRING,
  `token` STRING,
  `sysuser` INT
) WITH ( 
'connector' = 'print', 
'logger' = 'true' 
); 
INSERT INTO print_table 
SELECT * FROM jwts;
Выполненный запрос Flink SQL
Выполненный запрос Flink SQL

Для выполнения потоковых агрегаций над этой таблицей необходимо создать новый запрос. Чтобы вычислить, сколько раз каждый пользователь входил в систему необходимо сделать группировку по полю sysuser. Для этого запрос будет следующим:

CREATE TABLE print_table( 
 `sysuser` INT,
 `q` BIGINT NOT NULL
) WITH ( 
'connector' = 'print', 
'logger' = 'true' 
); 
INSERT INTO print_table 
SELECT sysuser, COUNT(*) FROM jwts
GROUP BY sysuser;
Агрегация по таблице Flink SQL
Агрегация по таблице Flink SQL

Таким образом, пользователь с id 1003 входил в систему чаще всего.

Чтобы поэкспериментировать с sink-коннектором, я решила записать в NoSQL-хранилище типа key-value Redis JWT-токены, выдаваемые пользователям. Ключом будет id пользователя, т.е. поле sysuser, а значением – сам токен. Поскольку Redis позволяет задать разные типы данных для значений, лучше всего для этого случая подходит хэш-массив, где к одному ключу можно записать разные поля. Хэши Redis — это типы записей, структурированные как коллекции пар поле-значение. Для этого создала следующий запрос Flink SQL:

CREATE TABLE redis_sink (
      rkey INT,
      rvalue STRING
    ) with (
      'connector' = 'redis',
      'mode' = 'hashmap',
      'host' = 'my-host',
      'port' = 'my-port',
      'password' = 'my-password'
    );

   INSERT INTO redis_sink
    SELECT sysuser, token
    FROM jwts;

Результат выполнения запроса отображается в интерфейсе Flink SQL.

Таблица Flink SQL с sink-коннектором Redis
Таблица Flink SQL с sink-коннектором Redis

Однако, в самом хранилище этих данных не оказалось. Как это исправить, расскажу в следующий раз. В заключение отмечу, что выполнение каждой задачи и задания Flink со всеми системными метриками можно посмотреть в GUI фреймворка.

UI Flink
GUI Flink

Узнайте больше про использование Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
  2. https://docs.ververica.com/vvc/connectors-and-formats/built-in-connectors/postgress
  3. https://docs.ververica.com/vvc/connectors-and-formats/built-in-connectors/redis
Контакты авторизированного учебного центра
«Школа Больших Данных»
Адрес:
127576, г. Москва, м. Алтуфьево, Илимская ул. 5 корпус 2, офис 319, БЦ «Бизнес-Депо»
Часы работы:
Понедельник - Пятница: 09.00 – 18.00
Остались вопросы?
Звоните нам +7 (495) 414-11-21 или отправьте сообщение через контактную форму. Также вы можете найти ответы на ваши вопросы в нашем сборнике часто задаваемых вопросов.
Оставьте сообщение, и мы перезвоним вам в течение рабочего дня
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Или напишите нам в соц.сетях
Поиск по сайту