Под капотом Apache Spark: 3 секрета для дата-инженера и разработчика

обучение Spark SQL примеры курсы обучение, анализ данных Spark, Spark разработка конфигурирование приложений для разработчика примеры курсы обучение, Spark Databrics примеры курсы обучение, Apache Spark Structured Streaming примеры курсы обучение, разработка приложения Spark, Apache Spark разработчик примеры курсы обучение, обучение большим данным, курсы дата-инженер аналитик Big Data, Школа Больших Данных Учебный Центр Коммерсант

Постоянно добавляя в наши курсы по Apache Spark полезные материалы, сегодня мы рассмотрим, что происходит под капотом этого вычислительного движка, чтобы помочь разработчикам распределенных приложений и дата-инженерам повысить его эффективность. Тонкости сериализации данных, компиляции SQL-запросов в JavaBytecode и сборка мусора.

2 библиотеки сериализации данных в Apache Spark

В распределенных системах передача данных по сети может стать источником проблем с высоким потреблением памяти и низкой производительностью. Чтобы избежать таких проблем, нужно внимательно подходить к объемам и форматам передаваемых данных, т.е. учитывать тонкости сериализации – преобразование объектов в поток байтов для передачи по узлам сети, сохранения в файле или буфере памяти. Обратный процесс называется десериализацией и поддерживается теми же средствами, что и первичный перевод объектов в байты. Apache Spark предоставляет 2 библиотеки сериализации, а режимы этого процесса поддерживаются и настраиваются через свойство spark.serializer:

  • сериализация Java (по умолчанию), которая используется фреймворком при запуске драйвера. Spark сериализует объекты с помощью платформы Java ObjectOutputStream. Сериализация класса обеспечивается классом, реализующим интерфейс io.Serializable. Классы, которые не реализуют этот интерфейс, не будут сериализованы или десериализованы. Все подтипы сериализованного класса сериализуются сами. При этом класс никогда не сериализуется, а сериализации подвергается только конкретная реализация экземпляра этого класса, т.е. объект. Сериализация Java выполняется медленно и приводит к большим сериализованным форматам для многих классов. Чтобы точно настроить производительность при использовании этой библиотеки, следует расширить интерфейс java.io.Externalizable.
  • сериализация Kryo (рекомендуется), которая представляет собой легковесную, быструю и эффективная среда сериализации графов двоичных объектов для Java. Kryo работает быстро, имеет небольшой размер и простой в использовании API, позволяя сохранить объекты в файле, базе данных или передать по сети. Kryo также может выполнять автоматическое глубокое и поверхностное копирование/клонирование, напрямую копируя из объекта в объект, а не объект в байты в объект.

Kryo требует меньше памяти, чем сериализация Java, что важно при передачи по сети и кэшировании больших объемов данных. Однако, Kryo не используется по умолчанию из-за необходимости пользовательской регистрации и ручной настройки. Когда Kryo сериализует объект, он создает экземпляр ранее зарегистрированного класса Serializer для преобразования в байты. Сериализаторы по умолчанию можно использовать без дополнительных настроек.

В Apache Spark изначально оба метода сериализации данных при записи на диск, saveAsObjectFile() в RDD и objectFile() в SparkContext, поддерживают только сериализацию Java. Чтобы использовать преимущества сериализации Kryo с полным контролем этого процесса, разработчик Spark-приложения может написать собственный класс Serializer и зарегистрировать его в Kryo или позволить классу самостоятельно выполнять сериализацию. Как это сделать, показывает следующий участок кода на Java:

val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.mb","24")

val sc = new SparkContext(conf)val sc = new SparkContext(conf)

val spark = SparkSession.builder().appName(“KryoSerializerExample”) 
.config(someConfig) .config(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) .config(“spark.kryoserializer.buffer”, “1024k”) .config(“spark.kryoserializer.buffer.max”, “1024m”) .config(“spark.kryo.registrationRequired”, “true”) 
.getOrCreate }
Kryo сериализация данных Apache Spark
Использование сериализации Kryo в Apache Spark

Метод muxBufferSizeMb() определяет размер буфера для хранения самого большого объекта, который сериализуется. Он должен быть достаточно большим для оптимальной производительности. KryoSerializer — это вспомогательный класс, предоставляемый Spark для работы с Kryo. Создается один экземпляр класса KryoSerializer, который настраивает требуемые размеры буфера, указанные в конфигурации.

Избежать проблем с сериализацией помогут следующие рекомендации:

  • сделать объект/класс сериализуемым;
  • объявить экземпляр в лямбда-функции;
  • объявлять функции внутри объекта как можно чаще;
  • переопределить переменные, предоставляемые конструкторам классов внутри функций.

SQL-запросы, байт-код Java и оптимизатор

Не отходя далеко от сериализации данных и байт-кода, рассмотрим, как происходит выполнение SQL-запросов в Apache Spark. Напомним, движок включает встроенный оптимизатор SQL-запросов Catalyst, который выполняет не только оценку выражений и строит планы их выполнения, но и компилирует запрос в исполняемый код. Catalyst поддерживает генерацию JavaByteCode для повышения скорости выполнения запроса. Байт-код Java или JavaBytecode — это скомпилированный формат программы Java, чтобы ее можно было передать по сети и выполнить на виртуальной машине JVM. JavaByteCode не зависит от платформы, поскольку JVM преобразует байт-код для понимания аппаратным оборудованием. Catalyst использует специальную функцию Scala — Quasiquotes, чтобы упростить генерацию байт-кода. Она упрощает работу компилятора Scala, позволяя программно строить абстрактные синтаксические деревья (AST, Abstract Syntax Tree) на этом языке программирования, чтобы передать их компилятору во время выполнения для генерации бай-кода. Генерация AST из дерева, представляющего выражение в SQL, помогает в оценке этого выражения. По сути, оптимизатор Catalyst поддерживает генерацию байт-кода путем вычисления выражения в SQL с построением AST через Quasiquotes.

Без кодогенерации выражения пришлось бы интерпретировать для каждой строки данных. Каждый узел дерева выражения представлен объектом с методом оценки, который определяет, как вычислить результат выражения для данной строки. При интерпретируемом выполнении вызывается метод оценки для корня дерева, а также для каждого из его дочерних элементов и, наконец, вычисляется результат выражения. Это влияет на производительность, делая много вызовов виртуальных функций, расширяя код оценки множеством ветвей if-else из-за необходимости обрабатывать различные типы входных данных и выделяя дополнительный объект для универсального возвращаемого типа.

Quasiquotes генерирует абстрактные синтаксические деревья, которые упрощает процесс генерации кода. В AST представлены не все детали синтаксиса, а только структурные или связанные с содержанием детали. Это древовидное представление отображает реальную структуру кода. В итоге последовательность генерации кода SQL-запросов выглядит так:

  • дерево выражений SQL-запроса преобразуется в AST на Scala с помощью Quasiquotes;
  • AST-деревья передаются в компилятор для генерации JaveByteCode;
  • Байт-код Java используется для запуска на виртуальных машинах Java.
Spark Catalyst Кодогенерация при оптимизации SQL-запросов в Apache Spark с оптимизатором Catalyst Quasiquotes SQL JavaByteCode
Кодогенерация при оптимизации SQL-запросов в Apache Spark с оптимизатором Catalyst и Quasiquotes

Таким образом, Quasiquotes проверяют данные во время компиляции, чтобы обеспечить замену соответствующих AST или литералов. Это приводит к тому, что Scala AST помогает избежать запуска синтаксического анализатора Scala во время выполнения. Таким образом, оптимизатор Catalyst использует Quasiquotes для ускорения выполнения SQL-запроса.

Генеральная уборка: тонкости сборки мусора в Spark-приложениях

Поскольку базовым языком разработки самого фреймворка Apache Spark является Scala, а программы исполняются в виртуальной машине Java, неудивительно, что концепция сборки мусора важна для управления памятью этого вычислительного движка. Напомним, пространство кучи Java разделено на две области:

  • Young — для хранения короткоживущих объектов;
  • Old — для объектов с более длительным сроком действия.

Пространство Young делится на три региона: Eden, Survivor1 и Survivor2. В итоге процедура сборки мусора, т.е. удаления из памяти неиспользуемых объектов, сводится к следующим шагам:

  • Когда Eden заполнен, в нем запускается минорная сборка мусора, а используемые объекты из Eden и Survivor1 копируются в Survivor2;
  • Содержимое регионов Survivor1 и Survivor2 меняется местами;
  • Если объект достаточно старый или Survivor2 заполнен, объект перемещается в область Old.
  • Когда Old близок к заполнению, вызывается полная сборка мусора, включая трассировку всех объектов в куче, удаление тех, на которые нет ссылок, и перемещение других, чтобы заполнить неиспользуемое пространство. Это самая медленная операция сборки мусора.

Целью настройки сборки мусора в Spark является обеспечение того, чтобы в Old-пространстве хранились только долгоживущие кэшированные наборы данных, а размер Young- был достаточен для хранения всех недолговечных объектов. Это поможет избежать полной сборки мусора для сбора временных объектов, созданных во время выполнения задачи. Реализовать эту простую, но эффективную идею помогут следующие рекомендации:

  • собрать статистику сборки мусора, чтобы определить, не слишком ли часто она запускается. Если полная сборка мусора вызывается несколько раз до завершения задачи, это означает, что для выполнения задач недостаточно памяти. Поэтому следует уменьшить объем памяти, используемый Spark для кэширования, изменив значение конфигурации memory.fraction, о чем мы писали здесь.
  • если второстепенных коллекций слишком много, но меньше полных сборок мусора, можно выделить больший объем памяти для Eden в соответствии с оценкой того, сколько памяти потребуется для каждой задачи. Если размер Eden определен как E, то рекомендуемый размер Young устанавливается через опцию -Xmn=4/3*E. Увеличение на 4/3 учитывает пространство, используемое областями Survivor1 и Survivor2.

Например, если задача считывает данные из HDFS, потребляемый ей объем памяти можно оценить, используя размер считываемого блока данных. При этом размер распакованного блока в 2-3 раза больше исходного. Поэтому, если нужно рабочее пространство на 3 или 4 задачи, а размер блока HDFS составляет 128 МБ, то примерный размер Eden будет 43 128 МБ.

Также можно запустить сборщик мусора G1GC с опцией -XX:+UseG1GC. Это может повысить производительность в тех ситуациях, когда сборка мусора является узким местом, и нет возможности исправить это, переопределив размеры областей кучи. Важно, что при больших размерах кучи исполнителя следует увеличить размер области G1 с помощью опции -XX:G1HeapRegionSize и следить за тем, как частота и время, затрачиваемое на сборку мусора, меняются с новыми настройками. Хотя эффект настройки сборки мусора зависит от самого Spark-приложения и объема доступной памяти, на практике управление частотой полной сборки мусора может сократить накладные расходы. Для этого рекомендуется указать флаги настройки сборки мусора для исполнителей в конфигурации задания, установив нужное значение параметру spark.executor.extraJavaOptions. Подробно о сборке мусора в Spark-приложениях мы писали здесь. А о том, как повысить производительность заданий Spark, читайте в нашей новой статье.

Освойте администрирование и использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://selectfrom.dev/apache-spark-all-about-serialization-f84f38c99f5b
  2. https://teepika-r-m.medium.com/spark-optimization-quasiquotes-eef599cd102d
  3. https://joydipnath.medium.com/garbage-collection-tuning-concepts-in-spark-cf1a784e83c
Поиск по сайту