1囤热、Trident簡介
- 在原Storm實時流計算基礎上的高層次抽象,抽象掉了事務處理和狀態(tài)管理的細節(jié)获三,Storm0.7版本的Transactional Topology現(xiàn)在已經(jīng)被Trident完全替代旁蔼,廢棄不用了;
- 將tuple組成一批批進行離散的事務處理疙教;
- 在trident中保留了Spout棺聊,但是不再有bolt組件,將之前在bolt中所實現(xiàn)的數(shù)據(jù)處理邏輯抽象成一系列的Operation贞谓,比如函數(shù)功能限佩、過濾和聚合操作
- 提供數(shù)據(jù)至少被處理一次,甚至有且僅有一次的保障裸弦,事務數(shù)據(jù)持久化祟同,以及一系列公共的流分析處理操作
- Trident以Batch的形式處理Stream
- 底層仍然是spout和bolt,就是說執(zhí)行的時候烁兰,storm框架還是會把Trident解析構建成spout和bolt在執(zhí)行耐亏,如下
2、Trident事務及實現(xiàn)原理
- Trident事務有三個層次:
- 非事務 no-transactional:當失敗沒有重試的情況下沪斟,tuple可能至多一次被處理广辰;當出現(xiàn)在多個批次中都被成功處理的情況下暇矫,可能至少一次被處理
- 嚴格事務 transactional:tuple只會出現(xiàn)在一個批次中,同一批次內的tuples在該批次失敗重試不會發(fā)生變化择吊,任何tuple都會出現(xiàn)在某個批次中李根,不會被跳過;也就是說數(shù)量流被分成固定的批次几睛;
非常嚴格房轿,當批次中某個tuple執(zhí)行失敗,批次重試仍失敗的話所森,處理會被掛起 - 不透明事務 opaque transactional:同一個tuple可能出現(xiàn)在多個批次中囱持,但只會在其中一個批次里面處理成功,即保障僅僅一次被處理成功焕济,相對于嚴格的事務纷妆,不透明事務提供了容錯性,只是保證盡可能將tuple處理成功晴弃,當某個tuple在一個批次中失敗掩幢,可以在另一個批次中重試;也就是exactly-once有且只有一次成功
注: storm-kafka提供這兩種OpaqueTridentKafkaSpout上鞠、TransactionalTridentKafkaSpout事務Spout
- Trident事務實現(xiàn)原理
- 將多個Tuples按小批次進行處理
- 給每個批次分配一個唯一的事務ID际邻,如果該批次失敗重試,這個事務ID不變
- 狀態(tài)更新按批次順序進行芍阎,后面的批次必須等前面的批次更新完成才能進行更新
3世曾、Trident編碼操作
-
自定義實現(xiàn)TridentSpout
- 實現(xiàn)ITridentSpout接口
- MetaData 批次信息元數(shù)據(jù),存儲在Zookeeper中
- BatchCoordinator 分配批次
- Emitter 發(fā)射批次內的tuple到下一個組件
- 常用的Kafka Spout:KafkaSpout能曾、TransactionalTridentKafkaSpout度硝、OpaqueTridentKafkaSpout
-
Trident操作注意事項
- 區(qū)分是否需要跨網(wǎng)絡傳輸
- 區(qū)分是否跨分區(qū)
- 對于Trident的聚合操作肿轨,要區(qū)分是否本批次聚合還是全局聚合
-
Trident操作類型
- 分區(qū)本地操作:作用在每個分區(qū)本地寿冕,不用跨網(wǎng)絡傳輸?shù)牟僮鳎鏔ilter椒袍、Function
- 重分區(qū)操作:將Stream流數(shù)據(jù)重分區(qū)驼唱,但不改變內容(包括跨網(wǎng)絡數(shù)據(jù)傳輸)
- 聚合操作:需要跨網(wǎng)絡傳輸數(shù)據(jù)
- 在分組的流stream上的操作
- Merges 和 Joins
注:Partition:在Storm中并發(fā)的最小執(zhí)行單位是task;在Trident中partition相當于task的角色驹暑,也是最小執(zhí)行單位
Trident并發(fā)度 parallelismHint
在某個操作調用后玫恳,stream調用 parallelismHint,設置前面這個操作的并發(fā)度优俘,如下
.each(new Fields("str"), new SplitFunction(),new Fields("word"))
.parallelismHint(2)//設置2個executor來執(zhí)行splitfunction操作
-
Trident Function
- 對輸入的Tuple進行某種函數(shù)操作京办,對輸入tuple的一系列Fields進行處理,輸出零到多個Tuple帆焕,輸出的Fields會追加到Trident數(shù)據(jù)流上惭婿,但如果function沒有向后面執(zhí)行emit操作,則會將原來的輸入tuple過濾掉
- 無需跨網(wǎng)絡傳輸數(shù)據(jù)
- 實現(xiàn)Function接口,或者繼承BaseFunction抽象類
-
Trident Filter
- 過濾器财饥,輸入Tuple换吧,執(zhí)行規(guī)則判斷是否保留該Tuple
- 無需跨網(wǎng)絡傳輸
- isKeep方法,這里實現(xiàn)是否保留tuple的規(guī)則判斷邏輯
- 實現(xiàn)Filter接口钥星,或者繼承BaseFilter抽象類
-
聚合鏈
- chainedAgg
- partitionAggregate
-
chainEnd
-
partitionAggregate 分區(qū)組合
- 對各分區(qū)partition(最小執(zhí)行單位沾瓦,一個task)內的一個批次tuple數(shù)據(jù)進行函數(shù)操作,但與之前函數(shù)操作不同的是谦炒,partitionAggregate會替換掉輸入tuple贯莺,而不是將輸出tuple追加到流上
- 這里用到的函數(shù)操作有三種,
- CombinerAggregate:Sum宁改,Count
- ReduceAggregate
- Aggregator
-
Projection 投影
-
對輸入tuple截取只需要輸出的的Fields乖篷,即去掉后面不需要的keyvalue
-
-
Trident Repartitioning 重分區(qū)操作
- Repartitioning Operations類似于Storm中的數(shù)據(jù)流分組
- Shuffle : 隨機重分區(qū)
- global:所有tuple進入相同的一個Partition上
- partitionBy:按字段重分區(qū),保證了字段值相同的tuple進入相同的partition上
- batchGlobal:相同批次內的tuple進入同一個Partition上透且,不同批次的tuple進入不同的Partition上
- broadcast:廣播方式重分區(qū)撕蔼,即將每個tuple復制到后面的所有partition上,一般結合drpc使用
- partition:自定義分區(qū)秽誊,實現(xiàn)接口backtype.storm.grouping.CustomStreamGrouping
-
Trident Aggregate
- aggregate:單獨對每個批次的數(shù)據(jù)進行聚合
-
persistentAggregate:對數(shù)據(jù)流中處理過的所有tuple進行聚合操作(全局聚合)鲸沮,并將結果存儲在內存或者其他存儲設備上
-
Group By 分組
- 在partition By 基礎上對指定fields字段值相同的tuple進行分組
- 與partitionBy的區(qū)別:partitionBy只講字段值求HashCode,再與tasks數(shù)取模得到結果锅论,根據(jù)結果重新分配到相應的task里去讼溺,而groupBy,則進一步對指定的Fields字段值相同的tuple進行分組
- groupBy 結合partitionAggregator最易,進行一個批次內各分區(qū)內的分組統(tǒng)計
- groupBy 結合persistentAggregator怒坯,進行全局分組統(tǒng)計
-
Trident Status
- 在進行prisistentAggregate操作時,需要不斷更新結果藻懒,所以需要將中間結果保存在內存剔猿、或者其他存儲設備中,這個過程還需要考慮更新過程中容錯性問題
-
使用內存存儲嬉荆,如果Storm集群重啟归敬,原來的結果數(shù)據(jù)也丟失,如下
- 使用外部存儲設備鄙早,即使storm集群重啟汪茧,也可以在原來的基礎上進行更新結果
4、DRPC
- DRPC:Distributed RPC 限番,分布式RPC舱污,目前DRPC已經(jīng)結合Trident一起使用
-
DRPC本地模式測試
- LocalDRPC localDRPC = new LocalDRPC();//構造本地DRPC客戶端
- tridentTopology.newDRPCStream("functionName",localDRPC)
- LocalCluster cluster = new LocalCluster();//本地模式提交Topology
cluster.submitTopology("wordcountTrident",new Config(),togology.build()); - 發(fā)送DRPC請求弥虐,并得到響應結果
String result = localDRPC.execute("words","hello world");
-
DRPC遠程模式測試
- 修改conf/storm.yaml配置文件扩灯,添加drpc.server和drpc.port參數(shù)
- 啟動DRPC進程
$ nohup bin/storm drpc > /dev/null 2>&1 & - 編寫topology别威,并打jar包提交到Storm集群上運行
- 編寫drpc客戶端發(fā)送DRPC請求,并得到結果