В рамках обучения разработчиков Spark-приложений, сегодня рассмотрим, как сохранить датафрейм в памяти вне кучи исполнителя и зачем это нужно. Вас ждет краткий ликбез по управлению памятью в Apache Spark с описанием настраиваемых конфигураций. Также на простом практическом примере разберем, как это сделать и где в пользовательском веб-интерфейсе фреймворка посмотреть результаты сохранения данных.
Зачем сохранять датафреймы вне кучи: краткий ликбез по управлению памятью в Apache Spark
Памятью вне кучи (Off-Heap Memory) называется сегмент за пределами JVM, который иногда используется виртуальной машиной Java. К примеру, для метода интернирования String.intern(), который обеспечивает, что строки с одинаковым содержимым совместно используют одну и ту же область памяти. В Спарк память вне кучи также может использоваться для хранения сериализованных датафреймов и RDD.
Хотя большинство операций происходит внутри JVM в рамках памяти кучи, каждый исполнитель может также иногда обращаться к пространству за пределами виртуальной машины Java через API-интерфейсы sun.misc.Unsafe. Эта память вне кучи находится за пределами области сборки мусора, предоставляя разработчику приложения больше возможностей и снижая накладные расходы JVM. Еще фреймворк хранит в этой памяти данные при работе SQL-компонента Tungsten, который работает непосредственно на уровне байтов и повышает эффективность аналитической обработки [1]. Подробнее о видах памяти и ее распределении в этом фреймворке мы рассказывали здесь.
По умолчанию в рассматриваемой среде память вне кучи отключена. Включить ее можно с помощью следующих конфигураций [2]:
- memory.offHeap.size — размер памяти вне кучи в байтах, используемый фреймворком для хранения фактических датафреймов;
- memory.offHeap.enabled – значение этого параметра должно быть True, чтобы включить хранилище вне кучи.
Конфигурация spark.memory.offHeap.size не влияет на использование памяти кучи, поэтому, если общее потребление памяти Spark-исполнителями ограничено, нужно уменьшить размер кучи JVM соответственно. Если spark.memory.offHeap.enabled=True, то параметр spark.memory.offHeap.size должен принимать положительное значение. По умолчанию spark.memory.offHeap.size=0, а spark.memory.offHeap.enabled=False [3].
При этом общий размер памяти вне кучи для исполнителя задается конфигурацией spark.executor.memoryOverhead, по умолчанию равной 10% памяти исполнителя при минимальном размере 384 МБ. Если пользователь явно не определил значение этого параметра, фреймворк сам выделит 10% памяти исполнителя или 384 МБ, в зависимости накладных расходов JVM [1]. Этот объем дополнительной памяти для каждого процесса-исполнителя увеличивается по мере роста его размера. Это память учитывает накладные расходы JVM, интернированные строки и прочие источники потребления памяти, которые увеличиваются по мере роста исполнителя. Опция поддерживается в YARN и Kubernetes. Также дополнительная память включает память исполнителя PySpark, если spark.executor.pyspark.memory специально не настроен, и память, используемую другими процессами, которые не являются исполнителями, но работают в том же контейнере. Максимальный объем памяти контейнера для запущенного исполнителя равен сумме значений параметров spark.executor.memoryOverhead, spark.executor.memory, spark.memory.offHeap.size и spark.executor.pyspark.memory [3].
Таким образом, чтобы сохранить датафрейм в памяти вне кучи исполнителей, следует сперва включить конфигурацию spark.memory.offHeap.size. Сделать это можно в shell-оболочке фреймворка или через spark-submit с помощью следующих команд [2]:
—conf «spark.memory.offHeap.size=1000000000»
—conf «spark.memory.offHeap.enabled=true»
Далее проверим, как это работает.
Сохранение датафреймов вне кучи: практический пример
Предположим, данные, которые необходимо сохранить как датафрейм вне памяти, хранятся в CSV-файле под названием data. Прочитаем эти данные с помощью метода spark.read.format(), включая 1-ю строку как заголовок и знак точки с запятой как разделитель столбцов [2]:
val data = spark.read.format(«csv»).option(«header», «true»).option(«delimiter», «;»).load(«data.csv»)
Далее можно сохранить эти данные в память вне кучи, указав этот уровень хранения в параметрах метода persist():
import org.apache.spark.storage._
data.persist(StorageLevel.OFF_HEAP)
Напомним, persist() устанавливает уровень хранения датафрейма между операциями после первого вычисления. Этот метод можно использовать только для назначения нового уровня хранения, если он не был установлен ранее для этих данных. Об этом мы писали здесь.
Вызвав действие, например, метод show(), можно посмотреть, где хранится датафрейм data. Для этого следует открыть вкладку хранилища (Storage) в веб-интерфейсе фреймворка. О том, какие еще полезные сведения доступны в GUI этого фреймворка, читайте в нашей отдельной статье.
Примечательно, что рассматриваемый датафрейм показан как более примитивная структура данных – RDD. О структурах данных Apache Spark, их отличиях, достоинствах и недостатках читайте в этом материале.
Больше подробностей по использованию Apache Spark для разработки распределенных приложений и аналитики больших данных вам помогут узнать специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники