Hystrix技術(shù)解析

一、認(rèn)識Hystrix

Hystrix是Netflix開源的一款容錯框架舟茶,包含常用的容錯方法:線程池隔離、信號量隔離堵第、熔斷吧凉、降級回退。在高并發(fā)訪問下踏志,系統(tǒng)所依賴的服務(wù)的穩(wěn)定性對系統(tǒng)的影響非常大阀捅,依賴有很多不可控的因素,比如網(wǎng)絡(luò)連接變慢针余,資源突然繁忙饲鄙,暫時不可用,服務(wù)脫機(jī)等圆雁。我們要構(gòu)建穩(wěn)定傍妒、可靠的分布式系統(tǒng),就必須要有這樣一套容錯方法摸柄。
本文將逐一分析線程池隔離颤练、信號量隔離、熔斷、降級回退這四種技術(shù)的原理與實(shí)踐嗦玖。

二患雇、線程隔離

2.1為什么要做線程隔離

比如我們現(xiàn)在有3個業(yè)務(wù)調(diào)用分別是查詢訂單、查詢商品宇挫、查詢用戶苛吱,且這三個業(yè)務(wù)請求都是依賴第三方服務(wù)-訂單服務(wù)、商品服務(wù)器瘪、用戶服務(wù)翠储。三個服務(wù)均是通過RPC調(diào)用。當(dāng)查詢訂單服務(wù)橡疼,假如線程阻塞了援所,這個時候后續(xù)有大量的查詢訂單請求過來,那么容器中的線程數(shù)量則會持續(xù)增加直致CPU資源耗盡到100%欣除,整個服務(wù)對外不可用住拭,集群環(huán)境下就是雪崩。如下圖


訂單服務(wù)不可用.png

整個tomcat容器不可用.png
2.2历帚、線程隔離-線程池
2.2.1滔岳、Hystrix是如何通過線程池實(shí)現(xiàn)線程隔離的

Hystrix通過命令模式,將每個類型的業(yè)務(wù)請求封裝成對應(yīng)的命令請求挽牢,比如查詢訂單->訂單Command谱煤,查詢商品->商品Command,查詢用戶->用戶Command禽拔。每個類型的Command對應(yīng)一個線程池刘离。創(chuàng)建好的線程池是被放入到ConcurrentHashMap中,比如查詢訂單:

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
threadPools.put(“hystrix-order”, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));

當(dāng)?shù)诙尾樵冇唵握埱筮^來的時候奏赘,則可以直接從Map中獲取該線程池寥闪。具體流程如下圖:

hystrix線程執(zhí)行過程和異步化.png

創(chuàng)建線程池中的線程的方法太惠,查看源代碼如下:

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    ThreadFactory threadFactory = null;
    if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
        threadFactory = new ThreadFactory() {
            protected final AtomicInteger threadNumber = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }

        };
    } else {
        threadFactory = PlatformSpecific.getAppEngineThreadFactory();
    }

    final int dynamicCoreSize = corePoolSize.get();
    final int dynamicMaximumSize = maximumPoolSize.get();

    if (dynamicCoreSize > dynamicMaximumSize) {
        logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    }
}

執(zhí)行Command的方式一共四種,直接看官方文檔(https://github.com/Netflix/Hystrix/wiki/How-it-Works)凿渊,具體區(qū)別如下:

  • execute():以同步堵塞方式執(zhí)行run()弹澎。調(diào)用execute()后躺涝,hystrix先創(chuàng)建一個新線程運(yùn)行run()格仲,接著調(diào)用程序要在execute()調(diào)用處一直堵塞著容诬,直到run()運(yùn)行完成流礁。

  • queue():以異步非堵塞方式執(zhí)行run()。調(diào)用queue()就直接返回一個Future對象滔驶,同時hystrix創(chuàng)建一個新線程運(yùn)行run()遇革,調(diào)用程序通過Future.get()拿到run()的返回結(jié)果,而Future.get()是堵塞執(zhí)行的瓜浸。

  • observe():事件注冊前執(zhí)行run()/construct()澳淑。第一步是事件注冊前比原,先調(diào)用observe()自動觸發(fā)執(zhí)行run()/construct()(如果繼承的是HystrixCommand插佛,hystrix將創(chuàng)建新線程非堵塞執(zhí)行run();如果繼承的是HystrixObservableCommand量窘,將以調(diào)用程序線程堵塞執(zhí)行construct())雇寇,第二步是從observe()返回后調(diào)用程序調(diào)用subscribe()完成事件注冊,如果run()/construct()執(zhí)行成功則觸發(fā)onNext()和onCompleted(),如果執(zhí)行異常則觸發(fā)onError()锨侯。

  • toObservable():事件注冊后執(zhí)行run()/construct()嫩海。第一步是事件注冊前,調(diào)用toObservable()就直接返回一個Observable<String>對象囚痴,第二步調(diào)用subscribe()完成事件注冊后自動觸發(fā)執(zhí)行run()/construct()(如果繼承的是HystrixCommand叁怪,hystrix將創(chuàng)建新線程非堵塞執(zhí)行run(),調(diào)用程序不必等待run()深滚;如果繼承的是HystrixObservableCommand奕谭,將以調(diào)用程序線程堵塞執(zhí)行construct(),調(diào)用程序等待construct()執(zhí)行完才能繼續(xù)往下走)痴荐,如果run()/construct()執(zhí)行成功則觸發(fā)onNext()和onCompleted()血柳,如果執(zhí)行異常則觸發(fā)onError()
    注:
    execute()和queue()是HystrixCommand中的方法,observe()和toObservable()是HystrixObservableCommand 中的方法生兆。從底層實(shí)現(xiàn)來講难捌,HystrixCommand其實(shí)也是利用Observable實(shí)現(xiàn)的(如果我們看Hystrix的源碼的話,可以發(fā)現(xiàn)里面大量使用了RxJava)鸦难,雖然HystrixCommand只返回單個的結(jié)果根吁,但HystrixCommand的queue方法實(shí)際上是調(diào)用了toObservable().toBlocking().toFuture(),而execute方法實(shí)際上是調(diào)用了queue().get()合蔽。

2.2.2婴栽、如何應(yīng)用到實(shí)際代碼中
package myHystrix.threadpool;

import com.netflix.hystrix.*;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.Future;

/**
 * Created by wangxindong on 2017/8/4.
 */
public class GetOrderCommand extends HystrixCommand<List> {

    OrderService orderService;

    public GetOrderCommand(String name){
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(5000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withMaxQueueSize(10)   //配置隊(duì)列大小
                                .withCoreSize(2)    // 配置線程池里的線程數(shù)
                )
        );
    }

    @Override
    protected List run() throws Exception {
        return orderService.getOrderList();
    }

    public static class UnitTest {
        @Test
        public void testGetOrder(){
//            new GetOrderCommand("hystrix-order").execute();
            Future<List> future =new GetOrderCommand("hystrix-order").queue();
        }

    }
}

2.2.3、線程隔離-線程池小結(jié)

執(zhí)行依賴代碼的線程與請求線程(比如Tomcat線程)分離辈末,請求線程可以自由控制離開的時間愚争,這也是我們通常說的異步編程,Hystrix是結(jié)合RxJava來實(shí)現(xiàn)的異步編程挤聘。通過設(shè)置線程池大小來控制并發(fā)訪問量轰枝,當(dāng)線程飽和的時候可以拒絕服務(wù),防止依賴問題擴(kuò)散组去。

線程隔離.png

線程池隔離的優(yōu)點(diǎn):
[1]:應(yīng)用程序會被完全保護(hù)起來鞍陨,即使依賴的一個服務(wù)的線程池滿了,也不會影響到應(yīng)用程序的其他部分从隆。
[2]:我們給應(yīng)用程序引入一個新的風(fēng)險較低的客戶端lib的時候诚撵,如果發(fā)生問題,也是在本lib中键闺,并不會影響到其他內(nèi)容寿烟,因此我們可以大膽的引入新lib庫。
[3]:當(dāng)依賴的一個失敗的服務(wù)恢復(fù)正常時辛燥,應(yīng)用程序會立即恢復(fù)正常的性能筛武。
[4]:如果我們的應(yīng)用程序一些參數(shù)配置錯誤了缝其,線程池的運(yùn)行狀況將會很快顯示出來,比如延遲徘六、超時内边、拒絕等。同時可以通過動態(tài)屬性實(shí)時執(zhí)行來處理糾正錯誤的參數(shù)配置待锈。
[5]:如果服務(wù)的性能有變化漠其,從而需要調(diào)整,比如增加或者減少超時時間竿音,更改重試次數(shù)辉懒,就可以通過線程池指標(biāo)動態(tài)屬性修改,而且不會影響到其他調(diào)用請求谍失。
[6]:除了隔離優(yōu)勢外眶俩,hystrix擁有專門的線程池可提供內(nèi)置的并發(fā)功能,使得可以在同步調(diào)用之上構(gòu)建異步的外觀模式快鱼,這樣就可以很方便的做異步編程(Hystrix引入了Rxjava異步框架)颠印。

盡管線程池提供了線程隔離,我們的客戶端底層代碼也必須要有超時設(shè)置抹竹,不能無限制的阻塞以致線程池一直飽和线罕。

線程池隔離的缺點(diǎn):
[1]:線程池的主要缺點(diǎn)就是它增加了計算的開銷,每個業(yè)務(wù)請求(被包裝成命令)在執(zhí)行的時候窃判,會涉及到請求排隊(duì)钞楼,調(diào)度和上下文切換。不過Netflix公司內(nèi)部認(rèn)為線程隔離開銷足夠小袄琳,不會產(chǎn)生重大的成本或性能的影響询件。

The Netflix API processes 10+ billion Hystrix Command executions per day using thread isolation. Each API instance has 40+ thread-pools with 5–20 threads in each (most are set to 10).
Netflix API每天使用線程隔離處理10億次Hystrix Command執(zhí)行。 每個API實(shí)例都有40多個線程池唆樊,每個線程池中有5-20個線程(大多數(shù)設(shè)置為10個)宛琅。

對于不依賴網(wǎng)絡(luò)訪問的服務(wù),比如只依賴內(nèi)存緩存這種情況下逗旁,就不適合用線程池隔離技術(shù)嘿辟,而是采用信號量隔離。

2.3片效、線程隔離-信號量红伦。

2.3.1、線程池和信號量的區(qū)別

上面談到了線程池的缺點(diǎn)淀衣,當(dāng)我們依賴的服務(wù)是極低延遲的昙读,比如訪問內(nèi)存緩存,就沒有必要使用線程池的方式舌缤,那樣的話開銷得不償失箕戳,而是推薦使用信號量這種方式。下面這張圖說明了線程池隔離和信號量隔離的主要區(qū)別:線程池方式下業(yè)務(wù)請求線程和執(zhí)行依賴的服務(wù)的線程不是同一個線程国撵;信號量方式下業(yè)務(wù)請求線程和執(zhí)行依賴服務(wù)的線程是同一個線程

信號量和線程池的區(qū)別.png

2.3.2陵吸、如何使用信號量來隔離線程

將屬性execution.isolation.strategy設(shè)置為SEMAPHORE ,象這樣 ExecutionIsolationStrategy.SEMAPHORE介牙,則Hystrix使用信號量而不是默認(rèn)的線程池來做隔離壮虫。

public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

    private final int id;

    public CommandUsingSemaphoreIsolation(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                // since we're doing work in the run() method that doesn't involve network traffic
                // and executes very fast with low risk we choose SEMAPHORE isolation
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
        this.id = id;
    }

    @Override
    protected String run() {
        // a real implementation would retrieve data from in memory data structure
        // or some other similar non-network involved work
        return "ValueFromHashMap_" + id;
    }

}

2.3.4、線程隔離-信號量小結(jié)

信號量隔離的方式是限制了總的并發(fā)數(shù)环础,每一次請求過來囚似,請求線程和調(diào)用依賴服務(wù)的線程是同一個線程,那么如果不涉及遠(yuǎn)程RPC調(diào)用(沒有網(wǎng)絡(luò)開銷)則使用信號量來隔離线得,更為輕量饶唤,開銷更小。

三贯钩、熔斷

3.1募狂、熔斷器(Circuit Breaker)介紹

熔斷器,現(xiàn)實(shí)生活中有一個很好的類比角雷,就是家庭電路中都會安裝一個保險盒祸穷,當(dāng)電流過大的時候保險盒里面的保險絲會自動斷掉,來保護(hù)家里的各種電器及電路勺三。Hystrix中的熔斷器(Circuit Breaker)也是起到這樣的作用雷滚,Hystrix在運(yùn)行過程中會向每個commandKey對應(yīng)的熔斷器報告成功、失敗吗坚、超時和拒絕的狀態(tài)祈远,熔斷器維護(hù)計算統(tǒng)計的數(shù)據(jù),根據(jù)這些統(tǒng)計的信息來確定熔斷器是否打開商源。如果打開绊含,后續(xù)的請求都會被截斷。然后會隔一段時間默認(rèn)是5s炊汹,嘗試半開躬充,放入一部分流量請求進(jìn)來,相當(dāng)于對依賴服務(wù)進(jìn)行一次健康檢查讨便,如果恢復(fù)充甚,熔斷器關(guān)閉,隨后完全恢復(fù)調(diào)用霸褒。如下圖:

熔斷器開關(guān)圖.png

說明伴找,上面說的commandKey,就是在初始化的時候設(shè)置的andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))

再來看下熔斷器在整個Hystrix流程圖中的位置废菱,從步驟4開始技矮,如下圖:

Hystrix流程圖.png

Hystrix會檢查Circuit Breaker的狀態(tài)抖誉。如果Circuit Breaker的狀態(tài)為開啟狀態(tài),Hystrix將不會執(zhí)行對應(yīng)指令衰倦,而是直接進(jìn)入失敗處理狀態(tài)(圖中8 Fallback)袒炉。如果Circuit Breaker的狀態(tài)為關(guān)閉狀態(tài),Hystrix會繼續(xù)進(jìn)行線程池樊零、任務(wù)隊(duì)列我磁、信號量的檢查(圖中5)

3.2、如何使用熔斷器(Circuit Breaker)

由于Hystrix是一個容錯框架驻襟,因此我們在使用的時候夺艰,要達(dá)到熔斷的目的只需配置一些參數(shù)就可以了。但我們要達(dá)到真正的效果沉衣,就必須要了解這些參數(shù)郁副。Circuit Breaker一共包括如下6個參數(shù)。
1豌习、circuitBreaker.enabled
是否啟用熔斷器霞势,默認(rèn)是TURE。
2斑鸦、circuitBreaker.forceOpen
熔斷器強(qiáng)制打開愕贡,始終保持打開狀態(tài)。默認(rèn)值FLASE巷屿。
3固以、circuitBreaker.forceClosed
熔斷器強(qiáng)制關(guān)閉,始終保持關(guān)閉狀態(tài)嘱巾。默認(rèn)值FLASE憨琳。
4、circuitBreaker.errorThresholdPercentage
設(shè)定錯誤百分比旬昭,默認(rèn)值50%篙螟,例如一段時間(10s)內(nèi)有100個請求,其中有55個超時或者異常返回了问拘,那么這段時間內(nèi)的錯誤百分比是55%遍略,大于了默認(rèn)值50%,這種情況下觸發(fā)熔斷器-打開骤坐。
5绪杏、circuitBreaker.requestVolumeThreshold
默認(rèn)值20.意思是至少有20個請求才進(jìn)行errorThresholdPercentage錯誤百分比計算。比如一段時間(10s)內(nèi)有19個請求全部失敗了纽绍。錯誤百分比是100%蕾久,但熔斷器不會打開,因?yàn)閞equestVolumeThreshold的值是20. 這個參數(shù)非常重要拌夏,熔斷器是否打開首先要滿足這個條件僧著,源代碼如下

// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
    // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
    return false;
}

if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
    return false;
}

6履因、circuitBreaker.sleepWindowInMilliseconds
半開試探休眠時間,默認(rèn)值5000ms盹愚。當(dāng)熔斷器開啟一段時間之后比如5000ms栅迄,會嘗試放過去一部分流量進(jìn)行試探,確定依賴服務(wù)是否恢復(fù)杯拐。

測試代碼(模擬10次調(diào)用霞篡,錯誤百分比為5%的情況下世蔗,打開熔斷器開關(guān)端逼。):

package myHystrix.threadpool;

import com.netflix.hystrix.*;
import org.junit.Test;

import java.util.Random;

/**
 * Created by wangxindong on 2017/8/15.
 */
public class GetOrderCircuitBreakerCommand extends HystrixCommand<String> {

    public GetOrderCircuitBreakerCommand(String name){
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withCircuitBreakerEnabled(true)//默認(rèn)是true,本例中為了展現(xiàn)該參數(shù)
                                .withCircuitBreakerForceOpen(false)//默認(rèn)是false污淋,本例中為了展現(xiàn)該參數(shù)
                                .withCircuitBreakerForceClosed(false)//默認(rèn)是false顶滩,本例中為了展現(xiàn)該參數(shù)
                                .withCircuitBreakerErrorThresholdPercentage(5)//(1)錯誤百分比超過5%
                                .withCircuitBreakerRequestVolumeThreshold(10)//(2)10s以內(nèi)調(diào)用次數(shù)10次,同時滿足(1)(2)熔斷器打開
                                .withCircuitBreakerSleepWindowInMilliseconds(5000)//隔5s之后寸爆,熔斷器會嘗試半開(關(guān)閉)礁鲁,重新放進(jìn)來請求
//                                .withExecutionTimeoutInMilliseconds(1000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withMaxQueueSize(10)   //配置隊(duì)列大小
                                .withCoreSize(2)    // 配置線程池里的線程數(shù)
                )
        );
    }

    @Override
    protected String run() throws Exception {
        Random rand = new Random();
        //模擬錯誤百分比(方式比較粗魯?shù)梢宰C明問題)
        if(1==rand.nextInt(2)){
//            System.out.println("make exception");
            throw new Exception("make exception");
        }
        return "running:  ";
    }

    @Override
    protected String getFallback() {
//        System.out.println("FAILBACK");
        return "fallback: ";
    }

    public static class UnitTest{

        @Test
        public void testCircuitBreaker() throws Exception{
            for(int i=0;i<25;i++){
                Thread.sleep(500);
                HystrixCommand<String> command = new GetOrderCircuitBreakerCommand("testCircuitBreaker");
                String result = command.execute();
                //本例子中從第11次,熔斷器開始打開
                System.out.println("call times:"+(i+1)+"   result:"+result +" isCircuitBreakerOpen: "+command.isCircuitBreakerOpen());
                //本例子中5s以后赁豆,熔斷器嘗試關(guān)閉仅醇,放開新的請求進(jìn)來
            }
        }
    }
}

測試結(jié)果:

call times:1 result:fallback: isCircuitBreakerOpen: false
call times:2 result:running: isCircuitBreakerOpen: false
call times:3 result:running: isCircuitBreakerOpen: false
call times:4 result:fallback: isCircuitBreakerOpen: false
call times:5 result:running: isCircuitBreakerOpen: false
call times:6 result:fallback: isCircuitBreakerOpen: false
call times:7 result:fallback: isCircuitBreakerOpen: false
call times:8 result:fallback: isCircuitBreakerOpen: false
call times:9 result:fallback: isCircuitBreakerOpen: false
call times:10 result:fallback: isCircuitBreakerOpen: false
熔斷器打開
call times:11 result:fallback: isCircuitBreakerOpen: true
call times:12 result:fallback: isCircuitBreakerOpen: true
call times:13 result:fallback: isCircuitBreakerOpen: true
call times:14 result:fallback: isCircuitBreakerOpen: true
call times:15 result:fallback: isCircuitBreakerOpen: true
call times:16 result:fallback: isCircuitBreakerOpen: true
call times:17 result:fallback: isCircuitBreakerOpen: true
call times:18 result:fallback: isCircuitBreakerOpen: true
call times:19 result:fallback: isCircuitBreakerOpen: true
call times:20 result:fallback: isCircuitBreakerOpen: true
5s后熔斷器關(guān)閉
call times:21 result:running: isCircuitBreakerOpen: false
call times:22 result:running: isCircuitBreakerOpen: false
call times:23 result:fallback: isCircuitBreakerOpen: false
call times:24 result:running: isCircuitBreakerOpen: false
call times:25 result:running: isCircuitBreakerOpen: false

3.3、熔斷器(Circuit Breaker)源代碼HystrixCircuitBreaker.java分析
HystrixCircuitBreaker.java.png

Factory 是一個工廠類魔种,提供HystrixCircuitBreaker實(shí)例

public static class Factory {
        //用一個ConcurrentHashMap來保存HystrixCircuitBreaker對象
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
        
//Hystrix首先會檢查ConcurrentHashMap中有沒有對應(yīng)的緩存的斷路器析二,如果有的話直接返回。如果沒有的話就會新創(chuàng)建一個HystrixCircuitBreaker實(shí)例节预,將其添加到緩存中并且返回
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                return circuitBreakersByCommand.get(key.name());
            } else {
                return cbForCommand;
            }
        }

        
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return circuitBreakersByCommand.get(key.name());
        }

        static void reset() {
            circuitBreakersByCommand.clear();
        }
}

HystrixCircuitBreakerImpl是HystrixCircuitBreaker的實(shí)現(xiàn)叶摄,allowRequest()、isOpen()安拟、markSuccess()都會在HystrixCircuitBreakerImpl有默認(rèn)的實(shí)現(xiàn)蛤吓。

static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        /* 變量circuitOpen來代表斷路器的狀態(tài),默認(rèn)是關(guān)閉 */
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);

        /* 變量circuitOpenedOrLastTestedTime記錄著斷路恢復(fù)計時器的初始時間糠赦,用于Open狀態(tài)向Close狀態(tài)的轉(zhuǎn)換 */
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }

        /*用于關(guān)閉熔斷器并重置統(tǒng)計數(shù)據(jù)*/
        public void markSuccess() {
            if (circuitOpen.get()) {
                if (circuitOpen.compareAndSet(true, false)) {
                    //win the thread race to reset metrics
                    //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                    //and all other metric consumers are unaffected by the reset
                    metrics.resetStream();
                }
            }
        }

        @Override
        public boolean allowRequest() {
            //是否設(shè)置強(qiáng)制開啟
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {//是否設(shè)置強(qiáng)制關(guān)閉
                isOpen();
                // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
                return true;
            }
            return !isOpen() || allowSingleTest();
        }

        public boolean allowSingleTest() {
            long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            //獲取熔斷恢復(fù)計時器記錄的初始時間circuitOpenedOrLastTestedTime会傲,然后判斷以下兩個條件是否同時滿足:
            // 1) 熔斷器的狀態(tài)為開啟狀態(tài)(circuitOpen.get() == true)
            // 2) 當(dāng)前時間與計時器初始時間之差大于計時器閾值circuitBreakerSleepWindowInMilliseconds(默認(rèn)為 5 秒)
            //如果同時滿足的話,表示可以從Open狀態(tài)向Close狀態(tài)轉(zhuǎn)換拙泽。Hystrix會通過CAS操作將circuitOpenedOrLastTestedTime設(shè)為當(dāng)前時間唆铐,并返回true。如果不同時滿足奔滑,返回false艾岂,代表熔斷器關(guān)閉或者計時器時間未到。
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    // if this returns true that means we set the time so we'll return true to allow the singleTest
                    // if it returned false it means another thread raced us and allowed the singleTest before we did
                    return true;
                }
            }
            return false;
        }

        @Override
        public boolean isOpen() {
            if (circuitOpen.get()) {//獲取斷路器的狀態(tài)
                // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
                return true;
            }

            // Metrics數(shù)據(jù)中獲取HealthCounts對象
            HealthCounts health = metrics.getHealthCounts();

            // 檢查對應(yīng)的請求總數(shù)(totalCount)是否小于屬性中的請求容量閾值circuitBreakerRequestVolumeThreshold,默認(rèn)20朋其,如果是的話表示熔斷器可以保持關(guān)閉狀態(tài)王浴,返回false
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                
                return false;
            }

            //不滿足請求總數(shù)條件脆炎,就再檢查錯誤比率(errorPercentage)是否小于屬性中的錯誤百分比閾值(circuitBreakerErrorThresholdPercentage,默認(rèn) 50),如果是的話表示斷路器可以保持關(guān)閉狀態(tài)氓辣,返回 false
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // 如果超過閾值秒裕,Hystrix會判定服務(wù)的某些地方出現(xiàn)了問題,因此通過CAS操作將斷路器設(shè)為開啟狀態(tài)钞啸,并記錄此時的系統(tǒng)時間作為定時器初始時間几蜻,最后返回 true
                if (circuitOpen.compareAndSet(false, true)) {
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    return true;
                }
            }
        }

    }
3.4、熔斷器小結(jié)

每個熔斷器默認(rèn)維護(hù)10個bucket,每秒一個bucket,每個blucket記錄成功,失敗,超時,拒絕的狀態(tài)体斩,默認(rèn)錯誤超過50%且10秒內(nèi)超過20個請求進(jìn)行中斷攔截梭稚。下圖顯示HystrixCommand或HystrixObservableCommand如何與HystrixCircuitBreaker及其邏輯和決策流程進(jìn)行交互,包括計數(shù)器在斷路器中的行為絮吵。

四弧烤、回退降級

4.1、降級

所謂降級蹬敲,就是指在在Hystrix執(zhí)行非核心鏈路功能失敗的情況下暇昂,我們?nèi)绾翁幚恚热缥覀兎祷啬J(rèn)值等伴嗡。如果我們要回退或者降級處理急波,代碼上需要實(shí)現(xiàn)HystrixCommand.getFallback()方法或者是HystrixObservableCommand. HystrixObservableCommand()。

public class CommandHelloFailure extends HystrixCommand<String> {

    private final String name;

    public CommandHelloFailure(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        throw new RuntimeException("this command always fails");
    }

    @Override
    protected String getFallback() {
        return "Hello Failure " + name + "!";
    }
}
4.2瘪校、Hystrix的降級回退方式

Hystrix一共有如下幾種降級回退模式:

4.2.1澄暮、Fail Fast 快速失敗
 @Override
    protected String run() {
        if (throwException) {
            throw new RuntimeException("failure from CommandThatFailsFast");
        } else {
            return "success";
        }
    }

如果我們實(shí)現(xiàn)的是HystrixObservableCommand.java則 重寫 resumeWithFallback方法

@Override
    protected Observable<String> resumeWithFallback() {
        if (throwException) {
            return Observable.error(new Throwable("failure from CommandThatFailsFast"));
        } else {
            return Observable.just("success");
        }
    }
4.2.2、Fail Silent 無聲失敗

返回null渣淤,空Map赏寇,空List

fail silent.png
@Override
    protected String getFallback() {
        return null;
    }
@Override
    protected List<String> getFallback() {
        return Collections.emptyList();
    }
@Override
    protected Observable<String> resumeWithFallback() {
        return Observable.empty();
    }
4.2.3、Fallback: Static 返回默認(rèn)值

回退的時候返回靜態(tài)嵌入代碼中的默認(rèn)值价认,這樣就不會導(dǎo)致功能以Fail Silent的方式被清楚嗅定,也就是用戶看不到任何功能了。而是按照一個默認(rèn)的方式顯示用踩。

@Override
    protected Boolean getFallback() {
        return true;
    }
@Override
    protected Observable<Boolean> resumeWithFallback() {
        return Observable.just( true );
    }
4.2.4渠退、Fallback: Stubbed 自己組裝一個值返回

當(dāng)我們執(zhí)行返回的結(jié)果是一個包含多個字段的對象時,則會以Stubbed 的方式回退脐彩。Stubbed 值我們建議在實(shí)例化Command的時候就設(shè)置好一個值碎乃。以countryCodeFromGeoLookup為例,countryCodeFromGeoLookup的值惠奸,是在我們調(diào)用的時候就注冊進(jìn)來初始化好的梅誓。CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "china");主要代碼如下:

public class CommandWithStubbedFallback extends HystrixCommand<UserAccount> {

protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.customerId = customerId;
        this.countryCodeFromGeoLookup = countryCodeFromGeoLookup;
    }
    @Override
    protected UserAccount getFallback() {
        /**
         * Return stubbed fallback with some static defaults, placeholders,
         * and an injected value 'countryCodeFromGeoLookup' that we'll use
         * instead of what we would have retrieved from the remote service.
         */
        return new UserAccount(customerId, "Unknown Name",
                countryCodeFromGeoLookup, true, true, false);
    }
4.2.5、Fallback: Cache via Network 利用遠(yuǎn)程緩存

通過遠(yuǎn)程緩存的方式。在失敗的情況下再發(fā)起一次remote請求梗掰,不過這次請求的是一個緩存比如redis嵌言。由于是又發(fā)起一起遠(yuǎn)程調(diào)用,所以會重新封裝一次Command及穗,這個時候要注意摧茴,執(zhí)行fallback的線程一定要跟主線程區(qū)分開,也就是重新命名一個ThreadPoolKey埂陆。

Cache via Network.png
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
    private final int id;

    protected CommandWithFallbackViaNetwork(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
        this.id = id;
    }

    @Override
    protected String run() {
        //        RemoteServiceXClient.getValue(id);
        throw new RuntimeException("force failure for example");
    }

    @Override
    protected String getFallback() {
        return new FallbackViaNetwork(id).execute();
    }

    private static class FallbackViaNetwork extends HystrixCommand<String> {
        private final int id;

        public FallbackViaNetwork(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
                    // use a different threadpool for the fallback command
                    // so saturating the RemoteServiceX pool won't prevent
                    // fallbacks from executing
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
            this.id = id;
        }

        @Override
        protected String run() {
            MemCacheClient.getValue(id);
        }

        @Override
        protected String getFallback() {
            // the fallback also failed
            // so this fallback-of-a-fallback will 
            // fail silently and return null
            return null;
        }
    }
}
4.2.6苛白、Primary + Secondary with Fallback 主次方式回退(主要和次要)

這個有點(diǎn)類似我們?nèi)粘i_發(fā)中需要上線一個新功能,但為了防止新功能上線失敗可以回退到老的代碼焚虱,我們會做一個開關(guān)比如使用zookeeper做一個配置開關(guān)购裙,可以動態(tài)切換到老代碼功能。那么Hystrix它是使用通過一個配置來在兩個command中進(jìn)行切換著摔。

Primary + Secondary with Fallback.png
/**
 * Sample {@link HystrixCommand} pattern using a semaphore-isolated command
 * that conditionally invokes thread-isolated commands.
 */
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {

    private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);

    private final int id;

    public CommandFacadeWithPrimarySecondary(int id) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
                .andCommandPropertiesDefaults(
                        // we want to default to semaphore-isolation since this wraps
                        // 2 others commands that are already thread isolated
                        // 采用信號量的隔離方式
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
        this.id = id;
    }

    //通過DynamicPropertyFactory來路由到不同的command
    @Override
    protected String run() {
        if (usePrimary.get()) {
            return new PrimaryCommand(id).execute();
        } else {
            return new SecondaryCommand(id).execute();
        }
    }

    @Override
    protected String getFallback() {
        return "static-fallback-" + id;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(id);
    }

    private static class PrimaryCommand extends HystrixCommand<String> {

        private final int id;

        private PrimaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
                    .andCommandPropertiesDefaults(
                            // we default to a 600ms timeout for primary
                            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            // perform expensive 'primary' service call
            return "responseFromPrimary-" + id;
        }

    }

    private static class SecondaryCommand extends HystrixCommand<String> {

        private final int id;

        private SecondaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
                    .andCommandPropertiesDefaults(
                            // we default to a 100ms timeout for secondary
                            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
            this.id = id;
        }

        @Override
        protected String run() {
            // perform fast 'secondary' service call
            return "responseFromSecondary-" + id;
        }

    }

    public static class UnitTest {

        @Test
        public void testPrimary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                //將屬性"primarySecondary.usePrimary"設(shè)置為true缓窜,則走PrimaryCommand定续;設(shè)置為false谍咆,則走SecondaryCommand
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
                assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }

        @Test
        public void testSecondary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                //將屬性"primarySecondary.usePrimary"設(shè)置為true,則走PrimaryCommand私股;設(shè)置為false摹察,則走SecondaryCommand
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
                assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }
    }
}
4.3、回退降級小結(jié)

降級的處理方式倡鲸,返回默認(rèn)值供嚎,返回緩存里面的值(包括遠(yuǎn)程緩存比如redis和本地緩存比如jvmcache)。
但回退的處理方式也有不適合的場景:
1峭状、寫操作
2克滴、批處理
3、計算
以上幾種情況如果失敗优床,則程序就要將錯誤返回給調(diào)用者劝赔。

總結(jié)

Hystrix為我們提供了一套線上系統(tǒng)容錯的技術(shù)實(shí)踐方法,我們通過在系統(tǒng)中引入Hystrix的jar包可以很方便的使用線程隔離胆敞、熔斷着帽、回退等技術(shù)。同時它還提供了監(jiān)控頁面配置移层,方便我們管理查看每個接口的調(diào)用情況仍翰。像spring cloud這種微服務(wù)構(gòu)建模式中也引入了Hystrix,我們可以放心使用Hystrix的線程隔離技術(shù)观话,來防止雪崩這種可怕的致命性線上故障予借。

轉(zhuǎn)載請注明出處,并附上鏈接 http://www.reibang.com/p/3e11ac385c73

參考資料:
https://github.com/Netflix/Hystrix/wiki
《億級流量網(wǎng)站架構(gòu)核心技術(shù)》一書

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市灵迫,隨后出現(xiàn)的幾起案子喧笔,更是在濱河造成了極大的恐慌,老刑警劉巖龟再,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件书闸,死亡現(xiàn)場離奇詭異,居然都是意外死亡利凑,警方通過查閱死者的電腦和手機(jī)浆劲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哀澈,“玉大人牌借,你說我怎么就攤上這事「畎矗” “怎么了膨报?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長适荣。 經(jīng)常有香客問我现柠,道長,這世上最難降的妖魔是什么弛矛? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任够吩,我火速辦了婚禮,結(jié)果婚禮上丈氓,老公的妹妹穿的比我還像新娘周循。我一直安慰自己,他們只是感情好万俗,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布湾笛。 她就那樣靜靜地躺著,像睡著了一般闰歪。 火紅的嫁衣襯著肌膚如雪嚎研。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天课竣,我揣著相機(jī)與錄音嘉赎,去河邊找鬼。 笑死于樟,一個胖子當(dāng)著我的面吹牛公条,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播迂曲,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼靶橱,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起关霸,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤煌妈,失蹤者是張志新(化名)和其女友劉穎沃琅,沒想到半個月后熄阻,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體跪另,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年佳遣,在試婚紗的時候發(fā)現(xiàn)自己被綠了识埋。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡零渐,死狀恐怖窒舟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情诵盼,我是刑警寧澤惠豺,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站风宁,受9級特大地震影響洁墙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜杀糯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一扫俺、第九天 我趴在偏房一處隱蔽的房頂上張望苍苞。 院中可真熱鬧固翰,春花似錦、人聲如沸羹呵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冈欢。三九已至歉铝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間凑耻,已是汗流浹背太示。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留香浩,地道東北人类缤。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像邻吭,于是被迫代替她去往敵國和親餐弱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容