1. Introduction
-
默认producer会尽快发送记录(try to send record),
一个生产者最多(have up to)有5个请求in flight; -
in flight由max.in.flight.requests.per-connection控制;
-
即将同时发送多达5个消息批次,在此之后,若在其它消息in flight必须发送更多消息,
生产者将在之前的消息请求完成时开始处理批次消息; -
此智能批次允许Kafka在保持极低延迟latency的同时提高吞吐量throughput;
-
批次具有更高的压缩比higher compression ratio,故具有更好的
磁盘和网络效率,批次主要由linger.ms和batch.size控制;
2. linger.ms
-
linger.ms是生产者在发送批次前愿意等待的毫秒数,
-
默认0,即立即发送消息,send msg right away;
-
引入lag后(如linger.ms=20),我们增加消息在一批次中一起发送的机会;
-
故引入小延迟delay为代价,可提高生产者的吞吐量throughput,
压缩compression和效率efficiency; -
若一个批次在linger.ms周期period结束前达到
其最大大小(由batch.size控制),它将立即发送给Kafka;
3. batch.size
-
batch.size:一个批次中包含的最大字节数,
默认16KB,增加到32KB或64KB可提高请求的压缩,吞吐量和效率; -
任何大于批次大小的消息都不会被batch,批次按分区分配allocate,
故请不要将其设置得太高,否则可能占用大量内存;
4. Key Takeaway
-
增加linger.ms,生产者将等待几毫秒,等待批次填满(fill up)后再发送;
-
若发送的是完整的批次,且有空闲内存,则可增加batch.size,并发送更大的批次;
-
高吞吐(throughput)生产者配置:
// 高吞吐生产者以延迟和cpu利用率为代价
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG,"20");
// 32KB
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,Integer.toString(32*1024));