這幾天在看Hystrix的一些實現(xiàn)健民,里面大量運用了rxjava的原理谈息,將代碼簡化到了極致凳厢,對于有rxjava基礎的同學蜀踏,相信看懂Hystrix代碼并不是一件難事针余。我這篇文章主要是針對Hystrix Command執(zhí)行之后的一個數(shù)據流向以及熔斷機制做了一個梳理和總結梗摇,后續(xù)還會出對于Hystrix組裝command岳悟、超時機制愧驱、隔離機制等源代碼實現(xiàn)進行一個梳理和總結往踢。這篇文章權當做hystrix梳理的第一步腾誉,雖然感覺從代碼執(zhí)行順序上這篇文章不應該是第一個,但是從這一塊開始梳理我感覺可以更好的理解Hystrix中的消息流的一個概念(不得不感嘆,有的人寫代碼就是再創(chuàng)造利职,而我寫代碼可能就是為了工作吧趣效。。)
首先我們先了解一下Hystrix熔斷的一個基本原理:
當出現(xiàn)問題時猪贪,Hystrix會檢查一個一定時長(圖中為10s)的一個時間窗(window)跷敬,在這個時間窗內是否有足夠多的請求,如果有足夠多的請求热押,是否錯誤率已經達到閾值西傀,如果達到則啟動斷路器熔斷機制,這時再有請求過來就會直接到fallback路徑桶癣。在斷路器打開之后拥褂,會有一個sleep window(圖中為5s),每經過一個sleep window牙寞,當有請求過來的時候饺鹃,斷路器會放掉一個請求給remote 服務,讓它去試探下游服務是否已經恢復间雀,如果成功悔详,斷路器會恢復到正常狀態(tài),讓后續(xù)請求重新請求到remote 服務惹挟,否則伟端,保持熔斷狀態(tài)。
這里我們就會考慮匪煌,我們應該怎么去實現(xiàn)這樣一個window機制呢责蝠?通過ScheduledExecutorService和并發(fā)List以及累加器?我確實沒想到比較好的方法萎庭,當我看了Hystrix的文檔和實現(xiàn)之后霜医,恍然大悟。它通過一個叫 metrics.rollingStats.numBuckets的屬性驳规,標明我們的window需要被拆分到多少個bucket(桶)中肴敛,對于我們上圖的例子10s的window,我們設置5個桶的話吗购,每個桶就是2s的時長医男。我們針對每個桶統(tǒng)計桶內的請求的一個情況(成功or失敗),然后對于10s的一個時長window捻勉,我們只要組合連續(xù)的5個bucket就能得到一個window內的統(tǒng)計數(shù)據镀梭,就能做一個判斷,當有新的bucket來的時候踱启,我們在我們的短路器中只需要拋棄最老的bucket报账,把最新的bucket加進來研底,形成一個LinkedList,就能重新做統(tǒng)計了透罢,這樣也形成了一個滾動統(tǒng)計的計算模式榜晦。這樣的樣例很適合通過基于流和響應式的reactiveX框架來做,因此Hystrix也采用了RxJava來實現(xiàn)羽圃。
在了解了Hystrix熔斷的原理之后乾胶,我們上一個流程圖(跟上面這圖簡直沒法比,丑的想哭):
這是我自己整理的一個流程圖朽寞,途中黃色箭頭方向為數(shù)據流向方向识窿,其中橢圓框有包含關系是因為這幾個類存在繼承關系,即HealthCountsStream繼承自BucketRollingCountersStream愁憔,而BucketRollingCounterStream繼承自BucketCountersStream腕扶。這個圖比較清晰的展現(xiàn)了在一個command執(zhí)行完成之后孽拷,整個數(shù)據的流向吨掌。下面我將針對每一步的源代碼,做下說明脓恕。
1. 首先AbstractCommand(HystrixCommand的父類)會發(fā)起一個handleCommandEnd方法調用:
2. 在調用handleCommandEnd方法中膜宋,首先會取消掉timoutTimer,避免不必要的timout操作炼幔,接著形成一個executionResult秋茫,這個result可以看做包含了整個command運行周期所有信息,接著調用HystrixCommandMetrics的markCommandDone操作
3. HystrixCommandMetrics這個類本身的用途是作為所有Command的指標衡量的一個工具乃秀,在markCommandDone方法中直接調用了HystrixThreadEventStream的executionDone方法
4.在HystrixThreadEventStream中我們需要注意幾個東西:首先這個getInstance是從threadLocal中獲取一個HystrixThreadEventStream實例肛著,因此我們知道對于一個線程都會有自己獨立的HystrixThreadEventStream實例以及它自己的成員變量。
然后在executionDone方法中跺讯,會根據executionResult生成一個HystrixCommandCompletion事件枢贿,然后將它傳遞給一個叫wirteOnlyCommandCompletionSubject的實例成員變量。
在這個subject中通過doOnNext的callback最終把消息傳遞給了HystrixCommandCompletionStream
5. 這個HystrixCommandCompletionStream最終在哪里用到找起來比較麻煩刀脏,但是我通過find usage還是找到了他的用處:
在HealthCountsStream中使用了它作為構造函數(shù)參數(shù)傳入到super中局荚,一路跟蹤,我們找到了HealthCountsStream的源頭:
在BucketedCounterStream(HealthCountersStream終極父類)中愈污,對這個inputStream做了一些列變換耀态,首先通過window操作將HystrixCommandCompletionStream中所得到的事件以bucketSizeInMs的時長分割成了多個子塊,然后通過flatMap操作把他們reduce成了一個bucket的summary信息暂雹,并通過startWith操作將一段empty的summary列表作為了初始的消息流首装。
接著我們在BucketCounterStream的子類,也是HealthCounterStream的父類:BucketedRollingCounterStream中杭跪,我們看到了進一步的操作:
首先通過window操作將上一步處理好的以bucket為單位的消息流分割成以numBuckets為window長度簿盅,1個bucket為步長間隔的消息流(這里可能說的比較拗口挥下,后面會通過圖表進行解釋),再通過flatMap將每個消息轉換成以一個numBuckets長度的window內的summary信息桨醋。而flapMap中的reduceWindowToSummary函數(shù)棚瘟,就是HealthCountsStream在構造的時候傳入給父類的,因此喜最,在這一步flatMap之后偎蘸,得到的消息流就已經是HealthCountStream所指定的HealthCounts數(shù)據結構了。最終再通過observe方法把這個sourceStream暴露出去(中間有幾步不涉及數(shù)據變化瞬内,所有就不展開了)
6. 最終我們可以看到在HystrixCircuitBreak的默認實現(xiàn)中:
我們可以看到斷路器subscribe到了我們剛剛看到的HealthCountsStream迷雪,并且在onNext中針對每次發(fā)出的消息,通過判斷window中的請求數(shù)量和錯誤比例來控制了斷路器是否斷開的邏輯虫蝶。整個熔斷機制分析也就到了數(shù)據的終點章咧。
下面我們通過圖表對之前的兩個消息流的變換做下說明:
還有短路器斷開情況下的恢復機制,這里可以稍微說明下:
在之前的代碼中我們可以看到:在斷路器判定錯誤過多需要短路的時候會做以下操作:
首先會將狀態(tài)設置到OPEN能真, 然后在circuitOpened中設置當前的時間戳赁严,這個變量在后面sleep window試探操作時會用到。
在一個command執(zhí)行之前粉铐,我們會通過下列代碼進行執(zhí)行前操作(command的組裝預處理執(zhí)行過程我會在后面的文章中講解):
我們進入這個方法看:
我們可以看到疼约,在這個方法中,我們經過一些列的判斷蝙泼,最終查看現(xiàn)在距離上次斷路器斷開時間是否已經過去了一個sleepWindow程剥,如果是,則將status改為HALF_OPEN汤踏,即試探狀態(tài)织鲸,并且通過cas操作保證了只有第一個request能夠通過,后續(xù)request繼續(xù)回到fallback執(zhí)行路徑下溪胶。
隨著command的執(zhí)行搂擦,我們在它complete或者出錯的時候會有下列操作:
可以看到,當命令正常完成载荔,會調用斷路器的markSuccess操作盾饮,而異常到了fallback中則會執(zhí)行markNonSuccess
這里面的代碼就比較簡單了,當成功并且狀態(tài)為HALF_OPEN的時候懒熙,重置stream丘损,并且恢復斷路器狀態(tài)和circuitOpened到-1,如果失敗則設置當前時間到circuitOpened工扎,方便下一次sleepWindow操作徘钥。
下面是為了補充整個執(zhí)行順序、數(shù)據以及線程相關情況肢娘,我在一些關鍵節(jié)點上加了打印當前線程和數(shù)據的操作呈础,并執(zhí)行Hystrix core中本身的一些test例子舆驶,打出來的結果如下:
從上圖我們可以看到,本身Command的發(fā)起和HealthCounterStream的注冊以及Timeout的發(fā)起都在main線程中進行而钞,而command的run沙廉,直到BucketedCounterStream的一系列消息傳遞都發(fā)生在Hystrix為這個Command開辟的線程池中進行。而由于window操作符的關系臼节,BucketedRollingCounterStream撬陵、HealthCountsStream和HystrixCircuitBreaker都在RxJava本身的computation線程池中進行。
一點點自己的總結:
Hystrix乃至于netflix團隊對于RxJava這種響應式編程范式可以說已經運用的出神入化网缝,對于rxjava與目前傳統(tǒng)編程范式所涉及到中間的migrate問題巨税,Hystrix中都有比較好的實踐,像操作符的組合粉臊,Subject的應用等等草添,我也不禁覺得想學好rxjava,還是動手最重要扼仲。不過rxjava由于自身操作符lift的關系远寸,在調試的時候可能會比較麻煩,特別是在單步的時候會發(fā)現(xiàn)有很長的call stack出現(xiàn)犀盟,這也是對于以后開發(fā)和運維是一個比較大的挑戰(zhàn)而晒。我看下來的感覺是盡量用rxjava自己的api做到一個閉環(huán)蝇狼,把問題域集中在rxjava內部阅畴,不要引入外部的機制,是比較好的實踐方式迅耘。另外贱枣,對于我現(xiàn)在的工作,在一些場景中引入響應式編程可能也會讓較復雜的場景簡單化颤专。因為響應式編程的目的是要將問題和邏輯都集中在信息流中纽哥,通過訂閱與轉換甚至是切換執(zhí)行線程來完成業(yè)務邏輯和狀態(tài)遷移,在邏輯上保持一個比較簡單清晰的模型栖秕,并且很容易將業(yè)務進行分stage處理春塌。
回到Hystrix本身,其實熔斷的整個機制通過stream處理已經劃分的比較清晰了簇捍,每個階段會有自己的處理邏輯和轉換方式只壳,通過window來實現(xiàn)滾動統(tǒng)計也是非常巧妙。有一點我比較關心是最終寫入是通過一個全局的SerializableStream做的write操作暑塑。而熟悉rxjava的同學都知道吼句,這種序列化的stream是通過emitter-loop來保證的串行訪問,大致原理就是當有多個線程進入時事格,如果都沒有開始emit惕艳,那先到的那個會獲得權力emit搞隐,但這并不算完,后到的thread就把自己的event放置到同一個queue里面远搪,就可以撤了劣纲,而先到的那個在emit完自己的event之后,需要檢查當前queue是否有多的谁鳍,如果有那你就把多的都emit了吧味廊,誰讓你先來的?直到發(fā)現(xiàn)現(xiàn)在queue也空了棠耕,你就可以走了余佛。在這個期間如果也有thread進來一樣也是放到queue里面。這樣可以保證盡量不阻塞thread的同時保證串行窍荧,但是我一直都在思索辉巡,不會出現(xiàn)一個線程一直發(fā)而無法退出執(zhí)行自己后面任務的情況么?不過我后來也在想蕊退,就算出現(xiàn)也沒什么郊楣,因為在這個command自己的thread中,總需要有個thread來做這個事情瓤荔。只要自己保證執(zhí)行了onNext净蚤,處理了自己的請求,后面的就無所謂了输硝。不知道這樣的想法是否正確今瀑,可能需要做一定的測試來進行佐證。
第一篇關于Hystrix的大致就是這樣点把。歡迎有興趣的同學一起討論橘荠,中間有什么問題也希望大家能夠指正,多多交流郎逃,共同進步哥童!