Поскольку Apache NiFi является распределенной системой стека Big Data, для него очень значимы вопросы балансировки нагрузки. Поэтому сегодня разберем важную для обучения дата-инженеров и администраторов кластера NiFi тему по балансировке нагрузки и распространению данных в этом потоковом ETL-фреймворке.
Как происходит балансировка нагрузки в кластере Apache NiFi
До версии 1.8 в Apache NiFi была проблема с сохранением состояния в случае переключения основного узла, из-за которого процессоры не могли получать данные из систем-источников. При внесении пользователем каких-либо изменений узел отключался от кластера, а файл flow.xml становится недействительным. При этом узел не мог подключиться к кластеру, пока администратор не скопирует XML-файл вручную с узла. В версии NiFi 1.8.0 балансировка нагрузки была добавлена между каждым процессором в любом соединении, а также реализован способ настроить автоматическую балансировку нагрузки между узлами.
Также в Apache NiFi есть функция, позволяющая выводить из эксплуатации и отключать узлы от кластера, а также выгружать все их данные. Это особенно важно для Kubernetes и динамического масштабирования для обеспечения эластичности. Эластичное масштабирование нужно для рабочих нагрузок, которые меняются в течение дня или года. Чтобы соответствовать соглашениям об уровне обслуживания и срокам, можно увеличивать масштаб в пиковые периоды и уменьшайте его для экономии расходов. Эта функция балансировки нагрузки демонстрирует возможности распределения большого набора данных или захвата неструктурированных данных в другом центре обработки данных, разделения и передачи, а затем использования привязки атрибутов к узлу для восстановления данных в определенном порядке.
Это случается, когда есть большой массив данных, экспортируемых из системы, например, дамп реляционной базы данных в один файл размером несколько терабайт. Нужен один узел NiFi, чтобы загрузить этот файл, а затем разбить его на более мелкие фрагменты, передать и отправить другим узлам для обработки. Иногда для упорядочивания записей требуется использовать атрибут для хранения связанных фрагментов, к примеру, одной таблицы на одном узле.
Также большим файлом может быть какой-либо архив, например, zip-файл, содержащий множество файлов разных типов, которые надо направить к одному и тому же узлу NiFi на основе корневого имени файла или типов вложенных файлов. Поэтому при настройке соединения между процессорами, которые обрабатывают потоковые файлы, дата-инженеру следует выбрать стратегию балансировки:
- без балансировки нагрузки — это значение по умолчанию;
- циклический режим (Round Robin) – потоковые файлы будут распределяться по кластеру в циклическом режиме. Если узел отключен от кластера или не может связаться с узлом, данные, поставленные в очередь для этого узла, будут автоматически перераспределены на другие узлы.
- один узел, когда все потоковые файлы будут распределены на один узел в кластере. Если же этот узел отключен от кластера или с ним невозможно связаться, данные, поставленные в очередь для этого узла, останутся в очереди до тех пор, пока узел снова не станет доступным. Все соединения, для которых настроена эта стратегия, будут отправлять данные на один и тот же узел.
Стратегии балансировки и распределения данных
В случае стратегии одного узла пользователь может указать, на какой узел передавать данные, но в эластично масштабируемом кластере узлы могут добавляться или удаляться произвольно, что делает выбранное значение недействительным и требует ручного вмешательства. Кроме того, это будет означать, что сохранение этого значения в реестре потоков или в шаблоне сделает конфигурацию недействительной в любой другой среде. Поэтому можно задать выбор узла на усмотрение фреймворка и отправлять данные на этот узел.
Также можно применить разделение по атрибуту потокового файла. Все потоковые файлы с одинаковым значением указанного атрибута будут распределены на один и тот же узел в кластере. Если узел назначения отключен от кластера или не доступен, данные не будут передаваться на другой узел, а запишутся в очередь, ожидая доступности нужного узла. Если топология кластера изменится, это приведет к повторной балансировке данных. Согласованное хэширование следует использовать, чтобы избежать перераспределения всех данных, когда узел присоединяется к кластеру или покидает его.
Когда данные передаются с одного узла на другой, след источника данных должен работать так же, как и с Site-to-Site: если данные передаются от узла A к узлу B, то узел A должен предоставить событие происхождения SEND и DROP. Узел B должен предоставить событие происхождения RECEIVE, которое должно заполнить идентификатор исходной системы, чтобы пользователи могли сопоставить два потоковых файла. Причем принимающая система не должна назначать тот же UUID, что и отправляющая система.
Чтобы проиллюстрировать, как работает балансировка нагрузки в кластере Apache NiFi, далее рассмотрим небольшой пример. Предположим, есть трехузловой кластер NiFi с балансировщиком нагрузки. Основной поток начинается с процессора ListenHTTP, который запускает HTTP-сервер и прослушивает заданный базовый путь для преобразования входящих запросов в ожидании файлов из AWS S3. У балансировщика нагрузки есть пул, который отправляет полученные файлы в кластер NiFi. Если отправить потоковые файлы на один узел NiFi, соединение после процессора ListenHTTP будет балансировать нагрузку по всему кластеру, но также это будет единственная точка отказа.
Если не использовать балансировщик нагрузки, клиент несет ответственность за обнаружение проблем с доставкой данных и переключение ее на другой узел NiFi. Поскольку в NiFi каждый узел в кластере выполняет один и тот же файл flow.xml.gz, куда в режиме реального времени записывается все, что пользователь помещает на холст в GUI. Когда есть процессоры ListSFTP или GetSFTP, работающие на каждом узле, происходит дублирование данных и потенциальные проблемы, поскольку каждый узел пытался использовать одни и те же данные. В этом сценарии следует настроить процессор для выполнения только на основном узле, а затем использовать LB-соединения для немедленного перераспределения этих потоковых файлов в кластере NiFi, прежде чем выполнять дальнейшую обработку.
Процессор ListSFTP, который выполняет список файлов, находящихся на SFTP-сервере и для каждого найденного файла создает новый FlowFile с атрибутом имени файла, равным имени файла на удаленном сервере. Затем это можно использовать в сочетании с FetchSFTP для извлечения этих файлов. Итак, ListSFTP создает потоковые файлы с нулевым содержимым и атрибутами с подробной информацией о том, откуда можно получить их содержимое. Эти 0-байтовые FlowFiles быстро распределяют нагрузку по всему кластеру, где процессор FetchSFTP извлекает фактическое содержимое для файла данных, специфичного для FlowFile, и вставляет его в потоковый файл. Такой тип настройки также позволяет избежать единой точки отказа, поскольку потеря выбранного в данный момент основного узла, где работает потребитель данных, приведет к тому, что новый узел будет выбран в качестве нового основного узла. Этот новый первичный узел считывает состояние из поставщика состояния кластера и начинает перечисление с того места, где остановился обработчик списка предыдущего выбранного узла.
Освойте практику администрирования и эксплуатации Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники