Java-хуки Apache Kafka для UDF-функций ksqlDB

Kafka курсы примеры обучение, Kafka для разработчика, Kafka SQL ksql ksqlDB, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как расширить возможности ksqlDB, реализовав пользовательскую функцию обработки данных, хранящихся в топиках Kafka, с помощью SQL-запросов: ликбез по UDF и практический пример.

Пользовательские функции в ksqlDB для работы с данными в топиках Apache Kafka

Поскольку Apache Kafka – то не просто брокер сообщений, а целая экосистема потоковой передачи событий, вокруг нее существует множество компонентов. Одним из них является ksqlDB – клиент-серверная база данных, которая позволяет работать с данными в топиках Apache Kafka с помощью SQL-запросов. Эта БД имеет широкий набор встроенных функций, а также позволяет разработчику реализовать собственные с использованием Java-перехватчиков (хуков, hook). Поскольку SQL имеет систему типов, которая независима от Java, пользовательские функции (UDF, User Defined Function) должны использовать определенные типы Java, чтобы ksqlDB могла сопоставлять типы данных SQL с типами данных Java.

Чтобы выбрать, какие классы загружать в качестве пользовательских функций, во время запуска ksqlDB сканирует JAR-файлы в своем каталоге расширений в поисках классов с аннотацией UDF. Каждая найденная функция анализируется и загружается в ksqlDB. Каждый экземпляр функции имеет свой собственный загрузчик child-first ClassLoader, который изолирован от других функций. Если в UDF используются сторонние библиотеки, они также должны быть частью JAR-пакета/ Это означает, что разработчику нужно создать Uber-JAR – JAR-файл, содержащий как само приложение, так и все его зависимости, упакованные в один файл. Это позволит избежать загрузки дополнительных библиотек при запуске приложения, поскольку все необходимые классы уже присутствуют внутри Uber-JAR. В ksqlDB классы в пользовательском Uber-JAR загружаются в первую очередь по сравнению с любыми классами в classpath, кроме того, что является частью пакетов org.apache.kafka и io.confluent.

UDF загружаются только один раз при запуске сервера ksqlDB, который не поддерживает горячую перезагрузку пользовательских функций. Поэтому при изменении кода UDF, придется создать новый Uber-JAR, заменив предыдущий, и перезапустить сервер. При кластерном развертывании ksqlDB разные узлы могут одновременно запускать разные версии UDF.

Для определения того, какие классы будут в UDF, используются аннотации. Также они нужны для отображения полезных метаданных. Например, когда класс аннотирован @UdfDescription, он сканируется на наличие любых публичных методов, аннотированных @Udf. Если он соответствует, класс загружается как скалярная, табличная или агрегатная функция. Скалярная функция потребляет одну строку в качестве входных данных и производит одну строку в качестве выходных данных. Это подходит для простого преобразования значений. Табличная функция (UDTF) принимает одну строку в качестве входных данных и выдает ноль или более строк в качестве выходных данных. Такое плоское отображение полезно, когда значение представляет собой множество меньших значений и его нужно разбить на отдельные части для обработки. Наконец, функция агрегации (UDAF) потребляет одну строку за раз и поддерживает представление всех исторических данных с сохранением состояния. Это пригодится для объединения данных из нескольких строк вместе.

Параметры каждого метода могут быть опционально аннотированы @UdfParameter. Аннотация @Udf применяется к публичным методам класса, аннотированного с помощью @UdfDescription. Каждый аннотированный метод станет вызываемой функцией в SQL. Аннотация @UdfParameter применяется к параметрам методов, аннотированных с помощью @Udf. Таким образом, ksqlDB использует информацию в аннотации @UdfParameter для указания схемы параметров, если ее невозможно вывести из типа Java, и для передачи метаданных.

Функции агрегации обозначаются статическим методом с аннотацией @UdafFactory, которая отличается от скалярных и табличных функций. Поскольку агрегации должны реализовывать несколько методов, это помогает ksqlDB различать агрегации при использовании нескольких сигнатур типов. Статический фабричный метод должен возвращать либо пакет Udaf, либо TableUdaf из пакета io.confluent.ksql.function.udaf. Если типы агрегата или возвращаемых данных UDAF различаются в зависимости от типа входных данных, разработчику придется написать отдельную функцию, аннотированную @UdafFactory для каждого типа, либо переопределить методы initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType()и getReturnSqlType().

Чтобы понять, как работают пользовательские функции ksqlDB, далее рассмотрим небольшой пример обработки данных из топиков Apache Kafka с помощью SQL-запросов.

Практический пример

Например, следующий код определяет UDF-функцию ksqlDB, которая позволяет выполнять специфическую бизнес-логику в потоках данных, обрабатываемых с использованием Apache Kafka и SQL. В нем представлены две версии пользовательской функции summarize() для сложения целых числе или контенкации строк. В зависимости от типа входных данных (целые числа или символы) будет использована соответствующая версия функции.

package com.example;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(name = "summarize",
                author = "anna",
                version = "1.0.1",
                description = "A custom logic to summarize data")
public class SummarizeUdf {

    @Udf(description = "The first version of the summarize with integer parameters")
    public Integer summarize(@UdfParameter int v1, @UdfParameter int v2) {
        return v1 + v2;
    }

    @Udf(description = "The second variant of the summarize with chars")
    public String summarize(@UdfParameter String v1, @UdfParameter String v2) {
        return v1 + v2;
    }
}

В этом коде аннотация @UdfDescription даёт общее описание пользовательской функции, включая ее имя в ksqlDB, автора, версию и краткое описание. Класс SummarizeUdf содержит методы, реализующие логику пользовательской функции. Первый из них работает с целочисленными параметрами, принимая два целых числа (int v1 и int v2), и возвращает их сумму. Второй метод работает со строковыми параметрами, принимая две строки (String v1 и String v2) и возвращает их объединение, т.е. конкатенацию. Аннотация @Udf указывает на то, что это ещё одна реализация пользовательской функции, а описание поясняет, что метод работает со строками.

Эти функции могут быть вызваны в KSQL запросах для выполнения соответствующих операций над данными, хранящимися в потоках Kafka. Например, есть поток данных transactions, который содержит информацию о транзакциях с полями transaction_id, amount1, и amount2, где amount1 и amount2 — это целые числа. Пусть данные поступают в Kafka в топик transactions-topic в формате JSON. Чтобы создать в ksqlDB новый поток, который будет содержать суммы этих двух полей, надо выполнить типовой SQL-запрос на создание таблицы:

CREATE STREAM transactions (
  transaction_id VARCHAR,
  amount1 INT,
  amount2 INT
) WITH (
  KAFKA_TOPIC='transactions-topic',
  VALUE_FORMAT='JSON'
);

Для получения суммы amount1 и amount2 воспользуемся UDF-функцией summarize, выполнив операцию выборки при создании потоковой таблицы:

CREATE STREAM summarized_transactions AS
SELECT
  transaction_id,
  summarize(amount1, amount2) AS total_amount
FROM transactions;

Теперь предположим, что в Kafka есть другой топик users-topic с данными в формате JSON о пользователях — поток данных users, содержащий поля user_id, first_name и last_name. Чтобы создать в ksqlDB новый поток, который будет содержать полное имя пользователя, выполним DDL-запрос:

CREATE STREAM users (
  user_id VARCHAR,
  first_name VARCHAR,
  last_name VARCHAR
) WITH (
  KAFKA_TOPIC='users-topic',
  VALUE_FORMAT='JSON'
);

Теперь воспользуемся UDF-функцией summarize для конкатенации имени и фамилии пользователя, т.е. параметров first_name и last_name, создав потоковую таблицу ksqlDB:

CREATE STREAM full_names AS
SELECT
  user_id,
  summarize(first_name, last_name) AS full_name
FROM users;

Этот SQL-запрос создаст новый поток full_names, который включает идентификатор пользователя и его полное имя, составленное с помощью пользовательской функции summarize.

Разумеется, любое нетиповое поведение системы, расширяемое с помощью пользовательских функций, чревато рисками нарушения безопасности. Поэтому в некоторых средах развертывания может потребоваться ограничить классы, к которым UDF имеют доступ. Чтобы уменьшить поверхность атаки пользовательских функций ksqlDB, можно внести некоторые классы и пакеты в черный список, чтобы их нельзя было использовать из UDF. Пример черного списка находится в каталоге расширений в текстовом файле resource-blacklist.txt. По умолчанию все записи в нем закомментированы. Этот файл содержит одну запись на строку, где каждая строка — это класс или пакет, который следует внести в черный список. Сопоставление имен основано на регулярном выражении. Поэтому если есть запись, например, java.lang.Process, это соответствует всем путям, начинающимся с java.lang.Process: java.lang.Process, java.lang.ProcessBuilder и пр. Если нужно добавить в черный список один класс, например, java.lang.Compiler следует внести в файл resource-blacklist.txt запись java.lang.Compiler$. Любые пустые строки или строки, начинающиеся с символа # в этом файле игнорируются.

По умолчанию ksqlDB устанавливает простой менеджер безопасности Java для выполнения пользовательских функций. Он блокирует попытки любых функций разветвить процессы с сервера ksqlDB и не позволяет им вызывать System.exit(..). Можно отключить менеджер безопасности, установив конфигурации ksql.udf.enable.security.manager значение false. Также можно отключить загрузку всех UDF в каталоге расширений, установив ksql.udfs.enabled в значение false. По умолчанию эти параметры включены. Для оптимизации производительности сбор метрик пользовательских функций по умолчанию отключен, но его можно включить, установив конфигурацию ksql.udf.collect.metrics равной true. Однако, это не рекомендуется для производственного использования, поскольку метрики собираются при каждом вызове и вносят некоторые накладные расходы во время обработки

Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://docs.ksqldb.io/en/latest/concepts/functions/
  2. https://docs.ksqldb.io/en/latest/reference/user-defined-functions/
  3. https://docs.ksqldb.io/en/latest/how-to-guides/create-a-user-defined-function/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту