rebalance
我們知道,在storm中rebalance可以通過ui鳖链、命令行、代碼的方式來調(diào)用墩莫,對Topology的worker數(shù)進(jìn)行重新分配。
rebalance通過rebalance(String name, RebalanceOptions options)逞敷、recv_rebalance()方法狂秦,向nimbus傳輸數(shù)據(jù)和接收數(shù)據(jù)。
public void send_rebalance(String name, RebalanceOptions options) throws TException {
Nimbus.rebalance_args args = new Nimbus.rebalance_args();
args.set_name(name);
args.set_options(options);
this.sendBase("rebalance", args);
}
public void recv_rebalance() throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
Nimbus.rebalance_result result = new Nimbus.rebalance_result();
this.receiveBase(result, "rebalance");
if (result.e != null) {
throw result.e;
} else if (result.ite != null) {
throw result.ite;
} else if (result.aze != null) {
throw result.aze;
}
}
在接收到rebalance信號后推捐,Topology由active狀態(tài)轉(zhuǎn)換為rebalance狀態(tài)
rebalance: 實(shí)際上是調(diào)用了 rebalance-transition 函數(shù)裂问,從代碼可以看出,會將狀態(tài)改成 rebalancing, 然后再轉(zhuǎn)換成 do-rebalance 牛柒。 do-rebalance 其實(shí)也是重新分配任務(wù)
(defn rebalance-transition [nimbus storm-id status]
(fn [time num-workers executor-overrides]
(let [delay (if time
time
(get (read-storm-conf (:conf nimbus) storm-id)
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
delay
:do-rebalance)
{:type :rebalancing
:delay-secs delay
:old-status status
:num-workers num-workers
:executor-overrides executor-overrides
})))
在do-rebalance中調(diào)用mk-assignments重新分配任務(wù)
(defn do-rebalance [nimbus storm-id status storm-base]
(let [rebalance-options (:topology-action-options storm-base)]
(.update-storm! (:storm-cluster-state nimbus)
storm-id
(-> {}
(assoc-non-nil :component->executors (:component->executors rebalance-options))
(assoc-non-nil :num-workers (:num-workers rebalance-options)))))
(mk-assignments nimbus :scratch-topology-id storm-id))
什么是mk-assignment
主要就是產(chǎn)生executor->node+port關(guān)系, 將executor分配到哪個(gè)node的哪個(gè)slot上(port代表slot, 一個(gè)slot可以run一個(gè)worker進(jìn)程, 一個(gè)worker包含多個(gè)executor線程)
mk-assignment源碼在此處貼出:mk-assignment源碼
在mk-assignment中分為以下幾個(gè)方面
- 讀出所有active topology信息
- 讀出當(dāng)前的assignemnt情況
- 根據(jù)取到的Topology和Assignement情況, 對當(dāng)前topology進(jìn)行新的assignment
Storm的可靠性保證
其實(shí)Storm本身已經(jīng)提供了該問題可靠性保證堪簿。大致的原理是:
spout發(fā)出的所有數(shù)據(jù),都有一個(gè)acker對其進(jìn)行追蹤皮壁,無論處理成功椭更、失敗或者超時(shí),都會告知spout蛾魄。如果spout發(fā)現(xiàn)消息處理失敗或丟失虑瀑,則會重新發(fā)送該消息湿滓。
結(jié)合Topology rebalance的過程,首先de-active舌狗,這時(shí)候topology的狀態(tài)被保存叽奥。未被處理的消息由acker追蹤。
當(dāng)topology重新分配后痛侍,spout發(fā)現(xiàn)已發(fā)出的消息未被處理朝氓,則重新發(fā)射這些消息。
reassign
在mk-assignment中的第三步主届,找出missing-assignment-topologies, 需要從新assign (當(dāng)前邏輯沒有用到, 在sechduler里面會自己判斷(判斷邏輯相同))
什么叫missing-assignment, 滿足下面任一條件
- topology->executors, 其中沒有該topolgy, 說明該topology沒有assignment信息, 新的或scratch
- topology->executors != topology->alive-executors, 說明有executor dead
- topology->scheduler-assignment中的實(shí)際worker數(shù)小于topology配置的worker數(shù) (可能上次assign的時(shí)候可用slot不夠, 也可能由于dead slot造成)
missing-assignment-topologies (->> topologies
.getTopologies
(map (memfn getId))
(filter (fn [t]
(let [alle (get topology->executors t)
alivee (get topology->alive-executors t)]
(or (empty? alle)
(not= alle alivee)
(< (-> topology->scheduler-assignment
(get t)
num-used-workers )
(-> topologies (.getById t) .getNumWorkers)))))))
supervisor會定時(shí)從zookeeper獲取topologies赵哲、已分配的任務(wù)分配信息assignments及各類心跳信息,以此為依據(jù)進(jìn)行任務(wù)分配岂膳。
在supervisor周期性地進(jìn)行同步時(shí)誓竿,會根據(jù)新的任務(wù)分配來啟動新的worker或者關(guān)閉舊的worker,以響應(yīng)任務(wù)分配和負(fù)載均衡谈截。
worker通過定期的更新connections信息筷屡,來獲知其應(yīng)該通訊的其它worker。
詳見Lifecycle-of-a-topology.md
- Nimbus monitors the topology during its lifetime
- Schedules recurring task on the timer thread to check the topologies [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L623)
- Nimbus's behavior is represented as a finite state machine [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L98)
- The "monitor" event is called on a topology every "nimbus.monitor.freq.secs", which calls `reassign-topology` through `reassign-transition` [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L497)
- `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time. `mk-assignments` is also capable of incrementally updating a topology
- `mk-assignments` checks heartbeats and reassigns workers as necessary
- Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers
所以關(guān)于reassign簸喂,其實(shí)就是nimbus重新調(diào)用了mk-assignment,并且根據(jù)負(fù)載均衡重新分配任務(wù)毙死。
參考文章:
mk-assignment源碼解析
storm如何分配任務(wù)和負(fù)載均衡?
Storm中Topology的狀態(tài)