В этой статье по обучению дата-инженеров и разработчиков Big Data рассмотрим, как эффективно записать большие данные в СУБД PostgreSQL с применением Apache Spark. Читайте далее, чем отличается foreach() от foreachBatch(), как это связано с количеством подключений к БД, асимметрией разделов и семантикой доставки сообщений.
Как Spark-приложение записывает данные в PostgreSQL
Если в вашем конвейере данных Spark-приложение отправляет записи в объектно-реляционную базу PostgreSQL, стоит помнить о следующих особенностях этого Big Data фреймворка [1]:
- пакетный или потоковый режим работы;
- количество разделов Spark;
- асимметрия или перекос (Skew) данных в разделах Spark;
- применение метода foreach() или foreachBatch() в потоковой передаче;
- SQL-оператор для записи данных в БД.
Каждый из этих аспектов мы подробнее рассмотрим далее.
Пакеты или потоки Big Data
Пакетный или потоковый режим работы конвейера данных определяет выбор вычислительного примитива и соответствующей библиотеки – Spark Streaming или Structured Streaming, об отличиях которых мы писали здесь. В случае Spark Streaming обработка данных идет в виде дискретизированного потока (DStream на основе RDD) из микро-пакетов данных, а Structured Streaming оперирует с высокоуровневыми и оптимизированными API Spark SQL DataFrame и DataSet практически в реальном времени. Поэтому именно Structured Streaming отлично пойдет, если нужно, например, прочитать 10 миллионов строк и записать их в СУБД за 10–15 секунд.
Как количество разделов Spark влияет на число подключений к БД
В Spark Streaming можно использовать встроенный метод записи в СУБД, который отсутствует в Structured Streaming. Вместо него есть приемники foreach() и foreachbatch(), у которых число соединений для подключения к БД зависит от количества разделов. По умолчание количество одновременных подключений в Postgres ограничено значением 100. Поэтому выбор разделов определяется тем, сколько Spark-заданий и приложений одновременно пытаются получить доступ к БД. Увеличение этого параметра снижает производительность базы данных и общую скорость всего Big Data конвейера=, поскольку каждое соединение PostgreSQL потребляет оперативную память для управления соединением или клиентом, использующим его. В случае большого количества подключений рекомендуется использовать pg_bouncer или драйвер PostgreSQL, чтобы объединить их и сократить потребление [2].
Перекосы Spark и запись данных в СУБД
Асимметрия или перекос (Skew) данных в разделах Spark происходит из-за их неравномерного распределения и зависит от источника. Исправить это можно, явно указав количество нужных разделов Spark с помощью метода dataframe.repartition(), который перетасует данные между ними и распределит их поровну. Это займет несколько секунд в зависимости от объема данных. Эта проблема отсутствует, если источником данных является Kafka, где есть возможность равномерно распределять данные между разделами топика. В этом случае нужно просто поддерживать одни и те же разделы при записи в БД.
Foreach() vs foreachBatch()
Оба этих метода позволяют применять произвольные операции и записывать логику к выходным данным потокового запроса Spark. Однако, у них разные варианты использования: foreach() позволяет настраивать логику записи для каждой строки, а foreachBatch() допускает произвольные операции и настраиваемую логику на выходе каждого микро-пакета потокового запроса.
С foreachBatch() можно делать следующее [3]:
- повторно использовать существующие источники пакетных данных;
- обеспечить запись в несколько расположений, отправив выходной DataFrame или Dataset несколько раз. Чтобы избежать повторных вычислений в случае сбоя, рекомендуется кэшировать выходные данные, а затем извлекать их из кэша.
- Выполнять дополнительные операции DataFrame, которые не поддерживаются в Spark Streaming, с самостоятельной разработкой сквозной семантики их выполнения. По умолчанию foreachBatch обеспечивает только гарантию записи хотя бы один раз (at-least-once). Предотвратить дублирование данных позволит идентификатор микро-пакета (batchId), проверка которого имитирует строго-однократную доставку сообщений (exactly once).
Однако, поскольку foreachBatch() работает с микро-пакетами, он не подходит для непрерывной обработки. В этом случае следует использовать foreach(), который работает следующим образом [3]:
- логика записи данных реализуется с помощью методов open(), process() и close();
- при запуске потокового запроса Spark вызывает функцию или методы объекта так, что одна его копия отвечает за все данные, генерируемые одной задачей в запросе, и один экземпляр отвечает за обработку одного раздела распределенных данных.
- объект должен быть сериализуемым, потому что каждая задача получит новую сериализованную-десериализованную копию предоставленного объекта;
- любую инициализацию записи данных, например, открытие соединения или запуск транзакции, рекомендуется выполнять после вызова метода open(), чтобы задача была готова к генерации данных;
- в каждом разделе (с partition_id) для каждого пакета или эпохи потоковой передачи данных (epoch_id) вызывается метод open(partitionId, epochId). Если он возвращает значение true, то для каждой строки в разделе и пакете/эпохе вызывается метод process(row);
- метод close(error) вызывается с ошибкой при обработке строк и возвращается успешно независимо от возвращаемого значения, кроме случаев, сбоя процесса JVM или Python;
Spark не гарантирует одинаковый вывод для (partitionId, epochId), поэтому так нельзя предотвратить дублирование данных, например, если источник предоставляет различное количество разделов или их изменяет оптимизация Spark. Поэтому при строгих требованиях к отсутствию дублирования в результатах следует использовать foreachBatch().
SQL-оператор для записи данных в СУБД
Чаще всего для записи данных в БД используется оператор INSERT. Например, встроенный метод DStream работает именно так. Однако, в случае огромного числа записей применяется оператор COPY, который записывает данные быстрее. В Spark-приложении для отправки записей в Postgres пригодится пакета COPY Manager. Он подходит как для приемника foreach(), так и для foreachbatch() и выполняет следующие действия [1]:
- считывание DataFrame;
- выполнение требуемых преобразований;
- для последнего датафрейма с помощью foreach() метод open() открывает соединение с БД и инициализирует необходимую переменную, метод process() выполняет любое преобразование на уровне строки и записывает его в построитель строк, обеспечивая единый поток ввода вместо построчной записи, а метод close() записывает построитель строк в БД и закрывает соединение.
- При использовании приемника foreachbatch() для заключительного датафрейма каждый микропакет разбивается на разделы, откуда строки добавляются строки в построитель строк, который записывается в БД.
Больше подробностей по разработке потоковых и пакетных приложений аналитики больших данных с Apache Spark вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники