1. Dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
2. KafkaConst
public interface KafkaConst {
String TX_ID = "kafka-tx-id";
String TOPIC_NAME = "product-price";
String BOOTSTRAP_SERVER = "192.168.0.123:9092";
}
3. KafkaUtil
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
public final class KafkaUtil {
private KafkaUtil() {}
private static final String SERIALIZER_CLASS = StringSerializer.class.getName();
private static final String DESERIALIZER_CLASS = StringDeserializer.class.getName();
public static Map<String,Object> producerConfigMap(){
var producerConfigMap = new HashMap<String,Object>();
producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);
producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,SERIALIZER_CLASS);
producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,SERIALIZER_CLASS);
producerConfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");
producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
producerConfigMap.put(ProducerConfig.RETRIES_CONFIG,5);
producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG,5);
producerConfigMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
//事务基于幂等性操作
producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,KafkaConst.TX_ID);
return producerConfigMap;
}
public static Map<String,Object> consumerConfigMap(){
var consumerConfigMap = new HashMap<String,Object>();
consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);
consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,DESERIALIZER_CLASS);
consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,DESERIALIZER_CLASS);
//读取数据的位置:earliest:最早,latest:最晚
consumerConfigMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//消费者组
consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG,"elf");
//自动提交偏移量
consumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
return consumerConfigMap;
}
}
4. ElfKafkaAdmin
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
public class ElfKafkaAdmin {
public static void createTopic(String topicName,Integer partitionCount,
Short replicationFactor)throws InterruptedException,ExecutionException{
Map<String, Object> configMap = new HashMap<>();
configMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);
final Admin admin = Admin.create(configMap);
var topic = new NewTopic(topicName,partitionCount,replicationFactor);
final CreateTopicsResult result = admin.createTopics(Arrays.asList(topic));
System.out.printf("topicId: %s",result.topicId(topicName).get());
admin.close();
}
}
5. ElfKafkaProducer
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ElfKafkaProducer {
private static final Logger logger = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
public static void produce(String topic,String key,String value)
throws InterruptedException, ExecutionException {
logger.error("produce msg start...");
var producer = new KafkaProducer<String,String>(KafkaUtil.producerConfigMap());
for (int i = 0; i < 10; i++) {
var pr = new ProducerRecord<String,String>(topic,key + i,value + i);
final Future<RecordMetadata> frm = producer.send(pr);
var rm = frm.get();
System.out.printf("partition: %d,offset: %d,timestamp: %d \n",
rm.partition(),rm.offset(),rm.timestamp());
}
producer.flush();
producer.close();
logger.error("produce msg end...");
}
public static void produceTransactional(String topic,String key,String value)
throws InterruptedException, ExecutionException {
logger.error("produce msg start...");
var producer = new KafkaProducer<String,String>(KafkaUtil.producerConfigMap());
//初始化事务
producer.initTransactions();
try {
//开启事务
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
var pr = new ProducerRecord<String,String>(topic,key + i,value + i);
final Future<RecordMetadata> frm = producer.send(pr);
var rm = frm.get();
System.out.printf("partition: %d,offset: %d,timestamp: %d \n",
rm.partition(),rm.offset(),rm.timestamp());
}
//提交事务
producer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
//终止事务
producer.abortTransaction();
} finally {
producer.flush();
producer.close();
logger.error("produce msg end...");
}
}
}
6. ElfKafkaConsumer
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ElfKafkaConsumer {
public static void consume() {
@SuppressWarnings("resource")
var consumer = new KafkaConsumer<>(KafkaUtil.consumerConfigMap());
consumer.seekToBeginning(consumer.assignment());
//消费者订阅指定主题的数据
consumer.subscribe(Collections.singleton(KafkaConst.TOPIC_NAME));
while (true) {
//每100ms抓取一次数据
var recordList = consumer.poll(Duration.ofMillis(100));
recordList.forEach(r -> {
String pattern = "topic:%s,partition:%d,offset:%d,key:%s,value:%s!\n";
System.out.printf(pattern,r.topic(),r.partition(),r.offset(),r.key(),r.value());
});
}
}
public static void consumeShutdownGraceful() {
@SuppressWarnings("resource")
var consumer = new KafkaConsumer<String,String>(KafkaUtil.consumerConfigMap());
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
//detect shutdown,exit by calling consumer.wakeup()
consumer.wakeup();
try {
//join main thread to allow execution of code in main thread
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
consumer.seekToBeginning(consumer.assignment());
//消费者订阅指定主题的数据
consumer.subscribe(Collections.singleton(KafkaConst.TOPIC_NAME));
while (true) {
//每100ms抓取一次数据
var recordList = consumer.poll(Duration.ofMillis(100));
recordList.forEach(r -> {
String pattern = "topic:%s,partition:%d,offset:%d,key:%s,value:%s!\n";
System.out.printf(pattern,r.topic(),r.partition(),r.offset(),r.key(),r.value());
});
}
}
}