Как написать свой процессор Apache NiFi на Python: обзор 2-х API

Apache NiFi для дата-инженера, инженерия данных с Apache NiFi, Apache NiFi примеры курсы обучение, Python в Apache NiFi, Школа больших Данных Учебный центр Коммерсант

Продолжая разговор про рассмотренные в прошлой статье принципы взаимодействия процессов Python с Java, на которой написан Apache NiFi, сегодня разберем, как использовать это на практике. Пишем свои процессоры, используя классы FlowFileTransform и RecordTransform.

Python-процессор Apache NiFi на базе FlowFileTransform

Хотя Apache NiFi предоставляет более 300 процессоров для вычислительных операций и взаимодействия со множеством сторонних систем, иногда дата-инженеру приходится писать собственные обработчики. Как это сделать на Java, нативном языке фреймворка, мы рассматривали здесь. Но писать код на Java на порядок сложнее, чем на Python. Поэтому Apache NiFi, как и многие другие Java-фреймворки, например, Spark и Flink, позволяет использовать этот более легкий язык. Чтобы Python-код заработал в Java-приложении, необходимо обеспечить взаимодействие процессов Java и Python. Это делается с помощью библиотеки Py4J, которая выполняет роль моста между Python и Java, позволяя программам Python, работающим в интерпретаторе Python, динамически получать доступ к объектам Java в JVM. Методы вызываются таким образом, будто объекты Java находятся в интерпретаторе Python, например, доступ к коллекциям Java можно получить через стандартные методы Python-коллекций. Аналогично Py4J работает в обратную сторону, позволяя Java-программам вызывать объекты Python. API Python в Apache Spark и Flink тоже используют эту библиотеку, о чем мы писали здесь и здесь.

Чтобы разработать собственный процессор Apache NiFi на Python, надо использовать классы FlowFileTransform и RecordTransform.

API FlowFileTransform предоставляет механизм маршрутизации и преобразования FlowFile на основе его атрибутов, а также его текстового или двоичного содержимого. А RecordTransform API предоставляет механизм маршрутизации и преобразования отдельных записей, например, JSON, AVRO или CSV. Для реализации FlowFileTransform класс Python должен наследоваться от класса nifiapi.FlowFileTransform и реализовывать метод transform(ProcessContext, InputFlowFile), который возвращает объект FlowFileTransformResult. Простая реализация FlowFileTransform может выглядеть так:

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class WriteHelloWorld(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def transform(self, context, flowfile):
        return FlowFileTransformResult(relationship = "success", contents = "Hello World", attributes = {"greeting", "hello"})

В этом участке кода метод transform() будет принимать два аргумента: контекст типа nifiapi.properties.ProcessContext и потоковый файл InputFlowFile.В результате возвращается FlowFileTransformResult, который указывает, в какую связь следует передать FlowFile, обновленное содержимое FlowFile и любые атрибуты, которые следует добавить в FlowFile или перезаписать. Отношение – это обязательный аргумент, а содержимое — необязательный. Если содержимое FlowFile не подлежит обновлению, его контент не надо указывать или можно указать как None. Исходное содержимое FlowFile не следует возвращать, поскольку это аналогично передаче None, но более затратно из-за записи во FlowFile. Аналогично, следует опустить атрибуты, если их не надо записывать. Параметр context типа nifiapi.properties.ProcessContext можно использовать для определения конфигурации, например имени процессора с помощью метода context.getName() и значений свойств с помощью методов context.getProperties() и context.getProperty(String propertyName). Однако, метод getProperty(String) возвращает не строковое представление настроенного значения, а объект PythonPropertyValue. Это позволяет интерпретировать значение свойства по-разному. Например, PythonPropertyValue.getValue() возвращает строковое представление значения, а PythonPropertyValue.asInteger() возвращает None или целочисленное представление значения.

Поэтому, например, PythonPropertyValue.getTimePeriod(nifiapi.properties.TimeUnit ) может использоваться для получения настроенного значения в виде некоторого периода времени. Например, если для свойства timeout установлено значение 30 секунд, можно использовать его, вызвав context.getProperty(«timeout»).asTimePeriod(TimeUnit.MILLISECONDS). Результат вернет значение 30000. 

Метод PythonPropertyValue.asControllerService()можно использовать для получения службы контроллера, которую может использовать процессор. Объект PythonPropertyValue также позволяет вызвать метод evaluateAttributeExpressions(attributeMap=None). Это можно использовать для оценки настроенного языка выражений. Например, если значение переменной ${filename} используется для свойства, можно использовать его метод context.getProperty(«my property»).evaluateAttributeExpressions(flowFile).getValue() для оценки выражения языка выражений NiFi, о котором мы писали здесь, а затем получить строковое представление значения.

Поскольку FlowFile в Python является прокси-сервером  Java-объекта InputFlowFile, можно использовать следующие методы:

  • getContentsAsBytes() — возвращает содержимое FlowFile в виде массива байтов. Этот метод загружает все содержимое FlowFile в массив байтов на стороне Java, а затем отправляет копию на сторону Python. В результате содержимое FlowFile буферизуется в памяти дважды: один раз в куче Java и один раз в процессе Python. Поэтому использовать этот метод надо очень осторожно.
  • getContentsAsReader() — возвращает Java-объект BufferedReader, который можно использовать для чтения содержимого FlowFile по одной строке за раз. Хотя это применимо только к текстовому содержимому, оно позволяет избежать загрузки всего содержимого FlowFile в память. Однако, каждый вызов readLine() требует вызова Java, поэтому производительность ниже, чем у метода getContentsAsBytes().
  • getSize() — возвращает количество байтов в содержимом FlowFile;
  • getAttribute(String name) – возвращает значение атрибута FlowFile с заданным именем или None, если FlowFile не имеет атрибута с таким именем;
  • getAttributes() — возвращает словарь Python, ключами которого являются имена атрибутов FlowFile, а значениями — значения связанных атрибутов.

После того, как процессор выполнил свою задачу, он возвращает экземпляр nifiapi.flowfiletransform.FlowFileTransformResult. Конструктор имеет единственный обязательный позиционный аргумент — отношение (relationship) для  маршрутизации FlowFile. Кроме того, если содержимое FlowFile обновляется, оно возвращается через аргумент content. Любые атрибуты FlowFile, которые необходимо добавить или изменить, можно дополнительно предоставить с помощью аргумента attribute.

Использование API RecordTransform

Хотя API FlowFileTransform позволяет работать с FlowFile,  с помощью API RecordTransform разработчик может взаимодействовать с конкретной записью. Например, если FlowFile состоит из множества записей JSON, процессор RecordTransform  можно использовать для обработки каждой из них. Для этого надо создать класс на базе RecordTransform, а также реализовывать следующий метод def transform(self, context, record, schema, attributemap), который вернет объект RecordTransformResult.

Объект context является реализацией того же самого объекта ProcessContext, который используется в процессоре FlowFileTransform, рассмотренном выше. Это словарь Python, который представляет запись, с которой нужно работать, независимо от исходного формата данных. Такая независимость обеспечивает переносимость кода.

Связанный объект schema является экземпляром Java-объекта org.apache.nifi.record.serialization.RecordSchema, который представляет собой схему данных. Однако, вызовы схемы выполняются через сокет на стороне Java и поэтому являются дорогостоящими. Сигнатура метода предоставляет расширение attributemap, которое имеет два метода:

  • getAttribute(String name) – возвращает значение атрибута FlowFile с заданным именем или None, если FlowFile не имеет атрибута с таким именем;
  • getAttributes() – возвращает словарь Python, ключами которого являются имена атрибутов FlowFile, а значениями — значения связанных атрибутов.

Эти два метода идентичны методам класса InputFlowFile, рассмотренного выше, что позволяет attributemap предоставить его Python-свойство PythonPropertyValue для вычислений с помощью языка выражений. Например, можно определить имя поля записи, которое будет использоваться для какой-либо операции, вызвав метод

field_name = context.getProperty(“Field Name”).evaluateAttributeExpressions(attributemap).getValue()

Этот метод должен вернуть экземпляр nifiapi.recordtransform.RecordTransformResult, конструктор которого принимает четыре необязательных именованных аргумента:

  • record — преобразованная версия записи. Если запись не предоставлена ​​или если указано None, входная запись будет удалена из выходной.
  • schema — преобразованная схема. Если не задано, схема будет выведена. Однако, если схема указана, привязкой является схема, а не данные. Поэтому при отсутствии поля в схеме оно будет удалено из данных. А если в схеме есть поле, а в данных нет соответствующего значения, предполагается, что это поле имеет значение None.
  • relationship — имя связи, к которой направляется запись. Если не задано, результат направляется в отношение успех (success). Можно реализовать другие отношения, помимо успеха и неудачи (failure), чтобы маршрутизировать записи. Например, пометить запись как валидную или невалидную.
  • partition – раздел. По умолчанию все записи в данном входящем FlowFile будут записаны в один выходной FlowFile, т.е. в преобразованную версию записи, для поля результата не возвращается значение None. Раздел используется. Когда нужно разделить входящие данные на отдельные выходные FlowFiles. Например, входящие данные имеют поле страна, и нужен отдельный выходной FlowFile для каждой страны. Для этого надо вернуть словарь Python для аргумента partition, который выглядит примерно так {‘country’: record[‘country’]}. Если раздел имеет более одного поля в словаре, все поля в словаре должны иметь одинаковое значение для двух записей, чтобы записи могли быть записаны в один и тот же выходной FlowFile.

Помимо разработки собственного процессора, дата-инженеру также надо его настроить. Как это сделать, рассмотрим в следующий раз.

Больше подробностей про администрирование и эксплуатацию Apache NiFi в проектах построения эффективных ETL-конвейеров потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nifi.apache.org/documentation/v2/
  2. https://www.py4j.org/
Поиск по сайту