12 апреля 2023 года вышел очередной релиз Apache Spark. Разбираемся с самыми главными новинками этого выпуска, которые порадуют аналитиков, разработчиков, инженеров данных и специалистов по Data Science. Расширенная поддержка Python, улучшения Spark SQL и Structured Streaming.
Обновления Spark SQL и новинки для пользователей Python
Apache Spark 3.4.0 — это пятый выпуск линейки 3.x, в котором более 2500 изменений: новые фичи, исправления ошибок и улучшения ранее существовавших возможностей. Разработчики, дата-инженеры и специалисты по Data Science, которые активно используют Python, будут особенно довольны. Специально для них в этом релизе представлен клиент Python для Spark Connect, структурированная потоковая передача дополнена асинхронным отслеживанием хода выполнения и произвольной обработкой состояния, расширен охват API Pandas и реализована поддержка ввода NumPy. В частности, теперь экземпляры NumPy официально поддерживаются в PySpark, позволяя создавать датафреймы (spark.createDataFrame) и предоставлять их в качестве входных данных в выражениях SQL и даже для машинного обучения.
Также в версию 3.4 модуль TorchDistributor добавлен в PySpark, чтобы Data Scientist мог проводить распределенное машинное обучение своих моделей с помощью PyTorch в кластерах Spark. Под капотом модуль TorchDistributor инициализирует среду и каналы связи между рабочими узлами и использует команду CLI-интерфейса torch.distributed.run для запуска распределенного обучения между рабочими узлами. Модуль поддерживает распределение ML-заданий как на одном узле с несколькими графическими процессорами, так и на многоузловых кластерах GPU.
Введена поддержка значений DEFAULT для столбцов в таблицах: SQL-запросы теперь поддерживают указание значений по умолчанию для столбцов таблиц в форматах CSV, JSON, ORC, Parquet. Эта функция работает сразу во время создания таблицы и может быть применена позже. Последующие команды INSERT, UPDATE, DELETE и MERGE могут впоследствии ссылаться на значение по умолчанию любого столбца, используя явное ключевое слово DEFAULT. Если какое-либо назначение INSERT имеет явный список с меньшим количеством столбцов, чем целевая таблица, соответствующие значения столбца по умолчанию будут заменены на оставшиеся столбцы или пропуски (NULL), если значение по умолчанию не указано.
Вообще в Apache Spark 3.4 теперь можно использовать ссылки на псевдонимы столбцов в SQL-запросах SELECT. Эта функция обеспечивает значительное удобство при составлении запросов, заменяя необходимость написания сложных подзапросов и общих табличных выражений. Дополнительным преимуществом, связанным с SQL-запросами, в новой версии стала их параметризация. Это делает запросы более пригодными для повторного использования и повышает безопасность за счет предотвращения SQL-атак путем внедрения кода. SparkSession API расширен за счет переопределения метода sql, который принимает сопоставление, где ключи — это имена параметров, а значения — литералы Scala/Java:
def sql(sqlText: String, args: Map[String, Any]): DataFrame
Благодаря этому расширению текст SQL-команды теперь может включать именованные параметры в любых позициях, где разрешены константы, такие как литеральные значения. Пример такой параметризации SQL-запроса выглядит следующим образом:
spark.sql( sqlText = "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows", args = Map( "startDate" -> LocalDate.of(2022, 12, 1), "maxRows" -> 100))
Как это использовать на практике, читайте в нашей новой статье с примерами.
До версии Apache Spark 3.4 API Dataset предоставлял метод PIVOT, но не его обратную операцию MELT. Теперь это упущение исправлено: можно развернуть DataFrame из широкого формата, сгенерированного PIVOT, в исходный длинный формат, опционально оставив установленными столбцы идентификаторов. Это противоположность операции groupBy(…).pivot(…).agg(…), за исключением необратимой агрегации. Эта операция полезна для преобразования DataFrame в формат, где некоторые столбцы являются идентификаторами, а все остальные – значениями, которые не сводятся к строкам, оставляя только два столбца без идентификатора с указанными именами.
Также стало возможным использовать предложение OFFSET в SQL-запросах, чтобы ограничивать количество возвращаемых строк с помощью предложения LIMIT, отбросив первые N строк. Движок Spark SQL сам создаст и выполнит эффективный план запроса для этой операции.
C 2021 года стандарт SQL теперь охватывает синтаксис для вызова полиморфных функций табличных значений. Apache Spark 3.4 также поддерживает этот синтаксис, чтобы упростить запрос и преобразование коллекций данных стандартными способами.
5 полезностей для разработчиков распределенных приложений
В версии 3.4 Spark Connect представляет несвязанную архитектуру клиент-сервер, которая обеспечивает удаленное подключение к кластерам Spark из любого приложения, работающего в любом месте. Такое разделение клиента и сервера позволяет современным приложениям для работы с данными, IDE, блокнотам и языкам программирования получать интерактивный доступ к Spark, используя возможности DataFrame API.
Благодаря Spark Connect клиентские приложения влияют только на собственную среду, работая даже вне кластера Spark. Это помогает устранить конфликты зависимостей в драйвере Spark, позволяя избежать изменений в клиентские приложения при обновлении Spark. А разработчики могут выполнять пошаговую отладку приложений прямо в своих IDE.
Также добавлен новый тип данных TIMESTAMP WITHOUT TIMEZONE для представления значений временной метки без часового пояса. Ранее предполагалось, что значения, выраженные с использованием существующего типа данных TIMESTAMP Spark, встроенные в запросы SQL или переданные через JDBC, находятся в локальном часовом поясе сеанса и перед обработкой преобразуются в UTC. Хотя так происходит во многих случаях, например, при работе с календарями, часто пользователи предпочитают указывать значения временных меток независимо от часовых поясов, например, в лог-файлах. Теперь это стало возможным благодаря новому типу данных TIMESTAMP_NTZ.
Еще Apache Spark 3.4 представляет новый API под названием Dataset.to(StructType) для преобразования всего исходного датафрейма в указанную схему. Его поведение похоже на вставку таблицы, когда входной запрос настраивается в соответствии со схемой таблицы, но он расширен для работы и для внутренних полей. Это включает изменение порядка столбцов и внутренних полей в соответствии с указанной схемой, проецирование столбцов и внутренних полей, которые не нужны указанной схеме, а также приведение столбцов и внутренних полей к ожидаемым типам данных.
В новой версии фреймворка расширена поддержка SQLSTATE — пятибайтового кода, который стал стандартом де-факто для представления статусов возврата из запросов и команд SQL. SQLSTATE позволяет нескольким клиентам и серверам стандартизировать взаимодействие и упростить их реализацию. Это особенно верно для SQL-запросов и команд, отправляемых по соединениям JDBC и ODBC. В частности, значение SQLSTATE 22003 представляет числовое значение вне допустимого диапазона, а 22012 представляет собой деление на ноль. Получая подобные результаты в стандартизованном виде, разработчик может быстрее отладить программу.
Продолжая разговор про отладку программ, следует отметить изменения структуры сообщений об ошибках, которая стала более понятной и детальной. Исключения PySpark также теперь используют новую структуру и классифицируют классы ошибок и коды, позволяя разработчику определять желаемое поведение программы для конкретных случаев ошибок при возникновении исключений.
Ранее профилировщик памяти для пользовательских функций PySpark не включал поддержку профилирования исполнителей Spark. Хотя PySpark-программы, работающие на драйвере Spark, можно профилировать с помощью других профилировщиков, как и любой процесс Python, профилировать память в исполнителях Spark было не очень удобно. В новой версии PySpark включает профилировщик памяти, позволяя разработчику построчно профилировать свои UDF-функции и проверять потребление памяти.
Улучшения Spark Structured Streaming
На практике операции управления смещением могут занимать до 30–50% времени выполнения определенных конвейеров потоковой обработки данных. Асинхронное выполнение этих операций с возможностью настройки частоты их запуска помогут ускорить потоковые вычисления. Подробнее об этом читайте в нашей новой статье.
Также в Apache Spark 3.4 пользователи могут выполнять операции с отслеживанием состояния (агрегирование, дедупликация, объединение потоков и т. д.) несколько раз в одном запросе, включая агрегирование с цепочкой временных окон. Благодаря этому больше не нужно создавать несколько потоковых запросов с промежуточным хранилищем между ними, что влечет за собой дополнительные затраты на инфраструктуру и обслуживание, а также снижает производительность. Однако, пока это улучшение работает только в режиме добавления (Append), о котором мы писали здесь.
До версии 3.4 PySpark не поддерживал произвольную обработку с отслеживанием состояния, что вынуждало пользователей использовать API Java/Scala для реализации сложной логики UDF-функций. Но эти языки программирования на порядок сложнее Python. Поэтому в выпуске Apache Spark 3.4 появилась возможность напрямую выражать сложные stateful-функции на PySpark.
В заключение отметим реализованную поддержку бинарного формата Protobuf, который активно используется в сценариях потоковой передачи. В Apache Spark 3.4 пользователи могут читать и записывать записи в этом легковесном формате, используя встроенные функции from_protobuf() и to_protobuf().
Читайте в нашей новой статье, какие изменения внесены в отладочный выпуск 3.4.1, опубликованный в июне 2023 года.
Узнать больше о новинках свежего релиза и освоить все возможности Apache Spark для разработки приложений аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники