前言
Strom特點
進程常駐內(nèi)存十拣,數(shù)據(jù)不經(jīng)過磁盤,在內(nèi)存中處理召嘶,速度非掣妇В快,可以達到毫秒(秒)級別
Storm數(shù)據(jù)傳輸
采用Netty弄跌,基于NIO甲喝,更加高效,早期采用ZMQ,但是ZMQ和Strom的license不兼容铛只。
Storm可靠性
- 異常處理
- 消息可靠性的保證機制ACK
Strom和其他框架的對比
1.Spark-Streaming 微批處理框架 秒級埠胖,不是純流式計算框架和Spark核心之上的計算模型,和Spark的其他組件兼容較好淳玩。
2.MR 批處理框架直撤,分鐘級別,MR模型蜕着,反復(fù)啟停谋竖。
3.Strom 秒(毫秒)級別,流式處理承匣,DAG模型有向無環(huán)圖蓖乘,常駐運行,不關(guān)閉韧骗,獨立系統(tǒng)專為流式處理設(shè)計嘉抒。
一.Storm編程模型
Spout:用于持續(xù)不斷的發(fā)送數(shù)據(jù)
Tuple:數(shù)據(jù)都被封裝進tuple容器,進行傳輸袍暴,Storm中發(fā)送數(shù)據(jù)的基本單元些侍,Tuple不斷的向后傳輸隶症,像水滴一樣
Bolt:用于接受并處理數(shù)據(jù)
Stream:從Spout中源源不斷傳遞數(shù)據(jù)給Blot,以及上一個Blog傳遞數(shù)據(jù)給下一個Blot岗宣,所組成的數(shù)據(jù)通道叫做Stream蚂会,Stream默認(rèn)的名稱為default,可以為其指定id狈定。
并行度:可以使用多線程模型颂龙,充分利用CPU,可以有效應(yīng)對高并發(fā)纽什,高數(shù)據(jù)量的應(yīng)用場景措嵌,還可以多臺服務(wù)器,多節(jié)點芦缰,多線程運行任務(wù)
有向無環(huán)圖(Directed Acyclic Graph):對于Storm實時計算邏輯的封裝企巢,即通過一系列由數(shù)據(jù)流相互關(guān)聯(lián)的Spout、Blot所組成的拓?fù)浣Y(jié)構(gòu)
二.Storm的分發(fā)策略
1让蕾、Shuffle Grouping
隨機分發(fā)浪规,隨機派發(fā)Stream里面的tuple,保證每個bolt task接收到的tuple的數(shù)量大致相同
2探孝、Field Grouping
根據(jù)字段分發(fā)笋婿,例如根據(jù)OutputFieldsDeclarer中聲明的一個field屬性進行分發(fā),field的對應(yīng)的tuple的值相同顿颅,就會分發(fā)到同一bolt中去進行處理缸濒,filed不同可能就會被分發(fā)到不同的task
3、All Grouping
廣播模式分發(fā)粱腻,每一個tuple都會被分發(fā)到下一個階段的所有的bolt中
4庇配、Global Grouping分發(fā)
將tuple分發(fā)到后續(xù)bolt中taskid最小的task中去執(zhí)行,可以看做是高可用绍些,其他的bolt作為備用捞慌,一旦taskid最小的task對應(yīng)的bolt掛了,還有bolt可以使用柬批,可以保證Storm集群的可用性
5、None Grouping
類似于Shuffle Grouping氮帐,不同的是None Grouping采用的是輪訓(xùn)的形式揪漩,Shuffle采用的是隨機分發(fā)有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執(zhí)行
6奄容、Direct Grouping
指向性分發(fā),這種分發(fā)策略意味著消息(tuple)的發(fā)送者指定由消息接受者的哪個task來處理消息蜀细,只有被聲明為Direct Stream的消息流可以聲明這種分組方法奠衔,而且這種消息必須由emitDirect方法來發(fā)送归斤≡嗬铮可以使用TopologyContext獲取taskId迫横,還有outputCollector.emit()方法也可以返回taskid
7酝碳、Local or Shuffle Grouping
本地隨機分組,如果目標(biāo)bolt有一個或者多個task與源bolt的task在同進程中呛讲,則隨機分發(fā)到同進程的task中沃斤,否則和Shuffle Grouping一樣
8、自定義分發(fā) customGrouping
三.Storm 架構(gòu)模型
1.Nimbus 主節(jié)點的守護進程
- 負(fù)責(zé)資源調(diào)度
- 任務(wù)分配
- 接受Client上傳的jar包
2.Supervisor 從節(jié)點的守護進程徘公,具體完成計算工作
接受NimBus分配的任務(wù)->監(jiān)視ZK節(jié)點看是否有分配給自己的任務(wù)
啟動关面、關(guān)閉自己管理的Worker進程十厢,可以有多個Worker進程蛮放,Work進程的數(shù)量由配置文件設(shè)定包颁,配置文件是Client上傳jar包的配置文件,即由Client指定滴肿。
3.Worker 從節(jié)點的工作進程
由Supervisor控制啟動關(guān)閉泼差,專門用于計算呵俏,所有的拓?fù)渥鳂I(yè)在Worker上運行柴信。
運行具體運算組件的進程,一個topology可能會在一個或者多個worker(工作進程)里面執(zhí)行潜沦。
Worker任務(wù)的類型只有兩種唆鸡, Spout和Bolt争占。
Executor是Worker JVM內(nèi)部的一個線程,一般每個Executor負(fù)責(zé)運行一個或多個任務(wù)臂痕,但僅用于特定的spout或bolt。但一般默認(rèn)
每個executor只執(zhí)行一個task猿涨。
Worker中可以啟動多個線程Executor,執(zhí)行特定的spout對應(yīng)的多個task任務(wù)叛赚,提高并行度澡绩,但是注意每個task的taskid不一致俺附,taskid起到了區(qū)分作用肥卡。
4.Zookeeper
替代了部分Nimbus的作用,也是目前storm是不支持nimbus高可用的但能保證系統(tǒng)不受太大影響的一個支撐事镣。
Zookeeper本身已經(jīng)是按至少三臺部署的HA架構(gòu)了步鉴。Supervisor進程和Nimbus進程,需要用Daemon程序如monit來啟動唠叛,失效時自動重新啟動艺沼。因為它們在進程內(nèi)都不保存狀態(tài),狀態(tài)都保存在本地文件和ZooKeeper盛杰,因此進程可以隨便殺。
如果Nimbus進程所在的機器都直接倒了,需要在其他機器上重新啟動逗嫡,Storm目前沒有自建支持青自,需要自己寫腳本實現(xiàn)。即使Nimbus進程不在了驱证,也只是不能部署新任務(wù)延窜,有節(jié)點失效時不能重新分配而已,不影響已有的線程抹锄。同樣逆瑞,如果Supervisor進程失效,不影響已存在的Worker進程伙单。
目前storm官方或許是出于nimbus宕機對集群影響不大的考慮获高,并沒有在這方面有所進展。
四.Strom 任務(wù)提交流程
1.Client提交topology作業(yè)的相關(guān)jar包到Nimbus
2.提交的jar包會被上傳到Nimbus服務(wù)器下的Store-local/nimbus/inbox目錄下
3.submitTopology方法負(fù)責(zé)對這個topology進行處理吻育。
- 首先對storm本身和topology進行一些校驗念秧,檢查storm狀態(tài)是否是active的
- 檢查是否存在同名的topology已經(jīng)在storm中運行了
- 檢查topology中的spout和bolt是否使用了相同的id,以及id是否規(guī)范扫沼,不能以開頭出爹,是系統(tǒng)保留的命名方式
4.建立topology的本地文件目錄 /nimbus/stormdist/topology-uuid
該目錄包括三個文件
- stormjar.jar :包含這個topology的所有代碼的jar包
- stomecode.ser --這個topology對象的序列化
- stomeconf.ser --運行這個topology的配置
5.numbus分配任務(wù),獲取空閑的work缎除,根據(jù)topology定義中給的numworker參數(shù)严就、parallelism參數(shù)和numTask數(shù)量,給spout和bolt設(shè)置task數(shù)目器罐,并且分配相應(yīng)的task-id梢为,分配worker
6.numbus在ZK上創(chuàng)建taskbeat目錄,要求每個task每隔一定時間就要發(fā)送心跳匯報狀態(tài)
7.將分配好的任務(wù)寫入到zk中,此刻任務(wù)才算提交完畢铸董,zk上節(jié)點為assignment/topology-uuid
8.將topology的信息寫入到zookeeper/storms目錄
9.Supervisor監(jiān)聽zookeeper上的storms目錄祟印,看看是否有所屬的新的任務(wù),有就根據(jù)任務(wù)信息粟害,啟動worker蕴忆,下載下來jar包
10.刪除本地不再運行的topology代碼
11.supervisor根據(jù)nimbus指定的任務(wù)信息啟動worker進行工作
12.work根據(jù)taskid分辨出spout和blot
13.計算出所代表的spout/blot會給哪些task發(fā)送消息
14.根據(jù)ip和端口號創(chuàng)建響應(yīng)的網(wǎng)絡(luò)連接用來發(fā)送消息
Storm本地目錄樹
Zookeeper目錄樹
Storm程序的并發(fā)機制
從官網(wǎng)的解釋中,我們可以得出以下幾點悲幅。
1.Workers (JVMs):
- 一個Topology拓?fù)淇梢栽谝粋€或多個Worker上運行套鹅,(每個Worker進程只能從屬于一個特定的Topology)
- 這些Worker進程會并行跑在集群中不同的服務(wù)器上,即一個Topology拓?fù)淦鋵嵤怯刹⑿羞\行在Storm集群中多臺服務(wù)器上的進程所組成汰具。
2.Executors (threads):
- Executor是由Worker進程中生成的一個線程
- 每個Worker進程中會運行拓?fù)渲械囊粋€或多個Executor線程卓鹿。
- 一個Executor線程線程可以執(zhí)行一個或者多個Task任務(wù),默認(rèn)只運行一個任務(wù)留荔,但是這些任務(wù)都對應(yīng)相同的Component(Spout/Blot)吟孙。
3.Tasks(bolt/spout instances):
- 實際執(zhí)行數(shù)據(jù)處理的最小單元
- 每個task即為一個Spout或者一個Bolt。
- Task數(shù)量在整個Topology生命周期中保持不變聚蝶,worker,Executor數(shù)量可以變化或手動調(diào)整杰妓。
- 默認(rèn)情況下,Task數(shù)量和Executor是相同的既荚,即每個Executor線程中默認(rèn)運行一個Task任務(wù)
- 可以調(diào)用TopologyBuilder.setSpout和TopBuilder.setBolt來設(shè)置并行度稚失,也就是有多少個task。
4.對應(yīng)關(guān)系總結(jié)
Topology與Woker的關(guān)系
- 1:n
- 1:1
Executor與Task的關(guān)系
- 1:1
- 1:n(相同的Component)
對應(yīng)上圖的代碼示例
Config conf = newConfig();
//用2個worker
conf.setNumWorkers(2);
//設(shè)置2個并發(fā)度
topologyBuilder.setSpout("blue-spout", newBlueSpout(), 2);
////設(shè)置2個并發(fā)度恰聘,4個任務(wù)
topologyBuilder.setBolt("green-bolt", newGreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout");
//設(shè)置6個并發(fā)度
topologyBuilder.setBolt("yellow-bolt", newYellowBolt(), 6).shuffleGrouping("green-bolt");
StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology());
由上可知句各,3個組件的并發(fā)度加起來是10,就是說拓?fù)湟还灿?0個executor晴叨,一共有2個worker凿宾,每個worker產(chǎn)生10 / 2 = 5條線程。
綠色的bolt配置成2個executor和4個task兼蕊。為此每個executor為這個bolt運行2個task初厚。黃色bolt和藍色的spout都是一個executor運行一個task。
對于并發(fā)度的配置, 在storm里面可以在多個地方進行配置, 優(yōu)先級為:
defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration
Storm的Rebalance – 再平衡
動態(tài)調(diào)整Topology拓?fù)涞腤orker進程數(shù)量孙技、以及Executor線程數(shù)量产禾。支持兩種調(diào)整方式:
1、通過Storm UI
2牵啦、通過Storm CLI
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
將mytopology拓?fù)鋡orker進程數(shù)量調(diào)整為5個亚情,“ blue-spout ” 所使用的線程數(shù)量調(diào)整為3個,“ yellow-bolt ”所使用的線程數(shù)量調(diào)整為10個哈雏。