Trident是什么
Trident是Storm上的高層次抽象平夜,它能夠在提供高吞吐量的能力同時(shí)(每秒幾百萬(wàn)消息),也提供了有狀態(tài)的流式處理和低延遲分布式查詢的能力哄陶。它類似Pig這種高級(jí)批處理工具颜说,Trident提供了joins喷楣、aggregations、grouping恩伺、functions以及filters等功能函數(shù)赴背。
Trident主要提供了以下功能:
- 常見的流分析操作,比如join晶渠、aggregation等凰荚。具體就是Trident提供的API操作。
- 一次性處理語(yǔ)義(exactly-once)褒脯。
- 事務(wù)數(shù)據(jù)存儲(chǔ)(transaction)便瑟。
Trident核心數(shù)據(jù)模型是一系列批處理(Batch)的流,也就是說(shuō)雖然Storm Trident處理的是Stream番川,但是處理過(guò)程中Trident將Stream分隔成Batch來(lái)進(jìn)行處理到涂。
所以Stream會(huì)被切分成一個(gè)個(gè)Batch分布到集群中,所有應(yīng)用在Stream上的函數(shù)都會(huì)具體應(yīng)用到每個(gè)節(jié)點(diǎn)的Batch上中爽彤,來(lái)實(shí)現(xiàn)并行計(jì)算养盗。
為什么使用Trident
Storm Topology適合一些無(wú)統(tǒng)計(jì)、不需要Transaction(事務(wù))的應(yīng)用适篙,比如過(guò)濾往核、清洗數(shù)據(jù)等場(chǎng)景。Topology在開啟Ack的情況下嚷节,能夠保證數(shù)據(jù)不丟失但可能重復(fù)聂儒。
而Trident適合需要嚴(yán)格不丟不重復(fù)消息的場(chǎng)景虎锚,比如交易額統(tǒng)計(jì)。Trident通過(guò)事務(wù)來(lái)實(shí)現(xiàn)eactly-once衩婚,保證數(shù)據(jù)不丟不重復(fù)窜护。但同時(shí),使用Trident會(huì)使其性能有所下降非春。
Triden API
Trident API可以分為Spout操作和Bolt操作柱徙,對(duì)于Bolt操作提供常見的流數(shù)據(jù)分析操作。
Bolt Trident提供了五種類型的操作:
- 本地分區(qū)操作(Partition-local operations)奇昙,操作應(yīng)用到本地每個(gè)分區(qū)上护侮,這部分操作不會(huì)產(chǎn)生網(wǎng)絡(luò)傳輸。
- 重分區(qū)操作储耐,對(duì)數(shù)據(jù)流進(jìn)行重新分區(qū)羊初,但是不會(huì)改變數(shù)據(jù)內(nèi)容,這部分操作會(huì)有網(wǎng)絡(luò)傳輸什湘。
- 聚合操作长赞,這部分操作會(huì)有網(wǎng)絡(luò)傳輸。
- 流分組操作闽撤。
- 合并(meger)和連接(join)操作得哆。
Trident Spout
Trident與Storm topology一樣也是使用Spout作為Trident拓?fù)涞臄?shù)據(jù)源。Trident Spout提供了更復(fù)雜的API腹尖,因?yàn)樗瓤梢垣@取事務(wù)數(shù)據(jù)源柳恐,也可以獲取非事務(wù)數(shù)據(jù)源。
對(duì)于非事務(wù)的Spout热幔,可以使用普通的Storm IRichSpout接口:
TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid",new RichSpout());
Trident拓?fù)渲欣稚瑁蠸pout都需要指定一個(gè)唯一的流的標(biāo)識(shí),比如這里的“myspoutid”(整個(gè)集群級(jí)別的唯一標(biāo)識(shí))绎巨。Trident使用該唯一標(biāo)識(shí)存儲(chǔ)Spout元數(shù)據(jù)近尚,比如txId(事務(wù)ID)以及其它Spout相關(guān)的信息。
可以通過(guò)如下配置场勤,來(lái)配置Zookeeper保存Spout的元數(shù)據(jù)戈锻。
transactional.zookeeper.servers:Zookeeper主機(jī)列表
transactional.zookeeper.port:Zookeeper集群端口
transactional.zookeeper.root:在Zookeeper存儲(chǔ)元數(shù)據(jù)的根目錄
下面是Trident Spout一些類型:
- ITridentSpout:最通用API接口,可以支持事務(wù)和不透明事務(wù)語(yǔ)義和媳。一般會(huì)用這個(gè)API分區(qū)的特性格遭,而不是直接使用該接口。
- IBatchSpout:非事務(wù)Spout留瞳,每次發(fā)射一個(gè)Batch的元組拒迅。
- IPartitionedTridentSpout:事務(wù)Spout,從分區(qū)數(shù)據(jù)源讀取數(shù)據(jù),比如Kafka集群璧微。
- IOpaquePartitionedTridentSpout:不透明事務(wù)Spout作箍,從分區(qū)數(shù)據(jù)源中讀取數(shù)據(jù)。
本地分區(qū)操作
本地分區(qū)操作不會(huì)產(chǎn)生網(wǎng)絡(luò)傳輸前硫,并且會(huì)獨(dú)立的應(yīng)用到batch的每個(gè)分區(qū)上胞得。
函數(shù)操作(Functions)
函數(shù)用于接受一個(gè)tuple,并且指定接收這個(gè)tuple的哪些field屹电,它會(huì)發(fā)射(emit)0個(gè)或多個(gè)tuple阶剑。輸出的tuple feild會(huì)被追加到原始tuple的后面,如果不輸出tuple就意味著這個(gè)tuple被過(guò)濾掉了危号。比如下面的實(shí)例:
class MyFunction extends BaseFunction {
/**
* 在每個(gè)元組上面執(zhí)行該邏輯函數(shù)个扰,并且發(fā)射0個(gè)或多個(gè)元組
*
* @param tuple 傳入的元組
* @param collector 用于發(fā)射元組的收集器實(shí)例
*/
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//tuple.getInteger(0)接收第一個(gè)Field
for(int i=0; i< tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
假設(shè)我們有一個(gè)“mystream”流,其中每個(gè)tuple包含以下filed ['a','b','c']葱色,比如有以下元組。
[1,2,3]
[2,1,2]
[10,0,1]
我們將每個(gè)元組都經(jīng)過(guò)以下MyFunction操作:
//將每個(gè)tuple的字段"b"應(yīng)用于MyFunction娘香,并且產(chǎn)生新字段"d"追加到原tuple的字段中
mystream.each(new Fields("b"),new MyFunction(),new Fields('d'))
得到數(shù)據(jù)為:
//[1,2,3]的emit苍狰,其中0和1是新字段“d”
[1,2,3,0]
[1,2,3,1]
//[2,1,2]的emit,其中0是新字段“d”
[2,1,2,0]
//[10,0,1]不滿足需求烘绽,過(guò)濾掉了
過(guò)濾操作(Filters)
接收一個(gè)tuple淋昭,并決定這個(gè)tuple是否應(yīng)該被保留。比如我們有以下Filter操作:
class MyFilter extends BaseFilter {
/**
* 確定是否應(yīng)該從流中過(guò)濾元組
*
* @param tuple 被評(píng)估的元組
* @return 返回"false"則該元組被拋棄安接,返回"true"則該元組被保留
*/
@Override
public boolean isKeep(TridentTuple tuple) {
//每個(gè)tuple中的第一個(gè)Field
return tuple.getInteger(0) > 1;
}
}
同樣以Function中的實(shí)例進(jìn)行操作:
mystream.filter(new MyFilter())翔忽。
輸出數(shù)據(jù):
#[1,2,3]中的第一個(gè)字段不大于1,所以被過(guò)濾掉
[2,1,2]
[10,0,1]
map和flatMap操作
map接收一個(gè)tuple盏檐,將其作用在map函數(shù)上歇式,并且返回經(jīng)map函數(shù)處理過(guò)的tuple字段值。
比如下面的實(shí)例:
class UpperMap implements MapFunction {
/**
* 流中的每個(gè)trident元組調(diào)用
*
* @param input 接受trident tuple
* @return 返回轉(zhuǎn)換之后的值
*/
@Override
public Values execute(TridentTuple input) {
//只返回原tuple中第一個(gè)Filed的大寫字符串(其它filed被丟棄了)
return new Values(input.getString(0).toUpperCase());
}
}
flatMap類似map胡野,但是它會(huì)分兩步執(zhí)行:執(zhí)行flat將所有元素展開材失,然后每個(gè)元素使用map函數(shù)。比如[[1,2],3,4]經(jīng)過(guò)flat操作后得到元素集合為[1,2,3,4]硫豆。比如我們有以下實(shí)例:
class SplitFlatMap implements FlatMapFunction {
/**
* 流中的每個(gè)trident元組調(diào)用
*
* @param input 接收的trident tuple
* @return 一個(gè)可迭代的結(jié)果集
*/
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> resultValues = new ArrayList<>();
//獲取一個(gè)Filed并將其以空格作為切割
for(String word : input.getString(0).split(" ")){
resultValues.add(new Values(word));
}
return resultValues;
}
}
我們通過(guò)上面的flatMap和Map就可以得到一個(gè)流的所有大寫詞組流了龙巨。
mystream.flatMap(new SplitFlatMap()).map(new UpperMap())。
通常我們也可以將map或flatMap的輸出結(jié)果命名一個(gè)新字段:
mystream.flatMap(new SplitFlatMap(),new Fields("word"))
peek操作
peek操作一般用來(lái)debug熊响,比如查看上一步的操作結(jié)果旨别。假如我們有以下peek操作。
class PrintPeek implements Consumer {
/**
* 對(duì)于輸入的每個(gè)trident元組應(yīng)用以下操作
*
* @param input 接收的trident 元組
*/
@Override
public void accept(TridentTuple input) {
System.out.println(input.getString(0));
}
}
以下處理操作汗茄,能把轉(zhuǎn)換大寫之后的tuple打印打出來(lái):
mystream.flatMap(new SplitFlatMap()).map(new UpperMap()).peek(new PrintPeek());
min和minBy操作
返回一批(Batch)元組中的每個(gè)分區(qū)的最小值秸弛。
比如一批(Batch)元組有以下三個(gè)partition,它們對(duì)應(yīng)的Field為['device-id','count']。
Partiton 0:
[213,15]
[125,21]
[100,10]
Partition 1:
[123,20]
[215,32]
[183,25]
針對(duì)以上數(shù)據(jù)統(tǒng)計(jì)count最小的device-id:
mystream.minBy(new Fields("count"));
返回結(jié)果:
Partition 0:
[100,10]
Partition 1:
[123,20]
除了以上使用方式胆屿,我們還可以通過(guò)傳入比較器來(lái)使用min和minBy:
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)
public Stream min(Comparator<TridentTuple> comparator)
max和maxBy操作
max和maxBy操作同min/minBy操作奥喻,只不過(guò)返回最大值。
mystream.maxBy(new Fields("count"));
上面實(shí)例輸出結(jié)果為:
Partition 0:
[125,21]
Partition 1:
[215,32]
max和maxBy也提供了自定義比較器的方法:
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)
public Stream max(Comparator<TridentTuple> comparator)
窗口操作(Window)
Trident流能夠處理具有相同窗口的元素非迹,對(duì)它們進(jìn)行聚合操作环鲤,然后將聚合結(jié)果向下發(fā)送。Storm支持兩種窗口操作:翻滾窗口(Tumbing window)和滑動(dòng)窗口(Sliding window)憎兽。
Tumbing window
元組根據(jù)處理時(shí)間或計(jì)數(shù)分組到一個(gè)窗口中冷离,任何元組只屬于其中一個(gè)窗口。
//返回一個(gè)元組流的聚合結(jié)果纯命,它是滾動(dòng)窗口內(nèi)每windowCount個(gè)數(shù)的聚合結(jié)果
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個(gè)元組流的聚合結(jié)果西剥,這些元組是一個(gè)窗口的聚合結(jié)果,該窗口在windowDuration的持續(xù)時(shí)間內(nèi)滾動(dòng)
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window
元組在每個(gè)滑動(dòng)間隔的窗口內(nèi)分組亿汞,一個(gè)元組可能屬于多個(gè)窗口瞭空。
//返回一個(gè)元組流的聚合結(jié)果,它是滑動(dòng)窗口每windowCount個(gè)元組樹的聚合結(jié)果疗我,并在slideCount之后滑動(dòng)窗口
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個(gè)元組流的聚合結(jié)果咆畏,該窗口在slidingInterval持續(xù)滑動(dòng),并在windowDuration處完成一個(gè)窗口
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Common window
除了上面提供的滾動(dòng)窗口api和滑動(dòng)窗口api吴裤,Trident還提供了公用窗口api旧找,通過(guò)windowConfig可以支持任意窗口。
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields)
Trident window api需要使用WindowsStoreFactor存儲(chǔ)接收到的元組和聚合值麦牺。目前钮蛛,Trident提供了HBaseWindowsStoreFactor的HBase實(shí)現(xiàn)。
partitionAggregate操作
partitionAggregate對(duì)一批(batch)元組的每個(gè)分區(qū)進(jìn)行聚合剖膳,與前面Function在元組后面追加不同F(xiàn)ield不同魏颓,partitionAggregate會(huì)使用發(fā)射出去的元組替換接收進(jìn)來(lái)的元組。比如以下實(shí)例:
比如有以下數(shù)據(jù)吱晒,對(duì)應(yīng)的Field分別為["a","b"]:
Partition 0:
["a":1]
["b":2]
Partition 1:
["c":2]
["d":2]
使用partitionAggregate進(jìn)行求和:
mystream.partitionAggregate(new Fields("b"),new Sum(),new Fields("sum"))
經(jīng)過(guò)partitionAggregate函數(shù)之后的結(jié)果為:
Partition 0:
["sum":3]
Partition 1:
["sum":4]
Trident API提供了三個(gè)聚合器接口:CombinerAggregator琼开、ReducerAggregator和Aggregator。
CombinerAggregator操作
CombinerAggregator只返回單個(gè)tuple枕荞,并且這個(gè)tuple只包含一個(gè)Field柜候。每個(gè)元組首先都經(jīng)過(guò)init函數(shù)進(jìn)行預(yù)處理,然后在執(zhí)行combine函數(shù)來(lái)計(jì)算接受到的tuple躏精,直到最后一個(gè)tuple到達(dá)渣刷。如果分區(qū)內(nèi)沒(méi)有tuple,則會(huì)通過(guò)zero函數(shù)發(fā)射結(jié)果矗烛。
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
比如以下實(shí)例:
class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
//計(jì)數(shù)辅柴,每個(gè)tuple代表一個(gè)數(shù)
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
ReducerAggregator操作
ReducerAggregator通過(guò)init方法提供一個(gè)初始值箩溃,然后每個(gè)輸入的tuple迭代這個(gè)值,最后產(chǎn)生一個(gè)唯一的tuple輸出碌嘀。
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
比如同樣使用RecuerAggregator來(lái)實(shí)現(xiàn)計(jì)數(shù)器:
class Count implements ReducerAggregator<Long> {
@Override
public Long init() {
return 0L;
}
@Override
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
Aggregator
執(zhí)行聚合操作最通用的接口就是Aggregator了涣旨,它能夠發(fā)射任意數(shù)量的元組,每個(gè)元組可以包含任意數(shù)量的字段股冗。
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
它的執(zhí)行流程是:
- 在處理Batch之前調(diào)用init方法霹陡,它返回一個(gè)聚合的狀態(tài)值,傳遞給aggregate和complete方法止状。
- 為批處理分區(qū)中的每個(gè)tuple調(diào)用aggregate方法烹棉,此方法可以更新狀態(tài)值,也可以發(fā)射元組怯疤。
- 當(dāng)aggregator處理完Batch分區(qū)的所有元組后調(diào)用complete方法浆洗。
使用Aggregator來(lái)實(shí)現(xiàn)計(jì)數(shù)器:
class CountAgg extends BaseAggregator<CountAgg.CountState> {
class CountState{
long count = 0;
}
@Override
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
@Override
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count += 1;
}
@Override
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
狀態(tài)查詢(stateQuery)和分區(qū)持久化(partitionPersist)
stateQuery用于查詢狀態(tài)源,partitionPersist用于更新狀態(tài)源集峦。具體使用方式可查看:http://storm.apache.org/releases/1.2.2/Trident-state.html
投影(projection)操作
projection操作用于只保留指定的字段伏社,比如元組有字段["a","b","c","d"],通過(guò)以下投影操作塔淤,輸出流只會(huì)包含["c","d"]洛口。
mystream.projection(new Fields("c","d"));
重分區(qū)操作
Repartition操作運(yùn)行一個(gè)函數(shù)來(lái)改變?cè)M在任務(wù)之間的分布,調(diào)整分區(qū)數(shù)也可能會(huì)導(dǎo)致Repartition操作凯沪。重分區(qū)操作會(huì)引發(fā)網(wǎng)絡(luò)傳輸。下面是重分區(qū)的相關(guān)函數(shù):
- shuffle:使用隨機(jī)算法來(lái)均衡tuple到每個(gè)分區(qū)买优。
- broadcast:每個(gè)tuple被廣播到所有分區(qū)上妨马,使用DRPC時(shí)使用這種方法比較多,比如每個(gè)分區(qū)上做stateQuery杀赢。
- global:所有tuple都發(fā)送到一個(gè)分區(qū)上烘跺,這個(gè)分區(qū)用來(lái)處理stream。
- batchGlobal:一個(gè)batch中的所有tuple會(huì)發(fā)送到一個(gè)分區(qū)中脂崔,不同batch的元組會(huì)被發(fā)送到不同分區(qū)上滤淳。
- partition:通過(guò)一個(gè)自定義的分區(qū)函數(shù)來(lái)進(jìn)行分區(qū),這個(gè)自定義函數(shù)需要實(shí)現(xiàn)
org.apache.storm.grouping.CustomStreamGrouping
砌左。
聚合操作
Trident提供了aggregate和persistentAggregate方法脖咐,aggregate運(yùn)行在每個(gè)batch中,而persistentAggregate將聚合所有Batch汇歹,并將結(jié)果保存在一個(gè)狀態(tài)源上屁擅。
我們前面講的aggregate、CombinerAggregator和ReducerAggregator運(yùn)行在patitionAggregation上是本地分區(qū)操作产弹。如果直接作用于流上派歌,則是對(duì)全局進(jìn)行聚合。
在對(duì)全局流進(jìn)行聚合時(shí),Aggregator和ReducerAggregator會(huì)首先重分區(qū)到一個(gè)單分區(qū)胶果,然后在該分區(qū)上執(zhí)行聚合函數(shù)匾嘱。而CombinerAggregator則會(huì)首先聚合每個(gè)分區(qū),然后重分區(qū)到單個(gè)分區(qū)早抠,在網(wǎng)絡(luò)傳輸中完成聚合操作霎烙。所以我們應(yīng)該盡量用CombinerAggregator,因?yàn)樗痈咝А?/p>
mystream.aggregate(new Count(),new Fields("count"));
流分組操作
groupBy操作會(huì)重新分區(qū)流贝或,對(duì)指定字段執(zhí)行partitionBy操作吼过,指定字段相同的元組被劃分到相同的分區(qū)。goupBy操作如下圖:
如果在流分組中運(yùn)行聚合器咪奖,聚合會(huì)在每個(gè)group中運(yùn)行盗忱,而不是對(duì)整個(gè)Batch操作。
合并和連接
Trident可以允許我們將不同流組合在一起羊赵,通過(guò)TridentTopology.merge()方法操作趟佃。
//合并流會(huì)以第一個(gè)流的輸出字段來(lái)命名
topology.mege(stream1,stream2,stream3);
另一種合并流的方式是連接,類似于SQL那樣的連接昧捷,要求輸入是有限的闲昭。所以Trident的join只適用于來(lái)自Spout的每個(gè)小Bath之間。
比如有一個(gè)流包含["key1","val1","val2"]靡挥,另一個(gè)流包含["key2","val1","val2"]序矩,通過(guò)以下連接操作:
//Trident需要join之后的流重新命名,因?yàn)檩斎肓骺赡艽嬖谥貜?fù) 字段跋破。
mystream.join(stream1,new Fields("key1"),stream2,new Fields("key2"),new Fields("key","a","b","c"簸淀,"d"))