Совместное использование Apache Kafka и Spark очень часто встречается в потоковой аналитике больших данных, например, в прогнозировании пользовательского поведения, о чем мы рассказывали вчера. Однако, временные метки (timestamp) в приложении Spark Structured Streaming могут отличаться от времени события в топике Kafka. Читайте далее, почему это случается и какие подходы к обработке Big Data применять в этом случае.
Как течет время в потоковой передаче или еще раз об интеграции Apache Kafka и Spark
Итак, рассмотрим сценарий, когда приложение Spark Structured Streaming считывает данные из топиков Apache Kafka, и происходит один из следующих случаев [1]:
- изменяется конфигурация заданий источника потоковой передачи, например, параметры maxOffsetsPerTrigger, Trigger time, Watermark и пр.;
- приложение обновилось и нужно отменить предыдущие состояния;
- обнаружена ошибка в коде и требуется повторно запустить задание с определенной отметки времени;
- в один из топиков попали поврежденные или смешанные данные;
- нужно запустить задание Spark Structured Streaming с определенной отметки времени.
Во всех этих случаях нельзя просто перезапустить текущее или запустить новое задание Spark Structured Streaming с существующей контрольной точки (checkpoint) – понадобится создать новое место для хранения информации для восстановления после сбоев. Поэтому следует обеспечить плавное и последовательное хранение смещений Kafka по определенным меткам времени (timestamp).
Напомним, в Apache Kafka события хранятся в топиках (topic), каждый из которых разбит на разделы (partition). Любая запись в разделе имеет смещение (offset), определяющее порядок внутри раздела. В версиях Spark 2.х задание Structured Streaming полагается на контрольную точку для реализации строго однократной доставки (exactly once) сообщений в случае микропакетной передачи или хотя бы однократной для непрерывной обработки данных. Об особенностях реализации строго однократной доставки в Spark Structured Streaming мы недавно рассказывали здесь.
Каждый запрос-триггер Spark Structured Streaming будет сохранять смещения в каталог смещения в местоположении контрольной точки, определенном в параметре checkpointLocation или spark.sql.streaming.checkpointLocation. Драйвер StreamExecution проверяет и вычисляет, какие смещения уже были обработаны, и потребляет новые записи по этим смещениям и другим конфигурациям, например, maxOffsetsPerTrigger.
3 способа перезапуска заданий Spark Structured Streaming
Таким образом, обновить и/или перезапустить задание Spark Structured Streaming с новой метки времени можно следующими способами [1]:
- считать последнее смещение контрольной точки из HDFS с помощью команды Hdfs dfs -ls /checkpointLocation/offsets и скопировать его в параметр скопируйте смещения в параметр startOffsets структуры readStream;
- получить данные о минимальном смещении, отметка времени которого больше, чем нужный timestamp, используя метод consumer.offsets_for_times() библиотеки Kafka-Python, о которой мы рассказывали здесь. При этом идет перебор топиков Apache Kafka с учетом нужной временной метки, которая устанавливается для каждого раздела, после чего вызывается метод offsets_for_times() для получения смещений. Конечный результат выводится в JSON-структуре.
- Наконец, можно использовать встроенную реализацию startOffsetsByTimestamp в Apache Spark 3.0, которая устанавливает временную метку в миллисекундах для каждого раздела TopicPartition. О других нововведениях крупного релиза этого Big Data фреймворка читайте в этом материале. Этот способ имеет некоторые особенности применения, о которых мы поговорим далее.
Особенности конфигурации startOffsetsByTimestamp в Apache Spark 3.0
Параметр startOffsetsByTimestamp является опциональным и описывает начальную точку отметки времени при запуске запроса в виде JSON-строки для каждого раздела топика Kafka (TopicPartition). Возвращаемое смещение для каждого раздела – это самое раннее смещение, метка времени которого больше или равна заданной метке времени в соответствующем разделе. Если совпадающее смещение не существует, запрос немедленно завершится ошибкой, чтобы предотвратить непреднамеренное чтение из такого раздела. Это своего рода ограничение, которое планируется устранить в будущих релизах Apache Spark, который просто передает информацию о временной метке в KafkaConsumer.offsetsForTimes и никак не интерпретирует это значение.
Значение метки времени может варьироваться в зависимости от конфигурации Kafka log.message.timestamp.type. При использовании startOffsetsByTimestamp нужно помнить о следующих особенностях этого параметра [2]:
- он поддерживается версией Kafka 0.10.1.0 или выше;
- startOffsetsByTimestamp имеет приоритет над startOffsets, который означает начальную точку при запуске запроса или самую последнюю, определяющую начальное смещение для каждого раздела TopicPartition.
- для потоковых запросов этот параметр применим только при запуске нового запроса, и возобновление всегда будет начинаться с того места, где запрос был остановлен. Новые обнаруженные во время запроса разделы будут запущены раньше.
О примерах совместного использования Apache Kafka и Spark Structured Streaming читайте здесь. А еще больше практических кейсов и лучших практик разработки распределенных приложений для потоковой аналитики больших данных с Apache Spark и Kafka вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для разработчиков
- Потоковая обработка в Apache Spark
- Построение конвейеров обработки данных с Apache Airflow и Arenadata Hadoop
Источники
- https://medium.com/@ZeevFeldbeine/how-to-start-spark-structured-streaming-by-a-specific-kafka-timestamp-e4b0a3e9c009
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html