Чем опасны дубли данных при их потоковой обработке и как реализовать дедупликацию в Apache Flink SQL. Смотрим на практическом примере для обучения дата-инженеров и разработчиков распределенных приложений.
Потоковая дедупликация данных в Apache Flink SQL
Apache Flink можно назвать уникальный фреймворком для разработки распределенных приложений в области Big Data, который унифицирует пакетную и потоковую обработку, сохраняя поддержку ANSI SQL и предоставляя широкий набор функций для сценариев реального времени. В потоковой обработке данных приложения-продюсеры, которые генерируют события, иногда могут создавать дубли – записи с одним и тем же идентификатором. Или восходящие ETL-задания не реализуют гарантию строго однократной доставки, что также приводит к дублированию записей в приемнике в случае отработки отказа. Дубликаты записей повлияют на правильность последующих аналитических заданий, особенно в агрегатных функциях, таких как суммирование, подсчет количества значений и пр. Поэтому для обеспечения высокого качества данных необходимо удалить дубли, т.е. выполнить дедупликацию. Цель этого процесса — обеспечить обработку только уникальных записей и избежать любых проблем, которые могут возникнуть из-за дублирования. В потоковой обработке дедупликация данных может повысить производительность всей Big Data системы.
Таким образом, дедупликация нужна, чтобы:
- повысить производительность системы за счет уменьшения объема обрабатываемых данных через удаление повторов;
- освободить место в хранилище, т.к. дубли занимают пространство жесткого диска;
- улучшить точность анализа и повысить эффективность обработки данных.
Дедупликация данных работает путем выявления и удаления повторяющихся записей из потока данных. Обычно это делается путем сравнения данных в потоке с эталонным набором данных. При обнаружении повторяющейся записи она удаляется из потока. Apache Flink использует SQL-функцию ROW_NUMBER() для удаления дубликатов, как и в случае запроса Top-N. Теоретически дедупликация — это частный случай Top-N, в котором N равно единице и упорядочено по времени обработки или времени события. Ниже показан синтаксис оператора дедупликации:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1
Рассмотрим параметры этого запроса:
- ROW_NUMBER() – функция, которая присваивает каждой строке уникальный порядковый номер, начиная с единицы;
- PARTITION BY col1[, col2…] – ключ дедупликации, т.е. один или несколько столбцов раздела;
- ORDER BY time_attr [asc|desc] — столбец сортировки, атрибут времени обработки или события. Аналогично стандартным опциям SQL-оператора ORDER BY, ASC означает сохранение первой строки, а DESC – сохранение последней строки.
- WHERE rownum = 1 – условие уникальности каждой строки, что и означает дедупликацию.
Этот шаблон необходимо точно соблюдать, иначе оптимизатор Apache Flink не сможет преобразовать запрос. Как это работает, далее рассмотрим на практическом примере.
Потоковая обработка данных с помощью Apache Flink
Код курса
FLINK
Ближайшая дата курса
2 декабря, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Практический пример
В качестве примера возьмем систему обработки заказов, которая каждые 5 секунд генерирует строки в памяти, которые означают повторяющиеся события с одним и тем же идентификатором order_id.
CREATE TABLE orders ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' );
Предположим, по правилам бизнес-логики для последующей обработки нужно сохранить только самое последнее событие. Для этого сперва используем комбинацию функции COUNT и предложения HAVING, чтобы проверить, в каких заказах есть более одного события.
--Check for duplicates in the `orders` table SELECT id AS order_id, COUNT(*) AS order_cnt FROM orders o GROUP BY id HAVING COUNT(*) > 1;
Затем отфильтруем эти события с помощью функции ROW_NUMBER():
--Use deduplication to keep only the latest record for each `order_id` SELECT order_id, order_time FROM ( SELECT id AS order_id, order_time, ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum FROM orders ) WHERE rownum = 1;
Про другой вариант устранения дублей в потоковой обработке данных с помощью Flink SQL читайте в нашей новой статье. А на практике освоить этот прием и другие тонкости использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники