UDF во фреймворках Big Data: благо или необходимое зло?

Kafka Spark Flink SQL курсы примеры обучение, Kafka Spark Flink Greenplum Clickhouse для разработчика, Kafka Flink Spark Greenplum ClickHouse SQL, Big Data UDF примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Почему пользовательские функции лучше применять как можно реже, каковы их возможности и ограничения: краткий обзор особенностей разработки и эксплуатации UDF в Apache Spark SQL, ksqlDB, Flink SQL, Greenplum и ClickHouse.

Чем полезны и опасны пользовательские функции в обработке больших данных?

Пользовательские функции (User-Defined Functions, UDF) позволяют разработчику расширить возможности фреймворка, реализовав собственные функции для обработки данных. Большинство современных фреймворков это позволяют. Например, в ksqlDB – базе данных, используемой для обработки сообщений, опубликованных в топики Apache Kafka посредством SQL-запросов, пользовательские функции реализуются с использованием Java-перехватчиков (хуков, hook). Все интерфейсы Apache Spark (PySpark, Java, Scala) тоже позволяют написать собственную функцию и повторно использовать ее для обработки нескольких датафреймов. API таблиц PyFlink поддерживает преобразования данных с помощью UDF-функций Python, обрабатывая по одной строке за раз или целым пакетом в векторизованных UDF.

Для демонстрации рассмотрим небольшой пример UDF на PySpark. Предположим, есть поток данных о транзакциях, и нужно определить их добросовестность на основе определённого набора правил. Например, транзакции на сумму свыше 100,000, совершенные в нерабочее время с 00:00 до 06:00 считаются подозрительными. Для проверки этих условий напишем собственную пользовательскую функцию на Python, используя Spark Structured Streaming для чтения входящих данных. Источником данных о транзакциях является топик Kafka под названием transactions, куда они публикуются в формате JSON. Следующий PySpark-код с использованием UDF будет считывать данные из Kafka, преобразовывать их из JSON в датафрейм, добавлять столбец с признаком подозрительности и выводить результат в консоль.

# Импорт пакетов
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col, from_json, hour
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Создание сеанса Spark
spark = SparkSession.builder \
    .appName("UDF Example") \
    .getOrCreate()

# Схема для JSON данных
schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Функция для проверки подозрительных транзакций
def is_suspicious_transaction(amount, hour):
    return amount > 100000 and (hour >= 0 and hour < 6)

# Регистрация UDF
is_suspicious_transaction_udf = udf(is_suspicious_transaction, BooleanType())

# Чтение данных из Kafka
transactions_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load()

# Преобразование данных из Kafka (значение в JSON формат)
transactions_json_df = transactions_df.selectExpr("CAST(value AS STRING) as json")

# Преобразование JSON строки в датафрейм с использованием схемы
transactions_parsed_df = transactions_json_df.select(from_json(col("json"), schema).alias("data")).select("data.*")

# Добавление столбца с признаком подозрительных транзакций с использованием UDF
transactions_with_suspicious_flag_df = transactions_parsed_df \
    .withColumn("is_suspicious", is_suspicious_transaction_udf(col("amount"), hour(col("timestamp"))))

# Вывод результата
query = transactions_with_suspicious_flag_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Ожидание завершения потока
query.awaitTermination()

Впрочем, можно обойтись и без UDF, используя встроенные функции Spark SQL для вычисления признаков подозрительных транзакций. В этом случае код будет выглядеть так:

# Импорт пакетов
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, hour, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Создание сеанса Spark
spark = SparkSession.builder \
    .appName("Without UDF Example") \
    .getOrCreate()

# Схема для JSON данных
schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Чтение данных из Kafka
transactions_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load()

# Преобразование данных из Kafka (значение в JSON формат)
transactions_json_df = transactions_df.selectExpr("CAST(value AS STRING) as json")

# Преобразование JSON строки в датафрейм с использованием схемы
transactions_parsed_df = transactions_json_df.select(from_json(col("json"), schema).alias("data")).select("data.*")

# Добавление столбца с признаком подозрительных транзакций без использования UDF
transactions_with_suspicious_flag_df = transactions_parsed_df \
    .withColumn("is_suspicious", 
                when((col("amount") > 100000) & (hour(col("timestamp")).between(0, 6)), True).otherwise(False))

# Вывод результата
query = transactions_with_suspicious_flag_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Ожидание завершения потока
query.awaitTermination()

Вместо пользовательской функции применяются операторы when и between для создания столбца is_suspicious, который равен True для подозрительных транзакций и False в противном случае. Код даже выглядит более лаконичным и понятным. Впрочем, этот пример довольно прост и не охватывает все возможные случаи, когда от UDF может быть больше пользы, чем вреда (хотя не факт)).

UDF есть на уровне хранения данных. К примеру, колоночная база данных ClickHouse может вызывать любую внешнюю исполняемую программу или скрипт для обработки данных согласно конфигурации исполняемых UDF, заданных в xml-файлах, путь к которой указан в параметре user_defined_executable_functions_config. База данных Greenplum основанная на PostgreSQL, тоже поддерживает пользовательские функции и процедуры. ETL-фреймворки Apache AirFlow и NiFi позволяют написать собственные операторы и процессоры для реализации нетривиальной логики обработки данных.

Таким образом, UDF-функции сегодня стали неотъемлемой частью практически любой платформы. Однако, до сих пор их использование сопровождается некоторыми сложностями. Например, в ksqlDB очень неудобно проверять и отлаживать пользовательские функции, т.к. они загружаются при запуске сервера и при изменении кода требуют пересоздания JAR-пакетов, а также перезапуска сервера. В PySpark пользовательские функции не оптимизируются оптимизатором Apache Spark, поэтому их производительность намного ниже встроенных функций Spark SQL. Кроме того, в Spark пользовательские функции не поддерживают условные выражения в булевых выражениях и не принимают ключевые аргументы на вызывающей стороне.

В Apache Flink при использовании UDF-функций можно столкнуться сложностями при ее отладке и тестировании, несмотря на наличие тестовых наборов. Кроме того, при использовании SQL для обработки данных в Java-фреймворках, таких как Spark SQL, ksqlDB, Flink SQL и пр., придется использовать определенные типы данных в своих структурах, чтобы сопоставить типы данных SQL с типами данных Java.

Распределенный характер фреймворков обработки больших данных и хранилищ также накладывает некоторые ограничения на работу с пользовательскими функциями. К примеру, в реляционной СУБД Greenplum файлы общей библиотеки UDF-функций должны находиться в одном и том же месте на каждом хосте во всем кластере, включая координаторы, сегменты и зеркала. А в колоночной СУБД ClickHouse, несмотря на то, что функция может иметь сколько угодно параметров, ее имя должно быть уникальным среди других пользовательских и системных функций. При этом исполняемые пользовательские функции могут принимать постоянные параметры, настроенные в разделе command. Также требуется опция execute_direct, чтобы гарантировать отсутствие уязвимости расширения аргументов оболочки. Рекурсивные функции не допускаются. Все переменные, используемые функцией, должны быть указаны в списке ее параметров. Если какое-то из перечисленных ограничений будет нарушено, возникнет исключение. Некоторые UDF-функции могут выдавать исключение, если данные недействительны. В этом случае запрос отменяется, а клиенту возвращается текст ошибки. При распределенной обработке, когда на одном из серверов возникает исключение, другие серверы также пытаются прервать запрос.

Получается, главным правилом при работе с пользовательскими функциями является рекомендация применять их как можно реже, чтобы избежать ошибок, ограничений и неполного применения всех возможностей внутренних оптимизаций самой распределенной среды. Если же обойтись без UDF-функций никак не получается, разработчику следует использовать их очень аккуратно, версионировать и документировать.

Как это сделать наиболее эффективным образом, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту