前言
看到一篇14年的關(guān)于流處理系統(tǒng)的彈性縮擴(kuò)容的論文Elastic Scaling for Data Stream Processing额衙,覺得挺有啟發(fā)性史汗,于是就寫了一篇中文的解讀琼掠。文章主要關(guān)注與設(shè)計與算法實現(xiàn),關(guān)于benchmark的部分沒有涉及停撞。并不是論文翻譯瓷蛙,沒有做到很精準(zhǔn)的表達(dá),里面也算是我個人的理解戈毒,如果哪邊有錯誤歡迎指正艰猬。
概述
- 流處理應(yīng)用實際上就是由算子(點),數(shù)據(jù)流(邊)組成的一個有向圖埋市。要對這樣的應(yīng)用進(jìn)行彈性伸縮冠桃,本質(zhì)上就是改變這個圖映射到一組機(jī)器上的方式。
- 但是如何做到讓應(yīng)用開發(fā)者對此無感知的同時還能保證結(jié)果的正確性道宅?并且如何設(shè)計一個高效的食听,資源節(jié)省的Scaling機(jī)制胸蛛?
- 這套機(jī)制需要做到能根據(jù)工作量,還有可用的資源數(shù)量來動態(tài)的調(diào)節(jié)并行度樱报。
基本概念
- 如下圖就是一個由點(算子)和邊(數(shù)據(jù)流組成的)流處理應(yīng)用葬项。
- 點與點之間往往不止一對一的關(guān)系(窄依賴),還有一對多迹蛤,多對一民珍,多對多的關(guān)系。
- 實際上多對多就是由多個多對一組合而成
- 在一對多的關(guān)系中盗飒,我們需要一個Splitter嚷量,流分區(qū)器
- 在多對一的關(guān)系中,我們需要一個Merger箩兽,流歸并器(沒想到更好的翻譯)
- 包含Splitter和Merger的應(yīng)用
- 而在多對多的關(guān)系中津肛,上游存在Splitter,下游存在Merger
- 一個流經(jīng)過流分區(qū)器會被分割為多個流汗贫,在這兒身坐,我們把被分割的子流,叫做Channel落包,通道部蛇,對應(yīng)著圖上的邊。
需求
- 首先有兩個重要的需求
- 有狀態(tài)的算子在擴(kuò)容或者縮容時需要狀態(tài)遷移 =>
- 讓用戶無感知
- 遷移盡可能少的狀態(tài)
- 一個根據(jù)運(yùn)行時的情況來決定是否和如何和Scaling的控制算法
- 鑒于一個通用的流處理系統(tǒng)會有大量的用戶自定義的算子咐蝇,基于代價的優(yōu)化可行性不高
- 這套算法應(yīng)該基于運(yùn)行時的監(jiān)控數(shù)據(jù)
- 提供SASO特性
- Stability 不容易產(chǎn)生震蕩涯鲁,導(dǎo)致頻繁的增減并行度
- Accuracy 找到最大化吞吐的并行度
- Settling time 快速的到達(dá)穩(wěn)定的最優(yōu)并行度
- Overshoot 避免浪費資源,使用超過實際需要的并行度
- 有狀態(tài)的算子在擴(kuò)容或者縮容時需要狀態(tài)遷移 =>
解決思路
- 針對狀態(tài)
- 提供State的API有序,對用戶屏蔽狀態(tài)遷移的具體細(xì)節(jié)
- 一個增量(原文為增量抹腿,個人理解為部分)的狀態(tài)遷移協(xié)議,配合基于一致性哈希的流分區(qū)策略
- 針對控制算法
- 依賴的監(jiān)控數(shù)據(jù)
- Congestion Index旭寿,流分區(qū)器中的阻塞時間(簡單理解為反壓時間)
- Throughput 吞吐量
- 具體操作
- 基于監(jiān)控動態(tài)增減下游并行度來解決Acurracy和Overshoot問題
- 記錄歷史表現(xiàn)來保證Stability警绩,防止震蕩
- Rapid Scaling 快速擴(kuò)縮容來減少Settling time
- 依賴的監(jiān)控數(shù)據(jù)
狀態(tài)遷移
- 狀態(tài)管理
- 從狀態(tài)的角度來看算子可以分為無狀態(tài)的算子和分區(qū)狀態(tài)的算子(partitioned stateful)。
- 分區(qū)狀態(tài)的算子需要根據(jù)流分區(qū)器的變化來遷移狀態(tài)盅称,并且遷移的狀態(tài)是該分區(qū)的子集肩祥,所以要求系統(tǒng)能夠比較細(xì)粒度的掌控這些狀態(tài)(比如說keyBy這樣的流分區(qū)器,系統(tǒng)能遷移指定key的集合的狀態(tài))
- 狀態(tài)遷移協(xié)議
- 變量含義
- i 代表一個算子的某一個通道(分割的子流)的索引
- N 代表遷移過后通道的數(shù)量(下游新并行度)
- Si 代表存儲在索引為i的算子的狀態(tài)
- H 代表流分區(qū)器生成器缩膝,根據(jù)通道數(shù)量生成流分區(qū)器(在此處使用了一致性哈希)
- 遷移算法實現(xiàn)
- 算法分為兩個階段混狠,donate和collect
- donate階段,在流分區(qū)器更新后疾层,在新的并行度下不再屬于這個算子的狀態(tài)(通過新的流分區(qū)方程獲取狀態(tài)屬于哪個算子)會被收集起來将饺,放入一個存儲介質(zhì)。
- 在donate和collect階段之間有一個vertical barrier來確保在collect階段開使前所有的算子已完成donate操作
- collect階段,每個算子將其他算子donate的屬于自己的狀態(tài)取回
- horizontal barrier 用來防止collect完成前splitter發(fā)送新的數(shù)據(jù)到這個算子俯逾,出現(xiàn)數(shù)據(jù)不一致的情況
- 遷移算法實現(xiàn)
- 算法分為兩個階段混狠,donate和collect
- 變量含義
- 流分區(qū)器生成
- 如果使用一個簡單的流分區(qū)器生成器贸桶,比如使用一個哈希方程根據(jù)通道數(shù)量取模,可能會導(dǎo)致大量的狀態(tài)在各個算子之間遷移桌肴,成本會非常高
- 在選擇流分區(qū)器時要考慮兩個因素
- 一是平衡皇筛,數(shù)據(jù)能否能被比較平均的分配到各個通道
- 二是單調(diào)性(monotonicity),即狀態(tài)不會被遷移到一個遷移前后都存在的算子
- 簡單來說坠七,就是當(dāng)新增一個通道時水醋,過去存在的下游算子算子會將狀態(tài)遷移到這個新通道對應(yīng)的算子,而不會互相遷移彪置。
- 當(dāng)減少一個通道時拄踪,被減少的這個通道下游算子的狀態(tài)只會被分配到其他過去存在的下游算子,而其他算子之間并不會遷移狀態(tài)拳魁。
- 解決方案惶桐,一致性哈希
- O(1)的時間復(fù)雜度
- 每個partition對應(yīng)一個環(huán)上的多個點,而數(shù)據(jù)經(jīng)過哈希方程之后最靠近哪個點就會被分配到哪個partition
- 在一個環(huán)上新增一個點潘懊,只會從相鄰的兩個點分流數(shù)據(jù)
- 在一個環(huán)上減少一個點姚糊,之前經(jīng)過這個點的數(shù)據(jù)只會被分配到相鄰的兩個點
- 一致性哈希
- 使用這樣一個哈希方程可以同時滿足兩個要求,并且計算本身的代價也是非常低的授舟。
- 具體一致性哈希的實現(xiàn)這篇文章不會涉及救恨,有相關(guān)的論文
控制算法
- 監(jiān)控指標(biāo)
- Congestion Index(阻塞標(biāo)志)
- 本質(zhì)是測量背壓(消息阻塞)時間的比例是否超出了閾值
- 監(jiān)控的意義
- 如果背壓時間過多,說明消費能力不夠释树,需要增加輸出通道(Channel)肠槽,也意味著下游需要增加并行度
- 如果不存在背壓,說明可能資源超出實際需要奢啥,需要減少輸出通道(Channel)秸仙,也意味著下游需要降低并行度
- 該指標(biāo)的變化可以反映工作量的變化
- Throughput 吞吐量
- 監(jiān)控的意義
- 當(dāng)增加通道(并發(fā))后,吞吐量是操作是否有效最重要的檢驗標(biāo)準(zhǔn)
- 該指標(biāo)的變化可以直觀反映工作量的變化
- 監(jiān)控的意義
- Congestion Index(阻塞標(biāo)志)
- 算法基本原則
- 內(nèi)容
- P1 如果存在阻塞(Congestion Index)桩盲,查詢歷史記錄筋栋,如果有記錄顯示增加通道(提高下游并行度)并不能增加吞吐量,則不變正驻。其余情況增加并行度。
- P2 如果沒有阻塞(Congestion Index)抢腐,查詢歷史記錄姑曙,如果有記錄顯示減少通道(降低下游并行度)會導(dǎo)致阻塞(Congestion)出現(xiàn),則不變迈倍。其余情況降低并行度伤靠。
- 目的
- P1來保證SASO中的Acurracy, P2來保證避免SASO中的Overshoot
- 通過查詢歷史記錄來判斷是否改變并行度,保證SASO中的Stability,防止震蕩
- 最終到達(dá)一個最優(yōu)的并行度
- 內(nèi)容
- 補(bǔ)充
- 針對情況: 并行度會最終達(dá)到一個穩(wěn)定值宴合,這個值確保在當(dāng)前并行度下不會出現(xiàn)阻塞標(biāo)志(Congestion Index)焕梅,但是任何小于該并行度都會導(dǎo)致出現(xiàn)阻塞標(biāo)志(Congestion Index)。 可是工作量(數(shù)據(jù)流量)本身處于波動卦洽,必須有機(jī)制去適應(yīng)這種波動贞言。
- 內(nèi)容:如何適應(yīng)工作量(數(shù)據(jù)流量)的變化(在并行度已經(jīng)達(dá)到一個穩(wěn)定值的情況下)
- P3 如果觀測到阻塞標(biāo)志(Congestion Index),即背壓時間增加阀蒂,意味著工作量(數(shù)據(jù)流量)的增加该窗,則需要忘記歷史記錄,并按照P1要求增加并行度
- P4 如果觀測到吞吐降低蚤霞,則意味著資源過剩酗失,則需要忘記歷史記錄,并按照P2降低并行度
- 優(yōu)化
- 針對情況1: Remote Congestion(遠(yuǎn)程阻塞)昧绣,即阻塞(Congestion)不是由下游引起的规肴,而是由下游的下游引起的,增加下游并行度并沒有效果夜畴。并且一個流處理應(yīng)用也會有一個擴(kuò)展的上限拖刃,這是由無法并行執(zhí)行的部分決定的(比如上圖中的Sink)。
- 內(nèi)容: P5 如果提高并行度并不能提高吞吐斩启,則降低并行度
- 針對情況2: 在資源和工作量都很高的情況下序调,該算法可能要很長時間才能找到最優(yōu)的并行度,如何降低SASO中的Settling time也是需要考慮的因素之一
- 內(nèi)容: P6 Rapid Scaling: 每次擴(kuò)容或縮容的時候不是一個并行度一個并行度來兔簇,而是定義一個Level发绢,這個Level和并行度呈超線性關(guān)系。一個比較合理的公式如下垄琐。
- 流程圖
- 最后整個流程如下圖所示边酒,算法也是根據(jù)這個流程圖實現(xiàn)的。
- 流程圖
- 算法實現(xiàn)
- 變量含義
- P 當(dāng)前階段(可以理解為auto scaling是定期觸發(fā)狸窘,兩次觸發(fā)之間形成一個階段)
- L 當(dāng)前所處Level(Rapid Scaling當(dāng)中的Level)
- 以下變量角標(biāo)均表示Level
- Pi 意味著上最后一次處于這個Level時所處的的階段
- 如下圖是一個Level隨階段變化的折線圖
- 從當(dāng)下來看墩朦,P0 = 1, P1 = 2, P2 = 4, P3 = 8, P4 = 6
- 階段與Level
- Ci 意味著最后一次處于這個Level時的是否阻塞
-
意味著最后一次處于這個level是的吞吐
-
意味著最后一次觀察到有連續(xù)的階段處于這個level時的吞吐量(比較拗口)
- Pi 意味著上最后一次處于這個Level時所處的的階段
- L*代表level的最大值
- 變量初始化
- 變量初始化
- 算法主體
- 核心就在于根據(jù)監(jiān)控數(shù)據(jù)獲取最優(yōu)的通道數(shù)(下游并行度)
- 首先根據(jù)阻塞和吞吐獲取工作量(流量)是否變化
- 對應(yīng)P3,P4翻擒,決定是否要忘掉歷史記錄
- 如果工作量減少氓涣,則清除所有當(dāng)前Level以下的阻塞標(biāo)記Ci和吞吐量Ti-| (i < L)
- 如果工作量增加,則清除所有當(dāng)前Level以上的阻塞標(biāo)記Ci 和吞吐量Ti-| (L < i < L*)
- 接下來更新當(dāng)前Level的變量
- 之后判斷是否存在遠(yuǎn)程阻塞陋气,對應(yīng)P5劳吠,如果存在則降低Level
- 如果不存在遠(yuǎn)程阻塞,但是當(dāng)前存在阻塞巩趁,對應(yīng)P1
- 并且有歷史記錄表明提升Level能提升吞吐(沒有歷史記錄的話TL+1 -| 為無窮大痒玩,一定大于當(dāng)前吞吐),則提升Level(增加通道,即增加并行度)
- 如果不存在遠(yuǎn)程阻塞蠢古,當(dāng)前也不存在阻塞实幕,對應(yīng)P2
- 并且有歷史記錄表明降低Level不會產(chǎn)生阻塞(沒有歷史記錄的話CL-1 為False楞件,標(biāo)識不會阻塞),則降低Level(減少通道,即降低并行度)
- 最后根據(jù)通道數(shù)與Level的共識來決定通道數(shù)(下游并行度)
- 算法主體
- 通過阻塞和吞吐判斷工作量變化的具體實現(xiàn)
- 通過阻塞判斷工作量變化
- 因為兩個階段之間Level變?yōu)樽疃酁?拴测,所以分為三種情況
- 如果當(dāng)前階段Level等于上一階段船庇,并且阻塞情況從不阻塞到阻塞蒿赢,則代表工作量(流量)增加涤妒,從阻塞到不阻塞,則代表工作量(流量)減少
- 如果當(dāng)前階段Level比上一階段小一層践啄,如果上一階段阻塞浇雹,當(dāng)前不阻塞則代表工作量(流量)降低
- 如果當(dāng)前階段Level比上一階段高一層,如果上一階段不阻塞屿讽,當(dāng)前阻塞則代表工作量(流量)增加
- 其余情況返回Unknown
- 通過阻塞判斷工作量變化
- 因為兩個階段之間Level變?yōu)樽疃酁?拴测,所以分為三種情況
- 通過吞吐判斷工作量變化
- 與阻塞相同昭灵,還是分三種情況
- 如果當(dāng)前階段Level等于上一階段,先判斷吞吐是否有變化伐谈,再判斷吞吐變化是否超出了閾值烂完,這個閾值與TL+1 |- 相關(guān)(這個值不會被遺忘,只會被更新)诵棵,如果都滿足抠蚣,根據(jù)吞吐量增減返回工作量(流量)的增減
- 如果當(dāng)前階段Level小于上一階段,并且吞吐量上升了履澳,則代表工作量(流量)上升
- 如果當(dāng)前階段Level高于上一階段嘶窄,并且吞吐量下降了,則代表工作量(流量)下降
- 其余情況返回Unknown
- 通過吞吐判斷工作量變化
- 與阻塞相同昭灵,還是分三種情況
- 通過阻塞判斷工作量變化
- 變量含義
思考
- 狀態(tài)遷移中距贷,運(yùn)行機(jī)制必須能從一個partition鐘獲取一個子集的key的狀態(tài)柄冲,遷移時對于借助類似于level db存儲狀態(tài)的系統(tǒng)來說可能不是那么友好(不確定)?可以通過更粗粒度的狀態(tài)遷移來克服忠蝗?
- 狀態(tài)遷移中现横,這兩個引入的barrier能否通過異步來做?
- 使用一致性哈希阁最,確實比較均衡戒祠,但是背壓常常是由數(shù)據(jù)傾斜帶來的,能否將背壓的通道一分為二速种?
引用
Bu?ra Gedik ; Scott Schneider ; Martin Hirzel ; Kun-Lung Wu "Elastic Scaling for Data Stream Processing"