Зачем нужны средства записи и чтения в процессорах Apache NiFi и как они работают: разбираемся на примере QueryRecord, PartitionRecord и RouteText. Сходства и отличия этих процессоров, а также тонкости их использования в задачах дата-инженерии.
Процессор QueryRecord в Apache NiFi
Напомним, в потоковом ETL-маршрутизаторе Apache NiFi процессоры используются для прослушивания входящих данных, извлечения их из внешних источников и публикации в места назначения. Также эти обработчики позволяют маршрутизировать, преобразовывать или извлекать информацию из потоковых файлов (FlowFile).
Apache NiFi предоставляет более 400 готовых процессоров. Также дата-инженер может написать свой собственный обработчик, о чем мы рассказывали здесь. Одним из готовых процессоров, обеспечивающих обработку записей, является QueryRecord, который оценивает один или несколько запросов SQL по содержимому FlowFile. Затем результат SQL-запроса становится содержимым выходного потокового файла. Этот процессор можно использовать, например, для фильтрации по полям или строкам и преобразования данных. Столбцы можно переименовывать, выполнять простые вычисления и агрегации и пр. Процессор настраивается с помощью службы контроллера чтения записей и службы записи записей, чтобы обеспечить гибкость форматов входящих и исходящих данных.
Процессор QueryRecord должен быть настроен хотя с одним определяемым пользователем свойством. Имя Свойства — это Отношение, в которое направляются данные, а его значение — это оператор ANSI SQL SELECT на базе оптимизатора Apache Calcite, который используется для указания того, как входные данные должны быть преобразованы/фильтрованы. Если преобразование завершается неудачно, исходный FlowFile направляется в отношение «сбой». В случае успеха выбранные данные будут направлены в связанное отношение. Если средство записи записей решит наследовать схему из записи, наследуемая схема будет из набора результатов, а не из входной записи. Это позволяет одному экземпляру процессора QueryRecord иметь несколько запросов, каждый из которых возвращает другой набор столбцов и агрегатов. Однако в результате производная схема не будет иметь имени схемы, поэтому важно, чтобы сконфигурированный модуль записи не пытался записать имя схемы в качестве атрибута при наследовании схемы от записи.
Другой интересный пример использования процессора QueryRecord – разветвление одного потока данных на несколько разных. Например, нужно проанализировать системные логи, выбрав типа ошибка (ERROR), чтобы отправить их в механизм оповещения: уведомление в Slack, письмо по электронной почте, смс-информирование и пр. Можно зарегистрировать все эти запросы одновременно, используя только один процессор QueryRecord, и обрабатывать каждый из этих потоков так, как это нужно.
Таким образом, процессор QueryRecord позволяет рассматривать каждый потоковый файл ка таблицу базы данных, и запускать SQL-запрос к ней, предоставляя результаты в виде выходного FlowFile. Благодаря тому, что процессор использует средства чтения и записи сообщений, можно использовать его для преобразования данных из одного формата в другой. Например, JSON Reader и Avro Writer, чтобы читать входящий JSON и записывать результаты в AVRO-формате.
Наличие средств чтения и записи в это процессоре позволяет дата-инженеру не беспокоиться о преобразовании данных в нужный формат: можно использовать данные любого формата, если для них есть средство чтения. Apache NiFi предоставляет множество различных средств чтения записей: CSVReader, JsonTreeReader, AvroReader и пр. Также есть инструменты чтения системных логов (SyslogReader), Parquet-файлов, XML и множества других форматов.
Примечательно, что средство записи системных логов SyslogWriter отсутствует, т.к. в большинстве случаев, запрошенные данные системных журналов должны быть более структурированными/поддающимися анализу. Поэтому их часто записывают в формате JSON. В Apache NiFi для этого есть средство записи JsonRecordSetWriter. Впрочем, если нужно записать выходные данные были в формате Syslog, можно применить FreeFormTextRecordSetWriter в качестве средства записи, настроив свойство необработанного сообщения Raw message на значение true, чтобы запись содержала поле с именем _raw, содержащее необработанное сообщение системного журнала. Затем следует настроить свойство text средства записи как просто ${_raw}.
Процессор PartitionRecord
Если процессор QueryRecord является универсальным, то PartitionRecord менее мощный, однако, его также можно использовать в Apache NiFi для создания нескольких потоков из одного входящего потока. Недостаток мощности в нем компенсируется производительностью и простотой. PartitionRecord позволяет группировать подобные данные, используя RecordPath – простой синтаксис на основе JSONPath и XPath. Этот процессор имеет один вход и много выходов. Но в отличие от QueryRecord, который может направлять одну запись во множество различных выходных потоковых файлов, PartitionRecord будет направлять каждую запись входящего FlowFile ровно в один исходящий потоковый файл.
PartitionRecord позволяет маршрутизировать данные в соответствии со значением в записи, а также группировать их для хранения. Получает данные, ориентированные на запись (т. е. данные, которые могут быть прочитаны настроенным средством чтения записей) и оценивает один или несколько путей записи для каждой записи во входящем FlowFile. Затем каждая запись группируется с другими подобными записями, и для каждой группы подобных записей создается FlowFile. Подобие записей определяется заданными пользователем свойствами, значением которого является RecordPath. Две записи считаются одинаковыми, если они имеют одинаковое значение для всех настроенных путей записи. Поскольку все записи в заданном выходном потоковом файле имеют одинаковое значение для полей, указанных в RecordPath, атрибут добавляется для каждого поля.
Как и QueryRecord, PartitionRecord — процессор, ориентированный на записи. Поэтому дата-инженеру следует настроить как средство чтения записей, так и средство записи записей, т.е. определить свойства Record Reader и Record Writer. Также следует сообщить процессору, как разделить данные, используя RecordPath. Для этого надо добавить одно или несколько определяемых пользователем свойств. Имя свойства становится именем атрибута FlowFile, который добавляется к каждому FlowFile. Значением свойства является выражение RecordPath, которое NiFi будет оценивать для каждой записи. Результат определяет, к какой группе или разделу относится запись.
Процессор RouteText
Рассмотренные процессоры QueryRecord и PartitionRecord обеспечивают большую гибкость и мощность с высокой производительностью. Но иногда приходится иметь дело с данными странных форматов, которые не ориентированы на запись. Разумеется, можно создать собственный пользовательский считыватель записей на Java или использовать Scripted Record Reader с помощью Groovy или Python. Но можно работать с этими данными как с необработанным текстом, используя процессор RouteText. Он направляет текстовые данные на основе набора пользовательских правил. Каждая строка входящего файла FlowFile сравнивается со значениями, указанными в пользовательских свойствах. Механизм сравнения текста с определяемыми пользователем свойствами, задается стратегией сопоставления. Затем данные маршрутизируются в соответствии с этими правилами, маршрутизируя каждую строку текста отдельно.
Таким образом, процессор RouteText позволяет направлять строки текста в определенное отношение, не разбивая данные на отдельные строки. Это обеспечивает высокую производительность. Также это позволяет группировать строки текста и отделять их от непохожих на них строк. По сути, эта группировка дает те же возможности, что и PartitionRecord, но для необработанных текстовых данных.
Процессор позволяет работать с регулярными выражениями и использовать внутренний язык выражений NiFi для оценки строки, чтобы обрабатывать кодирование и декодирование текста, экранирование, синтаксический анализ дат и другие мощные функции. Также некоторые ориентированные на записи процессоры позволяют использовать DSL-язык RecorPath, о чем мы рассказываем в новой статье.
Аналогично QueryRecord, процессор RouteText позволяет разветвлять один входной поток текста и разветвлять на множество потоков, разбивать текст, фильтровать или группировать строки. Также RouteText похож на PartitionRecord, предоставляя возможность разделять данные, ориентированные на записи.
В заключение отметим, что три рассмотренных процессора Apache NiFi далеко не единственный инструменты обработки записей в этом фреймворке. Все они чрезвычайно гибкие и мощные, обеспечивают высокую производительность. Это достигается за счет хранения множества крошечных записей, объединенных в более крупные потоковые файлы до 3 МБ, т.к. NiFi имеет определенные ограничения по размеру FlowFile, о чем мы писали здесь и здесь. Но если данные не соответствуют типовой спецификации формата, использовать процессоры на основе записей невозможно. Но даже в этом случае важно убедиться, что эти данные не разбиты на множество крошечных потоковых файлов, так как это сильно снижает производительность.
Узнайте больше подробностей по администрированию и эксплуатации Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://nifi.apache.org/docs/nifi-docs/html/user-guide.html
- https://medium.com/@nifi.notes/building-an-effective-nifi-flow-queryrecord-cca5ba51afd5
- https://medium.com/@nifi.notes/building-an-effective-nifi-flow-partitionrecord-b342a8efc50c
- https://medium.com/@nifi.notes/building-an-effective-nifi-flow-routetext-5068a3b4efb3