Интеграция приложений Apache Spark с облачными хранилищами

Spark HDFS S3. Spark разработка примеры курсы обучение, Spark для дата-инженера и разработчика, обучение Apache Spark Школа Больших Данных Учебный Центр Коммерсант

Чем объектное хранилище данных отличается от классической файловой системы POSIX, как это влияет на разработку Spark-приложений, почему операция переименования снижает производительность облачных вычислений и что поможет ее избежать.

Еще раз об отличиях объектных и файловых хранилищ и как это влияет на приложения Spark

Будучи компонентом экосистемы Apache Hadoop, фреймворк Spark для создания распределенных приложений, изначально настроен на работу с данными, хранящимися в HDFS. Однако, сегодня вместо файловых хранилищ все чаще используются объектные, предлагаемые крупными провайдерами облачных услуг. Несмотря на коммитеры, которые решают проблему совместимости объектного хранилища AWS S3 с HDFS, о чем мы писали здесь, при разработке Spark-приложений, взаимодействующих с объектными хранилищами надо учитывать их особенности.

В частности, объектное хранилище не является классической файловой системой POSIX. Для безотказного хранения сотен петабайт данных объектные хранилища заменяют классическое дерево каталогов файловой системы на более простую модель objectname => data. Для обеспечения удаленного доступа операции над объектами обычно используется REST API. Но HTTP-операции довольно медленные. Приложение Apache Spark может читать и записывать данные в объектные хранилищах через коннекторы файловой системы Hadoop или от облачных провайдеров. Эти коннекторы делают хранилища объектов похожими на файловые системы с каталогами и файлами и классическими операциями над ними, такими как показать список, удалить и переименовать. Тем не менее, объектные хранилища не могут полностью заменить кластерную файловую систему HDFS по следующим причинам:

  • способы эмуляции каталогов замедляют работу с ними;
  • операции переименования могут выполняться очень медленно и в случае неудачи оставлять хранилище в неизвестном состоянии;
  • поиск внутри файла может требовать новых HTTP-вызовов, снижая производительность.

Эти отличия существенно влияют на Spark-приложения. В частности, чтение и запись данных происходят медленнее, чем работа с обычной файловой системой. Сканирование структуры каталогов может быть неэффективно во время расчета разделения запроса на несколько параллельных операций. Алгоритм на основе переименования, с помощью которого Spark обычно фиксирует задание при сохранении своих структур данных (RDD, DataFrame или Dataset), потенциально является медленным и ненадежным. Поэтому не безопасно использовать хранилище объектов в качестве прямого назначения запросов или промежуточного хранилища в цепочке запросов. Более того, ни один из коннекторов облачного хранилища не дает никаких гарантий относительно того, как клиенты справятся с объектами, которые перезаписываются, пока поток их читает. Не гарантии, что старый файл можно безопасно прочитать, или что есть какой-то ограниченный период времени, в течение которого изменения станут видимыми. Также нет гарантии, что клиенты не выйдут из строя, если читаемый файл будет перезаписан. Поэтому рекомендуется избегать перезаписи файлов в тех случаях, когда другие клиенты будут их активно читать.

Для объектных хранилищ, модель согласованности которых подразумевает, что фиксации на основе переименования безопасны, рекомендуется использовать 2-ю версию алгоритма FileOutputCommitter, если производительность в приоритете. Когда в приоритете безопасность, лучше подойдет версия 1. Это задается в настройке

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version X

где x – номер версии (1 или 2).

Исходный алгоритм фиксации 1-ой версии переименовывает выходные данные успешных задач в каталог попыток выполнения задания, а затем переименовывает все файлы в этом каталоге в конечное место назначения во время фазы фиксации задания. Версия 2 делает меньше переименований в конце задания, чем версия 1. Тем не менее, из-за вызова функции rename() для фиксации файлов, этот алгоритм небезопасно использовать, когда объектное хранилище не имеет согласованных метаданных или списков.

Коммиттер также можно настроить на игнорирование сбоев при очистке временных файлов. Это снижает риск того, что временная проблема сети вызовет сбой всего задания Spark. Для этого параметру spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored присваивается значение true:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

Медленная производительность имитированных переименований на Amazon S3 делает этот алгоритм очень неэффективным. Поэтому рекомендуется использовать коммиттер S3 без переименований (Zero Rename).

Поскольку хранение файлов в облачных платформах платное, функция удаления временных каталогов вызывается регулярно. Для AWS S3 рекомендуется задать ограничение, как долго многокомпонентные загрузки могут оставаться невыполненными. Это позволит сэкономить на оплате счетов за незавершенные загрузки. Для облака Google переименование каталога выполняется пофайлово. Поэтому лучше использовать коммиттер версии 2 и писать код, который генерирует идемпотентный вывод, включая имена файлов, поскольку он более безопасен и работает быстрее.

Потоковая обработка данных в объектных хранилищах

Spark Streaming может отслеживать файлы, добавленные в объектное хранилище FileInputDStream через отслеживание пути посредством вызова StreamingContext.textFileStream(). Время сканирования новых файлов пропорционально количеству файлов в пути, а не количеству новых файлов. Поэтому сканирование может быть медленной операцией. Справиться с этим поможет установление размера окна. Файлы появляются в объектном хранилище только после того, как они полностью записаны, без необходимости в записи-переименовании. Приложения могут писать напрямую в контролируемый каталог. В случае файлового менеджера контрольных точек по умолчанию FileContextBasedCheckpointFileManager, контрольные точки должны быть установлены только в хранилище, реализующем быструю и атомарную операцию rename(). Иначе контрольные точки могут быть медленными и ненадежными. На AWS S3 с Hadoop от 3.3.1 и выше с использованием коннектора S3A можно использовать прерываемый потоковый файловый менеджер контрольных точек, установив конфигурацию spark.sql.streaming.checkpointFileManagerClass в значение org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager. Это устранит медленное переименование. Однако, следует избегать повторного использования местоположения контрольной точки среди нескольких запросов, работающих параллельно, поскольку это может привести к повреждению данных контрольных точек.

При использовании динамической перезаписи разделов в Apache Spark, когда таблицу можно обновить, заменив только те разделы, в которые добавлены новые данные нужно внимательно помнить о переименовании файлов. Функция динамической перезаписи разделов используется в Spark SQL в форме INSERT OVERWRITE TABLE, а также когда наборы данных записываются в режиме перезаписи:

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

Из-за того, что функция использует переименование файлов, к файловой системе применяются особые требования:

  • рабочий каталог коммиттера должен находиться в целевой файловой системе;
  • целевая файловая система должна эффективно поддерживать переименование файлов.

Эти условия не выполняются коммиттерами S3A и хранилищем AWS S3. Коммиттеры других облачных хранилищ, которые поддерживают эту функцию, являются полностью совместимыми с Apache Spark. Если коммиттер несовместим, операция динамической перезаписи раздела завершится неудачей с сообщением об ошибке PathOutputCommitter does not support dynamicPartitionOverwrite. Если для целевой файловой системы нет совместимого коммиттера, придется использовать облачный формат хранения данных.

В заключение отметим, что задания Spark должны проходить аутентификацию в объектных хранилищах для доступа к данным в них. Например, в AWS S3 обычно это обычно реализуется автоматически: команда отправки Spark-задания в кластер spark-submit считывает переменные среды AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY и AWS_SESSION_TOKEN и устанавливает соответствующие параметры аутентификации для коннекторов s3n и s3a для Amazon S3. В кластере Hadoop настройки могут быть заданы в конфигурационном файле core-site.xml. Данные аутентификации можно вручную добавить в конфигурацию Spark в конфигурационном файле spark-defaults.conf или программно задать в экземпляре SparkConf, используемом для настройки контекста приложения Spark. Разумеется, нельзя хранить секреты аутентификации в репозиториях исходного кода, особенно в публичных.

Узнайте больше про использование Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://spark.apache.org/docs/latest/cloud-integration.html
  2. https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту