Почему пользовательские функции лучше применять как можно реже, каковы их возможности и ограничения: краткий обзор особенностей разработки и эксплуатации UDF в Apache Spark SQL, ksqlDB, Flink SQL, Greenplum и ClickHouse.
Чем полезны и опасны пользовательские функции в обработке больших данных?
Пользовательские функции (User-Defined Functions, UDF) позволяют разработчику расширить возможности фреймворка, реализовав собственные функции для обработки данных. Большинство современных фреймворков это позволяют. Например, в ksqlDB – базе данных, используемой для обработки сообщений, опубликованных в топики Apache Kafka посредством SQL-запросов, пользовательские функции реализуются с использованием Java-перехватчиков (хуков, hook). Все интерфейсы Apache Spark (PySpark, Java, Scala) тоже позволяют написать собственную функцию и повторно использовать ее для обработки нескольких датафреймов. API таблиц PyFlink поддерживает преобразования данных с помощью UDF-функций Python, обрабатывая по одной строке за раз или целым пакетом в векторизованных UDF.
Для демонстрации рассмотрим небольшой пример UDF на PySpark. Предположим, есть поток данных о транзакциях, и нужно определить их добросовестность на основе определённого набора правил. Например, транзакции на сумму свыше 100,000, совершенные в нерабочее время с 00:00 до 06:00 считаются подозрительными. Для проверки этих условий напишем собственную пользовательскую функцию на Python, используя Spark Structured Streaming для чтения входящих данных. Источником данных о транзакциях является топик Kafka под названием transactions, куда они публикуются в формате JSON. Следующий PySpark-код с использованием UDF будет считывать данные из Kafka, преобразовывать их из JSON в датафрейм, добавлять столбец с признаком подозрительности и выводить результат в консоль.
# Импорт пакетов from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import BooleanType from pyspark.sql.functions import col, from_json, hour from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType # Создание сеанса Spark spark = SparkSession.builder \ .appName("UDF Example") \ .getOrCreate() # Схема для JSON данных schema = StructType([ StructField("transaction_id", StringType(), True), StructField("amount", DoubleType(), True), StructField("timestamp", TimestampType(), True) ]) # Функция для проверки подозрительных транзакций def is_suspicious_transaction(amount, hour): return amount > 100000 and (hour >= 0 and hour < 6) # Регистрация UDF is_suspicious_transaction_udf = udf(is_suspicious_transaction, BooleanType()) # Чтение данных из Kafka transactions_df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "transactions") \ .load() # Преобразование данных из Kafka (значение в JSON формат) transactions_json_df = transactions_df.selectExpr("CAST(value AS STRING) as json") # Преобразование JSON строки в датафрейм с использованием схемы transactions_parsed_df = transactions_json_df.select(from_json(col("json"), schema).alias("data")).select("data.*") # Добавление столбца с признаком подозрительных транзакций с использованием UDF transactions_with_suspicious_flag_df = transactions_parsed_df \ .withColumn("is_suspicious", is_suspicious_transaction_udf(col("amount"), hour(col("timestamp")))) # Вывод результата query = transactions_with_suspicious_flag_df \ .writeStream \ .outputMode("append") \ .format("console") \ .start() # Ожидание завершения потока query.awaitTermination()
Впрочем, можно обойтись и без UDF, используя встроенные функции Spark SQL для вычисления признаков подозрительных транзакций. В этом случае код будет выглядеть так:
# Импорт пакетов from pyspark.sql import SparkSession from pyspark.sql.functions import col, from_json, hour, when from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType # Создание сеанса Spark spark = SparkSession.builder \ .appName("Without UDF Example") \ .getOrCreate() # Схема для JSON данных schema = StructType([ StructField("transaction_id", StringType(), True), StructField("amount", DoubleType(), True), StructField("timestamp", TimestampType(), True) ]) # Чтение данных из Kafka transactions_df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "transactions") \ .load() # Преобразование данных из Kafka (значение в JSON формат) transactions_json_df = transactions_df.selectExpr("CAST(value AS STRING) as json") # Преобразование JSON строки в датафрейм с использованием схемы transactions_parsed_df = transactions_json_df.select(from_json(col("json"), schema).alias("data")).select("data.*") # Добавление столбца с признаком подозрительных транзакций без использования UDF transactions_with_suspicious_flag_df = transactions_parsed_df \ .withColumn("is_suspicious", when((col("amount") > 100000) & (hour(col("timestamp")).between(0, 6)), True).otherwise(False)) # Вывод результата query = transactions_with_suspicious_flag_df \ .writeStream \ .outputMode("append") \ .format("console") \ .start() # Ожидание завершения потока query.awaitTermination()
Вместо пользовательской функции применяются операторы when и between для создания столбца is_suspicious, который равен True для подозрительных транзакций и False в противном случае. Код даже выглядит более лаконичным и понятным. Впрочем, этот пример довольно прост и не охватывает все возможные случаи, когда от UDF может быть больше пользы, чем вреда (хотя не факт)).
UDF есть на уровне хранения данных. К примеру, колоночная база данных ClickHouse может вызывать любую внешнюю исполняемую программу или скрипт для обработки данных согласно конфигурации исполняемых UDF, заданных в xml-файлах, путь к которой указан в параметре user_defined_executable_functions_config. База данных Greenplum основанная на PostgreSQL, тоже поддерживает пользовательские функции и процедуры. ETL-фреймворки Apache AirFlow и NiFi позволяют написать собственные операторы и процессоры для реализации нетривиальной логики обработки данных.
Таким образом, UDF-функции сегодня стали неотъемлемой частью практически любой платформы. Однако, до сих пор их использование сопровождается некоторыми сложностями. Например, в ksqlDB очень неудобно проверять и отлаживать пользовательские функции, т.к. они загружаются при запуске сервера и при изменении кода требуют пересоздания JAR-пакетов, а также перезапуска сервера. В PySpark пользовательские функции не оптимизируются оптимизатором Apache Spark, поэтому их производительность намного ниже встроенных функций Spark SQL. Кроме того, в Spark пользовательские функции не поддерживают условные выражения в булевых выражениях и не принимают ключевые аргументы на вызывающей стороне.
В Apache Flink при использовании UDF-функций можно столкнуться сложностями при ее отладке и тестировании, несмотря на наличие тестовых наборов. Кроме того, при использовании SQL для обработки данных в Java-фреймворках, таких как Spark SQL, ksqlDB, Flink SQL и пр., придется использовать определенные типы данных в своих структурах, чтобы сопоставить типы данных SQL с типами данных Java.
Распределенный характер фреймворков обработки больших данных и хранилищ также накладывает некоторые ограничения на работу с пользовательскими функциями. К примеру, в реляционной СУБД Greenplum файлы общей библиотеки UDF-функций должны находиться в одном и том же месте на каждом хосте во всем кластере, включая координаторы, сегменты и зеркала. А в колоночной СУБД ClickHouse, несмотря на то, что функция может иметь сколько угодно параметров, ее имя должно быть уникальным среди других пользовательских и системных функций. При этом исполняемые пользовательские функции могут принимать постоянные параметры, настроенные в разделе command. Также требуется опция execute_direct, чтобы гарантировать отсутствие уязвимости расширения аргументов оболочки. Рекурсивные функции не допускаются. Все переменные, используемые функцией, должны быть указаны в списке ее параметров. Если какое-то из перечисленных ограничений будет нарушено, возникнет исключение. Некоторые UDF-функции могут выдавать исключение, если данные недействительны. В этом случае запрос отменяется, а клиенту возвращается текст ошибки. При распределенной обработке, когда на одном из серверов возникает исключение, другие серверы также пытаются прервать запрос.
Получается, главным правилом при работе с пользовательскими функциями является рекомендация применять их как можно реже, чтобы избежать ошибок, ограничений и неполного применения всех возможностей внутренних оптимизаций самой распределенной среды. Если же обойтись без UDF-функций никак не получается, разработчику следует использовать их очень аккуратно, версионировать и документировать.
Как это сделать наиболее эффективным образом, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: