1. Project Structure

2. Kafka Compose

version: '3.8'
services:
  kafka:
    image: apache/kafka:3.8.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'

      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'

      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'

      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true

3. KRaft server.properties

  • vi /opt/kafka/config/kraft/server.properties

#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@192.168.0.123:9093

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092

4. Open Search Prerequisite

# disable memory paging and swapping performance
sudo swapoff -a

# edit sysctl config
sudo vi /etc/sysctl.conf

# add line to define desired value or change exist
vm.max_map_count=262144

# reload kernel parameter using sysctl
sudo sysctl -p

# verify change
cat /proc/sys/vm/max_map_count

5. Open Search Compose

docker pull opensearchproject/opensearch:1.3.16 && \
	docker pull opensearchproject/opensearch-dashboards:1.3.16
version: '3.8'
services:
  opensearch:
    image: opensearchproject/opensearch:1.3.16
    container_name: opensearch
    environment:
      discovery.type: single-node
      plugins.security.disabled: true
      compatibility.override_main_response_version: true
    ports:
      - "9200:9200"
      - "9600:9600"
  opensearch-dashboard:
    image: opensearchproject/opensearch-dashboards:1.3.16
    container_name: opensearch-dashboard
    ports:
      - "5601:5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"

6. Producer Dependency

<properties>
	<okhttp.eventsource>2.7.1</okhttp.eventsource>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>

<dependency>
    <groupId>com.launchdarkly</groupId>
    <artifactId>okhttp-eventsource</artifactId>
    <version>${okhttp.eventsource}</version>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

7. WikimediaChangeHandler

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import java.lang.invoke.MethodHandles;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WikimediaChangeHandler implements EventHandler {

	private static final Logger logger = LoggerFactory
		.getLogger(MethodHandles.lookup().lookupClass());

	String topic;
	KafkaProducer<String, String> kafkaProducer;

	public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer,String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onOpen() {}

    @Override
    public void onClosed() {
        kafkaProducer.close();
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
    	logger.error(messageEvent.getData());
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onComment(String comment) {}

    @Override
    public void onError(Throwable t) {
    	logger.error("Stream Reading Failure!", t);
    }

}

8. WikimediaChangeProducer

import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.EventHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangeProducer {

	public static void main(String[] args) throws InterruptedException {
		String bootstrapServers = "192.168.0.123:9092";

	    // create Producer properties
	    Properties properties = new Properties();
	    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
	    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

	    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

	    String topic = "wikimedia.recentchange";

	    EventHandler eventHandler = new WikimediaChangeHandler(kafkaProducer, topic);
	    String url = "https://stream.wikimedia.org/v2/stream/recentchange";
	    EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

	    builder.connectTimeout(Duration.ofMinutes(10));
	    //注:需科学上网
	    builder.proxy("127.0.0.1",1080);
	    EventSource eventSource = builder.build();

	    // start the producer in another thread
	    eventSource.start();

	    // we produce for 10 minutes and block the program until then
	    TimeUnit.MINUTES.sleep(10);
  	}

}

9. Consumer Dependency

<properties>
	<opensearch.java>2.10.1</opensearch.java>
</properties>

<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>${opensearch.java}</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

10. OpenSearchConsumer

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenSearchConsumer {

	private static final Logger logger = LoggerFactory
		.getLogger(MethodHandles.lookup().lookupClass());

	public static OpenSearchClient connect(String scheme,String hostName,int port) {
		final HttpHost host = new HttpHost(scheme,hostName,port);

	    final ApacheHttpClient5TransportBuilder builder =
	    	ApacheHttpClient5TransportBuilder.builder(host);

	    builder.setHttpClientConfigCallback(hcb -> {
	    	final PoolingAsyncClientConnectionManager manager =
	    		PoolingAsyncClientConnectionManagerBuilder.create().build();
	    	return hcb.setConnectionManager(manager);
	    });

	    final OpenSearchTransport transport = builder.build();
	    return new OpenSearchClient(transport);
	}

	public static OpenSearchClient connect() {
		return connect("http","192.168.0.123",9200);
	}

	public static boolean exist(OpenSearchClient client,String indexName)
		throws OpenSearchException, IOException {
		var existRequest = ExistsRequest.of(fn -> fn.index(indexName));
		BooleanResponse exist = client.indices().exists(existRequest);
		return exist.value();
	}

	public static void createIndex(OpenSearchClient client,
		String indexName) throws OpenSearchException, IOException {
		var exist = exist(client,indexName);
		if (exist) {
			System.out.printf("index %s already exist!\n",indexName);
		} else {
			var createRequest = new CreateIndexRequest.Builder().index(indexName).build();
			client.indices().create(createRequest);
		}
	}

	//GET /_cat/indices?v
	public static void deleteIndex(OpenSearchClient client,
		String indexName) throws OpenSearchException, IOException {
		var exist = exist(client,indexName);
		if (!exist) {
			System.out.printf("index %s not exist!\n",indexName);
		} else {
			var deleteRequest = new DeleteIndexRequest.Builder().index(indexName).build();
			client.indices().delete(deleteRequest);
		}
	}

	public static KafkaConsumer<String,String> createKafkaConsumer(){
		var boostrapServer = "192.168.0.123:9092";
		var groupId = "group-wikimedia-opensearch";
		Properties prop = new Properties();
		prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,boostrapServer);
		prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
		prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
		prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
		//earliest,latest etc.
		prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
		return new KafkaConsumer<>(prop);
	}

	public static void consume(OpenSearchClient client,String indexName)
		throws OpenSearchException, IOException {
		var consumer = createKafkaConsumer();
		var topic = "wikimedia.recentchange";
		consumer.subscribe(Arrays.asList(topic));

		while (true) {
			var consumerRecord = consumer.poll(Duration.ofMillis(3000));
			int recordCount = consumerRecord.count();
			logger.info("receive %d record!",recordCount);

			for (ConsumerRecord<String,String> cr : consumerRecord) {
				//send record into OpenSearch
				IndexData indexData = new IndexData(cr.value());
				var indexRequest = new IndexRequest.Builder<IndexData>()
					.index(indexName).document(indexData).build();
				IndexResponse response = client.index(indexRequest);
				System.out.println(response.id());
			}
		}
	}

	public static void main(String[] sa) throws OpenSearchException, IOException {
		var client = connect();
		var indexName = "wikimedia-opensearch";
		//createIndex(client, indexName);

		System.out.println("consuming start...");
		consume(client,indexName);
		System.out.println("consuming end...");
	}


	static class IndexData {
		private String wikiMediaValue;

		public IndexData(String wikiMediaValue) {
			this.wikiMediaValue = wikiMediaValue;
		}

		@Override
		public String toString() {
			return String.format("IndexData{wikiMediaValue='%s'}",wikiMediaValue);
		}

		public String getWikiMediaValue() {
			return wikiMediaValue;
		}
		public void setWikiMediaValue(String wikiMediaValue) {
			this.wikiMediaValue = wikiMediaValue;
		}
	}

}

11. Create Topic

  • 创建主题,并启动WikimediaChangeProducer;

  • 注:需科学上网,需科学上网,需科学上网;

./kafka-topics.sh \
	--bootstrap-server 192.168.0.123:9092  \
	--topic wikimedia.recentchange --create \
	--partitions 3 --replication-factor 1
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"93bf8bf8-4c36-4a49-b226-3a9311d2c906","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257037},"id":484017699,"type":"log","namespace":2,"title":"Участник:Sherbek Qarshiyev","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062090,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Sherbek Qarshiyev (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q46898283","request_id":"161daa3c-9f11-44f1-b042-209a3acabcf8","id":"65fb4ae3-86fd-47ab-a85f-f0dd26b04b66","dt":"2024-04-25T12:52:32Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257038},"id":2201214347,"type":"edit","namespace":0,"title":"Q46898283","title_url":"https://www.wikidata.org/wiki/Q46898283","comment":"/* wbsetclaimvalue:1| */ [[Property:P1476]]: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults","timestamp":1714049552,"user":"KrBot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2136975570&oldid=2136975561&rcid=2201214347","minor":false,"patrolled":true,"length":{"old":60872,"new":60869},"revision":{"old":2136975561,"new":2136975570},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"‎<span dir=\"auto\"><span class=\"autocomment\">Определено значение для утверждения: </span></span> <a href=\"/wiki/Property:P1476\" title=\"‎название‎ | ‎название произведения (книги, фильма, газетной статьи, произведения исполнительского искусства, веб-сайта)‎\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"ru\" dir=\"ltr\">название</span> <span class=\"wb-itemlink-id\">(P1476)</span></span></a>: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","request_id":"4c136271-a6cd-4ff6-a6b7-429d769ba5ba","id":"14fcefb7-1f73-40a9-9acc-539d97aa06c3","dt":"2024-04-25T12:52:32Z","domain":"en.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257039},"id":1769512861,"type":"edit","namespace":2,"title":"User:Ali Ahwazi/sandbox2","title_url":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","comment":"","timestamp":1714049552,"user":"Ali Ahwazi","bot":false,"notify_url":"https://en.wikipedia.org/w/index.php?diff=1220709553&oldid=1220709505","minor":false,"length":{"old":26671,"new":31548},"revision":{"old":1220709505,"new":1220709553},"server_url":"https://en.wikipedia.org","server_name":"en.wikipedia.org","server_script_path":"/w","wiki":"enwiki","parsedcomment":""}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","request_id":"be816a4f-be27-4c49-830a-31161665401f","id":"ec47eca3-b6a4-4b4f-ac71-b44369de940d","dt":"2024-04-25T12:52:33Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257040},"id":519472837,"type":"edit","namespace":100,"title":"Portail:Châteaux/Articles récents","title_url":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","comment":"+ [[Château de Mielmont]]","timestamp":1714049553,"user":"OrlodrimBot","bot":true,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561930&oldid=214559720&rcid=519472837","minor":false,"patrolled":true,"length":{"old":779,"new":779},"revision":{"old":214559720,"new":214561930},"server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"+ <a href=\"/wiki/Ch%C3%A2teau_de_Mielmont\" title=\"Château de Mielmont\">Château de Mielmont</a>"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"b8ab3286-f69f-466a-9a00-c5c12c176001","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257041},"id":484017700,"type":"log","namespace":2,"title":"Участник:Виктория Никитенко","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062091,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Виктория Никитенко (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}

20:52:31.937 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","request_id":"739f7d34-fd23-4b37-ab77-87c95663aeda","id":"34ab12f5-7256-48e5-a4f8-b40bf95316ad","dt":"2024-04-25T12:52:33Z","domain":"oc.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257042},"id":10815379,"type":"new","namespace":0,"title":"Pairac lo Chasteu","title_url":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","comment":"Redireccion cap a [[Pairac (lo Chasteu)]]","timestamp":1714049553,"user":"PairacLoChasteu","bot":false,"notify_url":"https://oc.wikipedia.org/w/index.php?oldid=2436522&rcid=10815379","minor":false,"patrolled":false,"length":{"new":32},"revision":{"new":2436522},"server_url":"https://oc.wikipedia.org","server_name":"oc.wikipedia.org","server_script_path":"/w","wiki":"ocwiki","parsedcomment":"Redireccion cap a <a href=\"/wiki/Pairac_(lo_Chasteu)\" title=\"Pairac (lo Chasteu)\">Pairac (lo Chasteu)</a>"}
20:52:31.938 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","request_id":"7d6530d5-10a3-4628-a101-bc2e75b9a92f","id":"2eb4865e-b4ee-407c-9e19-b871967ed9e1","dt":"2024-04-25T12:52:30Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257043},"id":519472838,"type":"categorize","namespace":14,"title":"Catégorie:Article à référence nécessaire","title_url":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","comment":"[[:20e armée (Union soviétique)]] ajoutée à la catégorie","timestamp":1714049550,"user":"Le Petit Chat","bot":false,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561929&oldid=209346180&rcid=519472838","server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"<a href=\"/wiki/20e_arm%C3%A9e_(Union_sovi%C3%A9tique)\" title=\"20e armée (Union soviétique)\">20e armée (Union soviétique)</a> ajoutée à la catégorie"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","request_id":"3d4237d1-1994-4c65-b41a-84ee1f1a05c6","id":"d9e3f133-2e22-4022-9449-65a78ed83452","dt":"2024-04-25T12:52:31Z","domain":"commons.wikimedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257044},"id":2478318778,"type":"categorize","namespace":14,"title":"Category:Milford, Derbyshire","title_url":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","comment":"[[:File:The King William pub - geograph.org.uk - 5560373.jpg]] added to category","timestamp":1714049551,"user":"WereSpielChequers","bot":false,"notify_url":"https://commons.wikimedia.org/w/index.php?diff=871189763&oldid=871189716&rcid=2478318778","server_url":"https://commons.wikimedia.org","server_name":"commons.wikimedia.org","server_script_path":"/w","wiki":"commonswiki","parsedcomment":"<a href=\"/wiki/File:The_King_William_pub_-_geograph.org.uk_-_5560373.jpg\" title=\"File:The King William pub - geograph.org.uk - 5560373.jpg\">File:The King William pub - geograph.org.uk - 5560373.jpg</a> added to category"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","request_id":"c408c6e9-7e92-4f96-8a99-1d518f3af5ed","id":"e94ece30-3416-41e9-860d-d3547f2248d6","dt":"2024-04-25T12:52:33Z","domain":"ko.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257045},"type":"log","namespace":0,"title":"조상환","title_url":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","comment":"","timestamp":1714049553,"user":"Cho Sang Hwan","bot":false,"log_id":0,"log_type":"abusefilter","log_action":"hit","log_params":{"action":"edit","filter":"71","actions":"tag","log":1521905},"log_action_comment":"Cho Sang Hwan님이 [[조상환]]에서 \"edit\" 동작을 하여 [[특수:편집필터/71|필터 71]]이(가) 작동하였습니다. 조치: 태그 ([[특수:편집필터기록/1521905|자세한 사항]])","server_url":"https://ko.wikipedia.org","server_name":"ko.wikipedia.org","server_script_path":"/w","wiki":"kowiki","parsedcomment":""}
20:52:31.940 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","request_id":"d2dfebf5-df74-43d5-860e-be964ee93420","id":"27e197b2-cfb1-4c65-9717-09bb25438f08","dt":"2024-04-25T12:52:33Z","domain":"os.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257046},"id":1911685,"type":"new","namespace":14,"title":"Категори:Хуссар Голландийы чи амардис, уыдон","title_url":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","comment":"Ног фарс, йæ код райдайы афтæ: «[[Категори:Нидерландты чи амардис, уыдон]] [[Категори:Хуссар Голландийы зындгонд адæм|Амард]]»","timestamp":1714049553,"user":"Taamu","bot":false,"notify_url":"https://os.wikipedia.org/w/index.php?oldid=558822&rcid=1911685","minor":false,"patrolled":true,"length":{"new":167},"revision":{"new":558822},"server_url":"https://os.wikipedia.org","server_name":"os.wikipedia.org","server_script_path":"/w","wiki":"oswiki","parsedcomment":"Ног фарс, йæ код райдайы афтæ: «<a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%9D%D0%B8%D0%B4%D0%B5%D1%80%D0%BB%D0%B0%D0%BD%D0%B4%D1%82%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD\" title=\"Категори:Нидерландты чи амардис, уыдон\">Категори:Нидерландты чи амардис, уыдон</a> <a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D0%B7%D1%8B%D0%BD%D0%B4%D0%B3%D0%BE%D0%BD%D0%B4_%D0%B0%D0%B4%C3%A6%D0%BC\" title=\"Категори:Хуссар Голландийы зындгонд адæм\">Амард»</a>"}

……

12. Consume Message

  • 启动OpenSearchConsumer

# 此步骤可选
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic wikimedia.recentchange --from-beginning
GET /_cat/indices?v

GET _search
{
  "query": {
    "match_all": {}
  }
}

GET /index_name/_search
{
  "query": {
    "match_all": {}
  }
}
BonsaiConsole
WikiMediaKafkaOpenSearch