1. Prerequisite
- Image choice: apache/kafka:3.8.0 or confluentinc/cp-kafka:7.6.1;
- Schema Registry is optional; bring everything up with docker compose up -d;
- https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- KRaft mode is not recommended on Windows; it causes many problems, e.g. topics may fail to delete;
- Common tools: kafkatool, conduktor, kafka-eagle, PrettyZoo;
- docker pull quay.io/strimzi/kafka:0.40.0-kafka-3.8.0
- https://github.com/wikimedia/mediawiki-services-eventstreams
2. Property Variable
Configuration Property | Environment Variable |
---|---|
. (abc.def) | _ (KAFKA_ABC_DEF) |
_ (abc_def) | __ (double underscore, KAFKA_ABC__DEF) |
- (abc-def) | ___ (triple underscore, KAFKA_ABC___DEF) |
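For example, following the rules above, the broker properties used later in this document map to their Docker Compose environment keys like this:
group.initial.rebalance.delay.ms -> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
controller.quorum.voters         -> KAFKA_CONTROLLER_QUORUM_VOTERS
listener.security.protocol.map   -> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP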
3. Generate ClusterId
# generate clusterId
./kafka-storage.sh random-uuid
cat /proc/sys/kernel/random/uuid | tr -d '-' | base64 | cut -b 1-22
uuidgen --time | tr -d '-' | base64 | cut -b 1-22
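A small sketch of how the generated id is then used, assuming the current directory is the Kafka installation root (as in section 8) and that -c points at your own server.properties:
# capture the id and format the storage directories declared in log.dirs
CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t "$CLUSTER_ID" -c ./config/kraft/server.properties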
4. Environment Variable
- KAFKA_NODE_ID: uniquely identifies the Kafka node;
- KAFKA_CONTROLLER_LISTENER_NAMES: comma-separated names of the controller listeners; must be set in KRaft mode (the default is the PLAINTEXT protocol);
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: maps listener names to the corresponding security protocols; here the unauthenticated, unencrypted PLAINTEXT protocol is assigned to all three listeners (CONTROLLER, INTERNAL, EXTERNAL);
- KAFKA_LISTENERS: the addresses the server listens on:
  - CONTROLLER: controller quorum communication;
  - INTERNAL: internal data-plane communication between brokers and with clients that sit on the same network as the broker (e.g. Schema Registry);
  - EXTERNAL: communication with clients outside the broker network, e.g. an application running on the machine that connects to the Kafka cluster running in Docker;
- KAFKA_ADVERTISED_LISTENERS: the addresses advertised to clients; defaults to KAFKA_LISTENERS when not set;
- KAFKA_CONTROLLER_QUORUM_VOTERS (see the sketch after this list):
  - comma-separated list of controller voters;
  - format: {controllerId}@{controllerHost}:{controllerPort};
  - the controllers in the quorum use port 29093;
- KAFKA_PROCESS_ROLES: specifies the node's role: broker, controller, or both;
- CLUSTER_ID: the cluster's unique base64-encoded 16-byte UUID;
- KAFKA_LOG_DIRS: comma-separated list of directories where log files are stored;
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: the delay the GroupCoordinator waits before starting the initial rebalance when the first member joins an empty group;
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: replication factor of __consumer_offsets, the topic that stores committed offsets; set to 1 in the single-node compose file below, but raised to 3 on a 3-broker cluster;
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:
  - replication factor of __transaction_state, the topic that stores transaction state; it is created automatically on the first request to the transactions API;
  - the replication factor is the total number of replicas, including the leader;
- Host ports 9092, 9093 and 9094 are mapped to port 9092 of the containers so that Kafka clients outside the broker network can connect to the cluster;
- Other KRaft properties of each Kafka node can be configured the same way as in server.properties; when setting properties in Docker Compose, remember to use the "KAFKA_" prefix;
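A minimal sketch of how these variables could look for a three-node quorum, following the {controllerId}@{controllerHost}:{controllerPort} format and the 29093 controller port above; the service names kafka-1/kafka-2/kafka-3 are hypothetical:
    environment:
      # combined broker+controller role on every node (hypothetical 3-node example)
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
Each node would additionally need its own KAFKA_NODE_ID (1, 2, 3) and its own advertised listeners.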
5. Schema Registry
- SCHEMA_REGISTRY_HOST_NAME: the advertised host name; required for intercommunication between multiple Schema Registry instances;
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: the list of Kafka brokers to connect to;
- SCHEMA_REGISTRY_LISTENERS: comma-separated list of listeners that accept HTTP or HTTPS API requests; defaults to http://0.0.0.0:8081;
- Other properties: https://docs.confluent.io/platform/current/schema-registry/installation/config.html
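A minimal compose sketch wiring these variables to the broker defined in the next section; the confluentinc/cp-schema-registry image and its 7.6.1 tag are assumptions, and kafka:19092 is the internal PLAINTEXT listener from the compose file below:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:19092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
If it lives in the same compose file as the broker, it automatically joins the network-common network defined there.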
6. Docker Compose
- Install and configure strictly following the official configuration;
- e.g. ./jvm/single-node/plaintext/docker-compose.yml
version: '3.8'
services:
  kafka:
    image: apache/kafka:3.8.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true
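After docker compose up -d, a quick smoke test against the internal PLAINTEXT listener; this is a sketch that assumes the apache/kafka image keeps its scripts under /opt/kafka/bin, and the topic name is illustrative:
docker compose up -d
# create and list a topic via the internal listener kafka:19092
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:19092 --create --topic smoke-test --partitions 1 --replication-factor 1
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:19092 --list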
7. KRaft server.properties
- Because Kafka has not reached 4.x yet, only config/server.properties had been modified so far; config/kraft/server.properties was left untouched;
- If kraft/server.properties is not adjusted, all kinds of strange problems can appear, e.g. messages can be produced but not consumed;
- Alternatively, map it in as a volume: /elf/kafka/kraft:/opt/kafka/config/kraft
- Or build a custom Docker image: https://blog.csdn.net/Vector97/article/details/128237505
docker compose up -d
docker exec -it kafka /bin/bash
cd /opt/kafka/config/kraft
vi server.properties
# keep these values consistent with the compose file above;
# the quorum voter port must match the CONTROLLER listener port (29093)
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@kafka:29093
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092
docker compose stop
docker compose start
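After the restart, a produce/consume round trip from the host confirms that messages can be both sent and consumed through the advertised PLAINTEXT_HOST listener; a sketch, assuming /elf/kafka/bin is the host-side installation from section 8 and an illustrative topic name:
/elf/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.123:9092 --create --topic demo --partitions 1 --replication-factor 1
# produce one message, then read it back
echo hello | /elf/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.0.123:9092 --topic demo
/elf/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 --topic demo --from-beginning --max-messages 1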
8. Install On Linux
- Install JDK 17, then download and extract Kafka;
- Generate the cluster id: ./bin/kafka-storage.sh random-uuid
- Format the storage directory (replace <uuid>): ./bin/kafka-storage.sh format -t <uuid> -c ~/config/kraft/server.properties
- This formats the directories listed under log.dirs in config/kraft/server.properties; the default is /tmp/kraft-combined-logs;
- Start the broker and keep the shell window open, since closing it shuts the broker down: ./bin/kafka-server-start.sh ~/config/kraft/server.properties
- Add the scripts to PATH: PATH="$PATH:/elf/kafka/bin";
- kafka-topics.sh (see the usage sketch below)
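With the bin directory on PATH, typical kafka-topics.sh invocations look like this; the broker address and topic name are illustrative:
# create, list, and inspect a topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo --partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo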