Недавно мы рассматривали оптимизацию SQL-запросов и выполнение JOIN-операций в Apache Spark. Сегодня поговорим, что обеспечивает строго однократную семантику доставку сообщений (exactly once) в этом Big Data фреймворке и как на это влияют особенности микро-пакетной обработки больших данных с помощью заданий Spark Structured Streaming.
Особенности exactly once доставки сообщений в Apache Spark Structured Streaming
Сперва напомним, как обеспечивается гарантия строго однократной доставки сообщений в Apache Spark Structured Streaming. Семантика exactly once означает, что каждое сообщение будет доставлено ровно 1 раз, без повторов (дублей) и потерь данных. Это наиболее оптимальный вариант с точки зрения бизнес-логики, однако, самый сложный в реализации. Чем она отличается от других семантик доставки сообщений (хотя бы 1 раз – at least once и не более 1-го раза at most once), мы рассказывали здесь на примере Apache Kafka. В Apache Spark этот дорогостоящий механизм строго однократной доставки гарантирует, что в случае сбоя одного задания потоковой передачи все последующие получают ровно одну копию данных. Это реализуется с помощью перечисления и определения микро-пакета для обработки, усложняя функционирование больших систем, потребляющих тысячи файлов.
На практике это обеспечивается с помощью контрольных точек исходных данных, позволяя проверять работоспособность задания через обработку всех входных файлов только один раз и самостоятельное восстановление после неудачных запусков.
В Apache Spark параметр spark.sql.streaming.checkpointLocation добавляет каталог HDFS для объекта сеанса Spark, и далее фреймворк использует этот каталог, чтобы хранить информацию о контрольных точках для запросов Structured Streaming. В частности, для контрольной точки исходных данных структурированный потоковый запрос создаст новый каталог в местоположении контрольной точки с queryName, который будет содержать несколько каталогов с конкретной информацией о checkpoint’ах для каждого микропакета и один файл метаданных с деталями запроса. Spark-задание создает один файл для каждого потокового микропакета, включая следующие сведения [1]:
- коммит (commit) — фиксация показателей пакета;
- смещение (offset) для каждого пакета – водяной знак (watermark), метка времени (timestamp) и Spark-конфигурация;
- источник (source) – информация о входном файле, обработанном в конкретном микропакете, с деталями об имени, метке времени и пакетом для обработки;
- метаданные (metadata) с информацией о запросе в виде его идентификатора. Это единственный файл, в котором содержится информация о запросе Spark Structured Streaming.
Важно также фиксировать целевые данные в виде коммитов, чтобы в случае перезапуска задания убедиться, что последний микропакет был успешно завершен. Для этого каждый выходной commit-файл содержит информацию о метриках микропакета: путь к файлу, его имя и размер, флаг isDir, время изменения, размера блока и действия [1].
Про водяной знак каждого пакета стоит отметить, что при выполнении запроса Spark Structured Streaming отслеживает максимальное время события, наблюдаемого в каждом входном потоке, вычисляя watermark на основе соответствующей задержки. Далее выбирается один глобальный водяной знак, который будет использоваться для операций с отслеживанием состояния. По умолчанию в качестве глобального водяного знака выбирается минимум, который гарантирует, что никакие данные не будут случайно сброшены слишком поздно, если один из потоков отстает от других, например, из-за их сбоев. Другими словами, глобальный watermark будет безопасно перемещаться в темпе самого медленного потока, и вывод запроса будет соответственно задерживаться. Если нужно получить более быстрые результаты, даже через удаление данных из самого медленного потока, можно установить политику нескольких водяных знаков, чтобы выбрать максимальное значение в качестве глобального watermark. Это делается через установку знания max в параметре SQL-конфигурации spark.sql.streaming.multipleWatermarkPolicy. Это позволит глобальному водяному знаку двигаться со скоростью самого быстрого потока. Однако, побочным эффектом такого решения станет потеря данных из более медленных потоков [2].
Checkpoint’ы и commit’ы
Примечательно, что каждый 10-й файл в каталоге контрольных точек и commit-директория имеет расширение .compact. Компактный файл объединяет последние 9 файлов и сжимает все данные в один файл, чтобы избежать слишком большого числа небольших файлов, поскольку, как мы уже упоминали, задание Structured Streaming может выполняться годами и обрабатывать миллионы пакетов. По умолчанию Spark хранит только последние 100 файлов в каталоге контрольных точек (10 компактных на каждые 10 commit-файлов). Это можно настроить через следующие параметры [1]:
- sql.streaming.minBatchesToRetain (по умолчанию 100);
- sql.streaming.fileSink.log.compactInterval (по умолчанию 10);
- sql.streaming.fileSource.log.compactInterval (по умолчанию 10);
Обычно компактный файл в каталоге исходных контрольных точек имеет список всех файлов, обработанных заданием Spark Structured Streaming до сегодняшнего дня, а компактный файл в метаданных выходных коммитов будет содержать список файлов, созданных заданием до сегодняшнего дня. Непосредственно обработка контрольных точек для исходных данных в Apache Spark реализуется через класс MicroBatch:
- задание Spark Structured Streaming проверяет номер последнего завершенного пакета, увеличивает его на 1 и создает новый номер пакета для следующего задания;
- в новом задании перечислены файлы в исходном каталоге, упорядоченные по метке времени HDFS-файла;
- проверяются обработанные файлы из исходного каталога контрольных точек и отфильтровываются необработанные;
- из списка необработанных файлов вырезается пакет, определяемый максимальным пороговым значением пакета;
- на стороне источника создается файл с именем номера пакета, который будет обработан;
- после успешного завершения пакета создается commit-файл пакетной фиксации со всеми деталями выходного файла в каталоге _spark_metadata.
В случае ошибки, если исходная контрольная точка и директория фиксации вывода (_spark_metadata) имеют одинаковый идентификатор последнего пакета, задание начнется с вышеизложенных шагов. Если же задание уже создало пакет на стороне источника и сбой возник при его обработке, будут использованы уже созданные данные исходного пакета из файла его контрольных точек. Поскольку задания потоковой передачи весьма чувствительны к директории состояния, то при несоответствии между директориями, отказы будут возникать снова.
Таким образом, описанный механизм гарантирует, что последующие задания получают ровно одну копию данных, т.к. задание Spark Structured Streaming фиксирует весь пакет только после того, как все входные файлы будут обработаны. Если пакет завершился ошибкой после обработки частичных данных, в каталоге вывода задания будут некоторые файлы вывода для пакета, в котором произошел сбой, но не будет файла фиксации для пакета в каталоге spark_metadata. Во время чтения данных Spark проверяет расположение входных данных задания, чтобы задание Structured Streaming считывало данные из каталога spark_metadata, если он доступен в месте ввода. Благодаря этому генерируется только список входных файлов из _spark_metadata, исключая чтение частичных пакеты и гарантируя семантику строго однократной доставки для последующих заданий [1].
На практике оказывается, что при высокой отказоустойчивости и надежности комбинации checkpoints + commit-файлы для всей Big Data системы в целом, он чреват недостатками, связанными с интегральным увеличением объема данных [3]. Об этих проблемах и способах их решения мы поговорим завтра. А о том, как перезапустить задания потоковой передачи Спарк по меткам времени Apache Kafka, читайте в нашей новой статье.
Как на практике использовать эти и другие тонкости Apache Spark Structured Streaming для потоковой аналитики больших данных и разработки распределенных приложений, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Потоковая обработка в Apache Spark
- Core Spark – Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
Источники
- https://medium.com/@Iqbalkhattra85/exactly-once-mechanism-in-spark-structured-streaming-7a27d8423560
- https://docs.databricks.com/spark/latest/structured-streaming/production.html
- https://medium.com/@Iqbalkhattra85/optimize-spark-structured-streaming-for-scale-d5dc5dee0622