1. Roundup

Command Line Interface Memo

kafka-topics

Kafka Topic Management

kafka-console-producer

Kafka Producer

kafka-console-consumer

Kafka Consumer

kafka-console-consumer

Kafka Consumer in Consumer Group

kafka-consumer-groups

Kafka Consumer Group Management

connect-standalone

Kafka Connect Cli

2. Prerequisite

docker exec -it kafka /bin/bash

cd /opt/kafka/bin && ./kafka-topics.sh --version


docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
	--bootstrap-server kafka:9092 --topic simple-connect --from-beginning

3. Create Topic

  • kafka-topics用于创建create,删除delete,描述describe和更改change主题;

  • 提供mandatory parameter:

Parameter Case

Host Port

192.168.0.123:9092

Mandatory Parameter

topic名称,partition数,replication factor

# 主题不建议使用句点period('.')或下划线underscore('_')
./kafka-topics.sh \
	--bootstrap-server 192.168.0.123:9092  \
	--topic first_topic --create \
	--partitions 3 --replication-factor 1
  • kafka-topics.sh --create常见错误:

  • 主题名称一定要注意,如空格,无法通过正则:"[a-zA-Z0-9\\._\\-]";

    • 指定的复制因子不能大于拥有的broker数量;可指定任意数量的分区,推荐3;

    • partition和replication factor没默认值,需显式指定;

  • Kafka Topic Config
    #--config max.message.bytes=64000

4. List Topic

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 --list

* 此命令会列出内部主题,如 \__consumer_offsets

  • 不要删除这些主题,可使用 --exclude internal 来排除(隐藏);

  • 根据复制状态过滤主题:

Option Memo

--at-min-isr-partitions

若在描述主题时设置,则仅显示isr计数等于配置的最小值的分区

--unavailable-partitions

仅显示其leader不可用的分区

--under-min-isr-partitions

仅显示isr计数小于配置的最小值的分区

--under-replicated-partitions

仅在已复制分区下显示

5. Describe Topic

  • 可指定逗号分隔的主题列表来描述多个主题,若不指定—​topic选项,则描述所有主题;

  • partition Id 和 broker Id很容易混淆;

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --topic first_topic
Topic: first_topic      TopicId: VeA2QbLNShqi06DSFsF1bA PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: first_topic      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: first_topic      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: first_topic      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
  • Leader:1对分区0,Id为1的broker是Leader;

  • Replica:1表示对分区0,Id为1的broker是一个副本;

  • Isr:1:表示对分区0,Id为1的broker是一个同步副本(Isr);

  • 此命令在Kafka Cluster场景下的结果:

Topic: third_topic	TopicId: 84yqCErzTG27J4wv44dkPQ	PartitionCount: 4	ReplicationFactor: 3	Configs: cleanup.policy=delete
Topic: third_topic	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
Topic: third_topic	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
Topic: third_topic	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
Topic: third_topic	Partition: 3	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
  • Leader:2表示对分区0,Id为2的broker是Leader;

  • Replica:2,3,1表示对于分区0,Id为2、3和1的broker是副本;

  • Isr:2,3,1表示对于分区0,Id为2、3和1的broker是ISR中的副本;

  • 根据复制状态过滤分区,类似于List Topic:

Option Memo

--at-min-isr-partitions

若在描述主题时设置,则仅显示isr计数等于配置的最小值的分区

--unavailable-partitions

仅显示其leader不可用的分区

--under-min-isr-partitions

仅显示isr计数小于配置的最小值的分区

--under-replicated-partitions

仅在已复制分区下显示

6. Increase Topic Partition

  • Danger:Increasing Partition

  • 若应用依赖于基于key的排序,那增加主题中分区数量是危险操作

  • 可创建新主题,并将所有数据复制过来,以便正确的重新分发key;

  • 故当用户依赖于基于key的排序时,不建议运行此命令,
    因更改分区数会更改key-hashing技术;

  • 只能添加分区,不能删除分区;

# 此命令无任何输出

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--alter --topic first_topic --partitions 5

7. Delete Topic

  • 可使用逗号分隔列表来一次性删除多个主题,此操作无任何输出;

  • 若未启用主题删除,delete.topic.enable broker,
    则该主题被标记未删除,但不会物理删除;

  • 删除主题可能耗时,可能先返回空,但实际还未完全删除完;

  • Win用户(非WSL2运行),不要删除主题,KAFKA-1194会导致Kafka奔溃;

  • 要恢复该错误,需手动删除/data/kafka中的主题文件夹;

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--delete --topic first_topic