Распараллеливание заданий в PySpark

Spark Python для дата-инженеров и разработчиков, Spark PySpark, параллельная обработка Spark, Spark примеры курсы обучение, Школа Больших Данных Учебный центр Коммерсант

Почему параллельное выполнение заданий в Apache Spark зависит от языка программирования и как можно обойти однопоточную природу Python в PySpark.

Что не так с параллельным выполнением заданий PySpark и как это исправить?

Apache Spark позволяет писать распределенные приложения благодаря инструментам для распределения ресурсов между вычислительными процессами. В режиме кластера каждое приложение Spark, представляющее собой экземпляр контекста SparkContext, запускает независимый набор процессов-исполнителей. Диспетчеры кластеров, на которых работает Spark, позволяют планировать запуск приложений, включающих несколько заданий. Задания представляют собой действия, которые могут выполняться параллельно, будто отправленные разными потоками. Для планирования ресурсов в каждом SparkContext предусмотрен планировщик.

Задание состоит из задач, выполняющих последовательность инструкций, таких как чтение, фильтрация и сопоставление данных. Задачи выполняются внутри исполнителя – виртуальной машины Java (JVM) на одном физическом узле кластера. Несколько задач объединяются в этап, в котором каждая задача выполняет одинаковый набор инструкций, что обеспечивает параллельную обработку данных. Поэтому задача является единицей параллелизма в Spark и ассоциируется с одним ядром и разделом в наборе данных.

При работе в кластере каждое приложение Spark получает независимый набор JVM-исполнителей, которые запускают задачи и хранят данные. Приложения, отправленные в автономный кластер выполняются в порядке очереди FIFO (First In, First Out), используя все доступные узлы, если это намеренно не ограничить в конфигурации. Однако, все это свойственно в первую очередь Spark-приложениям, написанным на JVM-языках, т.е. Scala и Java, о чем мы писали здесь. Но Apache Spark также поддерживает R и Python, который требует трансляции Python-кода в Java-объекты с помощью библиотеки Py4J. Поэтому PySpark изначально является однопоточным из-за однопоточного характера Python.

Python-интерфейс Spark не поддерживает синхронизацию потоков с потоками JVM, а запуск PySpark-заданий в нескольких потоках не гарантирует запуск каждого задания в отдельном потоке виртуальной машины Java. Из-за этого ограничения невозможно задать другую группу заданий в отдельном потоке виртуальной машины Python (PVM, Python Virual Machine) через sc.setJobGroup. Это метод назначает идентификатор группы всем заданиям, запущенным этим потоком, пока ему не будет присвоено другое значение.

В приложении Apache Spark можно сгруппировать задания с помощью контекстного метода sc.setJobGroup. После установки веб-интерфейс Spark будет связывать задания с группой. Чтобы отменить все запущенные в этой группе задания, используется другой метод контекста — sc.cancelJobGroup. Однако, из-за невозможности задать другую группу заданий PySpark в отдельном потоке виртуальной машины PVM с помощью метода sc.setJobGroup, отменить эти задания через sc.cancelJobGroup тоже нельзя. Поэтому рекомендуется использовать метод pyspark.InheritableThread вместе с потоком PVM для наследования наследуемых атрибутов, таких как локальные свойства в потоке JVM.

Если для группы заданий interruptOnCancel установлено значение true, то отмена задания приведет к прерыванию потока, т.е. вызову Thread.interrupt() в потоках-исполнителях задания. Это полезно для обеспечения своевременной остановки задач, но по умолчанию отключено из-за HDFS-1208, поскольку HDFS может реагировать на Thread.interrupt(), помечая узлы как неактивные. Поэтому при параллельном запуске заданий следует использовать локальное наследование потоков pyspark.InheritableThread. При этом режим закрепленного потока не закрывает соединение от Python к JVM, когда поток завершается на стороне Python. С этим классом Python собирает мусор экземпляра потока Python, а также закрывает соединение, что корректно завершает поток JVM.

Таким образом, однопоточная природа Python означает, что PySpark может плохо масштабироваться при работе с большими наборами данных или когда нужно выполнить один и тот же анализ для нескольких подмножеств данных. Изменить однопоточный характер Python, который не использует все доступные вычислительные ресурсы, можно следующим образом:

  • реализовать цикл for для последовательного вычисления, используя однопоточный подход. Такой подход является самым медленным, поскольку не позволяет эффективно использовать базовые ресурсы.
  • использовать модуль futures для одновременного запуска нескольких процессов. Модуль futures предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых объектов. Это асинхронное выполнение может осуществляться потоками, используя ThreadPoolExecutor, или отдельными процессами, используя ProcessPoolExecutor. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом Executor. ThreadPoolExecutor реализует распараллеливание операций с интенсивным использованием памяти, а ProcessPoolExecutor интенсивно использует ЦП.
  • разработать пользовательские функции для использования распределенных вычислений в PySpark. Однако, такие UDF довольно неэффективны на небольших и простых задачах из-за высоких накладных расходах преобразования.

Освойте Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://spark.apache.org/docs/latest/job-scheduling.html#concurrent-jobs-in-pyspark
  2. https://blog.coeo.com/parallelising-python-on-spark-different-approaches
  3. https://docs.python.org/3/library/concurrent.futures.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту