Python считается из основных языков программирования в областях Data Science и Big Data, поэтому не удивительно, что Apache Spark предлагает интерфейс и для него. Data Scientist’ы, которые знают Python, могут запросто производить параллельные вычисления с PySpark. Читайте в нашей статье об инициализации Spark-приложения в Python, различии между Pandas и PySpark, доступных форматов для чтения и записи, а также интеграция с базами данных.
Инициализация через SparkContext, SparkConf и SparkSession
В первую очередь, Spark создает SparkContext — объект, который определяет, как получить доступ к кластеру в момент выполнения программы. Также определяются параметры конфигурации через SparkConf. К ним может относиться кластерный менеджер (master), с которым соединяется приложение через URL, название приложения, количество ядер и т.д (с полным списком можно ознакомиться в документации). Вот так может выглядеть инициализация Spark:
conf = SparkConf().setAppName('appName').setMaster('local[*]') sc = SparkContext(conf=conf) spark = SparkSession(sc)
Начиная со Spark 2.0, был представлен SparkSession, который служит единой точкой входа и устраняет необходимость явно создавать SparkConf и SparkContext, поскольку они инкапсулируются в SparkSession. Все функции, которые были доступны в SparkContext, теперь доступны в SparkSession. Например, код выше может быть переписан следующим образом:
spark = SparkSession.builder \ .master('local[*]') \ .appName('appName') \ .getOrCreate()
Pandas и PySpark
Python-библиотека Pandas — главный инструмент Data Scientist’а при работе с данными. Интерфейсы Pandas и PySpark имеют множество сходств, поэтому тем, кто уже знаком с Pandas, не трудно понять и PySpark. В частности, оба используют DataFrame для представления табличных данных, отсюда следует множество схожих методов. При разных названиях, они пересекаются с операциями SQL: методы distinct, where в PySpark соответствуют методам unique, filter в Pandas.
Главное отличие между PySpark и Pandas состоит в режиме выполнения. PySpark реализует lazy execution (ленивое выполнение), в то время как Pandas — eager execution (мгновенное выполнение). Допустим, требуется загрузить данные на диске и применить к ним операции трансформации (map). Pandas выполнит их тут же. PySpark же сохранит всю последовательность необходимых операций и выполнит их в том случае, когда данные понадобятся. Ниже приведены примеры с сортировкой данных в PySpark и Pandas.
import pandas as pd data = pd.read_csv(“file.csv”) data = data.apply(lambda x: sorted(x)) # сразу же изменяются данные
data = spark.read.csv(“file.csv”) def sort_cols_asc(input_df): return input_df.select(*sorted(input_df.columns)) data = data.transform(sort_cols_asc) # в ожидании выполнения операций data.show() # вот теперь применит
Отметим также — очень просто перейти от PySpark к Pandas и наоборот:
# Из Pandas в PySpark spark_df = spark.createDataFrame(pandasDF) # Из PySpark в Pandas pandasDF = spark_df.toPandas()
Доступные форматы для чтения и записи
PySpark поддерживает такие основные форматы, как CSV, JSON, ORC, Parquet. Разберемся с синтаксисом чтения и записи, который практически одинаковый.
- CSV (Comma-separated values, значения, разделенные запятыми) — наиболее часто используемый формат для хранения датасетов:
data = spark.read.csv(“file.csv”) # прочитать data.write.csv(”file_dir”, sep=',') # записать
- JSON (JavaScript Object Notation) применяется для сериализации данных, используется также в MongoDB:
data = spark.read.json(“file.json”) data.write.json(”file_dir”)
- ORC (Optimized Row Columnar) — формат хранения данных экосистем Apache Hadoop:
data = spark.read.orc(“orc_file”) data.write.orc()
- Parquet- еще один формат экосистем Apache Hadoop, который может изменяться в соответствии с изменением данных, а также поддерживает слияние схем:
data = spark.read.parquet(“parquet_file”) data.write.parquet(”file_dir”)
Такие примеры записи написаны больше в Python-стиле. PySpark также поддерживает функциональный стиль программирования. Вот так, например, будет выглядеть чтение CSV-файла:
data = spark.read \ .format(“json”) \ .load(“file.json”) \ .option("header", True)
Доступные базы данных
Также PySpark может взаимодействовать с SQL и NoSQL базами данных. Рассмотрим также доступные базы данных и их синтаксис взаимодействия. Ниже будет уже подразумеваться, что все базы данных подключены, поэтому остается только установить взаимодействие с PySpark.
- Реляционные СУБД, например, MySQLили PostgreSQL. Для чтения необходимо указать соединение с базой данных, соответствующую таблицу и пароль:
dbURL = "jdbc:mysql://localhost/bigdataschool" data = spark.read. \ .format("jdbc") \ .options( url=dbURL, database="bigdataschool", dbtable='some_table', user="root", password="your_pass") \ .load()
- Cassandra- распределенная NoSQL СУБД с упором на надежность и работу с большими данными (Big Data). Здесь необходимо указать таблицу, а также пространство ключей:
data = spark.read \ .format("org.apache.spark.sql.cassandra") \ .option(table="t2", keyspace="test") \ .load()
- MongoDB- также является NoSQL-СУБД, которая использует формат JSON. Нужно указать соединение с коллекцией:
data = spark.read \ .format("com.mongodb.spark.sql.DefaultSource") .option("uri","mongodb://127.0.0.1/bigdataschool.courses") \ .load()
- ApacheHive — СУБД на основе платформы Hadoop. В методе table указывается названии таблицы в формате <база_данных>.<таблица>:
courses = spark.table("bigdataschool.courses")
Подобным образом, DataFrame можно записать в базу данных. Правда, для Apache Hive при инициализации SparkSession необходимо включить его поддержку, вызвав метод enableHiveSupport [1]:
spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .config("spark.sql.warehouse.dir", "spark-warehouse") \ .enableHiveSupport() \ .getOrCreate()
В следующий раз рассмотрим пример выполнения SQL-операций PySpark на конкретном датасете в Google Colab. А как на практике использовать PySpark в проектах аналитики больших данных, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в нашем лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.