1. Dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
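
With the Spring Boot parent POM, both artifact versions are managed by the dependency BOM, so no <version> tags are needed. Note that spring-boot-starter-log4j2 only takes effect once the default Logback starter is excluded; a sketch of that exclusion, assuming the project also depends on spring-boot-starter:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter</artifactId>
	<exclusions>
		<exclusion>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</exclusion>
	</exclusions>
</dependency>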

2. KafkaConst

public interface KafkaConst {

	String TX_ID = "kafka-tx-id";
	String TOPIC_NAME = "product-price";
	String BOOTSTRAP_SERVER = "192.168.0.123:9092";

}

3. KafkaUtil

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaUtil {

	private KafkaUtil() {}

	private static final String SERIALIZER_CLASS = StringSerializer.class.getName();
	private static final String DESERIALIZER_CLASS = StringDeserializer.class.getName();

	public static Map<String,Object> producerConfigMap(){
		var producerConfigMap = new HashMap<String,Object>();
		producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);
		producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,SERIALIZER_CLASS);
		producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,SERIALIZER_CLASS);

		//acks=-1 is equivalent to acks=all: wait for all in-sync replicas
		producerConfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");
		producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
		producerConfigMap.put(ProducerConfig.RETRIES_CONFIG,5);
		//batch.size is in bytes; 5 bytes effectively disables batching (demo value)
		producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG,5);
		producerConfigMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);

		//transactions are built on top of idempotence
		producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,KafkaConst.TX_ID);

		return producerConfigMap;
	}

	public static Map<String,Object> consumerConfigMap(){
		var consumerConfigMap = new HashMap<String,Object>();
		consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);
		consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,DESERIALIZER_CLASS);
		consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,DESERIALIZER_CLASS);

		//where to start when no committed offset exists: earliest or latest
		consumerConfigMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
		//consumer group id
		consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG,"elf");
		//auto-commit offsets on a timer
		consumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
		return consumerConfigMap;
	}

}
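
Since producerConfigMap() sets a transactional.id, a downstream consumer that must not see records from aborted transactions needs isolation.level=read_committed (the Kafka default is read_uncommitted). A minimal variant reusing the map above; the class name here is illustrative:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class KafkaTxUtil {

	private KafkaTxUtil() {}

	//same settings as consumerConfigMap(), but skip records from aborted transactions
	public static Map<String,Object> readCommittedConsumerConfigMap(){
		var configMap = KafkaUtil.consumerConfigMap();
		configMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
		return configMap;
	}

}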

4. ElfKafkaAdmin

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class ElfKafkaAdmin {

	public static void createTopic(String topicName,Integer partitionCount,
		Short replicationFactor)throws InterruptedException,ExecutionException{

		Map<String,Object> configMap = new HashMap<>();
		configMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);

		//try-with-resources ensures the Admin client is closed even on failure
		try (Admin admin = Admin.create(configMap)) {
			var topic = new NewTopic(topicName,partitionCount,replicationFactor);
			final CreateTopicsResult result = admin.createTopics(Arrays.asList(topic));
			System.out.printf("topicId: %s%n",result.topicId(topicName).get());
		}
	}

}
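
createTopics fails with a TopicExistsException (wrapped in an ExecutionException) when the topic is already there. A minimal sketch of an existence check using the same Admin API; the class and method names here are illustrative:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ElfKafkaTopicGuard {

	//returns true if the topic is already visible on the broker
	public static boolean topicExists(String topicName)
		throws InterruptedException,ExecutionException{

		Map<String,Object> configMap = new HashMap<>();
		configMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConst.BOOTSTRAP_SERVER);

		try (Admin admin = Admin.create(configMap)) {
			//listTopics().names() resolves to the set of all topic names
			return admin.listTopics().names().get().contains(topicName);
		}
	}

}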

5. ElfKafkaProducer

import java.lang.invoke.MethodHandles;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElfKafkaProducer {

	private static final Logger logger = LoggerFactory
		.getLogger(MethodHandles.lookup().lookupClass());

	public static void produce(String topic,String key,String value)
		throws InterruptedException, ExecutionException {
		logger.error("produce msg start...");
		var producer = new KafkaProducer<String,String>(KafkaUtil.producerConfigMap());

		for (int i = 0; i < 10; i++) {
			var pr = new ProducerRecord<String,String>(topic,key + i,value + i);
			final Future<RecordMetadata> frm = producer.send(pr);
			var rm = frm.get();
			System.out.printf("partition: %d,offset: %d,timestamp: %d \n",
				rm.partition(),rm.offset(),rm.timestamp());
		}
		producer.flush();
		producer.close();
		logger.error("produce msg end...");
	}

	public static void produceTransactional(String topic,String key,String value)
		throws InterruptedException, ExecutionException {

		logger.error("produce msg start...");
		var producer = new KafkaProducer<String,String>(KafkaUtil.producerConfigMap());
		//初始化事务
		producer.initTransactions();

		try {
			//开启事务
			producer.beginTransaction();

			for (int i = 0; i < 10; i++) {
				var pr = new ProducerRecord<String,String>(topic,key + i,value + i);
				final Future<RecordMetadata> frm = producer.send(pr);
				var rm = frm.get();
				System.out.printf("partition: %d,offset: %d,timestamp: %d \n",
					rm.partition(),rm.offset(),rm.timestamp());
			}

			//commit the transaction
			producer.commitTransaction();
		} catch (Exception e) {
			logger.error("transaction failed, aborting",e);
			//abort the transaction so read_committed consumers never see these records
			producer.abortTransaction();
		} finally {
			producer.flush();
			producer.close();
			logger.info("produce msg end...");
		}
	}

}
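
produce() blocks on frm.get() after every send, which serializes the producer and defeats batching. For throughput, the callback overload of send() reports each result asynchronously; a sketch under the same config, with an illustrative class name:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ElfKafkaAsyncProducer {

	public static void produceAsync(String topic,String key,String value) {
		var configMap = KafkaUtil.producerConfigMap();
		//plain (non-transactional) sends: drop the transactional.id
		configMap.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

		try (var producer = new KafkaProducer<String,String>(configMap)) {
			for (int i = 0; i < 10; i++) {
				var pr = new ProducerRecord<>(topic,key + i,value + i);
				//the callback runs on the producer I/O thread once the broker acks the batch
				producer.send(pr,(metadata,exception) -> {
					if (exception != null) {
						exception.printStackTrace();
					} else {
						System.out.printf("partition: %d,offset: %d%n",
							metadata.partition(),metadata.offset());
					}
				});
			}
			//close() flushes any buffered records before shutting down
		}
	}

}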

6. ElfKafkaConsumer

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ElfKafkaConsumer {

	public static void consume() {
		//the poll loop below never exits, so this consumer is intentionally never closed
		@SuppressWarnings("resource")
		var consumer = new KafkaConsumer<String,String>(KafkaUtil.consumerConfigMap());
		//no seekToBeginning here: assignment() is empty before the first poll,
		//and auto.offset.reset=earliest already starts a new group from the beginning
		//subscribe the consumer to the topic
		consumer.subscribe(Collections.singleton(KafkaConst.TOPIC_NAME));
		while (true) {
			//wait up to 100ms for new records on each poll
			var recordList = consumer.poll(Duration.ofMillis(100));
			recordList.forEach(r -> {
				String pattern = "topic:%s,partition:%d,offset:%d,key:%s,value:%s%n";
				System.out.printf(pattern,r.topic(),r.partition(),r.offset(),r.key(),r.value());
			});
		}
	}

	public static void consumeShutdownGraceful() {
		var consumer = new KafkaConsumer<String,String>(KafkaUtil.consumerConfigMap());

		final Thread mainThread = Thread.currentThread();
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				//on shutdown, wakeup() makes the blocked poll() throw WakeupException
				consumer.wakeup();

				try {
					//wait for the main thread to finish closing the consumer before the JVM exits
					mainThread.join();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});

		//subscribe the consumer to the topic; auto.offset.reset=earliest covers the start position
		consumer.subscribe(Collections.singleton(KafkaConst.TOPIC_NAME));
		try {
			while (true) {
				//wait up to 100ms for new records on each poll
				var recordList = consumer.poll(Duration.ofMillis(100));
				recordList.forEach(r -> {
					String pattern = "topic:%s,partition:%d,offset:%d,key:%s,value:%s%n";
					System.out.printf(pattern,r.topic(),r.partition(),r.offset(),r.key(),r.value());
				});
			}
		} catch (WakeupException e) {
			//expected on shutdown; fall through to close()
		} finally {
			consumer.close();
		}
	}

}
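
With enable.auto.commit=true, offsets advance on a timer whether or not processing succeeded. For at-least-once delivery, commit manually after each processed batch; a minimal sketch that flips the flag off and uses commitSync(), with an illustrative class name:

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ElfKafkaManualCommitConsumer {

	public static void consumeWithManualCommit() {
		var configMap = KafkaUtil.consumerConfigMap();
		//disable the timer-based commit so offsets only advance after processing
		configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

		try (var consumer = new KafkaConsumer<String,String>(configMap)) {
			consumer.subscribe(Collections.singleton(KafkaConst.TOPIC_NAME));
			//bounded loop so the consumer is actually closed at the end
			for (int i = 0; i < 100; i++) {
				var recordList = consumer.poll(Duration.ofMillis(100));
				recordList.forEach(r ->
					System.out.printf("offset:%d,value:%s%n",r.offset(),r.value()));
				//commit synchronously only after the whole batch has been processed
				consumer.commitSync();
			}
		}
	}

}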

7. KafkaDemonstrator

import java.util.concurrent.ExecutionException;

public class KafkaDemonstrator {

	public static void main(String[] sa) throws InterruptedException, ExecutionException {
		//run once: createTopic fails if the topic already exists
		ElfKafkaAdmin.createTopic(KafkaConst.TOPIC_NAME,2,(short)1);

		ElfKafkaProducer.produce(KafkaConst.TOPIC_NAME,"pal-","role-");

		ElfKafkaConsumer.consume();
	}

}