Hystrix超時實現(xiàn)機制

HystrixCommand在執(zhí)行的過程中如何探測超時件蚕,本篇主要對此進行介紹說明附鸽。

1.主入口:executeCommandAndObserve

#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        ···省略部分代碼···
        Observable<R> execution;

        //判斷是否開啟超時監(jiān)測
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

可以簡單的認為lift 里面的對前面的Observable包含尝盼,類似裝飾者,后面的parent就是指上層的Observable顾瞪。其中 HystrixObservableTimeoutOperator 就是關鍵的部分汪诉。

2.關鍵點: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的實現(xiàn)

TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                   
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // 標記事件椭赋,可以認為是開的hook抚岗,這里暫忽略
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        //取消原Obserable的訂閱
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }

                //獲取配置的超時時間配置
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

這段代碼的意思就是,給當前command的超時狀態(tài)置為超時哪怔,如果設置成功就拋出HystrixTimeoutException異常宣蔚,緊接著被command的 doOnErron接收走 fallback邏輯

fallback
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        .................................

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    //此處catch到超時異常
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        .................................

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

同時s.unsubscribe()通知正在執(zhí)行的線程,終止任務认境。如何終止呢胚委?

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的參數(shù)就是HystrixContextScheduler, Rxjava里 scheduler具體干活的是 worker,我們先看下Hystrix自定義scheduler的結構示意圖


那么我們直奔主題叉信,直接看 ThreadPoolWorker

//ThreadPoolWorker.schedule
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}

1.開始的時候判斷observable是否被訂閱
2.被訂閱后亩冬,將任務 submit到線程池
3.FutureCompleterWithConfigurableInterrupt scheduler在執(zhí)行的時候,增加了observable的中斷探測

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }

    .....省略代碼.......
}

當observable 取消訂閱時硼身,就會把當前任務移除硅急,并中斷任務

到這里只是講說了超時后的處理,如何認定執(zhí)行超時呢佳遂?

3.匠心之巧

這里有個很巧妙的設計营袜,再探HystrixObservableTimeoutOperator

final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

#com.netflix.hystrix.util.HystrixTimer#addTimerListener
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

利用了ScheduledThreadPoolExecutor,延遲執(zhí)行丑罪,延遲時間就是我們設定的超時時間荚板,我們再看下

#HystrixObservableTimeoutOperator
Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                .....  .....  .....  .....  .....  .....  .....  .....  .....

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }

            };

這里parent就是指上層的obserable,這里可以抽象的認為是我們的HystrixCommand執(zhí)行線程, 當command執(zhí)行線程執(zhí)行完成的時候或異常的時候吩屹,會執(zhí)行 tl.clear()跪另, 也就是Future.cancel()會中斷 TimerListener 的ScheduledFuture 線程,迫使超時機制失效煤搜。

// tl.clear()
private static class TimerReference extends SoftReference<TimerListener> {
        private final ScheduledFuture<?> f;
        ....        ....        ....        ....        ....
        @Override
        public void clear() {
            super.clear();
            // stop this ScheduledFuture from any further executions
            f.cancel(false);
        }
    }

4.回歸文字

HystrixCommand里有個 TimedOutStatus 超時狀態(tài)

TimedOutStatus

現(xiàn)在可以認為有兩個線程免绿,一個是hystrixCommand任務執(zhí)行線程,一個是等著給hystrixCommand判定超時的線程宅楞,現(xiàn)在兩個線程看誰能先把hystrixCommand的狀態(tài)置換针姿,只要任何一個線程對hystrixCommand打上標就意味著超時判定結束。

系列文章推薦
Hystrix熔斷框架介紹
Hystrix常用功能介紹
Hystrix執(zhí)行原理
Hystrix熔斷器執(zhí)行機制
Hystrix超時實現(xiàn)機制

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末厌衙,一起剝皮案震驚了整個濱河市距淫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌婶希,老刑警劉巖榕暇,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異喻杈,居然都是意外死亡彤枢,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門筒饰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缴啡,“玉大人,你說我怎么就攤上這事瓷们∫嫡ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵谬晕,是天一觀的道長碘裕。 經(jīng)常有香客問我,道長攒钳,這世上最難降的妖魔是什么帮孔? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮不撑,結果婚禮上文兢,老公的妹妹穿的比我還像新娘。我一直安慰自己燎孟,他們只是感情好禽作,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著揩页,像睡著了一般旷偿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上爆侣,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天萍程,我揣著相機與錄音,去河邊找鬼兔仰。 笑死茫负,一個胖子當著我的面吹牛,可吹牛的內容都是我干的乎赴。 我是一名探鬼主播忍法,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼潮尝,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了饿序?” 一聲冷哼從身側響起勉失,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎原探,沒想到半個月后乱凿,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡咽弦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年徒蟆,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片型型。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡段审,死狀恐怖,靈堂內的尸體忽然破棺而出输莺,到底是詐尸還是另有隱情戚哎,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布嫂用,位于F島的核電站型凳,受9級特大地震影響,放射性物質發(fā)生泄漏嘱函。R本人自食惡果不足惜甘畅,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望往弓。 院中可真熱鬧疏唾,春花似錦、人聲如沸函似。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撇寞。三九已至顿天,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蔑担,已是汗流浹背牌废。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留啤握,地道東北人鸟缕。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親懂从。 傳聞我的和親對象是個殘疾皇子授段,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355