Storm(七)通過代碼提交Topology

Storm(五)第一個(gè)集群Topology中用如下的命令行向集群提交了Topology:
storm jar cluster-topology-1.0-SNAPSHOT.jar com.quiterr.ExclamationTopology
但是在項(xiàng)目中需要?jiǎng)討B(tài)自動(dòng)化提交Topology攀操,也就是通過代碼來提交Topology。

這里仍然以ExclamationTopology為例來說明昌讲,不過要對(duì)ExclamationTopology做一些改造嵌纲,要把ExclamationTopology中的Bolt單獨(dú)提出來做一個(gè)項(xiàng)目(或模塊)班巩,創(chuàng)建Topology相關(guān)的代碼做成另一個(gè)項(xiàng)目。

一、Topology項(xiàng)目(后續(xù)以項(xiàng)目名dynamic-submit-classes來敘述)

這個(gè)項(xiàng)目用來放Topology自身相關(guān)的東西寥殖,包括Spout慈参、Bolt等呛牲,實(shí)際項(xiàng)目中可以很復(fù)雜,但是在這個(gè)例子中很簡(jiǎn)單驮配,因?yàn)镾pout是Storm核心庫(kù)提供的娘扩,而Bolt只有一個(gè)。
(一)pom文件

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

極簡(jiǎn)單壮锻,沒什么好說的琐旁。
(二)ExclamationBolt
這個(gè)項(xiàng)目只有一個(gè)類ExclamationBolt,從ExclamationTopology里邊copy出來的猜绣。

public class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

(三)安裝到本地
使用mvn install命令安裝到本地灰殴,因?yàn)橄乱粋€(gè)項(xiàng)目要用到本項(xiàng)目。

(四)部署到Storm服務(wù)器
把生成的jar包放到Storm服務(wù)器上掰邢,這里放的路徑是/home/app/test

本項(xiàng)目源代碼:https://github.com/quiterr/storm-test/tree/master/dynamic-submit-classes

二牺陶、創(chuàng)建Topology項(xiàng)目(后續(xù)以項(xiàng)目名dynamic-submit來敘述)

用Spring Boot做一個(gè)RESTful API,專門用來接收創(chuàng)建Topology的請(qǐng)求辣之,創(chuàng)建完畢后向集群提交Topology掰伸。
(一)pom文件

<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>1.5.3.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--配置處理器,會(huì)在編輯配置文件的時(shí)候智能提示-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <!--自己實(shí)現(xiàn)的topology所在的庫(kù)-->
        <dependency>
            <groupId>com.quiterr</groupId>
            <artifactId>dynamic-submit-classes</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>1.5.3.RELEASE</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

除了Spring Boot相關(guān)的依賴怀估,就是上一個(gè)項(xiàng)目的依賴狮鸭。
(二)SubmitController
SubmitController接收一個(gè)topologyName參數(shù)合搅,然后創(chuàng)建topology,最后提交到集群歧蕉。代碼中有一行指明了topology所在的位置:System.setProperty("storm.jar", "/home/app/test/dynamic-submit-classes-1.0-SNAPSHOT.jar");

@RestController
public class SubmitController {
    @RequestMapping(value = "/topology/create", method = RequestMethod.POST)
    private boolean create(@RequestBody String topologyName){
        Config conf = new Config();
        conf.setDebug(true);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        System.setProperty("storm.jar", "/home/app/test/dynamic-submit-classes-1.0-SNAPSHOT.jar");
        try {
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }
        return true;
    }
}

本項(xiàng)目源代碼:https://github.com/quiterr/storm-test/tree/master/dynamic-submit

三历筝、測(cè)試和說明

把dynamic-submit-1.0-SNAPSHOT.jar和dynamic-submit-classes-1.0-SNAPSHOT.jar都放到Storm服務(wù)器上后,用java -jar dynamic-submit-1.0-SNAPSHOT.jar啟動(dòng)第一個(gè)項(xiàng)目廊谓,然后用一個(gè)RESTful的測(cè)試客戶端發(fā)請(qǐng)求即可梳猪。

有個(gè)問題:topology被提交到哪里了?
Storm基礎(chǔ)(三)配置這篇文章里提到過蒸痹,如果沒有定義storm.yml文件春弥,就會(huì)使用Storm核心庫(kù)中自帶的default.xml文件,默認(rèn)情況下topology就會(huì)提交到本地叠荠,這就是一定要把程序放到Storm所在的服務(wù)器上的原因匿沛。

四、用代碼遠(yuǎn)程提交

能不能不把topology傳到服務(wù)器上榛鼎,就在本地提交呢逃呼?
理論上是可以的。
1.把dynamic-submit-classes-1.0-SNAPSHOT.jar放到dynamic-submit工程目錄下者娱。
2.StormSubmitter類中設(shè)置環(huán)境變量:System.setProperty("storm.jar", "dynamic-submit-classes-1.0-SNAPSHOT.jar");
3.dynamic-submit加一個(gè)storm.yml配置文件抡笼,內(nèi)容像這樣:

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
     - "172.19.3.147"
     - "172.19.3.148"
     - "172.19.3.149"
 
 nimbus.seeds: ["172.19.3.147"]
 storm.local.dir: "/data/storm_local"
 supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703
     - 6704
     - 6705
     - 6706
     - 6707
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市黄鳍,隨后出現(xiàn)的幾起案子推姻,更是在濱河造成了極大的恐慌,老刑警劉巖框沟,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藏古,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡忍燥,警方通過查閱死者的電腦和手機(jī)拧晕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來梅垄,“玉大人厂捞,你說我怎么就攤上這事“ゼ祝” “怎么了蔫敲?”我有些...
    開封第一講書人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)炭玫。 經(jīng)常有香客問我奈嘿,道長(zhǎng),這世上最難降的妖魔是什么吞加? 我笑而不...
    開封第一講書人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任裙犹,我火速辦了婚禮尽狠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘叶圃。我一直安慰自己袄膏,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開白布掺冠。 她就那樣靜靜地躺著沉馆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪德崭。 梳的紋絲不亂的頭發(fā)上斥黑,一...
    開封第一講書人閱讀 49,749評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音眉厨,去河邊找鬼锌奴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛憾股,可吹牛的內(nèi)容都是我干的鹿蜀。 我是一名探鬼主播,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼服球,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼茴恰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起有咨,我...
    開封第一講書人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤琐簇,失蹤者是張志新(化名)和其女友劉穎蒸健,沒想到半個(gè)月后座享,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡似忧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年渣叛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盯捌。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡淳衙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出饺著,到底是詐尸還是另有隱情箫攀,我是刑警寧澤,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布幼衰,位于F島的核電站靴跛,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏渡嚣。R本人自食惡果不足惜梢睛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一肥印、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧绝葡,春花似錦深碱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至愉阎,卻和暖如春竞膳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背诫硕。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來泰國(guó)打工坦辟, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人章办。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓锉走,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親藕届。 傳聞我的和親對(duì)象是個(gè)殘疾皇子挪蹭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容