Создание и тестирование источника данных в Apache Flink

Apache Flink data source Примеры курсы обучение, Apache Flink для разработчиков и дата-инженеров примеры курсы обучение, потоковая обработка больших данных с Apache Flink обучение примеры курсы, Школа Больших Данных Учебный Центр Коммерсант

Недавно мы писали про источники данных Apache Flink. Сегодня рассмотрим, как создать и протестировать собственный источник данных для их обработки в распределенном приложении.

Создание своего источника данных в Apache Flink

Напомним, источник данных в Apache Flink состоит из трех основных компонентов: Split, SplitEnumerator и SourceReader.

  • Splits — это часть данных, используемых источником, например, файл или раздел журнала. Это степень детализации, с которой источник распределяет работу и распараллеливает чтение данных.
  • SourceReader запрашивает разделы и обрабатывает их, например, читая файл или раздел журнала, представленный разделом. Считыватели источника работают параллельно с диспетчерами задач и создают параллельный поток событий/записей.
  • SplitEnumerator генерирует разделения и назначает их считывателям источника. Он работает как отдельный экземпляр в Диспетчере заданий и отвечает за сохранение незавершенных операций разделения и их сбалансированное назначение читателям.

Класс Source — это точка входа в API, которая связывает вместе три вышеуказанных компонента. Благодаря унифицированной обработке данных в Apache Flink, API источников данных поддерживает неограниченные потоковые источники и ограниченные пакетные. Разница между обоими случаями минимальна: в случае пакета SplitEnumerator генерирует фиксированный набор разбиений. В случае неограниченной потоковой передачи разбиения не являются конечными, т.е. счетчик продолжает генерировать новые разбиения.

Экземпляры SourceReader запрашивают разбиения у SplitEnumerator , и взамен им назначаются результирующие разбиения. Flink предоставляет реализацию SourceReaderBase, которая отвечает за все потоки. Flink предоставляет расширение этого класса SingleThreadMultiplexSourceReaderBase, в котором уже настроена многопоточная модель: каждый экземпляр SplitReader считывает разделы, используя один поток. Чтобы создать собственный источник данных, надо в классе SourceReader выполнить следующий набор действий:

  • указать провайдера SplitReader;
  • создать RecordEmitter;
  • создать общие ресурсы для SplitReaders (сеансы и пр.). Поскольку поставщик SplitReader создается в конструкторе SourceReader в вызове super(), можно использовать фабрики SourceReader для создания общих ресурсов и передачи их провайдеру;
  • реализовать метод start(),  запросив SplitEnumerator для первого разделения;
  • переопределить метод close() в родительском классе SourceReaderBase, чтобы освободить созданные ресурсы;
  • реализовать метод initializedState() для создания изменяемого состояния SplitState из Split;
  • реализовать метод toSplitType() для создания Split из изменяемого состояния SplitState;
  • реализовать метод onSplitFinished(), чтобы запросить SplitEnumerator для следующего разделения в пакетной обработке.

SourceSplit представляет собой раздел исходных данных, который следует рассматривать как неизменяемый объект: любое его обновление должно выполняться в связанном с ним состоянием (SplitState). Состояние разделения хранится внутри контрольных точек Flink, которые случаются между двумя выборками для одного разделения. Поэтому читая разделение, следует сохранить в состоянии разделения текущее состояние процесса чтения. Это текущее состояние должно сериализуемым, поскольку оно будет частью контрольной точки. Также благодаря сериализации Flink сможет возобновить работу, продолжив читать данные с места остановки. Это гарантирует отсутствие дубликатов или потерянных данных.
Например, если порядок чтения записей в бэкенде детерминирован, то в состоянии разделения может храниться n уже прочитанных записей, то после отработки отказа перезапуск возобновится с n+1.

SplitEnumerator отвечает за создание разделений и предоставление их считывателям. Рекомендуется генерировать разделения отложенными вычислениями, т.е. считыватель запрашивает у счетчика разделение, SplitEnumerator генерирует его по запросу и назначает считывателю. Для этого необходимо реализовать метод handleSplitRequest(). Генерация отложенных разделений предпочтительнее, поскольку позволяет сэкономить потребление памяти. Apache Flink предлагает возможность действовать при регистрации считывателя, реализуя метод addReader(). Когда разделения потребляет слишком много ресурсов, можно заранее создать пакет с ограниченным количеством разделений, чтобы не потреблять слишком много памяти за раз при разделении огромного количества исходных данных. Чтобы оптимальное количество разделений, следует заранее оценить размер исходных данных и указать максимальный объем памяти, который потребуется для разделения. Так можно настроить этот параметр в соответствии с памятью, доступной в диспетчерах задач. Если разделение читается полностью, следует убедиться, что общее содержимое разделения (записи из источника) помещается в память.

SplitReader считывает записи в форме промежуточного формата записи, который разработчик предоставляет для каждой записи. Это может любой формат сериализации данных, который содержит все необходимое для преобразования в выходной формат записи. Для этого при создании собственного источника данных нужно реализовать метод emitRecord(), чтобы сделать это преобразование. Необходимо инициализировать идемпотентную реализацию RecordEmitter с функцией сопоставления, чтобы в случае прерывания тот же набор записей снова был передан генератору записей позже.

При этом важно помнить, что использование сериализации Java запрещено во Flink, поэтому разработчику придется вручную написать поля объектов, используя ObjectOutputStream. Если класс не поддерживается ObjectOutputStream, надо записать размер объекта в байтах как Integer, а затем записать объект, преобразованный в byte[]. Аналогичный метод используется для сериализации коллекций: надо сперва написать количество элементов коллекции, затем сериализовать все содержащиеся объекты. Для десериализации такое же чтение реализуется в обратном порядке. Для большого количества разделений рекомендуется кэшировать OutputStream, используемый в SplitSerializer. Это можно сделать это с помощью ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

Начальный размер потока зависит от размера разделения. Разобравшись с основными принципами создания собственного источника данных для пакетной или потоковой обработки с Apache Flink, далее рассмотрим, как его протестировать.

Тестирование источника данных

Чтобы протестировать созданные сериализаторы для Split и SplitEnumeratorState в модульных тестах, надо создать объект, сериализовать его, затем десериализовать и проверить совпадение данных. Функции hascode() и equals() должны быть реализованы для сериализованных объектов. Для тестов, требующих работающего бэкенда, Flink предоставляет тестовую среду исходного кода JUnit5, включая среду Flink, серверную среду, семантику контрольных точек и контекст теста. Тестовый класс расширяет SourceTestSuiteBase — набор тестов, который уже содержит все необходимые тесты, включая одиночное и множественное разделения, чтение в режиме ожидания и пр. Он предназначен для пакетных и потоковых источников, поэтому лишнее следует отключить, переопределив их в исходном классе и аннотировав с помощью @Disabled. Можно добавить свои собственные интеграционные тесты. Но в большинстве случаев нужно только предоставить классы тестовой среды Flink для настройки ITCase, добавив аннотированное поле в класс ITCase.

@TestEnv
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

Чтобы протестировать коннектор, нужен бэкенд для его запуска. Эта тестовая среда TestEnvironment предоставляет все, что связано с серверной частью: контейнер, его конфигурацию, сеанс для подключения к нему и все элементы, привязанные ко всему тестовому сценарию (табличное пространство, запросы инициализации и пр.). Добавим это аннотированное поле в ITCase.

@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();

Для интеграции с JUnit5 BackendTestEnvironment реализует TestResource. Эта среда ограничена набором тестов, поэтому именно здесь настраивается серверную часть и общие ресурсы (сеанс, табличное пространство и т. д.), реализуя методы startup() и tearDown(). Для этого рекомендуется использовать тестовые контейнеры (testContainers), которые полагаются на Docker-образы, чтобы предоставить реальный серверный экземпляр, а не макет для интеграционных тестов. Настроить тестовые контейнеры можно следующим образом:

  • Перенаправить вывод контейнера (ошибки и стандартный вывод) в журналы Flink;
  • Установите различные тайм-ауты, чтобы справиться с нагрузкой на сервер CI;
  • Установить механизмы повторной попытки подключения, запросы инициализации и т. д. по той же причине.

С помощью следующего кода можно проверить, что источник поддерживает семантику строго однократной доставки данных:

@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

При этом можно столкнуться с проблемой при выполнении тестов: по умолчанию в исходной тестовой среде Flink считается, что данные считываются в том же порядке, в котором они были записаны. Это неверно для большинства бэкендов больших данных, где порядок обычно не является детерминированным. Чтобы поддерживать неупорядоченные проверки и по-прежнему использовать все тесты, предоставляемые фреймворком, нужно переопределить SourceTestSuiteBase#checkResultWithSemantic в ITCase:

@Override
protected void checkResultWithSemantic(
  CloseableIterator<Pojo> resultIterator,
  List<List<Pojo>> testData,
  CheckpointingMode semantic,
  Integer limit) {
    if (limit != null) {
      Runnable runnable =
      () -> CollectIteratorAssertions.assertUnordered(resultIterator)
        .withNumRecordsLimit(limit)
        .matchesRecordsFromSource(testData, semantic);
      assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}

Это копирование родительского метода, в котором метод assertOrdered() класса CollectIteratorAssertions заменен на  assertUnordered(). Контекст теста предоставляет Flink средства для взаимодействия с серверной частью, такие как вставка тестовых данных, создание таблиц или создание источника. Он ограничен тестовым набором и связан с ITCase через фабрику TestContext:

@TestContext
TestContextFactory contextFactory = new TestContextFactory(testEnvironment);

TestContext реализует DataStreamSourceExternalContext без подключения к серверной части в каждом тестовом случае. Поэтому общие ресурсы, такие как сеанс, создаются внутренней тестовой средой в рамках набора тестов. Затем они передаются в тестовый контекст конструктором. Также в конструкторе инициализируются серверные ресурсы тестового примера, такие как таблица тестового набора и используются следующие методы:

  • close() — удалить созданные ресурсы тестового примера;
  • getProducedType() – указать тип тестового вывода источника, например, тестовый Pojo;
  • getConnectorJarPaths() — предоставить список подключенных JAR-архивов;
  • createSource() – создание источника данных;
  • createSourceSplitDataWriter()  — создать ExternalSystemSplitDataWriter, ответственный за запись тестовых данных, которые представляют собой список созданных объектов типа, таких как определенные в методе getProducedType();
  • generateTestData() – создать список тестовых данных, которые будут переданы в ExternalSystemSplitDataWriter. Следует убедиться, что метод equals() возвращает false, когда 2 записи этого списка принадлежат разным разделениям. Для этого проще включить разделенный идентификатор в создаваемый тип и убедиться, что функции equals() и hashcode() правильно реализованы для включения этого поля.

Читайте в нашей новой статье про гибридные источники данных, которые позволяют последовательно считывать информацию из разных мест, комбинируя пакетную и потоковую обработку.

Все эти и другие подробности использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/
  2. https://flink.apache.org/2023/05/03/howto-create-a-batch-source-with-the-new-source-framework/
  3. https://flink.apache.org/2023/05/12/howto-test-a-batch-source-with-the-new-source-framework/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту