Trident Spout
Trident Spout characteristics
- In Trident, a spout is defined by the ITridentSpout interface.
- A Trident spout must emit tuples in batches.
- A Trident spout does not emit data itself; ITridentSpout.Emitter does that. Trident also introduces a coordinator, which manages the emitted batches and their metadata. When a transaction fails, the Emitter is asked to re-emit the data based on that metadata. The coordinator interface is ITridentSpout.BatchCoordinator.
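The split between coordinator and emitter can be easier to see in a small model. The sketch below is plain Java and uses no Storm classes: ReplaySketch, BatchMeta, and the offset-based metadata are all hypothetical names invented for illustration, and keeping the metadata in an in-memory map is a simplification. It only shows the idea that retrying a transaction id reuses the stored metadata, so the same batch is produced again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the coordinator/emitter split; none of these types are Storm classes.
class ReplaySketch {

    // Hypothetical metadata: which slice of the source a batch covers.
    static final class BatchMeta {
        final int offset;
        final int size;

        BatchMeta(int offset, int size) {
            this.offset = offset;
            this.size = size;
        }
    }

    // Coordinator role: decide what each transaction id covers and remember it.
    static final class Coordinator {
        private final Map<Long, BatchMeta> metaByTxid = new HashMap<>();
        private int nextOffset = 0;

        BatchMeta initializeTransaction(long txid, int batchSize) {
            BatchMeta existing = metaByTxid.get(txid);
            if (existing != null) {
                return existing; // retry of a known transaction: reuse its metadata
            }
            BatchMeta meta = new BatchMeta(nextOffset, batchSize);
            nextOffset += batchSize;
            metaByTxid.put(txid, meta);
            return meta;
        }
    }

    // Emitter role: turn metadata into the tuples of one batch.
    static List<String> emitBatch(List<String> source, BatchMeta meta) {
        int end = Math.min(source.size(), meta.offset + meta.size);
        int start = Math.min(meta.offset, end);
        return new ArrayList<>(source.subList(start, end));
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b", "c", "d", "e");
        Coordinator coordinator = new Coordinator();

        List<String> first = emitBatch(source, coordinator.initializeTransaction(1L, 2));
        // Pretend transaction 1 failed and is attempted again.
        List<String> replayed = emitBatch(source, coordinator.initializeTransaction(1L, 2));
        List<String> second = emitBatch(source, coordinator.initializeTransaction(2L, 2));

        System.out.println(first + " " + replayed + " " + second); // [a, b] [a, b] [c, d]
    }
}
```

The emitter in this model keeps no state of its own, so whatever is needed to rebuild a batch has to be in the metadata the coordinator hands it.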
Maven dependency
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>
A simple Trident Spout implementation
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordSpout implements ITridentSpout<String> {

    private static final long serialVersionUID = -954626449213280061L;

    // Static so the logger is not serialized together with the spout.
    private static final Logger logger = LoggerFactory.getLogger("Trident Spout");

    /**
     * Coordinator.
     * Stores the metadata needed to replay a batch; when a batch is replayed,
     * it is rebuilt from the metadata kept by the coordinator.
     */
    @Override
    public BatchCoordinator<String> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new WordCoordinator();
    }

    @Override
    public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new WordEmitter();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Declares all the fields that are emitted.
     */
    @Override
    public Fields getOutputFields() {
        return new Fields("field1", "field2");
    }

    private static class WordCoordinator implements BatchCoordinator<String> {

        @Override
        public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
            // This simple example tracks no metadata.
            return null;
        }

        @Override
        public void success(long txid) {
            logger.info("success: " + txid);
        }

        @Override
        public boolean isReady(long txid) {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /**
     * Emitter.
     * Sends the data stream.
     */
    private static class WordEmitter implements Emitter<String> {

        @Override
        public void success(TransactionAttempt tx) {
            logger.info("emitter success " + tx.getId());
        }

        @Override
        public void close() {
        }

        /**
         * The set of data sent by one call to this method is called a batch.
         * A batch is the smallest unit in which Trident sends a data stream.
         */
        @Override
        public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
            for (int i = 0; i < 10; i++) {
                // One tuple: values in the order declared by getOutputFields.
                List<Object> list = new ArrayList<>();
                list.add("event1");
                list.add("event2");
                collector.emit(list);
            }
        }
    }
}
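Key points
The emitBatch method defined by Emitter decides which data is sent. Everything sent during one invocation of the method is called a batch, and each record in the batch is called a tuple.
ITridentSpout.getOutputFields defines which fields each tuple has. This example defines two fields, named "field1" and "field2", and every tuple emitted in Emitter.emitBatch conforms to that definition. The data sent by each call to emitBatch can be pictured as follows: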
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
Each call emits 10 records (10 tuples). These 10 tuples make up one batch, and every tuple conforms to the fields defined in ITridentSpout.getOutputFields.
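To make the relationship between declared fields and tuple values concrete, here is a small plain-Java sketch. It does not use Storm: BatchShapeSketch and its methods are hypothetical names, and a tuple is modeled as a simple list of values. It builds the same 10-tuple batch as the example and pairs each value with the field name at the same position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a batch and its declared fields; not Storm code.
class BatchShapeSketch {

    // Field names as declared by getOutputFields in the example.
    static final List<String> OUTPUT_FIELDS = Arrays.asList("field1", "field2");

    // Builds one batch the way the example's emitBatch does: 10 identical two-value tuples.
    static List<List<Object>> buildBatch() {
        List<List<Object>> batch = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            batch.add(Arrays.<Object>asList("event1", "event2"));
        }
        return batch;
    }

    // Pairs each value with the field name at the same position.
    static Map<String, Object> byField(List<Object> tuple) {
        if (tuple.size() != OUTPUT_FIELDS.size()) {
            throw new IllegalArgumentException("tuple does not match the declared fields");
        }
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < OUTPUT_FIELDS.size(); i++) {
            named.put(OUTPUT_FIELDS.get(i), tuple.get(i));
        }
        return named;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = buildBatch();
        // prints: 10 tuples, first = {field1=event1, field2=event2}
        System.out.println(batch.size() + " tuples, first = " + byField(batch.get(0)));
    }
}
```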
Trident bolt
Trident bolt characteristics
- Trident has no bolt interface. Processing logic is instead split into operations such as Filter and Function.
A simple Trident Function implementation
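import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordFunction extends BaseFunction {

    private static final long serialVersionUID = 735468688795780833L;

    // Static so the logger is not serialized together with the function.
    private static final Logger logger = LoggerFactory.getLogger("Trident Function");

    /**
     * Receives the data stream.
     * Called once for each tuple in a batch.
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        logger.info(tuple.getValueByField("field1").toString());
    }
}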
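Key points
BaseFunction already implements the Function interface.
The execute method implements how a tuple is handled once it is received. It receives one tuple per call. In this example the emitter sends one batch at a time and each batch holds 10 records, so execute is called 10 times for every batch sent.
The tuple gives access to the data in the stream; which fields can be read is controlled by the TridentTopology object.
After processing, execute can use the TridentCollector object to send data on to the next node. When a Function emits data it can only add new fields; it cannot modify or delete existing ones.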
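The append-only rule can be pictured with a tiny model. The sketch below is plain Java, not Storm: AppendOnlySketch and applyFunction are hypothetical names, a tuple is modeled as a list of values, and the function is assumed to emit exactly one set of new values per input tuple. The output is the untouched input values followed by whatever the function emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of "a Function can only add fields"; not Storm code.
class AppendOnlySketch {

    // Applies fn to a read-only view of the input and returns input values plus emitted values.
    static List<Object> applyFunction(List<Object> input, Function<List<Object>, List<Object>> fn) {
        List<Object> emitted = fn.apply(Collections.unmodifiableList(input));
        List<Object> output = new ArrayList<>(input); // existing values are kept as they are
        output.addAll(emitted);                       // new values are appended after them
        return output;
    }

    public static void main(String[] args) {
        List<Object> input = Arrays.<Object>asList("event1", "event2");
        // Hypothetical function: emits the length of the first value as one new field.
        List<Object> output = applyFunction(input,
                t -> Arrays.<Object>asList(((String) t.get(0)).length()));
        System.out.println(output); // [event1, event2, 6]
    }
}
```

Because the function only ever sees a read-only view, anything it wants to pass downstream has to travel as an added value.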
Starting the TridentTopology
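import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

public class Start {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();
        topology.newStream("filter", spout)
                // Select which fields of the spout's stream are passed to the function (bolt).
                .each(new Fields("field1"), function, new Fields());
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());
        // Let the local cluster run for one minute before shutting it down.
        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }
}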
Key points
When the topology defines the stream, it specifies which fields the function can read.
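The field selection can be sketched as a simple projection. The code below is a plain-Java model and not Storm's implementation: ProjectionSketch and project are hypothetical names, and a tuple is again just a list of values. Given the stream's field names and the fields selected for the function, it returns only the values the function would be able to read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of selecting input fields for a function; not Storm code.
class ProjectionSketch {

    // Returns only the values whose field names are listed in selected, in the selected order.
    static List<Object> project(List<String> streamFields, List<Object> tuple, List<String> selected) {
        List<Object> view = new ArrayList<>();
        for (String name : selected) {
            int index = streamFields.indexOf(name);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            view.add(tuple.get(index));
        }
        return view;
    }

    public static void main(String[] args) {
        List<String> streamFields = Arrays.asList("field1", "field2");
        List<Object> tuple = Arrays.<Object>asList("event1", "event2");
        // Mirrors the example, where only "field1" is passed to the function.
        System.out.println(project(streamFields, tuple, Arrays.asList("field1"))); // [event1]
    }
}
```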