1. Dependency
<!-- NOTE(review): no <version> elements are declared here; presumably they are
     managed by a parent POM / BOM (e.g. spring-boot-starter-parent) - confirm. -->
<!-- Kafka Java client; supplies the org.apache.kafka.clients.producer classes used by the producer example. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- Spring for Apache Kafka integration. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Boot web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Log4j2 logging starter.
     NOTE(review): Spring Boot normally requires excluding spring-boot-starter-logging
     for this starter to take effect, and a YAML Log4j2 config needs
     jackson-dataformat-yaml on the classpath - verify against the full POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
2. log4j2.yaml
# Log4j2 configuration: console output for everything, plus a file appender
# attached to the io.os logger.
# The root key must be spelled "Configuration"; nesting is significant in YAML.
Configuration:
  name: Default
  Properties:
    Property:
      # Directory that receives the log file (relative to the working directory).
      name: log-path
      value: "log"
  Appenders:
    Console:
      name: Console_Appender
      target: SYSTEM_OUT
      PatternLayout:
        pattern: "[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n"
    File:
      name: File_Appender
      fileName: ${log-path}/logfile.log
      PatternLayout:
        pattern: "[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n"
  Loggers:
    Root:
      level: debug
      AppenderRef:
        - ref: Console_Appender
    Logger:
      - name: io.os
        level: info
        AppenderRef:
          # Only events at error level or above reach the file appender.
          - ref: File_Appender
            level: error
3. Producer
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Minimal Kafka producer example: sends a single string record to the
 * {@code topic-elf} topic and then shuts the producer down.
 */
public class ElfProducer {
    private static final Logger logger = LoggerFactory
            .getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Command-line entry point; delegates to {@link #produce()}.
     *
     * @param sa command-line arguments (unused)
     */
    public static void main(String[] sa) {
        produce();
    }

    /**
     * Builds a {@link KafkaProducer} with string key/value serializers, sends
     * one record with value {@code "pal4"} to {@code topic-elf}, and flushes.
     *
     * <p>The producer is managed by try-with-resources so it is closed even if
     * {@code send} or {@code flush} throws; previously a failure in either
     * call skipped {@code close()} and leaked the producer.
     */
    public static void produce() {
        logger.info("kafka producer...");
        var bootstrapServer = "192.168.0.123:9092";

        // create producer properties
        var prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer; closed automatically when the try block exits
        try (var producer = new KafkaProducer<String, String>(prop)) {
            // create producer record (no key, value only)
            var producerRecord = new ProducerRecord<String, String>("topic-elf", "pal4");
            // send data: asynchronous
            producer.send(producerRecord);
            // flush data: blocks until the buffered record has been sent
            producer.flush();
        }
    }
}
4. Topic Operation
# List the existing topics
./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 --list
# Create the topic topic-elf
./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
--topic topic-elf --create --partitions 3 --replication-factor 1
# Use kafka-console-consumer to observe the output of the producer application
./kafka-console-consumer.sh \
--bootstrap-server 192.168.0.123:9092 --topic topic-elf
# NOTE(review): the docker variant below is cut off after the line continuation;
# its options are missing from this document and must be restored before use.
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
-
运行生产者程序 ElfProducer；如果主题不存在，Kafka 会自动创建该主题（前提是 broker 的 auto.create.topics.enable 为 true，这是默认值）。
5. Producer Callback
// Send asynchronously; the callback is invoked once per record, either with
// the record's metadata on success or with the exception on failure.
producer.send(producerRecord, (metadata, failure) -> {
    if (failure != null) {
        logger.error("error while producing", failure);
        return;
    }
    // record sent successfully: log where it landed
    var template = """
        receive new metadata:
        Topic:{};
        Partition:{};
        Offset:{};
        Timestamp:{};
        """;
    logger.info(template,
            metadata.topic(),
            metadata.partition(),
            metadata.offset(),
            metadata.timestamp());
});