Как тестировать конвейеры Apache NiFi: приемы и инструменты

NiFi ETL pipeline тестирование потоков данных пример курсы обучение, процессоры NiFi ETL-конвейер разработка тестирование, курсы Apache NiFi, Apache NiFi для инженеров данных и разработчиков Data Flow, ETL data pipeline Apache NiFi администрирование инженерия данных, обучение дата-инженеров, инженер данных курсы, Школа Больших Данных Учебный центр Коммерсант

В этой статье для обучения дата-инженеров поговорим про тестирование потоковых конвейеров обработки данных в Apache NiFi. Утилиты, классы и сервисы для проверки правильной работы процессоров, контроллеров и потоков.

Модульное тестирование процессоров Apache NiFi

Обычно тестирование компонентов крупной инфраструктуры не самая простая задача. В Apache NiFi проверка корректности обработки потоков данных зависит не только от правильно написанного кода процессоров, но и от их параметров, а также отношений и содержимого Flow File.

Apache NiFi имеет встроенный набор утилит для тестирования самого фреймворка и его отдельных процессоров. Можно изучить тестовый код любого связанного процессора, чтобы увидеть общие шаблоны тестирования с помощью выполнения фиктивных классов и сервисов.

В частности, модуль nifi-mock можно использовать вместе с JUnit для всестороннего тестирования компонентов. Чаще всего он используется для тестирования процессоров, но также дает возможность тестирования служб контроллеров.

Компоненты обычно тестируются путем создания функциональных тестов для проверки их поведения, поскольку основная логика работы процессора чаще всего заключена в методе onTrigger(). Интерфейс TestRunner позволяет тестировать процессоры и службы контроллеров, преобразовывая примитивные объекты, такие как файлы и байтовые массивы, в потоковые файлы. Также он обрабатывает создание сеансов процессора (ProcessSessions) и его контекста (ProcessContext), необходимых для работы процессора, и вызывает нужные методы жизненного цикла. Это позволяет дата-инженеру проверить, что процессор ведет себя в модульных тестах так же, как и в рабочей среде.

Таким образом, большинство модульных тестов для процессора или службы контроллера начинаются с создания экземпляра класса TestRunner. Чтобы добавить необходимые классы в процессор, можно использовать зависимость Maven:

<dependency>
  <groupId>org.apache.nifi</groupId>
  <artifactId>nifi-mock</artifactId>
  <version>${nifi version}</version>
</dependency>

Далее следует создать новый объект TestRunner, вызывая один из статических методов newTestRunner класса TestRunners, который находится в пакете org.apache.nifi.util. Эти методы принимают аргумент для тестируемого процессора, который может быть либо классом тестируемого процессора, либо его экземпляром, а также позволяют устанавливать имя процессора. После создания нового средства выполнения тестов можно добавить в него любые службы контроллера, которые потребуются процессору для работы. Это делается через вызов метода addControllerService() с указанием идентификатора службы контроллера и ее экземпляра.

Если необходимо настроить службу контроллера, ее свойства можно задать, вызвав метод setProperty(ControllerService, PropertyDescriptor, String), setProperty(ControllerService, String, String) или setProperty(ControllerService, PropertyDescriptor, AllowableValue). Каждый из этих методов возвращает объект ValidationResult, который можно проверить с помощью метода isValid(). Данные аннотации можно установить, вызвав метод setAnnotationData(ControllerService, String).

Теперь можно убедиться в валидности службы контроллера, вызвав метод assertValid(ControllerService) или удостовериться в обратном с помощью метода assertNotValid(ControllerService).

После добавления и настройки службы контроллера в средство выполнения тестов ее можно включить, вызвав метод enableControllerService(ControllerService). Если служба контроллера недействительна, этот метод вызовет исключение IllegalStateException. После настройки всех необходимых служб контроллера следует настроить процессор, вызвав те же методы, что и для служб контроллеров, без указания конкретного сервиса. Каждый из методов setProperty() снова возвращает свойство ValidationResult, которое можно использовать для проверки допустимости значения свойства, вызвав методы assertValid() и assertNotValid().

Перед запуском процессора обычно необходимо поставить потоковые файлы в очередь для обработки процессором. Для этого применяются методы класса TestRunner: например, метод enqueue() имеет несколько различных переопределений и позволяет добавлять данные в форме массива байтов, входящего потока или пути. Также есть метод постановки в очередь, который принимает var-args объектов потоковых файлов. Это пригодится для получения выходных данных одного процессора и подачи их на вход другого.

Настроив службы контроллера и установив в очередь необходимые файлы потока, можно запустить процессор, вызвав метод запуска TestRunner(). При вызове без аргументов, он вызовет любой метод в процессоре с аннотацией @OnScheduled, затем однократно вызовет метод процессора onTrigger(),запустит методы, аннотированные @OnUnscheduled и, наконец, @OnStopped. Если нужно выполнить несколько итераций метода onTrigger() до того, как будут инициированы другие события жизненного цикла @OnUnscheduled и @OnStopped, можно использовать метод run(int), чтобы указать, сколько количество вызовов метода onTrigger().

Если надо запустить процессор, но не запускать события жизненного цикла @OnUnscheduled и @OnStopped, например, для проверки состояния процессора до того, как произойдут эти события, можно следует добавить аргументы в метод run(). Для этого вызываем run(int, boolean) и передаем значение false в качестве второго аргумента.

Чтобы протестировать поведение нескольких потоков, используется метод setThreadCount() класса TestRunner. Важно, что вызов выполнения TestRunner указывает, сколько раз должен запускаться процессор, а не количество запусков для каждого потока. Поэтому, если количество потоков установлено равным 2, но вызывается run(1), будет использоваться только один поток.

По завершении работы процессора модульного теста должен проверить, что потоковые файлы переместились в нужное место. Это делается с помощью методов assertAllFlowFilesTransferred() и assertTransferCount() класса TestRunners. Первый метод принимает в качестве аргументов отношение и целое число, указывающее количество передаваемых ему потоковых файлов. Метод не пройдет модульный тест, если это количество FlowFiles не было передано в это отношение или если какой-либо потоковый файл не был передан в любое другое отношение. Метод assertTransferCount() проверяет только то, что фактическое количество потоковых файлов для данного отношения соответствует ожидаемому.

После проверки счетчиков можно получить фактические выходные потоковые файлы с помощью метода getFlowFilesForRelationship(), который возвращает список объектов MockFlowFile, которые имеют много методов проверки содержимого. В частности, в MockFlowFile есть методы для подтверждения существования атрибутов FlowFile, такие как assertAttributeExists(), подтверждения отсутствия других атрибутов assertAttributeNotExists() или того, что атрибуты имеют правильное значение assertAttributeEquals(), assertAttributeNotEquals(). Подобные методы существуют для проверки содержимого FlowFile. Содержимое потокового файла можно сравнить с массивом байтов, входящим потоком InputStream, файлом или строкой. Для текстовых данных лучше использовать тип данных String, который предоставляет более понятное сообщение об ошибке, если вывод не соответствует ожидаемому.

В дополнение к вышеупомянутым возможностям, предоставляемым средой тестирования, класс TestRunner предоставляет еще несколько методов проверки поведения процессора, например, для очистки его входной очереди. Модульные тесты могут получать ProcessContext, ProcessSessionFactory, ProvenanceReporter и другие объекты, зависящие от фреймворка, которые будут использоваться объектом класса TestRunner. Также есть метод для тестирования методов процессора, которые аннотированы для запуска только при выключении NiFi. Аннотационные данные могут быть установлены для процессоров, которые используют пользовательские интерфейсы пользователя. Наконец, количество потоков, которые должны использоваться для запуска процессора, можно установить с помощью метода setThreadCount(int).

Интеграционное тестирование потоков данных

Помимо модульных тестов отдельных процессоров и служб контроллера также важно проверить корректность работы всего конвейера обработки данных, т.е. выполнить интеграционное тестирование. Для этого можно создать динамические потоки в тестовом коде, настроив несколько процессоров и соединений, а затем передавать произвольные данные для оценки их поведения. Программное построение потока сначала может занять некоторое время, но после его завершения будет повторяемое определение потока, которое можно использовать со многими различными входными характеристиками. Например, интерфейс ITestHandleHttpRequest. Также можно протестировать изменение переменных в группах процессов.

Справедливости ради стоит отметить, что провести серию автоматических интеграционных тестов с запущенными процессами Apache NiFi и активным потоком данных не так-то просто, поскольку фреймворк не предоставляет простого API управления потоком, например, для запуска или остановки нескольких процессоров, развертывания шаблонов или проверки количества потоковых файлов потока в очереди. Кроме того, необходимо вручную интегрироваться с различными источниками данных. Чтобы упростить это, можно использовать Docker-контейнер, который предоставляет следующие функции для поддержки интеграционных тестов:

  • различные настройки NiFi, такие как Standalone Plain (nifi-sp), Standalone Secure (nifi-ss), Clusterd Plain (nifi-cp) и Clustered Secure (nifi-cs), настраиваемые в основном файле docker-compose;
  • тестовые примеры, ускоряющее добавление новых интеграционных тестов. Например, можно создать подкаталог и поместить в него файл скрипта с именем build-test, который создает и настраивает образ Docker-контейнера NiFi. Скрипт вызывается при выполнении команды сборки docker-compose. Файлы в тестовом каталоге будут добавлены в контейнер и доступны для основных тестовых сценариев.
  • том Docker, чтобы поддерживать частое обновление программ NiFi без создания образа контейнера. Общий том используется всеми контейнерами NiFi для загрузки библиотек. При обновлении содержимого общего тома и перезапуске контейнеров с помощью перезапуска docker-compose все контейнеры будут работать с обновленными модулями NiFi.

Использование такого Docker-образа ограничено размером среда, чтобы все контейнеры могли работать на одном хосте.

Чтобы протестировать влияние нескольких потоков на общую производительности системы, можно использовать процессор GenerateFlowFile для имитации статического или динамического содержимого и атрибутов FlowFile, передаваемых в группу процессов, где развернут тестируемый поток. При обновлении потока этот же процессор пригодится для проверки нового поведения. При запуске в производственной среде его можно отключить, переключив на этот же входной порт нужное входное соединение.

Одна из самых больших проблем при тестировании процессора NiFi, который подключается к удаленному ресурсу, заключается в том, что это нужно имитировать без фактического подключения к какому-либо удаленному ресурсу из модульного теста. Для этого можно самостоятельно запустить простой сервер в модульном тесте и настроить процессор для связи с ним. Но этот подход подразумевает много ручной работы: дата-инженеру нужно понять и реализовать специфичную для сервера спецификацию и правильно отправлять обратно сообщения об ошибках. Поэтому проще иметь в процессоре метод, который отвечает за получение соединения или клиента с удаленным ресурсом. Обычно такой метод помечается защищенным и реализуется через создания подкласса процессора. В модульном тесте вместо создания объекта класса TestRunner путем вызова метода TestRunners.newTestRunner(Class) и предоставления класса Processor создается другой подкласс процессора:

@Test
public void testConnectionFailure() {
  final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
    protected Client getClient() {
      // Return a mocked out client here.
      return new Client() {
        public void connect() throws IOException {
          throw new IOException();
        }

        // ...
        // other client methods
        // ...
      };
    }
  });

  // rest of unit test.
}

Это позволяет реализовать клиента, который имитирует все сетевые коммуникации и возвращает различные результаты ошибок, которые надо протестировать, а также убедиться, что логика обработки данных корректна для посылки успешных вызовов клиенту.

Как эффективно использовать Apache NiFi  для построения эффективных ETL-конвейеров потоковой аналитики больших данных, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nifi.apache.org/docs.html
  2. https://stackoverflow.com/questions/57062240/how-can-apache-nifi-flow-be-tested
  3. https://github.com/ijokarumawak/nifi-integration-tests
Поиск по сайту