Что такое барьерный режим выполнения в Apache Spark, чем он отличается от вычислительной модели MapReduce, как связан с глубоким машинным обучением и где используется на практике.
Что такое барьерный режим выполнения в Apache Spark
Способ выполнения заданий Spark определяется режимом выполнения приложения, заданным на уровне фреймворка. На платформе. Именно от режима зависит, как задания будут разделены на несколько параллельных задач и каким образом они планируются. Исторически наиболее распространенным режимом выполнения в инфраструктуре больших данных, включая Spark, является MapReduce. Этот режим выполнения достаточно гибок для обработки самых разных рабочих нагрузок, от ETL-процессов до машинного обучения.
Суть вычислительной модели и режима выполнения MapReduce сводится к следующему:
- задание представляет собой совокупность этапов, каждый из которых является сопоставлением, т.е. локальной вычислительной операцией, или сверткой результатов. Между этими этапами обычно происходит перетасовка, т.е. передача данных по сети между вычислительными узлами (экземплярами распределенного приложения).
- каждый этап состоит из независимых друг от друга задач, что позволяет системе масштабироваться по мере доступности большего количества ресурсов. Поскольку задачи независимы друг от друга, при сбое одной из них повторяется только эта задача.
- Количество задач на этапе Map определяется объемом данных, а количество задач на этапе Reduce определяется разработчиком.
Однако, для глубокого обучения (Deep Learning) режим выполнения с вычислительной моделью MapReduce не подходит из-за слишком большой задержки. Фреймворки глубокого обучения хорошо работают с другой моделью выполнения, называемой MPI (Message Passing Interface). В MPI все рабочие процессы запускаются одновременно и передают сообщения. Этот программный интерфейс передачи сообщений используется во многих DL-фреймворках, например, open-source платформе Horovod, о которой мы писали здесь. Поэтому для поддержки глубокого обучения, в Spark 3.0 добавлен новый режим выполнения заданий, работающий по принципам MPI. Этот режим называется барьерным (Barrier Execution Mode) и работает следующим образом:
- Как и в MapReduce, задание – это совокупность этапов, между которыми происходит перетасовка данных;
- каждый этап представляет собой набор задач, которые запускаются одновременно и зависят друг от друга, в отличии от MapReduce. Если слотов задач недостаточно, будет создано исключение. Взаимодействие задач между собой обеспечивает BarrierTaskContext, объединяющий несколько контекстов задач TaskContext, в котором они выполняются. BarrierTaskContext создает конечную точку удаленного вызова процедур (RpcEndpointRef) для координатора барьеров, используемого для обработки операторов задач барьерного этапа и агрегации всех результатов.
- поскольку задачи зависят друг от друга, при сбое одной из них все задачи повторяются. Это тоже радикально отличается от MapReduce. При сбое задачи Spark прерывает все остальные задачи, а планировщик задач TaskScheduler уничтожает все остальные запущенные задачи этого барьерного этапа и перезапускает его из известного состояния (контрольной точки, checkpoint).
- количество задач на этапе всегда определяет разработчик, поскольку даже при небольшом объеме данных вычисления могут быть сложными и требовать больше ресурсов, чем обычная обработка. Такое планирование обеспечивает достаточно ресурсов для совместного выполнения всех задач.
Таким образом, барьерный режим выполнения предъявляет строгое требование к планировщику Spark запускать все задачи барьерного этапа одновременно или не запускать их вообще, ожидая доступности необходимых ресурсов. Барьерный режим выполнения позволяет одновременно выполнять столько задач, сколько позволяет ResourceProfile.
Поскольку в режиме выполнения MapReduce задачи этапа в Spark не зависят друг от друга, их можно планировать независимо. Это позволяет планировать задачи только с привязкой к ресурсам, например, 5 задач можно запланировать на 1 ядре ЦП в 5 последовательных пакетах. Но режим MPI обеспечивают большую гибкость и поддерживает зависимость между задачами.
Где используется барьерный режим: пара практических примеров
Машинное обучение основано на ряде вычислительных алгоритмов, например, градиентный бустинг, который представляет собой ансамбль деревьев решений. Он основан на итеративном обучении деревьев решений для сокращении функции потерь. Благодаря этому градиентный бустинг может работать с категориальными признаками и справляться с нелинейностями входных данных.
Одной из наиболее известных реализаций алгоритма градиентного бустинга, активно используемого в машинном обучении, является LightGBM. Модели LightGBM можно включать в существующие конвейеры Spark ML и использовать для пакетной и потоковой передачи, а также для обслуживания рабочих нагрузок. LightGBM в Spark на 10–30% быстрее Spark ML и точнее на 15%. Кроссплатформенный LightGBM на Spark поддерживается интерфейсами PySpark и SparklyR. Также LightGBM поддерживается в SynapseML, фреймворке, позволяющем создавать масштабируемые конвейеры машинного обучения.
SynapseML передает данные из разделов Spark в собственные наборы данных LightGBM перед тем, как передать управление фактическому исполняемому коду LightGBM для обучения ML-модели и вывода результатов.
По умолчанию LightGBM использует обычную парадигму Spark для запуска задач, связываясь с драйвером для координации выполнения задач. Поток драйвера объединяет всю информацию о сокете (хосте и порте) задачи, а затем передает полный список обратно рабочим процессам для вызова NetworkInit(). Эта процедура требует, чтобы драйвер знал, сколько задач имеется, а несоответствие между ожидаемым количеством задач и фактическим количеством приводит к тупику инициализации. При проблемах с сетью можно использовать барьерный режим выполнения Spark. SynapseML предоставляет флаг UseBarrierExecutionMode для барьерного режима Apache Spark, чтобы гарантировать одновременное выполнение всех задач. Этот режим выполнения меняет логику для синхронизированного агрегирования информации по всем задачам, запущенных на разных хостах и портах.
Еще одной популярной библиотекой градиентного бустинга, поддерживаемой Apache Spark, является XGBoost4J — JVM-пакет распределенного пакета xgboost с алгоритмами машинного обучения для регрессии и классификации. Ядром распределенного обучения в библиотеке xgboost4j-spark, который может запускать распределенный модуль xgboost в Apache Spark, является пакет XGBoost.trainDistributed.
XGBoost4J-Spark представляет собой проект интеграции XGBoost и Apache Spark, позволяя сочетать высокопроизводительную реализацию алгоритма XGBoost с мощным механизмом обработки данных Spark для извлечения фичей, преобразования данных, уменьшения размерности, построение, оценка и настройка ML-конвейеров, а также их сохранение и целостная загрузка.
Узнайте больше про возможности Apache Spark для машинного обучения и разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://issues.apache.org/jira/browse/SPARK-24374
- https://blog.madhukaraphatak.com/barrier-execution-mode-part-1
- https://books.japila.pl/apache-spark-internals/barrier-execution-mode/
- https://microsoft.github.io/SynapseML/docs/Explore%20Algorithms/LightGBM/Overview
- https://xgboost.readthedocs.io/en/stable/jvm/xgboost4j_spark_tutorial.html