Storm高階(一):Trident

1囤热、Trident簡介

  • 在原Storm實時流計算基礎上的高層次抽象,抽象掉了事務處理和狀態(tài)管理的細節(jié)获三,Storm0.7版本的Transactional Topology現(xiàn)在已經(jīng)被Trident完全替代旁蔼,廢棄不用了;
  • 將tuple組成一批批進行離散的事務處理疙教;
  • 在trident中保留了Spout棺聊,但是不再有bolt組件,將之前在bolt中所實現(xiàn)的數(shù)據(jù)處理邏輯抽象成一系列的Operation贞谓,比如函數(shù)功能限佩、過濾和聚合操作
  • 提供數(shù)據(jù)至少被處理一次,甚至有且僅有一次的保障裸弦,事務數(shù)據(jù)持久化祟同,以及一系列公共的流分析處理操作
  • Trident以Batch的形式處理Stream
  • 底層仍然是spout和bolt,就是說執(zhí)行的時候烁兰,storm框架還是會把Trident解析構建成spout和bolt在執(zhí)行耐亏,如下
image.png
image.png

2、Trident事務及實現(xiàn)原理

  • Trident事務有三個層次:
    • 非事務 no-transactional:當失敗沒有重試的情況下沪斟,tuple可能至多一次被處理广辰;當出現(xiàn)在多個批次中都被成功處理的情況下暇矫,可能至少一次被處理
    • 嚴格事務 transactional:tuple只會出現(xiàn)在一個批次中,同一批次內的tuples在該批次失敗重試不會發(fā)生變化择吊,任何tuple都會出現(xiàn)在某個批次中李根,不會被跳過;也就是說數(shù)量流被分成固定的批次几睛;
      非常嚴格房轿,當批次中某個tuple執(zhí)行失敗,批次重試仍失敗的話所森,處理會被掛起
    • 不透明事務 opaque transactional:同一個tuple可能出現(xiàn)在多個批次中囱持,但只會在其中一個批次里面處理成功,即保障僅僅一次被處理成功焕济,相對于嚴格的事務纷妆,不透明事務提供了容錯性,只是保證盡可能將tuple處理成功晴弃,當某個tuple在一個批次中失敗掩幢,可以在另一個批次中重試;也就是exactly-once有且只有一次成功

注: storm-kafka提供這兩種OpaqueTridentKafkaSpout上鞠、TransactionalTridentKafkaSpout事務Spout

  • Trident事務實現(xiàn)原理
    • 將多個Tuples按小批次進行處理
    • 給每個批次分配一個唯一的事務ID际邻,如果該批次失敗重試,這個事務ID不變
    • 狀態(tài)更新按批次順序進行芍阎,后面的批次必須等前面的批次更新完成才能進行更新

3世曾、Trident編碼操作

  • 自定義實現(xiàn)TridentSpout

    • 實現(xiàn)ITridentSpout接口
    • MetaData 批次信息元數(shù)據(jù),存儲在Zookeeper中
    • BatchCoordinator 分配批次
    • Emitter 發(fā)射批次內的tuple到下一個組件
    • 常用的Kafka Spout:KafkaSpout能曾、TransactionalTridentKafkaSpout度硝、OpaqueTridentKafkaSpout
  • Trident操作注意事項

    • 區(qū)分是否需要跨網(wǎng)絡傳輸
    • 區(qū)分是否跨分區(qū)
    • 對于Trident的聚合操作肿轨,要區(qū)分是否本批次聚合還是全局聚合
  • Trident操作類型

    • 分區(qū)本地操作:作用在每個分區(qū)本地寿冕,不用跨網(wǎng)絡傳輸?shù)牟僮鳎鏔ilter椒袍、Function
    • 重分區(qū)操作:將Stream流數(shù)據(jù)重分區(qū)驼唱,但不改變內容(包括跨網(wǎng)絡數(shù)據(jù)傳輸)
    • 聚合操作:需要跨網(wǎng)絡傳輸數(shù)據(jù)
    • 在分組的流stream上的操作
    • Merges 和 Joins
      注:Partition:在Storm中并發(fā)的最小執(zhí)行單位是task;在Trident中partition相當于task的角色驹暑,也是最小執(zhí)行單位
  • Trident并發(fā)度 parallelismHint
    在某個操作調用后玫恳,stream調用 parallelismHint,設置前面這個操作的并發(fā)度优俘,如下
    .each(new Fields("str"), new SplitFunction(),new Fields("word"))
    .parallelismHint(2)//設置2個executor來執(zhí)行splitfunction操作

  • Trident Function

    • 對輸入的Tuple進行某種函數(shù)操作京办,對輸入tuple的一系列Fields進行處理,輸出零到多個Tuple帆焕,輸出的Fields會追加到Trident數(shù)據(jù)流上惭婿,但如果function沒有向后面執(zhí)行emit操作,則會將原來的輸入tuple過濾掉
    • 無需跨網(wǎng)絡傳輸數(shù)據(jù)
    • 實現(xiàn)Function接口,或者繼承BaseFunction抽象類
  • Trident Filter

    • 過濾器财饥,輸入Tuple换吧,執(zhí)行規(guī)則判斷是否保留該Tuple
    • 無需跨網(wǎng)絡傳輸
    • isKeep方法,這里實現(xiàn)是否保留tuple的規(guī)則判斷邏輯
    • 實現(xiàn)Filter接口钥星,或者繼承BaseFilter抽象類
  • 聚合鏈

    • chainedAgg
    • partitionAggregate
    • chainEnd


      image.png
  • partitionAggregate 分區(qū)組合

    • 對各分區(qū)partition(最小執(zhí)行單位沾瓦,一個task)內的一個批次tuple數(shù)據(jù)進行函數(shù)操作,但與之前函數(shù)操作不同的是谦炒,partitionAggregate會替換掉輸入tuple贯莺,而不是將輸出tuple追加到流上
    • 這里用到的函數(shù)操作有三種,
      • CombinerAggregate:Sum宁改,Count
      • ReduceAggregate
      • Aggregator
  • Projection 投影

    • 對輸入tuple截取只需要輸出的的Fields乖篷,即去掉后面不需要的keyvalue


      image.png
  • Trident Repartitioning 重分區(qū)操作

    • Repartitioning Operations類似于Storm中的數(shù)據(jù)流分組
    • Shuffle : 隨機重分區(qū)
    • global:所有tuple進入相同的一個Partition上
    • partitionBy:按字段重分區(qū),保證了字段值相同的tuple進入相同的partition上
    • batchGlobal:相同批次內的tuple進入同一個Partition上透且,不同批次的tuple進入不同的Partition上
    • broadcast:廣播方式重分區(qū)撕蔼,即將每個tuple復制到后面的所有partition上,一般結合drpc使用
    • partition:自定義分區(qū)秽誊,實現(xiàn)接口backtype.storm.grouping.CustomStreamGrouping
  • Trident Aggregate

    • aggregate:單獨對每個批次的數(shù)據(jù)進行聚合
    • persistentAggregate:對數(shù)據(jù)流中處理過的所有tuple進行聚合操作(全局聚合)鲸沮,并將結果存儲在內存或者其他存儲設備上


      image.png
  • Group By 分組

    • 在partition By 基礎上對指定fields字段值相同的tuple進行分組
    • 與partitionBy的區(qū)別:partitionBy只講字段值求HashCode,再與tasks數(shù)取模得到結果锅论,根據(jù)結果重新分配到相應的task里去讼溺,而groupBy,則進一步對指定的Fields字段值相同的tuple進行分組
    • groupBy 結合partitionAggregator最易,進行一個批次內各分區(qū)內的分組統(tǒng)計
    • groupBy 結合persistentAggregator怒坯,進行全局分組統(tǒng)計
  • Trident Status

    • 在進行prisistentAggregate操作時,需要不斷更新結果藻懒,所以需要將中間結果保存在內存剔猿、或者其他存儲設備中,這個過程還需要考慮更新過程中容錯性問題
    • 使用內存存儲嬉荆,如果Storm集群重啟归敬,原來的結果數(shù)據(jù)也丟失,如下


      image.png
    • 使用外部存儲設備鄙早,即使storm集群重啟汪茧,也可以在原來的基礎上進行更新結果

4、DRPC

  • DRPC:Distributed RPC 限番,分布式RPC舱污,目前DRPC已經(jīng)結合Trident一起使用
image.png
  • DRPC本地模式測試

    • LocalDRPC localDRPC = new LocalDRPC();//構造本地DRPC客戶端
    • tridentTopology.newDRPCStream("functionName",localDRPC)
    • LocalCluster cluster = new LocalCluster();//本地模式提交Topology
      cluster.submitTopology("wordcountTrident",new Config(),togology.build());
    • 發(fā)送DRPC請求弥虐,并得到響應結果
      String result = localDRPC.execute("words","hello world");
  • DRPC遠程模式測試

    • 修改conf/storm.yaml配置文件扩灯,添加drpc.server和drpc.port參數(shù)
    • 啟動DRPC進程
      $ nohup bin/storm drpc > /dev/null 2>&1 &
    • 編寫topology别威,并打jar包提交到Storm集群上運行
    • 編寫drpc客戶端發(fā)送DRPC請求,并得到結果
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末驴剔,一起剝皮案震驚了整個濱河市省古,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌丧失,老刑警劉巖豺妓,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異布讹,居然都是意外死亡琳拭,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門描验,熙熙樓的掌柜王于貴愁眉苦臉地迎上來白嘁,“玉大人,你說我怎么就攤上這事膘流⌒趺澹” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵呼股,是天一觀的道長耕魄。 經(jīng)常有香客問我,道長彭谁,這世上最難降的妖魔是什么吸奴? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮缠局,結果婚禮上则奥,老公的妹妹穿的比我還像新娘。我一直安慰自己狭园,他們只是感情好读处,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著妙啃,像睡著了一般档泽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上揖赴,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天,我揣著相機與錄音抑胎,去河邊找鬼燥滑。 笑死,一個胖子當著我的面吹牛阿逃,可吹牛的內容都是我干的铭拧。 我是一名探鬼主播赃蛛,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼搀菩!你這毒婦竟也來了呕臂?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤肪跋,失蹤者是張志新(化名)和其女友劉穎歧蒋,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體州既,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡谜洽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了吴叶。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阐虚。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蚌卤,靈堂內的尸體忽然破棺而出实束,到底是詐尸還是另有隱情,我是刑警寧澤逊彭,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布磕洪,位于F島的核電站,受9級特大地震影響诫龙,放射性物質發(fā)生泄漏析显。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一签赃、第九天 我趴在偏房一處隱蔽的房頂上張望谷异。 院中可真熱鬧,春花似錦锦聊、人聲如沸歹嘹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽尺上。三九已至,卻和暖如春圆到,著一層夾襖步出監(jiān)牢的瞬間怎抛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工芽淡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留马绝,地道東北人。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓挣菲,卻偏偏與公主長得像富稻,于是被迫代替她去往敵國和親掷邦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內容

  • 本文內容部分來自Trident Tutorial椭赋。 Trident是基于Storm的實時計算模型的高級抽象抚岗。它可以...
    看山遠兮閱讀 444評論 0 5
  • 這是一個JStorm使用教程,不包含環(huán)境搭建教程哪怔,直接在公司現(xiàn)有集群上跑任務宣蔚,關于JStorm集群環(huán)境搭建,后續(xù)研...
    Coselding閱讀 6,299評論 1 9
  • Storm入門系列之三:storm-trident 簡介 引 最近在用 Trident 做各個 url 的訪問統(tǒng)計...
    zhaif閱讀 1,719評論 0 5
  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,179評論 0 0
  • 來北京15年蔓涧。 她拉了拉裹在脖子上的圍巾件已,11月,北京的深秋元暴,秋高氣爽篷扩,天格外遠,樹干枝枝叉叉如同伸懶...
    紫文閱讀 294評論 1 1