1. Reset Consumer Group
-
查找broker的主机和端口;
-
了解offset reset strategy:to earliest最早,
to latest最晚,to specific offset特定偏移,shift by(偏移)…… -
停止正在运行的消费者组(否则命令将失败);
-
将kafka-consumer-groups.sh与—reset-offsets合用;
-
示例:reset offsets to earliest;
-
确保消费者被停止,即无活动成员(no active member);
# 此命令可观察消费者组的当前偏移量,也可查看是否有活动成员
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
--describe --group elf-app
2. to-earliest
-
把偏移量重置到最早位置,以便重新读取整个主题;reset offset to earliest
position in order to read topic entirely again;
# 可看到,新的偏移量均为0
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
--group elf-app --reset-offsets --to-earliest --execute --topic topic-elf
-
重新消费:注:消息按每个分区的顺序读取,而非跨分区读取,此处有三个分区;
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
--topic topic-elf --group elf-app
3. shift by
* 停止正在运行的所有消费者,以便能重置偏移量;
-
shift by允许将偏移量rewind到特定值,负值negative表示
在消息中倒退(go back),正值positive表示在消息中前进;
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
--describe --group elf-app
-
此处在主题topic-elf上订阅的消费者组的偏移量移动-2来重置偏移量:
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
--group elf-app --reset-offsets --shift-by -2 --execute --topic topic-elf
-
每个分区的偏移量减少2;再次读取消息,只会从主题的每个分区返回最后两条消息;
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
--topic topic-elf --group elf-app
4. Option
-
若消费者组中的消费者处于活动状态,则无法重置该消费者组;
-
此命令可用于重新处理(reprocess data)
Option | Memo |
---|---|
--all-groups |
所有组,谨慎使用 |
--all-topics |
组中所有主题,谨慎使用 |
--by-duration |
按持续时间重置为偏移 |
--dry-run |
试运行:仅显示预期结果,但不实际运行命令 |
其它选项 |
--to-datetime,--by-period,--to-earliest, |
5. List Consumer Group
-
列出消费者组状态,以确定那个组down,not stable等;
-
描述所有消费者组和状态,有助于分配策略
assignment strategy和协调员Id(coordinatorId); -
可使用具体的消费者组名称,替代—all-groups;
# 列出消费者组状态
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
--list --state
# 描述所有消费者组和状态
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.123:9092 \
--describe --all-groups --state