1. Introduction

  • 默认producer会尽快发送记录(try to send record),
    一个生产者最多(have up to)有5个请求in flight;

  • in flight由max.in.flight.requests.per-connection控制;

  • 即将同时发送多达5个消息批次,在此之后,若在其它消息in flight必须发送更多消息,
    生产者将在之前的消息请求完成时开始处理批次消息;

  • 此智能批次允许Kafka在保持极低延迟latency的同时提高吞吐量throughput;

  • 批次具有更高的压缩比higher compression ratio,故具有更好的
    磁盘和网络效率,批次主要由linger.ms和batch.size控制;

ProducerBatching

2. linger.ms

  • linger.ms是生产者在发送批次前愿意等待的毫秒数,

  • 默认0,即立即发送消息,send msg right away;

  • 引入lag后(如linger.ms=20),我们增加消息在一批次中一起发送的机会;

  • 故引入小延迟delay为代价,可提高生产者的吞吐量throughput,
    压缩compression和效率efficiency;

  • 若一个批次在linger.ms周期period结束前达到
    其最大大小(由batch.size控制),它将立即发送给Kafka;

3. batch.size

  • batch.size:一个批次中包含的最大字节数,
    默认16KB,增加到32KB或64KB可提高请求的压缩,吞吐量和效率;

  • 任何大于批次大小的消息都不会被batch,批次按分区分配allocate,
    故请不要将其设置得太高,否则可能占用大量内存;

4. Key Takeaway

  • 增加linger.ms,生产者将等待几毫秒,等待批次填满(fill up)后再发送;

  • 若发送的是完整的批次,且有空闲内存,则可增加batch.size,并发送更大的批次;

  • 高吞吐(throughput)生产者配置:

// 高吞吐生产者以延迟和cpu利用率为代价
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

properties.setProperty(ProducerConfig.LINGER_MS_CONFIG,"20");

// 32KB
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,Integer.toString(32*1024));