Настройка портов Docker-контейнеров компонентов Kafka для запуска на офисном ноутбуке и Windows

Kafka курсы примеры обучение, Kafka для разработчика и дата-инженера, Kafka Docker, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как настроить YAML-файл Docker Compose для доступа к Kafka на WSL в Windows:  открытие портов в конфигурации развертывания с примерами (продолжение).

Настройка конфигурационного YAML-файла для запуска Docker-контейнеров с компонентами Kafka на Windows в WSL

Как я рассказывала вчера, для работы с компонентами платформы Kafka от Confluent, развернутой как набор связанных Docker-контейнеров в WSL на Windows с GUI-интерфейсом AKHQ исходная конфигурация не подошла. После запуска контейнеров с помощью команды

docker-compose start

в консоли WSL из директории, где лежит файл docker-compose.yml, веб-сервис AKHQ становится доступным на локальном хосте, порт 8080.

Запуск Docker-контейнеров
Запуск Docker-контейнеров

Поскольку для работы Kafka необходим Zookeeper, который хранит метаданные о топиках, разделах и брокерах, а также отслеживает лидера раздела, управляет ACL-списками и другими конфигурационными данными, он тоже должен быть запущен. Чтобы контейнер Kafka мог обратиться к Zookeeper, а также другим компонентам платформы, которые в будущем я планирую использовать (ksqlDB, Kafka Connect, REST Proxy, Schema Registry), надо открыть порты, внеся изменения в конфигурационный YAML-файл docker-compose.yml. В документации Docker-образа cp-all-in-one-community, доступной на Github [1], говорится, что сервисы запускаются на следующих портах:

  • ZooKeeper – порт 2181;
  • Kafka broker — порт 9092;
  • Kafka broker JMX — порт 9101;
  • Confluent Schema Registry — порт 8081;
  • Kafka Connect — порт 8083;
  • ksqlDB — порт 8088;
  • Confluent REST Proxy — порт 8082.

Я решила сразу открыть все порты, изменив конфигурационный файл следующим образом:

version: '3.6'

volumes:
  zookeeper-data:
    driver: local
  zookeeper-log:
    driver: local
  kafka-data:
    driver: local

services:
  akhq:
    # build:
    #   context: .
    image: tchiotludo/akhq
    restart: unless-stopped
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
              schema-registry:
                url: "http://schema-registry:8085"
              connect:
                - name: "connect"
                  url: "http://connect:8083"

    ports:
      - 8080:8080
    links:
      - kafka
      - schema-registry

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    restart: unless-stopped
    ports:
      - 2181:2181
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data:Z
      - zookeeper-log:/var/lib/zookeeper/log:Z
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    restart: unless-stopped
    volumes:
      - kafka-data:/var/lib/kafka/data:Z
    environment:
      KAFKA_BROKER_ID: '0'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_NUM_PARTITIONS: '12'
      KAFKA_COMPRESSION_TYPE: 'gzip'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_JMX_PORT: '9091'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
    ports:
      - 9092:9092
    links:
      - zookeeper

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
    ports:
     - 8081:8081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'

  connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_PORT: '8083'
      CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
      CONNECT_GROUP_ID: 'kafka-connect'
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: ' /usr/share/java/'

  ksqldb:
    image: confluentinc/cp-ksqldb-server:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
      - connect
      - schema-registry
    ports:
      - 8088:8088
    environment:
      KSQL_BOOTSTRAP_SERVERS: 'kafka:9092'
      KSQL_LISTENERS: 'http://0.0.0.0:8088'
      KSQL_KSQL_SERVICE_ID: 'ksql'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      KSQL_KSQL_CONNECT_URL: 'http://connect:8083'
      KSQL_KSQL_SINK_PARTITIONS: '1'
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1'

  test-data:
    image: gradle:8-jdk11
    command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend"
    restart: unless-stopped
    working_dir: /app
    volumes:
      - ./:/app:z
    links:
      - kafka
      - schema-registry

  kafkacat:
    image: confluentinc/cp-kafkacat:7.1.14
    restart: unless-stopped
    depends_on:
      - kafka
    command:
      - bash
      - -c
      - |
        kafkacat -P -b kafka:9092 -t json << EOF
        {"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"}
        {"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"}
        {"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"}
        {"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"}
        {"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"}
        {"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"}
        EOF

        kafkacat -P -b kafka:9092 -t csv << EOF
        1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246
        2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150
        3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80
        4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189
        5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143
        EOF

        kafkacat -b kafka:9092 -o beginning -G json-consumer json
    links:
      - kafka

После изменения этого конфигурационного файла пришлось пересоздать контейнеры, сперва остановив и удалив их с помощью команды

docker-compose down

После повторного создания Docker-контейнеров из измененного YAML-файла с помощью команды docker-compose up –d, посмотрим открытые порты, запустив в консоли WSL команду

docker ps
Просмотр запущенных Docker-контейнеров
Просмотр запущенных Docker-контейнеров

Веб-интерфейс AKHQ успешно работает на localhost:8080, позволяя создавать и просматривать содержимое топиков в визуальном режиме. Также можно отправить сообщение в топик через GUI.

Kafka AKHQ GUI
Просмотр списка топиков Kafka в веб-интерфейсе AKHQ

Однако, попытка подключиться к Kafka на localhost:9092 из Python-скрипта, выдала ошибку. Для проверки подключения использовался следующий скрипт:

from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
import logging


log_level = logging.DEBUG
logging.basicConfig(level=log_level)
log = logging.getLogger('kafka')
log.setLevel(log_level)


def test_kafka_connection():
    # Define the Kafka broker address
    bootstrap_servers = ['localhost:9092']

    # Test producer connection
    try:
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        # Send a test message to ensure the connection works
        producer.send('test', value='Test message'.encode('utf-8'))
        producer.flush()
        print("Successfully connected to Kafka as a producer.")
    except KafkaError as e:
        print(f"Failed to connect to Kafka as a producer: {e}")

    # Test consumer connection
    try:
        consumer = KafkaConsumer(
            'test',
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group',
            value_deserializer=lambda x: x.decode('utf-8')
        )
        # Check if we can consume messages
        for message in consumer:
            print(f"Consumed message: {message.value}")
            break  # Exit after first message
        consumer.close()
        print("Successfully connected to Kafka as a consumer.")
    except KafkaError as e:
        print(f"Failed to connect to Kafka as a consumer: {e}")


if __name__ == "__main__":
    test_kafka_connection()

Это скрипт выдает ошибку.

Ошибка подключения к брокеру Kafka клиентом вне контейнера
Ошибка подключения к брокеру Kafka клиентом вне контейнера

Ошибка связана с невозможностью необходимостью настроить слушатели Kafka. Что это такое, рассмотрим далее.

Подключение к Kafka на Docker: настройка слушателей

Слушатель — это комбинация хоста/IP, порта, и протокола. Будучи распределенной системой, Kafka позволяет публиковать данные в лидер раздела, который может быть на любом из брокеров в кластере. Когда запускается клиент, т.е. продюсер или потребитель, он запрашивает метаданные о том, какой брокер является лидером для раздела, и он может сделать это у любого брокера. Возвращаемые метаданные будут включать конечные точки, доступные для брокера-лидера для этого раздела, и клиент будет использовать их для подключения к брокеру. Именно эти конечные точки стали источником проблемы. Если запустить все на голом железе, без виртуальных машин и Docker-контейнеров, можно работать просто с локальным хостом localhost. Но при более сложных сетевых настройках и нескольких узлах, нужно использовать KafkaListenerConfigurer, интерфейс для настройки конфигурации конечных точек слушателя. Для работы с ним используются следующие конфигурации:

  • KAFKA_LISTENERS— разделенный запятыми список слушателей, а также хост/IP и порт, к которому Kafka привязывается для прослушивания. Для более сложной сети это может быть IP-адрес, связанный с заданным сетевым интерфейсом на машине. Значение по умолчанию — 0.0.0.0, что означает прослушивание на всех интерфейсах. Возможно, это уже устаревшая конфигурация, поскольку ее не было в исходном YAML-файле для создания контейнеров от Confluent.
  • KAFKA_ADVERTISED_LISTENERS— это список слушателей, разделенных запятыми, с их хостом/IP и портом. Это метаданные, которые передаются обратно клиентам.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP – сопоставление, которое определяет пары ключ/значение для протокола безопасности, используемого каждым слушателем.

Брокеры взаимодействуют между собой обычно во внутренней сети. Для определения конкретного слушателя используется конфигурация KAFKA_INTER_BROKER_LISTENER_NAME (inter.broker.listener.name). Используемый хост/IP должен быть доступен с машины брокера другим компонентам. Если клиенты Kafka находятся вне сети брокера, что и было в моем случае, надо настраивать дополнительные слушатели.

Каждый слушатель, при подключении, сообщит адрес, по которому он может быть доступен. Этот адрес зависит от используемой сети. Подключение к брокеру из внутренней сети отличается от хоста/IP и порта при внешнем подключении.

Для запуска в Docker нужно настроить два слушателя Kafka:

  • для коммуникации между брокерами и другими компонентами Kafka в сети Docker. Для этих коммуникаций надо использовать имя хоста Docker-контейнеров. Каждый контейнер Docker в той же сети Docker будет использовать имя хоста контейнера брокера Kafka для доступа к нему.
  • трафик вне Docker:  клиенты, работающие локально на хост-машине, но вне контейнера. Они будут подключаться к Kafka по localhost к порту, открытому из контейнера Docker.

На UML-диаграмме развертывания это выглядит так.

UML-диаграмма развертывания контейнеров Kafka в Docker
UML-диаграмма развертывания контейнеров Kafka в Docker

Скрипт PlantUML для этой диаграммы:

@startuml
node "Host Machine" {
package "Docker Container" {
component "Kafka Broker" as KB {
  portin "внутренний\nпорт\n9092" as 9092 
  portin "внешний\nпорт\n29092" as 29092
}
[Internal Producer] --> 9092
}
[External Producer] --> 29092
}
@enduml

Продолжительные исследования показали, что в конфигурационном файле docker-compose.yml необходимо изменить открытый порт для Kafka, а также следующие конфигурации:

  • KAFKA_ADVERTISED_LISTENERS: ‘PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092’
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: ‘PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT’
  • ports: — 29092:29092

В результате этих манипуляций конфигурационный YAML-файл docker-compose.yml выглядит так:

version: '3.6'

volumes:
  zookeeper-data:
    driver: local
  zookeeper-log:
    driver: local
  kafka-data:
    driver: local

services:
  akhq:
    # build:
    #   context: .
    image: tchiotludo/akhq
    restart: unless-stopped
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
              schema-registry:
                url: "http://schema-registry:8085"
              connect:
                - name: "connect"
                  url: "http://connect:8083"

    ports:
      - 8080:8080
    links:
      - kafka
      - schema-registry

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    restart: unless-stopped
    ports:
      - 2181:2181
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data:Z
      - zookeeper-log:/var/lib/zookeeper/log:Z
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    restart: unless-stopped
    volumes:
      - kafka-data:/var/lib/kafka/data:Z
    environment:
      KAFKA_BROKER_ID: '0'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_NUM_PARTITIONS: '12'
      KAFKA_COMPRESSION_TYPE: 'gzip'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_JMX_PORT: '9091'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
    ports:
      - 29092:29092
    links:
      - zookeeper

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
    ports:
     - 8081:8081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'

  connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_PORT: '8083'
      CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
      CONNECT_GROUP_ID: 'kafka-connect'
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: ' /usr/share/java/'

  ksqldb:
    image: confluentinc/cp-ksqldb-server:7.5.0
    restart: unless-stopped
    depends_on:
      - kafka
      - connect
      - schema-registry
    ports:
      - 8088:8088
    environment:
      KSQL_BOOTSTRAP_SERVERS: 'kafka:9092'
      KSQL_LISTENERS: 'http://0.0.0.0:8088'
      KSQL_KSQL_SERVICE_ID: 'ksql'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      KSQL_KSQL_CONNECT_URL: 'http://connect:8083'
      KSQL_KSQL_SINK_PARTITIONS: '1'
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1'

  test-data:
    image: gradle:8-jdk11
    command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend"
    restart: unless-stopped
    working_dir: /app
    volumes:
      - ./:/app:z
    links:
      - kafka
      - schema-registry

  kafkacat:
    image: confluentinc/cp-kafkacat:7.1.14
    restart: unless-stopped
    depends_on:
      - kafka
    command:
      - bash
      - -c
      - |
        kafkacat -P -b kafka:9092 -t json << EOF
        {"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"}
        {"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"}
        {"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"}
        {"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"}
        {"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"}
        {"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"}
        EOF

        kafkacat -P -b kafka:9092 -t csv << EOF
        1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246
        2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150
        3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80
        4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189
        5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143
        EOF

        kafkacat -b kafka:9092 -o beginning -G json-consumer json
    links:
      - kafka

Теперь открытые порты выглядят так:

Просмотр запущенных Docker-контейнеров в WSL
Просмотр запущенных Docker-контейнеров

Успех внесенных изменений подтверждает ответ тестового скрипта, в котором идет обращение к bootstrap_servers = [‘localhost:29092’].

Успешное подключение к брокеру Kafka клиентом вне контейнера
Успешное подключение к брокеру Kafka клиентом вне контейнера

Таким образом, работа с Docker-контейнерами оказалась не так проста, как это казалось сначала. Завтра я продолжу работать с этим развертыванием и покажу пример публикации сообщений в топики и потребления их оттуда.

Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники:

  1. https://github.com/confluentinc/cp-all-in-one/tree/7.5.0-post/cp-all-in-one-community
  2. https://www.confluent.io/blog/kafka-listeners-explained/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту