Процессоры-слушатели в Apache NiFi

NiFI процессоры, NiFI примеры курсы обучение, NiFI для инженера данных, потоковая обработка с NiFI, Школа Больших Данных Учебный Центр Коммерсант

Какие процессоры Apache NiFi позволяют принимать и обрабатывать данные из различных источников по разным протоколам, и как избежать сбоев при их использовании с удержанием открытых соединений и порты.

Listen-процессоры Apache NiFi

В Apache NiFi есть целый набор процессов-слушателей, которые принимают и обрабатывают входящие данные из различных источников по разным протоколам. Хотя каждый из них используется для работы с разными протоколами и типами входящих данных, их основная функция заключается в прослушивании входящих соединений и приеме информации, которая затем может быть обработана и направлена в дальнейшие процессоры или системы. В версии 2.0 к таким процессорам NiFi относятся следующие:

  • ListenFTP — прослушивает входящие FTP-соединения и принимает файлы, загружаемые через протокол FTP;
  • ListenHTTP — принимает HTTP-запросы и данные, отправляемые по HTTP-протоколу;
  • ListenOTLP — слушает данные, поступающие из OpenTelemetry Protocol (OTLP), используемого для передачи телеметрии;
  • ListenRELP — ожидает и принимает логи по протоколу RELP (Reliable Event Logging Protocol);
  • ListenSlack — принимает сообщения и события, отправляемые через платформу Slack;
  • ListenSMTP — слушает входящие сообщения электронной почты по протоколу SMTP;
  • ListenSyslog — принимает данные системных логов (syslog) от устройств и приложений;
  • ListenTCP — слушает TCP-соединения и принимает передаваемые по TCP-протоколу данные;
  • ListenTCPRecord – слушает TCP-соединения и обрабатывает записи, передаваемые по TCP;
  • ListenTrapSNMP — принимает SNMP Trap-сообщения для обработки сетевых событий событие или условий, например, обрыв связи, аутентификация, и сбой питания и пр.;
  • ListenUDP — слушает UDP-соединения и принимает данные, передаваемые по UDP-протоколу;
  • ListenUDPRecord – слушает UDP-соединения и обрабатывает записи, передаваемые по UDP-протоколу;
  • ListenWebSocket — принимает данные, поступающие по протоколу WebSocket.

Большинство из этих процессоров работают в режиме постоянного ожидания входящих данных или событий, поддерживая открытое соединение с внешними источниками информации. Это позволяет им моментально реагировать на поступающие данные и передавать их на дальнейшую обработку в NiFi.

Например, ListenFTP запускает FTP-сервер, который прослушивает указанный порт, преобразует входящие файлы в FlowFiles и передает их на следующий процессор через отношение success. Процессор ListenSMTP реализует облегченный SMTP-сервер для произвольного порта, позволяя NiFi прослушивать входящую электронную почту. Однако, этот сервер не выполняет никакой проверки электронной почты, а его потоки управляются базовым smtp-сервером, поэтому ListenSMTP не поддерживает более одного потока.

Процессор ListenHTTP запускает HTTP-сервер и прослушивает указанный базовый путь для преобразования входящих HEAD- и POST-запросов в FlowFiles. Попытка обработки GET-, PUT-, DELETE-, OPTIONS- и TRACE-запросов приведут к ошибке и коду статуса HTTP-ответа 405; а вызов CONNECT выдаст ошибку и код статуса HTTP-ответа 400. Процессор ListenTCP прослушивает входящие TCP-соединения и считывает данные из каждого, используя разделитель строк в качестве разделителя сообщений. По умолчанию каждое сообщение создает один FlowFile, но это можно изменить, увеличив размер пакета до большего значения для повышения пропускной способности. Размер буфера приема должен быть больше сообщения. Процессор можно настроить на использование службы контекста SSL, чтобы разрешать только безопасные соединения. Когда подключенные клиенты представляют сертификаты для взаимной аутентификации TLS, отличительные имена издателя и субъекта клиентского сертификата добавляются к исходящим FlowFiles в качестве атрибутов. Процессор не выполняет авторизацию на основе значений отличительных имен, но поскольку эти значения прикреплены к исходящим FlowFiles, авторизация может быть реализована на основе этих атрибутов.

Аналогично работает процессор ListenTCPRecord: входящие TCP-соединения и считывает данные из каждого соединения с помощью настроенного считывателя записей, а также записывает их во FlowFile. Тип выбранного считывателя записей будет определять, как клиенты должны отправлять данные. Например, при использовании считывателя Grok для чтения логов клиент может поддерживать открытое соединение и непрерывно передавать потоковые данные. Но при использовании считывателя JSON клиент не может отправить массив документов JSON, а затем отправить другой массив по тому же соединению, т.к. записи будут считываться из соединения в режиме блокировки и будут устаревать в соответствии с тайм-аутом чтения, указанным в процессоре. Если время чтения заканчивается или при чтении возникает какая-либо другая ошибка, соединение закрывается, а все записи, считанные до этого момента, будут обрабатываться в соответствии с настроенной стратегией ошибок чтения: отменить или передать далее. Когда клиенты поддерживают TCP-соединение открытым, количество параллельных задач для процессора ListenTCPRecord должно соответствовать максимальному количеству разрешенных TCP-соединений, имея отдельную задачу для обработки каждого из них.

Процессор ListenUDP прослушивает пакеты датаграмм на указанном порту. По умолчанию создается FlowFile на каждую датаграмму. Чтобы повысить пропускную способность можно увеличить свойство Max Batch Size, которое указывает количество датаграмм для пакетирования в одном FlowFile. Этот процессор можно ограничить прослушиванием датаграмм с определенного удаленного хоста и порта, указав свойства Sending Host и Sending Host Port. Иначе он будет прослушивать датаграммы со всех хостов и портов. Похожим образом работает и процессор ListenUDPRecord, который прослушивает пакеты датаграмм на заданном порту и считывает содержимое каждой датаграммы с помощью настроенного Record Reader. Затем каждая запись записывается во FlowFile с помощью настроенного Record Writer.

Процессор ListenWebSocket действует как конечная точка сервера WebSocket для приема клиентских подключений. FlowFiles передаются в нисходящие отношения в соответствии с полученными типами сообщений, поскольку сервер WebSocket, настроенный с этим процессором, получает клиентские запросы.

Подводя итог процессорам Apache NiFi с префиксом Listen, стоит отметить, что все они предназначены для приема данных в режиме реального времени через различные протоколы, могут удерживать открытые соединения и занимать порты. Однако, это не обязательно приводит к повышенному потреблению ресурсов. В большей степени на производительность и стабильность при использовании этих процессоров влияет количество открытых соединений, объем принимаемых данных, конфигурация самого процессора и аппаратные характеристики сервера, на котором работает приложение NiFi. Поэтому для предотвращения сбоев следует внимательно настраивать процессоры согласно требованиям нагрузки, следить за потреблением ресурсов и равномерно распределять нагрузки между несколькими экземплярами NiFi.

Освойте администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://nifi.apache.org/documentation/v2/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту