Говоря про обучение Apache Spark для разработчиков, сегодня мы рассмотрим, как быстро конвертировать Python-скрипты в задания PySpark и какие конфигурационные параметры при этом нужно настроить, чтобы эффективно использовать все возможности распределенных вычислений над большими данными (Big Data). Читайте далее, чем отличаются датафреймы в Pandas и Apache Spark, для чего нужны Arrow и Koalas, а также как оптимально превратить локальный датасет в RDD.
5 шагов от Python-скриптов к заданиям PySpark
Прежде всего, напомним, что язык программирования Python ориентирован на локальную работу с данными в пределах одного компьютера, тогда как Apache Spark – это фреймворк распределенных вычислений, где данные распределены по нескольким узлам кластера. Поэтому, несмотря на наличие API-интерфейса Python в Spark, называемого PySpark, чтобы использовать всю мощь распределенной среды, Python-код не просто конвертируется в задания PySpark, а требует последующей настройки. При этом необходимо поработать как с датасетом, приведя в соответствие структуры данных, так и с программной логикой, выполнив следующие действия [1]:
- преобразование локального датафрейма Pandas в Spark Dataframe. Это можно сделать с помощью Apache Arrow, независимого от языка столбцового формата данных в оперативной памяти или Koalas, который представляет собой API Pandas в Apache Spark. Koalas дополняет API DataFrame PySpark, обеспечивая совместимость с Pandas.
При работе с Apache Arrow, следует установить конфигурационный параметр Spark spark.sql.execution.arrow.pyspark.enabled в значение True. Ниже показан пример, как это сделать на PySpark:
import numpy as np import pandas as pd # Enable Arrow-based spark configuration spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”) # Generate a pandas DataFrame data = [1,2,3,4,5] pdf = pd.DataFrame(data) # Create a Spark DataFrame from a pandas DataFrame using Arrow df = spark.createDataFrame(pdf) # Convert the Spark DataFrame back to a pandas DataFrame using Arrow final_pdf = df.select(“*”).toPandas()
- Создание пользовательской функции PySpark (UDF, User Defined Function), аналогичной функции Python.
UDF в PySpark можно определить двумя способами:
- через лямбда-выражение udfname = udf(LAMBDA_EXPRESSION, RETURN_TYPE )
- через настраиваемую функцию udfname = udf(CUSTOM_FUNCTION, RETURN_TYPE)
В любом случае, UDF PySpark принимает столбцы и применяет логику построчно для создания нового столбца. Например, следующий код показывает, как определить UDF-функцию возведения целого числа в квадрат:
from pyspark.sql.functions import col, udf from pyspark.sql.types import IntegerType def squared(s): return s * s square_udf = udf(lambda x: squared(x), IntegerType()) df = spark.createDataFrame([(4),(8)], [“numbers”]) df.withColumn(“num_square”, square_udf(col(“numbers”))
- Загрузка датасета в распределенные структуры данных Spark: RDD или DataFrame, что можно сделать следующим образом:
#Load dataset as RDD path=”file path with file name” text_rdd = sc.textFile(path) #Load dataset as DataFrame df=spark.read.format(“csv”).option(“header”,”true”).option(“inferSchema”,”true”).load(path)
- Использование преобразования map() вместо циклов для каждого элемента RDD с применением функции, возвращающей новый RDD. Это повысит производительность кода, т.к. к примеру, из-за цикла for вычисления будут происходить последовательно, а не параллельно, увеличивая время выполнения программы.
- Учет взаимозависимости датафреймов: например, если новое значение столбца DataFrame зависит от других таких же структур данных, целесообразно соединить их через JOIN и вызвать UDF-функцию, чтобы получить новое значение столбца.
Полученное таким образом приложение PySpark нуждается в дополнительной настройке параметров. Это стоит сделать, чтобы более эффективно работать в кластере Apache Spark, оптимально используя все ресурсы распределенной среды. Частично эту проблему решает развертывание Spark-заданий решает их развертывание в кластере Kubernetes, о чем мы рассказываем здесь. А как самостоятельно оптимизировать переход на распределенный режим с локального Python-приложения, мы рассмотрим далее.
Core Spark - основы для разработчиков
Код курса
CORS
Ближайшая дата курса
16 декабря, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Что настроить в приложении Apache Spark: 5 главных факторов
Итак, поскольку приложение PySpark выполняется в распределенной среде кластера Apache Spark, при настройке Spark-заданий перед их запуском стоит учитывать следующие аспекты [1]:
- уровень параллелизма – если он недостаточный, часть ресурсов кластера Spark будет простаивать, что неэффективно с точки зрения экономики. С другой стороны, при слишком большом распараллеливании заданий, увеличиваются накладные расходы, связанные с каждым разделом (partition) для распределенной обработки RDD. Настроить уровень параллелизма можно такими способами:
- указать количество разделов при вызове операций перетасовки данных (shuffle);
- распространять данные с помощью repartition() – для дататфреймов или coalesce(), чтобы уменьшить количество разделов.
- количество исполнителей (executor) и ядер. Примечательно, что простое увеличение этих факторов не всегда ведет к повышению производительности. Каждая задача будет обрабатываться одним ядром процессора в кластере. Например, если указано 3 ядра, один исполнитель будет обрабатывать 3 задачи параллельно. Обычно, рекомендуется задавать не более 5 ядер. Можно автоматически изменять этот параметр, используя динамическое размещение dynamicAllocation.enabled, которое масштабирует количество исполнителей в приложении в зависимости от рабочей нагрузки.
- Широковещательные переменные (broadcast variable), которые похожи на распределенный кэш в Hadoop, позволяя локально хранить переменную только для чтения в кэше на отдельном узле кластера, а не отправлять ее копию вместе с задачами.
Широковещательный набор данных означает, что данные будут доступны для всех исполнителей, что снижает перетасовку данных и уменьшает накладные расходы на передачу данных по сети. Использовать broadcast-переменные целесообразно в следующих случаях:
- для эффективного предоставления каждому узлу копии большого входного набора данных;
- при работе со справочными данными, которые доступны только для чтения и не меняются на протяжении всего жизненного цикла Spark-приложения;
- одни и те же данные используются на нескольких этапах выполнения приложения;
- данных слишком мало, чтобы заполнить память рабочего узла;
- большой набор данных соединяется с небольшим датасетом, broadcasting которого поможет повысить общую производительность приложения.
По умолчанию параметр spark.sql.autoBroadcastJoinThreshold равен 10 МБ, что означает максимальный размер в байтах для таблицы, которая будет транслироваться всем рабочим узлам при выполнении соединения. Изменить это значение можно до того размера, который требуется установить для трансляции большого датасета другим узлам кластера Spark [2].
- Кэширование данных, которые используются более одного раза в задании Spark, чтобы избежать повторного вычисления RDD или DataFrame.
- Сериализация данных, которая в Spark по умолчанию выполняется на базе Java. Это обеспечивает гибкость и совместимость с большинством классов, но производится достаточно медленно. Для ускорения сериализации и десериализации рекомендуется делать это с помощью библиотеки Kryo. Она поддерживает кэширование и перетасовку RDD, но изначально не поддерживает сериализацию на диск. А часто используемые на практике методы saveAsObjectFile() в RDD и objectFile() в SparkContext поддерживают только Java-сериализацию. Тем не менее, если нужно повысить производительность и снизить потребление памяти, стоит попробовать Kryo. В частности, сериализация влияет на JOIN-операции и группировки, которые обычно включают перетасовку данных. Однако, чем меньше объем данных для перетасовки, тем быстрее будет выполняться операция [3]. Задать количество разделов, используемых при перетасовке данных для объединений или агрегатов можно с помощью параметра sql.shuffle.partitions.
Переключиться на использование Kryo можно, инициализировав задание с помощью объекта SparkConf, например, conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”). Также можно задать сериализатор в команде spark-submit: — conf spark.serializer = org.apache.spark.serializer.KryoSerializer.
С сериализацией и десериализацией также связан фактор формата данных. В частности, Avro и Parquet предпочтительнее для файлов с большими данными, чем текстовые форматы, CSV и JSON.
Анализ данных с помощью современного Apache Spark
Код курса
SPARK
Ближайшая дата курса
16 декабря, 2024
Продолжительность
32 ак.часов
Стоимость обучения
96 000 руб.
Завтра мы продолжим разбираться с методами повышения производительности Spark-приложений и рассмотрим особенности кэширования SQL-запросов в этом Big Data фреймворке. А освоить все особенности администрирования и разработки Apache Spark для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники