Сегодня рассмотрим Apache Spark с важной для разработчиков распределенных приложений точки зрения, разобрав как в рамках этого Big Data фреймворка справиться с утечками данных при их потоковой передаче. Читайте далее, почему возникает OutOfMemory Exception в Spark-приложениях и как дата-инженеры компании Disney решили эту проблему с нехваткой памяти для JVM.
Зачем Disney нужен Apache Spark Streaming и что с ним не так
Прежде всего поясним бизнес-контекст рассматриваемого примера: Disney Streaming Services — это подразделение одной из крупнейших мультимедиа-корпораций Disney, которое контролирует все услуги потребительской подписки на цифровое видео. Главная миссия Disney Streaming Service – предоставить пользователю свободу доступа к контенту с любого подключенного устройства, в любое время и в любом месте [1].
Disney Streaming Services использует Apache Spark и Spark Structured Streaming для конвейеров обработки больших данных. Spark-приложения работают в среде выполнения Databricks (DBR), которая довольно удобна для пользователя [2]. Databricks Runtime – это набор программных артефактов, которые запускаются на кластерах машин под управлением коммерческого решения Databricks на базе Apache Spark и компонентов для улучшения удобства эксплуатации, производительности и безопасности аналитики больших данных [3]. В частности, в Disney Streaming Services одно из заданий структурированной потоковой передачи использует API-интерфейс flatMapGroupsWithState для накопления состояний и группировки событий в соответствии с бизнес-логикой [2]. Это выполняется в рамках произвольной или настраиваемой обработки событий с отслеживанием состояния, например, чтобы выдавать предупреждение о превышении порогового значения с течением времени для группы или типа событий. Другой сценарии требуют подобной индивидуальной обработки событий во времени – это поддержка пользовательских сеансов в течение определенного или неопределенного времени и их сохранение для последующего анализа. API-интерфейс структурированной потоковой передачи Spark Structured Streaming flatMapGroupsWithState отлично подходит для этих кейсов и может выдавать одну или несколько строк результатов для каждой группы событий [4].
Что такое OutOfMemoryError в Spark-приложениях
В Disney Streaming Services часто возникали сбои Spark-приложения с flatMapGroupsWithState по причине необработанного исключения OutOfMemoryError (OOM), связанного с нехваткой памяти кучи (heap) для виртуальных машин Java, которые запускаются как исполнители или драйверы в составе приложения Apache Spark [2]. Напомним, размер памяти для исполнителя (executor) Spark по умолчанию равен 1 ГБ. Если этого недостаточно, происходит полная сборка, которая приводит к высвобождению избыточной памяти. Но если объем памяти, высвобождаемой после каждого цикла полной сборки мусора, составляет менее 2% в последних 5 последовательных циклах работы сборщика мусора (Garbage Collector), JVM выдаcт исключение нехватки памяти – Out Of Memory Error. Обычно при этом рекомендуется увеличить память исполнителя Spark, а при работе в YARN – также увеличить накладную память для потоков JVM, внутренних метаданных и т. Д. Это настройка фиксируется через spark-submit или в файле spark-defaults.conf [5]:
—conf “spark.executor.memory=12g”
—conf “spark.yarn.executor.memoryOverhead=2048”
or
— «executor-memory=12g»
Подробнее о том, как работает сборка мусора в Apache Spark, читайте в нашей новой статье.
Core Spark - основы для разработчиков
Код курса
CORS
Ближайшая дата курса
7 октября, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
7 шагов по поиску и устранению причины OOM-ошибки
В случае Disney Streaming Services исключение OutOfMemoryError вызывало перезапуск приложения в среде DBR, обрывая непрерывность потоковой передачи данных. Такая ситуация неприемлема для режима near real-time и SLA с малой временной задержкой. Для решения этой задачи дата-инженеры компании предложили 7-шаговый подход, который позволит четко идентифицировать причину проблемы и устранить ее. Для этого необходимо проверить следующие факторы [2].
- логи драйвера, где остаются записи о каждой проблеме со сбоем Spark-задания. Например, если задача отказывает больше, чем задано настройкой task.maxFailures, причина последней неудачи будет описана в журнале драйвера с подробным описанием причины отказа всего задания.
- логи исполнителя, обычно доступных через ssh. Так, к примеру, можно увидеть, что OOM-ошибка (java.lang.OutOfMemoryError) возникла по одной из двух основных причин: превышение предел накладных расходов Garbage Collector или пространство кучи Java (JavaHeapSpace). Также JavaHeapSpace OOM может возникнуть, когда системе недостаточно памяти для данных, которые необходимо обработать. В некоторых случаях эту проблему решает увеличение ресурсов инстанса (CPU, RAM). Также можно настроить параметры для обеспечения потребления количества памяти для объема данных, которые необходимо обработать за один пакет.
- активность Garbage Collector, который может занимать слишком много времени и приводить к сбою из-за ошибки превышения предела накладных расходов при полной сборки мусора. Решить эту проблему может G1GC – сборщик мусора серверного типа для многопроцессорных машин с большой памятью. Он соответствует целевым показателям времени паузы при сборке мусора, обеспечивая высокую пропускную способность. Операции с целой кучей, такие как глобальная маркировка, выполняются одновременно с потоками приложения. Это предотвращает прерывания, пропорциональные размеру кучи или оперативных данных. Включить G1GC в Apache Spark можно, задав конфигурацию executor.extraJavaOptions: -XX: + UseG1GC. Подробнее о видах памяти фреймворка и конфигурации JVM-настроек читайте в нашей здесь, а про определение оптимальной конфигурации исполнителя — здесь.
- состояние кластера, которое можно проверить с помощью инструмента мониторинга, например, Ganglia от Databricks, которая на наглядных дэшбордах покажет потребление памяти и другие важные параметры производительности распределенной системы. В частности, если память кластера какое-то время была стабильной, начала расти, продолжала расти, а затем упала, это означает, что состояние не очищалось с течением времени или возможна утечка памяти.
- показатели потоковой передачи – метрики, генерируемые Spark с информацией о каждом обработанном пакете. Например, можно построить график изменения значения во времени параметра numRowsTotal. Его стабильность исключает возможность возникновения OOM из-за сохранения состояния. Поэтому целесообразно включить дамп кучи, чтобы увидеть, что занимает столько памяти.
- анализ дампов кучи (HeapDumpOnOutOfMemory), для получения которого в конфигурации Spark Cluster на стороне исполнителя следует включить параметр spark.executor.extraJavaOptions: —XX: + HeapDumpOnOutOfMemoryError —XX: HeapDumpPath = / dbfs / heapDumps. Также можно указать путь для сохранения дампов кучи. Доступ к этим файлам возможен через ssh в workerr’ы и rsync. Периодическое получение дампов кучи позволяет анализировать работу Spark-приложения, сравнивая его нормальную работу с OOM-ошибками. Анализ дампа кучи можно выполнить с помощью таких инструментов, как YourKit или Eclipse MAT.
- поиск места утечки памяти, например, через обозреватель объектов YourKit во время проверки файлов hprof. В частности, в случае Disney Streaming Services это было некорректно закрытое соединение при подключении к Amazon Kinesis — масштабируемый и надежный сервис для потоковой передачи данных в реальном времени. Закрывался только KinesisClient:
override def close(errorOrNull: Throwable): Unit = { kinesisClient.close() }
А клиент Apache Http оставался открытым, что привело к увеличению числа созданных Http-клиентов и открытию множества TCP-соединений, что и вызвало утечку памяти с OOM-ошибками. Проверить это предположение дата-инженерам Disney Streaming Services позволил следующий скрипт на Java:
import $ivy.`software.amazon.awssdk:apache-client:2.13.37` // causes OOM (1 to 1e6.toInt).foreach { _ => software.amazon.awssdk.http.apache.ApacheHttpClient.builder.build() } // doesn't cause OOM (1 to 1e6.toInt).foreach { _ => software.amazon.awssdk.http.apache.ApacheHttpClient.builder.build().close() }
Для закрытия созданных клиентов Apache HTTP нужно написать следующий Java-код:
override def close(errorOrNull: Throwable): Unit = { client.close() httpClient.close() }
Анализ данных с помощью современного Apache Spark
Код курса
SPARK
Ближайшая дата курса
7 октября, 2024
Продолжительность
32 ак.часов
Стоимость обучения
96 000 руб.
Таким образом, рассмотренный пример показывает разнообразие инструментов, которыми должен владеть дата-инженер и разработчик распределенных Big Data приложений на Apache Spark. Завтра мы продолжим разбираться с ними и рассмотрим пример интеграции Спарк с BI-системой Tableau с помощью коннектора. Освоить все эти и другие средства на практике, а также разобраться с другими особенностями эксплуатации Apache Spark для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники
- https://medium.com/disney-streaming/about
- https://medium.com/disney-streaming/a-step-by-step-guide-for-debugging-memory-leaks-in-spark-applications-e0dd05118958
- https://databricks.com/glossary/what-is-databricks-runtime
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
- https://support.datafabric.hpe.com/s/article/Spark-Troubleshooting-guide-Memory-Management-How-to-troubleshooting-out-of-memory-OOM-issues-on-Spark-Executor