上文中介紹了Hystrix的由來余指,本文會深入分析Hystrix的執(zhí)行過程捕犬。
Hystrix的大部分邏輯基于RxJava跷坝,其實現(xiàn)讓很熱多人望而卻步酵镜,停留在了僅僅使用的地步,從一個簡單的HelloWorld開始柴钻。
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// 省略業(yè)務(wù)邏輯
// 該方法可能會拋出異常
return "Hello " + name + "!";
}
@Override
protected String getFallback() {
return "Hello Failure " + name + "!";
}
}
通過簡單的實現(xiàn)run方法和getFallback 方法淮韭,CommandHelloWorld
具備了熔斷降級的功能,其中HystrixCommand
提供了4個方法:execute()贴届,queue()靠粪,observe(),toObservable()毫蚓,平時只需要關(guān)注execute和queue即可占键。
- queue():
異步調(diào)用,返回一個Future對象元潘,后面可以通過Future獲取結(jié)果畔乙。 - execute()
同步調(diào)用,調(diào)用后直接block住翩概,直到依賴服務(wù)返回結(jié)果牲距,或者拋出異常。
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
其實同步方式是通過直接執(zhí)行Future的get方法進行實現(xiàn)的钥庇,這里需要知道這個返回的Future對象到底是什么牍鞠?
通過實現(xiàn)可以發(fā)現(xiàn),返回的Futrue對象只是對toObservable
返回結(jié)果的封裝代理评姨,把注意力轉(zhuǎn)移到toObservable
方法的實現(xiàn)难述。
第一次看到這個方法的實現(xiàn),也許會很崩潰,方法內(nèi)部初始化了一系列的Action0對象胁后,這是RxJava的內(nèi)部對象硫眯,可以看作是訂閱一個事件后的回調(diào)。
一開始择同,不要太過在意這些Action两入,不然會陷入迷失,在debug的時候敲才,通過不斷的回調(diào)裹纳,經(jīng)常會不知身處何處。
直接看到toObservable
方法的return處紧武,又是一坨回調(diào)剃氧,崩潰。
如果不熟悉RxJava語法阻星,看這種代碼真心的累朋鞍,call方法的前面部分主要做兩件事。
1妥箕、記錄請求日志(日志功能開啟)
2滥酥、從緩存中返回結(jié)果(定義了cacheKey方法,并且緩存功能開啟)
如果緩存沒有開啟畦幢,或者返回null坎吻,那只能執(zhí)行正常邏輯從下游服務(wù)拿取數(shù)據(jù),這里通過上面定義的applyHystrixSemantics
Action進行回調(diào)宇葱,最終執(zhí)行的是applyHystrixSemantics
方法瘦真,這個方法才是精華所在,想調(diào)試的同學(xué)黍瞧,直接在這個方法入口打個斷點诸尽,事半功倍。
其中circuitBreaker.attemptExecution()
的返回結(jié)果印颤,決定了接下去是執(zhí)行正常邏輯您机、還是降級邏輯,這才是精華的精華膀哲。
看一下熔斷器的attemptExecution
方法往产,內(nèi)部涉及了多個開關(guān)
- forceOpen
強制開啟,所以請求都執(zhí)行降級邏輯 - forceClose
強制關(guān)閉某宪,所以請求都執(zhí)行正常邏輯 - circuitOpened
熔斷開關(guān)仿村,默認為-1,請求執(zhí)行正常邏輯兴喂,如果發(fā)生熔斷蔼囊,該值會被修改成0焚志,請求執(zhí)行降級邏輯 - HALF_OPEN
熔斷半開,即熔斷之后畏鼓,每隔一段會進行試探
個人覺得酱酬,這里的實現(xiàn)過于復(fù)雜。
如果沒有發(fā)生熔斷云矫,還有一道門檻膳沽,Hystrix提供了一個信號量限流器,限制進入熔斷器最大并發(fā)數(shù)让禀,可以控制請求下游的并發(fā)量挑社,如果超過這個閾值,會被降級處理巡揍,有效的保護下游服務(wù)不會被突發(fā)流量給攻擊痛阻。
通過的請求,繼續(xù)調(diào)用executeCommandAndObserve
方法腮敌,在該方法中阱当,又定義了一堆讓人迷惑的Action,不過這次通過名字和實現(xiàn)糜工,可以知道個大概弊添,先把這些Action放一邊,看看接下去會執(zhí)行的方法啤斗。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 忽略一堆Action的定義
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
Hystrix內(nèi)部提供了超時檢查的機制表箭,如果參數(shù)executionTimeoutEnabled
開啟,則每次請求都會提交一個任務(wù)到線程池中延遲執(zhí)行钮莲。由于Hystrix實現(xiàn)中考慮的東西太多,所以在實現(xiàn)上還是很復(fù)雜彼水。
這里只給出了關(guān)鍵邏輯崔拥,如果配置了超時時間10ms,會提交一個延遲10ms執(zhí)行的任務(wù)凤覆,其中tick
方法會通過CAS機制保證超時狀態(tài)的變更链瓦,最終對應(yīng)command的會執(zhí)行onError方法,這里加入的HystrixContextRunnable
主要為了跨線程的上下文數(shù)據(jù)傳遞盯桦。
在執(zhí)行正常邏輯的實現(xiàn)中慈俯,Hystrix內(nèi)部提供了信號量、線程池兩種模式拥峦,默認使用線程池模式贴膘。在executeCommandWithSpecifiedIsolation
方法中,分別對這兩種模式進行了處理略号,而且可以根據(jù)參數(shù)隨時進行切換刑峡,這就是為什么線程池一開始就要初始化的原因洋闽,雖然有資源的消耗,但是帶來了更好的靈活性突梦,在需要的時候可以從信號量模式變成線程池模式進行隔離诫舅。
下面以信號量的方式為例,分析下如何執(zhí)行用戶自定義的run
方法.
又想吐槽Hystrix的代碼宫患,又是這種回調(diào)刊懈,在call實現(xiàn)中,暫時忽略前面的一大坨邏輯娃闲,跟進getUserExecutionObservable
方法俏讹。
繼續(xù)查看getExecutionObservable
方法,該方法在HystrixCommand
中被重寫實現(xiàn)畜吊。
run
方法終于執(zhí)行了泽疆。run
方法執(zhí)行之后,如果正常返回玲献、拋出異常殉疼、或者其它情況,都需要對應(yīng)的后續(xù)處理捌年,這時之前executeCommandAndObserve
方法中定義的Action瓢娜,就開始起作用了。
execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
markEmits
run方法正常返回時執(zhí)行礼预,主要記錄執(zhí)行耗時眠砾;觸發(fā)執(zhí)行成功的通知事件,可以通過擴展插件做更多事情托酸;如果當(dāng)前是熔斷狀態(tài)褒颈,則關(guān)閉熔斷。handleFallback
run方法發(fā)生異常時執(zhí)行励堡,最終執(zhí)行降級邏輯谷丸,但是整個過程實現(xiàn)還是很復(fù)雜的。
后續(xù)文章
Hystrix系列之自動熔斷和恢復(fù)
Hystrix系列之插件實現(xiàn)