В прошлый раз мы говорили о том, как установить PySpark в Google Colab, а также скачали датасет с помощью Kaggle API. Сегодня на примере этого датасета покажем, как применять операции SQL в PySpark в рамках анализа Big Data. Читайте далее про вывод статистической информации, фильтрацию, группировку и агрегирование больших данных в PySparkSQL.
Датасет с домами на продажу
Датасет Kaggle содержит данные о домах на продажу в Бруклине с 2003 по 2017 года и доступен для скачивания. Он содержит 111 атрибутов (столбцов) и 390883 записей (строк). В атрибуты включены: дата продажи, дата постройки, цена на дом, налоговый класс, соседние регионы, долгота, ширина и др.
Итак, если у вас установлен PySpark, вам нужно только скачать датасет и прочитать его. Ниже представлен код в Python для представления данных в DataFrame. В методе для чтения CSV-файла мы указали inferSchema=True, чтобы PySpark сам произвел приведение типов для каждого атрибута, т.е., например, цена должна иметь тип int, а дата продажи — timestamp (формат даты в Python).
# Если у вас Google Colab, то раскомментируйте # import findspark # findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[*]").getOrCreate() data = spark.read.csv( 'brooklyn_sales_map.csv', inferSchema=True, header=True)
Если же у вас не установлен PySpark, то прочитайте здесь.
Получаем основную информацию о датасете
Прежде всего следует узнать, что за данные будут анализироваться: какие именно столбцы, а также сколько строк. Для этого можно вызвать атрибут columns
, который вернет список столбцов, а метод count
вернет количество строк. Вот так это выглядит в Python:
data.count() # 390883 data.columns # ['_c0', 'borough1', 'neighborhood', 'tax_class' ... остальные 107 столбцов]
Также можно проверить, правильно ли PySpark определяет типы данных. В метод select
нужно передать название столбца или список столбцов, а затем вызвать dtypes
. Следующий код показывает типы данных для даты (sale date) и цены (sale price) продажи:
data.select(['sale_date', 'sale_price']).dtypes # [('sale_date', 'timestamp'), ('sale_price', 'decimal(9,0)')]
Как видим, PySpark правильно определил типы данных этих столбцов. А чтобы получить статистическую информацию, можно вызвать метод describe
, который выдаст:
- количество записей (count);
- среднее значение (mean);
- стандартное отклонение (stddev);
- максимальное значение (max);
- минимальное значение (min).
data.describe().show() +-------+-----------------+ |summary| sale_price| +-------+-----------------+ | count| 390883| | mean| 506754.4777| | stddev|2353964.664224616| | min| 0| | max| 499401179| +-------+-----------------+
Ленивые вычисления (lazy evaluation) PySpark SQL
В коде выше, помимо describe
, мы также вызвали show
, который показывает сам DataFrame. В Pandas достаточно было бы использовать только describe
. Но в PySpark вычисления осуществляются только в тот момент, когда нужны данные, а до этого момента они откладываются. Это называется отложенные или ленивые вычисления (lazy evaluation). В нашем случае вычисления происходят только при вызове show
. Этот метод принимает в качестве аргумента количество строк, которые нужно показать (по умолчанию стоит 10).
Сортировка данных
В SQL данные сортируются по ключевому слову ORDER BY. В PySparkSQL есть аналогичный метод orederBy
, который выполняет ту же операцию, что и SQL. Например, вот так в Python можно отсортировать данные по цене продажи в порядке возрастания:
data.orderBy('sale_price').show(3) +----------+----------+------------+ |sale_price|year_built|year_of_sale| +----------+----------+------------+ | 0| 1925| 2011| | 0| 1925| 2011| | 0| 1931| 2011| +----------+----------+------------+
Кроме того, чтобы сортировать сразу по нескольким атрибутам, нужно передать их в виде списка, а также дополнительно можно указать порядок сортировки для каждого из них. Ниже код сортирует данные по цене продажи в порядке убывания, а по дате постройки – в порядке возрастания.
ordered = data.orderBy(['sale_price', 'year_built'], ascending=[0, 1]) ordered.select(['sale_price', 'year_built']).show(3) +----------+----------+ |sale_price|year_built| +----------+----------+ | 499401179| 2002| | 345000000| 0| | 340000000| 1924| +----------+----------+
Группировка и агрегирование данных
Как и в классическом SQL, в PySpark можно сгруппировать данные методом groupBy
. После группировки можно выполнить агрегирование данных. Следующий код на Python группирует датасет по соседним регионам (neighborhood) и по классу постройки (building class category):
grouped = data.groupBy(['neighborhood', 'building_class_category'])
Теперь можем применять методы агрегирования данных.
- Вывод максимальной цены продажи для каждой группы:
grouped.max('sale_price').show(3) +-----------------+-----------------------+---------------+ | neighborhood|building_class_category|max(sale_price)| +-----------------+-----------------------+---------------+ | BROOKLYN HEIGHTS| 22 STORE BUILDINGS| 12000000| | MIDWOOD| 05 TAX CLASS 1 V...| 7881412| |WILLIAMSBURG-EAST| 08 RENTALS - ELEV...| 1669345| +-----------------+-----------------------+---------------+
- Вывод средней цены продажи для каждой группы:
grouped.mean('sale_price').show(3) +-----------------+-----------------------+---------------+ | neighborhood|building_class_category|avg(sale_price)| +-----------------+-----------------------+---------------+ | BROOKLYN HEIGHTS| 22 STORE BUILDINGS| 1748589.4138| | MIDWOOD| 05 TAX CLASS 1 V...| 455718.3492| |WILLIAMSBURG-EAST| 08 RENTALS - ELEV...| 834672.5000| +-----------------+-----------------------+---------------+
- Количество записей в каждой группе:
grouped.count().show(3) +-----------------+-----------------------+-----+ | neighborhood|building_class_category|count| +-----------------+-----------------------+-----+ | BROOKLYN HEIGHTS| 22 STORE BUILDINGS| 29| | MIDWOOD| 05 TAX CLASS 1 V...| 63| |WILLIAMSBURG-EAST| 08 RENTALS - ELEV...| 2| +-----------------+-----------------------+-----+
Фильтрация данных
Не менее важным этапом анализа больших данных является их фильтрация, т.е. получение записей, которые удовлетворяют условию. В SQL обычно применяется ключевое слово WHERE, в PySpark: для этого есть filter
и where
(where является синонимом filter, поэтому они работают одинаково). Например, вот так в Python можно вывести все цены, которые больше чем $200.000.000:
filtered = data.filter("sale_price > 200000000") filtered.select('sale_price').show() +----------+ |sale_price| +----------+ | 499401179| | 345000000| | 340000000| | 276947000| | 202500000| | 205020000| | 240000000| +----------+
Фильтрацию можно осуществлять и для строковых значений. Например, метод isin
выберет в столбце те записи, которые входят в один из элементов переданного списка. Полученные записи можно передать в filter
. Ниже код оставляет только те записи с домами, которые имеют соседние регионы Midwood или Williamsburg Eeast.
filtered = data['neighborhood'] \ .isin(['WILLIAMSBURG-EAST', 'MIDWOOD']) data.filter(filtered).show(3) +---+-----------------+----------+-------------------+ |_c0| neighborhood|sale_price| sale_date| +---+-----------------+----------+-------------------+ | 59| MIDWOOD| 67486441|2014-08-07 00:00:00| |128|WILLIAMSBURG-EAST| 43250000|2016-07-21 00:00:00| |143|WILLIAMSBURG-EAST| 39500000|2016-08-23 00:00:00| +---+-----------------+----------+-------------------+
О том, как на практике применять SQL-операции для анализа больших данных в PySpark вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в нашем лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.