Модульное тестирование в Apache NiFi

разработка и тестирование NiFi, процессоры Apache NiFi, курсы Apache NiFi дата-инженер обучение администратор, обучение Apache NiFi, Apache NiFi для инженеров данных и администраторов, инженерия больших данных курсы обучение, курсы дата-инженеров и администраторов NiFi, Школа Больших Данных Учебный центр Коммерсант

Как тестировать пользовательские процессоры и службы контроллера Apache NiFi: знакомимся с методами интерфейса TestRunner в модуле nifi-mock. Как создать тестовый объект, настроить его и проверить валидность работы собственного компонента Apache NiFi.

Тестирование компонентов Apache NiFi: создание тестовых объектов и их настройка

Будучи разработанным на Java, Apache NiFi позволяет использовать возможности этого языка для разработки собственных компонентов. В частности, здесь мы рассказывали, как написать свой процессор. Однако, разработка ПО предполагает обязательный этап тестирования. Для этого в Apache NiFi есть модуль nifi-mock, который можно использовать вместе с платформой JUnit для проведения обширного тестирования компонентов.

Напомним, для модульного тестирования Java-программ часто используется фреймворк JUnit, который определяет API тестового движка (TestEngineAPI) для разработки среды тестирования. Также эта платформа предоставляет средство запуска консоли для запуска платформы из командной строки и механизм JUnit Platform Suite Engine для запуска специального набора тестов с использованием одного или нескольких механизмов тестирования на платформе. JUnit поддерживается многими популярными IDE, включая IntelliJ IDEA, Eclipse, NetBeans и Visual Studio Code, а также инструментами сборки (Gradle , Maven и Visual Studio Code).

Хотя Mock Framework, главным образом, нацелен на тестирование процессоров, он также позволяет тестировать службы контроллера путем создания функциональных тестов для проверки поведения компонента. При том, что процессор NiFi состоит из нескольких вспомогательных методов, его основная логика заключена в методе onTrigger().

Mock-объект представляет собой конкретную фиктивную реализацию интерфейса, предназначенную исключительно для тестирования взаимодействия и относительно которого высказывается проверяемое утверждение. В Apache NiFi интерфейс TestRunner дает возможность тестировать процессоры и службы контроллеров путем преобразования более простых объектов, таких как файлы и байтовые массивы, в файлы потока FlowFiles. Также этот интерфейс обрабатывает создание ProcessSessions и ProcessContexts, необходимых процессору для его работы, и вызов необходимых методов жизненного цикла. Таким образом, тестирование позволяет гарантировать, что процессор будет вести себя в рабочей среде также, как и в модульных тестах.

Большинство модульных тестов для процессора или службы контроллера начинаются с создания экземпляра класса TestRunner. Чтобы добавить необходимые классы в пользовательский процессор, можно использовать зависимость Maven:

<dependency>
  <groupId>org.apache.nifi</groupId>
  <artifactId>nifi-mock</artifactId>
  <version>${nifi version}</version>
</dependency>

После этого можно создать новый объект TestRunner, вызвав функцию newTestRunner() —  один из статических  методов класса TestRunners, который находится в пакете org.apache.nifi.util. Эти методы принимают аргумент для тестируемого процессора, который может быть классом тестируемого процессора или его экземпляром, а также позволяют устанавливать имя процессора.

После создания нового средства выполнения тестов можно добавить в него любые службы контроллера, которые потребуются процессору для работы. Сделать это можно, вызвав метод addControllerService(), в котором нужно передать идентификатор службы контроллера и ее экземпляр.

Если необходимо настроить службу контроллера, ее свойства можно задать, вызвав метод setProperty(ControllerService, PropertyDescriptor, String)setProperty(ControllerService, String, String) или setProperty(ControllerService, PropertyDescriptor, AllowableValue). Каждый из этих методов возвращает файл ValidationResult, который можно проверить, чтобы убедиться в допустимости свойства, вызвав метод isValid(). Данные аннотации можно установить, вызвав метод setAnnotationData(ControllerService, String). После этого можно убедиться, что служба контроллера валидна или нет, вызвав ее метод assertValid(ControllerService) или assertNotValid(ControllerService).

После того как служба контроллера была добавлена ​​в объект TestRunner и настроена, ее теперь можно включить, вызвав метод enableControllerService(ControllerService). Если служба контроллера невалидна, этот метод выдаст исключение IllegalStateException

После настройки всех необходимых служб контроллера необходимо настроить процессор, вызвав те же методы, что и для служб контроллера, но без указания какой-либо службы, например, setProperty(PropertyDescriptor, String) и пр. Каждый из методов установки свойств снова возвращает свойство ValidationResult, которое можно использовать для проверки допустимости значения свойства. Аналогично можно вызвать assertValid() и assertNotValid(), чтобы убедиться, что конфигурация процессора валидна и соответствует ожиданиям.

Запуск процессора

Прежде чем запустить процессор, обычно необходимо поставить в очередь файлы потока, которые он будет обрабатывать. Это можно сделать, используя метод enqueue() класса TestRunner. Этот метод имеет несколько различных переопределений и позволяет добавлять данные в виде массива байтов byte[], входного потока InputStream или пути Path. Каждый из этих методов также поддерживает вариант сопоставления Map<String, String>, позволяющий  добавлять атрибуты FlowFile. Также существует enqueue-метод, который принимает переменные аргументы объектов FlowFile, что пригодится для получения выходных данных процессора и их последующей подачи на вход процессора.

После настройки служб контроллера и постановки в очередь необходимых FlowFiles процессор можно запустить, вызвав  метод run() у объекта TestRunner. Если этот метод вызывается без каких-либо аргументов, он вызовет любой метод процессора с аннотациями @OnScheduled и @onTrigger, а затем запустит методы, аннотированные @OnUnscheduled и наконец @OnStopped.

Если желательно запустить несколько итераций метода onTrigger() до того, как будут запущены другие события жизненного цикла, аннотированные @OnUnscheduled и @OnStopped, метод run(int) можно использовать для указания количества итераций, которые  следует вызвать с помощью @onTrigger.

Если необходимо запустить процессор, но не запускать события жизненного цикла @OnUnscheduled и @OnStopped,  например, для проверки состояния процессора до возникновения этих событий, можно вызвать метод run(int, boolean), передав ему значение false в качестве второго аргумента. Однако после этого может возникнуть проблема вызова методов жизненного цикла, аннотированных @OnScheduled. Впрочем, можно запустить onTrigger() снова, без вызова этих событий, добавив в метод run() еще один параметр типа boolean:  run(int,boolean,boolean), которому также передается значение false.

Если требуется протестировать поведение, происходящее с несколькими потоками, можно использовать метод setThreadCount() объекта TestRunner. По умолчанию количество потоков равно 1. При использовании нескольких потоков важно помнить, что вызов run() сообщает TestRunner, сколько раз должен запускаться процессор, а не количество выполнения работы процессора для каждого потока. Поэтому, если для счетчика потоков установлено значение 2, но метод run() вызывается один раз, т.е. run(1), будет использоваться только один поток.

После завершения работы процессора модульный тест проверяет, что файлы потока направлены в место назначения, используя методы assertAllFlowFilesTransferred() и assertTransferCount() объекта TestRunner. Метод assertAllFlowFilesTransferred()принимает в качестве аргументов отношение и целое число, определяющее, сколько потоковых файлов должно быть передано в него. Метод не пройдет модульный тест, если это условие не выполнено или если какой-то FlowFile не передан в любую другую связь. Метод assertTransferCount() проверяет только совпадение фактического количества файлов потока с ожидаемым для конкретного отношения.

После проверки счетчиков можно получить фактические выходные файлы потока с помощью метода getFlowFilesForRelationship(), который возвращает список List<MockFlowFile>, который имеет множество методов проверки содержимого. В частности, у MockFlowFile есть методы для подтверждения существования атрибутов FlowFile – assertAttributeExists(), подтверждения отсутствия других атрибутов – assertAttributeNotExists() или того, что атрибуты имеют правильное значение: assertAttributeEquals()assertAttributeNotEquals(). Подобные методы существуют для проверки содержимого FlowFile, которое можно сравнить с массивом байтов byte[], файлом входящего потока InputStream или строкой String, что предпочтительно для текстовых данных.

В заключение рассмотрим более сложный кейс тестирования процессора NiFi, который подключается к удаленному ресурсу, который нужно имитировать во время модульного теста. Хотя можно создать простой сервер в модульном тесте и настроить процессор для связи с ним, но тогда придется реализовать спецификацию конкретного сервера, которая может не совпадать с реальным. Чтобы отправлять обратно сообщения об ошибках, используется защищенный метод, отвечающий за получение соединения или клиента к удаленному ресурсу.  В модульном тесте вместо создания TestRunner путем вызова TestRunners.newTestRunner(Class) и предоставления класса процессора надо создать подкласс процессора следующим образом:

@Test
public void testConnectionFailure() {
  final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
    protected Client getClient() {
      // Return a mocked out client here.
      return new Client() {
        public void connect() throws IOException {
          throw new IOException();
        }

        // ...
        // other client methods
        // ...
      };
    }
  });
  // rest of unit test.
}

Так можно реализовать клиента, который имитирует все сетевые коммуникации и возвращает различные результаты ошибок, которые надо протестировать, а также гарантировать корректную логику обработки успешных вызовов клиента.

В дополнение к вышеупомянутым возможностям платформы тестирования, TestRunner предоставляет несколько удобных методов проверки поведения процессора. Например, методы, которые гарантируют, что входная очередь процессора очищена. Модульные тесты могут получать ProcessContext, ProcessSessionFactory, ProvenanceReporter и другие объекты, специфичные для платформы, которые будет использовать TestRunner. Метод shutdown() предоставляет возможность тестировать методы процессора, аннотированные для запуска только при выключении NiFi. Данные аннотации могут быть установлены для процессоров, использующих интерфейсы пользователя. Наконец, можно установить количество потоков для запуска процессора setThreadCount(int).

Узнайте больше про администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nifi.apache.org/docs.html
  2. https://junit.org/junit5/docs/current/user-guide/
Поиск по сайту