1. Create Consumer In Group

  • 要启动消费者组中的消费者:

    • 创建一个至少有两个分区的主题,并向其发送数据;

    • 创建kafka-console-consumer并使用 --group指定组名称;

    • 打开新的terminal或shell window;

    • 创建第二个kafka-console-consumer,同样也使用—​group参数指定组名称;

    • 发送数据到主题,可看到消费者共享读取(consumer sharing read);

2. Create Consumer Group

  • 一个组中的消费者不能多于Kafka 主题中的分区,故需先创建具有几个分区的主题;

./kafka-topics.sh \
	--bootstrap-server 192.168.0.123:9092  \
	--topic topic-elf --create \
	--partitions 3 --replication-factor 1
  • 在名为(elf-app)的消费者组中启动消费者:

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --group elf-app
  • 打开新的的终端terminal,并在同一消费者组(elf-app)
    中启动第二个消费者,即使用完全相同的命令;

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --group elf-app
  • 打开新的终端terminal,并在同一消费者组(elf-app)中启动第三个消费者:

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --group elf-app
  • 消费者组elf-app中每个消费者都将被分配一个分区;在该主题中生成一些字符串消息;

./kafka-console-producer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf

> first message
> second message
> third message
> fourth message
  • 每个消费者将仅显示分配给它们分区上生成的消息:

ConsumerGroupA
  • 若停止某个消费者,则消息自动发送给剩余消费者,
    因消费者组中的消费者会自动执行消费者重新平衡(consumer rebalance);

ConsumerGroupB
  • 停止所有消费者:

ConsumerGroupC
  • 继续对主题进行生产(producing):

> eigth message
> ninth message
> tenth message
  • 组中的消费者重新启动后,消费者将从最新提交的偏移量
    (latest committed offset)中读取并仅读取刚刚生成的消息;

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --group elf-app

eigth message
ninth message
tenth message

3. Gotcha

  • 若使用 --group命令在消费者组中消费,那若随后尝试对同一组
    使用—​from-beginning选项,它将被忽略,相反需重置费者组;

  • 若不指定—​group选项,消费者的消费组将是随机的消费组,
    如console-consumer-11984;

  • 若看到一个消费者已获取所有消息,这可能意味主题仅使用1个分区创建,
    可使用kafka-topics --describe命令验证;