Take() вместо collect() и еще 2 совета по Apache Spark для дата-инженера

обучение Spark SQL примеры курсы обучение, анализ данных Spark, Spark разработка конфигурирование приложений для разработчика примеры курсы обучение, Spark Databrics Lightspeed примеры курсы обучение, Apache Spark Structured Streaming примеры курсы обучение, разработка приложения Spark, Apache Spark разработчик примеры курсы обучение, обучение большим данным, курсы дата-инженер аналитик Big Data, Школа Больших Данных Учебный Центр Коммерсант

Постоянно добавляя в наши курсы для дата-инженеров и разработчиков распределенных Spark-приложений интересные примеры, сегодня мы хотим поделиться с вами простыми, но эффективными приемами, как улучшить производительность этого вычислительного движка. Чем метод take() лучше collect() в Apache Spark, какие открытые инструменты помогут выполнить профилирование кода и как быстро прочитать множество маленьких файлов, написав свою UDF.

Сканирование или сбор данных: take() vs collect()

Напомним, действия над данными в Apache Spark отличаются от преобразований. Преобразования (transformations) – это отложенные (ленивые) вычисления, которые фактически не выполняются сразу, а после материализации запроса и вызове какого-либо действия. Хотя при этом создается план запроса, сами данные все еще находятся в хранилище и ожидают обработки. Действия (actions) – это функции, запрашивающие вывод, при которых создается план запроса и и оптимизируется оптимизатором Spark. А физический план компилируется в RDD DAG, который делится на этапы (stages) и задачи (tasks), выполняемые в кластере. Оптимизированный план запроса генерирует высокоэффективный Java-код, который работает с внутренним представлением данных в формате Tungsten.

При вызове действия collect(), которое выводит все записи на экран, выполняется сбор данных от всех исполнителей и они передаются драйверу. При работе с огромными объемами данных, узлу драйвера может не хватить памяти, ведь все операции в Apache Spark выполняются в памяти. Это приведет к сбою из-за ошибки OOM, Out Of Memory, о которой мы писали здесь. Однако, если данные уже отфильтрованы или достаточно агрегированы, их размер не будет проблемой для драйвера и вызов collect() не станет причиной сбоя.

Если же данных слишком много и/или они еще не прошли первичную обработку, лучше использовать действие take() вместо collect(). Метод take() сканирует первый найденный раздел и возвращает результат. Если в параметрах этой функции указать целое число, т.е. вызвать take(n), она вернет записи, в указанном количестве. Это пригодится, чтобы проверить наличие данных в отдельном датафрейме или просто получить представление о данных, достаточно вызвать take(1).

Профилирование заданий Apache Spark

Поскольку Apache Spark предназначен для распределенных вычислений, для его производительности особенно важно эффективное использование ресурсов драйвера и исполнителей, а также устранение перекоса между задачами и данными. Сюда же можно отнести необходимость решения проблем, связанных с высоким уровнем сбора мусора. Справиться с этим помогают инструменты профилирования данных, которые собирают и анализируют метрики заданий Spark, позволяя дата-инженеру сделать выводы для повышения производительности фреймворка. В разработке ПО профилирование представляет собой форму динамического анализа программы, которая измеряет, например, потребление памяти или сложность алгоритма, использование конкретных инструкций, частоту и продолжительность вызовов функций. Эта информация помогает определить возможности для оптимизации программы и эффективного использования доступных ресурсов, чтобы повысить производительность кода.

Для Apache Spark есть целый ряд инструментов профилирования с открытым исходным кодом, например, Sparklens, sparkMeasure, Sparklint, Dr. Elephant, SparkOscope. В частности, Sparklens включает встроенный симулятор планировщика Spark, написанный на Scala. Его можно использовать с любым Spark-приложением, чтобы понять пределы масштабируемости заданий и определить различные возможности для их эффективной настройки. Также этот инструмент помогает понять, насколько эффективно Spark-приложение использует предоставленные ему вычислительные ресурсы и оценить различные варианты: возможно, приложение будет работать быстрее с большим количеством исполнителей или с большим объемом памяти драйвера.

Чтобы включить Sparklens для заданий Spark, нужно добавить следующие дополнительные параметры конфигурации в spark-submit или spark-shell:

  • packages qubole:sparklens:0.1.2-s_2.11
  • conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener

Для каждого задания Spark, отправленного с конфигурацией Sparklens, отчет Sparklens будет включен по умолчанию. Получить доступ к этим отчетам можно, используя менеджер ресурсов YARN, логи сервера PHS (Persistent History Server, постоянный кластер, который доступен 24/7 в среде Google Cloud Platform Data Proc), JSON-файл в корзине Google Cloud Strorage или интерфейс самого Sparklens.

Быстрое чтение множества маленьких файлов

Будучи инструментом стека Big Data, Apache Spark отлично справляется с обработкой больших наборов данных благодаря своей распределенной и масштабируемой архитектуре. Но из-за того, что он использует Hadoop HDFS в качестве поставщика хранилища состояний, где все данные хранятся в сопоставлении памяти на первом этапе, а затем поддерживаются файлами в файловой системе, совместимой с HDFS, при слишком большом количестве файлов возникает дополнительная нагрузка на метаданные. Особенно это ощутимо, когда эти файлы меньше размера блока HDFS по умолчанию, который обычно варьируется от 64 МБ до 128 МБ. Из-за постоянных обращений к диску во время выполнения вычислений возникают накладные расходы возникают и производительность Spark-задания сильно снижается.

Решить эту проблему можно следующими способами:

  • изменить конфигурацию источника данных, чтобы он выдавал больше данных в файл, создавая меньшее количество больших файлов. А если использовать эффективный столбцовый формат Parquet или строковый AVRO, это может дополнительно улучшить производительность.
  • предварительно обработать пакет небольших файлов, объединив их в файлы большего размера. Проще всего сделать это, написав bash-скрипт для группировки файлов с помощью команды cat. Также можно сжать файлы.
  • написать код Spark-задания таким образом, чтобы файлы можно было принимать параллельно, создавая оптимальное распределение задания между процессорами кластера. При этом нужно получить список файлов для чтения, установить оптимальное количество разделов, создать объект rdd при распределении файлов и создать UDF-функцию для получения содержимого файла из источника. Наконец, надо вызвать эту пользовательскую функцию для каждого раздела, используя метод mapPartitions rdd().

Освойте администрирование и использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@msdilli1997/spark-optimization-techniques-681cb85d01f9
  2. https://medium.com/walmartglobaltech/apache-spark-performance-engineering-using-sparklens-62c2da5bfc2
  3. https://medium.com/@fmonteiro.alex/improving-apache-spark-processing-performance-when-reading-small-files-dd240ea4be6d

 

Поиск по сайту