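I. Introduction
With Kafka Streams, you write one or more processor topologies that define the computation logic. A processor topology is a graph of stream processors (nodes) connected by streams (edges). Depending on the requirements, we can build different processor topologies to implement a given feature, but building a separate topology by hand for every feature makes the code needlessly redundant. What we need instead is a tool that creates the topology dynamically from the processors and requirements we supply.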
II. Building a fixed topology
1. First, load the Kafka Streams configuration
private static Map<String, Object> props = new HashMap<>();

// Load the configuration file
static {
    PropertyReaderUtil reader = new PropertyReaderUtil();
    Map<String, String> serverMap = reader.readPropertyFile("kafkaServer.properties");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverMap.get("bootstrap.servers"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
}
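The configuration block relies on `PropertyReaderUtil`, whose source is not shown in this post. As a rough sketch of what such a helper might do, the following reads key/value pairs with `java.util.Properties` and copies them into a `Map<String, String>`. The class name `SimplePropertyReader` and its methods are hypothetical stand-ins, not the author's utility.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for PropertyReaderUtil (the real class is not shown in the post).
class SimplePropertyReader {

    // Reads a .properties resource from the classpath into a plain String map.
    static Map<String, String> readPropertyFile(String resourceName) {
        try (InputStream in = SimplePropertyReader.class.getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalArgumentException("Resource not found: " + resourceName);
            }
            Properties properties = new Properties();
            properties.load(in);
            return toMap(properties);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same parsing, but from any Reader, which makes the logic easy to try out.
    static Map<String, String> read(Reader reader) {
        try {
            Properties properties = new Properties();
            properties.load(reader);
            return toMap(properties);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Map<String, String> toMap(Properties properties) {
        Map<String, String> result = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            result.put(name, properties.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> serverMap =
                read(new StringReader("bootstrap.servers=localhost:9092\n"));
        System.out.println(serverMap.get("bootstrap.servers")); // prints localhost:9092
    }
}
```

Failing fast when the resource is missing avoids passing a `null` value for `bootstrap.servers` into the stream configuration.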
2. Build the fixed processor topology
public static void topology() {
    TopologyBuilder builder = new TopologyBuilder();
    String parentName = "source";
    builder.addSource("source", "data");
    // KeywordProcessor and ClassifitionProcessor are the concrete topology processors
    builder.addProcessor("keyword", KeywordProcessor::new, parentName);
    builder.addProcessor("classifition", ClassifitionProcessor::new, "keyword");
    builder.addSink("sink", "output", "classifition");
    StreamsConfig streamsConfig = new StreamsConfig(props);
    KafkaStreams kafkaStreams = new KafkaStreams(builder, streamsConfig);
    kafkaStreams.start();
}
III. Building a dynamic topology
1. Create a class that implements the ProcessorSupplier interface
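import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessorSupplierFactory implements ProcessorSupplier {
    public static final Logger logger = LoggerFactory.getLogger(ProcessorSupplierFactory.class);
    // Fully qualified class name of the processor to instantiate
    private String processorName;

    public ProcessorSupplierFactory(String processorName) {
        this.processorName = processorName;
    }

    @Override
    public Processor get() {
        // Stays null if instantiation fails; the failure is only logged
        Processor processor = null;
        try {
            processor = (Processor) Class.forName(processorName).newInstance();
        } catch (InstantiationException e) {
            logger.error("Failed to instantiate class via reflection", e);
        } catch (IllegalAccessException e) {
            logger.error("Illegal access:", e);
        } catch (ClassNotFoundException e) {
            logger.error("Class not found", e);
        }
        return processor;
    }
}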
Using reflection together with the class name passed to the constructor, this class creates topology processor instances dynamically.
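The reflection step can be tried without a Kafka dependency. The sketch below is a minimal variant under stated assumptions: `SimpleProcessor`, `UpperCaseProcessor`, and `ReflectiveSupplier` are hypothetical names introduced only for illustration, and unlike the class above it throws on failure instead of logging and returning `null`. It also uses `getDeclaredConstructor().newInstance()`, since `Class.newInstance()` is deprecated in newer JDKs.

```java
import java.util.function.Supplier;

// Hypothetical minimal processor contract so the sketch compiles without Kafka.
interface SimpleProcessor {
    String process(String value);
}

// Example processor; reflection needs an accessible no-argument constructor.
class UpperCaseProcessor implements SimpleProcessor {
    @Override
    public String process(String value) {
        return value.toUpperCase();
    }
}

// Creates a fresh processor on every get() call from a fully qualified class name.
class ReflectiveSupplier implements Supplier<SimpleProcessor> {
    private final Class<? extends SimpleProcessor> type;

    ReflectiveSupplier(String className) {
        try {
            // Resolve and type-check the class once, when the supplier is built.
            this.type = Class.forName(className).asSubclass(SimpleProcessor.class);
        } catch (ClassNotFoundException | ClassCastException e) {
            throw new IllegalArgumentException("Not a usable processor class: " + className, e);
        }
    }

    @Override
    public SimpleProcessor get() {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Fail loudly rather than handing back null.
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Supplier<SimpleProcessor> supplier =
                new ReflectiveSupplier(UpperCaseProcessor.class.getName());
        System.out.println(supplier.get().process("kafka")); // prints KAFKA
    }
}
```

Resolving the class in the constructor means a misspelled class name surfaces when the topology is assembled rather than when the first record is processed.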
2. Load the Kafka Streams configuration
private static Map<String, Object> props = new HashMap<>();

// Load the configuration file
static {
    PropertyReaderUtil reader = new PropertyReaderUtil();
    Map<String, String> serverMap = reader.readPropertyFile("kafkaServer.properties");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverMap.get("bootstrap.servers"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
}
3. Build the processor topology dynamically
/**
 * Builds the topology dynamically.
 * @param processors class names of the topology processors, in chain order
 */
public static void dynamicTopology(List<String> processors) {
    TopologyBuilder builder = new TopologyBuilder();
    String parentName = "source";
    builder.addSource("source", "data");
    for (String process : processors) {
        // Create the Processor instance dynamically through the supplier's constructor
        builder.addProcessor(process, new ProcessorSupplierFactory(process), parentName);
        // The processor just added becomes the parent of the next node
        parentName = process;
    }
    builder.addSink("sink", "output", parentName);
    StreamsConfig streamsConfig = new StreamsConfig(props);
    KafkaStreams kafkaStreams = new KafkaStreams(builder, streamsConfig);
    kafkaStreams.start();
}
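To make the wiring of the loop easier to see, here is a small Kafka-free sketch that only plans the chain: it returns the parent/child edges the loop would create and rejects repeated node names, which the builder does not accept. `ChainPlanner`, `Edge`, and the `com.example` class names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: plans the linear chain built by dynamicTopology, without Kafka.
class ChainPlanner {

    // One edge of the plan: node `child` reads from node `parent`.
    static final class Edge {
        final String parent;
        final String child;

        Edge(String parent, String child) {
            this.parent = parent;
            this.child = child;
        }

        @Override
        public String toString() {
            return parent + " -> " + child;
        }
    }

    // Mirrors the loop: each processor's parent is the node added just before it.
    static List<Edge> plan(List<String> processors) {
        Set<String> seen = new HashSet<>();
        seen.add("source");
        seen.add("sink");
        List<Edge> edges = new ArrayList<>();
        String parentName = "source";
        for (String process : processors) {
            // Node names must be unique, so repeats and reserved names are rejected early.
            if (!seen.add(process)) {
                throw new IllegalArgumentException("Duplicate node name: " + process);
            }
            edges.add(new Edge(parentName, process));
            parentName = process;
        }
        // The last processor (or the source, if the list is empty) feeds the sink.
        edges.add(new Edge(parentName, "sink"));
        return edges;
    }

    public static void main(String[] args) {
        List<String> processors = new ArrayList<>();
        processors.add("com.example.KeywordProcessor");      // hypothetical class name
        processors.add("com.example.ClassifitionProcessor"); // hypothetical class name
        for (Edge edge : plan(processors)) {
            System.out.println(edge);
        }
    }
}
```

Because the class name doubles as the node name, listing the same processor class twice would need distinct node names; this sketch simply refuses that input.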
With that, we have implemented dynamic creation of the processor topology.