Постоянно добавляя в наши курсы по Apache Spark и машинному обучению практические примеры для эффективного повышения квалификации Data Scientist’ов и инженеров данных, сегодня рассмотрим задачу пакетного прогнозирования и планирование ее запуска по расписанию без применения масштабных MLOps-решений.
Apache Spark для пакетного прогнозирования
Есть много готовых решений и инструментов для пакетного прогнозирования. Самым простым является самостоятельно написание кода на Python и планирование его запуска с помощью Cron. Однако, с этим подходом есть две проблемы:
- с каждым прогнозом возникают накладные расходы на сбор/распаковку модели;
- невозможность применять оптимизацию scikit-learn для прогнозирования результатов.
Вместо этого можно использовать специальные пакетные планировщики, например, Apache Airflow. Для пакетного прогнозирования также доступные облачные решения, например, MLFlow и Amazon Sagemaker. Однако, если нужно обойтись без внешних инструментов, но воспользоваться преимуществами оптимизации Scikit-Learn для обработки больших объемов данных, подойдет простой и надежный PySpark в Apache Spark. К примеру, можно создавать модели машинного обучения с использованием конвейеров scikit-learn и tensorflow, развертывать их через Spark-приложение, а затем использовать пакетную обработку данных для прогнозирования.
Spark не только предоставляет возможности планирования и мониторинга для пакетных прогнозов, но и масштабируется для более сложных алгоритмов и датасетов без необходимости подключать дополнительные сторонние решения для развертывания ML-моделей. Чтобы показать, как это работает, далее рассмотрим, как развернуть предиктивные модели машинного обучения в Spark и получать результаты прогнозирования с использованием возможностей параллельной обработки данных этого фреймворка.
Пакетное прогнозирование с использованием Spark состоит из следующих типовых шагов, которые одинаковы для задач классификации и регрессии:
- создать модель машинного обучения и упаковать ее в Pickle-файл – бинарный вариант Python-объекта для сериализации и десериализации его структуры. Преобразованную иерархию объектов Python в поток байтов в виде Pickle-файл далее следует сохранить в Hadoop
- написать Spark-задание и распаковать Python-объект из Pickle-файл;
- распространить этот Python-объект на все узлы кластера Spark;
- создать пользовательскую PySpark-функцию и вызвать метод прогнозирования для широковещательного объекта ML-модели;
- создать столбцы фичей, на которых была обучена ML-модель;
- создать Spark-датафрейм для прогнозирования с одним уникальным столбцом и фичами из предыдущего шага.
- создать столбец прогноза в Dataframe с предыдущего шага и вызвать UDF со столбцами фичей.
Таким образом, получим Spark-датафрейм с уникальным идентификатором, всеми столбцами фичей и столбцом прогноза. Этот датафрейм можно повторно использовать для многих предиктивных ML-моделей. В качестве примера рассмотрим реализацию Python-кода для всех вышеуказанных шагов.
Пример реализации ML-модели пакетного прогнозирования
Сперва создаем ML-модель и сохраняем ее в Pickle-файл:
import pickle pickle_out = open("model.pkl", "wb") pickle.dump(rf, pickle_out) pickle_out.close()
В вышеприведенном коде rf — это ML-модель случайного леса, обученная обнаружению мошенничества с кредитными картами. Далее создадим Spark-задание, распакуем Python-объект и транслируем его на узлы кластера. Широковещательный Python-объект сделает ML-модель доступной на нескольких узлах для параллельной пакетной обработки.
# Create spark session spark = SparkSession.builder.getOrCreate() sc = spark.sparkContext # Unpickle, pkl file model_rdd_pkl = sc.binaryFiles("model.pkl") model_rdd_data = model_rdd_pkl.collect() # Load and broadcast python object over spark nodes creditcardfrauddetection_model = pickle.loads(model_rdd_data[0][1]) broadcast_creditcardfrauddetection_model = sc.broadcast(creditcardfrauddetection_model) print(broadcast_creditcardfrauddetection_model.value)
Далее создадим UDF-функцию PySpark для прогнозирования обнаружения мошенничества.
# Create udf and call predict method on broadcasted model def predict(*cols): prediction = broadcast_creditcardfrauddetection_model.value.predict_proba((cols,)) return float(prediction[0,1]) predict_udf = udf(predict, DoubleType())
Создадим список фичей и сгенерируем датафрейм с уникальными идентификаторами.
# Load dataset for prediction dataset = pd.read_csv('creditcard.csv') dataset.head() X = dataset.drop(["Class"], axis = 1) y = dataset["Class"] X1 = X.drop(["Time"], axis = 1) # Create feature column list on which model was trained feature_columns = X1.columns.to_list() print(feature_columns) # Create spark dataframe for prediction df = spark.read.csv('creditcard.csv', header=True) df.show()
Наконец, вызовем PySpark UDF-функцию для датафрейма с предыдущего шага и создадим прогнозы со столбцами фичей:
# Create predictions using spark udf df = df.withColumn("score", predict_udf(*feature_columns)) df.show() # Calculate accuracy count = df.count() correct_predictions = df.filter(col("Class")==col("score")).count() incorrect_predictions = df.filter(col("Class")!=col("score")).count() accuracy = (correct_predictions/count)*100 print(accuracy)
Разработка и внедрение ML-решений
Код курса
MLOPS
Ближайшая дата курса
10 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
54 000 руб.
Используя эти шаги, любую модель пакетного прогнозирования, созданную с помощью Python-библиотеки Scikit-Learn, можно развернуть в кластере Spark. О тонкостях переноса ML-моделей мы с помощью Pickle и других форматах мы рассказывали вчера. Хотя изложенный пример нельзя назвать полноценным MLOps-решением, но такой подход разработки и запуска в production моделей машинного обучения с помощью пакетной обработки Apache Spark оказывается достаточно простым и быстрым. В зависимости от частоты изменения входных данных, пакетные прогнозы могут планироваться один или несколько раз в день. Пакетные прогнозы с использованием Spark не имеют состояния и могут быть объединены с прогнозами в реальном времени для параллельного выполнения. Apache Spark позволяет планировать задания в YARN с функциями логирования и мониторинга. Наконец, поскольку Spark отлично масштабируется и можно самостоятельно управлять кластером, весь процесс пакетного прогнозирования намного ускоряется.
Как внедрить лучшие практики MLOps с Apache Spark и других инструментов аналитики больших данных, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники