Дубли при CDC-передаче данных из PostgreSQL в ClickHouse и как их устранить

ClickHouse примеры курсы обучение, архитектура данных примеры курсы обучение, интеграция PostgreSQL с ClickHouse

Почему табличный движок ReplacingMergeTree в PeerDB и ClickPipes не избавит от дублей при передаче измененных данных из PostgreSQL в ClickHouse и можно ли полностью выполнить дедупликацию с помощью модификатора FINAL, политики строк, обновляемых представлений или агрегатных и оконных функций.

Как движок ReplacingMergeTree допускает дубли при импорте изменений из PostgreSQL в ClickHouse

Недавно мы разбирали реализацию CDC-подхода к передаче данных из транзакционной базы PostgreSQL в аналитическое хранилище Clickhouse с помощью PeerDB и основанном на этом ETL/ELT-инструменте репликации ClickPipes — интеграционного движка для облачной версии. Напомним, ClickHouse лучше всего работает с аналитическими запросами на чтение, хорошо поддерживает пакетную вставку новых данных и не рекомендует частые обновления.

При репликации изменений из PostgreSQL в ClickHouse автоматически создаются таблицы с движком ReplacingMergeTree, столбцы которых имеют тип данных, который наиболее точно соответствует исходному типу. Напомним, движок ReplacingMergeTree отличается от MergeTree характером дедубликации: он удаляет повторяющиеся записи с одинаковым значением ключа сортировки по столбцу с оператором ORDER BY, а не по первичному ключу PRIMARY KEY. Это происходит только во время слияния, которое выполняется в фоновом режиме в неизвестное время. Такая стратегия экономит место, но не гарантирует отсутствия дубликатов, о чем мы писали здесь.

Движок ReplacingMergeTree представляет обновления данных как вставки с более новой версией, которая указывается в столбце_peerdb_version. Удаление данных тоже вставка с более новой версией и значением true в столбце _peerdb_is_deleted. Сохранение и дедубликация данных движок ReplacingMergeTree выполняет в фоновом режиме, сохраняя последнюю версию строки для заданного первичного ключа id. Это обеспечивает эффективную обработку изменений UPDATE и удалений DELETE как версионных вставок.

В настройках движка ReplacingMergeTree можно задать параметра для более частого объединения данных:

  • min_age_to_force_merge_seconds – минимальный возраст данных для запуска слияния (в секундах), по умолчанию равно 0 (отключено);
  • min_age_to_force_merge_on_partition_only – тоже самое, но применительно только ко всему разделу. По умолчанию false.

Можно установить эти значения для существующих ReplacingMergeTree-таблиц с помощью оператора ALTER TABLE. Изменение этих настроек увеличивает частоту слияний и может радикально сократить дубликаты, но не гарантирует их полное отсутствие. Как уже было отмечено ранее, с движком ReplacingMergeTree устаревшие версии строк в конечном итоге отбрасываются в процессе слияния. Однако время этого слияния непредсказуемо. Это означает, что в какие-то моменты времени запросы в ClickHouse могут возвращать несогласованные результаты. Как избежать этого, рассмотрим далее.

Дедупликация данных при передаче изменений

Одним из самых простых способов избежать дублирования данных, возникающих из-за особенностей табличного движка ReplacingMergeTree – это использовать модификатор FINAL в запросах к таблицам ClickHouse, в которые попадают изменения из PostgreSQL. Модификатор FINAL в ClickHouse выполняет дедупликацию строк, объединяя дубли во время запроса после фильтрации с оператором WHERE, но до агрегации GROUP BY. Это гарантирует возврат только дедуплицированных строк. Однако, FINAL добавляет некоторые накладные расходы к запросам, а также имеет другие недостатки, о которым мы рассказывали в этой статье. Чтобы не добавлять FINAL к каждому имени таблицы в запросе, можно настроить автоматическое применение этого модификатора ко всем таблицам в запросе как для отдельного запроса, так и для всего сеанса.

-- на уровне запроса ко всем таблицам 
SELECT count(*) FROM posts SETTINGS final = 1; 

-- на уровне сеанса 
SET final = 1; 6SELECT count(*) FROM posts;

Альтернативой FINAL может стать политика строк (ROW) — фильтр, который определяет, какие строки доступны пользователю или роли. Это имеет смысл только для пользователей с доступом только для чтения. Когда пользователи могут изменять таблицу или копировать разделы между таблицами, это отменяет ограничения политик строк.

-- применение политики строк ко всем пользователям
CREATE ROW POLICY cdc_policy ON votes FOR SELECT USING _peerdb_is_deleted = 0 TO ALL;

Также можно использовать представления – заранее вычисленные результаты, которые можно повторно использовать при выполнении SQL-запросов, позволяя снизить количество ресурсоемких операций соединения. Представления как виртуальные таблицы позволяют запросить данные в ClickHouse тем же SQL-запросом, как это делалось в источнике, т.е. PostgreSQL, т.к. поскольку представления не хранят никаких данных, а просто выполняют чтение из другой таблицы при каждом доступе. Более того, можно использовать обновляемые материализованные представления Refreshable Materialized View, которые позволяют планировать выполнение запроса для дедупликации строк и сохранения результатов в целевой таблице. При каждом запланированном обновлении целевая таблица заменяется последними результатами запроса. Например, создадим материализованное представление и запланируем его обновление каждый час. При этом в таблицу deduplicated_posts будут записываться только те записи из таблицы posts, которые не помечены как удалённые, т.е. _peerdb_is_deleted=0.

CREATE MATERIALIZED VIEW deduplicated_posts_mv REFRESH EVERY 1 HOUR TO deduplicated_posts AS 
SELECT * FROM posts FINAL where _peerdb_is_deleted=0

Это позволит поддерживать актуальную и очищенную от дубликатов версию данных в таблице deduplicated_posts, регулярно обновляемую из основной таблицы posts без включения удалённых записей. Главным преимуществом этого метода является однократное выполнение запроса с применением FINAL только во время обновления. Но данные в целевой таблице будут актуальны только согласно последним обновлениям. При этом интервалы обновления, т.е. загрузки изменений из PostgreSQL в ClickHouse могут быть длительными: от пары минут до нескольких часов.

Чтобы избежать неконсистентности данных в этом случае, можно применить агрегатную функцию argMax, которая вычисляет агрегат для максимального значения. Это подходит для динамической дедупликации строк во время выполнения запроса и особенно полезно, когда нужно сохранить самую последнюю или релевантную запись на основе столбца версий или временных меток. Поскольку PeerDB сохраняет версии данных, импортированных из PostgreSQL в ClickHouse, в столбце _peerdb_version, можно применить функцию argMax для выбора строки с наивысшим значением _peerdb_version для каждого первичного ключа. Такой подход позволяет эффективно удалять дубликаты, не изменяя базовые данные. Затем можно запустить агрегации в качестве подзапроса по этому дедуплицированному набору результатов для дальнейшего анализа. Например, следующий запрос показывает аналитику пользовательского поведения, возвращая для каждого пользователя количество активных целей и время последнего обновления каждой из них:

SELECT
    owned_user_id,
    COUNT(*) AS active_goals_count,
    MAX(ts) AS latest_goal_time
FROM
(
    SELECT
        id,
        argMax(owned_user_id, _peerdb_version) AS owned_user_id,
        argMax(goal_title, _peerdb_version) AS goal_title,
        argMax(goal_data, _peerdb_version) AS goal_data,
        argMax(enabled, _peerdb_version) AS enabled,
        argMax(ts, _peerdb_version) AS ts,
        argMax(_peerdb_synced_at, _peerdb_version) AS _peerdb_synced_at,
        argMax(_peerdb_is_deleted, _peerdb_version) AS _peerdb_is_deleted,
        max(_peerdb_version) AS _peerdb_version
    FROM peerdb.public_goals
    WHERE enabled = true
    GROUP BY id
) AS deduplicated_goals
GROUP BY owned_user_id;

Подзапрос выбирает для каждой цели наиболее актуальные данные, основываясь на поле _peerdb_version. Это обеспечивает, что в итоговом наборе данных нет дублирующихся записей для одной и той же цели. Фильтрация активных целей выполняется в подзапросе, где учитываются только те цели, у которых поле enabled установлено в true. Основной запрос выполняет агрегацию по пользователям, группируя полученные уникальные и активные цели по полю owned_user_id, подсчитывает количество активных целей у каждого пользователя и определяет самое последнее время ts достижения или обновления цели. Подзапрос FROM ( … ) AS deduplicated_goals предварительно обрабатывает данные для удаления дубликатов и получения актуальной информации по каждой цели.

Также можно использовать оконные функции ClickHouse для достижения аналогичной дедупликации, выбрав строку с наивысшим значением _peerdb_version в каждом разделе id. Например, следующий запрос тоже возвращает дедуплицированные строки о пользовательских целях и времени их создания/обновления:

SELECT
    owned_user_id,
    COUNT(*) AS active_goals_count,
    MAX(ts) AS latest_goal_time
FROM
(
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY _peerdb_version DESC) AS rn
    FROM peerdb.public_goals
    WHERE enabled = true
) AS ranked_goals
WHERE rn = 1
GROUP BY owned_user_id;

Из таблицы peerdb.public_goals выбираются только активные цели, у которых enabled = true. Для каждой цели по ее id выбирается запись с самой высокой версией, т.е. с максимальным значением _peerdb_version с помощью оконной функции ROW_NUMBER().После выбора актуальных записей, они группируются по owned_user_id, и для каждой группы вычисляется количество активных целей active_goals_count и время последней цели latest_goal_time.

Рассмотренные способы не устраняют всех проблем с дублированием данных при передаче изменений из PostgreSQL в Clickhouse. Дополнительные сложности возникают из-за разных ключей сортировки, которые используются в ClickHouse для дедупликации, но сами могут стать причиной появления дублей. Как справиться с этим, читайте в нашей новой статье.

Освойте ClickHouse на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://clickhouse.com/blog/postgres-to-clickhouse-data-modeling-tips-v2
  2. https://docs.peerdb.io/bestpractices/clickhouse_datamodeling
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.