Модульное тестирование Kafka-приложений

Apache Kafka для разработчиков и дата-инженеров примеры курсы обучение, тестирование Kafka-приложений, Kafka Streams курсы примеры обучение, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Чтобы сделать наши курсы по Apache Kafka еще полезнее, сегодня разберем, как тестировать распределенные приложения на базе этой платформы потоковой обработки событий. Краткий ликбез для разработчика Kafka Streams и дата-инженера: классы, методы и приемы модульных тестов с примерами.

Ликбез по модульному тестированию: что такое mock-объекты

Про виды тестирования мы уже писали здесь. Напомним, именно модульное тестирование (Unit Testing) лежит в основе других видов проверки качества работы ПО, когда тестируются отдельные модули или компоненты, чтобы проверить, что каждый из них работает должным образом. Обычно модульное тестирование выполняется разработчиками, к примеру, в случае V-образной модели разработки это является основной концепцией самого процесса. Модульные тесты изолируют часть кода и проверяют его работоспособность. Модульное тестирование предшествует интеграционному и чаще всего проводится по принципу т.н. «белого ящика», поскольку известны не только входы и выходы проверяемого компонента, но и его внутреннее устройство. Отсутствие модульного тестирования при разработке кода значительно увеличивает уровень дефектов при дальнейшем продвижении по уровням тестовой пирамиды, т.е. интеграционном, системном, и приемочном тестировании. Поэтому, хотя на первый взгляд, unit-тесты и увеличивают срок разработки кода, они в итоге экономят время и деньги за счет раннего выявления дефектов.

тестирование ПО, основы тестирования ПО
Пирамида тестирования: виды тестов

В тестировании активно используются так называемые mock-объекты для симуляции поведения реальных объектов. Они заменяют реальные объект в условиях теста и позволяют проверять вызовы своих членов как часть системы в рамках модульного тестирования. Обычно mock-объекты содержат заранее запрограммированные ожидания вызовов, которые они ожидают получить, и применяются в основном для проверки поведения, т.е. интерактивного взаимодействия приложения с пользователем или внешней системой.

Mock-объект не просто возвращает предустановленные данные, но еще и записывает все вызовы, которые проходят через него, чтобы в unit-тесте проверить, что конкретные методы отдельных классов работают должным образом. Например, в модульном тестировании проверяется состояние объекта после прохождения unit-теста (state-based testing) и взаимодействие между объектами, поведение тестируемого метода, последовательность вызовов методов и их параметры и т.д. (interaction/behavioral testing).  А про подходы к организации юнит-тестирования Spark-приложений читайте в нашей новой статье.

Тестирование потребителя Apache Kafka

Для модульного тестирования простого потребителя Apache Kafka можно использовать MockConsumer, предоставляемый библиотекой org.apache.kafka:kafka-clients:X.X.X. Для этого при разработке приложения или рефакторинге необходимо сделать классы, использующие или обертывающие экземпляр KafkaConsumer, тестируемыми. Это означает, что Consumer<K, V> будет передан через вызов фабричного метода, непосредственно в конструкторе или внедрен как bean-компонент.

Чтобы создать экземпляр MockConsumer, нужно просто сопоставить тип ключа и значение записей, сообщив, с чего начать чтение:

val mockConsumer = MockConsumer<String, String>(OffsetResetStrategy.LATEST)

Как KafkaConsumer, так и MockConsumer реализуют интерфейс Consumer<K,V>, поэтому при передаче его в тестируемый класс, он будет взаимодействовать с ним как настоящий потребитель. Но здесь есть дополнительные методы, которые можно использовать для подготовки условий тестов:

  • addRecord() к mock-объекту, чтобы по существу подготовить записи для чтения перед началом теста;
  • schedulePollTask(), где можно передать любой Runnable для выполнения в последующей отправке;
  • setPollException() для проверки поведения потребителя при исключении.

Поскольку это предназначено только для модульного тестирования, здесь можно проверить, способен ли реальный потребитель:

  • десериализовать записи, которые он должен читать;
  • обрабатывать записи, таким образом, как это ожидается, включая фильтрацию;
  • обрабатывать ошибки при десериализации или обработке, а также при подключении к Kafka, включая подписку на топик, считывание сообщений и фиксацию смещаний.

При этом если потребитель манипулирует смещениями нестандартным способом, можно использовать один и тот же экземпляр между всеми модульными тестами, но с помощью методов updateBeginningOffsets() и updateEndOffsets().

Тестирование продюсера

Библиотека kafka-clients также включает класс MockProducer, который реализует тот же интерфейс Producer<K, V>, что и KafkaProducer. Таким образом, подобно тестированию потребителя, классы приложения должны быть разработаны так, чтобы пройти проверку как mock-объект.

Чтобы создать экземпляр MockProducer, следует сопоставить тип ключа и значение записей и «сказать» ему, следует ли автоматически успешно завершить запросы отправки (autocomplete=true) или вы хотите выполнить их явно, вызвав completeNext() или errorNext(). Например,

val mockProduer = MockProducer(true, StringSerializer(), StringSerializer())

Чтобы протестировать нормальный поток работы продюсера, нужно выстроить его работу, используя автозаполнение, а затем проверить, что было отправлено, используя метод history() в MockProducer. Это вернет список всех записей (ProducerRecord), отправленных с момента последнего вызова метода clear() на mock-объекте.

Чтобы проверит обработку исключений, можно установить для автозаполнения значение false и errorNext(), что вызовет любое исключение RuntimeException, которое случится при незавершенном вызове отправки.

Поскольку речь идет о модульном тестировании, можно проверить, способен ли продюсер:

  • сериализовать записи, которые ему нужны;
  • обрабатывать ошибки сериализации;
  • обрабатывать ошибки, связанные с подключением к Kafka, т.е. при вызове метода send();
  • правильно фильтровать записи, т.е. что количество фактически отправленных сообщений соответствует ожидаемому числу;
  • отправлять записи в ожидаемом формате, корректно обогащая или изменяя их формат.

 

Тестирование приложений Kafka Streams

Для Kafka Streams тестовые и рабочие классы разделены на отдельные библиотеки, поэтому нужно добавить зависимость org.apache.kafka:kafka-streams:X.X.X для использования потоков, а затем org.apache.kafka:kafka-streams-test- utils:X.X.X, чтобы использовать удобные тестовые классы.

Теперь вместо создания mock-объекта и передачи его тестируемому классу создается экземпляр TopologyTestDriver, куда передается топологию и свойства потокового приложения. Поэтому, чтобы сделать Kafka Streams-приложение пригодным для модульного тестирования, нужно создать свою топологию и передать ее тестовому драйверу:

val driver = TopologyTestDriver(myTopology, myProperties)

Когда есть экземпляр драйвера, следует явно создать все топики для вашей топологии, например:

val myInputTopic = driver.createInputTopic(
       inputTopicName,
       Serdes.String().serializer(), // key type
       Serdes.String().serializer()  // value type
)val myOutputTopic = driver.createOutputTopic(
       outputTopicName,
       Serdes.String().deserializer(), //  key type
       Serdes.String().deserializer()  //  value type
).... // create as many output topics as your topology hasval myDlqTopic = driver.createOutputTopic(
       dlqTopicName,
       Serdes.String().deserializer(), //  key type
       Serdes.String().deserializer()  //  value type
)

После настройки всех тестовых входных и выходных топиков (TestInputTopics и TestOutputTopics) можно приступать к тестированию:

myInputTopic.pipeInput(key, validValue)
...
assertTrue(myDlqTopic.isEmpty)
assertFalse(myOutputTopic.isEmpty)val actualRecord = myOutputTopic.readRecord()
assertEquals(expectedRecord, actualRecord, "Oh no, records don't match)

Можно также работать с несколькими входными значениями одновременно:

  • pipeValueList(List<V>) — если тестовый класс работает только со значениями;
  • pipeKeyValueList(List<KeyValue<K,V>> — если тестовый класс работает с ключами и значениями;
  • pipeRecordList(List<TestRecord<K,V>> — если тестовый класс использует только заголовки или временные метки;

Аналогично для вывода можно применять следующие методы:

  • readValuesToList() — если нужно проверить только значения вывода;
  • readKeyValuesToList() или readKeyValuesToMap() — если нужно проверить только ключ и значение вывода;
  • readRecordsToList() — если внужно проверить заголовки и временные метки вывода.

Эти модульные тесты проверяют, что приложение Kafka Streams может:

  • сериализовать и десериализовать необходимые записи;
  • обрабатывать исключения должным образом;
  • обрабатывать записи как положено, т.е. формат вывода совпадает с ожидаемым, а сам поток данных воспринимается как черный ящик с известными входами и выходами, но скрытым содержимым;
  • выполнять любую фильтрацию, как и ожидалось, т.е. фактическое количество выходных записей соответствует ожидаемому, независимо от того, сколько их было введено.

Потоковый процессор

При использовании потокового обработчика (процессора) пригодится MockProcessorContext для инициализации реализации этого интерфейса. С его помощью вы можете проверить, пересылаются ли записи в нужную тему и фиксируются ли смещения.

val mockContext = MockProcessorContext<String, String>()
val processor = MyProcessor() // implementing Processor
processor.init(mockContext)...processor.process(record)
val forwardedRecords = mockContext.forwareded()
assertEquals(1, forwardedRecords.size)
assertEquals(expectedRecord, forwardedRecords.map{it.record()}.first())
assertEqual(expectedTopic, forwardedRecords[0].childName().get())// if you have scheduled commit (or other action) manipulate time bymockContext.scheduledPunctuators()[0].punctuator.punctuate(time)// check if scheduled task is doneassertTrue(mockContext.committed())

Читайте в нашей следующей статье про интеграционное тестирование Kafka-приложений.

Узнайте больше про тестирование и разработку распределенных приложений с Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/codex/testing-kafka-applications-libraries-for-unit-and-integration-tests-732c0f13b915
  2. https://logrocon.ru/news/unit_testing
  3. https://gist.github.com/vertigra/696e9d92dc72070584e556e2169e850d
Поиск по сайту