1. Introduction
- Kafka Connect Source Connector (Producer);
- Sources: Database (Debezium Connector), JDBC, Couchbase, Blockchain, Cassandra, DynamoDB, FTP, IoT, MongoDB, MQTT, etc.;
- Kafka Connect Sink Connector (Consumer);
- Sinks: Elasticsearch, HDFS, JDBC, DocumentDB, Cassandra, HBase, MongoDB, Redis, etc.;
2. Connect Standalone Mode
- Download a Kafka Connect connector, e.g. from https://www.confluent.io/hub;
- Create the connector configuration;
- Start it with connect-standalone.sh;
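Standalone mode is started with one worker configuration followed by one or more connector configurations. A minimal launcher sketch, assuming connect-standalone.sh is on PATH; start_standalone and CONNECT_STANDALONE are hypothetical names introduced here, not part of Kafka:

```shell
#!/bin/sh
# Minimal sketch: launch Kafka Connect in standalone mode.
# Assumptions: connect-standalone.sh is on PATH, and CONNECT_STANDALONE is a
# hypothetical override (e.g. a full path to the script).
# Usage: start_standalone WORKER_PROPS CONNECTOR_PROPS [CONNECTOR_PROPS...]
start_standalone() (
  # This sketch requires the worker file plus at least one connector file.
  if [ "$#" -lt 2 ]; then
    echo "usage: start_standalone worker.properties connector.properties..." >&2
    exit 2
  fi
  # Fail early if any configuration file is missing.
  for f in "$@"; do
    if [ ! -f "$f" ]; then
      echo "missing config file: $f" >&2
      exit 1
    fi
  done
  # First argument: worker config; the rest: one properties file per connector.
  "${CONNECT_STANDALONE:-connect-standalone.sh}" "$@"
)
```

The function body runs in a subshell, so its variables and exit calls do not leak into the calling shell.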
3. Connect To Wikipedia
- Create a topic with 3 partitions: wikipedia.recentchange;
./kafka-topics.sh \
--bootstrap-server 192.168.0.123:9092 \
--topic wikipedia.recentchange --create \
--partitions 3 --replication-factor 1
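- Also create a dead letter queue topic, wikipedia.dlq, intended to catch any errors (note: Apache Kafka documents the dead letter queue settings for sink connectors, so with this source connector the topic may stay empty);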
./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
--topic wikipedia.dlq --create --partitions 3 --replication-factor 1
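The two topic-creation commands can be combined into one re-runnable step. A minimal sketch, assuming the same broker address and partition settings as above; create_wikipedia_topics, KAFKA_TOPICS and BOOTSTRAP are hypothetical names, while --if-not-exists is a standard kafka-topics.sh option that skips topics that already exist:

```shell
#!/bin/sh
# Minimal sketch: create the data topic and the DLQ topic idempotently.
# KAFKA_TOPICS and BOOTSTRAP are hypothetical overrides; defaults mirror the
# commands above.
create_wikipedia_topics() (
  topics_bin=${KAFKA_TOPICS:-./kafka-topics.sh}
  bootstrap=${BOOTSTRAP:-192.168.0.123:9092}
  for topic in wikipedia.recentchange wikipedia.dlq; do
    # --if-not-exists turns a re-run into a no-op for topics that exist.
    "$topics_bin" --bootstrap-server "$bootstrap" \
      --topic "$topic" --create --if-not-exists \
      --partitions 3 --replication-factor 1 || exit 1
  done
)
```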
- Download and configure kafka-connect-sse under {KAFKA_HOME}/connectors/kafka-connect-sse;
/kafka370/connectors$ ls -R kafka-connect-sse
kafka-connect-sse:
connector.properties
kafka-connect-sse-1.0-jar-with-dependencies.jar
- Edit connectors/kafka-connect-sse/connector.properties;
name=sse-source-connector
tasks.max=1
connector.class=com.github.cjmatta.kafka.connect.sse.ServerSentEventsSourceConnector
topic=wikipedia.recentchange
sse.uri=https://stream.wikimedia.org/v2/stream/recentchange
errors.tolerance=all
errors.deadletterqueue.topic.name=wikipedia.dlq
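Kafka Connect generally does not reject unknown keys, so a typo in a key name (for example errors.tollerance instead of errors.tolerance) leaves the setting at its default (none) without any error. A minimal pre-flight check, assuming plain key=value lines as in the file above; check_connector_props is a hypothetical helper, and both the required-key list and the single typo check are illustrative, not a full validation:

```shell
#!/bin/sh
# Minimal sketch: sanity-check a connector .properties file before starting.
# Assumes key=value lines without spaces around "="; the key list below is
# an illustrative choice, not Kafka's complete set of requirements.
check_connector_props() (
  file=$1
  status=0
  for key in name connector.class tasks.max; do
    # A key counts as present if some line starts with "key=".
    if ! grep -q "^${key}=" "$file"; then
      echo "missing required key: $key"
      status=1
    fi
  done
  # Example typo check: this misspelling would be silently ignored.
  if grep -q '^errors\.tollerance=' "$file"; then
    echo "misspelled key: errors.tollerance (expected errors.tolerance)"
    status=1
  fi
  exit "$status"
)
```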
- Edit config/connect-standalone.properties;
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.flush.interval.ms=10000
# EDIT BELOW IF NEEDED
bootstrap.servers=192.168.0.123:9092
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/elf/kafka370/connectors
- plugin.path: the absolute path of the directory holding the downloaded Kafka connectors;
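plugin.path may also hold a comma-separated list of directories, and a wrong entry means the worker cannot find the connector class at startup. A minimal check, following the rule above that entries are absolute paths; check_plugin_path is a hypothetical helper, and entries are assumed to contain no spaces:

```shell
#!/bin/sh
# Minimal sketch: verify each plugin.path entry in a worker config is an
# absolute path to an existing directory. Assumes no spaces around commas.
check_plugin_path() (
  # Take the last plugin.path line, as later keys override earlier ones.
  line=$(grep '^plugin\.path=' "$1" | tail -n 1)
  value=${line#plugin.path=}
  if [ -z "$value" ]; then
    echo "plugin.path is not set"
    exit 1
  fi
  status=0
  set -f       # no globbing while splitting the list
  IFS=','      # split entries on commas (local to this subshell)
  for dir in $value; do
    case $dir in
      /*) [ -d "$dir" ] || { echo "not a directory: $dir"; status=1; } ;;
      *)  echo "not an absolute path: $dir"; status=1 ;;
    esac
  done
  exit "$status"
)
```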
- Start Kafka Connect in standalone mode with the SSE connector; data is then written to the wikipedia.recentchange topic, which a console consumer can read;
connect-standalone.sh /kafka370/config/connect-standalone.properties \
~/kafka370/connectors/kafka-connect-sse/connector.properties
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
--topic wikipedia.recentchange
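The consumer above starts at the latest offset and keeps running until it is stopped. To inspect a bounded sample instead, for example the first few events in wikipedia.recentchange or whatever landed in wikipedia.dlq, the standard --from-beginning and --max-messages options can be combined. A minimal sketch; peek_topic, CONSOLE_CONSUMER and BOOTSTRAP are hypothetical names:

```shell
#!/bin/sh
# Minimal sketch: read COUNT messages (default 5) from the start of a topic,
# then exit. CONSOLE_CONSUMER and BOOTSTRAP are hypothetical overrides.
peek_topic() (
  topic=${1:?usage: peek_topic TOPIC [COUNT]}
  count=${2:-5}
  "${CONSOLE_CONSUMER:-./kafka-console-consumer.sh}" \
    --bootstrap-server "${BOOTSTRAP:-192.168.0.123:9092}" \
    --topic "$topic" --from-beginning --max-messages "$count"
)
```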