Содержание
Для чего смотреть планы выполнения запросов при работе с API pandas в Spark и как это сделать: примеры использования метода spark.explain() и его аргументов для вывода логических и физических планов. Разбираем на примере PySpark-скрипта.
API pandas и физический план выполнения запроса в Apache Spark
Мы уже писали, что PySpark, API-интерфейс Python в Apache Spark, позволяет работать с популярной библиотекой pandas, которая довольно известна, но по своей природе не очень хорошо работает с большим объемом данных. Тем не менее, многие аналитики данных и дата-инженеры по-прежнему используют pandas в своих PySpark-приложениях. Чтобы делать это более эффективно, можно попытаться избежать дорогостоящих операций с датафреймами, вызвав метод spark.explain(). Он построит физический план выполнения этой операции, который всегда представляет собой RDD. Если вызвать метод spark.explain() с аргументом True, он также покажет и логический план — краткое изложение всех шагов преобразования, которые необходимо выполнить. Логический план позволяет получить наиболее оптимизированную версию пользовательского выражения, но не предоставляет подробную информацию о драйвере (главном узле) или исполнителе (рабочем узле). За создание и хранение логического плана отвечает SparkContext, который также использует API pandas в Apache Spark.
Посмотрим, как это работает, написав несложный PySpark-скрипт с использованием API pandas для генерации датафрейма с зарплатами по разным специальностям, вычисления медианной зарплаты и фильтрации тех строк, где зарплата оказалась выше медианной. PySpark-скрипт, запускаемый в Colab, выглядит таким образом:
# Установка необходимых библиотек
!pip install pyspark
!pip install faker
# Импорт необходимых модулей из pyspark
import pyspark
from pyspark.sql import SparkSession
import pyspark.pandas as ps
# Импорт модуля faker
from faker import Faker
from faker.providers.address.ru_RU import Provider
#Создаем сессию Spark
spark = SparkSession.builder.getOrCreate()
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker('ru_RU')
fake.add_provider(Provider)
# Генерируем данные
i=2000
jobs = [fake.job() for _ in range(i)]
salaries = [fake.random_int(min=20, max=720) for _ in range(i)]
# Создаем Spark DataFrame
data = list(zip(jobs, salaries))
Sample_schema = ["job", "salary"]
dataframe = spark.createDataFrame(data, schema = Sample_schema)
# Выводим DataFrame
print('Исходный датафрейм')
dataframe.show(n=10, truncate=False)
# Преобразуем Spark DataFrame в Pandas DataFrame
psdf = ps.DataFrame(dataframe)
# Фильтруем строки, где зарплата выше определенного значения (например, медианы)
median_salary = psdf['salary'].median()
filtered_psdf = psdf[psdf['salary'] > median_salary].sort_values(by=['salary'], ascending=False)
print('Медианная зарплата = ', median_salary)
ps.set_option("display.max_rows", 10)
print('ТОП-10 профессий с зарплатой выше медианной')
display(filtered_psdf)
# Выводим информацию о плане выполнения запроса
filtered_psdf.spark.explain()
Физический план выполнения запроса с датафреймом pandas, вызываемый методом spark.explain() без аргумента True, выглядит так:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [__index_level_0__#2254L, job#2239, salary#2240L]
+- Sort [salary#2240L DESC NULLS LAST, __natural_order__#2301L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(salary#2240L DESC NULLS LAST, __natural_order__#2301L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=2697]
+- Project [distributed_sequence_id#2255L AS __index_level_0__#2254L, job#2239, salary#2240L, monotonically_increasing_id() AS __natural_order__#2301L]
+- Filter CASE WHEN CASE WHEN CASE WHEN isnull(cast(salary#2240L as double)) THEN false ELSE isnull(cast(salary#2240L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2240L as double)) THEN false ELSE isnull(cast(salary#2240L as double)) END END THEN false ELSE CASE WHEN CASE WHEN isnull(cast(salary#2240L as double)) THEN false ELSE isnull(cast(salary#2240L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2240L as double)) THEN false ELSE (cast(salary#2240L as double) > 366.0) END END END
+- AttachDistributedSequence[distributed_sequence_id#2255L, job#2239, salary#2240L] Index: distributed_sequence_id#2255L
+- Scan ExistingRDD[job#2239,salary#2240L]

Некоторые операции, например, sort_values(), сложнее выполнять в параллельной или распределенной среде, чем в памяти на одном компьютере, поскольку необходимо отправлять данные на другие узлы через сеть. Об этом свидетельствует операция Exchange в плане выполнения запросов. По возможности рекомендуется избегать таких shuffle-операций, поскольку они замедляют вычисления из-за накладных расходов передачи данных по сети.
Логический план выполнения запроса
При вызове метода spark.explain() с аргументом True, выводимая информация будет намного полнее. Теперь она включает не только физический план, но и логический план, причем все стадии работы с ним: от синтаксического анализа до оптимизации перед генерацией физического плана.
== Parsed Logical Plan ==
Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2410L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Sort [salary#2337L DESC NULLS LAST, __natural_order__#2398L ASC NULLS FIRST], true
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, __natural_order__#2398L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2398L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Filter CASE WHEN isnull(CASE WHEN isnull(CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END) THEN false ELSE CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END END) THEN false ELSE cast(CASE WHEN isnull(CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END) THEN false ELSE CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END END as boolean) END
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2356L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Project [distributed_sequence_id#2352L AS __index_level_0__#2351L, job#2336, salary#2337L]
+- AttachDistributedSequence[distributed_sequence_id#2352L, job#2336, salary#2337L] Index: distributed_sequence_id#2352L
+- LogicalRDD [job#2336, salary#2337L], false
== Analyzed Logical Plan ==
__index_level_0__: bigint, job: string, salary: bigint
Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2410L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Sort [salary#2337L DESC NULLS LAST, __natural_order__#2398L ASC NULLS FIRST], true
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, __natural_order__#2398L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2398L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Filter CASE WHEN isnull(CASE WHEN isnull(CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END) THEN false ELSE CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END END) THEN false ELSE cast(CASE WHEN isnull(CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END) THEN false ELSE CASE WHEN isnull((cast(salary#2337L as double) > 364.0)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END END as boolean) END
+- Project [__index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2356L]
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Project [distributed_sequence_id#2352L AS __index_level_0__#2351L, job#2336, salary#2337L]
+- AttachDistributedSequence[distributed_sequence_id#2352L, job#2336, salary#2337L] Index: distributed_sequence_id#2352L
+- LogicalRDD [job#2336, salary#2337L], false
== Optimized Logical Plan ==
Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Sort [salary#2337L DESC NULLS LAST, __natural_order__#2398L ASC NULLS FIRST], true
+- Project [distributed_sequence_id#2352L AS __index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2398L]
+- Filter CASE WHEN CASE WHEN CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE isnull(cast(salary#2337L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE isnull(cast(salary#2337L as double)) END END THEN false ELSE CASE WHEN CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE isnull(cast(salary#2337L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END END END
+- AttachDistributedSequence[distributed_sequence_id#2352L, job#2336, salary#2337L] Index: distributed_sequence_id#2352L
+- LogicalRDD [job#2336, salary#2337L], false
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [__index_level_0__#2351L, job#2336, salary#2337L]
+- Sort [salary#2337L DESC NULLS LAST, __natural_order__#2398L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(salary#2337L DESC NULLS LAST, __natural_order__#2398L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=2794]
+- Project [distributed_sequence_id#2352L AS __index_level_0__#2351L, job#2336, salary#2337L, monotonically_increasing_id() AS __natural_order__#2398L]
+- Filter CASE WHEN CASE WHEN CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE isnull(cast(salary#2337L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE isnull(cast(salary#2337L as double)) END END THEN false ELSE CASE WHEN CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE isnull(cast(salary#2337L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2337L as double)) THEN false ELSE (cast(salary#2337L as double) > 364.0) END END END
+- AttachDistributedSequence[distributed_sequence_id#2352L, job#2336, salary#2337L] Index: distributed_sequence_id#2352L
+- Scan ExistingRDD[job#2336,salary#2337L]

Вместо аргумента True в методе spark.explain() можно использовать аргумент mode=»extended«, который также выводит логические и физические планы выполнения запроса. А аргумент mode=»formatted» показывает разделенный вывод, созданный на основе оптимизированного физического плана, и раздел с деталями каждого узла:
== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
+- Sort (6)
+- Exchange (5)
+- Project (4)
+- Filter (3)
+- AttachDistributedSequence (2)
+- Scan ExistingRDD (1)
(1) Scan ExistingRDD
Output [2]: [job#2724, salary#2725L]
Arguments: [job#2724, salary#2725L], MapPartitionsRDD[678] at applySchemaToPythonRDD at <unknown>:0, ExistingRDD, UnknownPartitioning(0)
(2) AttachDistributedSequence
Input [2]: [job#2724, salary#2725L]
Arguments: distributed_sequence_id#2740: bigint
(3) Filter
Input [3]: [distributed_sequence_id#2740L, job#2724, salary#2725L]
Condition : CASE WHEN CASE WHEN CASE WHEN isnull(cast(salary#2725L as double)) THEN false ELSE isnull(cast(salary#2725L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2725L as double)) THEN false ELSE isnull(cast(salary#2725L as double)) END END THEN false ELSE CASE WHEN CASE WHEN isnull(cast(salary#2725L as double)) THEN false ELSE isnull(cast(salary#2725L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#2725L as double)) THEN false ELSE (cast(salary#2725L as double) > 379.0) END END END
(4) Project
Output [4]: [distributed_sequence_id#2740L AS __index_level_0__#2739L, job#2724, salary#2725L, monotonically_increasing_id() AS __natural_order__#2786L]
Input [3]: [distributed_sequence_id#2740L, job#2724, salary#2725L]
(5) Exchange
Input [4]: [__index_level_0__#2739L, job#2724, salary#2725L, __natural_order__#2786L]
Arguments: rangepartitioning(salary#2725L DESC NULLS LAST, __natural_order__#2786L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=3155]
(6) Sort
Input [4]: [__index_level_0__#2739L, job#2724, salary#2725L, __natural_order__#2786L]
Arguments: [salary#2725L DESC NULLS LAST, __natural_order__#2786L ASC NULLS FIRST], true, 0
(7) Project
Output [3]: [__index_level_0__#2739L, job#2724, salary#2725L]
Input [4]: [__index_level_0__#2739L, job#2724, salary#2725L, __natural_order__#2786L]
(8) AdaptiveSparkPlan
Output [3]: [__index_level_0__#2739L, job#2724, salary#2725L]
Arguments: isFinalPlan=false

Когда над объектами Spark вызывается много операций API pandas, базовый планировщик фреймворка может замедлиться из-за огромного и сложного плана. В таком случае можно вызвать метод работы с контрольной точкой DataFrame.spark.checkpoint() или DataFrame.spark.local_checkpoint(), который удалит предыдущий план выполнения и построит его заново в более простом варианте. Результат предыдущего DataFrame сохраняется в настроенной файловой системе при вызове DataFrame.spark.checkpoint() или в исполнителе при вызове DataFrame.spark.local_checkpoint().
Поскольку в Apache Spark раздел является единицей параллелизма, рекомендуется избегать вычислений на одном разделе. Однако, некоторые API, такие как DataFrame.rank, используют оконные функции PySpark без указания раздела. Это перемещает все данные в один раздел на одном узле и может привести к серьезному снижению производительности. Таких API следует избегать для очень больших наборов данных. Например, добавим к фильтрации профессий с зарплатой выше медианной функцию rank() и посмотрим, как это отразится на физическом плане выполнения запроса.
В физическом плане появилась оконная функция, а сам он теперь выглядит так:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [__index_level_0__#3163L, _we0#3241 AS job#3229, _we1#3242 AS salary#3235]
+- Window [avg(_w2#3240) windowspecdefinition(salary#3149L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#3242], [salary#3149L]
+- Sort [salary#3149L ASC NULLS FIRST], false, 0
+- Project [__index_level_0__#3163L, _w2#3240, salary#3149L, _we0#3241]
+- Window [avg(_w0#3239) windowspecdefinition(job#3148, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#3241], [job#3148]
+- Sort [job#3148 ASC NULLS FIRST], false, 0
+- Project [__index_level_0__#3163L, _w0#3239, job#3148, _w2#3240, salary#3149L]
+- Window [row_number() windowspecdefinition(salary#3149L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w2#3240], [salary#3149L ASC NULLS FIRST]
+- Sort [salary#3149L ASC NULLS FIRST], false, 0
+- Window [row_number() windowspecdefinition(job#3148 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#3239], [job#3148 ASC NULLS FIRST]
+- Sort [job#3148 ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=4297]
+- Project [__index_level_0__#3163L, job#3148, salary#3149L]
+- Sort [salary#3149L DESC NULLS LAST, __natural_order__#3210L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(salary#3149L DESC NULLS LAST, __natural_order__#3210L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=4293]
+- Project [distributed_sequence_id#3164L AS __index_level_0__#3163L, job#3148, salary#3149L, monotonically_increasing_id() AS __natural_order__#3210L]
+- Filter CASE WHEN CASE WHEN CASE WHEN isnull(cast(salary#3149L as double)) THEN false ELSE isnull(cast(salary#3149L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#3149L as double)) THEN false ELSE isnull(cast(salary#3149L as double)) END END THEN false ELSE CASE WHEN CASE WHEN isnull(cast(salary#3149L as double)) THEN false ELSE isnull(cast(salary#3149L as double)) END THEN false ELSE CASE WHEN isnull(cast(salary#3149L as double)) THEN false ELSE (cast(salary#3149L as double) > 365.0) END END END
+- AttachDistributedSequence[distributed_sequence_id#3164L, job#3148, salary#3149L] Index: distributed_sequence_id#3164L
+- Scan ExistingRDD[job#3148,salary#3149L]

Чтобы rank() не замедлял работу распределенной программы, перемещая данные в один раздел, можно использовать метод groupBy.rank, который менее затратен, поскольку данные можно распределять и вычислять для каждой группы. В ранее приведенном примере нет явно выделяемых групп, поэтому оставим этот совет без иллюстрации.
Узнайте больше про возможности Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Потоковая обработка в Apache Spark
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- Архитектура данных с Apache Spark
Источники


