Обработка данных является одной из самых первоочередных задач анализа Big Data. Сегодня мы расскажем о самых полезных преобразованиях PySpark, которые можно выполнить над столбцами. Читайте далее, как привести значения к 0 или 1, как преобразовать из строк в числа и обратно, а также как обработать недостающие значения(Nan) с примерами в PySpark.
Бинаризация: приводим к 0 и 1
Бинаризация используется тогда, когда нужно привести значения к 0 и 1. Бинаризация может использоваться как один из видов категоризации признаков. В PySpark для этого вида преобразования используется класс Binarizer.
Binarizer принимает на вход столбцец inputCol и выходной столбец outputCol, а также порог бинаризации. Значения, превышающие пороговое значение, преобразуются в 1.0; значения, которые равны или меньше порогового значения, преобразуются в 0.0. Типы входных столбцов может быть также Vector и Double. Вот так выглядит пример бинаризации в PySpark:
from pyspark.ml.feature import Binarizer continuousDataFrame = spark.createDataFrame([ (0, 0.1), (1, 0.8), (2, 0.2) ], ["id", "feature"]) binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized") binarizedDataFrame = binarizer.transform(continuousDataFrame) print("Binarizer output with Threshold = %f" % binarizer.getThreshold()) binarizedDataFrame.show() # Binarizer output with Threshold = 0.500000 +---+-------+---------+ | id|feature|binarized| +---+-------+---------+ | 0| 0.1| 0.0| | 1| 0.8| 1.0| | 2| 0.2| 0.0| +---+-------+---------+
StringIndexer: из строк в числа
StringIndexer кодирует строковые столбцы в столбцы индексов. Он очень удобен при обучении алгоритмов Machine Learning, поскольку они работают только с числами. Ниже пример кода для преобразования из строковых значений в числовые в PySpark. Значения индексов будут находятся в диапазоне [0, количество_меток). Поддерживаются четыре варианта упорядочения (по умолчанию стоит frequencyDesc).
- frequencyDesc: в порядке убывания по частоте метки (наиболее частой метке присвоено 0)
- frequencyAsc: в порядке возрастания по частоте метки (наименее частой метке присвоено 0)
- alphabetDesc: алфавитный порядок по убыванию
- alphabetAsc: возрастающий алфавитный порядок
from pyspark.ml.feature import StringIndexer data = spark.createDataFrame([ (0, 'a'), (1, 'b'), (2, 'c'), (3, 'a'), (4, 'a'), (5, 'c'), ], ["id", "feature"]) indexer = StringIndexer(inputCol="feature", outputCol="indexed", ) indexerModel = indexer.fit(data) indexedData = indexerModel.transform(data) indexedData.show() # +---+-------+-------+ | id|feature|indexed| +---+-------+-------+ | 0| a| 0.0| | 1| b| 2.0| | 2| c| 1.0| | 3| a| 0.0| | 4| a| 0.0| | 5| c| 1.0| +---+-------+-------+
IndexToString: из чисел в строки
IndexToString является противоположной PySpark-операцией StringIndexer, т.е. производит преобразование из чисел в строки. IndexToString принимает на вход результат StringIndexer, и еще нужно указать название преобразованного столбца. Возвращаясь к предыдущем примеру преобразуем назад столбец indexedData в строки:
from pyspark.ml.feature import IndexToString, StringIndexer converter = IndexToString(inputCol="indexed", outputCol="originalCategory") converted = converter.transform(indexedData) converted.select("id", "indexed", "originalCategory").show() # +---+-------+----------------+ | id|indexed|originalCategory| +---+-------+----------------+ | 0| 0.0| a| | 1| 2.0| b| | 2| 1.0| c| | 3| 0.0| a| | 4| 0.0| a| | 5| 1.0| c| +---+-------+----------------+
Imputer: заменяем недостающие значения на среднее или медианное
Imputer дополняет недостающие значения в наборе данных средним или медианным значением столбцов, в которых находятся пропущенные значения. Столбцы должны быть числового типа. У Imputer могут возникнуть проблемы с категориальными признаками и возможно выдаст в неправильном формате. Кроме того Imputer может заменять отличные от NaN значения, их тогда нужно передать в аргумент missingValue
. Например, missingValue
=
0
заменит все 0 на средние или медианные значения. Пример преобразования Imputer в PySpark для замены недостающих значений на медианное выглядит следующим образом:
from pyspark.ml.feature import Imputer df = spark.createDataFrame([ (1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0), (4.0, 4.0), (5.0, 5.0) ], ["a", "b"]) imputer = Imputer( strategy="median", missingValue=float("nan"), inputCols=["a", "b"], outputCols=["out_a", "out_b"],) model = imputer.fit(df) model.transform(df).show() # +---+---+-----+-----+ | a| b|out_a|out_b| +---+---+-----+-----+ |1.0|NaN| 1.0| 4.0| |2.0|NaN| 2.0| 4.0| |NaN|3.0| 2.0| 3.0| |4.0|4.0| 4.0| 4.0| |5.0|5.0| 5.0| 5.0| +---+---+-----+-----+
Чтобы заменить недостающие значения на средние, в аргумент strategy
следовало бы передать значение "mean"
.
О том, как применять различные преобразования в PySpark для решения реальных задач Big Data, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.
Источники