Kafka Stream動(dòng)態(tài)構(gòu)建拓?fù)鋱D

一、介紹

通過Kafka Stream編寫一個(gè)或多個(gè)的計(jì)算邏輯的處理器拓?fù)渖氖浮F渲刑幚砥魍負(fù)涫且粋€(gè)由流(線)連接的流處理(節(jié)點(diǎn))的圖隔缀。根據(jù)不同的需求闻镶,我們可以構(gòu)造不同的處理器拓?fù)鋱D去實(shí)現(xiàn)某一功能,但如果為每一個(gè)功能都自己去構(gòu)造一個(gè)拓?fù)鋱D将鸵,未免使得我們的代碼太過冗余勉盅。此時(shí),我們就需要開發(fā)一個(gè)可以根據(jù)我們拓?fù)涮幚砥骷靶枨髣?dòng)態(tài)創(chuàng)建拓?fù)鋱D的工具顶掉。


拓?fù)鋱D(引自http://orchome.com/335)

二. 構(gòu)建一個(gè)固定的拓?fù)?/h2>

1. 首先加載kafka stream的配置項(xiàng)

private static Map<String, Object> props = new HashMap<>();
    //加載配置文件
    static {
        PropertyReaderUtil reader = new PropertyReaderUtil();
        Map<String, String> serverMap = reader.readPropertyFile("kafkaServer.properties");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverMap.get("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,WallclockTimestampExtractor.class);
    }

2.構(gòu)建固定拓?fù)涮幚砥鲌D

public static void topology(){
        TopologyBuilder builder = new TopologyBuilder();
        String parentName = "source";
        builder.addSource("source","data");
        //KeywordProcessor草娜、ClassifitionProcessor為具體的拓?fù)涮幚砥?        builder.addProcessor("keyword", KeywordProcessor::new,parentName);
        builder.addProcessor("classifition", ClassifitionProcessor::new,"keyword");
        builder.addSink("sink","output","classifition");
        StreamsConfig streamsConfig = new StreamsConfig(props);
        KafkaStreams kafkaStreams = new KafkaStreams(builder,streamsConfig);
        kafkaStreams.start();
    }

三、構(gòu)建動(dòng)態(tài)拓?fù)鋱D

1.創(chuàng)建一個(gè)類實(shí)現(xiàn)ProcessorSupplier接口

public class ProcessorSupplierFactory implements ProcessorSupplier {
    public static final Logger logger = LoggerFactory.getLogger(ProcessorSupplierFactory.class);

    private String processorName;
    public ProcessorSupplierFactory(String processorName){
        this.processorName = processorName;
    }
    @Override
    public Processor get() {
        Processor processor = null;
        try {
           processor = (Processor) Class.forName(processorName).newInstance();
        } catch (InstantiationException e) {
            logger.error("反射類失敗",e);
        } catch (IllegalAccessException e) {
            logger.error("錯(cuò)誤:",e);
        } catch (ClassNotFoundException e) {
            logger.error("找不到類",e);
        }
        return processor;
    }
}

通過反射以及構(gòu)造器實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建拓?fù)涮幚砥鳎╬rocessor)實(shí)例痒筒。

2.加載kafka stream配置文件

private static Map<String, Object> props = new HashMap<>();
    //加載配置文件
    static {
        PropertyReaderUtil reader = new PropertyReaderUtil();
        Map<String, String> serverMap = reader.readPropertyFile("kafkaServer.properties");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverMap.get("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    }

3.動(dòng)態(tài)構(gòu)建拓?fù)涮幚砥鲌D

/**
  * 動(dòng)態(tài)構(gòu)建拓?fù)?  * @param processors 拓?fù)涮幚砥鰿lassName集合
  */
public static void dynamicTopology(List<String> processors){
        TopologyBuilder builder = new TopologyBuilder();
        String parentName = "source";
        builder.addSource("source","data");
        for (String process :
                processors) {
            //通過構(gòu)造器動(dòng)態(tài)創(chuàng)建Processor實(shí)例
            builder.addProcessor(process, new ProcessorSupplierFactory(process), parentName);
            parentName = process;
        }
        builder.addSink("sink","output",parentName);
        StreamsConfig streamsConfig = new StreamsConfig(props);
        KafkaStreams kafkaStreams = new KafkaStreams(builder,streamsConfig);
        kafkaStreams.start();
    }

至此驱还,我們就實(shí)現(xiàn)了如何動(dòng)態(tài)的去創(chuàng)建拓?fù)涮幚砥鲌D的功能。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末凸克,一起剝皮案震驚了整個(gè)濱河市议蟆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌萎战,老刑警劉巖咐容,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蚂维,居然都是意外死亡戳粒,警方通過查閱死者的電腦和手機(jī)路狮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蔚约,“玉大人奄妨,你說我怎么就攤上這事∑凰睿” “怎么了砸抛?”我有些...
    開封第一講書人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長树枫。 經(jīng)常有香客問我直焙,道長,這世上最難降的妖魔是什么砂轻? 我笑而不...
    開封第一講書人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任奔誓,我火速辦了婚禮,結(jié)果婚禮上搔涝,老公的妹妹穿的比我還像新娘厨喂。我一直安慰自己,他們只是感情好庄呈,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開白布杯聚。 她就那樣靜靜地躺著,像睡著了一般抒痒。 火紅的嫁衣襯著肌膚如雪幌绍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評(píng)論 1 290
  • 那天故响,我揣著相機(jī)與錄音傀广,去河邊找鬼。 笑死彩届,一個(gè)胖子當(dāng)著我的面吹牛伪冰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播樟蠕,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼贮聂,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了寨辩?” 一聲冷哼從身側(cè)響起吓懈,我...
    開封第一講書人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎靡狞,沒想到半個(gè)月后耻警,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年甘穿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了腮恩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡温兼,死狀恐怖秸滴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情募判,我是刑警寧澤荡含,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站兰伤,受9級(jí)特大地震影響内颗,放射性物質(zhì)發(fā)生泄漏钧排。R本人自食惡果不足惜敦腔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望恨溜。 院中可真熱鬧符衔,春花似錦、人聲如沸糟袁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽项戴。三九已至形帮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間周叮,已是汗流浹背辩撑。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留仿耽,地道東北人合冀。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像项贺,于是被迫代替她去往敵國和親君躺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理开缎,服務(wù)發(fā)現(xiàn)棕叫,斷路器,智...
    卡卡羅2017閱讀 134,633評(píng)論 18 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,713評(píng)論 13 425
  • Kafka設(shè)計(jì)解析(七)- Kafka Stream 原創(chuàng)文章奕删,轉(zhuǎn)載請(qǐng)務(wù)必將下面這段話置于文章開頭處谍珊。本文轉(zhuǎn)發(fā)自技...
    小小少年Boy閱讀 5,245評(píng)論 0 32
  • 昨天23點(diǎn)左右兒子想燒烤,我趁機(jī)邀兒子一起去外面吃,兒子依然很酷的回答我砌滞,不去侮邀,你給我買。我和老公就開車出去給他買...
    徐亞娟閱讀 168評(píng)論 2 2
  • (一) 我有個(gè)習(xí)慣贝润,每當(dāng)看到覺得寫得不錯(cuò)的文章绊茧,如果當(dāng)時(shí)沒有時(shí)間或者心思看完和消化,就會(huì)隨手保存下來打掘,想待到閑暇的...
    實(shí)驗(yàn)室里的風(fēng)信子閱讀 1,566評(píng)論 2 4