Продюсер – это приложение, которое создает и опубликовывает события в виде сообщений, информируя об изменениях в существующих данных или создании новых. Это приложение может представлять собой как микросервис, так и устройство IoT, и может быть реализовано на любом языке программирования, поддерживающем выбранный протокол. Для взаимодействия и обмена сообщениями приложение должно использовать протокол, поддерживаемый сервером. Примеры продюсеров в распределенных системах могут включать в себя [1]:
-
Продюсеры сообщений в системах сообщений: в распределенных системах, использующих системы сообщений (например, Apache Kafka, RabbitMQ), продюсер генерирует и отправляет сообщения в темы или очереди, где они могут быть обработаны потребителями.
-
Продюсеры в клиент-серверных приложениях: в клиент-серверной архитектуре распределенных систем продюсер может быть клиентским приложением, которое генерирует запросы и отправляет их серверу для обработки.
-
Продюсеры в системах потоковой обработки данных: в системах потоковой обработки данных продюсеры генерируют и передают потоки данных, которые затем могут быть обработаны различными компонентами, такими как потребители данных.
Рассмотрим ниже небольшой пример продюсера на примере брокера Kafka.
Продюсер в системе Kafka, также известный как Kafka producer, представляет собой фрагмент кода, исполняемый на стороне приложения, который ответственен за генерацию и передачу сообщений Big Data другим получателям (consumer) в распределенном кластере Kafka. Каждый Kafka-продюсер включает в себя 2 атрибута [2]:
- bootstrap.servers — список брокеров producer для соединения с кластером Kafka. Обычно рекомендуется использовать по меньшей мере два брокера, чтобы продюсер мог подключиться к кластеру при сбое одного из них;
- value.serializer — класс-сериализатор, применяемый для сериализации (преобразования объектов в байтовый массив) ключей записей (сообщений) для отправки брокерам в кластере Kafka. В качестве сериализаторов используются классы StringSerializer (для сериализации строковых объектов) и IntegerSerializer (для сериализации целочсиленных объектов);
Следующий фрагмент кода на языке Java отвечает за создание Kafka-продюсера [3]:
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // Настройки для продюсера Kafka Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "local:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Экземпляр продюсера Kafka Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties); // Параметры для отправки сообщения String topic = "my-topic"; String key = "my-key"; // Может быть null String value = "my-value"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); // Отправка сообщения producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.printf("Сообщение успешно отправлено в топик%s с оффсетом %d%n", metadata.topic(), metadata.offset()); } else { System.err.println("Ошибка при отправке сообщения: " + exception.getMessage()); } }); // Закрытие продюсера Kafka producer.close(); } }
Администрирование кластера Kafka
Администрирование Arenadata Streaming Kafka
Apache Kafka для инженеров данных
Источники
- https://itglobal.com/ru-ru/company/blog/chto-takoe-raspredelennye-vychisleniya/
- https://kafka.apache.org/documentation/
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных