Как расширить возможности Apache NiFi, используя Python: знакомимся с библиотекой NiPyAPI. Возможности, принципы работы и примеры использования NiPyAPI в управлении средой NiFi: очистка от неиспользуемых компонентов.
Python в Apache NiFi
Хотя официальная поддержка Python ожидается в релизе 2.0, о чем мы писали здесь, использовать этот язык программирования в Apache NiFi можно уже давно. Например, процессор ExecuteScript позволяет использовать язык сценариев для использования API NiFi для выполнения следующих задач:
- чтение содержимого и/или атрибутов из входящего FlowFile;
- создание нового FlowFile (с родительским элементом или без него);
- запись содержимого и/или атрибутов в исходящий FlowFile;
- взаимодействие с ProcessSession для передачи FlowFiles в отношения;
- чтение/запись в State Manager для отслеживания переменных во время выполнения процессора.
Справедливости ради, стоит отметить, что Python-движок, указанный в списке доступных скриптовых движков процессора ExecuteScript, на самом деле является Jython, а не Python. Поэтому при его использовании нельзя импортировать чистые CPython-модули, такие как pandas.
Поэтому для Python-скрипта, который использует другие библиотеки и выдает выходные данные, можно использовать Execute Process, который выполнит сценарий Python на компьютере, используя полную python-библиотеку, и выходные данные станут файлом потока. Оператор устанавливает тип скрипта на Python и в разделе тело скрипта вводит скрипт.
При этом можно использовать Nifi-Python-Api — богатый клиентский SDK Apache NiFi Python, который имеет 3 уровня поддержки Python для работы с Apache NiFi:
- Высокоуровневые демонстрации и примеры сценариев;
- Клиентский SDK среднего уровня для типичных сложных задач;
- Клиентские SDK низкого уровня для полной реализации API NiFi и отдельных подпроектов.
По сути, NiPyAPI – это Python-библиотека для взаимодействия с инстансами NiFi Она имеет следующие функциональные возможности:
- Подробная документация полного SDK на всех уровнях;
- Оболочки CRUD-операций для общих областей задач, таких как группы процессоров, процессоры, шаблоны, клиенты реестра, сегменты реестра, потоки реестра и т. д.
- Удобные функции для задач инвентаризации, такие как рекурсивное получение всего холста или плоского списка всех групп процессов;
- Поддержка планирования и очистки потоков, служб контроллера и соединений;
- Поддержка выборки и обновления реестров переменных;
- Поддержка импорта/экспорта версионных потоков из NiFi-Registry;
- Конфигурации Docker Compose для тестирования и развертывания;
- Развертывание интерактивной среды по сценарию и защищенная конфигурация для целей тестирования и демонстрации.
Познакомившись с NiPyAPI, далее рассмотрим примеры использования этой библиотеки.
Примеры использования NiPyAPI
Чтобы использовать NiPyAPI, его сперва следует установить с помощью менеджера пакетов pip:
pip install nipyapi
Следующий пример показывает, как автоматизировать поиск неиспользуемых параметров, например, после рефакторинга, когда они были перемещены в другой контекст параметра или по-другому реализованы. При вводе параметров вновь вводимые чувствительные значения необходимо устанавливать вручную. Чтобы найти их, не проверяя вручную, можно запустить следующий Python-скрипт, использующий методы библиотеки NiPyAPI:
def get_unused_parameters(app): app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app]) # Enable controllers and process groups for pg in app_pg_groups: if (pg.parameter_context is not None): #print("Processing: "+ str(pg.parameter_context.component.name)) param_context = nipyapi.nifi.ParameterContextsApi().get_parameter_context(pg.parameter_context.component.id) params=param_context.component.parameters for param in params: if len(param.parameter.referencing_components) == 0: print("Context: " + str(pg.parameter_context.component.name)+". Parameter: " + str(param.parameter.name) + " is not being used") if param.parameter.sensitive is True and param.parameter.value is None: print("Context: " + str(pg.parameter_context.component.name)+". Sensitive parameter " + str(param.parameter.name) + " has no value set") else: print("Process group: " + str(pg.component.name) + " has no parameter context assigned")
А неиспользуемые службы контроллера NiFi можно найти с помощью следующего метода:
def get_unused_controller_services(app): app_pg = nipyapi.canvas.get_process_group(identifier_type='id',identifier=name_to_id[app]) # Enable controllers and process groups cs = nipyapi.nifi.FlowApi().get_controller_services_from_group(id=app_pg.id, include_ancestor_groups=False, include_descendant_groups=True).controller_services for service in cs: if len(service.component.referencing_components)==0: print("Controller service " + str(service.component.name) + " has no referencing components. Parent pg: "+id_to_name[service.component.parent_group_id])
Таким образом, Python-скрипты с методами библиотеки NiPyAPI позволяют улучшить работу с NiFi: автоматизировать развертывание и сократить усилия, необходимые для очистки среды от компонентов, которые больше не используются.
Читайте в нашей новой статье, как взаимодействуют процессы Python с Java, на которой написан Apache NiFi.
Освойте администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://technology.amis.nl/big-data-database/apache-nifi-automating-tasks-using-nipyapi/
- https://nipyapi.readthedocs.io/en/latest/
- https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.17.0/org.apache.nifi.processors.script.ExecuteScript/additionalDetails.html
- https://community.cloudera.com/t5/Community-Articles/Python-Script-in-NiFi/ta-p/246406