Каждому специалисту по Data Science и инженеру данных знакома Python-библиотека pandas. Однако, для работы с большими данными она не очень подходит из-за высокого потребления памяти. Тем не менее, отказаться от старых привычек сложно. Поэтому разбираемся, зачем использовать API Pandas в Apache Spark и как это сделать наиболее эффективно.
Чем отличается реализация API Pandas в Spark от исходной Python-библиотки
Многие дата-инженеры и разработчики приложений аналитики больших данных знают, что старый добрый pandas становится слишком медленным при работе с огромными объемами данных. Частично этот недостаток устранен в новом релизе pandas 2.0, который вышел в апреле 2023. В частности, добавлена поддержка pyarrow, чтобы использовать эффективный колоночный формат Apache Arrow для хранения и параллельной обработки датафреймов. Также pandas 2.0 теперь поддерживает отложенные вычисления и оптимизацию копирования при записи, создавая при копировании объекта pandas (DataFrame или Series) ссылку на исходные данные вместо фактической генерации новой копии, пока эти данные не будут изменены. Так несколько неизменных копий одних и тех же данных ссылаются на одну и ту же память, что очень снижает потребление ресурсов и увеличивает производительность системы. Еще расширена поддержка типов данных в индексах, что тоже ускоряет вычисления. Подробнее об этих новинках pandas 2.0 читайте здесь.
Впрочем, несмотря на то, что pandas 2.0 стал на пару шагов ближе к PySpark, можно столкнуться с трудностями при использовании API этой python-библиотеке во фреймворке для разработки приложений аналитики больших данных. Хотя Apache Spark и поддерживает Pandas API, эта совместимость не 100%. Поэтому иногда приходится искать пути обхода этих ограничений, чтобы конвертировать код Pandas API в PySpark.
Датафреймы pandas в PySpark и аналогичная структура данных в исходной Python-библиотеке похожи, однако, распределенный характер Spark приводит к тому, что при преобразовании DataFrame.to_pandas() данные передаются между несколькими компьютерами. Таким образом, преобразование датафрейма pandas-on-Spark в pandas требует сбора всех данных на клиентском компьютере. Поэтому лучше использовать API Pandas для Spark или PySpark, вызвав метод DataFrame.to_spark(). Пример того, как это работает, смотрите в нашей новой статье.
Хотя датафреймы Pandas и Spark практически взаимозаменяемы, при преобразовании датафрейма Pandas из аналогичной структуры данных PySpark по умолчанию создается новый индекс. Чтобы избежать этих накладных расходов, можно явно указывать столбец, который будет использоваться в качестве индекса, поскольку индекс по умолчанию обычно неэффективен. Вообще при преобразовании датафрейма pandas-on-Spark из PySpark или наоборот типы данных автоматически преобразуются.
Однако есть несколько типов данных, предоставляемых только библиотекой pandas, которые не поддерживаются в API Pandas в Spark:
- Timedelta — продолжительность, разница между двумя датами или временем;
- Categorical – категориал, категориальная переменная в классическом стиле R/S-plus, которая может принимать только ограниченное и обычно фиксированное количество возможных значений (категорий). В отличие от статистических категориальных переменных, категориальные переменные могут иметь порядок, но числовые операции с ними (сложение, деление и т. д.) невозможны. Все значения категориала находятся либо в категориях , либо в np.nan. Присвоение значений вне категорий приведет к возникновению ошибки ValueError. Порядок определяется порядком категорий, а не лексическим порядком значений.
- CategoricalDtype – типы для категориальных переменных с упорядоченностью.
Возможно, в скором будущем эти типы данных pandas будут поддерживаться в API pandas PySpark. А поддержка вот этих не планируется: pd.SparseDtype, pd.DatetimeTZDtype, pd.UInt*Dtype, pd.BooleanDtype и pd.StringDtype.
Также API Pandas в Spark не поддерживает несколько типов данных в одном столбце и структурированную потоковую передачу. Обойти это ограничение можно, используя API Pandas в Spark с методом foreachBatch в Structured Streaming, который позволяет указать функцию для выполнения над выходными данными каждого микропакета потокового запроса, например,
spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
Как использовать API Pandas в PySpark более эффективно: лучшие практики
Поскольку Pandas API в Spark базируется на ядре этого распределенного фреймворка, многие его компоненты, функции и методы оптимизации производительности доступны для использования. Например, сервер истории, веб-интерфейс и режимы развертывания, также доступны с API-интерфейсом pandas в Spark, как и контекст и сеансы Spark-приложения. Если есть настроенный контекст Spark или запущенные сеансы, API pandas в Spark использует их. При отсутствии контекста или сеанса Spark можно установить значения SparkContext и/или SparkSession, чтобы использовать их API pandas в Spark. Например, настроить память исполнителя в Spark можно следующим образом:
from pyspark import SparkConf, SparkContext conf = SparkConf() conf.set('spark.executor.memory', '2g') # Pandas API on Spark automatically uses this Spark context with the configurations set. SparkContext(conf=conf) import pyspark.pandas as ps
Другой распространенной конфигурацией может быть оптимизация Apache Arrow в PySpark, чтобы использовать преимущества этого колоночного формата для быстрой обработки SQL-запросов:
from pyspark.sql import SparkSession builder = SparkSession.builder.appName("pandas-on-spark") builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true") # Pandas API on Spark automatically uses this Spark session with the configurations set. builder.getOrCreate() import pyspark.pandas as ps
Как уже было отмечено ранее, начиная с версии 2.0, pandas поддерживает концепцию отложенных вычислений. А Spark реализует эту идею изначально. Поэтому дорогостоящие операции можно спрогнозировать, используя DataFrame.spark.explain() в PySpark перед фактическими вычислениями, поскольку API pandas в Spark также поддерживает отложенные выполнения. Таким образом, разработчик может проверить фактические планы выполнения SQL-запросов, чтобы определить причину их медленного выполнения. Например, после множества операций над API pandas над объектами Spark базовый планировщик Spark может замедлиться из-за огромного и сложного плана. В этом случае пригодятся функции генерации точек сохранения DataFrame.spark.checkpoint() или DataFrame.spark.local_checkpoint(). Результат предыдущего датафрейма сохраняется в настроенной файловой системе при вызове DataFrame.spark.checkpoint() и в исполнителе при вызове DataFrame.spark.local_checkpoint(). Подробнее про визуализацию планов выполнения запросов в Apache Spark читайте в нашей новой статье с примерами.
Некоторые операции, например, sort_values() сложнее выполнять в параллельной или распределенной среде, чем в памяти на одном компьютере, поскольку необходимо отправлять данные на другие узлы и обмениваться данными между несколькими узлами через сети. Аналогично ведут себя вычисления на одном разделе. Поэтому API Pandas в Spark по умолчанию запрещает операции с разными датафреймами или сериями, чтобы предотвратить дорогостоящие операции. Некоторые API Pandas в Spark, например, DataFrame.rank, используют оконные функции PySpark без указания раздела, перемещая все данные в один раздел на одном компьютере. Это может привести к серьезному снижению производительности, поэтому подобных функций следует избегать для больших наборов данных. Вместо этого следует использовать GroupBy.rank, который менее затратен, поскольку данные можно распределять и вычислять для каждой группы.
Как уже было отмечено ранее, API Pandas в Spark прикрепляет индекс по умолчанию при преобразовании датафрейма Spark в аналогичную структуру данных pandas-on-Spark. Это снижает производительность. Последовательность (sequence) требует вычислений на одном разделе, что не рекомендуется. Если надо обрабатывать большие данные, их нужно распределить, настроив индекс по умолчанию на distributed или distributed-sequence.
Поскольку столбцы с начальными и конечными подчеркиваниями (__) в названиях зарезервированы в API Pandas в Spark для обработки внутреннего поведения индекса, пользовательские столбцы рекомендуется называть без этих символов подчеркивания. Также запрещено использовать повторяющиеся имена столбцов, поскольку Spark SQL вообще не допускает этого, а API Pandas в Spark наследует такое поведение. Не рекомендуется называть столбцы с учетом регистра, API Pandas в Spark по умолчанию запрещает это. Хотя при необходимости разработчик может включить это в конфигурации spark.sql.caseSensitive.
Освойте возможности Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Потоковая обработка в Apache Spark
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники