在Storm(五)第一個(gè)集群Topology中用如下的命令行向集群提交了Topology:
storm jar cluster-topology-1.0-SNAPSHOT.jar com.quiterr.ExclamationTopology
但是在項(xiàng)目中需要?jiǎng)討B(tài)自動(dòng)化提交Topology攀操,也就是通過代碼來提交Topology。
這里仍然以ExclamationTopology為例來說明昌讲,不過要對(duì)ExclamationTopology做一些改造嵌纲,要把ExclamationTopology中的Bolt單獨(dú)提出來做一個(gè)項(xiàng)目(或模塊)班巩,創(chuàng)建Topology相關(guān)的代碼做成另一個(gè)項(xiàng)目。
一、Topology項(xiàng)目(后續(xù)以項(xiàng)目名dynamic-submit-classes來敘述)
這個(gè)項(xiàng)目用來放Topology自身相關(guān)的東西寥殖,包括Spout慈参、Bolt等呛牲,實(shí)際項(xiàng)目中可以很復(fù)雜,但是在這個(gè)例子中很簡(jiǎn)單驮配,因?yàn)镾pout是Storm核心庫(kù)提供的娘扩,而Bolt只有一個(gè)。
(一)pom文件
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
極簡(jiǎn)單壮锻,沒什么好說的琐旁。
(二)ExclamationBolt
這個(gè)項(xiàng)目只有一個(gè)類ExclamationBolt,從ExclamationTopology里邊copy出來的猜绣。
public class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
(三)安裝到本地
使用mvn install
命令安裝到本地灰殴,因?yàn)橄乱粋€(gè)項(xiàng)目要用到本項(xiàng)目。
(四)部署到Storm服務(wù)器
把生成的jar包放到Storm服務(wù)器上掰邢,這里放的路徑是/home/app/test
本項(xiàng)目源代碼:https://github.com/quiterr/storm-test/tree/master/dynamic-submit-classes
二牺陶、創(chuàng)建Topology項(xiàng)目(后續(xù)以項(xiàng)目名dynamic-submit來敘述)
用Spring Boot做一個(gè)RESTful API,專門用來接收創(chuàng)建Topology的請(qǐng)求辣之,創(chuàng)建完畢后向集群提交Topology掰伸。
(一)pom文件
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>1.5.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--配置處理器,會(huì)在編輯配置文件的時(shí)候智能提示-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!--自己實(shí)現(xiàn)的topology所在的庫(kù)-->
<dependency>
<groupId>com.quiterr</groupId>
<artifactId>dynamic-submit-classes</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>1.5.3.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
除了Spring Boot相關(guān)的依賴怀估,就是上一個(gè)項(xiàng)目的依賴狮鸭。
(二)SubmitController
SubmitController接收一個(gè)topologyName參數(shù)合搅,然后創(chuàng)建topology,最后提交到集群歧蕉。代碼中有一行指明了topology所在的位置:System.setProperty("storm.jar", "/home/app/test/dynamic-submit-classes-1.0-SNAPSHOT.jar");
@RestController
public class SubmitController {
@RequestMapping(value = "/topology/create", method = RequestMethod.POST)
private boolean create(@RequestBody String topologyName){
Config conf = new Config();
conf.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
System.setProperty("storm.jar", "/home/app/test/dynamic-submit-classes-1.0-SNAPSHOT.jar");
try {
StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
return true;
}
}
本項(xiàng)目源代碼:https://github.com/quiterr/storm-test/tree/master/dynamic-submit
三历筝、測(cè)試和說明
把dynamic-submit-1.0-SNAPSHOT.jar和dynamic-submit-classes-1.0-SNAPSHOT.jar都放到Storm服務(wù)器上后,用java -jar dynamic-submit-1.0-SNAPSHOT.jar
啟動(dòng)第一個(gè)項(xiàng)目廊谓,然后用一個(gè)RESTful的測(cè)試客戶端發(fā)請(qǐng)求即可梳猪。
有個(gè)問題:topology被提交到哪里了?
在Storm基礎(chǔ)(三)配置這篇文章里提到過蒸痹,如果沒有定義storm.yml文件春弥,就會(huì)使用Storm核心庫(kù)中自帶的default.xml文件,默認(rèn)情況下topology就會(huì)提交到本地叠荠,這就是一定要把程序放到Storm所在的服務(wù)器上的原因匿沛。
四、用代碼遠(yuǎn)程提交
能不能不把topology傳到服務(wù)器上榛鼎,就在本地提交呢逃呼?
理論上是可以的。
1.把dynamic-submit-classes-1.0-SNAPSHOT.jar放到dynamic-submit工程目錄下者娱。
2.StormSubmitter類中設(shè)置環(huán)境變量:System.setProperty("storm.jar", "dynamic-submit-classes-1.0-SNAPSHOT.jar");
3.dynamic-submit加一個(gè)storm.yml配置文件抡笼,內(nèi)容像這樣:
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "172.19.3.147"
- "172.19.3.148"
- "172.19.3.149"
nimbus.seeds: ["172.19.3.147"]
storm.local.dir: "/data/storm_local"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
- 6705
- 6706
- 6707