Apache Storm обычно сравнивают со другими популярными фреймворками потоковой аналитики больших данных: Spark и Flink. Однако для несложной обработки событий дата-инженер может заменить эти платформы более легким инструментом маршрутизации потоковых данных в виде Apache NiFi. Сегодня сравним Apache NiFi co Storm и разберем практический пример, когда предпочтительнее именно его для обработки пакетированных сообщения Kafka с помощью ориентированных на запись процессоров.
Потоковая обработка событий для дата-инженера: Apache NiFi vs Storm
Одним из главных достоинств Apache Storm является возможность обработки данных действительно в реальном времени – в отличие от Spark, здесь практически полностью отсутствует задержка. Впрочем, как и Spark, Storm реализует концепцию направленного ациклического графа (DAG) с представлением потокового приложения в виде топологии, позволяя обрабатывать более 1 миллиона кортежей в секунду на узел кластера. Однако, несмотря на развитие Storm, текущая версия которого, 2.2.0, выпущена в июне 2020 года, эта платформа также имеет недостатки. В частности, если необходимо обеспечить сохранение состояния (stateful) Storm-приложения и в точности однократную доставку сообщений (exactly once), следует использовать Trident API, который позволяет работать с микропакетами, как Apache Spark [1]. Кроме того, внутренние концепции Storm не так просты для понимания из-за особенностей узлов DAG-графа и взаимоотношений между ними: необходимо знать разницу между spaut и bolt, а также отношения между ними.
Поэтому для относительно простых случаев потоковой обработки событий дата-инженер может использовать Apache NiFi – популярный инструмент маршрутизации потоков данных в реальном времени с наглядным веб-GUI. С помощью готовых и собственных обработчиков (процессоров) можно в наглядно построить потоковый конвейер подобно топологиям Storm. В частности, процессор (processor) Apache NiFi можно рассматривать как аналог вычислительного узла DAG-топологии в Storm, называемого болт (Bolt). Благодаря наличию GUI, который упрощает взаимодействие пользователя с NiFi, можно создавать поток данных и отслеживать любые ошибки, а также показатели обработки сообщений, а также определять особенности движения успешно и неуспешно обработанных данных между процессорами. Например, направить потоковый файл в процессор регистратора в случае сбоя и автоматически завершить обрабатывающий процессор.
Таким образом, NiFi позволяет быстро создавать конвейеры обработки данных вместо довольно трудоемкого процесса разработки топологий Storm. Также развертывание NiFi в кластере проще, чем Storm, при сохранении отказоустойчивости и масштабируемости. Кроме того, в отличие от Storm, у которого есть узел Nimbus для развертывания топологии и узлы Supervisor, управляющие рабочими процессами, все узлы NiFi имеют схожие конфигурации и обеспечивают отказоустойчивость за счет автоматического выбора лидера из любого из доступных узлов [2].
Поэтому многие компании предпочитают выбирать более простые фреймворки вместо Apache Storm. В частности, известный вендор Big Data решений, корпорация Cloudera рекомендует вместо Storm аналогичный по возможностям фреймворк Flink или NiFi. Преимущества последнего особенно заметны в кейсах простой обработки событий в реально времени, когда потоки NiFi можно создать в веб-GUI за считанные часы вместо месяцев разработки Java-кода. Одни из таких кейсов мы и рассмотрим далее.
Ориентированные на запись процессоры и пакетирование сообщений Kafka: кейс Cloudera
Выбор между Storm и NiFi в рассматриваемом примере происходил в контексте других Big Data платформ: Kafka и HBase. В частности, клиентам Cloudera необходимо хранить огромные объемы данных, обрабатывать их в режиме реального времени или пакетно, чтобы далее передавать результаты аналитики другим приложениям. К примеру, согласованные данные нужно обрабатывать в течение дня. Сам конвейер построен следующим образом [3]:
- внешнее приложение публикует события в разные топики Apache Kafka;
- Storm или NiFi выполняет простые преобразования с этими данными и записывает результаты в HBase;
- другие системы используют информацию из HBase для создания аналитических отчетов;
- согласованные данные хранятся в HBase в окончательном виде (без изменения) не менее 2-х лет.
Ключевым вопросом замены Storm на NiFi была производительность без дополнительных затрат на оборудование. Изначально кластер Apache Storm включал 4 узла и этого оказалось более чем достаточно для NiFi, который показал пропускную способность в 4 раза больше. Причиной такой оптимизации потоковая парадигма NiFi. В рассматриваемом кейсе главным источником потоковых данных является платформа потоковой обработки событий Apache Kafka. Поэтому процессоры NiFi с ориентацией на запись (Record Oriented) показывают отличную пропускную способность для этого случая[3].
Напомним, Record Oriented процессоры впервые были представлены в Apache NiFi версии 1.2, чтобы упростить управление структурированными данными. Хотя FlowFile не привязывается к структуре данных, что делает этот фреймворк универсальным ко множеству сценариев использования, иногда привязка к формату необходима. Процессоры, ориентированные на запись, применяют набор десериализаторов (Record Readers) и сериализаторов (Record Writers) для эффективного чтения, преобразования и записи данных. Чтобы такой процессор, пользователь указывает схему каждого типа данных одним из следующих способов [4]:
- найти в одном из трех поддерживаемых реестров схем от Apache NiFi, Hortonworks или Confluent. Ссылки схемы (имя и версия) определены в каждом потоковом файле как атрибут или закодированы в содержании.
- задать в конфигурации самого сериализатора/десириализатора.
Возвращаясь к рассматриваемому примеру, отметим, что преобразования через процессоры JoltTransformRecord, ConvertRecord, QueyRecord, PartitionRecord и ScriptedTransformRecord используют записи. Это означает, что вместо обработки одного файла в каждом процессоре можно объединить несколько файлов в один FlowFile. Первоначальный DAG-граф из процессоров в NiFi выглядел так [3]:
- (Source) ConsumeKafka ->
- (Transform) ConvertAvroToJSON -> EvaluateJsonPath -> RouteOnAttribute -> SplitJSON -> TransformJSON -> MergeContent ->
- (Target) PutHBase
Эти 8 процессоров NiFi обеспечивали более 30 ГБ за 5 минут, что соответствует пропускной способности Storm. Однако, заменив некоторые процессоры из этого набора на ориентированные на запись аналоги, число узлов в DAG-графе немного сократилось:
- (Source) ConsumeKafkaRecord ->
- (Transform) PartitionRecord -> RouteOnAttribute -> TransformRecord ->
- (Target) PutHBaseRecord
На первый взгляд кажется, что исключение 3-х процессоров (теперь их 5 вместо 8) – это несущественно. Однако, это упрощает и оптимизирует общее количество задач и ресурсов NiFi, которые могут быть потрачены на другие процессоры. Таким образом, пропускная способность конвейера выросла более чем в 5 раз, до 160 ГБ за 5 минут. Ключевую роль в этом сыграла возможность пакетирования нескольких сообщений Kafka с помощью параметров batch.size и linger.ms в один потоковый файл NiFi. При этом реестр схем для процессоров, ориентированных на запись, не использовался – вместо этого была разработана внутренняя служба контроллера схемы в NiFi. С помощью этого сервиса сериализаторы и десериализаторы NiFi собирали схемы данных централизовано из одного места.
Узнайте больше про администрирование и применение Apache NiFi для современной дата-инженерии на специализированных курсах для разработчиков, ИТ-архитекторов, инженеров данных, администраторов, Data Scientist’ов и аналитиков Big Data в нашем лицензированном учебном центре обучения и повышения квалификации в Москве:
Источники
- https://storm.apache.org/2020/06/30/storm220-released.html
- https://blog.maxar.com/earth-intelligence/2017/data-pipelines-with-nifi-as-a-storm-alternative
- https://blog.cloudera.com/replace-and-boost-your-apache-storm-topologies-with-apache-nifi-flows/
- https://medium.com/@abdelkrim.hadjidj/democratizing-nifi-record-processors-with-automatic-schemas-inference-4f2b2794c427