В этой статье для обучения дата-инженеров рассмотрим, как организовать сбор измененных данных из реляционных СУБД, построив CDC-конвейер с помощью Apache NiFi. А также разберем, зачем процессоры этого потокового ETL-маршрутизатора используют технологию веб-хуков.
ETL-конвейер для DWH и Data Lake
В общем случае сбор данных из реляционных и нереляционных источников и построение конвейера приема можно представить следующими шагами:
- подключение к источникам данных, что обычно требует учетные данные и сведения о соединении, такие как сокет (IP-адрес или имя хоста, и порт удаленной системы);
- извлечение данных через SQL-запросы или методы API;
- преобразование данных после их извлечения, чтобы они соответствовали формату системы-приемника, корпоративного хранилища или озера данных. Например, трансформация вложенных структур, переименование столбцов или выполнение вычислений.
- загрузка преобразованных данных в целевое хранилище или озеро данных, включая перемещение данных во временные промежуточные таблицы с последующим переводом в окончательные места назначения.
Кроме этих последовательных этапов также дата-инженер решает вопросы мониторинга и устранения неполадок на конвейере данных, чтобы убедиться в его бесперебойной работе, и исправить возникающие проблемы. Для этого ведется журналирование всех событий и оповещение о критических ошибках. Также регулярно выполняется планирование и автоматизация процессов запуска конвейера данных в нужные моменты времени, чтобы обеспечивать непрерывный сбор и обработку данных без ручного вмешательства. Наконец, дата-инженер выполняет задачи оптимизации таких ETL-конвейеров, отслеживая производительность и внося изменения для повышения их эффективности и масштабируемости. Для этого важно учитывать вопросы управления данными, включая их качество и происхождение.
Чаще всего при практической реализации подобных ETL-конвейеров выполняется не полная загрузка данных, а только их изменений, чтобы сократить нагрузку на сеть и системы-источники. Это соответствует концепции захвата измененных данных (CDC, Change Data Capture), о чем мы недавно писали здесь. Этот метод отслеживания и фиксации изменений в данных по мере их возникновения часто применяется для синхронизации корпоративного хранилища или озера данных с системами-источниками. Технически CDC-подход может быть реализован через триггеры базы данных, что часто используется коммерческими и открытыми решениями, такими как Debezium, Oracle Change Data Capture, PowerExchange CDC от Informatica, Hevo Data, IBM Infosphere, Qlik Replicate, Talend, Oracle GoldenGate, StreamSets и пр. Также можно организовать захват измененных данных с помощью Apache NiFi. Как это сделать, мы рассмотрим далее.
CDC с Apache NiFi
Apache NiFi — это не просто потоковый ETL-маршрутизатор, но и мощный инструмент создания CDC-конвейера. В частности, захвата изменений в реляционных базах данных с помощью Apache NiFi можно реализовать следующим образом:
- установить JDBC-драйверы для реляционной базы данных, чтобы приложение могло получить доступ к хранящимся в ней данным;
- настроить пул соединений и службу контроллера JDBC для реляционной СУБД, задав хост, порт и учетные данные пользователя;
- использовать процессор NiFi под названием GenerateTableFetch, чтобы запросить СУБД и получить моментальный снимок текущих данных. С помощью SQL-запроса задать столбцы, которые нужно отслеживать, и таблицу, где они находятся.
- настроить процессор ConvertRecord для преобразования данных из формата реляционной таблицы в AVRO, JSON или другие форматы, а процессоры RecordWrite и PutFile поможет записать данные в локальную файловую систему. Подробнее про эти процессоры NiFi, ориентированные на записи, мы рассказывали здесь.
- Используя процессор ListenHTTPRecord можно прослушивать любые обновления в реляционной таблице и извлекать нужные столбцы с помощью процессора ExtractText;
- Сравнить текущее и предыдущее состояние данных поможет процессор CompareRecord, включая идентификацию строк, которые были добавлены, изменены или удалены;
- Процессор RouteOnAttribute направит обновленные данные в место назначения, чтобы маршрутизировать их по типу изменений, например, вставка, обновление или удаление записей;
- процессор PutDatabaseRecord пригодится для записи обновленных данных обратно в реляционную базу данных, если она является основой корпоративного хранилища, например, Greenplum. А процессор PutS3Object поможет сохранить их в облачное объектное хранилище AWS S3, которое часто используется в качестве озера данных.
Помимо активного использования готовых или самописных процессоров Apache NiFi для отслеживания и передачи изменений, также можно использовать веб-хуки или перехватчики (webhoocks). Этот способ позволяет исходной системе предоставлять другим приложениям информацию в режиме реального времени. Веб-хуки обычно применяются для уведомления приложения об изменении в другом сервисе. Например, так клиентские приложения мессенджеров получают новые сообщения от сервера. Также работают уведомления о новых письмах по электронной почте. Или, к примеру, сервис обработки платежей может отправить веб-хук в CRM-систему интернет-магазина о том, что платеж конкретного клиента был успешно проведен. Веб-хуки обычно реализуются с использованием HTTP-запросов POST, которые включают полезную нагрузку JSON с отправляемыми данными. Принимающее приложение должно уметь обрабатывать информацию из входящего веб-хука, извлекая ее из полезной нагрузки. Таким образом, веб-хуки позволяют синхронизировать данные в реальном времени и могут быть более эффективным решением, чем традиционные HTTP-опросы. Поэтому веб-хуки часто применяются в системах интернета вещей (IoT, Internet of Things), мобильных и веб-приложениях.
Многие процессоры Apache NiFi основаны на концепции веб-хуков. В частности, процессор ListenHTTP запускает HTTP-сервер и прослушивает заданный базовый путь для преобразования входящих запросов в потоковые файлы. Под капотом он предоставляет простой веб-перехватчик HTTP в потоке. Аналогично процессор PutSlack с помощью веб-хуков позволяет передавать данные в корпоративный мессенджер Slack, используя URL-адрес для посылки POST-запросов, чтобы отправить сообщения в канал.
Узнайте больше подробностей по администрированию и эксплуатации Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники