От оркестрации и синхронизации конвейеров обработки данных до управления хранилищами, включая хранение состояний для stateful-приложений: сложности проектирования архитектуры потоковой обработки событий и способы их решения.
Основные сложности проектирования современной архитектуры данных
Из-за принципиальных отличий потоковой парадигмы обработки данных от пакетной, что разбиралось здесь, задача проектирования дата-конвейеров сильно усложняется, т.к. редко где на практике используется только один подход. При большей простоте пакетного подхода, его не всегда достаточно для бизнес-нужд. Например, организовать пакетную выгрузку из прикладных систем в корпоративное хранилище на базе того же PostgreSQL или Greenplum с помощью AirFlow достаточно просто при соблюдении некоторых условий. Однако, подобная инфраструктура не очень хорошо подойдет для потоковой аналитики больших данных в реальном времени, включая мониторинг событий пользовательского поведения и/или генерацию рекомендаций с помощью ML-моделей. Однако, далеко не каждая компания обладает ресурсами и желанием их вкладывать в такие масштабные аналитические проекты. Любую инициативу следует рассматривать, прежде всего, с точки зрения экономической эффективности: какой результат она принесет для бизнеса на текущий момент, а также в долгосрочной перспективе. И если точного ответа на эти вопросы у бизнес-заказчика нет, начинать ИТ-проект, даже при наличии технических возможностей его выполнить, не имеет смысла.
Впрочем, подобные сомнения свойственны мелким и средним компаниям, а крупному бизнесу даже не требуется доказывать ценность аналитики больших данных. В случае корпоративного масштаба архитектору важно предложить такое решение, которое впишется в системный ландшафт и сможет расти вместе с бизнесом, гибко меняя свою производительность и набор функциональных возможностей. Поэтому при проектировании архитектуры данных надо рассматривать предлагаемые решения со следующих точек зрения:
- оркестрация конвейеров обработки данных;
- управление хранилищами данных, включая хранилища состояний для stateful-приложений;
- синхронизация конвейеров и согласованность времени в потоковой обработке.
Далее подробно рассмотрим каждый из этих пунктов.
Оркестрация конвейеров обработки данных
Как уже было отмечено выше, организовать запуск пакетных ETL-процессов по расписанию довольно просто с технической точки зрения. Однако, сегодня бизнес-приложения все больше работают по событиям в реальном времени, а не только по расписанию. Потребность в свежих данных означает, что конвейеры должны работать все чаще. Поэтому пакетные процессы заменяются streaming-подходами микропакетами с часовыми или даже минутными интервалами, а также обработкой данных в (почти) реальном времени. Организовать оркестрацию потоковых ETL-конвейеров намного сложнее, чем пакетных, поскольку события реальной жизни редко бывают идемпотентными, в отличие от запланированных по расписанию запусков пакетных процессов. Откат и повторное воспроизведение потоковых конвейеров может стать причиной проблем с качеством и целостностью данных.
Чтобы предотвратить такие проблемы, дата-инженеру при проектировании конвейера нужно учесть все крайние случаи и сценарии, требующие его повторного воспроизведения. На практике это не всегда возможно, поскольку даже при высоком уровне зрелости процессного управления некоторые события в реальном мире возникают неожиданно и меняют ход выполнения процессов. Например, сформировать новый маршрут доставки заказа из-за изменений его состава после оформления не всегда возможно: приходится отменять прежний заказ и запускать новый, хотя для конечного потребителя его заказ это является одной и той же ценностью. Однако для микросервисной системы с архитектурой, управляемой событиями, конвейер их обработки не предполагает возврата собранного заказа к предыдущему статусу, проведения доплат и перерасчета логистики. Подобные ситуации возникают в реальности довольно часто, поэтому при проектировании архитектуры данных важно определить не только happy path, но и все возможные ответвления от него. Это задача сама по себе является unbounded, т.к. не имеет конца, поскольку инциденты в реальной жизни постоянно пополняют реестр событий EDA-системы и меняют ее архитектуру.
Управление хранилищами данных и состояний
Для потоковых конвейеров также характерна проблема stateful-приложений, когда необходимо организовать хранение промежуточных данных так, чтобы оптимизировать преобразование и хранение последующих вычислений, включая агрегаты, с минимальной задержкой. Большинство технологий потоковой передачи событий, например, брокеры сообщений и streaming-платформы типа Apache Kafka и RabbitMQ, направлены на сохранение минимального объема состояния при нехватке памяти и максимизацию пропускной способности. Фреймворки распределенной обработки типа Apache Spark и Flink, наоборот, хранят большие объемы состояния в памяти, что требует больше аппаратных ресурсов и может привести к катастрофическим потерям всего состояния. Кроме того, организация и обслуживание хранилища состояний увеличивает и без того огромные накладные расходы на управление рабочими процессами данных, включая инфраструктуру оркестрации конвейеров, коннекторы источников и приемников, управление безопасностью и пр.
Современные архитектуры данных, такие как LakeHouse, пытаются совместить преимущества пакетного и потокового подходов к хранению информации, чтобы масштабировать и обрабатывать большие объемы состояний для потоковых соединений и агрегаций. Databricks, GCP, Upsolver SQLake и другие провайдеры облачных платформ, реализующие эту технологию, заявляют о полностью управляемом сервисе, который автоматически масштабируется под нагрузку и настраивается под задачи любого клиента. Однако, это не означает отсутствие работы со стороны архитектора и дата-инженеров по проектированию конвейеров обработки данных. В частности, необходимо синхронизовать хранилище состояний напрямую с данными , делая возможными идемпотентные преобразования, чтобы путешествовать во времени и воспроизводить даже потоковые конвейеры по мере необходимости.
Синхронизация конвейеров и согласованность времени в потоковой обработке
Чтобы организовать путешествия во времени, нужно полностью представить эволюцию записи данных. Для этого необходимо знать время события – когда данные были созданы и время фиксации – когда данные были зафиксированы в приемнике: базе данных, очереди сообщений, файловом хранилище или временной таблице стриминг-фреймворка типа Apache Spark SQL или Flink SQL. Хотя невозможно записать событие до того, как оно произошло, в потоковых конвейерах это случается из-за рассинхрона с часовыми поясами или случайных операций по обновлению меток времени. При конфликте времени события со временем фиксации для записи, надо принять решение о том, какое из них использовать для агрегирования и объединения данных. При агрегации по времени события нужно включить все события, которые произошли в это время, в т.ч. те, которые еще не зафиксированы, т.е. поступившие с опозданием. Их придется ждать и пересчитывать агрегацию. Хотя время событий синхронизирует одновременно создаваемые данные, бесконечное окно прибытия данных делает конвейеры непригодными для производственного развертывания, поскольку придется хранить много данных о состоянии. Это требует много ресурсов, занимая значительный объем памяти, и не позволяет четко определить SLA с привязкой ко времени. Если вместо этого соединить данные на основе времени фиксации, одна и та же запись может быть недоступна при задержке поступления данных в соединяемые источники (таблицы). Такая задержка часто встречается в ETL-процессах, поскольку преобразования требуют времени. Таким образом, время фиксации по своей сути не синхронизировано, и основанная на нем аналитика может давать неправильные выводы. В любом случае, решение о выборе базиса (время события или время его фиксации в платформе данных) должно исходить от бизнеса, а не от технологий. Архитектор решает, насколько старыми могут быть данные, чтобы они по-прежнему считались релевантными. Обеспечить синхронизацию конвейера можно с помощью материализованных представлений для соединений и агрегаций, а также упорядочивания данных в корпоративном озере или хранилище. Для этого можно ввести отметку времени для одновременных событий, генерируемых одним и тем же процессом, которая остается максимально близкой ко времени события, не допуская обратной заливки данных (за прошлое время). Так можно добиться полного упорядочения данных, гарантировать конечную идемпотентность и однократную согласованность.
Эффективно выполнять такие преобразования для потоковых процессов можно, дифференцировав типы заданий:
- несинхронизированное задание использует время фиксации данных в источниках, постоянно обрабатывает любые зафиксированные данные и гарантирует, что каждое событие будет обработано только один раз. Несинхронизированные конвейеры не ждут прибытия данных и не гарантируют, что все преобразования в записи данных остаются синхронизированными для дальнейшего использования.
- Синхронизированное задание использует время события, которое всегда остается неизменным и распространяется как поле метаданных по всем конвейерам. Синхронизированные конвейеры ждут даже опоздавшие данные и обеспечивают ациклические зависимости. Конвейерная синхронизация позволяет включать в рабочие процессы неявные зависимости, определяемые данными. Нижестоящие задания ждут не завершения вышестоящих заданий, как в случае пакетных оркестраторов и запросов к базам данных, а прибытия соответствующих входных данных.
Чтобы оценить единовременной согласованности конвейеров обработки данных, необходимо рассмотреть сценарии, которые основаны на синхронизации. Например, когда надо дополнить события в потоке пакетными данными из таблицы, где данные пакета и потока имеют общую сущность и временную связь. Такое встречается в онлайн-рекламе, где событие клика по объявлению должно дополниться контекстом с логикой показа рекламы и исторического взаимодействия с пользователем, что хранится в базе данных. Для соединения с потоковыми событиями кликов можно взять материализованное представление таблицы взаимодействия с пользователем. Синхронизированное соединение потока с монотонно возрастающим временем события произойдет только при поступлении этого события. Если поток задерживается, соединение и нисходящий конвейер ждут. Так можно корректно определить связь между рекламой и кликом — независимо от того, кликает ли пользователь на рекламное объявление через несколько секунд или через полчаса после его просмотра.
При соединении двух источников данных, которые не имеют общего временного контекста, атрибуция не важна. Например, чтобы узнать локацию пользователей, которые вошли на сайт в настоящее время, надо соединить таблицу онлайн-пользователей с таблицей базы данных, которая содержит адреса пользователей. Этому конвейеру не нужно ждать обновлений таблицы сущностей пользователей, поскольку онлайн-пользователи меняются гораздо быстрее, чем обновления адресов отдельных пользователей. В таком случае работает несинхронизированное задание, чтобы последующие приложения надежно получали необходимые им агрегаты, даже если один из источников данных задерживается или остается статичным.
Аналогично соединение синхронизированных и несинхронизированных заданий позволяют выполнять плавную обработку обратных заполнений, когда время события и время фиксации записей данных сильно различаются, но обработка старых данных может произойти в любое время и не должна задерживать другие процессы, потребляющие поток. Вариативность подобных сценариев предполагает использование разных технологий, которые позволяют воплотить их в жизнь, начиная от пакетных ETL-оркестраторов типа Apache AirFlow и мощных брокеров сообщений таких как Kafka до стриминговых фреймворков (Spark Structured Streaming, Flink) и потоковых баз данных для хранения состояний stateful-приложений.
Освойте все эти подходы и инструменты аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Практическое применение Big Data аналитики для решения бизнес-задач
- Архитектура Данных
- Практическая архитектура данных
Источники