Вчера мы писали про паттерны проектирования процессоров Apache NiFi, ориентированные на данные. Сегодня рассмотрим шаблоны с фокусом на маршрутизацию потоковых файлов, которые можно применять при разработке пользовательского процессора.
Маршрутизация на основе содержимого (Route Based on Content)
Процессор этого типа направляет входящий FlowFile на основе его содержимого в одно или несколько место назначения. Такой процессор NiFi имеет два отношения: совпадающее и несовпадающее. Если ожидается определенный формат данных, у процессора также будет отношение отказа (failure), когда ввод не соответствует ожидаемому формату. Процессор предоставляет свойство для задания критериев маршрутизации.
Если свойство, определяющее критерии маршрутизации, требует обработки, например компиляции регулярного выражения, эта обработка выполняется в методе с аннотацией @OnScheduled, а результат сохраняется в переменной volatile.
Метод onTrigger получает один FlowFile, считывает его содержимое FlowFile с помощью метода read() класса ProcessSession, оценивая критерии соответствия по мере передачи данных. Далее процессор определяет, в какое отношение направить FlowFile и по результатам маршрутизации генерирует событие Provenance ROUTE. Этот процессор NiFi имеет аннотации @SideEffectFree и @SupportsBatching из пакета org.apache.nifi.annotation.behavior.
Если процессор направляет один FlowFile к нескольким отношениям, они обычно динамически определяются дата-инженером. Чтобы пользователь мог определить дополнительные свойства, необходимо переопределить метод getSupportedDynamicPropertyDescriptor(). Этот метод возвращает PropertyDescriptor с предоставленным именем и применимым валидатором, чтобы убедиться, что указанные пользователем критерии соответствия действительны.
В таком процессоре набор отношений, возвращаемый методом getRelationships(), является переменной volatile. При этом переопределяется метод onPropertyModified():при добавлении или удалении свойства создается новое отношение с тем же названием (unmatched). Метод isDynamic() дескриптора PropertyDescriptor поможет проверить, является ли UDF-свойство динамически определяемым. Если это свойство является динамическим, создается новый набор отношений, в который копируется предыдущий.
Если свойства, определяющие критерии маршрутизации, требуют обработки, например, компиляции регулярного выражения, эта обработка выполняется в методе с аннотацией @OnScheduled с сохранением результата в переменной volatile. Эта переменная обычно имеет тип Map, с ключом типа Relationship, а тип значения определяется результатом обработки значения свойства.
Когда в наборе несколько отношений, процессор создает копию FlowFile для каждого из них, кроме первого с помощью метода clone() класса ProcessSession. Событие клонирования автоматически записывается в лог Provenance. Исходный FlowFile и каждый клон направляются в соответствующие отношения с атрибутом, указывающим имя отношения. Событие Provenance ROUTE генерируется для каждого FlowFile и регистрируется при возврате результатов.
Маршрутизация потоков NiFi на основе содержимого (Route Streams Based on Content)
Хотя предыдущий паттерн проектирования позволяет создать мощный и универсальный процессор NiFi, иногда возникает потребность в более детальной маршрутизации. В частности, когда формат входящих данных представляет собой поток из множества разных фрагментов информации, каждый из которых следует направить в разные отношения. Исходный FlowFile надо разделить на несколько дочерних, каждый из которых может содержать одинаковые части исходного или быть совершенно разными. Для этого пригодится процессор NiFi типа Route, название которого начинается с Route и заканчивается именем типа данных, которые он маршрутизирует, например, RouteCSV, RouteText и пр. Этот процессор поддерживает динамические свойства. Каждое определяемое пользователем свойство имеет имя, которое соответствует имени отношения. Значение свойства имеет формат, необходимый для критериев соответствия, включая регулярные выражения.
Этот процессор поддерживает внутреннее сопоставление ConcurrentMap, где ключом является отношение, а значение имеет тип, зависящий от формата критериев совпадения: ConcurrentMap<Relationship, Pattern>. Этот процессор переопределяет метод onPropertyModified(). Если новое значение, предоставленное этому методу (третий аргумент), равно null, отношение, имя которого определяется именем свойства (первый аргумент), удаляется из ConcurrentMap. Иначе новое значение обрабатывается и добавляется в ConcurrentMap с ключом, являющимся отношением, имя которого определяется именем свойства.
Этот процессор переопределяет метод customValidate(), в котором он извлекает все свойства из ValidationContext и подсчитывает количество динамических дескрипторов свойств путем вызова метода isDynamic() для дескриптора свойств. Если пользователь не добавил никаких отношений, количество динамических дескрипторов PropertyDescriptors равно 0. Поэтому процессор возвращает ValidationResult, указывающий, что процессор невалиден из-за отсутствия отношений.
Процессор возвращает все отношения, указанные пользователем, когда вызывается его метод getRelationships(), а также возвращает unmatched-отношение.
Поскольку процессор читает и записывает в репозиторий контента, он довольно ресурсоемкий. Поэтому при использовании этого процессора для очень больших объемов данных, целесообразно добавить свойство, позволяющее пользователю указать необходимость работы с данными, которые не соответствуют ни одному из критериев соответствия.
Когда вызывается метод onTrigger(), процессор получает FlowFile через метод get() класса ProcessSession. При доступности данных процессор создает сопоставление Map<Relationship, FlowFile>, считывая входящий FlowFile через вызов метода read() класса ProcessSession и предоставляя обратный вызов InputStreamCallback. Из обратного вызова процессор считывает первую часть данных из FlowFile и оценивает каждый из критериев совпадения с этим фрагментом. При совпадении критериев соответствия процессор получает FlowFile из сопоставления flowFileMap, который принадлежит соответствующему отношению. Если в сопоставлении для этого отношения нет FlowFile, процессор создает новый потоковый файл, вызывая session.create(incomingFlowFile), а затем добавляет его новый в flowFileMap. Далее процессор записывает этот фрагмент данных в FlowFile, вызывая session.append() с помощью OutputStreamCallback. Из этого обратного вызова OutputStreamCallback есть доступ к выходному потоку OutputStream нового FlowFile, поэтому можно записывать данные в новый FlowFile, возвращаясь из OutputStreamCallback. Наконец, нужно обновить сопоставление FlowFileMap, чтобы связать Relationship с новым FlowFile.
Если в какой-то момент возникнет исключение, нужно направить входящий FlowFile к отношению failure, а также удалить каждый из вновь созданных FlowFiles. Сделать это можно, вызвав метод session.remove(flowFileMap.values()). Здесь же следует зарегистрировать ошибку и вернуть результат. В случае успеха можно выполнить итерацию по flowFileMap и передать каждый FlowFile соответствующему отношению. Затем исходный FlowFile либо удаляется, либо направляется в исходное отношение. Для каждого из вновь созданных FlowFiles генерируется событие Provenance ROUTE, указывающее, к какому отношению перешел FlowFile. Также полезно указать в деталях события ROUTE, сколько фрагментов информации было включено в этот FlowFile. Это позволит менеджеру потоков данных при просмотре представления Provenance Lineage видеть, сколько фрагментов информации было отправлено в каждое отношение для входного FlowFile.
При необходимости группировки данных, отправляемых в отношения следует добавить свойство к каждому FlowFile в качестве атрибута, изменив определение flowFileMap на Map<Relationship, Map<T, FlowFile>>, где T — тип функции группировки, например, строка для регулярного выражения.
Маршрутизация по атрибутам (Route Based on Attributes) и разделение содержимого
Такой тип процессора почти идентичен вышеописанному, но он не вызывает метод read() класса ProcessSession, поскольку он не читает содержимое FlowFile. Этот процессор обычно очень быстрый и поддерживает аннотацию @SupportsBatching.
Процессор разделения содержимого (Split Content) обычно не требует пользовательской настройки, за исключением размера каждого создаваемого разделения. Метод onTrigger() получает FlowFile из своих входных очередей. Создается список типа FlowFile. Исходный FlowFile читается с помощью read()-метода класса ProcessSession. Внутри обратного вызова InputStreamCallback содержимое считывается до тех пор, пока не будет достигнута точка, в которой FlowFile должен быть разделен. Если разделение не требуется, обратный вызов возвращается, и исходный FlowFile направляется в отношение success с генерацией события Provenance ROUTE, которое обычно не создается при успешной маршрутизации FlowFile.
Если достигнута точка, в которой FlowFile необходимо разделить, новый FlowFile создается с помощью метода create(FlowFile) класса ProcessSession или метода clone(FlowFile, long, long). Метод Create подходит, когда данные не будут копироваться напрямую из исходного FlowFile в новый FlowFile. Например, если будут скопированы только некоторые данные или если данные будут изменены перед копированием в новый FlowFile. Но если содержимое нового FlowFile будет точной копией части исходного FlowFile, предпочтительнее использовать метод клонирования.
При использовании метода create() с исходным FlowFile в качестве аргумента вновь созданный потоковый файл наследует атрибуты исходного, а Apache NiFi создает событие FORK Provenance. В коде процессора дата-инженер при этом использует блок try/finally. В блоке finally вновь созданный FlowFile добавляется в список созданных потоковых файлов, чтобы при возникновении исключения очистить вновь созданный FlowFile. В блоке try обратный вызов инициирует новый обратный вызов, вызывая write()-метод класса ProcessSession с помощью OutputStreamCallback. Затем соответствующие данные копируются из InputStream исходного FlowFile в OutputStream для нового FlowFile.
Если содержимое вновь созданного FlowFile должно быть только непрерывным подмножеством байтов исходного FlowFile, предпочтительно использовать метод clone(FlowFile, long, long) вместо метода create(FlowFile). В этом случае смещение исходного FlowFile, с которого должно начинаться содержимое нового FlowFile, передается в качестве второго аргумента в метод клонирования, а его длина является третьим аргументом. Например, если исходный FlowFile имеет размер 10 000 байт, и вызван метод clone(flowFile, 500, 100), возвращаемый FlowFile будет идентичен FlowFile в отношении его атрибутов. Однако содержимое только что созданного FlowFile будет иметь длину 100 байт и будет начинаться со смещения 500 исходного FlowFile. Таким образом, содержимое вновь созданного FlowFile будет таким же, если скопировать байты исходного FlowFile с 500 по 599. После создания клон добавляется в список FlowFiles.
В отличие от метода create(), клонирование не требует операций дискового ввода-вывода. Apache NiFi может просто создать новый FlowFile, который ссылается на подмножество содержимого исходного FlowFile, вместо фактического копирования данных. Но на практике это не всегда возможно. Например, если информация заголовка должна быть скопирована из начала исходного FlowFile и добавлена в начало каждого разделения, клонирование не применимо.
В любом случае, независимо от клонирования или создания FlowFile при использовании этого паттерна проектирования пользовательского процессора Apache NiFi дата-инженер должен помнить о необходимости выдавать исключение ProcessException, если в какой-либо момент обратном вызове InputStreamCallback достигается условие, когда обработка не может продолжаться. В частности, когда входные данные искажены. Поэтому вызов метода read() класса ProcessSession следует заключить в блок try/catch, в котором перехватывается исключение ProcessException. Если исключение перехвачено, ошибка логируется, список вновь созданных файлов потока удаляется с помощью delete()- метода класса ProcessSession, а исходный FlowFile маршрутизируется в отношение failure. Если созданные FlowFiles маршрутизируются в состояние успеха, это событие регистрируется и метод возвращает результаты.
Обновление атрибутов на основе содержимого (Update Attributes Based on Content)
Этот процессор очень похож на вышеописанные паттерны маршрутизации, но FlowFile направляется в отношение success или failure, и к нему добавляются атрибуты по мере необходимости. Добавляемые атрибуты настраиваются аналогично маршруту на основе содержимого (один ко многим), при этом пользователь определяет свои собственные свойства. Имя свойства указывает имя добавляемого атрибута. Значение свойства указывает некоторые критерии соответствия, которые следует применить к данным. Если критерии соответствия соответствуют данным, добавляется атрибут с таким же именем, как и у свойства. Значением атрибута являются критерии из совпавшего контента.
Например, процессор, который оценивает выражения XPath, может разрешать ввод определенных пользователем XPath. Если XPath соответствует содержимому FlowFile, к этому FlowFile будет добавлен атрибут с именем, равным имени свойства, и значением, равным текстовому содержимому XML-элемента или атрибута, который соответствует XPath. Затем будет использоваться отношение failure, если входящий FlowFile не является допустимым XML. Отношение success будет использоваться независимо от того, были ли найдены совпадения. Этот процессор генерирует событие происхождения типа ATTRIBUTES_MODIFIED.
Как эффективно использовать Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники