ksqlDB 0.19.0: июньские новинки для разработчиков Kafka от Confluent

ksqlDB курсы Kafka, обучение Kafka Streams для разработчиков, обучение Apache Kafka, курсы Apache Kafka, обучение KSQL, курсы KSQL, Школа Больших Данных Учебный центр Коммерсант

6 июня 2021 года компания Confluent, которая продвигает коммерческую версию платформы Apache Kafka, выпустила новый релиз ksqlDB. Сегодня рассмотрим самые важные исправления ошибок и новые функции ksqlDB 0.19.0, уделив особое внимание SQL-запросам соединения таблиц через JOIN по внешнему ключу.

ТОП-10 исправленных ошибок в новом релизе ksqlDB

Напомним, ksqlDB – это клиент-серверная СУБД потоковой передачи событий с API на основе SQL для запроса и обработки данных, хранящихся в топиках Apache Kafka. До ноября 2019 года этот SQL-движок для Apache Kafka от компании Confluent назывался KSQL. Свежий релиз ksqlDB, выпущенный в июне 2021, включает 10 новых функций и 14 исправленных ошибок. Сперва перечислим исправленные ошибки, наиболее важные для разработчиков приложений Kafka Streams [1]:

  • разрешена передача переменной среды KSQL_GC_LOG_OPTS, позволяя указывать параметры сборки мусора (Garbage Collection) в переменных среды, смоделированных на основе текущей конфигурации брокера K
  • уточнена формулировка сообщения об ошибке при операциях со значениями NULL;
  • исправлена проблема запроса INSERT, когда базовый запрос CREATE_AS не завершался при удалении потока через DROP из-за QueryRegistryImpl, который удалял ссылки на запросы из переменных сопоставления insertQueries и createAsQueries;
  • расширены тесты анонимайзера запросов с функциональными тестами, в частности, добавлена анонимизация пользовательских типов данных и операторов JOIN;
  • исправлено неверное определение типа Struct из схемы protobuf;
  • исправлен тест shouldNotBeAbleToUseWssIfClientDoesNotTrustServerCert, чтобы предупредить атаку типа «человек посередине» — значение по умолчанию для ssl.endpoint.identification.algorithm изменено на https;
  • запрос Select * теперь работает в n-сторонних соединениях с повторными разделами и несколькими уровнями вложенности, чтобы верно находить все исходные и дочерние узлы;
  • отклонены несовпадения десятичных дроби в топиках AVRO – теперь при наличии таких данных, выполняется попытка их преобразования в схему ksql. Если это невозможно, то значение пропускается и регистрируется исключение apache.kafka.common. errors.SerializationException в логе потоков;
  • исправлена ошибка с многоколоночными ключами в перегруппировке из-за несоответствия исходной схемы и схемы проекции. Теперь оператор SELECT * можно применять с исходной схемой без учета порядка ключей в условии PARTITION BY. А в проекции SELECT, где ключи упорядочены не так, как в источнике, они будут переупорядочены для пользователя в соответствии с исходной схемой.
  • Исправлены ошибки несовместимости с новыми версиями jetty/jackson, netty. Рекомендуется также обновить оболочку Maven и использовать Java Base64 вместо Jersey.

Однако, в свежем релизе ksqlDB 0.19.0 выполнен не только баг-фиксинг, но и добавлены 10 новых функций, подробнее о которых мы расскажем далее.

Новые фичи KSQL для приложений Apache Kafka Streams

В ksqlDB 0.19.0 добавлены следующие функциональные возможности:

  • таймаут простоя сервера, который могут задавать сами пользователи, чтобы обеспечить способ длительной потоковой передачи push-запросов, не позволяя отключенным клиентам слишком долго удерживать ресурсы сервера. Ранее один push-запрос мог передаваться в потоковом режиме не более 10 минут за раз, после чего сервер закрывал соединение.
  • функция NULLIF, которая сравнивает аргументы и возвращает NULL, если они равны, аналогично как в других SQL-платформах. Если сравниваемые значения отличаются, NULLIF возвращает первое из них. Это особенно полезно для преобразования в NULL таких значений, как пустая строка, контрольное значение или ноль.
  • масштабируемые физические операторы push-запросов. В частности, введен оператор PeekStreamOperator, который регистрирует ProcessingQueue с помощью ScalablePushRegistry. Вместе с существующими операторами pull-запросов (ProjectOperator и SelectOperator) это позволяет создать полный план выполнения. Новые операторы созданы с помощью PushPhysicalPlanBuilder, создающего PushPhysicalPlan, который фактически и выполняет запрос асинхронно в контексте Vertx. При этом для всей передачи строк не требуются какие-либо выделенные потоки, позволяя запросам выполняться одновременно и продолжительное время, не нагружая пулы потоков. Кроме того, общие операторы, которые используются как pull-, так и в push-запросах перемещены в единый пакет.
  • масштабируемая маршрутизация push-запросов, чтобы решить проблему невыполнения запросов при перебалансировке. Ранее push-запрос в ksqlDB определял набор хостов во время запуска, к которым нужно подключиться, и далее придерживается их. Теперь PushRouting делает все асинхронно, используя Vertx Context и CompletableFutures для обработки, не требуя пула потоков. Также добавлен клиента http2 для KsqlClient, нужный для выдачи запросов http2 как для запросов /query-stream между экземплярами кластера.
  • ScalablePushRegistry, который хранится в PersistentQueryMetadata и просматривает строки, до конца топологии приложения Kafka Streams. ProcessingQueue – это объект, зарегистрированный для данного масштабируемого push-запроса с помощью ScalablePushRegistry, и где предлагаются строки.
  • класс для вычисления важных метаданных, чтобы фильтровать анонимные запросы по физическому кластеру, организации и времени создания;
  • точность для десятичных знаков Avro в Kafka Connect, если в схеме явно не указано иное. Это необходимо, чтобы ksqlDB отражал поведение реестра схемы с точностью десятичных знаков до 64. Генерация схем вывод ksqlDB осталась прежней: точность явно записывается в схему, даже если использовалось значение по умолчанию.
  • возможность определять условие соединения таблиц по внешнему ключу, что позволяет реализовать связь «много-ко-многим», что часто требуется при нормализации схемы реляционной СУБД путем разложения данных на несколько таблиц и их соединение через внешний ключ. Ранее это было возможно, если строки в каждой из соединяемых таблиц имеют одинаковый первичный ключ. Также добавлено построение физического плана для соединений таблиц по внешнему ключу и поддержка ungate.Как теперь это работает, мы детально рассмотрим далее.

JOIN-соединения по внешнему ключу в SQL-движке Apache Kafka

Рассмотрим пример соединения таблиц users и orders, данные которых хранятся в топиках Apache Kafka:

CREATE TABLE orders (

     id INT PRIMARY KEY,

     user_id INT,

     value INT

   ) WITH (

     KAFKA_TOPIC = ‘my-orders-topic’,

     VALUE_FORMAT = ‘JSON’,

     PARTITIONS = 2

   );

 

CREATE TABLE users (

     u_id INT PRIMARY KEY,

     name VARCHAR,

     last_name VARCHAR

   ) WITH (

     KAFKA_TOPIC = ‘my-users-topic’,

     VALUE_FORMAT = ‘JSON’,

     PARTITIONS = 3

   );

 

CREATE TABLE orders_with_users AS

SELECT * FROM orders JOIN users ON user_id = u_id

EMIT CHANGES;

Можно указать любой столбец левой таблицы в условии соединения, чтобы выразить соответствие первичному ключу правой таблицы. Например, в кейсах аналитики больших данных, связанных с OLAP-запросами, можно рассматривать левую таблицу как таблицу фактов, а правую таблицу – как таблицу измерений в звездных схемах моделирования данных.

В данном примере выражение ON использует неключевой столбец в левой части вместо первичного ключа левой входной таблицы. Таблица результатов унаследует первичный ключ левой входной таблицы:

ORDERS_WITH_USERS <ID INT PRIMARY KEY, USER_ID INT, VALUE BIGINT, U_ID INT, NAME VARCHAR, LAST_NAME VARCHAR>

ksqlDB курсы Kafka, обучение Kafka Streams для разработчиков, обучение Apache Kafka, курсы Apache Kafka, обучение KSQL, курсы KSQL, Школа Больших Данных Учебный центр Коммерсант
Соединение по внешнему ключу в ksqlDB: обучение Kafka Streams для разработчиков

Подобно соединениям таблицы с первичным ключом, каждый раз при обновлении левой или правой входной таблицы, таблица результатов будет тоже обновляться. Поскольку соединение по внешнему ключу реализует связь «много-к-одному», обновление правой входной таблицы может привести к обновлению нескольких строк в таблице результатов. А обновление левой входной таблицы вызовет обновление одной строки в таблице результатов, подобно соединению таблицы с первичным ключом.

Однако, в отличие от соединений таблиц по первичному ключу, JOIN-запросы по внешнему ключу не требуют совместного разделения: каждая таблица может иметь разное количество разделов (партиций), как и рассмотрено в нашем примере. Таблица результатов унаследует количество разделов левой входной таблицы. Единственное ограничение в том, что левая часть условия соединения должна быть столбцом, а не выражением, содержащим столбец.

Соединения по внешнему ключу могут применять семантику INNER или LEFT OUTER, а по первичному ключу поддерживают INNER, LEFT OUTER и FULL OUTER. Кроме того, соединения по внешнему ключу нельзя использовать как часть n-стороннего соединения [2].

Другие практические особенности использования ksqlDB и других компонентов платформы Apache Kafka для разработки распределенных приложений потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://github.com/confluentinc/ksql/blob/master/CHANGELOG.md#0190-2021-06-08
  2. https://www.confluent.io/blog/ksqldb-0-19-adds-data-modeling-foreign-key-joins/

 

Поиск по сайту