Устраняем дубли в потоковых данных с Apache Flink SQL

Устраняем дубли в потоковых данных с Apache Flink SQL

Чем опасны дубли данных при их потоковой обработке и как реализовать дедупликацию в Apache Flink SQL. Смотрим на практическом примере для обучения дата-инженеров и разработчиков распределенных приложений.

Потоковая дедупликация данных в Apache Flink SQL

Apache Flink можно назвать уникальный фреймворком для разработки распределенных приложений в области Big Data, который унифицирует пакетную и потоковую обработку, сохраняя поддержку ANSI SQL и предоставляя широкий набор функций для сценариев реального времени. В потоковой обработке данных приложения-продюсеры, которые генерируют события, иногда могут создавать дубли – записи с одним и тем же идентификатором. Или восходящие ETL-задания не реализуют гарантию строго однократной доставки, что также приводит к дублированию записей в приемнике в случае отработки отказа. Дубликаты записей повлияют на правильность последующих аналитических заданий, особенно в агрегатных функциях, таких как суммирование, подсчет количества значений и пр. Поэтому для обеспечения высокого качества данных необходимо удалить дубли, т.е. выполнить дедупликацию. Цель этого процесса  — обеспечить обработку только уникальных записей и избежать любых проблем, которые могут возникнуть из-за дублирования. В потоковой обработке дедупликация данных может повысить производительность всей Big Data системы.

Таким образом, дедупликация нужна, чтобы:

  • повысить производительность системы за счет уменьшения объема обрабатываемых данных через удаление повторов;
  • освободить место в хранилище, т.к. дубли занимают пространство жесткого диска;
  • улучшить точность анализа и повысить эффективность обработки данных.

Дедупликация данных работает путем выявления и удаления повторяющихся записей из потока данных. Обычно это делается путем сравнения данных в потоке с эталонным набором данных. При обнаружении повторяющейся записи она удаляется из потока. Apache Flink использует SQL-функцию ROW_NUMBER() для удаления дубликатов, как и в случае запроса Top-N. Теоретически дедупликация — это частный случай Top-N, в котором N равно единице и упорядочено по времени обработки или времени события. Ниже показан синтаксис оператора дедупликации:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

Рассмотрим параметры этого запроса:

  • ROW_NUMBER() – функция, которая присваивает каждой строке уникальный порядковый номер, начиная с единицы;
  • PARTITION BY col1[, col2…] – ключ дедупликации, т.е. один или несколько столбцов раздела;
  • ORDER BY time_attr [asc|desc] — столбец сортировки, атрибут времени обработки или события. Аналогично стандартным опциям SQL-оператора ORDER BY, ASC означает сохранение первой строки, а DESC – сохранение последней строки.
  • WHERE rownum = 1 – условие уникальности каждой строки, что и означает дедупликацию.

Этот шаблон необходимо точно соблюдать, иначе оптимизатор Apache Flink не сможет преобразовать запрос. Как это работает, далее рассмотрим на практическом примере.

Потоковая обработка данных с помощью Apache Flink

Код курса
FLINK
Ближайшая дата курса
10 ноября, 2025
Продолжительность
16 ак.часов
Стоимость обучения
48 000

Практический пример

В качестве примера возьмем систему обработки заказов, которая каждые 5 секунд генерирует строки в памяти, которые означают повторяющиеся события с одним и тем же идентификатором order_id.

CREATE TABLE orders (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);

Предположим, по правилам бизнес-логики для последующей обработки нужно сохранить только самое последнее событие. Для этого сперва используем комбинацию функции COUNT и предложения HAVING, чтобы проверить, в каких заказах есть более одного события.

--Check for duplicates in the `orders` table
SELECT id AS order_id,
       COUNT(*) AS order_cnt
FROM orders o
GROUP BY id
HAVING COUNT(*) > 1;

Затем отфильтруем эти события с помощью функции ROW_NUMBER():

--Use deduplication to keep only the latest record for each `order_id`
SELECT
  order_id,
  order_time
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
  FROM orders
     )
WHERE rownum = 1;

Про другой вариант устранения дублей в потоковой обработке данных с помощью Flink SQL читайте в нашей новой статье. А на практике освоить этот прием и другие тонкости использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

[elementor-template id=»13619″]

Источники

  1. https://www.ververica.com/blog/flink-sql-deduplication
  2. https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/deduplication/
  3. https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/06_dedup/06_dedup.md