Динамическое сокращение разделов в Spark SQL

Spark SQL примеры курсы обучение, Spark SQL для дата-инженера и разработчика, обучение Apache Spark Школа Больших Данных Учебный Центр Коммерсант

Что такое Dynamic Partition Pruning в Spark SQL, как работает этот метод оптимизации пакетных запросов, зачем его использовать в задачах аналитики больших данных, и каким образом повысить эффективность его практического применения.

Что такое Dynamic Partition Pruning и зачем это нужно в Spark SQL

Параллельная обработка данных в Apache Spark обеспечивается благодаря их разделению. Каждый раздел обрабатывается отдельным процессом (исполнителем). Поэтому можно сказать, что раздел в Spark является единицей параллелизма. Однако, слишком большое количество разделов приводит к потере параллелизма, поскольку 1 исполнитель Spark может обрабатывать только 1 раздел в единицу времени. Поэтому для оптимизации запросов в Spark SQL, начиная с версии 3.0, есть механизм динамического сокращения разделов (DPP, Dynamic Partition Pruning). Этот метод оптимизации предотвращает сканирование ненужных разделов при чтении данных и применяется в пакетных запросах соединения партиционированных таблиц, которые соединяются по столбцам разделов. При этом условия фильтрации переносятся в большую таблицу фактов, сокращая количество строк для сканирования. Наилучшие результаты ожидаются в запросах с оператором JOIN между большой таблицей фактов и гораздо меньшей таблицей измерений, что характерно для звездообразной схемы.

Запрос в Apache Spark представляет собой простое соединение, которое в зависимости от размера набора данных преобразуется или нет в широковещательное соединение. Dynamic Partition Pruning работает с партиционированными наборами данных, преобразуя одну сторону соединения в широковещательный фильтр, используемый для пропуска нерелевантных разделов другой стороны соединения. Они нерелевантны из-за результата фильтрации, поэтому их чтение не сгенерирует никаких выходных строк и не имеет смысла.

DPP проверяет значения разделов, запрошенные в фильтрах и предикатах запроса, определяя нужные разделы для выполнения запроса. Все остальные разделы автоматически обрезаются и не включаются в план выполнения запроса. Поэтому DPP сокращает время обработки данных и потребление ресурсов. Dynamic Partition Pruning работает как со статическими разделами, так и с динамически генерируемыми разделами, которые добавляются посредством вставок или инкрементных загрузок: Spark распознает новые разделы и продолжает применять динамическое сокращение. Включение DPP происходит автоматически как оптимизация во время генерации плана запроса, на этапе логической оптимизации с использованием правил PartitionPruning и CleanupDynamicPruningFilters. Эта оптимизация контролируется свойством конфигурации spark.sql.optimizer.dynamicPartitionPruning.enabled.

Когда параметр spark.sql.optimizer.dynamicPartitionPruning.enabled установлен в значение true (по умолчанию), то DPP будет применяться к запросу, если тот является допустимым. Второе свойство spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly также управляет использованием DPP. Если это свойство установлено в true (по умолчанию), DPP будет применяться только тогда, когда широковещательный обмен BroadcastExchange может быть повторно использован в динамическом фильтре сокращения. Напомним, BroadcastExchangeExec – это унарный физический оператор Exchange для сбора и трансляции строк дочернего отношения рабочим узлам. BroadcastExchangeExec создается в Spark SQL исключительно тогда, когда оптимизация EnsureRequirements физического плана запроса обеспечивает широковещательное распределение BroadcastDistribution входных данных физического оператора, который может быть оператором BroadcastHashJoinExec или BroadcastNestedLoopJoinExec.

За атрибуты, используемые в DPP, отвечают следующие свойства:

  • sql.optimizer.dynamicPartitionPruning.useStats определяет , следует ли использовать уникальный счетчик атрибута JOIN$
  • sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio устанавливает резервное значение для использования в алгоритме, когда статистика отключена или недоступна.

Оба этих свойства важны для оценки целесообразности применения DPP, т.е. добавления подзапроса сокращения разделов в план его выполнения. Как выполняется эта оценка с использованием правил PartitionPruning и CleanupDynamicPruningFilters, рассмотрим далее.

Оценка применимости DPP-оптимизации

Правило PartitionPruning сперва оценивает целесообразность применения DPP-оптимизации с учетом типа запроса и селективности операции соединения. Оно начинается с проверки на левой стороне соединения. При этом в логическом плане запроса появляется метод getFilterableTableScan, который нужен, чтобы гарантировать, что сканирование таблицы с левой стороны может быть отфильтровано для заданного столбца. Сканирование таблицы должно быть либо партиционированным сканированием для заданного столбца раздела, либо сканированием, которое поддерживает фильтрацию во время выполнения по заданному атрибуту. Затем тип соединения проверяется методом canPruneLeft, чтобы убедиться, что тип соединения поддерживает обрезку разделов на левой стороне. При этом тип JOIN-соединения должен быть Inner, LeftSemi или RightOuter для поддержки сокращения разделов с левой стороны. Метод hasPartitionPruningFilter используется для проверки того, имеет ли правая сторона соединения селективный предикат, который может фильтровать ключ соединения. Когда все вышеперечисленные проверки пройдены, правило PartitionPruning вызывает метод insertPredicate для вставки предиката на левой стороне соединения. Перед вставкой предиката DPP метод insertPredicate запускает метод pruningHasBenefit для оценки затрат и выгод оптимизации DPP и вставляет предикат DPP только в том случае, если выгоды превышают затраты или для текущего сеанса Spark включено повторное использование широковещательного обмена.

Метод pruningHasBenefit оценивает выгоды DPP, используя размер плана в байтах на стороне сокращения разделов, умноженный на filterRatioи оценивает затраты DPP, используя общий размер в байтах на другой стороне соединения. FilterRatio оценивается с использованием статистики столбцов, если она доступна. Иначе оценка выполняется с использованием настроенного значения spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.

Если какая-то из проверок по соединению таблицы с левой стороны не выполняется, те же самые проверки применяются к таблице с правой стороны для оценки применимости DPP-оптимизации.

Если оценка применимости DPP признает его полезным для плана выполнения запроса или включено повторное использование обмена, предикат DPP вставляется в «сокращаемую» сторону соединения с использованием фильтра на другой стороне. Создается пользовательское выражение DynamicPruning, которое оборачивает фильтр в выражение IN.

На этапе оптимизации логического плана правило PartitionPruning вставляет дублированный подзапрос с фильтром с другой стороны. Затем на этапе подготовки плана выполнения применяется правило планировщика Spark под названием PlanDynamicProuningFilters, чтобы удалить дубликат подзапроса путем повторного использования результатов трансляции.

Правило PlanDynamicProuningFilters сначала проверяет, может ли план запроса повторно использовать широковещательный обмен, для которого требуется флаг exchangeResueEnabled , установленный в значение true, и физический оператор соединения BroadcastHashJoinExec. Если план запроса может повторно использовать широковещательный обмен, дублирующийся подзапрос будет заменен повторно использованными результатами широковещания. В противном случае, если предполагаемая выгода от использования дублирующегося подзапроса все еще перевешивает использование исходного плана запроса без DPP-оптимизации, дублирующийся подзапрос сохраняется. Если нет, подзапрос будет удален.

Тонкости практического применения динамического сокращения разделов в Spark SQL

В связи с пакетным характером DPP-механизма, он не применяется к потоковым запросам. Оптимизация направлена на сокращение затрат на ввод-вывод данных, считываемых из источников данных. Поэтому Dynamic Partition Pruning особенно эффективен для аналитических задач и BI-приложений. В звездной схеме таблицы измерений обычно намного меньше таблиц фактов. Поэтому, когда большая таблица фактов соединяется с несколькими гораздо меньшими таблицами измерений, DPP-оптимизация имеет смысл. Также она полезна, если таблица фактов должна быть разделена на срезы и кубы по некоторым атрибутам таблиц измерений. При соединении таблицы фактов к таблице измерений DPP оптимизирует план запроса, создавая подзапрос из любых фильтров, которые применяются к таблице измерений. DPP транслирует этот подзапрос и строит из него хэш-таблицу, используя это на фазе сканирования большой таблицы фактов перед чтением из нее данных. Это помогает DPP сократить объем читаемых данных, а, значит, ускорить выполнение аналитического запроса.

Повысить эффективность применения DPP-оптимизации в Spark SQL помогут следующие рекомендации:

  • фильтрация данных на ранних этапах операций с датафреймами Spark помогает сокращать разделы, используя их метаданные;
  • сбор статистики с помощью ANALYZE TABLE позволяет Spark точнее определять, какие разделы можно игнорировать, поэтому следует периодически запускать этот оператор, который собирает статистические данные об одной или всех таблицах в указанной базе данных;
  • не нужно иметь слишком много разделов, поскольку это перегружает узел драйвера при сборе статистики. Рекомендуется от 10 до 100 разделов на одну большую таблицу.
  • Чтобы предотвратить случайные соединения, требующие перемещения всех данных, и оптимизировать объем сканирования, можно перераспределить датафреймы перед соединением, используя функцию repartition(numsPartition, cols). Она выполняет разделение датафрейма в памяти, создавая такое количество файлов разделов, которое указано в аргументе numsPartition, а аргумент cols обеспечивает создание только одного раздела для комбинации значений столбцов. Без указания параметров функция repartition() создает равное количество разделов примерно с одинаковыми размерами файлов.
  • Чтобы убедиться в целесообразности применения DPP-оптимизации, можно посмотреть план выполнения запроса, дополнив его операторов EXPLAIN. Пример того, как это сделать, мы разбирали в этой статье.

Узнайте больше про использование Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://dataninjago.com/2022/02/03/spark-sql-query-engine-deep-dive-17-dynamic-partition-pruning/
  2. https://books.japila.pl/spark-sql-internals/dynamic-partition-pruning/
  3. https://docs.aws.amazon.com/prescriptive-guidance/latest/spark-tuning-glue-emr/pruning-dynamic-partitions.html
  4. https://www.waitingforcode.com/apache-spark-sql/whats-new-apache-spark-3-dynamic-partition-pruning/read
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту