Hystrix執(zhí)行原理

前奏

Hystrix的常規(guī)使用姿勢

    @Test
    public void test_run(){
        String s = new CommandHelloWorld("Bob").execute();
        System.out.println(s);
    }

我們的command在new的時候發(fā)生了什么邓嘹?execute()是如何執(zhí)行的歌馍?execute執(zhí)行失敗或者超時如何fallback?

一滚朵、PREPARE 初始化

當我們new XXCommand()的時候湾碎,大部分的工作都是在 AbstractCommand完成

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

可以很清晰的看到尸闸,這里面在進行command配置裝載、線程池配置裝載及線程池的創(chuàng)建兵拢、指標搜集器翻斟、熔斷器的初始化等等。

//HystrixCommandMetrics
ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();

//HystrixThreadPoolDefault
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

//com.netflix.hystrix.HystrixCircuitBreaker.Factory
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

除HystrixCommand每次都需要重新建立说铃,其它基本都以commandKey維護著配置访惜,熔斷器,指標的單例而線程池則以threadkey進場存儲腻扇。

我們可以了了解下Hystrix的線程池如何管理
創(chuàng)建線程調(diào)用 HystrixThreadPool.Factory.getInstance

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

從緩存中以threadPoolKey獲取線程池债热,獲取不到則 調(diào)用new HystrixThreadPoolDefault新建

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

注意

this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);

其中 concurrencyStrategy.getThreadPool,HystrixConcurrencyStrategy就是hystrix的線程創(chuàng)建策略者

真正的創(chuàng)建線程執(zhí)行
HystrixConcurrencyStrategy#getThreadPool

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    .....各種配置幼苛,此處代碼省略......

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

這里調(diào)用java JUC原生的 ThreadPoolExecutor創(chuàng)建線程

二窒篱、Observable 大串燒

Hystrix的執(zhí)行利用RxJava,組合了很多的Observable,形成一個Observable墙杯,和傳統(tǒng)的調(diào)用鏈相比更加簡潔配并。


三、各色Observable顯神通

3.1.command 狀態(tài)位
  1. toObservable 第一個observable高镐,在下一個chain之前溉旋,會更改HystrixCommand狀態(tài)位 OBSERVABLE_CHAIN_CREATED
  2. toObservable doOnTerminate,探測到terminate時嫉髓,會將HystrixCommand更改為 TERMINAL
  3. executeCommandWithSpecifiedIsolation在開始執(zhí)行的時候會更改HystrixCommand更改為 USER_CODE_EXECUTED
  4. toObservable doOnUnsubscribe观腊,探測到terminate時,會將HystrixCommand更改為 UNSUBSCRIBED
3.2.executeCommandWithSpecifiedIsolation

分配執(zhí)行線程算行,維護線程狀態(tài)

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                .....省略干擾代碼.....
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

                    try {
                       .....省略干擾代碼.....

                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        .....省略干擾代碼.....
    }
}

具體邏輯
1.判斷隔離策略梧油,如果是Semaphore 信號量則在當前線程上執(zhí)行,否則進入線程分配邏輯
2.更改HystrixCommand的狀態(tài) USER_CODE_EXECUTED
3.判斷HystrixCommand超時狀態(tài)纱意,如果已經(jīng)超時則拋出異常
4.更改當前command的線程執(zhí)行狀態(tài)為 STARTED
5.調(diào)用 getUserExecutionObservable 執(zhí)行具體邏輯
6.doOnTerminate 當Observale執(zhí)行完畢后(HystrixCommand可能失敗也可能執(zhí)行成功)婶溯,此時的線程狀態(tài)可能有兩種分別是 STARTEDNOT_USING_THREAD 鲸阔, 然后更改線程狀態(tài)為 TERMINAL
7.doOnUnsubscribe 當Observable被取消訂閱偷霉,更改線程狀態(tài)為 TERMINAL
8.subscribeOn 指定scheduler,這里Hystrix實現(xiàn)了自己的scheduler褐筛,在scheduler的worker指定線程池类少,在配置線程之前會重新加載線程池配置(這里是Rxjava的東西,暫時大家可以粗略的認為這里就是指定線程池渔扎,然后把要執(zhí)行的任務扔到這個線程池里)

@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

// allow us to change things via fast-properties by setting it each time
private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        if (maxTooLow) {
            logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                    dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        }
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

touchConfig 執(zhí)行具體的線程池參數(shù)調(diào)整硫狞。

從上面的過程也能發(fā)現(xiàn),該observable也是維護線程狀態(tài)的地方晃痴,線程的狀態(tài)變更見下圖


3.3.getUserExecutionObservable

執(zhí)行具體業(yè)務邏輯

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

userObservable = getExecutionObservable(); 由HystrixCommand自己實現(xiàn)

//HystrixCommand
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

這里看到 run()應該就明白了残吩,就是我們自己的業(yè)務代碼 CommandHelloWorld去實現(xiàn)的。

3.4.getFallbackOrThrowException

當executeCommandWithSpecifiedIsolation探測到異常時觸發(fā)該Observable倘核。getFallbackOrThrowException里具體fallback執(zhí)行看
executeCommandAndObserve泣侮。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    .....省略干擾代碼.....
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        .....省略干擾代碼.....
    };

    .....省略干擾代碼.....

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

doErrorResumeNext 會觸發(fā)下一個 handleFallback。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    ....省略干擾代碼....

    if (isUnrecoverable(originalException)) {
        ....省略干擾代碼....
    } else {
        ....省略干擾代碼....

        if (properties.fallbackEnabled().get()) {
        
            ....省略干擾代碼....

            Observable<R> fallbackExecutionChain;

            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
               return handleFallbackRejectionByEmittingError();
            }
        } else {
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

這里優(yōu)先幾個步驟
1.判斷異常是否是能走fallback處理紧唱,不能則拋出HystrixRuntimeException
2.判斷配置是否開啟允許fallback活尊,開啟,則進入 getFallbackObservable()漏益,而該方法具體有HystrixCommand實現(xiàn)蛹锰,調(diào)用的則是用戶的Command的fallback方法,如果調(diào)用方?jīng)]有覆蓋該方法绰疤,則會執(zhí)行HystrixCommand的fallback方法铜犬,拋出未定義fallback方法的異常

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
 }

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
               //調(diào)用方 fallback邏輯
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

系列文章推薦
Hystrix熔斷框架介紹
Hystrix常用功能介紹
Hystrix執(zhí)行原理
Hystrix熔斷器執(zhí)行機制
Hystrix超時實現(xiàn)機制

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子翎苫,更是在濱河造成了極大的恐慌权埠,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件煎谍,死亡現(xiàn)場離奇詭異攘蔽,居然都是意外死亡,警方通過查閱死者的電腦和手機呐粘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門满俗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人作岖,你說我怎么就攤上這事唆垃。” “怎么了痘儡?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵辕万,是天一觀的道長。 經(jīng)常有香客問我沉删,道長渐尿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任矾瑰,我火速辦了婚禮砖茸,結果婚禮上,老公的妹妹穿的比我還像新娘殴穴。我一直安慰自己凉夯,他們只是感情好,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布采幌。 她就那樣靜靜地躺著劲够,像睡著了一般。 火紅的嫁衣襯著肌膚如雪休傍。 梳的紋絲不亂的頭發(fā)上征绎,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天,我揣著相機與錄音尊残,去河邊找鬼炒瘸。 笑死,一個胖子當著我的面吹牛寝衫,可吹牛的內(nèi)容都是我干的顷扩。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼慰毅,長吁一口氣:“原來是場噩夢啊……” “哼隘截!你這毒婦竟也來了?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤婶芭,失蹤者是張志新(化名)和其女友劉穎东臀,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體犀农,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡惰赋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了呵哨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赁濒。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖孟害,靈堂內(nèi)的尸體忽然破棺而出拒炎,到底是詐尸還是另有隱情,我是刑警寧澤挨务,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布击你,位于F島的核電站,受9級特大地震影響谎柄,放射性物質(zhì)發(fā)生泄漏丁侄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一谷誓、第九天 我趴在偏房一處隱蔽的房頂上張望绒障。 院中可真熱鬧吨凑,春花似錦捍歪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至恩商,卻和暖如春变逃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背怠堪。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工揽乱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人粟矿。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓凰棉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親陌粹。 傳聞我的和親對象是個殘疾皇子撒犀,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345