1. Introduction

  • ack:在消息被认为成功写入前需确认(acknowledge)接收消息的broker的数量;

  • 使用min.insync.replicas和此生产者配置(如acks=all);

  • 生产者只向分区的当前leader broker写入数据,生产者还必须确认acks的级别,
    以指定在将消息视为成功写入之前,是否必须将消息写入最小数量的副本;

2. Ack Setting

  • kafka ≥ 3.0,acks = all,否则acks = 1;

2.1. Ack0

Ack0
  • 当acks = 0,生产者在发送消息的那一刻就认为消息
    "written successfully",而根本无需等待broker接受它;

  • 若brokeroffline或发生异常,我们将不知道,并将丢失数据;

  • 这对可能potentially丢失消息的数据(如metric collection)很有用,
    且因网络开销overhead最小化,故可产生高吞吐produce highest throughput设置;

2.2. Ack1

Ack1
  • 当acks = 1时,当消息仅由leader确认时,生产者会将消息视为"written successfully";

  • 请求leader响应(request leader response),但不能保证guarantee复制replication;

  • 因它发生在后台,若没收到(receive)ack,生产者可重试请求;

  • 若leader broker offline,但副本replica尚未复制replicate数据,则数据丢失;

2.3. AckAll

AckAll
  • 当acks = all时,当消息被所有ISR接受时,生产者将消息视为"written successfully";

  • 分区的lead replica检查是否有足够的ISR用于安全的写入消息(由min.insync.replicas控制);

  • 请求被存储在缓冲区,直到leader观察到observe follower replica
    已复制replicate消息,此时成功的确认acknowledgement被发送会客户端;

  • min.insync.replicas可在主题和broker级别配置,
    当数据写入所有ISR,即视为已提交commit数据;

  • min.insync.replicas为2时,即至少有2个
    ISR broker(含leader)必须响应respond他们拥有数据;

  • 若要确保提交的数据写入多个replica,则需将ISR的最小数量设置为更高的值;

  • 若主题由3个replica,则且将min.insync.replicas设置为2,
    则只有在3个replica中至少由两个同步in-sync的情况下,才能写入该主题中的分区;

  • 当所有三个副本都同步时,一切正常进行,若其中某个replica不可用unavailable,也是如此;

  • 但三分之二的replica不可用,broker将不再接受produce request,
    相反尝试发送数据的生产者将收到NotEnoughReplicasException;

3. Topic Durability Availability

TopicReplicationIsrMsgSafety
  • 主题复制因子topic replication factor为3,主题数据的
    持久性durability可承受(withstand)2个broker的损失(loss);

  • 一般对与N个复制因子,可永久丢失(N - 1)个broker,且仍可恢复数据;

  • 可用性availability稍微复杂,此处先假定复制因子为3;

  • 读取Read:只要一个分区已启动(up)并被视为ISR,该主题就可读取;

  • 写Write:当acks=0且acks=1,只要一个分区已启动,并被视为ISR,
    该主题就可写入,当acks=all时,情况相对复杂:

Entry Memo

min.insync.replicas=1

默认,主题必须至少有一个分区作为ISR(含reader),
这样可容忍tolerate两个broker宕机down;

min.insync.replicas=2

主题必须至少有2个ISR启动,那最多可容忍一个broker down(在复制因子
为3的情况下),且我们保证每次写入数据时,数据至少会被写入2次;

min.insync.replicas=3

复制因子为3,没多大意义,不能容忍任何broker宕机;

Summary

总之,当acks=all带有replication.factor=N和min.insync.replicas=M时,
我们可容忍(N - M)个broker出于主题可用性的目的而宕机

  • acks = all和min.insync.replicas = 2时最流行的数据持久性durability
    和可用性availability选项,最多可承受withstand一个broker的损失loss;

4. Overriding Topic Config

  • 可使用kafka-configs.sh更改主题配置min.insync.replicas,

  • 此配置在broker级别默认为1,此配置的典型值(replication factor - 1);
    即当复制因子为3时,min.insync.replicas应为2;

  • 创建3个分区,复制因子为1,名为topic-config的主题:

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-config --create \
	--partitions 3 --replication-factor 1
  • 描述(describe)主题以检查是否为此主题已设置任何配置覆盖(override);

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --topic topic-config
  • 将主题topic-config配置min.insync.replicas为2:

./kafka-configs.sh --bootstrap-server 192.168.0.123:9092 \
	--alter --entity-type topics --entity-name topic-config \
	--add-config min.insync.replicas=2
  • 重新描述(describe)主题以检查是否为此主题已设置任何配置覆盖(override);

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --topic topic-config
  • 现在可看到主题配置有属性:Configs: min.insync.replicas=2;

  • 可通过—​delete-config替换—​add-config来删除属性;

./kafka-configs.sh --bootstrap-server 192.168.0.123:9092 \
	--alter --entity-type topics --entity-name topic-config \
	--delete-config min.insync.replicas
  • 再次描述(describe)主题以检查是否为此主题已设置任何配置覆盖(override);

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --topic topic-config

5. min.insync.replicas

5.1. Via Config File

  • 此配置的默认值为1,但这可在broker级别进行更改;

  • 打开broker配置,并在文件末尾追加如下内容:

# config/server.properties

min.insync.replicas = 2
  • 与kafka-configs.sh不同,kafka-configs.sh可在运行时修改,
    而通过配置文件方式修改则需重新启动broker;

5.2. Dynamic Config

  • 通过kafka-configs.sh动态更改broker配置,而无需重启broker;

./kafka-configs.sh --bootstrap-server 192.168.0.123:9092 \
	--alter --entity-type brokers --entity-default \
	--add-config min.insync.replicas=2
  • 描述broker动态更新配置:

./kafka-configs.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --entity-type brokers --entity-default

Default configs for brokers in the cluster are:
  min.insync.replicas=2 sensitive=false
  	synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:min.insync.replicas=2}
  • 删除动态配置:

./kafka-configs.sh --bootstrap-server 192.168.0.123:9092 \
	--alter --entity-type brokers --entity-default \
	--delete-config min.insync.replicas
  • 描述broker动态更新配置:

./kafka-configs.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --entity-type brokers --entity-default