Сегодня рассмотрим несколько простых способов ускорить обработку больших данных в рамках конвейера задач Apache Spark. Читайте далее про важность тщательной оценки входных и выходных данных, рандомизацию рабочей нагрузки Big Data кластера и замену JOIN-операций оконными функциями.
Оптимизируй это: почему конвейеры аналитической обработки больших данных с Apache Spark замедляются
Обычно со временем объемы данных только растут, а логика их обработки становится только сложнее. Поэтому каждый дата-инженер сталкивается с проблемой повышения производительности конвейера обработки Big Data, пытаясь решить ее наиболее эффективно, что означает максимум положительных результатов при минимальных изменениях архитектуры самого pipeline’а. Для этого можно воспользоваться следующими 3-мя способами:
- навести порядок в выводе конвейера;
- сбалансировать нагрузку с помощью рандомизации;
- заменить JOIN-соединения оконными функциями.
Каждый из этих способов мы подробнее рассмотрим далее, а сначала разберем на практическом примере, когда и почему проблема ускорения Spark-конвейера становится актуальной. Представьте Data Pipeline из тысячи независимых Spark-приложений, каждое из которых принимает данные клиента в Amazon S3, выполняет длинную цепочку преобразований и, наконец, снова записывает выходные данные в это облачное хранилище. Все эти приложения Big Data реализованы на PySpark в AWS EMR и из них сотни могут запускаться одновременно. При этом данные одного клиента могут быть намного больше, чем у другого, поэтому все независимые друг от друга Spark-приложения потребляют разное количество ресурсов. Иногда полный цикл обработки данных может занимать до 24 часов [1]. Этот пример наглядно иллюстрирует, почему по мере разрастания Big Data инфраструктуры, объема данных и логической сложности распределенных приложений возникает необходимость в ускорении Spark-конвейеров.
1. Приведите в порядок входные и выходные данные
Самый простой способ оптимизации производительности pipeline’а – это уменьшение объема данных в приложении, когда на вход принимаются только необходимых сырые данные, а промежуточные датафреймы остаются компактными. Здесь можно поиграть с источниками сырых данных, например, используя альтернативу с меньшей степенью детализации. На практике это может принести до 50% повышения производительности.
Если замена источника данных невозможна, есть смысл использовать другой формат данных, например, компактный Parquet с помощью команды input=spark.read.parquet(«fs: //path/file.parquet») .select (…), чтобы считывать только те столбы, которые действительно нужны.
Аналогичным чтению, запись меньшего объема выходных данных в целевой каталог также повышает скорость конвейера. К примеру, уменьшив размер вывода на 40%, можно увидеть ускорение Spark-pipeline’а на 2 часа. Дополнительным преимуществом этого простого способа повышения производительности является сокращение стоимости хранения данных и затрат на последующие процессы обработки ненужных строк. Таким образом, внимательность к объему и формату как входных, так и выходных данных повышает эффективность всей Big Data системы в целом.
2. Сбалансируйте рабочую нагрузку с помощью рандомизации
Оптимизируя производительность Spark-приложений, стоит помнить про архитектуру и особенности работы этого Big Data фреймворка: количество ядер и размер памяти для исполнителей, число разделов, уровень параллелизма, количество узлов в кластере и пр. Например, накладные расходы на передачу данных по сети могут составлять около 4-х часов. Так, уменьшение значения конфигурационного параметра spark.sql.shuffle.partitions, которое настраивает количество разделов при перетасовке данных для JOIN-операций или агрегаций [2], повышает скорость конвейера. Хотя много разделов повышает отказоустойчивость, их уменьшение позволяет кластеру избегать частой отправки данных, экономя сетевые ресурсы для вычислений в памяти.
В случае большого количества Spark-приложений, упакованных в единый конвейер, возникает необходимость «оптовой» оптимизации всего pipeline’а, не спуская на уровень каждого задания в отдельности. Обычно у дата-инженера нет возможности настраивать и назначать каждому отдельному приложению собственную идеальную конфигурацию ресурсов. В такой ситуации можно сегментировать клиентов каждого приложения по объему обрабатываемых данных, когда клиенты с большим объемом данных получают большую конфигурацию памяти, чем клиенты с небольшим объемом данных. Однако, такой подход чреват проблемой «длинного хвоста», когда последнее приложение конвейера занимает очень много времени, а большие задания блокируют более мелкие. Самый простой способ решения этой проблемы – рандомизация приложений, которая делает рабочую нагрузку равномерно распределенной, упрощая выделение ресурсов на уровне кластера, а также смешивает приложения разных размеров вместе, уменьшая время простоев [1].
3. Замените JOIN-операции оконными функциями
Мы уже рассказывали, что соединение (Join) считается достаточно сложной операцией в SQL, особенно когда речь идет о потоках данных. При обработке Big Data внутри Spark-конвейера входные датафреймы постоянно увеличиваются, создаются новые столбцы и промежуточные таблицы. Однако, когда цепочка обработки данных становится слишком длинной, простое соединение между промежуточными таблицами на более поздних этапах процесса может вызвать ошибку памяти. В этом случае не помогают даже методы кэширования промежуточных данных, такие как persist(), о котором мы писали здесь.
Чтобы избежать проблем с памятью Spark-приложений, можно использовать оконные функции для создания новых столбцов вместо Join-соединений. Это не только решает проблемы с памятью, но и повышает производительность на десятки %. В частности, в соединении участвуют две таблицы, т.е. идет работа со всеми строками. А оконная функция задействует всего несколько столбцов. Разумеется, написать оконную функцию немного сложнее чем SQL-запрос с JOIN-оператором, но такая замена упрощает внутреннюю реализацию графа вычислений [1].
В продолжение этого приема стоит отметить, что оптимизацию Spark-конвейера можно глубже спускать на уровень разработки Big Data приложения, например, настраивая производительность SQL-запросов через кэширование данных в памяти, использование широковещательных переменных, shuffle-разделы и т.д. [2]. Однако, как уже было отмечено выше, это точечные техники для одного отдельно взятого приложения, а не общая стратегия повышения производительности всего Big Data Pipeline’а. Поэтому 3 простых, но действенных способа ускорения Spark-конвейера, описанные в данной статье, пригодятся каждому дата-инженеру. О том, почему тормозят отдельные задачи Спарк и как ускорить их запуск, читайте в нашей следующей статье. А про экономику Spark-приложений и их оптимизацию с точки зрения руководителя, мы поговорим завтра.
Освойте практику инженерии больших данных с Apache Spark на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Core Spark – Основы Apache Spark для разработчиков
- Построение конвейеров обработки данных с Apache Airflow и Arenadata Hadoop
- Hadoop для инженеров данных
Источники