1. Dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
	<!-- exclude the default Logback starter so the log4j2 starter below takes effect -->
	<exclusions>
		<exclusion>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</exclusion>
	</exclusions>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-log4j2</artifactId>
	<!-- YAML configuration also needs com.fasterxml.jackson.dataformat:jackson-dataformat-yaml
	     on the classpath if it is not already present transitively -->
</dependency>

2. log4j2.yaml

Configuration:
  name: Default
  Properties:
    Property:
      name: log-path
      value: "log"
  Appenders:
    Console:
      name: Console_Appender
      target: SYSTEM_OUT
      PatternLayout:
        pattern: "[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n"

    File:
      name: File_Appender
      fileName: ${log-path}/logfile.log
      PatternLayout:
        pattern: "[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n"

  Loggers:
    Root:
      level: debug
      AppenderRef:
        - ref: Console_Appender
    Logger:
      - name: io.os
        level: info
        AppenderRef:
          - ref: File_Appender
            level: error
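
With this configuration, the Root logger sends everything at debug and above to the console, while classes under io.os also route to logfile.log, but only at error and above (the AppenderRef level restricts what reaches File_Appender; other events still propagate to Root via additivity). A minimal sketch to verify the routing; the class name LogDemo is illustrative:

package io.os;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogDemo {

	private static final Logger logger = LoggerFactory.getLogger(LogDemo.class);

	public static void main(String[] args) {
		logger.debug("dropped entirely: the io.os logger level is info");
		logger.info("console only: the File_Appender ref is limited to error");
		logger.error("goes to both the console and logfile.log");
	}
}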

3. Producer

import java.lang.invoke.MethodHandles;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElfProducer {

	private static final Logger logger = LoggerFactory
		.getLogger(MethodHandles.lookup().lookupClass());

	public static void main(String[] sa) {
		produce();
	}

	public static void produce() {
		logger.info("kafka producer...");
		var bootstrapServer = "192.168.0.123:9092";

		// create producer properties
		var prop = new Properties();
		prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
		prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

		// create producer
		var producer = new KafkaProducer<String,String>(prop);

		// create producer record
		var producerRecord = new ProducerRecord<String,String>("topic-elf","pal4");

		// send data - asynchronous
		producer.send(producerRecord);

		// flush data - synchronous
		producer.flush();

		// close producer (close() also flushes any pending records)
		producer.close();

	}
}
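
Records can also carry a key: Kafka hashes the key to pick the partition, so records with the same key always land in the same partition of topic-elf. A minimal variation of the record above; the key "elf-key" is illustrative:

		// records with the same key always go to the same partition
		var keyedRecord = new ProducerRecord<String,String>("topic-elf", "elf-key", "pal4");
		producer.send(keyedRecord);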

4. Topic Operation

# List topics
./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 --list

# Create the topic topic-elf
./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--topic topic-elf --create --partitions 3 --replication-factor 1

# Use kafka-console-consumer to observe the producer application's output
./kafka-console-consumer.sh \
	--bootstrap-server 192.168.0.123:9092 --topic topic-elf

# Or, when Kafka runs in a Docker container named kafka, run the same consumer inside it
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
	--bootstrap-server 192.168.0.123:9092 --topic topic-elf

Run the producer program ElfProducer; if the topic does not exist, it is created automatically.
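
The same operations are available programmatically through Kafka's AdminClient. A minimal sketch against the broker address used above; the class name TopicAdmin is illustrative:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {

	public static void main(String[] args) throws Exception {
		var prop = new Properties();
		prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.123:9092");

		try (var admin = AdminClient.create(prop)) {
			// create topic-elf with 3 partitions, replication factor 1
			// (throws if the topic already exists)
			admin.createTopics(List.of(new NewTopic("topic-elf", 3, (short) 1))).all().get();

			// list all topic names
			admin.listTopics().names().get().forEach(System.out::println);
		}
	}
}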

5. Producer Callback

// requires imports for org.apache.kafka.clients.producer.Callback and RecordMetadata
producer.send(producerRecord, new Callback() {
	@Override
	public void onCompletion(RecordMetadata rm, Exception e) {
		// executes every time a record is sent successfully or an exception is thrown
		if (null == e) {
			// record sent successfully
			var pattern = """
				receive new metadata:
					Topic:{};
					Partition:{};
					Offset:{};
					Timestamp:{};
				""";
			logger.info(pattern, rm.topic(), rm.partition(), rm.offset(), rm.timestamp());
		} else {
			logger.error("error while producing", e);
		}
	}
});
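
For a blocking send, the Future returned by send() can be awaited instead: get() returns the RecordMetadata once the broker acknowledges the record, or throws if the send failed. A minimal sketch, assuming an import for java.util.concurrent.ExecutionException:

// synchronous send: block until the broker acknowledges the record
try {
	var rm = producer.send(producerRecord).get();
	logger.info("sent to partition {} at offset {}", rm.partition(), rm.offset());
} catch (InterruptedException | ExecutionException e) {
	logger.error("error while producing", e);
}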