Сегодня рассмотрим особенности использования оператора LIMIT в Spark SQL: как он выполняется и почему вместо него лучше использовать оператор TABLESAMPLE. Для этого в рамках обучения дата-инженеров, разработчиков распределенных приложений и аналитиков данных заглянем под капот оптимизатора Catalyst в Apache Spark и сравним физические планы выполнения SQL-запросов.
Недостатки оператора LIMIT в Spark SQL
Чтобы вывести конкретное число строк из таблицы, в конце SQL-запроса указывается оператор LIMIT. Apache Spark предоставляет возможность использовать этот оператор для выбора фрагмента данных через API Dataframe и в SQL-запросах. Будучи распределенным фреймворком, Spark выполняет LIMIT в два этапа: сначала LocalLimit, а затем GlobalLimit, поскольку в распределенных системах данные распределены по нескольким разделам. Поэтому заданное в LIMIT ограничение сперва рассчитывается индивидуально для каждого раздела, называемого LocalLimit. При этом может выполняться логическая оптимизация LimitPushDown, которая преобразует логические планы SQl-запросов LocalLimit с операторами UNION и JOIN. LimitPushDown является частью пакета операций предварительной фильтрации встроенного оптимизатора Spark SQL под названием Catalyst, о котором мы рассказывали здесь. LimitPushDown — это просто правило оптимизатора Catalyst для преобразования логических планов, то есть Rule[LogicalPlan].
Таким образом, при указании LIMIT n для каждого раздела вычисляется n записей. За этим локальным ограничением LocalLimit следует глобальное GlobalLimit, которое подготавливает окончательный результат.
Чтобы понять, как это работает, рассмотрим SQL-запрос с оператором LIMIT и план его выполнения:
val q = spark.range(10).limit(3) scala> q.explain(extended = true) == Parsed Logical Plan == GlobalLimit 3 +- LocalLimit 3 +- Range (0, 10, step=1, splits=Some(16)) == Analyzed Logical Plan == id: bigint GlobalLimit 3 +- LocalLimit 3 +- Range (0, 10, step=1, splits=Some(16)) == Optimized Logical Plan == GlobalLimit 3 +- LocalLimit 3 +- Range (0, 10, step=1, splits=Some(16)) == Physical Plan == CollectLimit 3 +- *(1) Range (0, 10, step=1, splits=16)
GlobalLimit вызывает перетасовку данных. А shuffle-операции замедляют вычисления из-за передачи данных по сети, о чем мы писали здесь. В итоге SQL-запрос с оператором LIMIT выполняется в два этапа. Причем на втором этапе, где происходит GlobalLimit, есть выполняется только 1 задача, но она делает LIMIT дорогой и трудоемкой операцией, поскольку именно перетасовываются и считываются данные. Для небольших датасетов это может быть незаметно, но для больших наборов данных чтение в случайном порядке и выполнение даже одной задачи для подготовки результата оказывает существенное значение на длительность вычислений.
GlobalLimit — это унарный логический оператор, сохраняющий порядок. Он создается с помощью утилиты Limit.apply, которая используется, когда AstBuilder, построитель абстрактного синтаксического дерева (AST, Abstract Syntax Tree) запрашивается операторами withQueryResultClauses и withSample. Также утилита Limit.apply применяется, если используется оператор Dataset.limit, LIMIT-оператор в Catalyst DSL — наборе неявных преобразований Scala для упрощения построения структур данных оптимизатора Spark SQL., выполняется логическая оптимизация RewriteNonCorrelatedExists или физическая оптимизация CombineLimits.
Таким образом, чем больше набор данных, тем дороже LIMIT. Кроме того, чем больше значение числа строк, указанных для оператора LIMIT , тем больше перетасовка. Впрочем, можно сделать SQL-запрос с LIMIT чуть быстрее, настроив параметр spark.sql.limit.scaleUpFactor, который сообщает Spark, сколько разделов на самом деле нужно сканировать, чтобы найти записи. Но лучше вообще не использовать LIMIT, а вместо него применять оператор TABLESAMPLE, который мы рассмотрим далее.
Преимущества TABLESAMPLE
Оператор TABLESAMPLE работает как один этап, включает параллелизм и работает быстрее по сравнению с LIMIT. Оператор TABLESAMPLE используется для выборки таблицы, возвращая приблизительное количество запрошенных строк или частей с помощью следующих методов:
- TABLESAMPLE(x ROWS) — выборка таблицы до заданного количества строк;
- TABLESAMPLE(x PERCENT) — выборка таблицы до заданного процента, от 0 до 100;
- TABLESAMPLE(BUCKET x OUT OF y) — выборка части таблицы до x из y.
Чтобы продемонстрировать преимущества этого оператора перед LIMIT, сперва рассмотрим план SQL-запроса с LIMIT:
== Physical Plan == Execute InsertIntoHadoopFsRelationCommand (5) +- * GlobalLimit (4) +- Exchange (3) +- * LocalLimit (2) +- Scan csv (1)
В этом плане выполняются LocalLimit и GlobalLimit, что замедляет вычисления. А план выполнения SQL-запроса с оператором TABLESAMPLE выглядит следующим образом:
== Physical Plan == Execute InsertIntoHadoopFsRelationCommand (3) +- * Sample (2) +- Scan csv (1)
Освойте администрирование и использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники
- https://towardsdatascience.com/stop-using-the-limit-clause-wrong-with-spark-646e328774f5
- https://stackoverflow.com/questions/56301329/understanding-spark-explain-collect-vs-global-vs-local-limit
- https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-LimitPushDown.html
- https://books.japila.pl/spark-sql-internals/logical-operators/GlobalLimit/
- https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-sampling.html