1. Introduction

  • 当生产者向broker发送消息时,broker可返回成功代码或错误代码;

  • 错误代码有两类:Retriable Error和Nonretriable Error;

  • 最好启用重试,以确保发送到Kafka时不会丢失消息;

1.1. Retriable Error

  • 可重试错误:重试后可解决的错误,如broker返回异常NotEnoughReplicasException;
    生产者可尝试再次发送消息,也许replica broker将come back online,第二次重试将成功;

1.2. Nonretriable Error

  • 不可修复的错误:无法解决的错误,如broker返回InvalidConfigException,
    则再次尝试相同的生产者请求将不会更改请求的结果;

2. In Short

//create safe producer
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
properties.setProperty(ProducerConfig.ACKS_CONFIG,"all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG,Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

3. retries

  • 重试设置确定生产者在将消息标记为失败前尝试发送消息的次数;

  • Kafka >= 2.1,默认MAX_INT,否则为0;

  • 用户通常更应该使用delivery.timeout.ms来控制重试行为;

4. delivery.timeout.ms

  • 若重试次数大于0,如retries = 2147483647,生产者不会永远尝试该请求,
    它会受到超时限制,如设置超时2分钟:delivery.timeout.ms=120000;

  • 若在delivery.timeout.ms内无法deliver record,则记录失败;

ProducerRetry

5. retry.backoff.ms

  • 默认:生产者将在重试之间等待100毫秒,但可使用retry.backoff.ms来控制;

6. In Flight

  • max.in.flight.requests.per.connection

  • 允许重试而不将max.in.flight.requests.per.connection设置为1;

  • 设置为1可能会更改记录的顺序,因若将两个批次发送到单个分区,且第一个批次
    失败并重试,但第二个批次成功,则记录在第二批次中可能会首先出现;

  • 若rely on key-based ordering,可能是个问题,
    通过将正在进行的请求数量限制为1(默认5),即:

  • max.in.flight.requests.per.connection = 1,
    我们可保证当某些消息需多次重试才能成功确认acknowledge时,Kafka将保留消息顺序;

  • 设置max.in.flight.requests.per.connection = 1

  • 会显著降低吞吐量significantly decrease throughput;

  • 注:当启用幂等性(idempotence):enable.idempotence=true,
    则需max.in.flight.requests.per.connection ≤ 5;

  • 并为任何允许allowable的值保留preserve消息排序ordering;