Как Apache Spark планирует и запускает задания в кластере

Планирование запуск заданий Spark в кластере, Apache Spark Для разработчика, разработка Spark-приложений, Apache Spark для дата-инженера, Школа Больших Данных Учебный Центр Коммерсант

Какие механизмы и компоненты позволяют Apache Spark планировать задания и эффективно утилизировать ресурсы кластера. Чем статическое разделение ресурсов отличается от динамического, и как настроить планировщик для ускорения вычислений.

Планирование заданий в Apache Spark

Распределенный характер Apache Spark предполагает наличие инструментов для разделения ресурсов между вычислениями. В режиме кластера каждое приложение Spark, т.е. каждый экземпляр SparkContext запускает независимый набор процессов-исполнителей. Диспетчеры кластеров, на которых работает Spark, позволяют планировать запуск приложений, в каждом из которых есть несколько заданий. Задания – это действия, которые могут выполняться одновременно, будто отправленные разными потоками. Для планирования ресурсов в каждом SparkContext есть планировщик.

Как мы уже писали здесь, задание состоит из задач, а сама задача — это наименьшая исполнительная единица в Spark, которая выполняет серию инструкций, например, чтение данных, фильтрацию и сопоставление. Задачи выполняются внутри исполнителя – виртуальной машины Java (JVM) на одном физическом узле кластера. Несколько задач объединяются в этап, в котором каждая задача выполняет один и тот же набор инструкций. Это обеспечивает распараллеливание обработки данных. Таким образом, задача – это модуль параллелизма в Spark. Каждая задача сопоставляется с одним ядром и разделом в наборе данных.

При работе в кластере каждое приложение Spark получает независимый набор JVM-исполнителей, которые только запускают задачи и хранят данные для этого приложения. Если нескольким пользователям необходимо совместно использовать кластер, существуют разные варианты управления распределением в зависимости от диспетчера кластера.

В любом диспетчере кластера возможно статическое разделение ресурсов, когда каждому приложению предоставляется максимальное количество ресурсов, которые оно может использовать, с удержанием их на протяжении всего своего существования. Этот подход используется в автономном режиме Spark и YARN , а также Mesos. Распределение ресурсов можно настроить следующим образом в зависимости от типа кластера. 

По умолчанию приложения, отправленные в кластер автономного режима, будут работать в порядке FIFO (First In, First Out), и каждое приложение попытается использовать все доступные узлы. Можно ограничить количество узлов, используемых приложением, задав свойство конфигурации spark.cores.max, или изменить значение по умолчанию через параметр spark.deploy.defaultCores. Помимо управления ядрами ЦП, можно контролировать использование памяти каждым приложением через конфигурацию spark.executor.memory.

Чтобы использовать статическое разделение в Mesos, надо установить для  свойства конфигурации spark.mesos.coarse значение true и ограничение общего ресурса spark.cores.max для каждого приложения, как в автономном режиме. Также следует настроить потребление памяти исполнителя, используя spark.executor.memory. Также на Mesos доступно динамическое совместное использование ядер ЦП. В этом режиме каждое приложение Spark по-прежнему имеет фиксированное и независимое распределение памяти, что задается параметром spark.executor.memory. Но когда приложение не выполняет задачи на компьютере, другие приложения могут выполнять задачи на этих ядрах. Этот режим полезен, если работает много не слишком активных приложений, например, сеансы оболочки от отдельных пользователей. Однако, это сопряжено с риском менее предсказуемой задержки, поскольку приложению может потребоваться некоторое время, чтобы вернуть ядра ЦП на одном узле для выполнения задания. Использовать этот режим можно через mesos://URL и параметр spark.mesos.coarse, установленный в значение false.

При использовании YARN параметр клиента Spark —num-executors определяет, сколько исполнителей будет выделено в кластере. Для настройки потребления ресурсов каждого исполнителя используются свойства конфигурации spark.executor.instances, spark.executor.memory и spark.executor.cores

Ни один из этих режимов пока не обеспечивает совместное использование памяти между приложениями. Если это требуется, можно запустить одно серверное приложение, которое будет обслуживать несколько запросов, запрашивая одни и те же RDD.

Планирование заданий Spark в кластере
Планирование заданий Spark в кластере

Динамическое распределение ресурсов

Также Spark предоставляет механизм динамической настройки ресурсов, занимаемых приложением, в зависимости от рабочей нагрузки. Это означает, что приложение может возвращать ресурсы кластеру, если они больше не используются, и запрашивать их снова позже, когда возникнет потребность. Эта функция особенно полезна, если несколько приложений совместно используют ресурсы в кластере. Функция Dynamic Resource Allocation отключена по умолчанию и доступна во всех крупномасштабных менеджерах кластеров, т. е. в автономном режиме, режиме YARN, Mesos и K8s.

В автономном режиме без явной настройки spark.executor.cores, каждый исполнитель получит все доступные ядра ЦП. Если динамическое выделение ресурсов включено, исполнителей будет выделено больше, чем ожидалось. Чтобы использовать динамическое распределение ресурсов в автономном режиме, рекомендуется явно указать ядра для каждого исполнителя. Подробнее об этом мы писали здесь и здесь.

Приложение Spark с включенным динамическим распределением ресурсов запрашивает дополнительных исполнителей, когда у него есть невыполненные задачи, ожидающие планирования. Это означает, что существующий набор исполнителей недостаточен для одновременного насыщения всех поставленных, но еще не завершенных задач.

Исполнители запрашиваются по раундам. Фактический запрос запускается, когда в течение нескольких секунд существуют ожидающие задачи период, равный значению свойства spark.dynamicAllocation.schedulerBacklogTimeout. Запрос повторяется снова каждые spark.dynamicAllocation.sustainedSchedulerBacklogTimeout секунды, если очередь ожидающих задач сохраняется. Кроме того, количество исполнителей, запрошенных в каждом раунде, увеличивается в геометрической прогрессии по сравнению с предыдущим раундом. Например, приложение добавит 1 исполнителя в первом раунде, а затем 2, 4, 8 и так далее в последующих. Такой экспоненциальный рост позволяет плавно наращивать количество исполнителей.

Политика удаления исполнителей намного проще. Приложение Spark удаляет исполнителя, если он бездействует более нескольких секунд, количество которых задано в spark.dynamicAllocation.executorIdleTimeout. Это логично, т.к. исполнитель не должен бездействовать, если еще есть незавершенные задачи, которые необходимо запланировать.

Если перед динамическим распределением исполнитель завершает работу одновременно с завершением работы связанного приложения, то все состояние, связанное с исполнителем, больше не требуется и удаляется. Однако при динамическом размещении приложение продолжает работать даже после явного удаления исполнителя. Если приложение попытается получить доступ к состоянию, хранящемуся или записанному исполнителем, ему придется выполнить перерасчет состояния. Поэтому Spark использует механизм для корректного вывода из эксплуатации исполнителя, сохраняя его состояние перед его удалением. Это особенно важно для shauffle-операций, когда исполнитель сначала записывает свои собственные выходные данные этапа Map локально на диск, а затем действует как сервер для этих файлов, когда другие исполнители пытаются их получить. В случае отстающих задач, то есть задач, которые выполняются намного дольше других, динамическое распределение может удалить исполнителя до завершения перемешивания. Тогда shuffle-файлы, записанные этим исполнителем, придется вычислять заново. Избежать этого помогает использование внешней службы для сохранения shuffle-файлов, которая появилась в версии 1.2. Эта служба ExternalShuffleService относится к длительному процессу, который выполняется на каждом узле кластера независимо от приложений Spark и их исполнителей. Если служба включена, исполнители будут получать файлы в случайном порядке из этой службы, а не друг от друга. Поэтому любое shuffle-состояние, записанное исполнителем, может продолжать обслуживаться и после жизни исполнителя.

Помимо записи shuffle-файлов, исполнители также кэшируют данные либо на диске, либо в памяти. Однако при удалении исполнителя все кэшированные данные больше не будут доступны. Чтобы избежать этого, по умолчанию исполнители, содержащие кэшированные данные, никогда не удаляются. Настроить это поведение можно с помощью spark.dynamicAllocation.cachedExecutorIdleTimeout. Если для параметра spark.shuffle.service.fetch.rdd.enabled установлено значение true, Spark может использовать ExternalShuffleService для извлечения сохраненных на диске блоков RDD. В случае динамического выделения, если эта функция включена, исполнители, имеющие только сохраненные на диске блоки, считаются простаивающими в течении spark.dynamicAllocation.executorIdleTimeout секунд и освобождаются. В будущем кэшированные данные могут сохраняться в хранилище вне кучи, аналогично тому, как shuffle-файлы сохраняются с помощью внешней службы ExternalShuffleService.

Внутри приложения Spark, представляющего собой экземпляр SparkContext, несколько параллельных заданий могут выполняться одновременно, если они были отправлены из разных потоков. Под заданием понимается действие, например save(), collect() и пр., что явно выполняется сразу, а не в отложенном режиме, в отличие от преобразований. Планировщик полностью потокобезопасен и позволяет приложениям обслуживать несколько запросов, например, для разных пользователей.

По умолчанию планировщик выполняет задания в порядке FIFO. Каждое задание разделено на этапы, например, этапы сопоставления Map и свертки Reduce. Первое задание получает приоритет для всех доступных ресурсов. Когда на этапах первого задания есть задачи для запуска, второе задание получает приоритет и т.д. Если заданию во главе очереди не обязательно использовать весь кластер, более поздние задания могут начать выполняться сразу. Но если задания в начале очереди большие, то последующие задания могут быть значительно задержаны. Начиная с версии 0.8, также можно настроить справедливое распределение ресурсов между заданиями, когда Spark распределяет задачи между заданиями по круговому принципу, так что все задания получают примерно равную долю ресурсов кластера. Это означает, что короткие задания, отправленные во время выполнения длинного задания, могут сразу же начать получать ресурсы и при этом обеспечить хорошее время отклика, не дожидаясь завершения длинного задания. Такой режим лучше всего подходит для многопользовательских настроек. Эта функция отключена по умолчанию и доступна во всех крупномасштабных диспетчерах кластеров, т.е. в автономном режиме, YARN, K8s и Mesos .

В заключение отметим, что PySpark по умолчанию не поддерживает синхронизацию потоков виртуальной машины Python (PVM) с потоками JVM. А запуск нескольких заданий в нескольких потоках PVM не гарантирует запуск каждого задания в каждом соответствующем потоке JVM. Из-за этого ограничения невозможно установить другую группу заданий через sc.setJobGroup отдельный поток PVM. Также невозможно отменить задание через sc.cancelJobGroup позднее. Поэтому параметр pyspark.InheritableThread рекомендуется использовать вместе для потока PVM, чтобы наследовать наследуемые атрибуты (локальные свойства) в потоке JVM.

Освойте все возможности Apache Spark для аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://spark.apache.org/docs/latest/job-scheduling.html
  2. https://www.researchgate.net/figure/Scheduling-process-of-the-Spark-application_fig1_352723608
  3. https://medium.com/@badwaik.ojas/job-scheduling-in-apache-spark-e7b2ce2fbd
Поиск по сайту