有狀態(tài)的流處理系統(tǒng)的彈性伸縮(Elastic Scaling)

前言

看到一篇14年的關(guān)于流處理系統(tǒng)的彈性縮擴(kuò)容的論文Elastic Scaling for Data Stream Processing额衙,覺得挺有啟發(fā)性史汗,于是就寫了一篇中文的解讀琼掠。文章主要關(guān)注與設(shè)計與算法實現(xiàn),關(guān)于benchmark的部分沒有涉及停撞。并不是論文翻譯瓷蛙,沒有做到很精準(zhǔn)的表達(dá),里面也算是我個人的理解戈毒,如果哪邊有錯誤歡迎指正艰猬。

概述

  • 流處理應(yīng)用實際上就是由算子(點),數(shù)據(jù)流(邊)組成的一個有向圖埋市。要對這樣的應(yīng)用進(jìn)行彈性伸縮冠桃,本質(zhì)上就是改變這個圖映射到一組機(jī)器上的方式。
  • 但是如何做到讓應(yīng)用開發(fā)者對此無感知的同時還能保證結(jié)果的正確性道宅?并且如何設(shè)計一個高效的食听,資源節(jié)省的Scaling機(jī)制胸蛛?
  • 這套機(jī)制需要做到能根據(jù)工作量,還有可用的資源數(shù)量來動態(tài)的調(diào)節(jié)并行度樱报。

基本概念

  • 如下圖就是一個由點(算子)和邊(數(shù)據(jù)流組成的)流處理應(yīng)用葬项。
    • 點與點之間往往不止一對一的關(guān)系(窄依賴),還有一對多迹蛤,多對一民珍,多對多的關(guān)系。
    • 實際上多對多就是由多個多對一組合而成
    • 在一對多的關(guān)系中盗飒,我們需要一個Splitter嚷量,流分區(qū)器
    • 在多對一的關(guān)系中,我們需要一個Merger箩兽,流歸并器(沒想到更好的翻譯)
    • 包含Splitter和Merger的應(yīng)用
    • 而在多對多的關(guān)系中津肛,上游存在Splitter,下游存在Merger
    • 一個流經(jīng)過流分區(qū)器會被分割為多個流汗贫,在這兒身坐,我們把被分割的子流,叫做Channel落包,通道部蛇,對應(yīng)著圖上的邊。

需求

  • 首先有兩個重要的需求
    • 有狀態(tài)的算子在擴(kuò)容或者縮容時需要狀態(tài)遷移 =>
      • 讓用戶無感知
      • 遷移盡可能少的狀態(tài)
    • 一個根據(jù)運(yùn)行時的情況來決定是否和如何和Scaling的控制算法
      • 鑒于一個通用的流處理系統(tǒng)會有大量的用戶自定義的算子咐蝇,基于代價的優(yōu)化可行性不高
      • 這套算法應(yīng)該基于運(yùn)行時的監(jiān)控數(shù)據(jù)
      • 提供SASO特性
        • Stability 不容易產(chǎn)生震蕩涯鲁,導(dǎo)致頻繁的增減并行度
        • Accuracy 找到最大化吞吐的并行度
        • Settling time 快速的到達(dá)穩(wěn)定的最優(yōu)并行度
        • Overshoot 避免浪費資源,使用超過實際需要的并行度

解決思路

  • 針對狀態(tài)
    • 提供State的API有序,對用戶屏蔽狀態(tài)遷移的具體細(xì)節(jié)
    • 一個增量(原文為增量抹腿,個人理解為部分)的狀態(tài)遷移協(xié)議,配合基于一致性哈希的流分區(qū)策略
  • 針對控制算法
    • 依賴的監(jiān)控數(shù)據(jù)
      • Congestion Index旭寿,流分區(qū)器中的阻塞時間(簡單理解為反壓時間)
      • Throughput 吞吐量
    • 具體操作
      • 基于監(jiān)控動態(tài)增減下游并行度來解決Acurracy和Overshoot問題
      • 記錄歷史表現(xiàn)來保證Stability警绩,防止震蕩
      • Rapid Scaling 快速擴(kuò)縮容來減少Settling time

狀態(tài)遷移

  • 狀態(tài)管理
    • 從狀態(tài)的角度來看算子可以分為無狀態(tài)的算子和分區(qū)狀態(tài)的算子(partitioned stateful)。
    • 分區(qū)狀態(tài)的算子需要根據(jù)流分區(qū)器的變化來遷移狀態(tài)盅称,并且遷移的狀態(tài)是該分區(qū)的子集肩祥,所以要求系統(tǒng)能夠比較細(xì)粒度的掌控這些狀態(tài)(比如說keyBy這樣的流分區(qū)器,系統(tǒng)能遷移指定key的集合的狀態(tài))
  • 狀態(tài)遷移協(xié)議
    • 變量含義
      • i 代表一個算子的某一個通道(分割的子流)的索引
      • N 代表遷移過后通道的數(shù)量(下游新并行度)
      • Si 代表存儲在索引為i的算子的狀態(tài)
      • H 代表流分區(qū)器生成器缩膝,根據(jù)通道數(shù)量生成流分區(qū)器(在此處使用了一致性哈希)
    • 遷移算法實現(xiàn)
      • 算法分為兩個階段混狠,donate和collect
        • donate階段,在流分區(qū)器更新后疾层,在新的并行度下不再屬于這個算子的狀態(tài)(通過新的流分區(qū)方程獲取狀態(tài)屬于哪個算子)會被收集起來将饺,放入一個存儲介質(zhì)。
        • 在donate和collect階段之間有一個vertical barrier來確保在collect階段開使前所有的算子已完成donate操作
        • collect階段,每個算子將其他算子donate的屬于自己的狀態(tài)取回
        • horizontal barrier 用來防止collect完成前splitter發(fā)送新的數(shù)據(jù)到這個算子俯逾,出現(xiàn)數(shù)據(jù)不一致的情況
      • 遷移算法實現(xiàn)
  • 流分區(qū)器生成
    • 如果使用一個簡單的流分區(qū)器生成器贸桶,比如使用一個哈希方程根據(jù)通道數(shù)量取模,可能會導(dǎo)致大量的狀態(tài)在各個算子之間遷移桌肴,成本會非常高
    • 在選擇流分區(qū)器時要考慮兩個因素
      • 一是平衡皇筛,數(shù)據(jù)能否能被比較平均的分配到各個通道
      • 二是單調(diào)性(monotonicity),即狀態(tài)不會被遷移到一個遷移前后都存在的算子
        • 簡單來說坠七,就是當(dāng)新增一個通道時水醋,過去存在的下游算子算子會將狀態(tài)遷移到這個新通道對應(yīng)的算子,而不會互相遷移彪置。
        • 當(dāng)減少一個通道時拄踪,被減少的這個通道下游算子的狀態(tài)只會被分配到其他過去存在的下游算子,而其他算子之間并不會遷移狀態(tài)拳魁。
    • 解決方案惶桐,一致性哈希
      • O(1)的時間復(fù)雜度
      • 每個partition對應(yīng)一個環(huán)上的多個點,而數(shù)據(jù)經(jīng)過哈希方程之后最靠近哪個點就會被分配到哪個partition
        • 在一個環(huán)上新增一個點潘懊,只會從相鄰的兩個點分流數(shù)據(jù)
        • 在一個環(huán)上減少一個點姚糊,之前經(jīng)過這個點的數(shù)據(jù)只會被分配到相鄰的兩個點
        • 一致性哈希
      • 使用這樣一個哈希方程可以同時滿足兩個要求,并且計算本身的代價也是非常低的授舟。
      • 具體一致性哈希的實現(xiàn)這篇文章不會涉及救恨,有相關(guān)的論文

控制算法

  • 監(jiān)控指標(biāo)
    • Congestion Index(阻塞標(biāo)志)
      • 本質(zhì)是測量背壓(消息阻塞)時間的比例是否超出了閾值
      • 監(jiān)控的意義
        • 如果背壓時間過多,說明消費能力不夠释树,需要增加輸出通道(Channel)肠槽,也意味著下游需要增加并行度
        • 如果不存在背壓,說明可能資源超出實際需要奢啥,需要減少輸出通道(Channel)秸仙,也意味著下游需要降低并行度
        • 該指標(biāo)的變化可以反映工作量的變化
    • Throughput 吞吐量
      • 監(jiān)控的意義
        • 當(dāng)增加通道(并發(fā))后,吞吐量是操作是否有效最重要的檢驗標(biāo)準(zhǔn)
        • 該指標(biāo)的變化可以直觀反映工作量的變化
  • 算法基本原則
    • 內(nèi)容
      • P1 如果存在阻塞(Congestion Index)桩盲,查詢歷史記錄筋栋,如果有記錄顯示增加通道(提高下游并行度)并不能增加吞吐量,則不變正驻。其余情況增加并行度。
      • P2 如果沒有阻塞(Congestion Index)抢腐,查詢歷史記錄姑曙,如果有記錄顯示減少通道(降低下游并行度)會導(dǎo)致阻塞(Congestion)出現(xiàn),則不變迈倍。其余情況降低并行度伤靠。
    • 目的
      • P1來保證SASO中的Acurracy, P2來保證避免SASO中的Overshoot
      • 通過查詢歷史記錄來判斷是否改變并行度,保證SASO中的Stability,防止震蕩
      • 最終到達(dá)一個最優(yōu)的并行度
  • 補(bǔ)充
    • 針對情況: 并行度會最終達(dá)到一個穩(wěn)定值宴合,這個值確保在當(dāng)前并行度下不會出現(xiàn)阻塞標(biāo)志(Congestion Index)焕梅,但是任何小于該并行度都會導(dǎo)致出現(xiàn)阻塞標(biāo)志(Congestion Index)。 可是工作量(數(shù)據(jù)流量)本身處于波動卦洽,必須有機(jī)制去適應(yīng)這種波動贞言。
    • 內(nèi)容:如何適應(yīng)工作量(數(shù)據(jù)流量)的變化(在并行度已經(jīng)達(dá)到一個穩(wěn)定值的情況下)
      • P3 如果觀測到阻塞標(biāo)志(Congestion Index),即背壓時間增加阀蒂,意味著工作量(數(shù)據(jù)流量)的增加该窗,則需要忘記歷史記錄,并按照P1要求增加并行度
      • P4 如果觀測到吞吐降低蚤霞,則意味著資源過剩酗失,則需要忘記歷史記錄,并按照P2降低并行度
  • 優(yōu)化
    • 針對情況1: Remote Congestion(遠(yuǎn)程阻塞)昧绣,即阻塞(Congestion)不是由下游引起的规肴,而是由下游的下游引起的,增加下游并行度并沒有效果夜畴。并且一個流處理應(yīng)用也會有一個擴(kuò)展的上限拖刃,這是由無法并行執(zhí)行的部分決定的(比如上圖中的Sink)。
    • 內(nèi)容: P5 如果提高并行度并不能提高吞吐斩启,則降低并行度
    • 針對情況2: 在資源和工作量都很高的情況下序调,該算法可能要很長時間才能找到最優(yōu)的并行度,如何降低SASO中的Settling time也是需要考慮的因素之一
    • 內(nèi)容: P6 Rapid Scaling: 每次擴(kuò)容或縮容的時候不是一個并行度一個并行度來兔簇,而是定義一個Level发绢,這個Level和并行度呈超線性關(guān)系。一個比較合理的公式如下垄琐。
  • 流程圖
    • 最后整個流程如下圖所示边酒,算法也是根據(jù)這個流程圖實現(xiàn)的。
    • 流程圖
  • 算法實現(xiàn)
    • 變量含義
      • P 當(dāng)前階段(可以理解為auto scaling是定期觸發(fā)狸窘,兩次觸發(fā)之間形成一個階段)
      • L 當(dāng)前所處Level(Rapid Scaling當(dāng)中的Level)
      • 以下變量角標(biāo)均表示Level
        • Pi 意味著上最后一次處于這個Level時所處的的階段
          • 如下圖是一個Level隨階段變化的折線圖
          • 從當(dāng)下來看墩朦,P0 = 1, P1 = 2, P2 = 4, P3 = 8, P4 = 6
          • 階段與Level
        • Ci 意味著最后一次處于這個Level時的是否阻塞
        • 意味著最后一次處于這個level是的吞吐

        • 意味著最后一次觀察到有連續(xù)的階段處于這個level時的吞吐量(比較拗口)

      • L*代表level的最大值
    • 變量初始化
      • 變量初始化
    • 算法主體
      • 核心就在于根據(jù)監(jiān)控數(shù)據(jù)獲取最優(yōu)的通道數(shù)(下游并行度)
      • 首先根據(jù)阻塞和吞吐獲取工作量(流量)是否變化
        • 對應(yīng)P3,P4翻擒,決定是否要忘掉歷史記錄
        • 如果工作量減少氓涣,則清除所有當(dāng)前Level以下的阻塞標(biāo)記Ci和吞吐量Ti-| (i < L)
        • 如果工作量增加,則清除所有當(dāng)前Level以上的阻塞標(biāo)記Ci 和吞吐量Ti-| (L < i < L*)
      • 接下來更新當(dāng)前Level的變量
      • 之后判斷是否存在遠(yuǎn)程阻塞陋气,對應(yīng)P5劳吠,如果存在則降低Level
      • 如果不存在遠(yuǎn)程阻塞,但是當(dāng)前存在阻塞巩趁,對應(yīng)P1
        • 并且有歷史記錄表明提升Level能提升吞吐(沒有歷史記錄的話TL+1 -| 為無窮大痒玩,一定大于當(dāng)前吞吐),則提升Level(增加通道,即增加并行度)
      • 如果不存在遠(yuǎn)程阻塞蠢古,當(dāng)前也不存在阻塞实幕,對應(yīng)P2
        • 并且有歷史記錄表明降低Level不會產(chǎn)生阻塞(沒有歷史記錄的話CL-1 為False楞件,標(biāo)識不會阻塞),則降低Level(減少通道,即降低并行度)
      • 最后根據(jù)通道數(shù)與Level的共識來決定通道數(shù)(下游并行度)
      • 算法主體
    • 通過阻塞和吞吐判斷工作量變化的具體實現(xiàn)
      • 通過阻塞判斷工作量變化
        • 因為兩個階段之間Level變?yōu)樽疃酁?拴测,所以分為三種情況
          • 如果當(dāng)前階段Level等于上一階段船庇,并且阻塞情況從不阻塞到阻塞蒿赢,則代表工作量(流量)增加涤妒,從阻塞到不阻塞,則代表工作量(流量)減少
          • 如果當(dāng)前階段Level比上一階段小一層践啄,如果上一階段阻塞浇雹,當(dāng)前不阻塞則代表工作量(流量)降低
          • 如果當(dāng)前階段Level比上一階段高一層,如果上一階段不阻塞屿讽,當(dāng)前阻塞則代表工作量(流量)增加
          • 其余情況返回Unknown
        • 通過阻塞判斷工作量變化
      • 通過吞吐判斷工作量變化
        • 與阻塞相同昭灵,還是分三種情況
          • 如果當(dāng)前階段Level等于上一階段,先判斷吞吐是否有變化伐谈,再判斷吞吐變化是否超出了閾值烂完,這個閾值與TL+1 |- 相關(guān)(這個值不會被遺忘,只會被更新)诵棵,如果都滿足抠蚣,根據(jù)吞吐量增減返回工作量(流量)的增減
          • 如果當(dāng)前階段Level小于上一階段,并且吞吐量上升了履澳,則代表工作量(流量)上升
          • 如果當(dāng)前階段Level高于上一階段嘶窄,并且吞吐量下降了,則代表工作量(流量)下降
          • 其余情況返回Unknown
        • 通過吞吐判斷工作量變化

思考

  • 狀態(tài)遷移中距贷,運(yùn)行機(jī)制必須能從一個partition鐘獲取一個子集的key的狀態(tài)柄冲,遷移時對于借助類似于level db存儲狀態(tài)的系統(tǒng)來說可能不是那么友好(不確定)?可以通過更粗粒度的狀態(tài)遷移來克服忠蝗?
  • 狀態(tài)遷移中现横,這兩個引入的barrier能否通過異步來做?
  • 使用一致性哈希阁最,確實比較均衡戒祠,但是背壓常常是由數(shù)據(jù)傾斜帶來的,能否將背壓的通道一分為二速种?

引用

Bu?ra Gedik ; Scott Schneider ; Martin Hirzel ; Kun-Lung Wu "Elastic Scaling for Data Stream Processing"

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末姜盈,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子哟旗,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件闸餐,死亡現(xiàn)場離奇詭異饱亮,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)舍沙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門近上,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拂铡,你說我怎么就攤上這事壹无。” “怎么了感帅?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵斗锭,是天一觀的道長。 經(jīng)常有香客問我失球,道長岖是,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任实苞,我火速辦了婚禮豺撑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘黔牵。我一直安慰自己聪轿,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布猾浦。 她就那樣靜靜地躺著陆错,像睡著了一般。 火紅的嫁衣襯著肌膚如雪跃巡。 梳的紋絲不亂的頭發(fā)上危号,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機(jī)與錄音素邪,去河邊找鬼外莲。 笑死,一個胖子當(dāng)著我的面吹牛兔朦,可吹牛的內(nèi)容都是我干的偷线。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼沽甥,長吁一口氣:“原來是場噩夢啊……” “哼声邦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起摆舟,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤亥曹,失蹤者是張志新(化名)和其女友劉穎邓了,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體媳瞪,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡骗炉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蛇受。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片句葵。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖兢仰,靈堂內(nèi)的尸體忽然破棺而出乍丈,到底是詐尸還是另有隱情,我是刑警寧澤把将,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布轻专,位于F島的核電站,受9級特大地震影響秸弛,放射性物質(zhì)發(fā)生泄漏铭若。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一递览、第九天 我趴在偏房一處隱蔽的房頂上張望叼屠。 院中可真熱鬧,春花似錦绞铃、人聲如沸镜雨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荚坞。三九已至,卻和暖如春菲盾,著一層夾襖步出監(jiān)牢的瞬間颓影,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工懒鉴, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留诡挂,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓临谱,卻偏偏與公主長得像璃俗,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子悉默,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容

  • feisky云計算城豁、虛擬化與Linux技術(shù)筆記posts - 1014, comments - 298, trac...
    不排版閱讀 3,855評論 0 5
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)抄课,斷路器唱星,智...
    卡卡羅2017閱讀 134,659評論 18 139
  • ORA-00001: 違反唯一約束條件 (.) 錯誤說明:當(dāng)在唯一索引所對應(yīng)的列上鍵入重復(fù)值時雳旅,會觸發(fā)此異常。 O...
    我想起個好名字閱讀 5,320評論 0 9
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 31,938評論 2 89
  • 1. 基礎(chǔ)知識 1.1间聊、 基本概念岭辣、 功能 馮諾伊曼體系結(jié)構(gòu)1、計算機(jī)處理的數(shù)據(jù)和指令一律用二進(jìn)制數(shù)表示2甸饱、順序執(zhí)...
    yunpiao閱讀 5,313評論 1 22