<h2>Introduction</h2>
Trident is a high-level abstraction for real-time computation on Storm. It seamlessly combines high-throughput (millions of messages per second), stateful stream processing with low-latency distributed queries. If you are familiar with high-level batch-processing tools such as Pig or Cascading, Trident's concepts will feel very similar: it has joins, aggregations, grouping, functions, and filters. Trident also adds primitives for stateful, incremental processing on top of databases or other persistence stores. Because Trident has consistent, exactly-once semantics, it is easy to reason about the state of a Trident topology.
<b>Trident data flow:</b>
Let's walk through the pieces following that flow:
<h2>Trident Spouts</h2>
Consider this snippet from the official demo:
<pre>
TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid", new MyRichSpout());
</pre>
Now look at the newStream() overloads in the source:
<pre>
// BaseRichSpout from the previous section implements IRichSpout
public Stream newStream(String txId, IRichSpout spout) {
    return newStream(txId, new RichSpoutBatchExecutor(spout));
}

// Non-transactional spout that emits one batch of tuples at a time;
// the demo below uses this variant
public Stream newStream(String txId, IBatchSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

// The most general API, supporting both transactional and opaque
// transactional semantics. You would normally use one of its existing
// implementations rather than implement the interface directly.
public Stream newStream(String txId, ITridentSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

// Transactional spout that can read from a partitioned data source
// (e.g. a cluster of Kafka servers)
public Stream newStream(String txId, IPartitionedTridentSpout spout) {
    return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}

// Opaque transactional spout that can read from a partitioned data source
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
    return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}
</pre>
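To make the batch idea concrete, here is a small plain-Java sketch (the class and method names are made up for illustration; this is not the Storm API) of the behavior a FixedBatchSpout-style batch source has: it hands out a fixed list of values in batches, wrapping around when cycling is enabled. The demo at the end of this post relies on exactly that cycling behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of FixedBatchSpout-style batching (names are
// hypothetical, not the real Storm classes): emit a fixed list of
// values in batches of `batchSize`, wrapping around when cycling.
public class CyclingBatchSource {
    private final List<String> values;
    private final int batchSize;
    private final boolean cycle;
    private int cursor = 0;

    public CyclingBatchSource(List<String> values, int batchSize, boolean cycle) {
        this.values = values;
        this.batchSize = batchSize;
        this.cycle = cycle;
    }

    // Returns the next batch; smaller or empty once the values are
    // exhausted and cycling is off.
    public List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        if (values.isEmpty()) return batch;
        while (batch.size() < batchSize) {
            if (cursor >= values.size()) {
                if (!cycle) break;
                cursor = 0; // wrap around and keep emitting
            }
            batch.add(values.get(cursor++));
        }
        return batch;
    }
}
```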
<h2>Trident Bolts</h2>
There are five main categories of operations:
<ol>
<li>Local operations on each partition, which cause no network transfer</li>
<li>Repartitioning operations, which do not change the contents of the stream but do incur some network transfer</li>
<li>Aggregation operations that use network transfer</li>
<li>Grouping operations on a stream</li>
<li>Merge and join operations</li>
</ol>
<h4>Local partition operations</h4>
<b>Functions:</b>
A function takes a set of input fields and emits zero or more tuples. The fields of the emitted tuples are appended to the original input fields of the stream. If a function emits no tuples, the original input tuple is filtered out; otherwise the input tuple is duplicated once for each emitted tuple. Suppose you have the following function:
<pre>
public class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for (String word : tuple.getString(0).split(" ")) {
            if (word.length() > 0) {
                collector.emit(new Values(word));
            }
        }
    }
}
</pre>
The Split function above (taken from the example source) splits a sentence on spaces; the emitted word field is appended to the original tuple, so the rest of the stream carries on with the enriched output.
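The per-tuple behavior of Split can be sketched outside Storm in plain Java (the helper below is illustrative, not part of the Storm API): every non-empty token becomes one emitted value, so runs of spaces produce no empty words.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for Split's emit logic: one output value per
// non-empty token of the input sentence.
public class SplitDemo {
    public static List<String> splitWords(String sentence) {
        List<String> out = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            if (word.length() > 0) { // drop empty tokens from repeated spaces
                out.add(word);
            }
        }
        return out;
    }
}
```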
<b>Filters</b>
A filter decides whether an input tuple should be kept, directly changing the contents of the stream:
<pre>
public class FilterNull extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        for (Object o : tuple) {
            if (o == null) return false;
        }
        return true;
    }
}
</pre>
The filter above checks whether any field of the tuple is null; tuples for which isKeep returns false are dropped from the stream.
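The keep/drop decision itself can be sketched in plain Java (illustrative helper, not the real Storm API): a tuple survives only if none of its fields is null.

```java
import java.util.List;

// Illustrative stand-in for FilterNull's isKeep decision on a tuple,
// modeled here as a plain list of field values.
public class FilterNullDemo {
    public static boolean isKeep(List<Object> tuple) {
        for (Object o : tuple) {
            if (o == null) return false; // any null field drops the tuple
        }
        return true;
    }
}
```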
<b>partitionAggregate</b>
partitionAggregate runs a function over each partition of a batch of tuples. Unlike the functions above, the tuples emitted by partitionAggregate replace the input fields instead of being appended to them. Take the code from the official documentation as an example:
<pre>mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))</pre>
Suppose the tuples in the stream look like this:
<pre>
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
</pre>
Running the partition aggregation above yields:
<pre>
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
</pre>
In other words, the aggregation sums the values of field "b" within each partition and emits the result as a single output field named "sum". Here is the source of the Sum aggregator:
<pre>
public class Sum implements CombinerAggregator&lt;Number&gt; {
    @Override
    public Number init(TridentTuple tuple) {
        return (Number) tuple.getValue(0);
    }

    @Override
    public Number combine(Number val1, Number val2) {
        return Numbers.add(val1, val2);
    }

    @Override
    public Number zero() {
        return 0;
    }
}
</pre>
Storm provides three interfaces for defining aggregators: CombinerAggregator, ReducerAggregator, and Aggregator. Sum, shown above, and Count are the CombinerAggregator implementations that ship with Storm. A ReducerAggregator instead starts from an init() value and folds each tuple into the accumulated result, and Aggregator is the most general of the three.
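To see how the CombinerAggregator contract plays out on the partitions from the earlier example, here is a plain-Java sketch of the fold (illustrative code, not Storm's internals): start from zero(), lift each tuple's value with init(), and merge with combine().

```java
import java.util.List;

// Illustrative fold mirroring how a CombinerAggregator-style Sum
// reduces one partition of tuple values to a single number.
public class PartitionSumDemo {
    public static long sumPartition(List<Integer> partition) {
        long acc = 0;              // zero(): identity for an empty partition
        for (Integer v : partition) {
            acc = acc + v;         // combine(acc, init(tuple))
        }
        return acc;
    }
}
```

Applied to the three partitions above ([1, 2], [3, 8], [1, 9, 10]) this yields 3, 11, and 20, matching the per-partition results shown earlier.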
<b>Merges and joins</b>
The last part of the Trident API is combining different streams. The simplest way to combine streams is to merge them all into one, using TridentTopology's merge method, like this:
<pre>
topology.merge(stream1, stream2, stream3);
</pre>
Trident names the fields of the merged stream after the output fields of the first stream.
The other way to combine streams is with join. A standard SQL-style join works only on finite input data sets and is useless for unbounded streams, so joins in Trident are applied only within each batch emitted from the spout.
Here is a join of two streams, one with the fields ["key", "val1", "val2"] and the other with the fields ["x", "val1"]:
<pre>
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
</pre>
This example joins stream1 and stream2 using "key" and "x" as the join fields. Trident requires the output fields of the new stream to be declared up front, because the input streams may have conflicting field names. A tuple emitted from the join contains:
the list of join fields — here, the output field "key" corresponds to "key" from stream1 and "x" from stream2;
the non-join fields from all input streams, ordered by the order in which the streams were passed to join — here, "a" and "b" correspond to "val1" and "val2" from stream1, while "c" corresponds to "val1" from stream2.
When joining streams fed by different spouts, those spouts are synchronized in how they emit batches: every batch being processed contains tuples from each spout.
For the final lookup you can use partitionPersist and stateQuery: tuples from the recent past are saved in a state source keyed on the join fields, and stateQuery then looks up those keys to complete the "join".
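Within a single batch, the join behaves like an ordinary hash join. The following plain-Java sketch (an illustrative stand-in, not Trident's implementation) reproduces the field layout of the example above: stream1 rows carry [key, val1, val2], stream2 rows carry [x, val1], and the output rows carry [key, a, b, c].

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative per-batch hash join: index one side on its join field,
// probe with the other side's join field, emit [key, a, b, c] rows.
public class BatchJoinDemo {
    public static List<List<String>> join(List<List<String>> stream1, List<List<String>> stream2) {
        Map<String, List<String>> byX = new HashMap<>();
        for (List<String> row : stream2) {
            byX.put(row.get(0), row);                 // index stream2 on "x"
        }
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : stream1) {
            List<String> match = byX.get(row.get(0)); // probe with stream1's "key"
            if (match != null) {
                // [key, a = s1.val1, b = s1.val2, c = s2.val1]
                out.add(List.of(row.get(0), row.get(1), row.get(2), match.get(1)));
            }
        }
        return out;
    }
}
```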
Enough theory; let's look at a demo before we all doze off:
<pre>
public class Print extends BaseFilter {
    // partition index, starting from 0
    private int partitionIndex;
    // total number of partitions
    private int numPartitions;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // grab the current partition index and the total partition count
        this.partitionIndex = context.getPartitionIndex();
        this.numPartitions = context.numPartitions();
    }

    // The filter condition is only used for printing here; it never
    // changes the tuples in the stream.
    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.err.println(String.format("Partition idx: %s out of %s partitions got %s/%s",
                partitionIndex, numPartitions, tuple.get(0).toString(), tuple.get(1).toString()));
        return true;
    }

    // build the StormTopology
    public static StormTopology buildTopology(LocalDRPC drpc) {
        // a spout emitting fixed-size batches; this class was discussed above
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 5,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"),
                new Values("to be or not to be the person"));
        // emit the data in an endless cycle
        spout.setCycle(true);
        TridentTopology topology = new TridentTopology();
        // The TridentState object represents the running count of every word;
        // we use it to serve distributed queries while the computation runs.
        TridentState wordCounts = topology.newStream("testSpout", spout)
                // split each sentence on spaces, then group equal words together
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                // persistentAggregate, with a parallelism of 3, keeps a running
                // total of each word's occurrences across the incoming stream,
                // caching the result in memory on the current node
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(3);
        topology.newDRPCStream("print", drpc)
                .stateQuery(wordCounts, new TupleCollectionGet(), new Fields("word", "count"))
                .each(new Fields("word", "count"), new Print());
        return topology.build();
    }

    public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
        Config conf = new Config();
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
        for (int i = 0; i < 10; i++) {
            drpc.execute("print", "");
            Thread.sleep(1000);
        }
        cluster.deactivate("wordCounter");
        cluster.killTopology("wordCounter");
    }
}
</pre>
The console output of the main method looks like this:
<pre>
Partition idx: 1 out of 3 partitions got the/20
Partition idx: 1 out of 3 partitions got or/4
Partition idx: 1 out of 3 partitions got score/4
Partition idx: 1 out of 3 partitions got moon/4
Partition idx: 1 out of 3 partitions got four/4
Partition idx: 1 out of 3 partitions got over/4
Partition idx: 1 out of 3 partitions got bought/4
Partition idx: 1 out of 3 partitions got can/4
Partition idx: 0 out of 3 partitions got went/8
Partition idx: 0 out of 3 partitions got candy/8
Partition idx: 0 out of 3 partitions got seven/8
Partition idx: 0 out of 3 partitions got jumped/8
Partition idx: 0 out of 3 partitions got ago/8
Partition idx: 0 out of 3 partitions got store/8
Partition idx: 0 out of 3 partitions got cow/8
Partition idx: 0 out of 3 partitions got many/8
Partition idx: 0 out of 3 partitions got years/8
Partition idx: 0 out of 3 partitions got eat/8
Partition idx: 0 out of 3 partitions got person/8
Partition idx: 0 out of 3 partitions got to/24
Partition idx: 0 out of 3 partitions got apples/8
Partition idx: 2 out of 3 partitions got be/24
Partition idx: 2 out of 3 partitions got not/12
Partition idx: 2 out of 3 partitions got some/12
Partition idx: 2 out of 3 partitions got and/24
Partition idx: 2 out of 3 partitions got man/12
Partition idx: 2 out of 3 partitions got how/12
Partition idx: 2 out of 3 partitions got you/12
Partition idx: 2 out of 3 partitions got be/32
Partition idx: 2 out of 3 partitions got not/16
Partition idx: 2 out of 3 partitions got some/16
Partition idx: 2 out of 3 partitions got and/32
Partition idx: 2 out of 3 partitions got man/16
Partition idx: 2 out of 3 partitions got how/16
Partition idx: 2 out of 3 partitions got you/16
Partition idx: 2 out of 3 partitions got be/38
Partition idx: 2 out of 3 partitions got not/19
Partition idx: 2 out of 3 partitions got some/19
Partition idx: 2 out of 3 partitions got and/38
Partition idx: 2 out of 3 partitions got man/19
Partition idx: 2 out of 3 partitions got how/19
Partition idx: 2 out of 3 partitions got you/19
Partition idx: 0 out of 3 partitions got went/23
Partition idx: 0 out of 3 partitions got candy/23
Partition idx: 0 out of 3 partitions got seven/23
Partition idx: 0 out of 3 partitions got jumped/23
Partition idx: 0 out of 3 partitions got ago/23
Partition idx: 0 out of 3 partitions got store/23
Partition idx: 0 out of 3 partitions got cow/23
Partition idx: 0 out of 3 partitions got many/23
Partition idx: 0 out of 3 partitions got years/23
Partition idx: 0 out of 3 partitions got eat/23
Partition idx: 0 out of 3 partitions got person/23
Partition idx: 0 out of 3 partitions got to/69
Partition idx: 0 out of 3 partitions got apples/23
Partition idx: 0 out of 3 partitions got went/25
Partition idx: 0 out of 3 partitions got candy/25
Partition idx: 0 out of 3 partitions got seven/25
Partition idx: 0 out of 3 partitions got jumped/25
Partition idx: 0 out of 3 partitions got ago/25
Partition idx: 0 out of 3 partitions got store/25
Partition idx: 0 out of 3 partitions got cow/25
Partition idx: 0 out of 3 partitions got many/25
Partition idx: 0 out of 3 partitions got years/25
Partition idx: 0 out of 3 partitions got eat/25
Partition idx: 0 out of 3 partitions got person/25
Partition idx: 0 out of 3 partitions got to/75
Partition idx: 0 out of 3 partitions got apples/25
Partition idx: 2 out of 3 partitions got be/56
Partition idx: 2 out of 3 partitions got not/28
Partition idx: 2 out of 3 partitions got some/28
Partition idx: 2 out of 3 partitions got and/56
Partition idx: 2 out of 3 partitions got man/28
Partition idx: 2 out of 3 partitions got how/28
Partition idx: 2 out of 3 partitions got you/28
</pre>
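The numbers in that log are just running word counts. One pass of groupBy("word") plus Count over the five demo sentences can be reproduced in plain Java (illustrative helper; in the live topology the spout cycles, so the real counts keep growing by these per-pass amounts):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative reproduction of the split + groupBy + Count semantics
// for a single pass over a list of sentences.
public class WordCountDemo {
    public static Map<String, Integer> countWords(List<String> sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                if (word.length() > 0) {
                    counts.merge(word, 1, Integer::sum); // increment this word's count
                }
            }
        }
        return counts;
    }
}
```

One pass over the five sentences gives, e.g., "the" = 5, "to" = 3, and "be" = 2, which is why those words dominate the log above.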
But how do you read the data held in a TridentState when it lives in distributed storage? A remote DRPC client can query it:
<pre>
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("print", "cat dog the man"));
</pre>
Integrated into our own code, this corresponds to:
<pre>
topology.newDRPCStream("print", drpc)
.stateQuery(wordCounts, new TupleCollectionGet(), new Fields("word", "count"))
.each(new Fields("word", "count"), new Print());
</pre>
Looking at the newDRPCStream source:
<pre>
public Stream newDRPCStream(String function, ILocalDRPC server) {
    DRPCSpout spout;
    if (server == null) {
        spout = new DRPCSpout(function);
    } else {
        spout = new DRPCSpout(function, server);
    }
    return newDRPCStream(spout);
}
</pre>
As you can see, this is just a fairly simple spout.
Finally, calling execute in the main method sets the whole thing in motion.