Продолжая разбирать особенности бакетирования таблиц в Apache Spark, сегодня мы рассмотрим несколько примеров, как дата-инженер и аналитик данных могут работать с этим методом оптимизации SQL-запросов. Также читайте далее, какие конфигурации Apache Spark SQL связаны с бакетированием таблиц и что нового появилось в 3-ей версии этого Big Data фреймворка, чтобы такой способ повышения производительности стал еще эффективнее.
Особенности бакетирования Spark-таблиц
Бакетирование – это метод оптимизации в Apache Spark и Hive, чтобы предотвратить перетасовку данных за счет их сегментирования по одним бакетам (bucket) – столбцам кластеризации. Обычно бакетирование применяется в оптимизации производительности JOIN-запросов и особенно эффективно, когда предварительно перемешанные бакетированные таблицы используются в SQL-запросе более одного раза. Подробнее о том, как это работает, а также каковы основные достоинства и недостатки этого метода оптимизации Spark SQL, мы рассказывали здесь и здесь.
В отличие от метода партиционирования таблиц partitionBy(), который создает структуру каталогов и имеет ограниченную применимость к столбцам с высокой мощностью, метод бакетирования bucketBy() распределяет данные по фиксированному количеству сегментов и подходит для случаев с неограниченным количеством уникальных значений [1].
Если таблица разделена на бакеты, информация о этом сохраняется в хранилище метаданных, доступ к которому должен иметь Apache Spark. Например, в случае команды df=spark.table(table_name) фреймворк получает информацию о бакетах из хранилища. А при явном указании пути к данным в команде df=spark.read.parquet(path_to_data) Spark не связывается с хранилищем метаданных Hive и не получает информацию о бакетировании. В этом случае оно не будет применяться.
Примечательно, что до Spark 3.0, если бакетированный столбец назывался по-разному в двух соединяемых таблицах, и был переименован в DataFrame, бакетирование преставало работать. Эта проблема была исправлена в 3-ей версии фреймворка. Но типы данных соединяемых столбцов должны быть одинаковыми. Иначе Spark преобразует его, отбросив информацию о сегменте, и обе таблицы будут перетасованы.
Можно помочь Spark добиться одностороннего соединения без перемешивания, выполнив явное приведение типов данных к одному для обеих соединяемых таблиц, а затем перераспределить измененную таблицу на то же количество сегментов, что и в другой. Таким образом, перемешивание будет происходить только на одной стороне, а другая таблица не будет перемешиваться. Это эквивалентно ситуации, когда только одна таблица бакетирована, а другая — нет.
В Spark 3.1.1 была реализована новая функция распознавания ситуации, в которой бакетирование бесполезно. Она анализирует план SQL-запроса без JOIN-соединений или агрегатов, и игнорирует бакетирование, рассматривая данные так, как будто они не были сегментированы. Эта функция по умолчанию включена, и ею можно управлять с помощью параметра конфигурации spark.sql.sources.bucketing.autoBucketedScan.enabled.
Также стоит помнить, что об использовании пользовательских функций (UDF, User Defined Functions) в JOIN-запросах: UDF игнорирует информацию о бакетировании. Вызов UDF перед соединением аналогичен ситуации, когда бакетирована только одна таблица. Поэтому обе таблицы будут перемешаны или будет одностороннее соединение без перемешивания, если перераспределить таблицу или установить количество разделов перемешивания равным количеству бакетов. Если нужно полностью избежать перемешивания, следует вызвать UDF после соединения [2].
Как настроить бакетирование таблиц: 7 основных конфигураций Apache Spark SQL
Перечислим конфигурации, которые относятся к бакетированию таблиц в Apache Spark SQL [2]:
- sql.sources.bucketing.enabled – включение и выключение бакетирования, по умолчанию установлено в значение True;
- sql.sources.bucketing.maxBuckets — максимальное количество бакетов, которые можно задать для таблицы. По умолчанию это значение равно 100000.
- sql.sources.bucketing.autoBucketedScan.enabled, чтобы игнорировать информацию бакетирования, если по результатам анализа плана запроса этот метод оптимизации бесполезен. По умолчанию это True.
- sql.bucketing.coalesceBucketsInJoin.enabled — если обе таблицы имеют разное количество бакетов, этот метод объединит сегменты таблицы с большим числом бакетов, чтобы их количество стало как в другой таблице. Метод работает, только если оба числа бакетов кратны друг другу. По умолчанию это значение установлено в False.
- sql.bucketing.coalesceBucketsInJoin.maxBucketRatio — максимальное соотношение объединяемых сегментов. По умолчанию это 4. Например, если в одной бакетированной таблице более чем в 4 раза больше сегментов, чем в другой, то объединение сегментов выполняться не будет.
- sql.legacy.bucketedTableScan.outputOrdering – позволяет использовать информацию сортировки из бакетов, что полезно, если выполняется соотношение один файл на бакет. По умолчанию это значение установлено в False.
- sql.shuffle.partitions — контрольное количество перемешиваемых разделов, по умолчанию равное 200.
Про подключение Spark SQL к внешним источникам данных в виде СУБД через JDBC-драйверы читайте здесь. А практику применения Apache Spark для разработки распределенных приложений потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
Источники
- https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
- https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53