Как настроить YAML-файл Docker Compose для доступа к Kafka на WSL в Windows: открытие портов в конфигурации развертывания с примерами (продолжение).
Настройка конфигурационного YAML-файла для запуска Docker-контейнеров с компонентами Kafka на Windows в WSL
Как я рассказывала вчера, для работы с компонентами платформы Kafka от Confluent, развернутой как набор связанных Docker-контейнеров в WSL на Windows с GUI-интерфейсом AKHQ исходная конфигурация не подошла. После запуска контейнеров с помощью команды
docker-compose start
в консоли WSL из директории, где лежит файл docker-compose.yml, веб-сервис AKHQ становится доступным на локальном хосте, порт 8080.
Поскольку для работы Kafka необходим Zookeeper, который хранит метаданные о топиках, разделах и брокерах, а также отслеживает лидера раздела, управляет ACL-списками и другими конфигурационными данными, он тоже должен быть запущен. Чтобы контейнер Kafka мог обратиться к Zookeeper, а также другим компонентам платформы, которые в будущем я планирую использовать (ksqlDB, Kafka Connect, REST Proxy, Schema Registry), надо открыть порты, внеся изменения в конфигурационный YAML-файл docker-compose.yml. В документации Docker-образа cp-all-in-one-community, доступной на Github [1], говорится, что сервисы запускаются на следующих портах:
- ZooKeeper – порт 2181;
- Kafka broker — порт 9092;
- Kafka broker JMX — порт 9101;
- Confluent Schema Registry — порт 8081;
- Kafka Connect — порт 8083;
- ksqlDB — порт 8088;
- Confluent REST Proxy — порт 8082.
Я решила сразу открыть все порты, изменив конфигурационный файл следующим образом:
version: '3.6' volumes: zookeeper-data: driver: local zookeeper-log: driver: local kafka-data: driver: local services: akhq: # build: # context: . image: tchiotludo/akhq restart: unless-stopped environment: AKHQ_CONFIGURATION: | akhq: connections: docker-kafka-server: properties: bootstrap.servers: "kafka:9092" schema-registry: url: "http://schema-registry:8085" connect: - name: "connect" url: "http://connect:8083" ports: - 8080:8080 links: - kafka - schema-registry zookeeper: image: confluentinc/cp-zookeeper:7.5.0 restart: unless-stopped ports: - 2181:2181 volumes: - zookeeper-data:/var/lib/zookeeper/data:Z - zookeeper-log:/var/lib/zookeeper/log:Z environment: ZOOKEEPER_CLIENT_PORT: '2181' ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' kafka: image: confluentinc/cp-kafka:7.5.0 restart: unless-stopped volumes: - kafka-data:/var/lib/kafka/data:Z environment: KAFKA_BROKER_ID: '0' KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_NUM_PARTITIONS: '12' KAFKA_COMPRESSION_TYPE: 'gzip' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1' KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092' KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false' KAFKA_JMX_PORT: '9091' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' ports: - 9092:9092 links: - zookeeper schema-registry: image: confluentinc/cp-schema-registry:7.5.0 restart: unless-stopped depends_on: - kafka ports: - 8081:8081 environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092' SCHEMA_REGISTRY_HOST_NAME: 'schema-registry' SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085' SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO' connect: image: confluentinc/cp-kafka-connect:7.5.0 restart: unless-stopped depends_on: - kafka - schema-registry ports: - 8083:8083 environment: CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092' CONNECT_REST_PORT: '8083' CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083' CONNECT_REST_ADVERTISED_HOST_NAME: 'connect' CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config' CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets' CONNECT_STATUS_STORAGE_TOPIC: '__connect-status' CONNECT_GROUP_ID: 'kafka-connect' CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true' CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter' CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true' CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter' CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1' CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1' CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1' CONNECT_PLUGIN_PATH: ' /usr/share/java/' ksqldb: image: confluentinc/cp-ksqldb-server:7.5.0 restart: unless-stopped depends_on: - kafka - connect - schema-registry ports: - 8088:8088 environment: KSQL_BOOTSTRAP_SERVERS: 'kafka:9092' KSQL_LISTENERS: 'http://0.0.0.0:8088' KSQL_KSQL_SERVICE_ID: 'ksql' KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' KSQL_KSQL_CONNECT_URL: 'http://connect:8083' KSQL_KSQL_SINK_PARTITIONS: '1' KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1' test-data: image: gradle:8-jdk11 command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend" restart: unless-stopped working_dir: /app volumes: - ./:/app:z links: - kafka - schema-registry kafkacat: image: confluentinc/cp-kafkacat:7.1.14 restart: unless-stopped depends_on: - kafka command: - bash - -c - | kafkacat -P -b kafka:9092 -t json << EOF {"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"} {"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"} {"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"} {"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"} {"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"} {"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"} EOF kafkacat -P -b kafka:9092 -t csv << EOF 1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246 2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150 3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80 4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189 5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143 EOF kafkacat -b kafka:9092 -o beginning -G json-consumer json links: - kafka
После изменения этого конфигурационного файла пришлось пересоздать контейнеры, сперва остановив и удалив их с помощью команды
docker-compose down
После повторного создания Docker-контейнеров из измененного YAML-файла с помощью команды docker-compose up –d, посмотрим открытые порты, запустив в консоли WSL команду
docker ps
Веб-интерфейс AKHQ успешно работает на localhost:8080, позволяя создавать и просматривать содержимое топиков в визуальном режиме. Также можно отправить сообщение в топик через GUI.
Однако, попытка подключиться к Kafka на localhost:9092 из Python-скрипта, выдала ошибку. Для проверки подключения использовался следующий скрипт:
from kafka import KafkaConsumer, KafkaProducer from kafka.errors import KafkaError import logging log_level = logging.DEBUG logging.basicConfig(level=log_level) log = logging.getLogger('kafka') log.setLevel(log_level) def test_kafka_connection(): # Define the Kafka broker address bootstrap_servers = ['localhost:9092'] # Test producer connection try: producer = KafkaProducer(bootstrap_servers=bootstrap_servers) # Send a test message to ensure the connection works producer.send('test', value='Test message'.encode('utf-8')) producer.flush() print("Successfully connected to Kafka as a producer.") except KafkaError as e: print(f"Failed to connect to Kafka as a producer: {e}") # Test consumer connection try: consumer = KafkaConsumer( 'test', bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', value_deserializer=lambda x: x.decode('utf-8') ) # Check if we can consume messages for message in consumer: print(f"Consumed message: {message.value}") break # Exit after first message consumer.close() print("Successfully connected to Kafka as a consumer.") except KafkaError as e: print(f"Failed to connect to Kafka as a consumer: {e}") if __name__ == "__main__": test_kafka_connection()
Это скрипт выдает ошибку.
Ошибка связана с невозможностью необходимостью настроить слушатели Kafka. Что это такое, рассмотрим далее.
Подключение к Kafka на Docker: настройка слушателей
Слушатель — это комбинация хоста/IP, порта, и протокола. Будучи распределенной системой, Kafka позволяет публиковать данные в лидер раздела, который может быть на любом из брокеров в кластере. Когда запускается клиент, т.е. продюсер или потребитель, он запрашивает метаданные о том, какой брокер является лидером для раздела, и он может сделать это у любого брокера. Возвращаемые метаданные будут включать конечные точки, доступные для брокера-лидера для этого раздела, и клиент будет использовать их для подключения к брокеру. Именно эти конечные точки стали источником проблемы. Если запустить все на голом железе, без виртуальных машин и Docker-контейнеров, можно работать просто с локальным хостом localhost. Но при более сложных сетевых настройках и нескольких узлах, нужно использовать KafkaListenerConfigurer, интерфейс для настройки конфигурации конечных точек слушателя. Для работы с ним используются следующие конфигурации:
- KAFKA_LISTENERS— разделенный запятыми список слушателей, а также хост/IP и порт, к которому Kafka привязывается для прослушивания. Для более сложной сети это может быть IP-адрес, связанный с заданным сетевым интерфейсом на машине. Значение по умолчанию — 0.0.0.0, что означает прослушивание на всех интерфейсах. Возможно, это уже устаревшая конфигурация, поскольку ее не было в исходном YAML-файле для создания контейнеров от Confluent.
- KAFKA_ADVERTISED_LISTENERS— это список слушателей, разделенных запятыми, с их хостом/IP и портом. Это метаданные, которые передаются обратно клиентам.
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP – сопоставление, которое определяет пары ключ/значение для протокола безопасности, используемого каждым слушателем.
Брокеры взаимодействуют между собой обычно во внутренней сети. Для определения конкретного слушателя используется конфигурация KAFKA_INTER_BROKER_LISTENER_NAME (inter.broker.listener.name). Используемый хост/IP должен быть доступен с машины брокера другим компонентам. Если клиенты Kafka находятся вне сети брокера, что и было в моем случае, надо настраивать дополнительные слушатели.
Каждый слушатель, при подключении, сообщит адрес, по которому он может быть доступен. Этот адрес зависит от используемой сети. Подключение к брокеру из внутренней сети отличается от хоста/IP и порта при внешнем подключении.
Для запуска в Docker нужно настроить два слушателя Kafka:
- для коммуникации между брокерами и другими компонентами Kafka в сети Docker. Для этих коммуникаций надо использовать имя хоста Docker-контейнеров. Каждый контейнер Docker в той же сети Docker будет использовать имя хоста контейнера брокера Kafka для доступа к нему.
- трафик вне Docker: клиенты, работающие локально на хост-машине, но вне контейнера. Они будут подключаться к Kafka по localhost к порту, открытому из контейнера Docker.
На UML-диаграмме развертывания это выглядит так.
Скрипт PlantUML для этой диаграммы:
@startuml node "Host Machine" { package "Docker Container" { component "Kafka Broker" as KB { portin "внутренний\nпорт\n9092" as 9092 portin "внешний\nпорт\n29092" as 29092 } [Internal Producer] --> 9092 } [External Producer] --> 29092 } @enduml
Продолжительные исследования показали, что в конфигурационном файле docker-compose.yml необходимо изменить открытый порт для Kafka, а также следующие конфигурации:
- KAFKA_ADVERTISED_LISTENERS: ‘PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092’
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: ‘PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT’
- ports: — 29092:29092
В результате этих манипуляций конфигурационный YAML-файл docker-compose.yml выглядит так:
version: '3.6' volumes: zookeeper-data: driver: local zookeeper-log: driver: local kafka-data: driver: local services: akhq: # build: # context: . image: tchiotludo/akhq restart: unless-stopped environment: AKHQ_CONFIGURATION: | akhq: connections: docker-kafka-server: properties: bootstrap.servers: "kafka:9092" schema-registry: url: "http://schema-registry:8085" connect: - name: "connect" url: "http://connect:8083" ports: - 8080:8080 links: - kafka - schema-registry zookeeper: image: confluentinc/cp-zookeeper:7.5.0 restart: unless-stopped ports: - 2181:2181 volumes: - zookeeper-data:/var/lib/zookeeper/data:Z - zookeeper-log:/var/lib/zookeeper/log:Z environment: ZOOKEEPER_CLIENT_PORT: '2181' ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false' kafka: image: confluentinc/cp-kafka:7.5.0 restart: unless-stopped volumes: - kafka-data:/var/lib/kafka/data:Z environment: KAFKA_BROKER_ID: '0' KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_NUM_PARTITIONS: '12' KAFKA_COMPRESSION_TYPE: 'gzip' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1' KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false' KAFKA_JMX_PORT: '9091' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' ports: - 29092:29092 links: - zookeeper schema-registry: image: confluentinc/cp-schema-registry:7.5.0 restart: unless-stopped depends_on: - kafka ports: - 8081:8081 environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092' SCHEMA_REGISTRY_HOST_NAME: 'schema-registry' SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085' SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO' connect: image: confluentinc/cp-kafka-connect:7.5.0 restart: unless-stopped depends_on: - kafka - schema-registry ports: - 8083:8083 environment: CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092' CONNECT_REST_PORT: '8083' CONNECT_REST_LISTENERS: 'http://0.0.0.0:8083' CONNECT_REST_ADVERTISED_HOST_NAME: 'connect' CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config' CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets' CONNECT_STATUS_STORAGE_TOPIC: '__connect-status' CONNECT_GROUP_ID: 'kafka-connect' CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'true' CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter' CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true' CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter' CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1' CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1' CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1' CONNECT_PLUGIN_PATH: ' /usr/share/java/' ksqldb: image: confluentinc/cp-ksqldb-server:7.5.0 restart: unless-stopped depends_on: - kafka - connect - schema-registry ports: - 8088:8088 environment: KSQL_BOOTSTRAP_SERVERS: 'kafka:9092' KSQL_LISTENERS: 'http://0.0.0.0:8088' KSQL_KSQL_SERVICE_ID: 'ksql' KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085' KSQL_KSQL_CONNECT_URL: 'http://connect:8083' KSQL_KSQL_SINK_PARTITIONS: '1' KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: '1' test-data: image: gradle:8-jdk11 command: "gradle --no-daemon testInjectData -x installFrontend -x assembleFrontend" restart: unless-stopped working_dir: /app volumes: - ./:/app:z links: - kafka - schema-registry kafkacat: image: confluentinc/cp-kafkacat:7.1.14 restart: unless-stopped depends_on: - kafka command: - bash - -c - | kafkacat -P -b kafka:9092 -t json << EOF {"_id":"5c4b2b45ab234c86955f0802","index":0,"guid":"d3637b06-9940-4958-9f82-639001c14c34"} {"_id":"5c4b2b459ffa9bb0c0c249e1","index":1,"guid":"08612fb5-40a7-45e5-9ff2-beb89a1b2835"} {"_id":"5c4b2b4545d7cbc7bf8b6e3e","index":2,"guid":"4880280a-cf8b-4884-881e-7b64ebf2afd0"} {"_id":"5c4b2b45dab381e6b3024c6d","index":3,"guid":"36d04c26-0dae-4a8e-a66e-bde9b3b6a745"} {"_id":"5c4b2b45d1103ce30dfe1947","index":4,"guid":"14d53f2c-def3-406f-9dfb-c29963fdc37e"} {"_id":"5c4b2b45d6d3b5c51d3dacb7","index":5,"guid":"a20cfc3a-934a-4b93-9a03-008ec651b5a4"} EOF kafkacat -P -b kafka:9092 -t csv << EOF 1,Sauncho,Attfield,sattfield0@netlog.com,Male,221.119.13.246 2,Luci,Harp,lharp1@wufoo.com,Female,161.14.184.150 3,Hanna,McQuillan,hmcquillan2@mozilla.com,Female,214.67.74.80 4,Melba,Lecky,mlecky3@uiuc.edu,Female,158.112.18.189 5,Mordecai,Hurdiss,mhurdiss4@rambler.ru,Male,175.123.45.143 EOF kafkacat -b kafka:9092 -o beginning -G json-consumer json links: - kafka
Теперь открытые порты выглядят так:
Успех внесенных изменений подтверждает ответ тестового скрипта, в котором идет обращение к bootstrap_servers = [‘localhost:29092’].
Таким образом, работа с Docker-контейнерами оказалась не так проста, как это казалось сначала. Завтра я продолжу работать с этим развертыванием и покажу пример публикации сообщений в топики и потребления их оттуда.
Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka
Источники: