Тонкости тестирования приложений Apache Flink SQL

курсы Apache Flink разработка и тестирование SQL примеры курсы обучение , Flink SQL примеры обучение курсы, обучение большим данных, курсы по flink, обучение Apache Hadoop Flink SQL, Flink разработка и тестирование приложений, курсы Apache Hadoop Flink SQL, курсы Hadoop для инженеров данных обучение примеры, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Как протестировать работу приложения Apache Flink, используя SQL-клиентов, Table API, тестовые наборы операторов и режим локального мини-кластера. Разбираем особенности ручного и автоматизированного тестирования Flink SQL на уровне отдельных функций, модулей и их интеграционного взаимодействия.

Модульное и интеграционное тестирование приложений Apache Flink SQL

Тестирование является неотъемлемой частью любого процесса разработки ПО, тем более, когда речь идет о распределенных приложениях аналитики больших данных в реальном времени, для чего част используется Apache Flink. Этот фреймворк включает инструменты тестирования исходного кода приложения на нескольких уровнях пирамиды тестирования, от модульного до интеграционного.

Самым простым способом протестировать SQL-код Flink-приложения является клиент, который обеспечивает интерактивную среду, где можно выполнять SQL-запросы и просматривать результаты, чтобы быстро проверять код и вносить изменения. Но этот способ подходит только для небольшой доли случаев ручного тестирования, чего недостаточно для полной проверки приложения. Для комплексной проверки следует использовать инструменты автоматизированного тестирования, которые дают возможность протестировать код с несколькими наборами данных и различными сценариями. Автоматизированные тесты включают модульное и интеграционное тестирование. Модульные тесты используются для тестирования отдельных компонентов приложения, а интеграционные тесты — для проверки интеграции между различными компонентами. Все эти тесты помогают выявить проблемы, связанные с обработкой данных, такие как неправильный синтаксис SQL или некорректное преобразование данных.

Поскольку при написании кода приложения разработчик часто создает собственные функции (UDF, User Defined Function), их необходимо проверять в первую очередь. Рекомендуется как можно больше тестировать те классы, которые содержат основную бизнес-логику, модульными тестами. UDF-функцию, которая использует сборщик org.apache.flink.util.Collector, например, FlatMapFunction или ProcessFunction, можно легко протестировать, предоставив фиктивный объект-заглушку с той же функциональностью вместо реального сборщика.

Тестирование UDF-функции, которая использует управляемое состояние или таймеры, является более сложной задачей, поскольку включает проверку взаимодействия между пользовательским кодом и средой выполнения Flink. Для этого Flink поставляется с так называемыми тестовыми наборами операторов, которые можно использовать для тестирования UDF-функций, а также пользовательских операторов:

  • OneInputStreamOperatorTestHarness (для операторов потоков данных);
  • KeyedOneInputStreamOperatorTestHarness (для операторов KeyedStreams);
  • TwoInputStreamOperatorTestHarness (для операторов ConnectedStreams двух DataStreams);
  • KeyedTwoInputStreamOperatorTestHarness (для операторов в ConnectedStreams двух KeyedStreams);

Для использования тестовых наборов операторов необходимы дополнительные зависимости.  Операторы KeyedOneInputStreamOperatorTestHarness и KeyedTwoInputStreamOperatorTestHarness создаются путем дополнительного предоставления KeySelector, включая TypeInformation для класса ключа.

Учитывая важность модульного тестирования UDF-функций, в дополнение к предыдущим тестовым комплектам, которые можно использовать непосредственно для тестирования ProcessFunction, Flink предоставляет фабрику тестовых комплектов с именем ProcessFunctionTestHarnesses, которая упрощает создание экземпляров тестовых комплектов. Выполнить модульное тестирование такой функции с помощью ProcessFunctionTestHarnesses можно, передав подходящие аргументы и проверив вывод.

Тестовые наборы можно использовать для передачи записей и водяных знаков в UDF-функции или пользовательские операторы, контроля времени обработки и утверждения на выходе оператора, включая побочные выходные данные.

Автоматизированное тестирование

Выполнение модульных и интеграционных тестов для проверки поведения кода легко автоматизировать, что очень удобно для тестирования небольших единиц, таких как части запроса или функции. А благодаря широким возможностям настройки, можно определять отдельные сценарии тестирования с конкретными входными данными без потоков журнала изменений и контроля над атрибутами времени строки.

Еще одним преимуществом этого подхода является то, что он позволяет сократить время выполнения заданий при разработке UDF-функций, помогая выявить проблемы с выводом типов данных на ранних этапах процесса разработки. Однако, этот метод тестирования предполагает использование кода на Java или Scala.

Если нужно предоставить входные данные для тестирования, можно использовать метода TableEnvironment.fromValues(). Но этот метод поддерживает только вставки и не допускает потоков журналов изменений. Кроме того, он не обеспечивает никакого контроля над порядком данных и не поддерживает доступных параметров времени строки или водяных знаков.

Альтернативный подход заключается в использовании соединителя значений с TestValuesTableFactory. Это позволяет определять входные таблицы, используя весь набор опций, доступных в таблице DDL, включая возможность указать время строки и атрибуты водяных знаков, а также любые другие ее особенности. Но это закрытый API, не предназначенный для использования в производственных средах.

Чтобы собрать результаты запроса или операции в таблице, можно использовать метод TableResult.collect(), который извлекает результаты в виде CloseableIterator<Row> — специализированного итератора, позволяющего перебирать строки в результирующем наборе. Преимущество метода collect()в том, что он позволяет выполнять детальное сравнение результатов, включая атрибуты RowKind, используемые для указания типа изменения, произошедшего со строкой в потоке журнала изменений, т.е. выполненные операции вставки, обновления и удаления. Сравнивая RowKinds фактических результатов с ожидаемыми значениями, можно выявить любые несоответствия или проблемы с запросом или операцией.

Если нужно предоставить входные данные для тестирования в Flink 1.13 или выше, пригодится метод TableEnvironment.fromValues() или StreamTableEnvironment.fromChangelogStream(), который позволяет определить входные данные как DataStream<T> или DataStream<Row> с атрибутами RowKind. Этот метод предлагает автоматическое преобразование типов и сохраняет время события и водяные знаки во всех операциях. Кроме того, он позволяет определять пользовательские определения схемы так же, как в табличном DDL, обеспечивая большую гибкость и контроль над входными данными. В целом, использование метода fromChangelogStream() может быть более мощным и универсальным способом предоставления входных данных для тестирования в Flink 1.13 и выше.

Для тестирования заданий на локальном встроенном мини-кластере Apache Flink удобно воспользоваться правилом JUnit под названием MiniClusterWithClientResource, для использования которого требуется одна дополнительная зависимость с областью действия теста.

Чтобы не копировать весь конвейерный код из рабочей среды в тестовую при интеграционном тестировании в режиме MiniClusterWithClientResource, рекомендуется сделать специальные тестовые источники и приемники данных. При этом используется статическая переменная в CollectSink, поскольку Flink сериализует все операторы перед их распределением по кластеру. В качестве альтернативы можно записать данные в файлы во временном каталоге с тестовым приемником. Если задание Flink SQL использует таймеры времени события, можно реализовать пользовательскую функцию параллельного источника для создания водяных знаков. Когда конвейер содержит пользовательскую обработку состояния, ее правильность можно проверить, включив контрольные точки и перезапустив задание в мини-кластере. Для этого нужно инициировать сбой, создав исключение из UDF-функции для тестирования конвейера обработки данных. Подробнее про использование мини-кластера для интеграционного тестирования UDF-функций читайте в нашей новой статье.

Узнайте больше про применение Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.ververica.com/blog/how-to-test-flink-sql-application
  2. https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/
Поиск по сайту