Что такое SparkListener, какие встроенные слушатели бывают в Apache Spark, как написать собственный перехватчик событий и зачем это нужно разработчику распределенного приложения. Также рассмотрим, как реализовать свой слушатель для приложения на PySpark и зачем включать уровень логирования INFO для SparkContext.
Что такое слушатель Spark
Apache Spark позволяет быстро обрабатывать большие объемы данных. Но как проверить, соответствует ли ожиданиям реальная производительность рабочих нагрузок с большими данными? Это можно увидеть, спустившись на базовый уровень задачи. Программным образом это можно сделать с помощью специального перехватчика событий, т.е. слушателя — SparkListener. Примечательно, что в старых версиях Apache Spark потребность в разработке своих слушателей была очень высока из-за отсутствия отображения системных метрик в веб-GUI вычислительного движка. Слушатели позволяли следить за работоспособностью потокового приложения, например, сколько времени заняло выполнение какого-то триггера, т. е. получение смещений, обработка данных и фиксация WAL, сколько времени заняло получение смещений для новых данных для обработки каждого из определенных источников и фиксации новых доступных смещений. С версии 3.0 это было устранено. Подробно об этом и других тонкостях потоковой передачи в Apache Spark мы рассказываем здесь.
Напомним, в Spark задача — это единица работы, которая выполняется в разделе. Если какой-либо раздел намного меньше или больше других, которые находятся на том же этапе, это может отразиться негативно на всем этапе. Например, все задачи будут выполняться медленнее или случится ошибка нехватки памяти из-за одной или нескольких задач, которые работают со слишком большими данными в разделах.
В Spark есть много счетчиков (counter), которые используются во время выполнения задания. Каждый из этих счетчиков отслеживает определенный компонент задания, например время выполнения задачи, время сборки мусора JVM, количество записей, прочитанных каждой задачей, количество записанных сообщений, время работы исполнителя, время сериализации и пр. Все это можно использовать для отладки производительности и генерации предупреждений, например, о задачах, которые могут привести к искажению данных, что сильно снижает производительность распределенного приложения.
В наглядном виде получить всю эту информацию можно из веб-интерфейса Spark, о чем мы писали здесь. Этот способ работает, пока задание выполняется, но он не самый удобный с точки зрения разработчика. Собрать все эти и многие другие данные программным путем поможет специальный перехватчик событий – SparkListener. Он перехватывает события планировщика Spark, которые он генерирует в ходе выполнения распределенного приложения. Фреймворк использует слушатели для отображения системных метрик в веб-интерфейсе, сохранения событий в History Server, динамического распределения исполнителей и других сервисов.
Класс SparkListener является прямой реализацией SparkListenerInterface, где все методы обратного вызова являются неоперативными или недействующими. Он представляет собой низкоуровневый способ мониторинга, который также использует пользовательский интерфейс фреймворка. Наименьшая степень детализации мониторинга осуществляется на уровне задач в дополнение к уровням приложений, заданий и этапов.
Изначально фреймворк предоставляет целый ряд встроенных слушателей или обратных вызовов для сбора различных метрик. Например, EventLoggingListener сохраняет события в кодировке JSON в файл журнала в каталоге spark.eventLog.dir при включенном логировании событий. А SparkFirehoseListener позволяет пользователям получать все события SparkListenerEvent, переопределяя только один метод onEvent. Если встроенных слушателей и методов интерфейса SparkListenerInterface недостаточно, разработчик может самостоятельно написать собственный (пользовательский) слушатель и зарегистрировать его в контексте Spark-приложения. Как это сделать, рассмотрим далее.
Как написать и использовать свой SparkListener: универсальный подход
Преимущество разработки пользовательских слушателей в том, что они не усложняют программный код и не влияют на время выполнения. Все, что нужно сделать, чтобы получить эти созданные и записанные события, это написать слушатель, упаковать его в JAR-файл и добавить в конфигурацию Spark-приложения. Написать собственный слушатель можно с помощью API-интерфейса разработчика SparkListener, который позволяет работать со множеством событий и отслеживать системные метрики приложения. Чтобы работать с пользовательским слушателем Spark, его следует зарегистрировать с помощью метода SparkContext.addSparkListener или параметра spark.extraListeners. Также следует включить уровень логирования INFO для регистратора org.apache.spark.SparkContext, чтобы видеть, когда регистрируются пользовательские слушатели Spark:
INFO SparkContext: Registered listener org.apache.spark.scheduler.StatsReportListener
На программном уровне интерфейс SparkListenerInterface — это частный контракт для слушателей Spark, обеспечивающий перехват событий от планировщика Spark. Слушатели SparkListener и SparkFirehoseListener являются прямыми реализациями контракта SparkListenerInterface, помогающие разрабатывать более сложные пользовательские перехватчики событий.
Написать пользовательский слушатель можно путем расширения класса SparkListener на языках Scala или Java. Прямого способа или готовой PySpark-библиотеки для разработки своего слушателя на Python нет, поскольку фреймворк написан на Scala и Java. Но подход добавления пользовательского SparkListener в задание является универсальным и может быть применен к PySpark также как к заданию на Scala или Java, чтобы захватить нужную информацию, которая поступает в сам слушатель. Этот подход включает следующие шаги:
- Написать собственный слушатель на Scala или Java и упаковать его в JAR-архив;
- Добавить JAR-файл в driver.extraClassPath и Spark.executor.extraClassPath в PySpark, в SparkConf приложения на Scala/Java или в конфигурациях по умолчанию;
- Добавить полное имя класса слушателя в свойство конфигурации extraListeners.
Это удобно, когда есть длительные задания и задания, интенсивно использующие память. Можно получить все их метрики самостоятельно, запустив вызов API, что удобно для разработчика. Фреймворк предоставляет набор API-интерфейсов, доступных на конечной точке сервера истории Spark, для которых в качестве входных данных требуется идентификатор приложения (applicationId).
Также можно получить показатели уровня запроса, реализовав QueryExecutionListener из пакета org.apache.spark.sql.util. Это публичный интерфейс слушателя выполнения SQL-запросов, который можно использовать для анализа метрик их выполнения. Разработчику Spark-приложения стоит помнить, что реализации этого интерфейса должны гарантировать потокобезопасность, поскольку они могут вызываться несколькими разными потоками. QueryExecutionListener включает только два абстрактных метода, которые нуждаются в реализации. Написанный пользовательский класс можно добавить программно в SparkConf или с помощью конфигурации. spark.sql.queryExecutionListener.
При этом используется ExecutionListenerManager — интерфейс управления для QueryExecutionListeners, которые прослушивают метрики выполнения:
- Name of the action — название действия, которое инициировало выполнение запроса;
- QueryExecution — конвейер выполнения SQL-запроса, который состоит из этапов;
- Execution time — время выполнения SQL-запроса в наносекундах.
ExecutionListenerManager доступен как свойство listenerManager сеанса SparkSession и свойство listenerManager состояния сеанса SessionState. Этот интерфейс принимает один SparkConf при создании и создается исключительно при запросе BaseSessionStateBuilder для ExecutionListenerManager во время сборки SessionState. Он использует внутренний реестр слушателей для зарегистрированных реализаций интерфейса QueryExecutionListeners. При создании ExecutionListenerManager свойство конфигурации spark.sql.queryExecutionListeners в качестве списка QueryExecutionListeners, которые должны автоматически добавляться во вновь созданные сеансы и регистрирует их. Читайте в нашей новой статье, как расширить типовой слушатель StreamingQueryListener, который есть в Java и Scala API библиотеки Spark Structured Streaming, но недоступен в PySpark.
Узнайте больше практических деталей по применению Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark