Как Apache Kafka реализует требование к атомарности транзакций с помощью координатора и журнала транзакций: принцип Atomic в ACID и его иллюстрация на UML-диаграмме последовательности публикации сообщений в раздел топика.
Транзакционная публикация сообщений в Apache Kafka
Хотя Apache Kafka не является базой данных, эта платформа потоковой передачи событий все же хранит сообщения, опубликованные продюсером в ее топики. При этом публикация выполняется транзакционно, т.е. все операции, которые связаны с публикацией сообщения (запись в лидере раздела, репликация по подписчикам, увеличение смещения) в топике Kafka должны выполниться вместе. А если произойдет сбой хотя бы одной операции в рамках транзакции, она целиком считается невыполненной, причем происходит откат всех выполненных операций, если какие-то завершились успешно.
Kafka поддерживает распределённые транзакции с полноценными ACID-гарантиями только в пределах самой платформы, включая несколько продюсеров и потребителей для разных топиков. Транзакции позволяют осуществлять атомарную запись в несколько топиков и разделов Kafka. Все сообщения, включенные в транзакцию, будут успешно записаны или записано не будет ни одно из них. Например, ошибка во время обработки может привести к прерыванию транзакции, и в этом случае ни одно из сообщений транзакции не будет доступно потребителям. Это обеспечивает атомарные циклы чтения-обработки-записи.
В Kafka за транзакции отвечают координатор транзакций и журнал транзакций. Координатор транзакций — это модуль, работающий внутри каждого брокера Kafka. Журнал транзакций — это внутренний топик Kafka под названием __transaction_state. Каждый координатор владеет некоторым подмножеством разделов в журнале транзакций, для которых его брокер является лидером. Каждый идентификатор транзакции сопоставляется с определенным разделом журнала транзакций с помощью простой функции хеширования. Это означает, что только один координатор владеет данным идентификатором транзакции. Координатор транзакций — единственный компонент, который может читать и записывать журнал транзакций. Если этот брокер выходит из строя, новый координатор выбирается лидером разделов журнала транзакций, которые ранее принадлежали отказавшему брокеру. Новый координатор транзакции считывает сообщения из входящих разделов, чтобы восстановить свое состояние в памяти для транзакций в этих разделах.
Репликация сообщений с брокера-лидера раздела на брокеров-подписчиков и процессы перевыбора лидера, гарантируют, что координатор транзакций всегда доступен, а все состояние транзакций надежно хранится. При этом журнал транзакций хранит только последнее состояние транзакции, а не фактические сообщения в транзакции. Сообщения хранятся исключительно в актуальных разделах топика. Сама транзакция в логе транзакций может находиться в различных состояниях, например Ongoing (Выполняется), Prepare commit (Подготовка к фиксации) и Completed (Завершено). Именно это состояние и связанные с ним метаданные сохраняются в журнале транзакций.
Таким образом, при публикации сообщений продюсер взаимодействует с координатором транзакций, отправляя ему запросы следующим образом:
- API initTransactions регистрирует идентификатор тразакции id у координатора, который закрывает все ожидающие транзакции с этим идентификатором и сдвигает эпоху, чтобы изолировать зомби-процессы, о которых мы писали здесь. Это происходит только один раз за сеанс продюсера. Когда продюсер собирается опубликовать данные в разделе топика Kafka впервые в транзакции, этот раздел сначала регистрируется у координатора. Наконец, когда приложение вызывает метод фиксации транзакции commitTransaction() или ее отмены abortTransaction(), координатору отправляется запрос на запуск протокола двухфазной фиксации (2PC).
- Далее происходит взаимодействие координатора и журнала транзакций. По мере выполнения транзакции продюсер отправляет запросы для обновления состояния транзакции на координаторе, который сохраняет состояние каждой своей транзакции в памяти. Также координатор транзакции записывает это состояние в журнал транзакций, который для надежности реплицируется тремя способами и является долговечным.
- Затем продюсер записывает данные в целевые разделы топика Kafka после регистрации новых разделов в транзакции с координатором. Это обычный поток данных в методе producer.send(), но с дополнительной проверкой, гарантирующей доступность и отсутствие изоляции продюсера.
- Наконец, после того как производитель инициирует фиксацию или отмену транзакции, координатор запускает протокол двухфазной фиксации. На первом этапе 2PC-протокола координатор обновляет свое внутреннее состояние до prepare_commit и обновляет это состояние в журнале транзакций. После этого транзакция гарантированно будет зафиксирована, несмотря ни на что. Затем координатор начинает второй этап, на котором он записывает маркеры фиксации транзакции в разделы топика, которые являются частью транзакции. Эти маркеры транзакций не доступны приложениям, но используются потребителями в режиме read_commited для фильтрации сообщений из прерванных транзакций и для предотвращения возврата сообщений, которые являются частью открытых транзакций, т. е. тех, которые есть в журнале, но не имеют маркер транзакции, связанный с ними. Как только маркеры записаны, координатор транзакции помечает транзакцию как завершенную, обновляя ее состояние на Completed, и продюсер может начать следующую транзакцию.
На UML-диаграмме последовательности это будет выглядеть так:
Скрипт PlantUML для этой sequence-диаграммы:
@startuml participant "Продюсер" as Producer participant "Координатор транзакций" as TC participant "Журнал транзакций" as TL participant "Раздел топика Kafka" as K Producer -> TC: initTransactions(transactional.id) TC -> TC: Закрыть все ожидающие транзакции(transactional.id) TC -> TC: Инкрементировать(epoch) loop для каждого нового раздела топика Producer -> TC: register(partition) Producer -\ K: producer.send(message) alt Commit transaction Producer -> TC: commitTransaction(transactional.id) group 2PC фаза 1 TC -> TC: обновить состояние('Prepare commit') TC -> TL: записать состояние(transactional.id, 'Prepare commit') TL -> TL: сохранить(transactional.id, 'Prepare commit') TL --> TC: ok end group 2PC фаза 2 TC -> K: записать маркеры фиксации транзакции(transactional.id) K -> K: сохранить(message) K --> TC: ok TC -> TC:обновить состояние('Completed') end TC --> Producer: state='Completed' else Abort transaction Producer -> TC: abortTransaction(transactional.id) TC --> Producer: canceled end end @enduml
В следующей статье продолжим разговор про транзакции, рассмотрев другое свойство из набора ACID – изоляцию.
Узнайте больше про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka
Источники