3 дата-шаблона проектирования процессоров Apache NiFi

пользовательские процессоры Apache NiFi, шаблоны проектирования процессоров Apache NiFi, дата-инженерия с Apache NiFi, разработка процессора NiFi, Apache NiFi примеры курсы обучение ETL-конвейеры, обучение дата-инженеров, инженер данных NiFI примеры курсы обучение, Школа Больших Данных Учебный Центр Коммерсант

Что пригодится дата-инженеру при разработке пользовательского процессора Apache NiFi: паттерны приема и выдачи данных, а также обогащение содержимого потокового файла. Знакомимся с принципами работы шаблонов проектирования процессоров NiFi, ориентированных на данные.

Прием данных (Data Ingress)

Apache NiFi предоставляет более 300 готовых процессоров – обработчиков, которые выполняют определенные действия с потоковыми файлами в рамках конвейера обработки данных. Также дата-инженер может написать свой собственный процессор самостоятельно, о чем мы уже упоминали здесь и здесь. При этом пользовательский процессор будет соответствовать одному из возможных шаблонов проектирования.

Шаблон под названием Прием данных (Data Ingress) предполагает, что процессор, который принимает данные в NiFi, имеет одно успешное отношение (Success). Напомним, для каждого процессора определяются 0 или более отношений, которые указывают результат обработки потокового файла. Завершив обработку FlowFile, процессор направляет его в одно из отношений (Relationship).

Процессор типа Data Ingress генерирует новые потоковые файлы с помощью create() метода сессии процесса (ProcessSession) и не извлекает потоковые файлы из входящих подключений. Обычно название такого процессора начинается со слов Get или Listen, в зависимости от того, опрашивает ли он внешний источник или предоставляет интерфейс, к которому могут подключаться внешние источники. Также в названии указывается протокол связи, например, GetFile, GetSFTP, ListenHTTP и GetHTTP.

Процессор этого типа может создать или инициализировать пул соединений в методе, использующем аннотацию @OnScheduled. Однако, поскольку проблемы со связью могут помешать установлению соединений или привести к их разрыву, сами подключения создаются не на этом этапе в методе onTrigger().

Метод onTrigger()начинает с аренды соединения из пула или создает соединение с внешней службой. Когда данные из внешнего источника недоступны, метод yield() контекста ProcessContext вызывается процессором, и возвращается, чтобы избежать постоянной работы и холостого потребления ресурсов. В успешном случае этот процессор создает FlowFile с помощью метода create(). При этом потоковому файлу назначается имя и путь к нему путем добавления соответствующих атрибутов. Выходной поток OutputStream для содержимого FlowFile получается с помощью write() метода записи ProcessSession через передачу обратного вызова OutputStreamCallback, который обычно является анонимным внутренним классом. Из этого обратного вызова процессор может записывать в FlowFile и передавать содержимое из внешнего ресурса в OutputStream. Если требуется записать все содержимое InputStream во FlowFile, метод importFrom() класса ProcessSession может быть более удобным.

Для эффективной обработки множества небольших файлов целесообразно создать несколько FlowFile из одного сеанса перед его фиксацией. Процессор генерирует событие происхождения данных (Provenance), регистрируя создание FlowFile в логах. После фиксации ProcessSession процессор подтверждает получение данных и/или удаляет данные из внешнего источника, чтобы предотвратить дубликаты. Несоблюдение этого принципа может привести к потере данных, поскольку перезапуск NiFi до фиксации сеанса приведет к удалению временного файла. Но есть риск получить дубли, поскольку приложение может быть перезапущено после фиксации сеанса и до подтверждения или удаления данных из внешнего источника. Однако, потенциальное дублирование данных предпочтительнее потенциальной потери данных. Соединение возвращается или добавляется в пул соединений, в зависимости от того, было ли соединение изначально арендовано из пула соединений или было создано в методе onTrigger().

Если возникает проблема со связью, соединение обычно прерывается и не возвращается (или не добавляется) в пул соединений. Соединения с удаленными системами разрываются, а пул соединений отключается в методе, аннотированном аннотацией @OnStopped, чтобы восстановить ресурсы.

Вывод данных (Data Egress)

Процессор типа Data Egress публикует данные во внешний источник и имеет два отношения: успех и отказ. Обычно название процессора начинается с Put, за которым следует протокол, используемый для передачи данных, например, PutEmail, PutSFTP. Сюда же относится PostHTTP. Процессор этого типа может создать или инициализировать пул соединений в методе, использующем аннотацию @OnScheduled. Однако, поскольку проблемы со связью могут помешать установлению соединений или привести к разрыву соединений, сами соединения не создаются на этом этапе, а создаются или арендуются из пула в методе onTrigger.

Метод onTrigger сначала получает FlowFile из ProcessSession с помощью метода get(). Если FlowFile недоступен, метод возвращается без установления соединения с удаленным ресурсом. Если хотя бы один FlowFile доступен, процессор получает соединение из пула соединений или создает новое соединение. Иначе FlowFile перенаправляется в отношение failure с регистрацией этого события.

Если соединение было получено, процессор получает InputStream для содержимого FlowFile, вызывая метод read() в ProcessSession и передавая обратный вызов InputStreamCallback. Этот обратный вызов часто является анонимным внутренним классом, и внутри передает содержимое FlowFile в пункт назначения. Событие регистрируется вместе с количеством времени, затраченным на передачу файла, и скоростью передачи данных. О событии SEND сообщается в ProvenanceReporter путем получения генератора отчетов из ProcessSession с помощью метода getProvenanceReporter() и вызова метода отправки в генераторе отчетов. Соединение возвращается или добавляется в пул соединений в зависимости от того, было ли оно арендовано из пула или создано заново методом onTrigger.

При возникновении проблем со связью соединение обычно прерывается и не возвращается или не добавляется в пул соединений. Когда проблема связана с состоянием сети, FlowFile обычно перенаправляется в отношение failure без штрафа (penalization). В отличие от процессора приема данных, в ProcessContext не вызывается yield(), поскольку в случае загрузки FlowFile не существует до тех пор, пока процессор не сможет выполнить свою функцию. Однако, в случае с процессором Data Ingress диспетчер потока данных может выбрать маршрутизацию сбоя на другой процессор, чтобы использовать резервную систему или для распределения нагрузки между многими системами.

В случае проблем с данными FlowFile может быть оштрафован и направлен в отношение failure. Так обстоит дело, например, с процессором PutFTP, когда FlowFile не может быть передан из-за конфликта имен файлов. Файл в конечном итоге будет удален из каталога, чтобы можно было передать новый файл. Для этого FlowFile штрафуется и направляется в отношение сбоя, чтобы повторить попытку позже. Соединения с удаленными системами разрываются, а пул соединений отключается с помощью метода, аннотированного @OnStopped, чтобы восстановить ресурсы.

NiFi-процессор обогащения содержимого потокового файла

Шаблон Enrich/Modify Content очень распространен и очень универсален. Он отвечает за любую модификацию общего содержимого. В большинстве случаев процессор этого типа помечен аннотациями @SideEffectFree и @SupportsBatching. Такой процессор имеет любое количество обязательных и необязательных свойств, в зависимости от его функции. Также процессор обычно имеет два отношения: успех и сбой, когда входной файл оказался не в ожидаемом формате.

Процессор получает FlowFile и обновляет его с помощью метода записи класса ProcessSession в обратном вызове StreamCallback, чтобы читать из содержимого потокового файла и записывать контент в его в следующую версию. Если во время обратного вызова возникают ошибки, обратный вызов вызовет исключение ProcessException. Вызов метода записи ProcessSession заключен в блок try/catch, который перехватывает ProcessException и направляет FlowFile к ошибке. Если обратный вызов завершается успешно, генерируется событие происхождения CONTENT_MODIFIED.

Завтра мы продолжим разговор про паттерны проектирования процессоров Apache NiFi и рассмотрим маршрутизацию по атрибутам и разделение содержимого. А о лучших практиках проектирования и реализации пользовательского процессора читайте в нашей новой статье.

Узнайте больше про использование Apache NiFi  для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nifi.apache.org/docs.html
Поиск по сайту