1. Reset Consumer Group

  • 查找broker的主机和端口;

  • 了解offset reset strategy:to earliest最早,
    to latest最晚,to specific offset特定偏移,shift by(偏移)……

  • 停止正在运行的消费者组(否则命令将失败);

  • 将kafka-consumer-groups.sh与—​reset-offsets合用;

  • 示例:reset offsets to earliest;

  • 确保消费者被停止,即无活动成员(no active member);

# 此命令可观察消费者组的当前偏移量,也可查看是否有活动成员

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --group elf-app

2. to-earliest

  • 把偏移量重置到最早位置,以便重新读取整个主题;reset offset to earliest
    position in order to read topic entirely again;

# 可看到,新的偏移量均为0
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--group elf-app --reset-offsets --to-earliest --execute --topic topic-elf
  • 重新消费:注:消息按每个分区的顺序读取,而非跨分区读取,此处有三个分区;

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --group elf-app

3. shift by

* 停止正在运行的所有消费者,以便能重置偏移量;

  • shift by允许将偏移量rewind到特定值,负值negative表示
    在消息中倒退(go back),正值positive表示在消息中前进;

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --group elf-app
  • 此处在主题topic-elf上订阅的消费者组的偏移量移动-2来重置偏移量:

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--group elf-app --reset-offsets --shift-by -2 --execute --topic topic-elf
  • 每个分区的偏移量减少2;再次读取消息,只会从主题的每个分区返回最后两条消息;

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --group elf-app

4. Option

  • 若消费者组中的消费者处于活动状态,则无法重置该消费者组;

  • 此命令可用于重新处理(reprocess data)

Option Memo

--all-groups

所有组,谨慎使用

--all-topics

组中所有主题,谨慎使用

--by-duration

按持续时间重置为偏移

--dry-run

试运行:仅显示预期结果,但不实际运行命令

其它选项

--to-datetime,--by-period,--to-earliest,
--to-latest,--shift-by,--from-file,--to-current

5. List Consumer Group

  • 列出消费者组状态,以确定那个组down,not stable等;

  • 描述所有消费者组和状态,有助于分配策略
    assignment strategy和协调员Id(coordinatorId);

  • 可使用具体的消费者组名称,替代—​all-groups;

# 列出消费者组状态
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--list --state

# 描述所有消费者组和状态
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --all-groups --state

6. Delete Consumer Group

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--delete --group elf-app
  • 或指向删除特定主题的偏移量(当消费者组正在从多个主题读取时很有用),可使用命令:

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
	--delete-offsets --group elf-app --topic topic-elf