Как работать с протоколом MQTT в Apache NiFi: потоковая обработка данных для IoT

Apache NiFi IoT Internet of Things интернет вещей дата-инженерия примеры курсы обучение, инженер данных Apache NiFi лучшие практики примеры курсы обучение, Apache NiFi дата-инженер курсы обучение, инженерия данных курсы примеры обучение, инженер данных Big Data Apache NiFi примеры курсы обучение, Школа Больших Данных Учебный центр Коммерсант

В прошлой статье про обновление Apache NiFi мы писали, что в новой версии 1.18.0 улучшено взаимодействие с протоколом MQTT, который активно используется в системах интернета вещей. Сегодня разберем более подробно, как наладить сбор и публикацию данных в MQTT-топики с помощью процессоров Apache NiFi, а также разберем, что такое брокер HiveMQ.

Что такое MQTT и при чем здесь HiveMQ

Напомним, MQTT — это облегченный протокол обмена сообщениями публикации/подписки, который идеально подходит для подключения удаленных устройств с небольшим объемом кода и минимальной пропускной способностью сети. В 2013 году MQTT стал стандартным протоколом обмена сообщениями OASIS для Интернета вещей (Internet Of Things, IoT). В обновленном Apache NiFi 1.18 поддерживается протокол v5 для существующих процессоров MQTT с использованием клиентской библиотеки HiveMQ.

HiveMQ — это MQTT-брокер и клиентская платформа обмена сообщениями для быстрого, эффективного и надежного перемещения данных на подключенные IoT-устройства и обратно. Он использует протокол MQTT для мгновенной двунаправленной передачи данных между IoT-устройством и корпоративными системами. HiveMQ создан для решения ключевых технических проблем приложений Интернета вещей, включая низкую надежность и слабую масштабируемость, а также задержи доставки и высоких эксплуатационных расходов.

Экземпляры MQTT-брокера HiveMQ масштабируются вместе с базовым оборудованием: асинхронный и многопоточный подход позволяет одновременно подключать до 10 миллионов устройств, сохраняя высокую пропускную способность и минимальную задержку. В отличие от HTTP-протокола, работающего по принципу запрос-ответ, HiveMQ и MQTT основаны на архитектуре издатель/подписчик. Поэтому общий сетевой трафик уменьшается из-за отсутствия постоянного опроса клиентов. Размер MQTT-сообщения значительно меньше, чем у HTTP, поэтому объем данных, проходящих через сеть, уменьшается.

HiveMQ реализует все уровни качества обслуживания MQTT, включая доставку at least once, at most once и exactly once. Поддержка HiveMQ расширенных политик хранения сообщений и автономных очередей сообщений необходима для компенсации задержки в сети. Интеграция корпоративных данных достигается за счет двунаправленной передачи данных между брокером HiveMQ MQTT и корпоративной системой, которая действует как клиент MQTT. Благодаря архитектуре издатель подписчик в MQTT-протоколе, клиент каждой корпоративной системы подписывается на данные, которые необходимо интегрировать. Реализация общей подписки MQTT в HiveMQ позволяет горизонтально масштабировать клиенты MQTT, обеспечивая масштабируемую и надежную интеграцию корпоративных приложений.

Архитектура HiveMQ основана на действительно распределенной архитектуре кластера без ведущего, что означает отсутствие единой точки отказа, и кластер может увеличиваться и уменьшаться во время выполнения без потери данных или доступности. Поддержка Kubernetes, OpenShift и DC/OS позволяет автоматически масштабировать HiveMQ в соответствии с требованиями вашего приложения IoT. Администраторы могут использовать панель мониторинга HiveMQ для мониторинга данных в режиме реального времени, проходящих через брокера и клиентов MQTT, подключенных к IoT-приложению. Администратор может просматривать каждого MQTT-клиента, отключать его, удалять сеанс MQTT и добавлять/удалять подписки. Для расширенного устранения неполадок HiveMQ позволяет создавать записи трассировки, которые можно использовать для выявления проблем и узких мест в развернутых IoT-приложениях. Общая сводная информационная панель дает операционной группе полный обзор кластера брокера в режиме реального времени и общего состояния системы.

HiveMQ предназначен для защиты данных IoT от устройства до корпоративных систем. Передача данных защищена отраслевыми стандартами, такими как TLS 1.3, безопасные веб-сокеты и современные наборы шифров. Поддержка проверки подлинности и авторизации включает в себя сертификаты X.509, имя пользователя и пароль, проверку подлинности на основе IP и API, который позволяет использовать пользовательскую логику проверки подлинности, авторизации и разрешений, например интеграцию OAuth 2.0.

Открытый API и гибкая структура расширений позволяют интегрировать HiveMQ и данные IoT в существующие корпоративные системы. Платформа расширений позволяет разработчикам быстро создавать расширения для обработки пользовательских данных, аутентификации устройств и механизмов авторизации устройств. HiveMQ также предоставляет множество готовых расширений для Kafka и других систем.

Брокер MQTT HiveMQ на 100 % соответствует спецификациям MQTT 3.1, MQTT 3.1.1 и MQTT 5. Любая клиентская библиотека, совместимая с MQTT, может использоваться с HiveMQ. HiveMQ предоставляет собственную клиентскую библиотеку Java, но также позволяет использовать библиотеки Eclipse Paho, C/C++, JavaScript или Python. Можно создать собственного клиента MQTT, избегая привязки к отдельным провайдерам.

HiveMQ можно развернуть в частном, гибридном и общедоступном облаке. Готовые образы можно развернуть в частных облаках с помощью Kubernetes, OpenShift и DC/OS. Поддерживаемые общедоступные облачные платформы включают AWS и MS Azure. HiveMQ также может изначально работать в Linux, Windows и OS X. Наконец, есть HiveMQ Cloud — облачная служба обмена IoT-сообщениями, которая упрощает развертывание и управление MQTT-платформами, позволяя быстро создавать масштабируемые и надежные кластеры облачных MQTT-брокеров.

Вспомнив, что такое MQTT и HiveMQ, рассмотрим, как это использовать с Apache NiFi .

MQTT-процессоры в Apache NiFi

В Apache NiFi для работы с MQTT-протоколом есть 2 готовых процессора:

  • ConsumeMQTT, который подписывается на топик и получает сообщения от MQTT-брокера. Он имеет множество свойств, но не имеет входных атрибутов для чтения. В качестве отношений можно указать количество полученных записей (record.count), MQTT-брокер источник сообщения (mqtt.broker), топик MQTT для получения сообщения (mqtt.topic), качество обслуживания этого сообщения (mqtt.qos), может ли это сообщение быть дубликатом сообщения, которое уже было получено (mqtt.isDuplicate) и было ли оно сохранено сервером как последнее сообщение в этом топике (mqtt.isRetained). Этот процессор не сохраняет состояние и не разрешает входящие отношения. При использовании процессора ConsumeMQTT есть смысл установить значение свойства Max Queue Size. Оно задает максимальный размер очереди и указывает максимальное количество сообщений, которые могут храниться в памяти NiFi одним экземпляром этого процессора. Высокое значение этого свойства означает, что в памяти хранится много данных.
  • PublishMQTT публикует сообщение в MQTT-топике. Этот процессор не имеет атрибутов чтения и записи. На выходе процессора дает 2 типовых отношения: успех (success), когда потоковые файлы успешно отправлены в пункт назначения и сбой (failure), когда потоковые файлы не удалось отправить по назначению. Аналогично процессору ConsumeMQTT, экземпляры процессора PublishMQTT могут вызвать интенсивное использование памяти. В частности, несколько экземпляров или высокие параметры параллелизма могут привести к снижению производительности Apache NiFi.

Чтобы продемонстрировать принципы работы этих процессоров в Apache NiFi, составим небольшой ETL-конвейер. В дополнение к ранее рассмотренным процессорам PublishMQTT и ConsumeMQTT добавим процессор GenerateFlowFile, который будет генерировать потоковые файлы для PublishMQTT. Также процессор LogAttribute пригодится для логирования исходящих потоковых файлов из PublishMQTT.

ETL IoT MQTT NiFi
ETL-конвейер для IoT с MQTT-процессорами Apache NiFi

Освойте практику администрирования и эксплуатации Apache NiFi  для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.hivemq.com/hivemq/mqtt-broker/
  2. https://medium.com/@nsabonyi/getting-started-with-mqtt-in-apache-nifi-64e8cde1de91
  3. https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-mqtt-nar/1.18.0/org.apache.nifi.processors.mqtt.ConsumeMQTT/
  4. https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-mqtt-nar/1.18.0/org.apache.nifi.processors.mqtt.PublishMQTT/index.html
Поиск по сайту