Сегодня рассмотрим важную тему для обучения дата-инженеров и разработчиков распределенных Spark-приложений. Как устроена потоковая обработка данных в Apache Spark Structured Streaming, зачем нужны водяные знаки и с какими сложностями при этом можно столкнуться.
Как работают водяные знаки в потоковой передача событий Apache Spark
Библиотека потоковой обработки событий Structured Streaming основана на механизме Spark SQL, который обрабатывает потоковые данные постепенно и непрерывно по мере их поступления. Поскольку данные могут приходить из источника с опозданием, возникает задержка между исходным временем происхождения события в реальном мире и его поступлением в обработку. Для этого в Structured Streaming есть механизм водяных знаков (watermark), который позволяет поддерживать состояние поступающих данных, сохранять их в памяти и точно обновлять, объединяя их с данными, поступившими с опозданием. Чтобы выполнять этот запрос в течение нескольких дней, системе должна быть известна информация о накоплении состояния в памяти. Это связано с тем, что приложение должно знать, когда оно перестанет получать просроченные данные, чтобы упростить сбор данных. Проще говоря, поздние данные в пределах порога будут агрегированы, но данные после порога начнут отбрасываться. Внутри Spark Structured Streaming это обрабатывается следующим образом: водяной знак для каждого микропакета вычисляется в конце предыдущего пакета. Это означает, что водяной знак рассчитывается для каждого микропакета еще до начала выполнения. В случае нескольких потоков Spark отслеживает самый высокий водяной знак среди них. Сначала рассчитываются индивидуальные водяные знаки, а минимальное значение выбирается позже в качестве глобального водяного знака, который используется для исключения событий.
Существуют некоторые ограничения на использование водяных знаков, которые должны быть соблюдены:
- Spark поддерживает несколько режимов вывода, о которых мы писали здесь. Из них только 2, append (добавить) и update (обновить) поддерживаются для водяных знаков;
- функцию withWatermark() можно вызывать только для того же столбца, который используется в агрегате. Этот метод надо вызывать перед агрегацией, чтобы использовать детали водяного знака. Поэтому, например, withWatermark(«time», «1 min»).groupBy(«time2»).count() не будет действовать в режиме вывода с добавлением, т.к. водяной знак не определен в столбце агрегации. Аналогично, df.groupBy(«time»).count().withWatermark(«time», «1 min») тоже не будет действовать в режиме вывода append.
Помимо этих ограничений, также разработчики Spark-приложения стоит помнить о других особенностях Structured Streaming, связанных с версиями этого вычислительного движка. В частности, Apache Spark 2.3.2 не поддерживает стандартный сбор и визуализацию метрик. Поэтому придется прикреплять к приложению класс слушателя, который извлекает статистику из каждого микропакета и сохраняет ее, например, в формате JSON в HDFS. Про слушатели в Apache Spark мы писали в этой статье. Такие слушатели позволяют следить за работоспособностью потокового приложения, например, сколько времени заняло выполнение какого-то триггера, т. е. получение смещений, обработка данных и фиксация WAL, сколько времени заняло получение смещений для новых данных для обработки каждого из определенных источников и фиксации новых доступных смещений.
В Apache Spark 3.0 использовать такие слушатели нет необходимости, т.к. все эти системные метрики отображаются в пользовательском веб-GUI на вкладке «Потоковая передача» в режиме реального времени. Но, помимо отсутствия этой визуализации, в более ранних версиях Apache Spark есть еще пара проблем с потоковой передачей. Например, отсутствие поддержки полных внешних соединений (full outer join) и проблема согласованности данных в left join.
Кроме того, в Apache Spark 2.3 можно столкнуться с проблемой замедления больших заданий, которая обусловлена сжатием метаданных. Как ее решить, рассмотрим далее.
Контрольные точки и сжатие метаданных
Контрольные точки — это способ достижения семантики строго однократной доставки сообщений (exactly once) в Apache Spark. Потоковое задание сохраняет информацию о входных и выходных пакетах в контрольной точке и фиксирует файлы для каждого микропакета. Поскольку потоковые задания выполняются круглосуточно и непрерывно, это приводит к обработке большого объема данных. Размер файлов со временем увеличивается и может достигать 10 ГБ. В сжатых файлах хранятся сведения обо всех файлах, обработанных и созданных заданием потоковой передачи с момента его начала.
Сжатие файла фиксации (_Spark_Metadata) выполняется в драйвере для каждых 10 пакетов (начиная с 0). Это единый монолитный процесс, который занимает до 10–15 минут в зависимости от размера файла сжатия. Он резко снижает производительность задания Apache Spark Structured Streaming, поскольку, пока драйвер работает над сжатием файла, все рабочие узлы ничего не делают, и задание тратит ресурсы впустую, оставаясь полностью бездействующим. Определить интервал уплотнения можно через конфигурацию spark.sql.streaming.fileSink.log.compactInterval.
По умолчанию это значение равно 10, т.е. после каждых 10 микропакетов происходит сжатие файла фиксации. Он добавляет дельты последних 9 пакетов к предыдущему файлу .compact и создает новый файл. Эти компактные файлы имеют формат JSON, а также отслеживают метки времени для файлов с другой информацией. Если объем компактного файла слишком большой и изменение интервала уплотнения не помогает, дата-инженер может написать код для очистки метаданных из каталога _Spark_Metadata. Для этого можно использовать следующий подход:
- удалить задание Spark, чтобы прочитать последний JSON-файл .comapct для _Spark_Metadata;
- отфильтровать все записи для старых файлов по сроку хранения;
- сохранить старый компактный файл для резервного копирования;
- переместить новый измельченный компактный файл в каталог _Spark_Metadata, с сохранением его структуры и имени;
- проверить, что задание потоковой передачи Spark не запущено, пока шло обновление компактного файла метаданных.
Приведенный ниже код извлекает последний файл .compact из каталога _Spark_Metadata, очищает метаданные на основе управляемого конфигурацией параметра RetentionPeriod и перезаписывает cброшенный файл .compact. Можно запускать это приложение примерно раз в месяц, если время запуска триггера 60 секунд и более. Если время триггера меньше 60 секунд, а Spark-приложение обрабатывает огромный объем, можно выполнять эту очистку один раз в неделю. Можно добавить этот код в начало потокового приложения, чтобы при его перезапуске сначала выполнялась очистка файлов сжатия.
import org.apache.hadoop.fs._ import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.json4s._ import org.json4s.jackson.JsonMethods._ object MetadataRemover extends App { val spark = SparkSession .builder .master(ApplicationConfig.sparkMaster) .appName(ApplicationConfig.sparkAppName) .getOrCreate() spark.sparkContext.setLogLevel("ERROR") val sc = spark.sparkContext /** * regex to find last compact file */ val file_pattern = """(hdfs://.*/_spark_metadata/\d+\.compact)""".r.unanchored val fs = FileSystem.get(sc.hadoopConfiguration) /** * implicit hadoop RemoteIterator convertor */ implicit def convertToScalaIterator[T](underlying: RemoteIterator[T]): Iterator[T] = { case class wrapper(underlying: RemoteIterator[T]) extends Iterator[T] { override def hasNext: Boolean = underlying.hasNext override def next: T = underlying.next } wrapper(underlying) } /** * delete file or folder recursively */ def removePath(dstPath: String, fs: FileSystem): Unit = { val path = new Path(dstPath) if (fs.exists(path)) { println(s"deleting ${dstPath}...") fs.delete(path, true) } } /** * remove json entries older than `days` from compact file * preserve `v1` at the head of the file * re write the small file back to the original destination */ def compact(file: Path, days: Int) = { val ttl = new java.util.Date().getTime - java.util.concurrent.TimeUnit.DAYS.toMillis(days) val compacted_file = s"/tmp/${file.getName.toString}" // If this path already exists : then it will be removed removePath(compacted_file, fs) val lines = sc.textFile(file.toString) val reduced_lines = lines.mapPartitions({ p => implicit val formats = DefaultFormats p.collect({ case "v1" => "v1" case x if { parse(x).extract[SinkFileStatus].modificationTime > ttl } => x }) }).coalesce(1) println(s"removing ${lines.count - reduced_lines.count} lines from ${file.toString}...") reduced_lines.saveAsTextFile(compacted_file) FileUtil.copy(fs, new Path(compacted_file + "/part-00000"), fs, file, false, sc.hadoopConfiguration) removePath(compacted_file, fs) } /** * get last compacted files if exists: */ def getLastCompactFile(path: Path) = { fs.listFiles(path, true).toList.sortBy(_.getModificationTime).reverse.collectFirst({ case x if (file_pattern.findFirstMatchIn(x.getPath.toString).isDefined) => x.getPath }) } val landingDir = ApplicationConfig.landingPath val retentionPeriod = ApplicationConfig.retentionPeriod val metadataDir = new Path(s"$landingDir/_spark_metadata") getLastCompactFile(metadataDir).map(x => compact(x, retentionPeriod)) }
Анализ данных с помощью современного Apache Spark
Код курса
SPARK
Ближайшая дата курса
7 октября, 2024
Продолжительность
32 ак.часов
Стоимость обучения
96 000 руб.
Освойте администрирование и использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark