В рамках обучения разработчиков Spark-приложений, аналитиков данных и дата-инженеров, сегодня рассмотрим, как улучшить и визуализировать понимание обработки данных в этом Big Data фреймворке. Читайте далее про API встроенных механизмов наблюдения за качеством данных в Apache Spark и открытые библиотеки профилирования на примере Deequ.
2 уровня абстракции мониторинга Spark-приложений для дата-инженера
Из-за высокой популярности Apache Spark и большой глубиной его распространения в production, для разработчика и дата-инженера критически важно иметь средства мониторинга ключевых метрик этой распределенной среды. Таблицы и графики встроенного веб-интерфейса фреймворка, о возможностях которого мы рассказывали здесь, информируют о загрузке кластера и состоянии заданий, а не о фактических показателях, связанных с данными. Поэтому при проблемах, вызванными изменением схемы данных или их некорректным представлениям, типовые вкладки Spark UI практически бесполезны. Чтобы сократить время на поиск источника проблемы, дата-инженеру нужен фактический мониторинг распределенных приложений и качества обрабатываемых данных.
В таком мониторинге можно выделить 2 уровня абстракции:
- низкий – на основе внутренних инструментов Spark, такие как Listener API и Query Execution Listeners;
- высокий – с использованием внешних систем и библиотек отслеживания показателей качества данных.
Хотя высокоуровневые подходы более гибкие, они имеют большой недостаток: снижение производительности. Поскольку каждый расчет вызывает дополнительную Spark-операцию, общие накладные расходы могут существенно вырасти, особенно для больших наборов данных. Каждая операция проверки качества данных может привести к полному сканированию датасета. Поэтому прежде чем приступать к профилированию данных, следует убедиться, что оно не повредит производительности всей системы в целом. Далее рассмотрим, как реализуются оба подхода.
Низкоуровневый мониторинг со слушателями
Самым надежным способом получения значений фактических показателей является API слушателей (Listener). Именно этот механизм использует UI фреймворка. API слушателей Spark позволяет разработчикам отслеживать события, которые среда генерирует во время выполнения приложения: старт и финиш приложения, задания, этапа и пр. Эти слушатели можно настроить и использовать для сбора собственных показателей. После выполнения каждой операции Spark вызовет Listener и передаст ему эти метаданные, включая время выполнения, чтение/запись строк, байтов и другую подобную информацию.
Такой простой и низкоуровневый мониторинг качества данных будет проверять количество и размер записей. Например, задание запускается ежедневно, выполняя операции аналитики больших данных на входящих датасетах. Можно написать слушателя, который проверяет, сколько записей было прочитано сегодня, и сравнивает показатель с результатом предыдущего дня. Если разница значительна и превышает допустимый порог отклонений, можно предположить, что возникли проблемы с источником данных и оперативно решать их, не тратя время на поиск причин в самом распределенном приложении.
Разработав собственный Listener, его можно добавить в приложение несколькими способами:
- программно
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
- в виде параметра через spark-submit:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
Несмотря на простоту идеи, этот подход требует разработки собственных решений мониторинга: необходимо хранилище значений метрик и настройка механизмов оповещения. Также стоит помнить, что при изменении кода приложения, все ключи метрик тоже изменятся.
Аналогичный низкоуровневый прием работает и для Spark SQL. Cлушиватель выполнения запросов позволяет разработчикам подписываться на события завершения запроса. Он предоставляет метаданные более высокого уровня о выполняемых запросах: логические и физические планы, а также показатели выполнения: записи, прочитанные или записанные по запросу, а не для отдельных задач, заданий или этапов.
Планы также содержат такую полезную информацию, как расположение данных и схема, которую можно извлекать и сохранить вместе с размерами датафрейма, чтобы сравнить с предыдущими запусками и вызвать предупреждение, когда что-то идет не так. Однако извлечение данных из плана может быть сложным, поскольку потребуется разрабатывать код, используя низкоуровневый API. Кроме того, остаются все операционные нагрузки, связанные с внедрением механизмов хранения метрик и предупреждений. Разработанные слушатели выполнения запросов могут быть добавлены к приложению программно:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener()); или через spark-submit: spark-submit --conf "spark.extraListeners=ai.databand.ExampleQueryExecutionListener"
Хотя реализация такого низкоуровневого мониторинга требует усилий, этот способ имеет огромное преимущество: отсутствие дополнительных вычислительных затрат. Благодаря тому, что метаданные генерируются и записываются внутренними компонентами фреймворка, их получение не увеличивает время выполнения запросов. Кроме того, использование Listener”ов для мониторинга возможно даже без изменения кода приложения! Это особенно актуально для отслеживания данных по существующим и устаревшим приложениям, при невозможности их изменений. Достаточно просто написать своего слушателя, передать его через конфигурацию spark-submit и наблюдать данные.
Высокоуровневый мониторинг
Самым тривиальным способ проверки качества данных является ручной. Например, ожидается не менее X записей во входном источнике данных. Тогда простейший код проверки качества входящих данных на PySpark будет выглядеть следующим образом [1]:
df = spark.read("path") if (df.count < X) { throw new RuntimeException("Input data is missing") }
Разумеется, можно проверять не только количество записей, но и определять число ненулевых значений, предполагаемые схемы данных и пр. Однако, ручная предобработка данных достаточно трудоемка. Поэтому современные дата-специалисты чаще всего используют автоматизированные инструменты, которые реализуют все типовые проверки качества данных. Одной из них является библиотека Deequ. Она предоставляет богатый предметно-ориентированный язык (DSL) для большинства сценариев проверки качества данных, а также включает возможность вычисления базовых статистик, построения гистограмм, обнаружения аномалий и другие полезные функции.
Deequ построена на основе Apache Spark для проведения unit-тестов для больших наборов данных, чтобы обнаружить ошибки и неточности в датасете до этапа анализа и ML-моделирования. Deequ работает с табличными данными, например, с CSV-файлами, таблицами СУБД, лог-файлами, плоскими структурами JSON – т.е. всеми форматами, которые можно поместить в датафрейм. Большинство приложений, работающих с данными, имеют неявные предположения об этих данных, например, что атрибуты имеют определенные типы, не содержат значений NULL и пр. Если эти предположения нарушаются, Spark-приложение может аварийно завершить работу или выдать некорректные результаты. Идея Deequ состоит в том, чтобы явно сформулировать эти предположения в форме unit-теста для данных, который можно проверить на их фрагменте. Если в данных есть ошибки, их можно исправить, прежде чем передать датасет приложению.
После запуска Deequ переводит тест в серию заданий Spark, которые вычисляют показатели для данных. Затем вызываются заданные функции (предположения) для этих показателей, чтобы проверить, сохраняются ли ограничения на данные. Таким образом результат верификации позволяет увидеть, обнаружил ли тест ошибки, например, пропуски, несоответствия типов данных, выход за границы значений и пр.
Deequ дает возможность сохранять результаты проверок и автоматически запускать сравнения с предыдущими запусками с помощью репозиториев метрик. Также можно написать собственную реализацию и интегрировать Deequ в существующую инфраструктуру мониторинга. Для разработчиков на Python есть PyDeequ, интерфейс Python для Deequ. Читайте в нашей новой статье, как расширить типовой слушатель StreamingQueryListener, который есть в Java и Scala API библиотеки Spark Structured Streaming, но недоступен в PySpark. А здесь мы рассказываем про связанный подход мониторинга качества данных с помощью аккумуляторов — глобальных переменных Apache Spark.
Подготовка данных для Data Mining на Python
Код курса
DPREP
Ближайшая дата курса
по запросу
Продолжительность
32 ак.часов
Стоимость обучения
72 000 руб.
Больше практических примеров использования Apache Spark для разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- https://towardsdatascience.com/apache-spark-monitoring-how-to-use-spark-api-open-source-libraries-to-get-better-data-9f48074a4fc9
- https://github.com/awslabs/deequ