PySpark позволяет работать не только с большими данными (Big data), но и создавать модели машинного обучения (Machine Learning). Сегодня мы расскажем вам о модуле ML и покажем, как обучить модель Machine Learning для решения задачи классификации. Читайте у нас: подготовка данных, применение логистической регрессии, а также использование метрик качеств в PySpark.
Датасет с домами на продажу
В качестве примера мы будем использовать датасет Kaggle, который содержит данные о домах на продажу в Бруклине с 2003 по 2017 года и доступен для скачивания. Он содержит 111 атрибутов (столбцов) и 390883 записей (строк). В атрибуты включены: дата продажи, дата постройки, цена на дом, налоговый класс, соседние регионы, долгота, ширина и др.
# Если у вас Google Colab, то раскомментируйте # import findspark # findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[*]").getOrCreate() data = spark.read.csv( 'brooklyn_sales_map.csv', inferSchema=True, header=True)
О Kaggle API и о том, как установить PySpark в Google Colab, читайте здесь.
Готовим атрибут для последующей бинарной классификации
Допустим, требуется классифицировать налоговый класс на дом (tax_class). Всего имеется 10 таких классов. Поскольку данные распределены неравномерно (например, в классе 1 имеется 198969 записей, а в 3-м — только 18), мы разделим их на 2 категории: те, которые принадлежат классу 1, и остальные. В Python это делается очень просто, нужно просто вызвать метод replace
:
by_1 = ['1', '1A', '1B', '1C'] by_others = ['2', '2A', '2B', '2C', '3', '4'] data = data.replace(by_others, '0', ['tax_class']) data = data.replace(by_1, '1', ['tax_class'])
Кроме того, алгоритмы Machine Learning в PySpark работают с числовым значениями, а не со строками. Поэтому преобразуем значения столбца tax_class
в тип int:
data = data.withColumn('tax_class', data.tax_class.cast('int'))
Подбор признаков и преобразование категорий
Выберем следующие признаки для обучения модели Machine Learning: год постройки (year_of_sale
), цена на дом (sale_price)
и соседние регионы (neighborhood
). Последние атрибут является категориальным признаком — в данных имеется 20 соседних регионов. Но опять же все значения этих категорий являются строковыми, поэтому нужно преобразовать их в числовые.
Можно воспользоваться методом replace
, как это сделано выше, но придётся сначала извлечь названия всех 20 регионов. А можно использовать специальный класс StringIndexer из PySpark-модуля ML, который выполнит за нас всю работу. Объект этого класса принимает в качестве аргументов: название атрибута, который нужно преобразовать (inputCol
), и название, которое будет иметь преобразованный атрибут (outputCol
). Вот так это выглядит в Python:
from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol="neighborhood", outputCol="neighborhood_id") data = indexer.fit(data).transform(data)
Преобразованные категории имеют вид:
data.groupBy('neighborhood_id').count().show() # +---------------+-----+ |neighborhood_id|count| +---------------+-----+ | 8.0|13215| | 0.0|27279| | 7.0|13387| | 49.0| 2271| | 29.0| 5074| | 47.0| 2422| | 42.0| 3086| | 44.0| 2802| | 35.0| 4000| | 62.0| 2| | 18.0| 7342| | 1.0|21206| | 39.0| 3396| | 37.0| 3894| | 34.0| 4037| | 25.0| 5809| | 36.0| 3984| | 41.0| 3138| | 4.0|14608| | 23.0| 6374| +---------------+-----+
Теперь выберем необходимые признаки, а также отбросим строки с пустыми значениями с помощью метода dropna
в PySpark:
features = ['year_of_sale', 'sale_price', 'neighborhood_id'] target = 'tax_class' attributes = features + [target] sample = data.select(attributes).dropna()
Векторизация признаков
Поскольку алгоритмы машинного обучения в PySpark принимают на вход только вектора, то нужно провести векторизацию. Для преобразования признаков в вектора используется класс VectorAssembler. Объект этого класса принимает в качестве аргументов список с названиями признаков, которые нужно векторизовать (inputCols), и название преобразованного признака (outputCol). После создания объекта VectorAssembler вызывается метод transform
.
Для начала выберем в качестве признака для преобразования — цену на дом. Код на Python:
from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=['sale_price'], outputCol='features') output = assembler.transform(sample)
Полученный после векторизации DataFrame выглядит следующим образом:
output.show(5) +------------+----------+---------------+---------+--------------+ |year_of_sale|sale_price|neighborhood_id|tax_class| features| +------------+----------+---------------+---------+--------------+ | 2008| 499401179| 48.0| 0|[4.99401179E8]| | 2016| 345000000| 41.0| 0| [3.45E8]| | 2016| 340000000| 27.0| 0| [3.4E8]| +------------+----------+---------------+---------+--------------+
Разделение датасета и обучение модели
Для решения задач Machine Learning всегда нужно иметь, как минимум, две выборки — обучающую и тестовую. На обучающей мы будем обучать модель, а на тестовой проверять эффективность обученной модели. В PySpark сделать это очень просто, нужно просто вызвать метод randomSplit
, который разделит исходный датасет в заданной пропорции. Мы разделим в пропорции 80:20, в Python это выглядит так:
train, test = output.randomSplit([0.8, 0.2])
Теперь воспользуемся логистической регрессией (Logistic Regression) [1], которая есть в PySpark, в качестве алгоритма Machine learning. Для этого нужно указать признаки, на которых модель обучается, и признак, который нужно классифицировать. Мы преобразовали цену на дом (sale price) в вектор под названием features
, поэтому именно его и указываем в аргументе:
from pyspark.ml.classification import LogisticRegression lr = LogisticRegression(featuresCol='features', labelCol='tax_class') model = lr.fit(train)
Осталось только получить предсказания. Для этого вызывается метод transform
, который принимает тестовую выборку:
predictions = model.transform(test)
Проверим эффективность модели, используя метрику качества. И в этом случае PySpark нас выручает, поскольку у него есть класс BinaryClassificationEvaluator. Нужно лишь указать целевой признак (tax class), а затем вызвать метод evaluate
и передать в него наши предсказания. В Python это выглядит так:
from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator(labelCol='tax_class') print('Evaluation:', evaluator.evaluate(predictions)) # Evaluation: 0.5242388483600111
Как видим, мы получили точность только 52%, что очень мало. Попробуем добавить ещё несколько признаков для обучения.
Добавление признаков
Векторизуем также год постройки (year_of_sale) и соседние регионы (neighborhood_id). Для этого нужно только в VectorAssembler указать выбранные признаки:
features = ['year_of_sale', 'sale_price', 'neighborhood_id'] assembler = VectorAssembler(inputCols=features, outputCol='features') output = assembler.transform(sample)
Python-код для остальных шагов — разделение на тестовую и обучающую выборки, обучение и оценивание модели — остаётся все тем же. В итоге, мы смогли повысить точность до 60%:
print('Evaluation:', evaluator.evaluate(predictions)) # Evaluation: 0.6019972898385996
Отметим также, что при большем количестве классов в качестве метрики следует использовать MultilabelClassificationEvaluator, вместо BinaryClassificationEvaluator.
Ещё больше подробностей о том, как готовить данные и применять модели Machine Learning для решения задачи классификации в PySpark на реальных примерах Data Science и Big Data, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.
Источники