Самообслуживаемый ETL-конвейер с Apache Airflow и Amazon Athena: кейс hipages

DAG Apache AirFlow, Apache AirFlow примеры курсы обучение, обучение дата-инженеров, инженер данных курсы примеры обучение, запуск DAG по расписанию airflow example, инженерия данных с Apache AirFlow пример, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Сегодня разберем опыт австралийской ИТ-компании hipages по построению самообслуживаемого ETL-конвейера с Apache Airflow и Amazon Athena, призванного обеспечить высокое качество данных и облегчить дата-инженерам управление информационными активами. Изящное решение сложных проблем управления данными с примерами SQL-запросов к корпоративному Data Lake на AWS S3.

Что не так с монолитной архитектурой платформы данных в hipages

Компания hipages – это крупнейший в Австралии маркетплейс онлайн-торговли, который объединяет покупателей с продавцами. По мере развития бизнеса, инженеры hipages столкнулись с проблемой управления качеством данных. В динамичной среде обеспечение потребностей разных продуктовых групп в данных путем создания и обслуживания различных конвейеров их обработки становится сложным. Для этого в hipages были созданы самообслуживаемые ETL-конвейеры на базе Amazon Athena и Apache Airflow.

Корпоративная платформа данных представляла собой масштабируемую систему для сбора, обработки, преобразования и интеграции наборов данных. Изначально ее основная цель была сосредоточена на сборе данных, поскольку в то время у бизнеса было немного требований к данным. Первая версия платформы данных состояла из следующих компонентов:

  • озеро данных на базе AWS S3;
  • SQL-интерфейс на базе Amazon Athena для запроса данных в S3;
  • Python-библиотека Pandas для очистки и преобразования данных в ETL;
  • Apache Airflow в качестве диспетчера рабочих процессов;
  • BI-решения на основе Looker, интегрированного с озером данных через Amazon Athena.

Множество источников передают данные в корпоративное озеро. Бизнес-пользователи и дата-аналитики обращаются к ним, создавая различные дэшборды, чтобы предоставлять ценную информацию для принятия важных управленческих решений. Чтобы сделать этот процесс эффективным, часто используемые датасеты извлечены из необработанных данных в S3, преобразованы и сохранены в другом месте облачного объектного хранилища вместе с таблицей Athena.

Внутренняя структура hipages состоит из кросс-функциональных команд, в состав каждой из которых входит дата-аналитик, выполняющий также роль эксперта в области бизнеса. Когда команде необходимо принять решение, основанное на данных, дата-аналитик обеспечивает понимание данных, анализируя данные с помощью Looker. В отличие от кросс-функциональных команд, платформа данных в hipages была полностью централизованной и монолитной. Если дата-аналитики не могли найти необходимый для анализа датасет в озере данных, приходилось обращаться к дата-инженерам данных, чтобы запросить необходимые ETL. Все ETL принадлежали платформе данных, т.е. дата-инженерам, независимо от характера бизнес-домена. Дата-инженеры использовали Pandas для создания различных ETL-задач, чтобы предоставлять запрошенные таблицы Athena аналитикам для интеграции поисковых систем.

Такая централизованная платформа данных имела следующие недостатки:

  • незнание дата-инженеров, которые реализуют ETL-процессы, особенностей предметной области, что не позволяло им выявить проблемы с качеством данных и понять некоторые нюансы логики преобразования;
  • по мере роста бизнеса внедрение и поддержка ETL-процессов стали для дата-инженеров основной работой, а на добавление новых функций в платформу данных уже не оставалось времени.

Устранить эти проблемы в hipages было решено переходом от монолитной архитектуры платформы данных к децентрализованной, в основе которой лежат концепции взаимодействующих друг с другом микросервисов и самообслуживаемых ETL-конвейеров. Как именно это было реализовано, рассмотрим далее.

 Самообслуживаемый ETL на Apache Airflow и Amazon Athena

Концепция самообслуживаемых сервисов уже частично использовалась в платформе данных hipages для BI-целей с применением Looker. Применяя подобную идею к конвейерам данных путем децентрализации владения ETL-процессов, было решено сделать их максимально доступными для дата-аналитиков, чтобы они могли получать нужные данные без привлечения инженеров посредством типовых SQL-запросов. Поэтому первоочередной задачей стало внедрение самообслуживаемого ETL-решения на основе SQL для корпоративного озера данных на базе Amazon S3.

Для этого отлично подходит интерактивный бессерверный сервис запросов Amazon Athena, который позволяет анализировать данные в Amazon S3 с помощью стандартного языка структурированных запросов SQL. Созданный на основе Apache Presto, Athena помогает избежать процессов настройки ETL-операций, работает со множеством форматов данных и поддерживает функции для комплексного анализа данных, включая объединение больших таблиц, работу оконных функций и обработку массивов данных. Для использования достаточно просто указать данные в Amazon S3, задать схему и выполнять SQL-запросы во встроенном редакторе.

В Athena есть операторы Create Table As Select (CTAS) и INSERT INTO, которые можно применить для извлечения, преобразования и загрузки (ETL) данных в Amazon S3. Дата-инженеры hipages решили использовать функцию CTAS для своих ETL-задач, немного адаптировав ее к специфике применения.

CTAS-запрос создает новую таблицу в Athena на основе результатов оператора SELECT. Athena хранит файлы данных, созданные оператором CTAS, в указанном месте в Amazon S3. Например, следующий запрос выбирает данные за 3 года, с 2020 по 2022 включительно:

CREATE table new_parquet
WITH (
format='PARQUET',
partitioned_by=array['year'],
external_location = 's3://your-bucket/data/'
)
AS SELECT
id,
date,
element,
datavalue,
mflag,
qflag,
sflag,
obstime,
substr("date",1,4) AS year
FROM source_table
WHERE
cast(substr("date",1,4) AS bigint) >= 2020
AND
cast(substr("date",1,4) AS bigint) <= 2022

Выполняя запрос на Amazon Athena, мы получаем следующие каталоги в заданном местоположении S3:

s3://your-bucket/data/year=2020/

s3://your-bucket/data/year=2021/

s3://your-bucket/data/year=2022/

Часть SELECT запроса CTAS извлекает данные из таблиц-источников (source_table) и загружает их в указанное место в виде Parquet-файлов. Здесь можно включить всю логику преобразования в запрос SELECT, используя выражение partitioned_by. В этом случае получится 3 раздела в заданном месте S3. Однако, при повторном выполнении этого SQL-запроса для другого временного диапазона, все разделы в этом расположении S3 будут перезаписаны. Дополнительным ограничением был набор следующих требований к самообслуживаемому ETL-решению:

  • оно должно работать по расписанию;
  • оно должно генерировать разделы на основе времени.

Поэтому пришлось немного модифицировать CTAS-запрос. Например, выполнить его для создания ежегодных разделов: следует запускать вышеприведенный запрос один раз в год и создавать раздел для этого года. Сперва нужно определить, сколько ежегодных разделов надо создавать в данном месте. Для этого пригодится такой CTAS-запрос:

CREATE table temp_new_parquet
WITH (
format='PARQUET',
partitioned_by=array['year'],
external_location = 's3://your-bucket/temp-data/'
)
AS SELECT
id,
date,
element,
datavalue,
mflag,
qflag,
sflag,
obstime,
substr("date",1,4) AS year
FROM source_table

Каталог temp-data/ будет содержать все ежегодные разделы, которые есть в этой таблице. Затем можно выполнить запрос SHOW PARTITIONS temp_new_parquet, чтобы получить список разделов, который содержит все ключи раздела S3. Далее можно выполнять CTAS-запросы для каждого раздела с условием WHERE для извлечения данных для этого конкретного раздела и ключом S3 для создания раздела. Например, если исходная_таблица имеет 3 годовых разделов, выполняется 3 подобных CTAS-запроса для каждого раздела. Рассмотрим пример для 2022 года, т.е. ключ раздел year равен 2022:

CREATE table new_parquet
WITH (
format='PARQUET',
partitioned_by=array['year'],
external_location = 's3://your-bucket/data/year=2022/'
)
AS SELECT
id,
date,
element,
datavalue,
mflag,
qflag,
sflag,
obstime,
substr("date",1,4) AS year
FROM source_table
WHERE year = 2022

Чтобы использовать это решение ETL в реальных бизнес-сценариях, понадобился инструмент для оркестровки и шаблонов SQL-запросов. В качестве такого средства дата-инженеры hipages выбрали Apache Airflow, поскольку он уже был частью корпоративной платформы данных. Был разработан подключаемый модуль Airflow с использованием AWS-библиотеки boto3 для программного выполнения CTAS-запросов в Amazon Athena. Представляя собой AWS SDK для Python, boto3 упрощает интеграцию приложений, библиотек и скриптов Python с такими сервисами AWS, как Amazon S3, EC2, DynamoDB и пр. В boto3 есть два разных уровня API:

  • Низкоуровневые клиентские API обеспечивают связь 1-к-1 к базовым операциям HTTP;
  • Высокоуровневые API ресурсов скрывают явные сетевые вызовы, предоставляя взамен ресурсные объекты и наборы объектов для доступа к атрибутам и выполнения действий.

Оба вида API динамически генерируют классы на основе моделей JSON, описывающие API AWS, позволяя быстро предоставлять обновления с строгой непротиворечивостью для всех поддерживаемых сервисов. Также в boto3 для клиентских и ресурсных API есть waiter-функции, которые автоматически выполняют опрос предопределенных изменений состояния ресурсов AWS, позволяя запустить инстанс Amazon EC2 и дождаться его перехода в рабочее состояние или создать новую таблицу и дождаться, когда она станет доступной для использования. Еще в boto3 есть много функций для конкретных сервисов, такие как автоматическая многопотоковая передача для AWS S3 или упрощенные условия запросов для DynamoDB.

Таким образом, возможности boto3 позволили передать SQL-запросы с целевым местоположением S3 и столбцом раздела в плагин Apache Airflow, чтобы сгенерировать преобразованную, разбитую на разделы таблицу Athena из Parquet-файлов. Условие SQL-запроса WHERE можно параметризовать с помощью шаблонов, например:

SELECT name, value, run_date
FROM source_table
WHERE run_date = '{end_year}-{end_month}-{end_day}'

Получившийся Airflow-плагин совместим с любым сложным SQL-запросом. Airflow позволяет легко ввести дату выполнения DAG в этот SQL-запрос и выполнить его как CTAS-запрос для нужной даты. Другой пример использования boto3 вместе с Apache Spark читайте в нашей новой статье.

В результате этого изящного решения дата-аналитики и инженеры могут самостоятельно строить собственные ETL-конвейеры для обращения к корпоративному озеру данных, просто написав SQL-запрос для логики преобразования данных. Все оркестровки и мониторинг ETL-конвейеров проходят через Airflow. Описанное ETL-решение использует доменно-ориентированную структуру самообслуживания для конвейеров обработки данных и приближается к децентрализованной архитектуре Data Mesh, о которой мы поговорим в следующий раз.

Все подробности администрирования и эксплуатации Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/hipages-engineering/decentralising-the-ownership-of-etls-using-amazon-athena-and-apache-airflow-cebd7105cae2
  2. https://aws.amazon.com/ru/athena/
  3. https://aws.amazon.com/ru/sdk-for-python/
Поиск по сайту