Почему на самом деле нельзя избежать shuffle-операций в Spark SQL, в чем разница перетасовки RDD и датафреймов, а также как сократить негативное влияние перемешивания данных по узлам кластера, настроив конфигурации распределенного приложения.
Что такое shuffle-операции в Apache Spark SQL и зачем они нужны
Распределенный характер вычислительного движка Apache Spark позволяет создавать аналитические приложения обработки огромных объемов данных за счет использования специализированных проектных решений. Одним из них является механизм перетасовки (shuffle) в Spark SQL, который реализует перераспределение или повторное разделение данных, чтобы перегруппировать их в разделах. В зависимости от размера обрабатываемых данных количество разделов RDD или DataFrame приходиться уменьшить или увеличить с помощью конфигурации spark.sql.shuffle.partitions или вызовом метода repartition() в коде распределенного приложения.
К операциям перетасовки в Spark SQL относятся repartition(), groupByKey(), reduceByKey(), cogroup() и join(). А countByKey() не является shuffle-операцией, поскольку не вызывает перемещения данных между разделами, а делает вычисления с учетом значения заданного ключа партиционирования.
Операции перетасовки в Spark SQL являются очень дорогостоящими и часто являются причиной снижения скорости выполнения распределенного приложения из-за передачи данных по сети между исполнителями или даже между рабочими узлами в кластере. Поэтому рекомендуется избегать shuffle-операций, которые включают в себя следующие процессы:
- дисковый ввод-вывод;
- сериализацию и десериализацию данных;
- сетевой ввод-вывод.
Например, при запуске операции reduceByKey() для агрегирования данных по ключам, в Spark SQL выполняется ряд шагов:
- Spark запускает задачи сопоставления во всех разделах, которые группируют все значения для одного ключа;
- результаты задач сопоставления сохраняются в памяти;
- если результаты не помещаются в памяти, Spark сохраняет данные на диске, т.е. происходит так называемый перелив на диск (spill-эффект), о котором мы писали здесь;
- Spark перемешивает сопоставленные данные между разделами, иногда сохраняя перетасованные данные на диск для повторного использования;
- затем запускается сборка мусора (Grabage Collection);
- наконец, выполняется свертка результатов в каждом разделе на основе ключа.
Хотя метод reduceByKey() инициирует перемешивание данных, он не изменяет количество разделов, поскольку RDD наследуют размер раздела от родительской распределенной коллекции данных.
В отличие от RDD, DataFrame API увеличивает количество разделов, когда перетасовку выполняет операция преобразования, к которым относятся все агрегатные функции (sum, avg, min, max), а также join(). DataFrame автоматически увеличивает номер раздела до 200 при выполнении shuffle-операций. Этот номер раздела в случайном порядке по умолчанию берется из конфигурации Spark SQL spark.sql.shuffle.partitions, для которой по умолчанию установлено значение 200. Можно изменить это значение по умолчанию с помощью метода conf() объекта SparkSession или через изменение конфигурации команды Spark Submit:
spark.conf.set("spark.sql.shuffle.partitions",100)
В зависимости от размера набора данных, конфигурации исполнителя (количество ядер ЦП и памяти) shuffle-операции могут принести пользу или навредить Spark-заданию. Если набор данных не очень большой, рекомендуется сокращать разделы в случайном порядке, чтобы уменьшить количество мелких файлов с небольшим количеством записей в каждом разделе и снизить количество запускаемых задач.
С другой стороны, если данных очень много, а разделов недостаточно, может возникнуть ошибка нехватки памяти (OOM, Out Of Memory), о которой мы писали здесь. Вычисление правильного размера раздела в случайном порядке является довольно сложной задачей, но это ключевое свойство, которое влияет на производительностью заданий Spark.
Таким образом, не стоит считать shuffle-операции абсолютным злом. Их негативное влияние можно уменьшить, применив ряд мероприятий, которые мы рассмотрим далее.
Как сократить вред от shuffle-операций
Поскольку операции перетасовки предполагают передачу данных по сети, чтобы снизить негативный эффект shuffle-операций, можно уменьшить сетевой ввод-вывод, установив более эффективный сериализатор, например, Kryo. Сделать это можно, задав для конфигурации spark.serializer значение org.apache.spark.serializer.KryoSerializer. При использовании сериализатора Kryo может потребоваться зарегистрировать классы, которые надо сериализовать/десериализовать, с помощью Kryo.register(). Это связано с тем, что Kryo не поддерживает встроенный в Java механизм сериализации и требует явной регистрации классов. Kryo превосходит нативный сериализатор Java как по скорости вычислений и по количеству данных, которые могут быть обработаны в секунду. В частности, тест производительности, проведенный Intel, показал, что Kryo может сериализовать и десериализовать данные до 10 раз быстрее сериализатора Java, и обрабатывать до 5 раз больше данных в секунду. Аналогичные результаты показало бенчмаркинговое сравнение Databricks: Kryo может сериализовать и десериализовать данные до 4 раз быстрее сериализатора Java, и обрабатывать до 2,5 раз больше данных в секунду. Тонкости сериализации и десериализации данных мы разбирали в этом материале.
Чтобы сократить операции дискового ввода-вывода, смягчив или исключив spill-эффект, рекомендуется изменить значения следующих конфигураций:
- shuffle.compress — сжатие выходных данных в случайном порядке (по умолчанию true);
- shuffle.spill.compress — сжатие промежуточных файлов перетасовки (по умолчанию true).
Наиболее эффективным кодеком сжатия данных, задаваемого конфигурацией spark.io.compression.codec, сегодня считается zstld, который работает быстрее Snappy. Впрочем, это не единственные конфигурации, которые следует настроить для оптимизации shuffle-операций. В официальной документации Apache Spark перечисляется целый ряд таких настроек, которые мы рассмотрим в следующий раз, разбираясь с внешней службой перетасовки (ExternalShuffleService).
А в заключение отметим, что, поскольку Apache Spark использует память для хранения промежуточных результатов во время вычислений, включая операции перетасовки, рекомендуется настроить потребление памяти, задав параметр spark.memory.fraction. По умолчанию для этого параметра установлено значение 0,6. Это означает, что 60% памяти исполнителя используется для хранения и кэширования данных, а 40% — для выполнения. Подробнее о сегментах памяти Apache Spark и оптимальных значениях их конфигураций мы писали здесь. А о том, как можно использовать AQE-механизм для динамического объединения разделов после применения shuffle-операций, читайте в нашей новой статье.
Освойте тонкости применения Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники