13 сентября 2023 года вышел Apache Spark 3.5. Знакомимся с самыми важными новинками свежего релиза: расширения Spark Connect и SQL, поддержка DeepSpeed, улучшения потоковой передачи и свежие UDF-функции Python.
ТОП-5 новинок Apache Spark 3.5.0
В Apache Spark 3.5. добавлено много исправлений и улучшений, а также реализованы новые функции. Наиболее интересными из них с точки зрения дата-инженера можно назвать следующие:
- Spark Connect поддерживает больше сценариев с общедоступностью клиента Scala, поддержкой распределенного обучения и вывода, паритетом API Pandas в SPARK и улучшенной совместимостью для структурированной потоковой передачи.
- функции SQL для управления массивами, расширенная поддержка функций SQL для API Scala, Python и R, поддержка именованных аргументов для вызовов функций SQL;
- агрегаты, а также оптимизированные для Arrow пользовательские функции Python, табличные UDF-функции Python, API тестирования PySpark и расширенные классы ошибок в PySpark;
- распределенное обучение с помощью DeepSpeed в кластерах Spark и английский SDK для Apache Spark, о котором мы писали здесь;
- улучшения потоковой передачи, включая новый API и изменения в провайдере key-value базы данных RocksDB, которые уменьшают компромиссы по сравнению с другими резидентными хранилищами состояний.
Рассмотрим некоторые из этих новинок более подробно.
Улучшения Spark Connect
Напомним, Spark Connect представляет собой клиент-серверную архитектуру, которая позволяет разделить клиентский и серверный компоненты, обеспечивая удаленное подключение к кластерам Spark через API DataFrame и неразрешенные логические планы в качестве протокола. Благодаря такому разделению между клиентом и сервером Spark, его можно использовать откуда угодно, включая различные IDE, интерактивные среды выполнения типа Google Colab и Jupyter Notebook, а также различные языки программирования. Кроме того, в эти среды можно встроить Spark Connect, чтобы обеспечить бесшовную интеграцию со Spark. Впервые полноценная поддержка Spark Connect была выпущена в версии 3.4.0, опубликованной в апреле 2023 года, о чем мы писали здесь.
С момента выпуска релиза 3.4.0 было выполнено более 600 улучшений Spark Connect. Ключевым результатом для фреймворка в релизе 3.5 и его компонента Spark Connect является общедоступность клиента Scala, реализованного с помощью рефакторинга подмодуля SQL. Теперь он разделен на клиентский (SQL-API) и серверный модули, чтобы уменьшить набор зависимостей, необходимых клиенту для изоляции пути к классам. До выпуска Spark 3.5 было невозможно использовать библиотеку машинного обучения MLlib напрямую с Spark Connect, поскольку он использует шлюз Py4J, требующий совместного размещения клиентского приложения. В Spark 3.5 можно выполнять распределенное обучение и логический вывод с помощью Spark Connect с использованием новой платформы распределенного выполнения на основе PyTorch. В настоящее время этот модуль поддерживает классификаторы логистической регрессии, преобразователи базовых функций, оценщики базовых моделей, конвейеры машинного обучения и кросс-валидацию. Эта платформа легко интегрируется с векторизованной платформой Python UDF в Spark, расширяя ее возможностью выполнения UDF с использованием барьерного режима выполнения.
Также в релиз 3.5.0 добавлена поддержка управления зависимостями Python на основе сеансов Spark Connect. Эта новая функция позволяет динамически обновлять зависимости Python во время выполнения. Как именно это работает, читайте в нашей новой статье.
Новые возможности PySpark
В выпуске Apache Spark 3.5.0 представлены значительные улучшения PySpark, включая оптимизированные для Arrow пользовательские функции Python, в т.ч. табличные (UDTF), улучшенные сообщения об ошибках и новый API тестирования, который значительно повышает удобство использования, производительность и тестируемость PySpark-приложений. Теперь UDF- функции Python будут использовать колоночный формат Apache Arrow для повышения производительности, когда для конфигурации spark.sql.execution.pythonUDF.arrow.enabled установлено значение True, или когда для параметра useArrow установлено значение True с помощью декоратора UDF. Благодаря этой оптимизации пользовательские функции Python могут работать до 2 раз быстрее на современных архитектурах ЦП, благодаря векторизованному вводу-выводу. Рассмотрим, как включить возможность использования Apache Arrow через установку конфигурации:
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
или в коде PySpark-приложения через декоратор:
@udf("integer", useArrow=True) def my_len_udf(s: str) -> int: return len(s)
Пользовательская табличная функция (UDTF, User Defined Table Function) — это тип определяемой пользователем функции, которая возвращает всю выходную таблицу вместо одного скалярного значения результата. Пользователи PySpark теперь могут писать свои собственные UDTF, интегрируя свою логику Python, и использовать их в PySpark и SQL.
Также Apache Spark 3.5 представляет новые служебные функции API для проверки равенства DataFrame, включая подробные сообщения об ошибках теста с цветовой кодировкой, которые четко указывают на различия между схемами датафрейма и данными внутри него. Эти функции (pyspark.testing.assertDataFrameEqual, pyspark.testing.assertPandasOnSparkEqual, pyspark.testing.assertSchemaEqual) позволяют разработчикам легко добавлять тесты на равенство, которые дают практические результаты для их приложений и повышают производительность.
Наконец, добавлены расширенные сообщения об ошибках в PySpark. Ранее набор исключений, создаваемых драйвером Python Spark, не использовал классы ошибок, представленные в Apache Spark 3.3. Теперь все ошибки из DataFrame и SQL перенесены в соответствующий пакет и содержат нужные классы и коды ошибок.
Новинки Spark SQL
Apache Spark 3.5 представляет множество новых функций и улучшений SQL, упрощая создание запросов с помощью API-интерфейсов SQL и DataFrame. В частности, новые встроенные функции SQL для управления массивами помогают пользователям легко манипулировать их значениями вместо создания собственных UDF. А предложение IDENTIFIER позволяет гибко и безопасно создавать новые шаблоны SQL-запросов без риска атак через SQL-инъекции. Например, предложение IDENTIFIER для строковых литералов с указанием имен таблиц/столбцов/функций можно использовать вместе с функцией параметров запроса, добавленной в предыдущем выпуске Spark:
spark.sql( "CREATE TABLE IDENTIFIER(:tbl)(col INT) USING json", args = { "tbl": "my_schema.my_tbl" } ) spark.sql( "SELECT IDENTIFIER(:col) FROM IDENTIFIER(:tbl)", args = { "col": "col", "tbl": "my_schema.my_tbl" } ).show()
До версии 3.5 в Spark было множество функций SQL, недоступных в API-интерфейсах Scala, Python или R DataFrame. В выпуске 3.5 представлено более 150 функций SQL в API DataFrame, включая поддержку именованных аргументов для вызовов функций SQL. Теперь в SQL-запросах, подобно Python-скрипту, можно вызывать функции с именами параметров, предшествующими их значениям. Это соответствует спецификации стандарта SQL и приводит к более четкому и надежному языку запросов, когда функция имеет много параметров и/или некоторые параметры имеют значения по умолчанию, например:
SELECT mask( 'AbCD123-@$#', lowerChar => 'q', upperChar => 'Q', digitChar => 'd')
Наконец, Apache Spark 3.5 включает новые функции SQL для точного и эффективного подсчета уникальных значений внутри групп, включая сохранение результатов промежуточных вычислений в буферах эскизов, которые можно сохранить в хранилище и позже загрузить обратно. В этих реализациях используется библиотека Apache Datasketches для простой интеграции с другими инструментами. Подробнее об этой библиотеке стохастических алгоритмов читайте в нашей новой статье.
Новости для специалистов по Data Science
В релизе 3.5.0 в Python-интерфейсе PySpark добавлен модуль DeepspeedTorchDistributor, который упрощает распределенное обучение с помощью библиотеки DeepSpeed в кластерах Spark. Эта библиотека оптимизации глубокого обучения с открытым исходным кодом для PyTorch снижает вычислительную мощность и потребление памяти. Она предназначена для обучения больших распределенных моделей с лучшим параллелизмом. Модуль DeepspeedTorchDistributor является расширением модуля TorchDistributor, выпущенного в Apache Spark 3.4. Под капотом DeepspeedTorchDistributor инициализирует среду и каналы связи, необходимые для DeepSpeed. Модуль поддерживает распределение обучающих заданий как в одноузловых кластерах с несколькими графическими процессорами, так и в многоузловых кластерах графических процессоров.
Также в релиз 3.5.0 добавлен англоязычный набор инструментов разработчика (English SDK for Apache Spark), который компилирует инструкции на английском языке в датафреймы Python. English SDK основан на PySpark-AI, Python-оболочке, которая использует модели генеративного языка для упрощения генерации кода PySpark. Принимая инструкции на английском языке, он объединяет возможности фреймворка с такими моделями, как GPT-4 и GPT-3.5. PySpark-AI принимает на вход англоязычные инструкции и выполняет их, позволяя пользователю сфокусироваться на обработке данных, а не на кодировании. Подробнее об этом мы писали здесь.
Потоковая передача
Предыдущая версия фреймворка позволяла выполнять операции с отслеживанием состояния (агрегацию, дедупликацию, соединения потоков и т. д.) несколько раз в одном и том же запросе, включая агрегирование временных окон по цепочке. Однако, соединение временных интервалов потока с последующим оператором с отслеживанием состояния не поддерживалось. Это ограничение снято в релизе 3.5, чтобы обеспечить более сложные рабочие нагрузки, например соединение потоков рекламы и кликов, а также агрегирование по временному интервалу.
Кроме того, в релизе 3.5.0 представлен новый механизм контрольных точек для провайдера хранилища состояний RocksDB под названием Changelog Checkpointing, который сохраняет журнал изменений состояния. Это значительно уменьшает задержку фиксации и снижает сквозную задержку. Чтобы включить эту функцию, надо установить для свойства конфигурации spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled значение true. Примечательно, что включить новую функцию можно даже с существующими контрольными точками.
Также в Apache Spark реализовано более детальное управление памятью, которое позволяет пользователям ограничивать общее использование памяти экземплярами RocksDB в одном процессе-исполнителе, позволяя анализировать и настраивать использование памяти для каждого процесса-исполнителя. Это повышает производительность и надежность использования key-value СУБД в качестве хранилища состояний для stateful-приложений.
Для дедупликации событий в потоковых запросах представлен новый API dropDuplicatesWithinWatermark(), который позволяет обрабатывать ситуацию, с дублирующимися событиями, даже если их фактические отметки времени немного отличаются. Такое часто случается, когда данные потребляются из топика Kafka, будучи отправлены туда неидемпотентным продюсером, а в качестве времени события используется автоматическая отметка времени записи.
Освойте возможности Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Потоковая обработка в Apache Spark
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники