Чтобы добавить в наши курсы для дата-инженеров и разработчиков распределенных приложений еще больше практических примеров, сегодня рассмотрим, как написать Python-код для вычисления задержки потребителя Apache Kafka, расширив типовой слушатель StreamingQueryListener, который есть в Java и Scala API библиотеки Spark Structured Streaming, но недоступен в PySpark.
Проблема отставания потребителя Apache Kafka в Spark Structured Streaming
Одной из важных метрик в потоковой передаче событий является задержка потребителя, которая представляет собой количество сообщений, которые не были обработаны для каждого раздела Kafka. Задержка потребителя — важная метрика, поскольку она показывает, насколько далеко конвейер отстает в обработке входящих сообщений. Рассмотрим конвейер на Apache Spark Structured Streaming, который использует один или несколько топиков Kafka, выполняя преобразование данных, а затем создает сообщения в топике приемника.
В этой среде есть две проблемы, связанные с получением отставания потребителя:
- потоковый конвейер не фиксирует смещение Kafka, а полагается на файлы контрольных точек, чтобы возобновить потребление из топика;
- невозможно получить объект потребителя Kafka, используемый внутри Spark и последнее смещение Kafka, используемое в настоящее время потребителем.
Поэтому нужен способ вычисления отставания потребителя. API потоковой передачи Spark предоставляет интерфейс слушателя StreamingQueryListener, который обеспечивает обратный вызов кода приложения в течение жизненного цикла обработки микропакета. Что такое слушатели в Apache Spark, мы рассказывали здесь и здесь. С помощью этого интерфейса можно получить последнее обработанное смещение для каждого раздела каждой темы после обработки микропакета. Этот интерфейс имеет 3 API:
- onQueryProgress, который вызывается при обработке микропакета потокового запроса;
- onQueryStarted, который вызывается при запуске потокового запроса;
- onQueryTerminated, который вызывается, когда потоковый запрос завершается.
API onQueryProgress вызывается всякий раз, когда обрабатывается микропакет потокового запроса. При вызове onQueryProgress предоставляется объект QueryProgressEvent, который включает информацию о последнем зафиксированном смещении для каждого раздела топика Kafka, обработанного микропакетом, на который делается ссылка. Однако, интерфейс onQueryProgress доступен только в Spark JVM, т.е. нужно писать код на Scala/Java, что сложнее Python. Чтобы воспользоваться этим интерфейсом в PySpark, нужен способ предоставить интерфейс для Python. Этого можно сделать с помощью паттерна программирования «Наблюдатель» и шлюза Py4J.
Библиотека Py4J позволяет Python-программам динамически обращаться к объектам Java внутри JVM, вызывая их методы так, будто они находятся в интерпретаторе Python. А доступ к коллекциям Java можно получить с помощью стандартных методов коллекций Python. Библиотека Py4J также позволяет программам Java вызывать объекты Python, создавать из Python классические коллекции Java (Array, List, Set и Map), конвертировать Python-коллекции в Java коллекции. Также Py4J дает возможность из Python реализовывать интерфейсы Java.
В отличие от Jython, Py4J не выполняет код Python в JVM, поэтому с Py4J можно работать со всеми библиотеками классического Cython. В отличие от JPype, Py4J не связывает потоки Python и Java, а использует сокеты, вместо JNI для связи с JVM. Основным недостатком Py4J считается скорость передачи данных между Python и Java из-за медленного взаимодействия через сокеты. Поэтому не рекомендуется выбирать Py4J для работы с данными больше несколько мегабайт. Тем не менее, Py4J даже в этих случаях выполняет свою главную задачу – предоставить доступ к объектам Java, сохранив возможность использования Python-библиотек. Чтобы использовать это для решения проблем с получением отставания потребителя Kafka в PySpark Structured Streaming, необходимо выполнить целый набор шагов, которые мы рассмотрим далее.
7 шагов для использования Py4J
Сперва следует реализовать интерфейс наблюдателя на Java, который отражает интерфейс StreamingQueryListener. Конкретная реализация этого интерфейса будет предоставлена конвейером PySpark и будет действовать как прокси-сервер между PySpark и Spark JVM.
public interface PythonObserver { void onQueryProgress(Object event); void onQueryStarted(Object event); void onQueryTerminated(Object event); }
Затем надо реализовать конкретный класс StreamingQueryListener в Java. Этот класс StreamingQueryListener примет объект PythonObserver и делегирует ему все события.
public class PythonStreamingQueryListener extends StreamingQueryListener { private final PythonObserver observer; public PythonStreamingQueryListener(PythonObserver observer) { this.observer = observer; } @Override public void onQueryProgress(QueryProgressEvent event) { observer.onQueryProgress(event); } @Override public void onQueryStarted(QueryStartedEvent event) { observer.onQueryStarted(event); } @Override public void onQueryTerminated(QueryTerminatedEvent event) { observer.onQueryTerminated(event); } }
Как только эти два класса станут доступны для PySpark, нужно предоставить конкретную реализацию PythonObserver и передать ее в качестве аргумента PythonStreamingListener в PySpark. Как только эти две части соединены, для каждого завершения микропакета QueryProgressEvent будет пересылаться в PythonObserver, который может использовать предоставленную информацию в QueryProgressEvent для вычисления задержек потребителя.
Далее следует реализовать PythonObserver в PySpark, а затем зарегистрировать его как наблюдателя в PythonStreamingQueryListener:
class StreamingObserver(Subject): class Java: implements = ["com.intuit.data.strmprocess.spark.observer.PythonObserver"] def __init__(self, config: ConsumerLagConfig, processedOffsetCallback = None): self.config = config self.processedOffsetCallback = processedOffsetCallback def onQueryProgress(self, queryProgressEvent): try: processedOffset = self._getSourceLastProcessedOffset(queryProgressEvent) if self.processedOffsetCallback: self.processedOffsetCallback(processedOffset) except Exception as e: error(StreamingObserver._LOGGER, str(e)) raise e def _getSourceLastProcessedOffset(self, queryProgressEvent) -> Dict[str, Dict[str, str]]: sources = queryProgressEvent.progress().sources() sourceOffset = {} for source in sources: endOffset = json.loads(source.endOffset()) sourceOffset.update(endOffset) return sourceOffset def onQueryStarted(self, queryStartedEvent): ... def onQueryTerminated(self, queryTerminatedEvent): ...
Поскольку PythonObserver является классом Java, нужен шлюз-модуль Python, который поможет установить канал связи между кодом Python и Java. В Py4J таким компонентом является модуль случае java_gateway.py, который запускает процесс шлюза для установления канала связи с Py4JServer. Сперва он определяет этот класс как реализацию Java-интерфейса PythonObserver, определенный ранее:
class Java: implements = ["com.intuit.data.strmprocess.spark.observer.PythonObserver"]
Конструктор этого класса, __init__, принимает 2 аргумента: объект ConsumerLagConfig и функцию обратного вызова, которая используется позже, чтобы вытолкнуть зафиксированные смещения в дальнейшей обработке для вычисления задержки потребителя Kafka.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
20 января, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Остальная часть определения класса определяет реализацию трех API из интерфейса PythonObserver. Основная часть реализации этого класса находится в API onQueryProcessed. Объект queryProgressEvent содержит последнее обработанное смещение последнего обработанного микропакета в API-интерфейсе endOffset, который возвращает строку JSON для каждого исходного топика. Каждая строка JSON содержит последнее обработанное смещение для каждого раздела. В рассматриваемой реализации анализируется строка JSON и создается словарь последнего обработанного смещения с ключом топика и раздела.
После того, как PythonObserver определен, нужно передать его в PythonStreamingQueryListener:
def addListener(self, listener): jvm = SparkContext._active_spark_context._jvm jlistener = jvm.com.intuit.data.strmprocess.spark.observer.PythonStreamingQueryListener( listener ) self._jsqm.addListener(jlistener) return jlistener
В этом определении функции предоставленный слушатель является экземпляром класса StreamingObserver, определенного ранее. Экземпляр PythonStreamingQueryListener создается путем предоставления экземпляра StreamingObserver в качестве аргумента. Оставшийся код просто регистрирует объект PythonStreamingQueryListener в качестве слушателя SparkContext.
class KafkaOffsetManager: def __init__(self, consumerLagConfig: ConsumerLagConfig): self.topics = consumerLagConfig.getTopics() self.consumerLagConfig = consumerLagConfig self.consumer = self.getKafkaConsumer() self.topicPartitions = {} for topic in self.topics: partitions = [TopicPartition(topic, p) for p in self.consumer.partitions_for_topic(topic)] self.topicPartitions[topic] = partitions def getLatestPartitionOffset(self) -> Dict[str, Dict[str, str]]: lastOffsets = {} for topic in self.topics: partitions = self.topicPartitions[topic] lastOffsetPerPartition = self.consumer.end_offsets(partitions) lastOffsets[topic] = lastOffsetPerPartition return lastOffsets def getKafkaConsumer(self): return KafkaConsumer(*self.topics, bootstrap_servers=self.consumerLagConfig.getBootstrapServer(), security_protocol=self.consumerLagConfig.getSecurityProtocol())
Как только получено последнее обработанное смещение микропакета, нужно получить текущее последнее смещение исходного топика. Разница между ними и является отставанием потребителя Kafka.
class KafkaOffsetManager: def __init__(self, consumerLagConfig: ConsumerLagConfig): self.topics = consumerLagConfig.getTopics() self.consumerLagConfig = consumerLagConfig self.consumer = self.getKafkaConsumer() self.topicPartitions = {} for topic in self.topics: partitions = [TopicPartition(topic, p) for p in self.consumer.partitions_for_topic(topic)] self.topicPartitions[topic] = partitions def getLatestPartitionOffset(self) -> Dict[str, Dict[str, str]]: lastOffsets = {} for topic in self.topics: partitions = self.topicPartitions[topic] lastOffsetPerPartition = self.consumer.end_offsets(partitions) lastOffsets[topic] = lastOffsetPerPartition return lastOffsets def getKafkaConsumer(self): return KafkaConsumer(*self.topics, bootstrap_servers=self.consumerLagConfig.getBootstrapServer(), security_protocol=self.consumerLagConfig.getSecurityProtocol())
В конструкторе __init__ создается объект KafkaConsumer в конструкторе и выполняется получение всех разделов топика с помощью API partitions_for_topic, предоставленного KafkaConsumer. API getLatestPartitionOffset получает последнее смещение для каждого раздела через API end_offsets(). Также возращается словарь смещений с ключом топика и раздела.
Поскольку создание объекта KafkaConsumer довольно сложная и дорогостоящая операция, рекомендуется создавать его в конструкторе и повторно использовать каждый раз при вызове getLatestPartitionOffset. Иначе сильно возрастут накладные расходы, что приведет к более высокому значению смещения и создаст ложное впечатление о высокой задержке потребителя.
Имея последнее обработанное смещение и последнее смещение для каждого раздела топика, можно вычислить разницу, чтобы получить задержку потребителя. Это возможно с помощью класса ConsumerLagManager:
class ConsumerLagManager: def __init__(self, consumerLagConfig: ConsumerLagConfig, consumerLagCallback: None): if consumerLagCallback: self.consumerLagCallback = consumerLagCallback self.consumerLagConfig = consumerLagConfig self.kafkaOffsetManager = KafkaOffsetManager(consumerLagConfig) if (consumerLagConfig.getConsumerLagStrategy() == ConsumerLagStrategy.PASSIVE): PassiveConsumerLagManager(consumerLagConfig, self.computeConsumerLag) else: ProactiveConsumerLagManager(consumerLagConfig, self.computeConsumerLag) def computeConsumerLag(self, latestProcessedOffset: Dict[str, Dict[str, str]]): latestOffset = self.kafkaOffsetManager.getLatestPartitionOffset() offsetLag = self.__computeOffsetLag(latestOffset, latestProcessedOffset) if self.consumerLagCallback: self.consumerLagCallback(offsetLag) def __computeOffsetLag(self, latestOffset: Dict[str, Dict[str, str]], latestProcessedOffset: Dict[str, Dict[str, str]]): offsetLagsForAllTopics = {} for topic in self.consumerLagConfig.getTopics(): offsetLagForTopic = {} processedPartitionsOffset = latestProcessedOffset[topic] latestPartitionsOffset = latestOffset[topic] for partition in processedPartitionsOffset: commitOffsetForPartition = processedPartitionsOffset[partition] topicP = TopicPartition(topic, int(partition)) latestPartitionOffset = latestPartitionsOffset[topicP] offsetLagForTopic[partition] = latestPartitionOffset - commitOffsetForPartition offsetLagsForAllTopics[topic] = offsetLagForTopic return offsetLagsForAllTopics
Конструктор __init__ принимает объект ConsumerLagConfig и функцию обратного вызова ConsumerLagCallback. А ConsumerLagConfig — это простая оболочка для различных параметров конфигурации:
class ConsumerLagConfig: def __init__(self, consumerLagStrategy: ConsumerLagStrategy, checkpointLocation: str, topics: str, bootstrapServer: str, securityProtocol: str): self.__topics = topics self.__checkpointLocation = checkpointLocation self.__bootstrapServer = bootstrapServer self.__consumerLagStrategy = consumerLagStrategy self.__securityProtocol = securityProtocol def getTopics(self) -> Tuple[str]: topics = [] if self.__topics is not None: topics.extend(self.__topics.split(",")) return topics def getCheckpointLocation(self) -> str: return self.__checkpointLocation def getBootstrapServer(self) -> str: return self.__bootstrapServer def getConsumerLagStrategy(self) -> str: return self.__consumerLagStrategy def getSecurityProtocol(self) -> str: return self.__securityProtocol def getCheckpointReader(self) -> CheckpointReader: if self.__checkpointLocation.startswith("s3"): return S3CheckpointReader() else: return LocalCheckpointReader()
Функция обратного вызова используется для обработки вычисленной задержки потребителя. Вызывающий объект может передать функцию для вывода запаздывания потребителя на консоль или передать задержку потребителя на любую платформу мониторинга метрик. Определить, какой механизм вычисления задержки потребителя использовать поможет следующий код:
if (consumerLagConfig.getConsumerLagStrategy() == ConsumerLagStrategy.PASSIVE): PassiveConsumerLagManager(consumerLagConfig, self.computeConsumerLag) else: ProactiveConsumerLagManager(consumerLagConfig, self.computeConsumerLag)
Чтобы использовать рассмотренный в этой статье механизм, нужно передать PASSIVE ConsumerLagStrategy, отражая, что он пассивно уведомляется обработчиком Spark. А стратегия PROACTIVE проактивно просматривает последний файл контрольной точки, чтобы получить последнее обработанное смещение. Функция calculateConsumerLag используется в качестве функции обратного вызова, переданной в StreamingObserver. При вызове этой функции возвращается последнее обработанное смещение недавно завершенного микропакета. KafkaOffsetManager вызывается для получения последнего смещения разделов темы. Затем он вызывает внутреннюю вспомогательную функцию __computeOffsetLag для вычисления разницы между последним смещением и последним обработанным смещением. Различия возвращаются путем вызова функции обратного вызова ConsumerLagCallback.
Наконец, нужно будет зарегистрировать реализацию StreamingQueryListener с помощью среды выполнения Spark. Проще всего сделать это, добавив файл jar в параметр конфигурации spark.driver.extraClassPath.
В заключение отметим, что рассмотренный механизм использует преимущества существующего в Spark слушателя жизненного цикла микропакетов StreamingQueryListener для уведомления, чтобы вычислить задержку потребителя. Однако, поскольку StreamingQueryListener недоступен в PySpark, необходимо расширить его, чтобы передать уведомления от JVM к PySpark. StreamingQueryListener применим только для конвейеров микропакетной потоковой передачи, а PySpark еще предоставляет модель непрерывной обработки, которая не основана на микропакетах и потребует другого механизма для вычисления задержки потребителя, который мы рассмотрим в следующий раз.
Освойте использование Apache Spark и Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
Источники