Trident編程實(shí)質(zhì)與普通的storm源碼API編程沒有什么本質(zhì)爬范,其只是對topology出的邏輯執(zhí)行代碼進(jìn)行了封裝捺檬,使得編碼處理更一體化毁菱,功能類要繼承BaseFunction類窄瘟,重寫execute一個方法衷佃,不在想原始API編程那樣重寫很多方法,造成代碼冗余蹄葱。
直接上代碼
/**
*
* @author mis
*
* Trident的作用就是簡化了bolt
*/
public class TridentFunction {
public static class SumFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println("傳入進(jìn)來的內(nèi)容為:" + tuple);
//獲取a氏义、b兩個域值
int a = tuple.getInteger(0);//第0個元素
int b = tuple.getInteger(1);//第1個元素
int sum = a + b ; //sum計(jì)算前兩個元素之和
//發(fā)射數(shù)據(jù)
collector.emit(new Values(sum));
}
}
public static class Result extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//獲取tuple輸入內(nèi)容
Integer a = tuple.getIntegerByField("a");
Integer b = tuple.getIntegerByField("b");
Integer c = tuple.getIntegerByField("c");
Integer d = tuple.getIntegerByField("d");
Integer sum = tuple.getIntegerByField("sum");
System.out.println("a: " + a +", b: " + b + ", c: " + c + ", d: " +d + ", sum: " + sum);
}
}
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
//設(shè)定數(shù)據(jù)源
@SuppressWarnings("unchecked")
FixedBatchSpout spout = new FixedBatchSpout(
new Fields("a","b","c","d"),//聲明輸入的域字段為"a","b","c","d"
4,//設(shè)置批處理大小
//設(shè)置數(shù)據(jù)源內(nèi)容(此處測試數(shù)據(jù))
new Values(1,4,7,10),
new Values(1,1,3,11),
new Values(2,2,7,1),
new Values(2,5,7,2));
//指定是否循環(huán)
spout.setCycle(false);
//指定輸入員
Stream inputStream = topology.newStream("spout", spout);
/**
* 要實(shí)現(xiàn)流spout - bolt的模式 在trident里是使用each來做到的
* each方法參數(shù):
* 1,輸入數(shù)據(jù)源參數(shù)名稱:"a","b","c","d"
* 2,需要流轉(zhuǎn)執(zhí)行的function對象(也就是bolt對象):new SumFunction()
* 3,指定function對象里的輸出參數(shù)名稱:sum
* */
inputStream.each(new Fields("a","b","c","d"), new SumFunction(),new Fields("sum"))
/**
* 繼續(xù)使用each調(diào)用下一個function(bolt)
* 1,參數(shù)一為:"a","b","c","d","sum"
* 2,參數(shù)二為:new Result() 也就是執(zhí)行函數(shù),第三個參數(shù)為沒有輸出(即這是最后一個bolt)
* */
.each(new Fields("a","b","c","d","sum"),new Result(),new Fields());
return topology.build();//利用這種方式图云,我們返回一個StormTopology對象惯悠,進(jìn)行提交
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setNumWorkers(2);
conf.setMaxSpoutPending(20);
if(args.length == 0){//本地模式運(yùn)行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-function", conf, buildTopology());
Thread.sleep(10000);
cluster.shutdown();
}else{//集群模式運(yùn)行
StormSubmitter.submitTopology(args[0], conf, buildTopology());
}
}
}
使用each()方法進(jìn)行鏈?zhǔn)骄幊蹋瑪?shù)據(jù)所流向的bolt類直接以對象的形式傳入
結(jié)果:
2017-04-26_113201.png