Продюсер – это приложение, которое создает и опубликовывает события в виде сообщений, информируя об изменениях в существующих данных или создании новых. Это приложение может представлять собой как микросервис, так и устройство IoT, и может быть реализовано на любом языке программирования, поддерживающем выбранный протокол. Для взаимодействия и обмена сообщениями приложение должно использовать протокол, поддерживаемый сервером. Примеры продюсеров в распределенных системах могут включать в себя [1]:
-
Продюсеры сообщений в системах сообщений: в распределенных системах, использующих системы сообщений (например, Apache Kafka, RabbitMQ), продюсер генерирует и отправляет сообщения в темы или очереди, где они могут быть обработаны потребителями.
-
Продюсеры в клиент-серверных приложениях: в клиент-серверной архитектуре распределенных систем продюсер может быть клиентским приложением, которое генерирует запросы и отправляет их серверу для обработки.
-
Продюсеры в системах потоковой обработки данных: в системах потоковой обработки данных продюсеры генерируют и передают потоки данных, которые затем могут быть обработаны различными компонентами, такими как потребители данных.
Рассмотрим ниже небольшой пример продюсера на примере брокера Kafka.
Продюсер в системе Kafka, также известный как Kafka producer, представляет собой фрагмент кода, исполняемый на стороне приложения, который ответственен за генерацию и передачу сообщений Big Data другим получателям (consumer) в распределенном кластере Kafka. Каждый Kafka-продюсер включает в себя 2 атрибута [2]:
- bootstrap.servers — список брокеров producer для соединения с кластером Kafka. Обычно рекомендуется использовать по меньшей мере два брокера, чтобы продюсер мог подключиться к кластеру при сбое одного из них;
- value.serializer — класс-сериализатор, применяемый для сериализации (преобразования объектов в байтовый массив) ключей записей (сообщений) для отправки брокерам в кластере Kafka. В качестве сериализаторов используются классы StringSerializer (для сериализации строковых объектов) и IntegerSerializer (для сериализации целочсиленных объектов);
Следующий фрагмент кода на языке Java отвечает за создание Kafka-продюсера [3]:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// Настройки для продюсера Kafka
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "local:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Экземпляр продюсера Kafka
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
// Параметры для отправки сообщения
String topic = "my-topic";
String key = "my-key"; // Может быть null
String value = "my-value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// Отправка сообщения
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Сообщение успешно отправлено в топик%s с оффсетом %d%n",
metadata.topic(), metadata.offset());
} else {
System.err.println("Ошибка при отправке сообщения: " + exception.getMessage());
}
});
// Закрытие продюсера Kafka
producer.close();
}
}
Администрирование кластера Kafka
Администрирование Arenadata Streaming Kafka
Apache Kafka для инженеров данных
Источники
- https://itglobal.com/ru-ru/company/blog/chto-takoe-raspredelennye-vychisleniya/
- https://kafka.apache.org/documentation/
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных
