1. Prerequisite

./kafka-console-consumer.sh \
	--bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --create \
	--partitions 3 --replication-factor 1

2. Consume Msg

# 消费未来消息
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf

# 消费所有历史消息及未来消息
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --from-beginning
  • 消费者将保持打开状态,直到退出,并将继续在屏幕上显示消息;

  • 它假设所有传入的消息都可反序列化为文本(字符串);

  • 默认:Kafka Console Consumer不显示key或任何分区信息;

  • 消息只在分区级别内排序,因一个主题可能由多个分区创建,
    若只有单分区,则看到的消息总是排序的;

  • 若没任何输出,但主题中有数据,请不要忘记使用—​from-beginning选项;

  • 若需读取历史数据,请使用—​from-beginning选项,
    否则只能显示和读取未来的数据,忌用—​zookeeper选项;

  • 默认,消息不会显示key或元数据信息,若主题不存在,则会默认自动创建;

  • 可使用逗号分隔的列表或模式,一次使用多个主题;

  • 若未指定消费者组Id,则会生产随机消费者组;

  • 若消息未按顺序显示,请记住消息的顺序是分区级别,而非主题级别;

3. Option

Entry Memo

--from-beginning

读取所有历史消息,及未来信息

--formatter

以特定格式显示消息

--consumer-property

传入任何消费者属性,如allow.auto.create.topics

--group

默认会选择随机消费者组Id,可覆盖(override)

--max-messages

退出前消耗的最大消息数

--partition

只想从特定分区消费

4. Show Key Value

  • 默认:仅显示kafka record value,使用此命令可显示key和value;

  • 使用格式化程序kafka.tools.DefaultMessageFormatter,
    并使用属性print.timestamp=true,print.key=true,print.value=true:

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --formatter kafka.tools.DefaultMessageFormatter \
	--property print.timestamp=true --property print.key=true \
	--property print.value=true --from-beginning
  • 其它属性:print.partition,print.offset,print.headers,
    key.separator,line.separator,headers.separator;