Сегодня разберем, как автоматизировать наполнение озера данных на HDFS через загрузку таблиц из реляционной базы MySQL в Hive с помощью Apache NiFi. Какие процессоры NiFi следует использовать и зачем предварительно разделять таблицу Apache Hive.
Пример ETL-конвейера на процессорах Apache NiFi
Apache NiFi часто используется дата-инженерами в качестве средства автоматизации и управления потоком данных между системами. Он предоставляет пользовательский веб-интерфейс для создания, мониторинга и управления потоками данных. Сбор данных с помощью вызовов REST API позволяет собрать потоковые данные в режиме реального времени. В частности, здесь мы описывали, как организовать процесс веб-скрейпинга данных с сайтов для загрузки информации в озеро данных на HDFS с помощью Apache NiFi.
Аналогично, можно загрузить таблицы из реляционной базы данных MySQL в NoSQL-хранилище Apache Hive, которое работает поверх HDFS, позволяя обращаться к данными в распределенной файловой системе Hadoop с помощью стандартных SQL-запросов. Можно передавать по одной или нескольким таблицам из MySQL в HDFS, каждый раз модифицируя поток данных NiFi вручную. Но если такая рутинная задача периодически повторяется, ее следует автоматизировать. В этом случае поможет поток NiFi, который может обнаруживать список новых таблиц MySQL и автоматически принимать их в Hive, т.е., фактически загружать данные в HDFS.
Для этого необходимо создать сервис передачи данных для загрузки данных из MySQL в Hive. Предположим, периодичность сбора данных будет равна 5 минут. Сервис должен принимать таблицы постепенно на основе метки времени или целочисленного значения, или же получать данные полностью. Чтобы не создавать задание приема каждый раз, когда поступает новая таблица, список таблиц для приема должен создаваться автоматически.
Сперва создадим таблицу MySQL, которая будет определять данные для извлечения из MySQL. В Hive создадим партиционированную таблицу со столбцом разделения, куда будет записываться день (дата) выполнения задания извлечения данных. Благодаря наличию готовых процессоров в NiFi создание конвейера, который передает поток данных из MySQL в Apache Hive сводится к настройке их конфигураций. Например, для процессора CaptureChangeMySQL, который отслеживает двоичный лог MySQL и идентифицирует операцию изменения данных, нужно заменить хосты MySQL и расположение драйвера. Также требуется задать имя пользователя с правами администратора или root.
Также пригодится процессор QueryDatabaseTable, который создает запрос выбора SQL или использует предоставленный оператор и выполняет его для выборки всех строк, значения которых в указанном столбце или столбцах максимального значения больше, чем ранее замеченные максимумы. Результат запроса возвращается в формат AVRO. Язык выражений поддерживается для нескольких свойств, но входящие соединения запрещены. Реестр переменных можно использовать для предоставления значений любого свойства, содержащего язык выражений. Если для выполнения этих запросов необходимо использовать атрибуты потокового файла, для этой цели можно использовать процессоры GenerateTableFetch и/или ExecuteSQL. Из-за потоковой передачи в NiFi поддерживаются произвольно большие наборы результатов. Процессор QueryDatabaseTable можно запланировать для запуска по таймеру или выражению cron, используя стандартные методы планирования. Но он предназначен для работы только на основном узле. Атрибут потокового файла querydbtable.row.count указывает, сколько строк было выбрано.
Для загрузки данных в Apache Hive следует использовать процессор PutHiveQL. При этом надо установить пул соединений JDBC для Hive и создать таблицу, куда будут записаны данные потокового файла. Для этого необходимо настроить JDBC-драйвер Hive для потока NiFi с помощью HiveConnectionPool, который предоставляет службу пула подключений к базе данных для Apache Hive. Соединения могут быть запрошены из пула и возвращены после использования.
В настройках HiveConnectionPool следует задать все необходимые значения настроечных параметров: URL-адрес подключения к базе данных и учетные данные. Свойство Ресурсы конфигурации Hive предполагает путь к файлу конфигурации hive-site.xml.
Для заполнения в Hive-таблице столбца разделения с датой выполнения запроса используем готовый NiFi-процессор UpdateRecord, который преобразует входную метку времени UNIX в формат «ГГГГ-ММ-ДД ЧЧ: мм: сс» и добавляет ее в качестве нового столбца. А процессор ConvertAvroToParquet преобразует данные AVRO из процессора QueryDatabaseTable в колоночном формате формат Parquet, который более эффективен для HDFS с точки зрения хранения и поиска данных. В заключение отметим, что для выполнения CRUD-операций в Hive-таблицах следует использовать столбцовый формат ORC.
Код курса
NOSQL
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Больше подробностей про тонкости администрирования и эксплуатации Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Эксплуатация Apache NIFI
- Hadoop SQL администратор Hive
- Интеграция Hadoop и NoSQL
- Hadoop для инженеров данных
Источники