Мы уже писали про поиск сложных событий при их потоковой обработке средствами Apache Flink. Продолжая эту важную для обучения дата-инженеров тему, сегодня рассмотрим, как CDC-коннектор от GetIndata упрощает запуск распознавание шаблонов на потоках данных из многих источников. Проблемы захвата измененных данных из реляционной базы с помощью JDBC-драйвера и способы их решения с Flink SQL.
CEP и CDC в реальном времени
Возможность в реальном времени обнаруживать шаблоны в потоках событий, чтобы заблаговременно принять соответствующие меры успешно решается в современных Big Data системах с помощью специализированных CEP-библиотек (Complex Event Processing, CEP). Они обеспечивают сопоставление потока входящих событий с заранее определенным шаблоном, на лету выполняя поиск релевантных данные по сохраненному запросу и отбрасывая неподходящие компоненты потока. CEP позволяет найти объект по шаблонизированному запросу в бесконечном потоке входных данных. Таким объектом может быть все что угодно, что имеет определенное поведение в виде последовательности событий. А шаблоном будет набор действий, которые может выполнять этот объект. Как только CEP обнаружит данные, удовлетворяющие заданному шаблону, система идентифицирует искомый объект в режиме реального времени.
В Apache Flink идея CEP отлично реализуется благодаря потоковой природе фреймворка, а также его высокой пропускной способности обработки данных. Впервые API библиотеки CEP в Apache Flink был представлен в 2016 году. Библиотека FlinkCEP не является частью бинарного дистрибутива фреймворка, поэтому для ее использования следует явно задать зависимость FlinkCEP в файл pom.xml Flink-проекта, как мы показывали здесь.
Идеи CEP коррелируют с захватом измененных данных (Change Data Capture, CDC) – подход, который работает с обнаружением изменения данных в исходной системе, чтобы взять именно эти изменения для дальнейшей обработки. Например, можно использовать CDC для сбора изменений данных в реляционной базе данных и создания потока событий, которые описывают эти изменения. Поток состоит из последовательности событий, которые описывают операции вставки, обновления и удаления строк таблицы. Разумеется, поток CDC может быть обработан Apache Flink, что позволяет выполнять сложные задания аналитики больших данных, в т.ч. распознавание паттернов.
Чтобы посмотреть ,как это работает, разберем практический пример. Для большей наглядности сформулируем требование в виде пользовательской истории: «Как бизнес-аналитик, я хочу проверить эффективность маркетинговой кампании по быстрым кредитам с использованием мобильного приложения». Определим критерии приемки этой User Story как результат, который отображает список пользователей, которые:
- выполнили транзакцию на сумму более 1000, используя мобильное приложение в течение 3 дней после транзакции;
- не взяли кредит;
- повторно использовали мобильное приложение в течение 7 дней после предыдущего случая.
Предположим, исходные данные для этого кейса хранятся в нескольких источниках: в топиках Apache Kafka есть события транзакций и мобильных приложений, а PostgreSQL содержит информацию о пользовательских кредитах.
Оперативное обновление результата в режиме реального времени обеспечат технологии потоковой обработки событий, в частности, Apache Flink, его CEP-библиотека для распознавания шаблонов, а также коннекторы. Как это реализовать, рассмотрим далее.
CDC и коннекторы Apache Flink
Apache Flink имеет несколько предустановленных коннекторов, например, к Kafka, который позволяет рассматривать топик как таблицу в Flink SQL. Благодаря этому можно обрабатывать информацию о транзакциях и событиях мобильных приложений. Однако, захват изменений от БД является более сложным сценарием: нужно преобразовать изменения данных из реляционных баз в качестве потока событий. Сделать это можно, используя готовые инструменты CDC. Одним из них является Debezium – популярный фреймворк с открытым исходным кодом, который поддерживается Red Hat. Он захватывает изменения данных из журналов транзакций БД и публикует соответствующие события в топиках Kafka, не снижая производительности источника. Debezium может собирать измененные данные в реальном времени, включая операции удаления и никогда не пропускает события, отражая все изменения в таблице.
Недостатком Debezium является его сложная настройка, которая требует использования Apache Kafka, потребность в доступе к базе данных Binlog и ограниченный набор поддерживаемых баз данных.
Альтернативой Debezium могут стать CDC-коннекторы от Ververica, которая занимается коммерциализацией и развитием Apache Flink. Flink-CDC-коннектор включает Debezium как CDC-движок, поэтому для него тоже характерны ранее указанные ограничения. Кроме того, этот коннектор поддерживает только MySQL (5.7, 8.0.x), PostgreSQL (9,6, 10, 11, 12) и MongoDB (4,0, 4,2, 5,0).
Также для реализации CDC можно использовать официальный Flink Connector JDBC, который позволяет писать и читать данные из реляционных баз (MySQL, PostgreSQL, Derby) непосредственно во Flink SQL. Это встроенный инструмент фреймворка, который считывает данные из таблицы только один раз, поэтому его нельзя назвать средством полноценной поддержки CDC.
Поэтому дата-инженеры компании GetIndata разработали собственный CDC-коннектор JDBC, который позволяет периодически читать данные из таблиц в реляционных базах. В отличие от Debezium, он не требует дополнительных компонентов и совместим с любой базой данных. Достаточно просто составить SQL-запросы для JDBC-драйвера. Коннектор использует чистый SQL для CDC, не нуждаясь в специальной конфигурации базы данных. Однако, этот коннектор не поддерживает операции удаления, а некоторые CDC-стратегии требуют дополнительных столбцов, например, last_modify_date. Работа коннектора может оказать влияние на базу данных, поскольку он считывает данные с помощью периодического запуска SQL-запроса, что потенциально снижает производительность. Коннектор GetIndata обновляет данные почти в реальном времени: логика CDC работает через определенные интервалы, которые можно настроить очень часто. Тем не менее, есть риск пропустить событие CDC в часто изменяющихся строках, например, когда вставка и удаление строки произошли раньше, чем коннектор обновил данные.
Для разработки этого CDC-коннектора дата-инженеры GetIndata использовали табличный API, предоставленный Flink. Flink предоставляет интерфейсы, которые должны быть реализованы пользовательской логикой для обработки внешних источников данных, таких как таблица. Затем эту таблицу можно обработать с помощью Flink SQL. При этом выполнении запроса Flink не изменит никаких внешних данных: движок выполнения фреймворка использует определение таблицы для чтения всех данных из источника.
Рассмотрим, как соединить идеи CDC с CEP, т.е. как определить распознавание шаблонов событий с помощью Flink SQL:
SELECT * FROM events MATCH_RECOGNIZE( PARTITION BY customer_id ORDER BY ts MEASURES TRX.event_id AS trx_event_id, TRX.customer_id AS trx_customer_id, TRX.type AS trx_type, TRX.payload AS trx_payload, TRX.ts AS trx_ts, APP_1.event_id AS app_1_event_id, APP_1.customer_id AS app_1_customer_id, APP_1.type AS app_1_type, APP_1.payload AS app_1_payload, APP_1.ts AS app_1_ts ONE ROW PER MATCH PATTERN (TRX APP_1 NOT_LOAN*? APP_2) WITHIN INTERVAL '10' DAY DEFINE TRX AS TRX.type = 'trx_event' AND TRX.payload > 1000, APP_1 AS APP_1.type = 'clickstream_event' AND APP_1.ts < TRX.ts + INTERVAL '3' DAY, APP_2 AS APP_2.type = 'clickstream_event' AND APP_2.ts > APP_1.ts AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY, NO_LOAN AS NOT_LOAN.type <> 'loan_event' ) MR;
В вышеприведенном коде определены события из ранее описанной пользовательской истории о проверке эффективности маркетинговой кампании по быстрым кредитам с использованием мобильного приложения:
- TRX — событие, которое описывает транзакцию с суммой более 1000;
- APP_1 — событие, представляющее взаимодействие с пользователем с приложением в течение 3 дней после транзакции;
- NO_LOAN — событие, которое сообщает, что пользователь не взял кредит.
Также в коде указан порядок событий и ожидаемый выход, чтобы найти шаблон из этой пользовательской истории. Для определения шаблона используется регулярное выражение.
Читайте в нашей новой статье про другой адаптер от GetIndata для Apache Flink. А освоить тонкости использования Apache Flink и Spark для потоковой обработки событий в распределенных приложениях аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники