1. Prerequisite

2. Property Variable

Configuration Property Environment Variable

.(abc.def)

_(KAFKA_ABC_DEF)

_(abc_def)

__(double underscore,KAFKA_ABC__DEF)

-(abc-def)

_(triple underscore,KAFKA_ABC___DEF)

3. Generate ClusterId

# generate  clusterId

./kafka-storage.sh random-uuid

cat /proc/sys/kernel/random/uuid | tr -d '-' | base64 | cut -b 1-22

uuidgen --time | tr -d '-' | base64 | cut -b 1-22

4. Environment Variable

  • KAFKA_NODE_ID:Kafka节点唯一标识(uniquely identifying);

  • KAFKA_CONTROLLER_LISTENER_NAMES
    控制器名称,逗号分隔,KRaft模式必须设置,因默认PLAINTEXT协议;

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    将监听器名称映射到响应的安全协议,为未经身份认证且未加密的PLAINTEXT
    安全协议分配三个监听器(CONTROLLER,INTERNAL,EXTERNAL);

  • KAFKA_LISTENERS:服务器监听的地址

    • CONTROLLER:controller quorum communication;

    • INTERNAL:internal communication between DATA PLANE broker
      及与broker位于同一网络上的客户端(如Schema Registry);

    • EXTERNAL:用于与broker网络外部的客户端通信,
      如连接到Docker中运行的Kafka集群的计算机上运行的应用程序;

  • KAFKA_ADVERTISED_LISTENERS
    address advertised to client,若未设置,则默认KAFKA_LISTENERS;

  • KAFKA_CONTROLLER_QUORUM_VOTERS

    • 逗号分隔的controller voter(控制器投票者)列表;

    • 格式:{controllerId}@{controllerHost}:{controllerPort};

    • quorum中controller使用29093端口;

  • KAFKA_PROCESS_ROLES:指定节点的角色:broker,controller,both;

  • CLUSTER_ID:集群的唯一16字节Base64 Uuid;

  • KAFKA_LOG_DIRS:存储日志文件的以逗号分隔的目录列表;

  • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
    当第一个成员(member)加入空组(empty group)时,GroupCoordinator
    用于启动初始重新平衡(start initial rebalance)的延迟(delay);

  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
    __kafka_offsets的复制因子,该主题存储提交的偏移量,
    默认为1,但我们在3个broker集群中调整为3;

  • KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR

    • __transaction_state的复制因子,
      该主题存储事务状态,并在第一次请求事务Api时自动创建;

    • 复制因子是节点的数量,包括Leader;

    • 主机端口9092,9093,9094映射到容器的端口9092,
      使broker网络外部的Kafka客户端能够连接到集群;

  • 每个Kafka节点的其它KRaft属性可使用server.properties配置,
    在Docker Compose中设置属性时,请记住使用"KAFKA_"前缀;

5. Schema Registry

  • SCHEMA_REGISTRY_HOST_NAME:advertised host name,
    Schema Registry的多个实例间相互通信(intercommunication)所必须;

  • SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
    要连接到的Kafka broker的列表;

  • SCHEMA_REGISTRY_LISTNERS:默认默认http://0.0.0.0.8081,
    支持Http或Https的Api请求的监听器的逗号分隔列表;

  • 其它属性:https://docs.confluent.io/platform/current/schema-registry/installation/config.html

6. Docker Compose

version: '3.8'
services:
  kafka:
    image: apache/kafka:3.8.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'

      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'

      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'

      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true

7. KRaft server.properties

  • 因Kafka目前还没到4.X版本,目前只修改了config/server.properties,
    并未修改config/kraft/server.properties

  • 若不修改kraft/server.properties,则可能出现各种奇奇怪怪的问题,
    如能发送消息,但不能消费等;

  • 当然也可通过数据卷映射:
    /elf/kafka/kraft:/opt/kafka/config/kraft

  • 也可自定义Docker镜像:
    https://blog.csdn.net/Vector97/article/details/128237505

docker compose up -d

docker exec -it kafka /bin/bash

cd /opt/kafka/config/kraft

vi server.properties
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@192.168.0.123:9093

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092
docker compose stop

docker compose start

8. Install On Linux

  • 安装Jdk17,下载并解压Kafka;

  • Generate Cluster Id:./bin/kafka-storage.sh random-uuid

  • Format Storage Directory(替换UUID):

.bin/kafka-storage.sh format -t <uuid> -c ~/config/kraft/server.properties
  • 这将格式化config/kraft/server.properties文件中
    log.dirs中的目录,默认为/tmp/kraft-combined-logs;

  • 启动broker,不要关闭Shell窗口,因它回关闭Broker:

./bin/kafka-server-start.sh ~/config/kraft/server.properties
  • 设置PATH:PATH="$PATH:/elf/kafka/bin"

  • kafka-topics.sh