Как настроить свой Python-процессор Apache NiFi

Что необходимо реализовать в собственном процессоре, написанном на Python, чтобы запускать его в Apache NiFi. Классы и методы для настройки свойств, а также отношения и состояния жизненного цикла.

Классы и методы для настройки свойств

Предустановленные обработчики данных или процессоры (processor) Apache NiFi, написанные на Java, можно настроить прямо в GUI, задав значения конфигурациям. Подробно об этом мы писали здесь. Однако, чтобы сделать это для пользовательского процессора, написанного на Python, при его разработке следует реализовать соответствующие методы. Это необходимо сделать из-за вызовов Java-объектов из Python-процессов, что обеспечивается с помощью библиотеки Py4J. Она играет роль шлюза между Python и Java, позволяя программам Python, работающим в интерпретаторе Python, динамически получать доступ к объектам Java в JVM. Методы вызываются таким образом, будто объекты Java находятся в интерпретаторе Python. Так можно получить доступ к коллекциям Java, используя стандартные методы Python-коллекций и наоборот: Java-программа может вызывать объекты Python.

Как мы рассматривали в прошлой статье, для разработки собственного процессора Apache NiFi на Python, надо использовать API классов FlowFileTransform и RecordTransform. При этом, чтобы раскрыть доступные свойства процессора, Python-код его реализации должен предоставить свойство PropertyDescriptor, используя класс nifiapi.properties.PropertyDescriptor. Конструктор этого класса принимает два обязательных позиционных аргумента: имя (name) и описание (description). Все остальные аргументы являются необязательными. Обычно процессор имеет несколько дескрипторов свойств, которые возвращаются в структуру NiFi путем реализации метода def getPropertyDescriptors(self). Этот метод возвращает список PropertyDescriptors. После создания дескрипторов свойств в конструкторе процессора их надо вернуть в этом методе, например:

from nifiapi.flowfiletransform import FlowFileTransform
from nifiapi.properties import PropertyDescriptor, StandardValidators

class PrettyPrintJson(FlowFileTransform):
...
    def __init__(self, **kwargs):
        super.__init(**kwargs)

        numspaces = PropertyDescriptor(name="Number of Spaces",
            description="Number of spaces to use for pretty-printing",
            validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
            defaultValue="4",
            required=True)
        self.descriptors = [numspaces]

...

    def getPropertyDescriptors(self):
        return self.descriptors

Если надо ограничить разнообразие данных, которые пользователи Apache NiFi могут вводить в свойства процессора, разработчик должен реализовать соответствующий метод. Например, чтобы пользователь мог вводить несколько пар ключ/значение, где ключ — это название поля записи, которое нужно установить, надо реализовать следующий метод, который возвращает PropertyDescriptor:

def getDynamicPropertyDescriptor(self, propertyname):
    return PropertyDescriptor(name=propertyname,
        description="A user-defined property",
        dynamic=True)   # dynamic=True is optional and included here only for completeness' sake

Если этот метод не реализован и пользователь добавит свойство, отличное от тех, которые явно поддерживаются, процессор станет недействительным. Также можно указать явные валидаторы, допустимые к использованию.

Отношения и жизненный цикл Python-процессора Apache NiFi

Поскольку любой процессор в NiFi отправляет данные в отношения (relationship), их тоже нужно объявить при разработке собственных процессоров. Оба класса для создания своих Python-процессоров FlowFileTransform и RecordTransform уже имеют три отношения:

  • original – системная связь, не используется реализациями, а используется только самой платформой, чтобы передавать входной FlowFile без изменений;
  • failure, куда процессор направляет данные, если не может преобразовать их, например, потому что они невалидны;
  • success – для направления результатов успешно выполненной операции.

Разработчик может переопределить эти отношения, реализовав метод def getRelationships(self), который возвращает список или набор объектов nifiapi.relationship.Relationship. Если этот метод реализован, отношение success  не будет автоматически доступно: его придется создать и вернуть в этом списке, чтобы использовать. При этом отношения failure и original доступны всегда, независимо от реализации.

В заключение отметим важную для разработчика возможность повторного использования созданных объектов. Для этого в NiFi есть метод onScheduled(), который можно реализовать в процессоре, определив его так: def onScheduled(self, context)

Аргумент context представляет собой объект класса ProcessContext, о чем мы ранее рассказывали здесь. Этот метод не имеет возвращаемого значения и вызывается каждый раз, когда запланирован запуск процессора, независимо от повода. Чтобы удалить объекты, когда они больше не нужны, надо реализовать метод def onStopped(self, context). Этот метод вызывается каждый раз, когда процессор остановлен и больше не имеет активных задач. 

Узнайте больше про администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nifi.apache.org/documentation/v2/
  2. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#Configuring_a_Processor

Поиск по сайту