Тестовые наборы для проверки UDF-функций Flink-приложения

тестовые наборы Apache Flink примеры курсы обучение, курсы Apache Flink разработка и тестирование SQL примеры курсы обучение , Flink SQL примеры обучение курсы, обучение большим данных, курсы по flink, обучение Apache Hadoop Flink SQL, Flink разработка и тестирование приложений, курсы Apache Hadoop Flink SQL, курсы Hadoop для инженеров данных обучение примеры, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Мы уже писали про тестирование приложений Apache Flink, используя SQL-клиентов, Table API, тестовые наборы операторов и режим локального мини-кластера. Сегодня рассмотрим, как с помощью тестовых наборов тестировать UDF-функции, использующих состояние и таймеры.

Модульное тестирование UDF-функций Flink-приложения с помощью тестовых наборов

При работе с Apache Flink разработчики часто сталкиваются с проблемами при тестировании определяемых пользователем функций (UDF), использующих состояние и таймеры. Дополнительную сложность создают такие функции, как KeyedProcessFunction. Чтобы упростить работу с такими функциями, как мы уже упоминали здесь, Flink включает в себя специально разработанные тестовые наборы (TestHarness). Эти тестовые наборы были созданы, чтобы помочь проверить правильность встроенных операторов Flink, а также их можно использовать для тестирования UDF-функций, использующих состояние и таймеры. Используя эти наборы тестов, разработчики могут убедиться, что их пользовательские функции работают правильно и могут обрабатывать различные типы входных данных. Это позволит сэкономить время и усилия в процессе разработки и может обеспечить надежность и точность пользовательских функций.

Все тестовые наборы Flink для тестирования UDF-функций, а также пользовательских операторов делятся по следующим категориям:

  • OneInputStreamOperatorTestHarness (для операторов потоков данных);
  • KeyedOneInputStreamOperatorTestHarness (для операторов KeyedStreams);
  • TwoInputStreamOperatorTestHarness (для операторов ConnectedStreams двух DataStreams);
  • KeyedTwoInputStreamOperatorTestHarness (для операторов вConnectedStreams двух KeyedStreams);

Операторы KeyedOneInputStreamOperatorTestHarness и KeyedTwoInputStreamOperatorTestHarness создаются путем дополнительного предоставления KeySelector, включая TypeInformation для класса ключа. Таким образом, KeyedOneInputStreamOperatorTestHarness включает в себя KeyedProcessOperator, который, в свою очередь, содержит функцию KeyedProcessFunction, предназначенную для оценки.

UDF Apache Flink test harnesss
Зависимости UDF-функций Flink

Для использования тестовых наборов операторов необходимы дополнительные зависимости.  В частности, нужно добавить некоторые зависимости в проект, чтобы использовать тестовые наборы. Например, чтобы протестировать задания DataStream, необходимо добавить в блок зависимостей pom.xml проекта Maven следующий участок кода:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.18.0</version>
    <scope>test</scope>
</dependency>

А для тестирования заданий Table/SQL надо добавить в блок зависимостей pom.xml для проекта Maven в дополнение к вышеупомянутым flink-test-utils еще кое-что:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.18.0</version>
    <scope>test</scope>
</dependency>

Потоковая обработка данных с помощью Apache Flink

Код курса
FLINK
Ближайшая дата курса
2 декабря, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Тестирование UDF-функций с помощью оператора позволяет проверить протестировать функцию в контексте оператора и посмотреть, как она ведет себя при использовании в конвейере Flink. Поскольку Flink предоставляет наборы тестов только для операторов, разработчику придется вручную создать экземпляр соответствующего оператора, чтобы использовать эти наборы тестов. Это означает, что нужно будет создать экземпляр оператора и настроить его с необходимыми входными данными и параметрами. После этого можно использовать тестовые наборы для проверки правильности UDF-функции. На практике такой подход может оказаться более сложным, чем использование других методов тестирования, поскольку он требует ручной настройки оператора и настройки его с соответствующими входными данными. Следующий пример Java-кода показывает создание экземпляра оператора KeyedProcessOperator, который затем будет протестирован.

@Test
public void testMyProcessFunction() throws Exception {
    KeyedProcessOperator<String, String, String> operator =
        new KeyedProcessOperator<>(new MyKeyedProcessFunction());

    // setup test harness
    // push in data
    // verify results
}

Чтобы протестировать пользовательскую функцию во Flink, сначала необходимо создать соответствующий тестовый набор (TestHarness). После этого можно использовать его для тестирования различных аспектов UDF-функции и оператора, в котором она используется. Например, можно проверить, что обработка элемента с оператором создает состояние и что это состояние позже очищается. Пример Java-кода для такого теста может выглядеть так:

public class MyKeyedProcessFunctionTest {
   private KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> testHarness;

   @Before
   public void setupTestHarness() throws Exception {

       KeyedProcessOperator<Long, Long, Long> operator =
               new KeyedProcessOperator<>(MyKeyedProcessFunction);

       testHarness =
               new KeyedOneInputStreamOperatorTestHarness<>(operator, e -> e.key, Types.LONG);

       testHarness.open();
   }

   @Test
   public void testingStatefulFunction() throws Exception {

       assertThat(testHarness.numKeyedStateEntries(), is(0));
       testHarness.setProcessingTime(0L);
       
       testHarness.processElement(2L, 100L);
       assertThat(testHarness.numKeyedStateEntries(), is(not(0)));
       
       testHarness.setProcessingTime(3600000L);
       assertThat(testHarness.numKeyedStateEntries(), is(0));
   }
}

При тестировании UDF-функции Flink-приложения важно проверить все аспекты ее поведения, особенно создание и срабатывание таймеров. Для этого также пригодятся тестовые наборы. Можно создать TestHarness для отправки тестовых данных через оператора и проверки создания таймера. Затем продвинуть водяной знак (watermark) и убедиться, что таймер сработал. В дополнение к тестированию таймеров также можно проверить, что обработка элемента создает состояние, и посмотреть результаты этой обработки.

Например, срабатывание таймера для тестовой цели ожидается в течение 20 миллисекунд. В следующем примере кода показано, как протестировать его с помощью тестового набора.

@Test
public void testTimelyOperator() throws Exception {

    // setup initial conditions
    testHarness.processWatermark(0L);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // send in some data
    testHarness.processElement(3L, 100L);

    // verify timer
    assertThat(testHarness.numEventTimeTimers(), is(1));

    // advance the time to 20 in order to fire the timer.
    testHarness.processWatermark(20);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // verify results
    assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
    assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
}

Важно отметить, что тестовые наборы считаются внутренними, поэтому их API может меняться при изменении версии Apache Flink. Хотя обычно эти изменения совместимы с предыдущими версиями, возможно, потребуется внести некоторые обновления в тесты. Также с тестовыми наборами связаны еще некоторые ограничения интеграционного тестирования, которые мы разберем далее.

Интеграционное тестирование

При всей своей мощи, тестовые наборы Flink позволяют тестировать только один оператор, а не целый конвейер операторов. Это означает, что для тестирования UDF-функции, которая используется в конвейере операторов, разработчику потребуется настроить и протестировать каждый оператор отдельно.

Впрочем, для облегчения этой задачи Apache Flink предоставляет правило JUnit под названием MiniClusterWithClientResource для тестирования полных заданий на локальном встроенном мини-кластере. Для использования MiniClusterWithClientResource требуется одна дополнительная зависимость (с областью действия теста):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.18-SNAPSHOT</version>    
    <scope>test</scope>
</dependency>

При выполнении интеграционного тестирования с MiniClusterWithClientResource рекомендуется применять следующие лучшие практики:

  • Вместо копирования всего кода конвейера из рабочей среды в тестовую можно сделать источники и приемники подключаемыми и/или внедрить в тесты специальные тестовые источники и тестовые приемники.
  • Поскольку Flink сериализует все операторы перед их распределением по кластеру для взаимодействия с операторами, созданными локальным мини-кластером Flink, можно использовать статические переменные или записать данные в файлы во временном каталоге с тестовым приемником.
  • Для заданий, использующих таймеры времени события, целесообразно реализовать пользовательскую функцию параллельного источника для создания водяных знаков;
  • Рекомендуется всегда тестировать конвейеры локально с параллелизмом > 1, чтобы выявить ошибки, которые проявляются только для конвейеров, выполняемых параллельно.
  • Вместо декоратора @Rule рекомендуется использовать @ClassRule, чтобы несколько тестов могли использовать один и тот же кластер Flink. Это экономит много времени на запуск и завершение работы кластеров Flink.
  • Если конвейер содержит пользовательскую обработку состояния, ее правильность можно проверить, включив контрольные точки и перезапустив задание в мини-кластере. Для этого нужно инициировать сбой, создав исключение из UDF-функции (только для тестирования) в пользовательском конвейере.

Таким образом, рассмотренные техники и инструменты тестирования приложений Apache Flink позволяют убедиться в его корректной работе. В заключение отметим, что важно протестировать приложение на нескольких уровнях, от модульного тестирования UDF-функций, использующих состояние и таймеры, до интеграционного и нагрузочного тестирования. Именно такой многоуровневый подход к тестированию позволит убедиться, что приложение работает должным образом и готово к запуску в производственной среде.

Узнайте больше про применение Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.ververica.com/blog/flinks-test-harnesses-uncovered
  2. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/
Поиск по сайту