1. Introduction

  • Kafka Connect Source Connector (acts as a Kafka producer);

  • Sources: databases (Debezium connector), JDBC, Couchbase,
    Blockchain, Cassandra, DynamoDB, FTP, IoT, MongoDB, MQTT, etc.;

  • Kafka Connect Sink Connector (acts as a Kafka consumer);

  • Sinks: Elasticsearch, HDFS, JDBC, DocumentDB,
    Cassandra, HBase, MongoDB, Redis, etc.;
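
  • A running Connect worker's REST API (default port 8083; the
    standalone worker started in section 3 serves it too) can list
    every source and sink connector plugin found on the worker's
    plugin.path, a minimal sketch:

curl http://localhost:8083/connector-plugins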

2. Connect Standalone Mode

3. Connect To Wikipedia

  • Create a topic with 3 partitions: wikipedia.recentchange;

./kafka-topics.sh \
	--bootstrap-server 192.168.0.123:9092  \
	--topic wikipedia.recentchange --create \
	--partitions 3 --replication-factor 1
  • And a dead letter queue topic: wikipedia.dlq, used to catch any
    records that error out in the connector;

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--topic wikipedia.dlq --create --partitions 3 --replication-factor 1
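
  • (Optional) Verify that the topics exist; --describe is a standard
    kafka-topics.sh flag, and this check is an addition to the original
    steps (run it again with --topic wikipedia.dlq for the second topic);

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --topic wikipedia.recentchange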
/kafka370/connectors$ ls -R ./kafka-connect-sse

./kafka-connect-sse:
connector.properties
kafka-connect-sse-1.0-jar-with-dependencies.jar
  • Edit connectors/kafka-connect-sse/connector.properties;

name=sse-source-connector
tasks.max=1
connector.class=com.github.cjmatta.kafka.connect.sse.ServerSentEventsSourceConnector
topic=wikipedia.recentchange
sse.uri=https://stream.wikimedia.org/v2/stream/recentchange
errors.tolerance=all
errors.deadletterqueue.topic.name=wikipedia.dlq
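
  • While the connector runs, records that fail land in the dead letter
    queue; they can be watched with a plain console consumer (a
    monitoring step added here, using only tooling shown elsewhere in
    these notes);

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic wikipedia.dlq --from-beginning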
  • Edit config/connect-standalone.properties;

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.flush.interval.ms=10000

# EDIT BELOW IF NEEDED
bootstrap.servers=192.168.0.123:9092
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/elf/kafka370/connectors
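
  • Since both converters are JsonConverter with schemas.enable=true,
    every key and value written to the topic is wrapped in Connect's
    schema/payload envelope, roughly as in the abridged illustration
    below (the actual schema depends on what the SSE connector emits);

{
  "schema": { "type": "string", "optional": false },
  "payload": "..."
}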
  • The plugin.path setting holds the absolute path where downloaded
    Kafka connectors are stored;

  • Start the Kafka Connect standalone connector; data begins flowing
    into the wikipedia.recentchange topic;

./connect-standalone.sh /kafka370/config/connect-standalone.properties \
	~/kafka370/connectors/kafka-connect-sse/connector.properties

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic wikipedia.recentchange
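
  • The standalone worker also exposes the Connect REST API (default
    port 8083), which can confirm that the connector and its task are
    in the RUNNING state; the connector name below comes from
    connector.properties;

curl http://localhost:8083/connectors/sse-source-connector/status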