Hystrix源碼解析

Hystrix簡(jiǎn)介

??在分布式系統(tǒng)中,難免有對(duì)外部接口的依賴痊项,而外部接口有可能出現(xiàn)響應(yīng)緩慢痛垛,大量請(qǐng)求超時(shí)燃乍,大量訪問(wèn)出現(xiàn)異常等情況。出現(xiàn)上面所說(shuō)的情況有可能是由很多原因?qū)е频呐赡苁蔷W(wǎng)絡(luò)抖動(dòng),外部系統(tǒng)有沒(méi)有測(cè)出的bug,系統(tǒng)遭遇黑客攻擊等吼驶。因?yàn)橐粋€(gè)接口的異常,有可能導(dǎo)制線程阻塞店煞,影響到其它接口的服務(wù)蟹演,甚至整個(gè)系統(tǒng)的服務(wù)給拖跨,對(duì)外部系統(tǒng)依賴的模塊越多顷蟀,出現(xiàn)的風(fēng)險(xiǎn)也就會(huì)越高酒请,Hystrix正是用于解決這樣的問(wèn)題。Hystrix同樣是Netflix公司開(kāi)源的用于解決分布式問(wèn)題而開(kāi)源的框架鸣个。源碼網(wǎng)址為:https://github.com/Netflix/Hystrix羞反。Hystrix提供了如下幾種解決方案應(yīng)對(duì)上面說(shuō)的問(wèn)題,分別為:

  • 線程池隔離
  • 信號(hào)量隔離
  • 熔斷
  • 降級(jí)回退

Hystrix 版的 Hello World

  • 在pom.xml文件里引入Hystrix依賴的類
   <dependencies>
       <dependency>
           <groupId>com.netflix.hystrix</groupId>
           <artifactId>hystrix-core</artifactId>
           <version>1.5.13</version>
       </dependency>
   </dependencies>
  • 編寫(xiě)業(yè)務(wù)Command
package com.ivan.client.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class HelloCommand extends HystrixCommand<String> {

   protected HelloCommand() {
       super(HystrixCommandGroupKey.Factory.asKey("test"));
   }

   @Override
   protected String run() throws Exception {
       //模擬請(qǐng)求外部接口需要的時(shí)間長(zhǎng)度
       Thread.sleep(500);
       return "sucess";
   }
   
   @Override
   protected String getFallback() {
     //當(dāng)外部請(qǐng)求超時(shí)后囤萤,會(huì)執(zhí)行fallback里的業(yè)務(wù)邏輯
     System.out.println("執(zhí)行了回退方法");
     return "error";
   }

}
  • 模擬系統(tǒng)調(diào)用
package com.ivan.client.hystrix;

public class App {
    public static void main(String[] args) {
        HelloCommand command = new HelloCommand();
        String result = command.execute();
        System.out.println(result);
    }
}

當(dāng)我們?cè)龃?HelloCommand run方法里Thread.sleep()方法的時(shí)長(zhǎng)時(shí)昼窗,我們可以看到 command.execute()方法調(diào)用返回了error。在實(shí)際的使用中涛舍,當(dāng)發(fā)現(xiàn)第三方接口調(diào)用不通的情況下澄惊,會(huì)調(diào)用fallback方法進(jìn)行降級(jí)處理,比如可以返回一段錯(cuò)誤提示富雅。

Hystrix線程池隔離

?? 在分布式的系統(tǒng)里掸驱,系統(tǒng)可能對(duì)多個(gè)外部系統(tǒng)都有依賴關(guān)系,比同訂單系統(tǒng)同時(shí)對(duì)會(huì)員系統(tǒng)没佑,庫(kù)存系統(tǒng)統(tǒng)毕贼,優(yōu)惠券系統(tǒng)都有依賴。假如優(yōu)惠券系統(tǒng)出現(xiàn)訪問(wèn)異常的時(shí)候蛤奢,會(huì)超成線程的堆積鬼癣,對(duì)于系統(tǒng)調(diào)用庫(kù)存系統(tǒng)與會(huì)員系統(tǒng)的業(yè)務(wù)也不可用。而通過(guò)線程池能夠?qū)⒉煌臉I(yè)務(wù)由不同的線程池處理远剩,從而做到保護(hù)其它業(yè)務(wù)能夠正常訪問(wèn)扣溺。下面就來(lái)看看Hystrix是根據(jù)什么來(lái)創(chuàng)建線程的。

  • 找到HystrixCommand的父類AbstractCommand瓜晤, 里面有個(gè)構(gòu)造方法锥余,從構(gòu)造方法可以看出里這里定義了 threadPool對(duì)象。代碼如下痢掠,關(guān)鍵代碼都有做相應(yīng)的注釋
/**
這個(gè)方法是AbstractCommand的構(gòu)造方法驱犹,里面用于初使化AbstractCommand嘲恍,包括circuitBreaker 與線程池對(duì)象都在這里進(jìn)行構(gòu)造
**/
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
//commandGroup對(duì)象,用于組織一類業(yè)務(wù)相關(guān)的對(duì)象
        this.commandGroup = initGroupKey(group);
// commandKey默認(rèn)是以類為為名稱的
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
//這個(gè)方法里定義了TheradPool里的關(guān)鍵字雄驹,默認(rèn)以傳入的commandGroup 的name做為key的名稱
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
//這里就是線程池對(duì)象啦佃牛。
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }

/**
這個(gè)方法用于得到HystrixThreadPoolKey 對(duì)象, Hystrix內(nèi)部有大量的Key對(duì)象医舆,可以簡(jiǎn)單理解這些  Key都是相應(yīng)對(duì)象的唯一標(biāo)識(shí)俘侠。從代碼里可以看出,默認(rèn)情況下Hystrix采用的是commandGroup 的name做為T(mén)hread Pool的key值蔬将。
**/
    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else {
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }

/**
在這里將調(diào)用具體的構(gòu)造線程池的方法爷速。
**/
    private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixThreadPool
            return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
        } else {
            return fromConstructor;
        }
    }

從上面的代碼分析我們知道線程池的構(gòu)造最終會(huì)落到HystrixThreadPool.Factory這個(gè)類上面。這個(gè)類內(nèi)存持有一個(gè)ConcurrentHashMap用于緩存線程池對(duì)象霞怀,當(dāng)傳入的HystrixThreadPoolKey已經(jīng)構(gòu)造過(guò)了相應(yīng)的ThreadPool惫东,將會(huì)直接從ConcurrentHashMap里返回已經(jīng)生成的ThreadPool。如果傳入的HystrixThreadPoolKey沒(méi)有相應(yīng)的ThreadPool毙石,將構(gòu)造新的ThreadPool并放入到ConcurrentHashMap這個(gè)緩存對(duì)象上廉沮。下面是關(guān)鍵代碼:

static class Factory {
  final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
            // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
            String key = threadPoolKey.name();

            // this should find it for all but the first time
//這里從緩存取
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
//這里需要保證線程安全,加上了相應(yīng)的鎖
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
//具體的線程池是由HystrixThreadPoolDefault進(jìn)行構(gòu)造的
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
        }
}

HystrixThreadPoolDefault 內(nèi)部通過(guò)HystrixConcurrencyStrategy這個(gè)對(duì)象進(jìn)行線程池的構(gòu)造徐矩,里面根據(jù)傳入的properties信息來(lái)構(gòu)造線程池對(duì)象滞时。 關(guān)鍵代碼如下:

static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

        private final HystrixThreadPoolProperties properties;
        private final BlockingQueue<Runnable> queue;
        private final ThreadPoolExecutor threadPool;
        private final HystrixThreadPoolMetrics metrics;
        private final int queueSize;

        public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            /* strategy: HystrixMetricsPublisherThreadPool */
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
        }
}

HystrixConcurrencyStrategy 類里我們可以看到采用的我們熟悉的ThreadPoolExecutor對(duì)象來(lái)構(gòu)造線程池。 里面需要傳入核心線程池的大小滤灯,最大線程數(shù)漂洋,隊(duì)列等關(guān)鍵信息。關(guān)鍵代碼如下:

    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

從上面代碼的分析我們可以得出力喷,線程池是以HystrixCommandGroupKey進(jìn)行劃分的,不同的CommandGroup有不同的線程池來(lái)處理演训,而這個(gè)CommandGroup在我們的分布式系統(tǒng)中弟孟,可以把相關(guān)的業(yè)務(wù)處理放到一個(gè)CommandGroup中。

Hystrix熔斷

??熔斷器样悟,現(xiàn)實(shí)生活中有一個(gè)很好的類比拂募,就是家庭電路中都會(huì)安裝一個(gè)保險(xiǎn)盒,當(dāng)電流過(guò)大的時(shí)候保險(xiǎn)盒里面的保險(xiǎn)絲會(huì)自動(dòng)斷掉窟她,來(lái)保護(hù)家里的各種電器及電路陈症。Hystrix中的熔斷器(Circuit Breaker)也是起到這樣的作用,Hystrix在運(yùn)行過(guò)程中會(huì)向每個(gè)CommandKey對(duì)應(yīng)的熔斷器報(bào)告成功震糖、失敗录肯、超時(shí)和拒絕的狀態(tài),熔斷器維護(hù)計(jì)算統(tǒng)計(jì)的數(shù)據(jù)吊说,根據(jù)這些統(tǒng)計(jì)的信息來(lái)確定熔斷器是否打開(kāi)论咏。如果打開(kāi)优炬,后續(xù)的請(qǐng)求都會(huì)被截?cái)啵ú辉賵?zhí)行run方法里的內(nèi)容了,直接執(zhí)行fallback方法里的內(nèi)容)厅贪。然后會(huì)隔一段時(shí)間默認(rèn)是5s蠢护,嘗試半開(kāi),放入一部分流量請(qǐng)求進(jìn)來(lái)养涮,相當(dāng)于對(duì)依賴服務(wù)進(jìn)行一次健康檢查葵硕,如果恢復(fù),熔斷器關(guān)閉贯吓,隨后完全恢復(fù)調(diào)用懈凹。改造HelloCommand類代碼如下:

package com.ivan.client.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

public class HelloCommand extends HystrixCommand<String> {

    protected HelloCommand() {
        
        
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        //開(kāi)啟熔斷模式
                        .withCircuitBreakerEnabled(true)
                        //出現(xiàn)錯(cuò)誤的比率超過(guò)30%就開(kāi)啟熔斷
                        .withCircuitBreakerErrorThresholdPercentage(30)
                        //至少有10個(gè)請(qǐng)求才進(jìn)行errorThresholdPercentage錯(cuò)誤百分比計(jì)算
                        .withCircuitBreakerRequestVolumeThreshold(10)
                        //半開(kāi)試探休眠時(shí)間,這里設(shè)置為3秒
                        .withCircuitBreakerSleepWindowInMilliseconds(3000)
                        )
                );
        
    }

    @Override
    protected String run() throws Exception {
        //模擬外部請(qǐng)求需要的時(shí)間長(zhǎng)度
        System.out.println("執(zhí)行了run方法");
        Thread.sleep(2000);
        return "sucess";
    }
    
    @Override
    protected String getFallback() {
      //當(dāng)外部請(qǐng)求超時(shí)后宣决,會(huì)執(zhí)行fallback里的業(yè)務(wù)邏輯
      System.out.println("執(zhí)行了回退方法");
      return "error";
    }

}

改造App的代碼蘸劈,通過(guò)執(zhí)行30次請(qǐng)求,可以看出剛開(kāi)始的時(shí)候(前10次請(qǐng)求)會(huì)執(zhí)行run方法里的邏輯尊沸,一量熔斷器打開(kāi)后威沫,將不執(zhí)行run方法里的內(nèi)容,而是直接執(zhí)行g(shù)etFallback方法里的邏輯洼专,直到過(guò)了設(shè)置的3秒后才又有流量執(zhí)行run方法的邏輯棒掠。改造App后的代碼如下:

package com.ivan.client.hystrix;

public class App {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 30; i++) {
            HelloCommand command = new HelloCommand();
            String result = command.execute();
            System.out.println("circuit Breaker is open : " + command.isCircuitBreakerOpen());
            if(command.isCircuitBreakerOpen()){
                Thread.currentThread().sleep(500);
            }
        }
    }
}

Hystrix熔斷源碼分析

??找到AbstractCommand類的initCircuitBreaker方法,這是熔斷器的構(gòu)造方法入口屁商。首先判斷是否打開(kāi)了熔斷器烟很,只有在打開(kāi)了熔斷器后才會(huì)通過(guò)HystrixCircuitBreaker.Factory工廠新建一個(gè)熔斷器,源碼如下:

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }

HystrixCircuitBreaker.Factory 類里對(duì)熔斷器根據(jù)CommandKey進(jìn)行了緩存蜡镶,如果存在直接取緩存里的key,不存在則新建HystrixCircuitBreakerImpl對(duì)象用于熔斷操作雾袱。源代碼如下:

    class Factory {
      //circuitBreakersByCommand 是個(gè)ConcurrentHashMap, 這里緩存了系統(tǒng)的所有熔斷器
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // this should find it for all but the first time
  //先從緩存里取
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize

            // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
            // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
            // If 2 threads hit here only one will get added and the other will get a non-null response instead.
 //取不到對(duì)象才會(huì)創(chuàng)建個(gè)HystrixCircuitBreakerImpl對(duì)象并放入緩存Map中
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                // this means the putIfAbsent step just created a new one so let's retrieve and return it
                return circuitBreakersByCommand.get(key.name());
            } else {
                // this means a race occurred and while attempting to 'put' another one got there before
                // and we instead retrieved it and will now return it
                return cbForCommand;
            }
        }

        /**
         * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists.
         * 
         * @param key
         *            {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
         * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
         */
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return circuitBreakersByCommand.get(key.name());
        }

        /**
         * Clears all circuit breakers. If new requests come in instances will be recreated.
         */
        /* package */static void reset() {
            circuitBreakersByCommand.clear();
        }
    }

??HystrixCircuitBreakerImpl 這個(gè)類里定義了一個(gè)狀態(tài)變量官还,斷路由有三種狀態(tài) 芹橡,分別為關(guān)閉,打開(kāi)望伦,半開(kāi)狀態(tài)林说。重點(diǎn)關(guān)注下allowRequest方法,在allowRequest里首先判斷forceOpen屬性是否打開(kāi)屯伞,如果打開(kāi)則不允許有請(qǐng)求進(jìn)入腿箩,然后forceClosed屬性,如果這個(gè)屬性為true,剛對(duì)所有的求求放行劣摇,相當(dāng)于熔斷器不起作用珠移。之后就是狀態(tài)判斷了。isAfterSleepWindow()方法用于放行超過(guò)了指定時(shí)間后的流量,剑梳。具體代碼如下唆貌,關(guān)鍵部分有相應(yīng)的注釋:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;
  //三種狀態(tài)通過(guò)枚舉來(lái)定義
        enum Status {
            CLOSED, OPEN, HALF_OPEN;
        }
//狀態(tài)變時(shí),默認(rèn)是關(guān)閉的狀態(tài)
        private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
//最后一次訪問(wèn)的時(shí)間垢乙,用于試探請(qǐng)求是否恢復(fù)
        private final AtomicLong circuitOpened = new AtomicLong(-1);
        private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;

            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
            Subscription s = subscribeToStream();
            activeSubscription.set(s);
        }

        private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }
//將熔斷器置于關(guān)閉狀態(tài)锨咙,并重置統(tǒng)計(jì)數(shù)據(jù)
        @Override
        public void markSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L);
            }
        }

//將熔斷器置于打開(kāi)狀態(tài)
        @Override
        public void markNonSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }
//用于判斷熔斷器是否打開(kāi)
        @Override
        public boolean isOpen() {
            if (properties.circuitBreakerForceOpen().get()) {
                return true;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return false;
            }
            return circuitOpened.get() >= 0;
        }
//用于判斷是否放行流量
        @Override
        public boolean allowRequest() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
//第一次請(qǐng)求肯定就放行了
            if (circuitOpened.get() == -1) {
                return true;
            } else {
//半開(kāi)狀態(tài)將不放行
                if (status.get().equals(Status.HALF_OPEN)) {
                    return false;
                } else {
                    return isAfterSleepWindow();
                }
            }
        }

//根據(jù)當(dāng)前時(shí)間與最后一次請(qǐng)求的時(shí)候進(jìn)行比較,當(dāng)超過(guò)了設(shè)置的SleepWindowInMilliseconds追逮,將放行請(qǐng)求用于試探服務(wù)訪問(wèn)是否OK
        private boolean isAfterSleepWindow() {
            final long circuitOpenTime = circuitOpened.get();
            final long currentTime = System.currentTimeMillis();
            final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
            return currentTime > circuitOpenTime + sleepWindowTime;
        }

//用于試探服務(wù)是否OK的方法
        @Override
        public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    //only the first request after sleep window should execute
                    //if the executing command succeeds, the status will transition to CLOSED
                    //if the executing command fails, the status will transition to OPEN
                    //if the executing command gets unsubscribed, the status will transition to OPEN
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
    }

Hystrix降級(jí)處理

??所謂降級(jí)酪刀,就是指在在Hystrix執(zhí)行非核心鏈路功能失敗的情況下,我們?nèi)绾翁幚砼シ酰热缥覀兎祷啬J(rèn)值等骂倘。如果我們要回退或者降級(jí)處理,代碼上需要實(shí)現(xiàn)HystrixCommand.getFallback()方法或者是HystrixObservableCommand. resumeWithFallback()巴席。

Hystrix與Spring Cloud整合

在實(shí)際項(xiàng)目的開(kāi)發(fā)中历涝,都會(huì)用到Fegin,所以這里的集成是在Feign的基礎(chǔ)上進(jìn)行的漾唉。

  • 首先還是需要引入包荧库,pom.xml文件如下:
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Edgware.SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-ribbon</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
        </dependency>
    </dependencies>
  • 啟動(dòng)類需要加上EnableCircuitBreaker注解代碼如下:
package com.ivan.client.feign;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableCircuitBreaker
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

  • FeignClient 注解需有fallback屬性,屬性的值是個(gè)class,這個(gè) class在是斷路器打開(kāi)后赵刑,會(huì)執(zhí)行的業(yè)務(wù)邏輯分衫,一般在項(xiàng)目里返回一個(gè)默認(rèn)值。這個(gè)類需要實(shí)現(xiàn)與FeignClient注釋上相同的接口
package com.ivan.client.feign.service;

import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import com.ivan.client.feign.entity.User;
import com.ivan.client.feign.hystrix.fallback.UserServiceFallback;


@FeignClient(value="provider", fallback=UserServiceFallback.class)
public interface UserService {

    @RequestMapping(method = RequestMethod.GET, value = "/user/{id}")
    public User getUser(@PathVariable("id") Integer id);

    @RequestMapping(method = RequestMethod.POST, value = "/user/create", consumes = MediaType.APPLICATION_JSON_VALUE)
    public User create(User user);

}
  • fallback類的代碼如下:
package com.ivan.client.feign.hystrix.fallback;

import org.springframework.stereotype.Component;

import com.ivan.client.feign.entity.User;
import com.ivan.client.feign.service.UserService;

@Component
public class UserServiceFallback implements UserService {

    public User getUser(Integer id) {
        System.out.println(Thread.currentThread().getName());
        System.out.println("=====執(zhí)行到了fallback方法=======");
        User user = new User();
        user.setId(0);
        return user;
    }

    public User create(User user) {
        // TODO Auto-generated method stub
        return null;
    }

}

  • application.properties文件里需要把feign.hystrix.enabled=true 這個(gè)屬性打開(kāi)般此。配置文件如下:
server.port=9000
spring.application.name=consumer-feign-hystrix
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
spring.cloud.circuit.breaker.enabled=true

ribbon.ReadTimeout=5000

feign.hystrix.enabled=true
#command相關(guān)
hystrix.command.default.execution.isolation.strategy=THREAD
#設(shè)置調(diào)用者的超時(shí)時(shí)間
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=6000
#是否開(kāi)啟超時(shí)設(shè)置
hystrix.command.default.execution.timeout.enabled=true
#表示是否在執(zhí)行超時(shí)時(shí)蚪战,中斷HystrixCommand.run() 的執(zhí)行
hystrix.command.default.execution.isolation.thread.interruptOnTimeout=true

#fallback相關(guān)
#是否開(kāi)啟fallback功能
hystrix.command.default.fallback.enabled=true

#斷路器相關(guān)
#是否開(kāi)啟斷路器
hystrix.command.default.circuitBreaker.enabled=true
#窗口時(shí)間內(nèi)打開(kāi)斷路器最小的請(qǐng)求量
hystrix.command.default.circuitBreaker.requestVolumeThreshold=5
#斷路器跳閘后,在此值的時(shí)間的內(nèi)铐懊,hystrix會(huì)拒絕新的請(qǐng)求邀桑,只有過(guò)了這個(gè)時(shí)間斷路器才會(huì)打開(kāi)閘門(mén)
hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds=5
#失敗百分比的閾值
hystrix.command.default.circuitBreaker.errorThresholdPercentage=20

#線程相關(guān)配置
#核心線程數(shù)
hystrix.threadpool.default.coreSize=5
#最大線程數(shù)
hystrix.threadpool.default.maximumSize=5
#隊(duì)列的大小
hystrix.threadpool.default.maxQueueSize=1024
#因?yàn)閙axQueueSize值不能被動(dòng)態(tài)修改,所有通過(guò)設(shè)置此值可以實(shí)現(xiàn)動(dòng)態(tài)修改等待隊(duì)列長(zhǎng)度科乎。即等待的隊(duì)列的數(shù)量大于queueSizeRejectionThreshold時(shí)(但是沒(méi)有達(dá)到maxQueueSize值)概漱,則開(kāi)始拒絕后續(xù)的請(qǐng)求進(jìn)入隊(duì)列
hystrix.threadpool.default.queueSizeRejectionThreshold=128
#設(shè)置線程多久沒(méi)有服務(wù)后,需要釋放(maximumSize-coreSize )個(gè)線程
hystrix.threadpool.default.keepAliveTimeMinutes=60

上面的屬性基本上包括了大部分會(huì)在項(xiàng)目中使用的屬性喜喂,有以下幾點(diǎn)需要重點(diǎn)關(guān)注一下:

  • 上面屬性的default可以改成ComandKey,這樣就可以對(duì)特定的接口進(jìn)行配置了竿裂,F(xiàn)eign中CommandKey的值為:接口名#方法名(參數(shù)類型)玉吁,如上的CommandKey為UserService#getUser(Integer)
  • 在測(cè)試hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds 屬性的時(shí)候,服務(wù)端如果在指定的時(shí)間返回了結(jié)果腻异,但系統(tǒng)還是調(diào)用了fallback里的邏輯进副,需要指定ribbon.ReadTimeout的時(shí)間。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市影斑,隨后出現(xiàn)的幾起案子给赞,更是在濱河造成了極大的恐慌,老刑警劉巖矫户,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件片迅,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡皆辽,警方通過(guò)查閱死者的電腦和手機(jī)柑蛇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)驱闷,“玉大人耻台,你說(shuō)我怎么就攤上這事】樟恚” “怎么了盆耽?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)扼菠。 經(jīng)常有香客問(wèn)我摄杂,道長(zhǎng),這世上最難降的妖魔是什么娇豫? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任匙姜,我火速辦了婚禮,結(jié)果婚禮上冯痢,老公的妹妹穿的比我還像新娘氮昧。我一直安慰自己,他們只是感情好浦楣,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布袖肥。 她就那樣靜靜地躺著,像睡著了一般振劳。 火紅的嫁衣襯著肌膚如雪椎组。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,842評(píng)論 1 290
  • 那天历恐,我揣著相機(jī)與錄音寸癌,去河邊找鬼。 笑死弱贼,一個(gè)胖子當(dāng)著我的面吹牛蒸苇,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播吮旅,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼溪烤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起檬嘀,我...
    開(kāi)封第一講書(shū)人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤槽驶,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后鸳兽,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體掂铐,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年贸铜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了堡纬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蒿秦,死狀恐怖烤镐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情棍鳖,我是刑警寧澤炮叶,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站渡处,受9級(jí)特大地震影響镜悉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜医瘫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一侣肄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧醇份,春花似錦稼锅、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至怖竭,卻和暖如春锥债,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背痊臭。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工哮肚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人广匙。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓绽左,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親艇潭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容