Storm入門系列之三:storm-trident 簡介

Storm入門系列之三:storm-trident 簡介

最近在用 Trident 做各個 url 的訪問統(tǒng)計 (實時統(tǒng)計各個 url 各個狀態(tài)碼的數(shù)量),順帶補上這個空了好久的坑蔬将!

Trident 是在 storm-core 之上的一個高級抽象霞怀,其可以保證 message 保證被處理且只被處理一次的語義莉给,即 "exactly once"

本文將簡要介紹 Trident 的一些核心概念以及使用方法徐矩。

在 storm-core 中叁幢,有兩個核心概念:spout 和 bolt,相似的 Trident 也包含 spout鳞骤,其作用和在 storm-core 中相同黍判,是整個 topology 的數(shù)據(jù)源。Trident 中沒有bolt美旧,但有一個 operations 的概念贬墩,其作用和 bolt 相似,主要是實現(xiàn)一些對 message 的處理录肯,下面將逐一介紹吊说。

Spout

和 storm-core 類似优炬,Trident 也以 spout 作為整個 stream 的源頭厅贪。

Zookeeper

在 topology 中每個 spout 都會擁有一個唯一標識,且在整個集群中都唯一葵硕,這個標識是 spout 在 zookeeper 中記錄的元數(shù)據(jù)的唯一標識贯吓。

默認的 Spout 會使用 storm 集群的 zookeeper 集群悄谐,當然也可以通過以下配置使用單獨的集群:

transactional.zookeeper.serverstransactional.zookeeper.porttransactional.zookeeper.root

Pipline

在 Trident 中,Spout ?emit message 不再是一條一條的们陆,而是以一個 batch 的形式一次 emit 一組 messages情屹。默認的,storm 在同一只時間只會處理一個 batch垃你,直到其成功或失敗惜颇,通過:

topology.max.spout.pending

這個配置可以配置其并發(fā)處理 batch 的個數(shù)葛作,但是 Trident 仍然會按順序更新 batch 的 state 以保證『exactly once』語義(關(guān)于 state 的實現(xiàn)原理會單獨詳細介紹,這里不再詳細描述)望伦。

Spout 類型

Spout 根據(jù)事務(wù)性可分為三類:
non-transactional spout (非事務(wù)性)transactional spout (透明事務(wù)性)opaque transactional spout (不透明事務(wù)性)

其一次對應(yīng)的 java 接口為:
IBatchSpout屯伞、IPartitionedTridentSpoutIOpaquePartitionedTridentSpout

另外珠移,還有一個通用的非事務(wù)性接口 IRichSpout。

Kafka-Spout

一個比較通用的場景是從 kafka 讀取數(shù)據(jù)钧惧,然后 storm 做實時處理浓瞪。Storm-Kafka 提供了很簡單的接口實現(xiàn) kafka 數(shù)據(jù)的接入和管理,eg:

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost"); // 使用zookeeper 鏈接
kafkaTridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic"); // 配置一些Kafka的參數(shù)
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

在第一次連接 kafka 消費時涂乌,可以使用以下兩個配置英岭,選擇從topic 最早的 offset 或 最近的 offset 開始消費,storm 會把消費的 state 信息存在 zookeeper 中( / + ‘spout_id’ 目錄下)罚勾,所以后續(xù)的消費會直接從 zookeeper 中讀取消費記錄繼續(xù)消費漾唉,也就是說以下配置只會在第一次消費時生效堰塌,當然如果手動在 zookeeper 中刪除消費記錄场刑,還是會生效的。

kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()

Operations

Trident 包含5中常用的 operation:

  • Partition-local operations
  • Repartitioning operations
  • Aggregation operations
  • Operations on grouped streams
  • Merges and joins

接下來铐懊,依次了解各個 operation瞎疼。

Partition-local operations

這個 operation 包含的操作都是本地的,即不會發(fā)生網(wǎng)絡(luò)傳輸茅茂,這類操作都是獨立的對每個 batch 生效的太抓。這一類是很通用的操作,其包含很多種類碴倾,常用的為以下 5 類:

1. Functions

Functions 是最通用的一類操作,這類操作對于每個待處理的 tuple异雁,可以 emit 任意個結(jié)果矫户,但是其不能刪除或者變更 tuple 中已有的 fields,只能新增 fields柑蛇。

比如收到 tuple :[1, 2]驱闷,根據(jù)自己編寫的 function 的邏輯,可以不 emit 任何結(jié)果直接 pass盆耽,或者 emit 1個結(jié)果 [1, 2 ,3]扼菠。也可以emit 多個結(jié)果 [1, 2, 3], [1, 2, 4], [1, 2, 5]。

2. Filters

Filter 與 Functions 不同析恢,它是用來做過濾的秧饮,即處理的每個 tuple 只有兩個選擇:允許這個 tuple 繼續(xù)向下傳輸或者不傳輸任何結(jié)果盗尸。比如收到 tuple :[1, 2],則此 filter 能 emit 的數(shù)據(jù)只有 [1, 2], 或者不 emit 任何結(jié)果鞍时。

3. Map and FlatMap

Map 會處理接收到的 tuple扣蜻,并 emit 一個新的 value,其是 1-1 的處理方式蒸苇,即接收一個且 emit 一個吮旅。

FlatMap 和 map 類似味咳,唯一的區(qū)別在于它會提交一組 values檬嘀,即是 1-N 的處理方式鸳兽,會 emit 一個 List<Values>。

4. min and minBy 和 max and maxBy

前面提到全陨,trident 是以一個小 batch 為單位處理處理 stream 中的數(shù)據(jù)的衷掷,這 4個類型的操作就是針對每次處理的這個 batch 計算過最小/最大值。

5. Windowing

Trident 也提供了時間窗口的處理方式雨涛,和 storm-core 非常類似懦胞,通過 windowing 可以對同一時間窗口內(nèi)的 batchs 進行計算、處理蚯根。關(guān)于windowing 這里不再單獨介紹醇份,后面會單獨寫一篇文章介紹僚纷。

6. partitionAggregate
這類運算同樣是針對每個 batch 而言的拗盒,它可以重新組合每個 batch 中的 tuples, 并 emit 任意結(jié)果痊臭。Trident提供了3類partitionAggregate:
CombinerAggregator:只會 emit 一個 tuple登夫,且這個 tuple 只有一個 field
ReducerAggregator:也只會 emit 一個 tuple,這個 tuple 只有一個 value
Aggregator: 可以 emit 包含任意 fields 的任意數(shù)量 tuples鸦致,是一個比較通用的接口

Repartitioning operations

和Partition-local operations 相反,這類操作一定會發(fā)生網(wǎng)絡(luò)上的傳輸抗碰。

1.shuffle

類似 storm-core 中的 shuffle grouping绽乔, 基于 Random Round Robin 算法隨機將 tuples 均勻的傳給目標 partition折砸。

2. broadcast

類似 storm-core 的 all grouping,每個tuple 都會復(fù)制發(fā)送到后續(xù)所有的 partition鹃觉。

3.partitionBy

類似 storm-core 的 Fields grouping睹逃,保證相同 fields 值數(shù)據(jù)被分配到統(tǒng)一個 partiton。

4.global

類似 storm-core 的 Global grouping疗隶,所有 tuples 被分配到同一個 partion翼闹。

5.batchGlobal

和 global 類似,但其會保證同一個 batch 的 tuples 被分配到同一個 partition坚弱。

Aggregation operations

注意與上文的 partitionAggregate 區(qū)別荒叶,這類操作是作用于 streams 之上的输虱,而partitionAggregate 僅僅是對單個 batch的,即一個 batch 所擁有的本地操作愁茁。

這類操作可以分成兩種:

  • 1.aggregate:以 batch 為單位亭病,每個 batch 獨立實現(xiàn)相應(yīng)的聚合計算。
  • 2.persistentAggregate:與 aggregate 相反促煮,
    persistentAggregate 則是基于所有 batch 的所有 tuples 在全局實現(xiàn)聚合。

常用的聚合操作包括:ReducerAggregator樱报、CombinerAggregator 以及通用的 Aggregator泞当。其中 ReducerAggregator 和 Aggregator 會操作會將 stream repartition 到一個單獨的 partition,在這個 partition 上實現(xiàn)聚合操作盗飒。而CombinerAggregator 則會現(xiàn)在每個 partition 上做實現(xiàn) partial aggregation逆趣,然后將每個 partition 的結(jié)果在 repartition 到一個單獨的 partition 實現(xiàn)聚合操作嗜历。

所以相比而言 CombinerAggregator 性能會更好。

Operations on grouped streams

這個操作只有一種痕囱,即 “groupby” 暴匠,功能類似 sql 中的 groupby,基于指定的 fields 分組帮掉,此后的操作窒典,比如 “persistentAggregate” 則不在以 batch 為單位崇败,而是以不同的 group肩祥。

Merges and joins

這一類操作主要用于不同 stream 之間的計算,包含兩種操作 “merge” 和 “join”混狠。

參考

trident-api docs
trident-examples

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末将饺,一起剝皮案震驚了整個濱河市痛黎,隨后出現(xiàn)的幾起案子湖饱,更是在濱河造成了極大的恐慌杀捻,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異垢袱,居然都是意外死亡请契,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門贿衍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贸辈,“玉大人肠槽,你說我怎么就攤上這事∽炻#” “怎么了寂纪?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵捞蛋,是天一觀的道長。 經(jīng)常有香客問我庄涡,道長搬设,這世上最難降的妖魔是什么撕捍? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任忧风,我火速辦了婚禮阀蒂,結(jié)果婚禮上弟蚀,老公的妹妹穿的比我還像新娘。我一直安慰自己昧绣,他們只是感情好捶闸,可當我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布删壮。 她就那樣靜靜地躺著,像睡著了一般税灌。 火紅的嫁衣襯著肌膚如雪亿虽。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機與錄音收毫,去河邊找鬼。 笑死昔搂,一個胖子當著我的面吹牛引润,可吹牛的內(nèi)容都是我干的巩趁。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼淳附,長吁一口氣:“原來是場噩夢啊……” “哼议慰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起奴曙,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤别凹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后洽糟,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體炉菲,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年坤溃,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薪介。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡祠饺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出汁政,到底是詐尸還是另有隱情道偷,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布记劈,位于F島的核電站勺鸦,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏目木。R本人自食惡果不足惜换途,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望刽射。 院中可真熱鬧怀跛,春花似錦、人聲如沸柄冲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽现横。三九已至漓拾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間戒祠,已是汗流浹背骇两。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留姜盈,地道東北人低千。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親示血。 傳聞我的和親對象是個殘疾皇子棋傍,可洞房花燭夜當晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 原文鏈接 譯者:魏勇 Trident 中含有對狀態(tài)化(stateful)的數(shù)據(jù)源進行讀取和寫入操作的一級抽象...
    Albert陳凱閱讀 497評論 0 1
  • Storm入門系列之一:storm核心概念及特性 本文的將介紹一些 storm 入門的基礎(chǔ)知識,包括 storm ...
    zhaif閱讀 3,111評論 0 17
  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺對實時數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,174評論 0 4
  • 這是一個JStorm使用教程难审,不包含環(huán)境搭建教程瘫拣,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建告喊,后續(xù)研...
    Coselding閱讀 6,335評論 1 9
  • 想給你寫封信 說說我的心事 可是 還是駐筆停頓了 遠方的摯友 你曾說過 不要輕易相信一個人 除了我 可是 我還是錯...
    笙筱呦閱讀 191評論 7 3