Транзакции в Apache Kafka: атомарность публикации сообщений

Kafka курсы примеры обучение, инженерия данных с Kafka, транзакции Kafka, публикация Kafka пример, Школа Больших Данных Учебный Центр Коммерсант

Как Apache Kafka реализует требование к атомарности транзакций с помощью координатора и журнала транзакций: принцип Atomic в ACID и его иллюстрация на UML-диаграмме последовательности публикации сообщений в раздел топика.

Транзакционная публикация сообщений в Apache Kafka

Хотя Apache Kafka не является базой данных, эта платформа потоковой передачи событий все же хранит сообщения, опубликованные продюсером в ее топики. При этом публикация выполняется транзакционно, т.е. все операции, которые связаны с публикацией сообщения (запись в лидере раздела, репликация по подписчикам, увеличение смещения) в топике Kafka должны выполниться вместе. А если произойдет сбой хотя бы одной операции в рамках транзакции, она целиком считается невыполненной, причем происходит откат всех выполненных операций, если какие-то завершились успешно.

Kafka поддерживает распределённые транзакции с полноценными ACID-гарантиями только в пределах самой платформы, включая несколько продюсеров и потребителей для разных топиков. Транзакции позволяют осуществлять атомарную запись в несколько топиков и разделов Kafka. Все сообщения, включенные в транзакцию, будут успешно записаны или записано не будет ни одно из них. Например, ошибка во время обработки может привести к прерыванию транзакции, и в этом случае ни одно из сообщений транзакции не будет доступно потребителям. Это обеспечивает атомарные циклы чтения-обработки-записи.

В Kafka за транзакции отвечают координатор транзакций и журнал транзакций. Координатор транзакций — это модуль, работающий внутри каждого брокера Kafka. Журнал транзакций — это внутренний топик Kafka под названием __transaction_state. Каждый координатор владеет некоторым подмножеством разделов в журнале транзакций, для которых его брокер является лидером. Каждый идентификатор транзакции сопоставляется с определенным разделом журнала транзакций с помощью простой функции хеширования. Это означает, что только один координатор владеет данным идентификатором транзакции. Координатор транзакций — единственный компонент, который может читать и записывать журнал транзакций. Если этот брокер выходит из строя, новый координатор выбирается лидером разделов журнала транзакций, которые ранее принадлежали отказавшему брокеру. Новый координатор транзакции считывает сообщения из входящих разделов, чтобы восстановить свое состояние в памяти для транзакций в этих разделах.

Репликация сообщений с брокера-лидера раздела на брокеров-подписчиков и процессы перевыбора лидера, гарантируют, что координатор транзакций всегда доступен, а все состояние транзакций надежно хранится. При этом журнал транзакций хранит только последнее состояние транзакции, а не фактические сообщения в транзакции. Сообщения хранятся исключительно в актуальных разделах топика. Сама транзакция в логе транзакций может находиться в различных состояниях, например Ongoing (Выполняется), Prepare commit (Подготовка к фиксации) и Completed (Завершено). Именно это состояние и связанные с ним метаданные сохраняются в журнале транзакций.

Таким образом, при публикации сообщений продюсер взаимодействует с координатором транзакций, отправляя ему запросы следующим образом:

  • API initTransactions регистрирует идентификатор тразакции id у координатора, который закрывает все ожидающие транзакции с этим идентификатором и сдвигает эпоху, чтобы изолировать зомби-процессы, о которых мы писали здесь. Это происходит только один раз за сеанс продюсера. Когда продюсер собирается опубликовать данные в разделе топика Kafka впервые в транзакции, этот раздел сначала регистрируется у координатора. Наконец, когда приложение вызывает метод фиксации транзакции commitTransaction() или ее отмены abortTransaction(), координатору отправляется запрос на запуск протокола двухфазной фиксации (2PC).
  • Далее происходит взаимодействие координатора и журнала транзакций. По мере выполнения транзакции продюсер отправляет запросы для обновления состояния транзакции на координаторе, который сохраняет состояние каждой своей транзакции в памяти. Также координатор транзакции записывает это состояние в журнал транзакций, который для надежности реплицируется тремя способами и является долговечным.
  • Затем продюсер записывает данные в целевые разделы топика Kafka после регистрации новых разделов в транзакции с координатором. Это обычный поток данных в методе producer.send(), но с дополнительной проверкой, гарантирующей доступность и отсутствие изоляции продюсера.
  • Наконец, после того как производитель инициирует фиксацию или отмену транзакции, координатор запускает протокол двухфазной фиксации. На первом этапе 2PC-протокола координатор обновляет свое внутреннее состояние до prepare_commit и обновляет это состояние в журнале транзакций. После этого транзакция гарантированно будет зафиксирована, несмотря ни на что. Затем координатор начинает второй этап, на котором он записывает маркеры фиксации транзакции в разделы топика, которые являются частью транзакции. Эти маркеры транзакций не доступны приложениям, но используются потребителями в режиме read_commited для фильтрации сообщений из прерванных транзакций и для предотвращения возврата сообщений, которые являются частью открытых транзакций, т. е. тех, которые есть в журнале, но не имеют маркер транзакции, связанный с ними. Как только маркеры записаны, координатор транзакции помечает транзакцию как завершенную, обновляя ее состояние на Completed, и продюсер может начать следующую транзакцию.

На UML-диаграмме последовательности это будет выглядеть так:

UML-диаграмма транзакционной публикации сообщений в Apache Kafka
UML-диаграмма транзакционной публикации сообщений в Apache Kafka

Скрипт PlantUML для этой sequence-диаграммы:

@startuml
participant "Продюсер" as Producer
participant "Координатор транзакций" as TC
participant "Журнал транзакций" as TL
participant "Раздел топика Kafka" as K

Producer -> TC: initTransactions(transactional.id)
    TC -> TC: Закрыть все ожидающие транзакции(transactional.id) 
    TC -> TC: Инкрементировать(epoch)
loop для каждого нового раздела топика
    Producer -> TC: register(partition)
    Producer -\ K: producer.send(message)
alt Commit transaction
    Producer -> TC: commitTransaction(transactional.id)
group 2PC фаза 1
    TC -> TC: обновить состояние('Prepare commit')
    TC -> TL: записать состояние(transactional.id, 'Prepare commit')
    TL -> TL: сохранить(transactional.id, 'Prepare commit')
    TL --> TC: ok
end

group 2PC фаза 2
    TC -> K: записать маркеры фиксации транзакции(transactional.id)
    K -> K: сохранить(message)
    K --> TC: ok
    TC -> TC:обновить состояние('Completed')
end
TC --> Producer: state='Completed'
else Abort transaction
    Producer -> TC: abortTransaction(transactional.id)
TC --> Producer: canceled
end
end
@enduml

В следующей статье продолжим разговор про транзакции, рассмотрев другое свойство из набора ACID – изоляцию.

Узнайте больше про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://www.confluent.io/blog/transactions-apache-kafka/
  2. https://developer.confluent.io/learn/kafka-transactions-and-guarantees/
  3. https://developer.confluent.io/courses/architecture/transactions/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту