rebalance和reassign庶骄,讀這篇文章就夠了

rebalance

我們知道,在storm中rebalance可以通過ui鳖链、命令行、代碼的方式來調(diào)用墩莫,對Topology的worker數(shù)進(jìn)行重新分配。
rebalance通過rebalance(String name, RebalanceOptions options)逞敷、recv_rebalance()方法狂秦,向nimbus傳輸數(shù)據(jù)和接收數(shù)據(jù)。

public void send_rebalance(String name, RebalanceOptions options) throws TException {
            Nimbus.rebalance_args args = new Nimbus.rebalance_args();
            args.set_name(name);
            args.set_options(options);
            this.sendBase("rebalance", args);
        }

        public void recv_rebalance() throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
            Nimbus.rebalance_result result = new Nimbus.rebalance_result();
            this.receiveBase(result, "rebalance");
            if (result.e != null) {
                throw result.e;
            } else if (result.ite != null) {
                throw result.ite;
            } else if (result.aze != null) {
                throw result.aze;
            }
        }

在接收到rebalance信號后推捐,Topology由active狀態(tài)轉(zhuǎn)換為rebalance狀態(tài)
rebalance: 實(shí)際上是調(diào)用了 rebalance-transition 函數(shù)裂问,從代碼可以看出,會將狀態(tài)改成 rebalancing, 然后再轉(zhuǎn)換成 do-rebalance 牛柒。 do-rebalance 其實(shí)也是重新分配任務(wù)

(defn rebalance-transition [nimbus storm-id status]  
  (fn [time num-workers executor-overrides]  
    (let [delay (if time  
                  time  
                  (get (read-storm-conf (:conf nimbus) storm-id)  
                       TOPOLOGY-MESSAGE-TIMEOUT-SECS))]  
      (delay-event nimbus  
                   storm-id  
                   delay  
                   :do-rebalance)  
      {:type :rebalancing  
       :delay-secs delay  
       :old-status status  
       :num-workers num-workers  
       :executor-overrides executor-overrides  
       })))  

在do-rebalance中調(diào)用mk-assignments重新分配任務(wù)

(defn do-rebalance [nimbus storm-id status storm-base]
  (let [rebalance-options (:topology-action-options storm-base)]
    (.update-storm! (:storm-cluster-state nimbus)
      storm-id
        (-> {}
          (assoc-non-nil :component->executors (:component->executors rebalance-options))
          (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
  (mk-assignments nimbus :scratch-topology-id storm-id))
什么是mk-assignment

主要就是產(chǎn)生executor->node+port關(guān)系, 將executor分配到哪個(gè)node的哪個(gè)slot上(port代表slot, 一個(gè)slot可以run一個(gè)worker進(jìn)程, 一個(gè)worker包含多個(gè)executor線程)
mk-assignment源碼在此處貼出:mk-assignment源碼

在mk-assignment中分為以下幾個(gè)方面

  1. 讀出所有active topology信息
  2. 讀出當(dāng)前的assignemnt情況
  3. 根據(jù)取到的Topology和Assignement情況, 對當(dāng)前topology進(jìn)行新的assignment
Storm的可靠性保證

其實(shí)Storm本身已經(jīng)提供了該問題可靠性保證堪簿。大致的原理是:
spout發(fā)出的所有數(shù)據(jù),都有一個(gè)acker對其進(jìn)行追蹤皮壁,無論處理成功椭更、失敗或者超時(shí),都會告知spout蛾魄。如果spout發(fā)現(xiàn)消息處理失敗或丟失虑瀑,則會重新發(fā)送該消息湿滓。
結(jié)合Topology rebalance的過程,首先de-active舌狗,這時(shí)候topology的狀態(tài)被保存叽奥。未被處理的消息由acker追蹤。
當(dāng)topology重新分配后痛侍,spout發(fā)現(xiàn)已發(fā)出的消息未被處理朝氓,則重新發(fā)射這些消息。


reassign

在mk-assignment中的第三步主届,找出missing-assignment-topologies, 需要從新assign (當(dāng)前邏輯沒有用到, 在sechduler里面會自己判斷(判斷邏輯相同))
什么叫missing-assignment, 滿足下面任一條件

  • topology->executors, 其中沒有該topolgy, 說明該topology沒有assignment信息, 新的或scratch
  • topology->executors != topology->alive-executors, 說明有executor dead
  • topology->scheduler-assignment中的實(shí)際worker數(shù)小于topology配置的worker數(shù) (可能上次assign的時(shí)候可用slot不夠, 也可能由于dead slot造成)
        missing-assignment-topologies (->> topologies
                                           .getTopologies
                                           (map (memfn getId))
                                           (filter (fn [t]
                                                     (let [alle (get topology->executors t)
                                                           alivee (get topology->alive-executors t)]
                                                       (or (empty? alle)
                                                           (not= alle alivee)
                                                           (< (-> topology->scheduler-assignment
                                                                  (get t)
                                                                  num-used-workers )
                                                              (-> topologies (.getById t) .getNumWorkers)))))))

supervisor會定時(shí)從zookeeper獲取topologies赵哲、已分配的任務(wù)分配信息assignments及各類心跳信息,以此為依據(jù)進(jìn)行任務(wù)分配岂膳。
在supervisor周期性地進(jìn)行同步時(shí)誓竿,會根據(jù)新的任務(wù)分配來啟動新的worker或者關(guān)閉舊的worker,以響應(yīng)任務(wù)分配和負(fù)載均衡谈截。
worker通過定期的更新connections信息筷屡,來獲知其應(yīng)該通訊的其它worker。

詳見Lifecycle-of-a-topology.md

- Nimbus monitors the topology during its lifetime
   - Schedules recurring task on the timer thread to check the topologies [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L623)
   - Nimbus's behavior is represented as a finite state machine [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L98)
   - The "monitor" event is called on a topology every "nimbus.monitor.freq.secs", which calls `reassign-topology` through `reassign-transition` [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L497)
   - `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time. `mk-assignments` is also capable of incrementally updating a topology
      - `mk-assignments` checks heartbeats and reassigns workers as necessary
      - Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers
所以關(guān)于reassign簸喂,其實(shí)就是nimbus重新調(diào)用了mk-assignment,并且根據(jù)負(fù)載均衡重新分配任務(wù)毙死。

參考文章:

mk-assignment源碼解析
storm如何分配任務(wù)和負(fù)載均衡?
Storm中Topology的狀態(tài)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末喻鳄,一起剝皮案震驚了整個(gè)濱河市扼倘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌除呵,老刑警劉巖再菊,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異颜曾,居然都是意外死亡纠拔,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進(jìn)店門泛豪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來稠诲,“玉大人,你說我怎么就攤上這事诡曙⊥涡穑” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵价卤,是天一觀的道長劝萤。 經(jīng)常有香客問我,道長荠雕,這世上最難降的妖魔是什么稳其? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任驶赏,我火速辦了婚禮,結(jié)果婚禮上既鞠,老公的妹妹穿的比我還像新娘煤傍。我一直安慰自己,他們只是感情好嘱蛋,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布蚯姆。 她就那樣靜靜地躺著,像睡著了一般洒敏。 火紅的嫁衣襯著肌膚如雪龄恋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天凶伙,我揣著相機(jī)與錄音郭毕,去河邊找鬼。 笑死函荣,一個(gè)胖子當(dāng)著我的面吹牛显押,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播傻挂,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼乘碑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了金拒?” 一聲冷哼從身側(cè)響起兽肤,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绪抛,沒想到半個(gè)月后资铡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡幢码,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年害驹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛤育。...
    茶點(diǎn)故事閱讀 40,013評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖葫松,靈堂內(nèi)的尸體忽然破棺而出瓦糕,到底是詐尸還是另有隱情,我是刑警寧澤腋么,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布咕娄,位于F島的核電站,受9級特大地震影響珊擂,放射性物質(zhì)發(fā)生泄漏圣勒。R本人自食惡果不足惜费变,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望圣贸。 院中可真熱鬧挚歧,春花似錦、人聲如沸吁峻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽用含。三九已至矮慕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間啄骇,已是汗流浹背痴鳄。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留缸夹,地道東北人痪寻。 一個(gè)月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像明未,于是被迫代替她去往敵國和親槽华。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評論 2 355