By providing a custom implementation of the KafkaClientSupplier interface, a Kafka Streams application can read data from one Kafka cluster and write it to another. The main steps are as follows:
1. Implement the KafkaClientSupplier interface
package org.feiyu.dataprocess.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaClientSupplier;

import java.util.HashMap;
import java.util.Map;

/**
 * Custom KafkaClientSupplier.
 * Written against an older Kafka Streams API; newer releases add further
 * methods to this interface (for example for the admin client), which
 * would also need to be implemented.
 */
public class MyKafkaClientSupplier implements KafkaClientSupplier {

    // Producer configuration overrides
    private final Map<String, Object> producerConfig;
    // Consumer configuration overrides
    private final Map<String, Object> consumerConfig;

    public MyKafkaClientSupplier(Map<String, Object> producerConfig, Map<String, Object> consumerConfig) {
        this.producerConfig = producerConfig;
        this.consumerConfig = consumerConfig;
    }

    @Override
    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
        // Copy the config supplied by Kafka Streams, then apply the producer overrides on top
        Map<String, Object> map = new HashMap<>(config);
        map.putAll(producerConfig);
        return new KafkaProducer<>(map, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
        // Copy the config supplied by Kafka Streams, then apply the consumer overrides on top
        Map<String, Object> map = new HashMap<>(config);
        map.putAll(consumerConfig);
        return new KafkaConsumer<>(map, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Override
    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
        // The restore consumer uses the same overrides as the main consumer
        Map<String, Object> map = new HashMap<>(config);
        map.putAll(consumerConfig);
        return new KafkaConsumer<>(map, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }
}
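The override behaviour in MyKafkaClientSupplier rests on one detail: the config passed in by Kafka Streams is copied first, and putAll then replaces any key that also appears in the per-cluster map. The following is a minimal sketch of just that merge step, using only java.util so it runs without a Kafka dependency. The class name ConfigOverrideSketch, the merge helper, and the host names are hypothetical and only serve the illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConfigOverrideSketch {

    // Mirrors the merge step in MyKafkaClientSupplier: copy the config that
    // Kafka Streams passes in, then let the per-cluster overrides win.
    static Map<String, Object> merge(Map<String, Object> streamsConfig,
                                     Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(streamsConfig);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for what Kafka Streams would supply.
        Map<String, Object> fromStreams = new HashMap<>();
        fromStreams.put("bootstrap.servers", "source-cluster:9092");
        fromStreams.put("client.id", "demo-client");

        // Hypothetical override pointing the producer at a different cluster.
        Map<String, Object> producerOverrides =
                Collections.<String, Object>singletonMap("bootstrap.servers", "target-cluster:9092");

        Map<String, Object> merged = merge(fromStreams, producerOverrides);
        System.out.println(merged.get("bootstrap.servers")); // prints target-cluster:9092
        System.out.println(merged.get("client.id"));         // prints demo-client
    }
}
```

Keys that are not overridden, such as client.id here, pass through unchanged, so the clients still receive everything else Kafka Streams configured for them.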
2. Create an instance of the custom KafkaClientSupplier and pass it to the KafkaStreams constructor
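// props and builder are defined elsewhere in the application (not shown)
StreamsConfig config = new StreamsConfig(props);
// Note: bootstrap.servers normally takes a comma-separated host:port list
MyKafkaClientSupplier supplier = new MyKafkaClientSupplier(
        Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://<producer Kafka cluster address>"),
        Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://<consumer Kafka cluster address>"));
KafkaStreams streams = new KafkaStreams(builder, config, supplier);
streams.start();
// kafkaStreams is presumably a collection, defined elsewhere, that tracks the running instances
kafkaStreams.add(streams);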
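Collections.singletonMap can hold only one entry, which is enough when the two clusters differ only in their bootstrap servers. If more settings had to differ per cluster, the override maps could be built as in the sketch below. This is an illustration under assumptions, not part of the original setup: the ClusterOverrides class, the forCluster helper, the host names, and the choice of compression.type as the extra setting are all hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ClusterOverrides {

    // Builds an unmodifiable override map for one cluster. "bootstrap.servers"
    // is always set; extra holds any other settings that must differ.
    static Map<String, Object> forCluster(String bootstrapServers, Map<String, Object> extra) {
        Map<String, Object> overrides = new HashMap<>(extra);
        overrides.put("bootstrap.servers", bootstrapServers);
        return Collections.unmodifiableMap(overrides);
    }

    public static void main(String[] args) {
        // Hypothetical addresses for the cluster read from and the cluster written to.
        Map<String, Object> consumerOverrides =
                forCluster("source-cluster:9092", Collections.<String, Object>emptyMap());
        Map<String, Object> producerOverrides =
                forCluster("target-cluster:9092",
                        Collections.<String, Object>singletonMap("compression.type", "gzip"));

        System.out.println(consumerOverrides.get("bootstrap.servers"));
        System.out.println(producerOverrides.get("bootstrap.servers"));
        System.out.println(producerOverrides.get("compression.type"));
    }
}
```

The two resulting maps could then be passed to the MyKafkaClientSupplier constructor in place of the singletonMap calls. Because the supplier only reads them through putAll, returning unmodifiable maps is safe.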
3. The Kafka Streams startup log shows that the configuration has taken effect: reads and writes now go to separate clusters.
Consumer configuration
Producer configuration