HystrixCommand在執(zhí)行的過程中如何探測超時件蚕,本篇主要對此進行介紹說明附鸽。
1.主入口:executeCommandAndObserve
#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
···省略部分代碼···
Observable<R> execution;
//判斷是否開啟超時監(jiān)測
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
可以簡單的認為lift 里面的對前面的Observable包含尝盼,類似裝飾者,后面的parent就是指上層的Observable顾瞪。其中 HystrixObservableTimeoutOperator 就是關鍵的部分汪诉。
2.關鍵點: HystrixObservableTimeoutOperator
先看下HystrixObservableTimeoutOperator.call(),TimerListener的實現(xiàn)
TimerListener listener = new TimerListener() {
@Override
public void tick() {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 標記事件椭赋,可以認為是開的hook抚岗,這里暫忽略
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
//取消原Obserable的訂閱
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
}
}
//獲取配置的超時時間配置
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
這段代碼的意思就是,給當前command的超時狀態(tài)置為超時哪怔,如果設置成功就拋出HystrixTimeoutException
異常宣蔚,緊接著被command的 doOnErron接收走 fallback邏輯
fallback
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
.................................
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
//此處catch到超時異常
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
.................................
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
同時s.unsubscribe()
通知正在執(zhí)行的線程,終止任務认境。如何終止呢胚委?
executeCommandWithSpecifiedIsolation.subscribeOn()
subscribeOne的參數(shù)就是HystrixContextScheduler
, Rxjava里 scheduler具體干活的是 worker
,我們先看下Hystrix自定義scheduler的結構示意圖
那么我們直奔主題叉信,直接看
ThreadPoolWorker
//ThreadPoolWorker.schedule
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
1.開始的時候判斷observable是否被訂閱
2.被訂閱后亩冬,將任務 submit到線程池
3.FutureCompleterWithConfigurableInterrupt
scheduler在執(zhí)行的時候,增加了observable的中斷探測
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
@Override
public void unsubscribe() {
executor.remove(f);
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
.....省略代碼.......
}
當observable 取消訂閱時硼身,就會把當前任務移除硅急,并中斷任務
到這里只是講說了超時后的處理,如何認定執(zhí)行超時呢佳遂?
3.匠心之巧
這里有個很巧妙的設計营袜,再探HystrixObservableTimeoutOperator
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
#com.netflix.hystrix.util.HystrixTimer#addTimerListener
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
// add the listener
Runnable r = new Runnable() {
@Override
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
利用了ScheduledThreadPoolExecutor
,延遲執(zhí)行丑罪,延遲時間就是我們設定的超時時間荚板,我們再看下
#HystrixObservableTimeoutOperator
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}
..... ..... ..... ..... ..... ..... ..... ..... .....
private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
這里parent就是指上層的obserable,這里可以抽象的認為是我們的HystrixCommand執(zhí)行線程, 當command執(zhí)行線程執(zhí)行完成的時候或異常的時候吩屹,會執(zhí)行 tl.clear()跪另, 也就是Future.cancel()會中斷 TimerListener
的ScheduledFuture 線程,迫使超時機制失效煤搜。
// tl.clear()
private static class TimerReference extends SoftReference<TimerListener> {
private final ScheduledFuture<?> f;
.... .... .... .... ....
@Override
public void clear() {
super.clear();
// stop this ScheduledFuture from any further executions
f.cancel(false);
}
}
4.回歸文字
HystrixCommand里有個 TimedOutStatus
超時狀態(tài)
現(xiàn)在可以認為有兩個線程免绿,一個是hystrixCommand任務執(zhí)行線程,一個是等著給hystrixCommand判定超時的線程宅楞,現(xiàn)在兩個線程看誰能先把hystrixCommand的狀態(tài)置換针姿,只要任何一個線程對hystrixCommand打上標就意味著超時判定結束。
系列文章推薦
Hystrix熔斷框架介紹
Hystrix常用功能介紹
Hystrix執(zhí)行原理
Hystrix熔斷器執(zhí)行機制
Hystrix超時實現(xiàn)機制