Mirror Maker — это инструмент Apache Kafka, предназначенный для реализации зеркального копирования данных внутри брокера. Зеркальное копирование в Kafka подразумевает доступ к записям из разделов основного кластера с целью формирования локальной копии на дополнительном (целевом) кластере. Mirror Maker представляет собой набор потребителей, объединенных в одну группу, которые считывают данные из выбранных тем для копирования. В процессе зеркального копирования каждый потребитель в Mirror Maker запускает отдельный поток выполнения, считывающий данные из указанных топиков и разделов исходного кластера. После этого создается продюсер для отправки считанных данных на целевой кластер.
Каждые 60 секунд потребители уведомляют продюсера о необходимости передачи информации об имеющихся данных. Затем потребители обращаются к исходному кластеру Kafka для фиксации смещения этих данных.
Начало работы с Mirror Maker
Для запуска процесса копирования необходимо сначала получить перечень топиков, содержащих данные, которые требуется скопировать из исходного кластера. Далее рассмотрим пример кода, который формирует список топиков с данными на исходном кластере для их копирования на целевой кластер под другими именами:
import java.util.Collections; import java.util.HashMap; import java.util.List; import kafka.consumer.BaseConsumerRecord; import kafka.tools.MirrorMaker; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; public class RenameTopicHandler implements MirrorMaker.MirrorMakerMessageHandler { private final HashMap<String, String> topicMap = new HashMap<String, String>(); public RenameTopicHandler(String topicList) { String[] topicAssignments = topicList.split(";"); for (String topicAssignment : topicAssignments) { String[] topicsArray = topicAssignment.split(","); if (topicsArray.length == 2) { topicMap.put(topicsArray[0], topicsArray[1]); } } } public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) { String targetTopic = null; if (topicMap.containsKey(record.topic())) { targetTopic = topicMap.get(record.topic()); } else { targetTopic = record.topic(); } Long timestamp = record.timestamp() == ConsumerRecord.NO_TIMESTAMP ? null : record.timestamp(); return Collections.singletonList(new ProducerRecord<byte[], byte[]>(targetTopic, null, timestamp, record.key(), record.value(), record.headers())); } }
Для зеркального копирования также немаловажно сохранять полную структуру исходных данных для их корректного «отражения». Следующий код «зеркально отражает» структуру исходных данных для копирования их на целевой кластер:
import java.util.Collections; import java.util.List; import kafka.consumer.BaseConsumerRecord; import kafka.tools.MirrorMaker; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.record.RecordBatch; public class ExactMessageHandler implements MirrorMaker.MirrorMakerMessageHandler { public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) { Long timestamp = record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(); return Collections.singletonList(new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), timestamp, record.key(), record.value(), record.headers())); } }
Таким образом, благодаря механизму зеркального копирования Apache Kafka является весьма надежным и отказоустойчивым брокером для хранения и обмена больших потоков данных, что делает ее полезным средством для каждого специалиста в области анализа и обработки больших данных, от Data Scientist’а до разработчика распределенных приложений.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
Администрирование кластера Kafka
Администрирование Arenadata Streaming Kafka