На заметку разработчику: 3 причуды Apache Spark и как с ними бороться

курсы Apache Spark для разработчиков, разработка Spark, Apache Spark AWS S3 коннекторы, обучение разработчиков Apache Spark, Школа Больших Данных Учебный Центр Коммерсант

Развивая наши курсы по Apache Spark, сегодня мы рассмотрим несколько особенностей, с разработчик которыми может столкнуться при выполнении обычных операции, от чтения архивированного файла до обращения к сервисам Amazon. Читайте далее, что не так с методом getDefaultExtension(), зачем к AWS S3 так много коннекторов и почему PySpark нужно дополнительно конфигурировать вручную.

Сжатие и распаковка файлов в Apache Spark

Spark делает подключение к источникам данных простой операцией, предоставляя широкий выбор коннекторов и простой синтаксис для различных форматов: JSON, Parquet, ORC, таблицы СУБД и пр. При этом фреймворк умеет работать со многими популярными форматами сжатия файлов: SNAPPY, ZLIB, LZO, GZIP, BZIP2 и пр., отлично применяя правильные кодеки для сжатия RDD и Dataframe. Кроме того, из Apache Hadoop возможно заимствование других кодеков, например, gzip | .gz | org.apache.hadoop.io.compress.GzipCodec.

Обычно с этими файлами не возникает никаких проблем. Однако, если прочитать из корзины AWS S3 с именем test простой файл file_name, который содержит данные JSON и сжат с помощью gzip, результат вас удивит:

Команда на чтение

spark.read.json («s3a: // test / file_name «)

spark.printSchema ()

Результат:

 root

 | — _corrupt_record: строка (nullable = true)

Такой вывод получился из-за того, что Apache Spark полагается на расширения файлов, чтобы определять тип сжатия с помощью метода getDefaultExtension(). По умолчанию для файла gzip – должно быть расширение .gz, поэтому отсутствие расширения вызывает путаницу. Таким образом, файлы должны иметь соответствующие расширения. Обойти это ограничение из примера можно, самостоятельно расширив GzipCodec и переопределив метод getDefaultExtension():

package dre.spark.util.codecs

import org.apache.hadoop.io.compress.GzipCodec

class NoExtensionGzipCodec extends GzipCodec {

override def getDefaultExtension(): String = «»

}

Примечательно, что AWS Glue, serverless ETL-решение от Amazon, сперва сканирует каталог данных, а затем используется динамических фреймы, чтобы без проблем обрабатывать сжатие и распаковку файлов даже без точных расширений.

Особенности AWS S3: разделы и коннекторы

Наличие разделов и корзин в AWS S3 обеспечивает высокую производительность и распараллеливание при чтении и записи данных. Однако, при работе с этим облачным хранилищем стоит помнить, что Apache Spark очень требователен к именам сегментов для идентификации столбцов раздела. В частности, partitioning выполняется в стиле Hive, о чем мы упоминали здесь. А AWS Glue может идентифицировать столбцы разделов без атрибутивной информации и создавать безымянные столбцы в таблице в виде partition-0, partition-1 и т.д. [1]

Наконец, стоит отметить разнообразие коннекторов Spark к AWS S3. Например, S3 Select позволяет приложениям извлекать из объекта только часть данных, перекладывая работу по фильтрации больших датасетов с Amazon EMR на S3. Это может повысить производительность Spark-приложений и сократить объем данных, передаваемых между EMR и S3. S3 Select поддерживается файлами CSV и JSON с использованием значений s3selectCSV и s3selectJSON для указания формата данных. При этом некоторые Spark-параметры форматов CSV и JSON, такие как nanValue, positiveInf, negativeInf и режимы failfast и dropmalformed, связанные с поврежденными записями, не поддерживаются.

Также в AWS есть коммиттер, оптимизированный для EMRFS S3 для многокомпонентной загрузки EMRFS и повышения производительности при записи файлов Parquet в Amazon S3 с использованием Spark SQL, DataFrames и DataSets. Напомним, EMRFS – это файловая система EMR File System в кластерах Amazon EMR, позволяет использовать сервис S3 в качестве уровня хранения для Apache Hadoop [2]

Еще есть коннекторы S3n и S3a, которые отличаются отличаются производительностью и объемом передаваемых данных. Например, через URI-схему s3n: //… можно предавать файл не более 5ГБ, а у s3a: //… нет такого ограничения [1].

PySpark и Hadoop

При разработке кода на PySpark стоит помнить, что по умолчанию он не всегда включает все необходимые библиотеки Apache Hadoop и нужные зависимости. Pyspark является частью фреймворка Spark и не содержит всех jar-файлов. Например, в нем могут отсутствовать свежие пакеты hadoop-aws. Поэтому следует загружать последний дистрибутив Apache Spark в комплекте с последней версией Hadoop и в переменной среды SPARK_HOME указать место загрузки.

О других причудах этого фреймворка, связанных с API DataFrame, читайте в нашей новой статье. А подробнее разобраться с особенностями разработки распределенных приложений Apache Spark для аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://aseficha.medium.com/demystifying-apache-spark-quirks-2c91ba2d3978
  2. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-performance.html

 

Поиск по сайту