Копирование сложных структур данных из Kafka в СУБД с SMT и JDBC Sink Connector

обучение Apache Kafka курсы примеры, Apache Kafka Connect для разработчиков, парсинг JSON Kafka Streams KsqlDB Connect, обучение большим данным, Kafka SMT использование пример, Школа Больших Данных Учебный Центр Коммерсант

Мы уже рассматривали особенности обработки вложенных структур данных на примере парсинга JSON-файлов с Apache Spark и Hive. Развивая эту тему, сегодня поговорим про перенос записей с вложенными массивами из топиков Apache Kafka в реляционные СУБД с пользовательскими SMT-преобразователями и JDBC-коннектором: кейс для разработчиков.

Проблемы обработки сложных структур данных с JDBC-коннектором Apache Kafka

Для передачи данных из Apache Kafka в реляционные РСУБД можно использовать JDBC-коннектор от Confluent, который потребляет записи из топика Kafka и отправляет их в базу данных с помощью JDBC-драйвера. JDBC Connector (Source and Sink) поддерживает множество баз данных, не требуя специального кода для каждой из них: данные загружаются путем периодического выполнения SQL-запроса и создания выходной записи для каждой строки в результирующем наборе. По умолчанию все таблицы в базе данных копируются, каждая в свой отдельный выходной топик. Сама база данных отслеживается на наличие новых или удаленных таблиц и автоматически адаптируется.

При копировании данных из таблицы коннектор может загружать только новые или измененные строки, указывая, какие столбцы следует использовать для обнаружения новых или измененных данных. Можно добиться идемпотентной записи с помощью операций вставки-обновления (upserts). Также поддерживается автоматическое создание таблиц и ограниченное автоматическое развитие.

Коннектор приемника требует знания схем, поэтому следует использовать подходящий преобразователь, например, AVRO, который поставляется с реестром схем (Schema Registry), или преобразователь JSON с включенными схемами. Ключи записи Kafka могут быть примитивными типами или структурой Connect, а значение записи должно быть структурой Connect. Поля, выбираемые из структур Connect, должны относиться к примитивным типам. Если данные в теме имеют несовместимый формат, может потребоваться реализация пользовательского преобразователя.

На практике этот коннектор отлично работает при соблюдении следующих условий:

  • схема записей, которые необходимо отправить в СУБД, плоская и не содержит массивов или аналогичных сложных структур, таких как вложенные массивы объектов с несколькими уровнями;
  • каждая запись из топика Kafka однозначно сопоставляется с одной строкой в ​​целевой таблице реляционной базы данных.

В реальности эти условия соблюдаются не всегда: некоторые записи имеют сложные схемы данных со вложенными массивами, которые также могут содержать многоуровневые структуры. При их отправке из топика Kafka с помощью JDBC Sink-коннектора можно столкнуться с исключением

org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: ARRAY

Для решения этой проблемы стоит рассмотреть следующие варианты:

  • сгладить схему данных с помощью топологии Kafka Streams, а затем использовать эту упрощенную схему в качестве входных данных для JDBC-коннектора приемника;
  • применить функцию explode() в ksqlDB;
  • написать свой собственный Java-потребитель или коннектор Kafka;
  • использовать JDBC Sink с функцией Flatten, который является расширением существующего JDBC-коннектора от Confluent. При его использовании сопоставления и массивы будут разделены и записаны в отдельные целевые таблицы.

Каждый из перечисленных вариантов имеет свои достоинства и недостатки. В частности, если требуется универсальное решение, которое можно повторно использовать для нескольких сценариев с Kafka Connect и минимальными усилиями на разработку кода, предпочтительным кажентся последнее решение – расширение JDBC Sink-коннектора с функцией Flatten. Однако, для сложных схем AVRO потребуется его тщательная настройка и отладка. Поэтому имеет смысл разработать собственный преобразователь отдельных сообщений (SMT, Single Message Transformation) для применения к сообщениям из топика Kafka по мере их прохождения через платформу Connect. SMT преобразуют входящие сообщения после их создания коннектором источника, но до того, как они будут записаны в Kafka. SMT преобразуют исходящие сообщения перед их отправкой на коннектор приемника. Как это реализовать, рассмотрим далее.

Пользовательский SMT для обработки вложенных структур

Если большинство СУБД-приемников поддерживают работу со строками JSON или строками XML, можно использовать обычный JDBC-коннектор приемника Kafka, добавив специальный SMT, который преобразует исходную сложную схему в более простую, содержащую только одно поле. Содержимое этого поля — исходная схема со всеми данными в строковом представлении JSON или XML. Эта строка помещается в промежуточную таблицу «Ключ/Значение» в целевой БД. Логика на стороне БД с использованием триггеров и хранимых процедур анализирует строку JSON или XML и при необходимости сопоставляет ее с любой реляционной моделью. Исходный код этого коннектора доступен на Github по лицензии Apache 2, а его архитектура выглядит следующим образом:

  • потребитель считывает данные со сложными схемами и вложенными структурами из топика Kafka;
  • SMT-преобразователь с JDBC-коннектором приемника на платформе Kafka упрощает схему данных;
  • плоское представление исходной схемы данных в виде строки JSON или XML записывается в промежуточную таблицу «Ключ/Значение» в целевой реляционной базы;
  • парсинг JSON-строки выполняется с помощью хранимых процедур в целевой СУБД;
  • итоговый результат записывается в нужные таблицы базы-приемника.
курсы Kafka обучение пример, Kafka Connect пример SMT JSON парсинг RDBS
Архитектура решения по переносу записей с вложенными массивами из топиков Apache Kafka в реляционные СУБД с пользовательскими SMT-преобразователями и JDBC-коннектором

Представленное решение имеет следующие преимущества:

  • отсутствие дополнительных компонентов типа Kafka Streams или KSQL в интеграционном конвейере;
  • универсальное решение, которое можно применять для нескольких вариантов использования со сходной структурой и большинства реляционных СУБД;
  • изменения схемы данных в сообщениях не отразятся на стороне Kafka Connect, а затронут только хранимые процедуры СУБД;
  • SMT можно комбинировать с другими подобными преобразователями для фильтрации или предварительного преобразования данных.

Обратной стороной этих достоинств являются следующие недостатки:

  • Рост нагрузки на базу данных и повышенное потребление места на жестком диске из-за промежуточных таблиц, куда записываются огромные блоки данных, которые необходимо обрабатывать на стороне БД. При этом из объемной записи JSON может требоваться только один или несколько атрибутов, а остальная строка не нужна, а лишь занимает место.
  • в некоторых реляционных СУБД работа с JSON и XML может быть сложна и требует от разработчика определенных навыков;
  • изменения схемы данных необходимо учитывать в хранимых процедурах, поэтому знать об этом необходимо заранее;
  • мониторинга Kafka Connect недостаточно, также необходимо следить за выполнением логики обработки данных в самой базе;
  • отсутствие безопасности типов – при работе со строками разработчику придется заботиться о совместимости типов данных и применяемых к ним функций самостоятельно.

Некоторые из этих недостатков можно обойти, дополнив описанное SMT-решение функциями поиска конкретной информации из вложенных массивов в записи подключения, например, JsonPath/XPath, чтобы передавать в реляционные СУБД только те данные, которые действительно необходимы, а не огромные строки JSON целиком.

Больше практических деталей по администрированию и использованию Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:

Источники

  1. https://medium.com/bearingpoint-technology-advisory/handle-arrays-and-nested-arrays-in-kafka-jdbc-sink-connector-41929ea46301
  2. https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
  3. https://docs.confluent.io/platform/current/connect/transforms/overview.html
  4. https://github.com/an0r0c/kafka-connect-transform-tojsonstring
Поиск по сайту