Python для Apache NiFi с NiPyAPI

Python NiFi инженерия данных разработка Data Flow NiPyAPI, Apache NiFi проектирование потока данных, Apache NiFi для дата-инженеров примеры курсы обучение, обучение инженеров данных Apache NiFi, Школа Больших Данных Учебный Центр Коммерсант

Как расширить возможности Apache NiFi, используя Python: знакомимся с библиотекой NiPyAPI. Возможности, принципы работы и примеры использования NiPyAPI в управлении средой NiFi: очистка от неиспользуемых компонентов.

Python в Apache NiFi

Хотя официальная поддержка Python ожидается в релизе 2.0, о чем мы писали здесь, использовать этот язык программирования в Apache NiFi можно уже давно. Например, процессор ExecuteScript позволяет использовать язык сценариев для использования API NiFi для выполнения следующих задач:

  • чтение содержимого и/или атрибутов из входящего FlowFile;
  • создание нового FlowFile (с родительским элементом или без него);
  • запись содержимого и/или атрибутов в исходящий FlowFile;
  • взаимодействие с ProcessSession для передачи FlowFiles в отношения;
  • чтение/запись в State Manager для отслеживания переменных во время выполнения процессора.

Справедливости ради, стоит отметить, что Python-движок, указанный в списке доступных скриптовых движков процессора ExecuteScript, на самом деле является Jython, а не Python. Поэтому при его использовании нельзя импортировать чистые CPython-модули, такие как pandas.

Поэтому для Python-скрипта, который использует другие библиотеки и выдает выходные данные, можно использовать Execute Process, который выполнит сценарий Python на компьютере, используя полную python-библиотеку, и выходные данные станут файлом потока. Оператор устанавливает тип скрипта на Python и в разделе тело скрипта вводит скрипт.

При этом можно использовать Nifi-Python-Api — богатый клиентский SDK Apache NiFi Python, который имеет 3 уровня поддержки Python для работы с Apache NiFi:

  • Высокоуровневые демонстрации и примеры сценариев;
  • Клиентский SDK среднего уровня для типичных сложных задач;
  • Клиентские SDK низкого уровня для полной реализации API NiFi и отдельных подпроектов.

По сути, NiPyAPI – это Python-библиотека для взаимодействия с инстансами NiFi Она имеет следующие функциональные возможности:

  • Подробная документация полного SDK на всех уровнях;
  • Оболочки CRUD-операций для общих областей задач, таких как группы процессоров, процессоры, шаблоны, клиенты реестра, сегменты реестра, потоки реестра и т. д.
  • Удобные функции для задач инвентаризации, такие как рекурсивное получение всего холста или плоского списка всех групп процессов;
  • Поддержка планирования и очистки потоков, служб контроллера и соединений;
  • Поддержка выборки и обновления реестров переменных;
  • Поддержка импорта/экспорта версионных потоков из NiFi-Registry;
  • Конфигурации Docker Compose для тестирования и развертывания;
  • Развертывание интерактивной среды по сценарию и защищенная конфигурация для целей тестирования и демонстрации.

Познакомившись с NiPyAPI, далее рассмотрим примеры использования этой библиотеки.

Примеры использования NiPyAPI

Чтобы использовать NiPyAPI, его сперва следует установить с помощью менеджера пакетов pip:

pip install nipyapi

Следующий пример показывает, как автоматизировать поиск неиспользуемых параметров, например, после рефакторинга, когда они были перемещены в другой контекст параметра или по-другому реализованы. При вводе параметров вновь вводимые чувствительные значения необходимо устанавливать вручную. Чтобы найти их, не проверяя вручную, можно запустить следующий Python-скрипт, использующий методы библиотеки NiPyAPI:

def get_unused_parameters(app):
    app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app])
    # Enable controllers and process groups
    for pg in app_pg_groups:
        if (pg.parameter_context is not None):
            #print("Processing: "+ str(pg.parameter_context.component.name))
            param_context = nipyapi.nifi.ParameterContextsApi().get_parameter_context(pg.parameter_context.component.id)
            params=param_context.component.parameters
            for param in params:
                if len(param.parameter.referencing_components) == 0:
                    print("Context: " + str(pg.parameter_context.component.name)+". Parameter: " + str(param.parameter.name) + " is not being used")
                if param.parameter.sensitive is True and param.parameter.value is None:
                    print("Context: " + str(pg.parameter_context.component.name)+". Sensitive parameter " + str(param.parameter.name) + " has no value set")
         else:
             print("Process group: " + str(pg.component.name) + " has no parameter context assigned")

А неиспользуемые службы контроллера NiFi можно найти с помощью следующего метода:

def get_unused_controller_services(app):
    app_pg = nipyapi.canvas.get_process_group(identifier_type='id',identifier=name_to_id[app])
    # Enable controllers and process groups
    cs = nipyapi.nifi.FlowApi().get_controller_services_from_group(id=app_pg.id, include_ancestor_groups=False, include_descendant_groups=True).controller_services
    for service in cs:
        if len(service.component.referencing_components)==0:
            print("Controller service " + str(service.component.name) + " has no referencing components. Parent pg: "+id_to_name[service.component.parent_group_id])

Таким образом, Python-скрипты с методами библиотеки NiPyAPI позволяют улучшить работу с NiFi: автоматизировать развертывание и сократить усилия, необходимые для очистки среды от компонентов, которые больше не используются.

Читайте в нашей новой статье, как взаимодействуют процессы Python с Java, на которой написан Apache NiFi.

Освойте администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://technology.amis.nl/big-data-database/apache-nifi-automating-tasks-using-nipyapi/
  2. https://nipyapi.readthedocs.io/en/latest/
  3. https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.17.0/org.apache.nifi.processors.script.ExecuteScript/additionalDetails.html
  4. https://community.cloudera.com/t5/Community-Articles/Python-Script-in-NiFi/ta-p/246406
Поиск по сайту