Развивая наши курсы по Apache Spark, сегодня мы рассмотрим несколько особенностей, с разработчик которыми может столкнуться при выполнении обычных операции, от чтения архивированного файла до обращения к сервисам Amazon. Читайте далее, что не так с методом getDefaultExtension(), зачем к AWS S3 так много коннекторов и почему PySpark нужно дополнительно конфигурировать вручную.
Сжатие и распаковка файлов в Apache Spark
Spark делает подключение к источникам данных простой операцией, предоставляя широкий выбор коннекторов и простой синтаксис для различных форматов: JSON, Parquet, ORC, таблицы СУБД и пр. При этом фреймворк умеет работать со многими популярными форматами сжатия файлов: SNAPPY, ZLIB, LZO, GZIP, BZIP2 и пр., отлично применяя правильные кодеки для сжатия RDD и Dataframe. Кроме того, из Apache Hadoop возможно заимствование других кодеков, например, gzip | .gz | org.apache.hadoop.io.compress.GzipCodec.
Обычно с этими файлами не возникает никаких проблем. Однако, если прочитать из корзины AWS S3 с именем test простой файл file_name, который содержит данные JSON и сжат с помощью gzip, результат вас удивит:
Команда на чтение
spark.read.json («s3a: // test / file_name «)
spark.printSchema ()
Результат:
root
| — _corrupt_record: строка (nullable = true)
Такой вывод получился из-за того, что Apache Spark полагается на расширения файлов, чтобы определять тип сжатия с помощью метода getDefaultExtension(). По умолчанию для файла gzip – должно быть расширение .gz, поэтому отсутствие расширения вызывает путаницу. Таким образом, файлы должны иметь соответствующие расширения. Обойти это ограничение из примера можно, самостоятельно расширив GzipCodec и переопределив метод getDefaultExtension():
package dre.spark.util.codecs
import org.apache.hadoop.io.compress.GzipCodec
class NoExtensionGzipCodec extends GzipCodec {
override def getDefaultExtension(): String = «»
}
Примечательно, что AWS Glue, serverless ETL-решение от Amazon, сперва сканирует каталог данных, а затем используется динамических фреймы, чтобы без проблем обрабатывать сжатие и распаковку файлов даже без точных расширений.
Особенности AWS S3: разделы и коннекторы
Наличие разделов и корзин в AWS S3 обеспечивает высокую производительность и распараллеливание при чтении и записи данных. Однако, при работе с этим облачным хранилищем стоит помнить, что Apache Spark очень требователен к именам сегментов для идентификации столбцов раздела. В частности, partitioning выполняется в стиле Hive, о чем мы упоминали здесь. А AWS Glue может идентифицировать столбцы разделов без атрибутивной информации и создавать безымянные столбцы в таблице в виде partition-0, partition-1 и т.д. [1]
Наконец, стоит отметить разнообразие коннекторов Spark к AWS S3. Например, S3 Select позволяет приложениям извлекать из объекта только часть данных, перекладывая работу по фильтрации больших датасетов с Amazon EMR на S3. Это может повысить производительность Spark-приложений и сократить объем данных, передаваемых между EMR и S3. S3 Select поддерживается файлами CSV и JSON с использованием значений s3selectCSV и s3selectJSON для указания формата данных. При этом некоторые Spark-параметры форматов CSV и JSON, такие как nanValue, positiveInf, negativeInf и режимы failfast и dropmalformed, связанные с поврежденными записями, не поддерживаются.
Также в AWS есть коммиттер, оптимизированный для EMRFS S3 для многокомпонентной загрузки EMRFS и повышения производительности при записи файлов Parquet в Amazon S3 с использованием Spark SQL, DataFrames и DataSets. Напомним, EMRFS – это файловая система EMR File System в кластерах Amazon EMR, позволяет использовать сервис S3 в качестве уровня хранения для Apache Hadoop [2].
Еще есть коннекторы S3n и S3a, которые отличаются отличаются производительностью и объемом передаваемых данных. Например, через URI-схему s3n: //… можно предавать файл не более 5ГБ, а у s3a: //… нет такого ограничения [1].
PySpark и Hadoop
При разработке кода на PySpark стоит помнить, что по умолчанию он не всегда включает все необходимые библиотеки Apache Hadoop и нужные зависимости. Pyspark является частью фреймворка Spark и не содержит всех jar-файлов. Например, в нем могут отсутствовать свежие пакеты hadoop-aws. Поэтому следует загружать последний дистрибутив Apache Spark в комплекте с последней версией Hadoop и в переменной среды SPARK_HOME указать место загрузки.
О других причудах этого фреймворка, связанных с API DataFrame, читайте в нашей новой статье. А подробнее разобраться с особенностями разработки распределенных приложений Apache Spark для аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники
- https://aseficha.medium.com/demystifying-apache-spark-quirks-2c91ba2d3978
- https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-performance.html