前奏
Hystrix的常規使用姿勢
/** Builds a CommandHelloWorld for "Bob", calls execute() on it and prints the returned string. */
@Test
public void test_run() {
    final CommandHelloWorld command = new CommandHelloWorld("Bob");
    final String greeting = command.execute();
    System.out.println(greeting);
}
我們的command在new的時候發生了什么?execute()是如何執行的?execute執行失敗或者超時如何fallback?
一、PREPARE 初始化
當我們new XXCommand()的時候,大部分的工作都是在 AbstractCommand
完成
/**
 * Initialises every collaborator a command instance holds: keys, properties, metrics,
 * circuit breaker, thread pool, plugin strategies, request cache/log and the two
 * semaphore overrides.
 *
 * <p>The assignments are order-dependent: each init helper is fed fields assigned on
 * earlier lines (for example {@code properties} is built from {@code commandKey}, and the
 * thread-pool key is built from {@code commandGroup} and {@code properties}).
 *
 * @param group group key, resolved through {@code initGroupKey}
 * @param key command key, resolved through {@code initCommandKey} together with the runtime class
 * @param threadPoolKey thread-pool key, resolved through {@code initThreadPoolKey}
 * @param circuitBreaker circuit breaker handed to {@code initCircuitBreaker}
 * @param threadPool thread pool handed to {@code initThreadPool}
 * @param commandPropertiesDefaults command property defaults handed to {@code initCommandProperties}
 * @param threadPoolPropertiesDefaults thread-pool property defaults handed to {@code initThreadPool}
 * @param metrics metrics instance handed to {@code initMetrics}
 * @param fallbackSemaphore stored as the fallback semaphore override
 * @param executionSemaphore stored as the execution semaphore override
 * @param propertiesStrategy properties strategy handed to {@code initCommandProperties}
 * @param executionHook execution hook handed to {@code initExecutionHook}
 */
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// Keys and properties first: everything below is derived from them.
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// The thread-pool key also takes the executionIsolationThreadPoolKeyOverride property value.
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
// The circuit breaker receives the circuitBreakerEnabled flag and the metrics created just above.
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
// Called for its side effect only; the return value is discarded.
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
可以很清晰的看到,這里面在進行command配置裝載、線程池配置裝載及線程池的創建、指標搜集器、熔斷器的初始化等等。
// Excerpt labelled HystrixCommandMetrics: a concurrent map from a String key to a
// HystrixCommandMetrics instance.
// NOTE(review): presumably keyed by the command key name — confirm in the owning class.
ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
// Excerpt labelled HystrixThreadPoolDefault: a static concurrent map from a String key to a
// HystrixThreadPool instance.
// NOTE(review): presumably the map that HystrixThreadPool.Factory.getInstance fills using
// threadPoolKey.name() — confirm the owning class.
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
// Excerpt from com.netflix.hystrix.HystrixCircuitBreaker.Factory: a static concurrent map from a
// String key to a HystrixCircuitBreaker instance.
// NOTE(review): the field name suggests one breaker per command — confirm.
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
除HystrixCommand每次都需要重新建立,其它基本都以commandKey維護著配置,熔斷器,指標的單例,而線程池則以threadPoolKey進行存儲。
我們可以了解下Hystrix的線程池如何管理
創建線程池調用 HystrixThreadPool.Factory.getInstance
/**
 * Returns the thread pool registered under the given key, creating and registering a
 * {@code HystrixThreadPoolDefault} on first use.
 *
 * @param threadPoolKey key whose {@code name()} identifies the pool in the cache
 * @param propertiesBuilder property defaults passed to a newly created pool
 * @return the cached pool for {@code threadPoolKey.name()}
 */
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // Cache on the key's name rather than the key object, so a key type without
    // equals/hashCode still resolves to the same pool.
    final String poolName = threadPoolKey.name();

    HystrixThreadPool pool = threadPools.get(poolName);
    if (pool == null) {
        // Cache miss: create under the class-wide lock, re-checking inside it so that
        // only one pool is constructed per name.
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(poolName)) {
                threadPools.put(poolName, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        pool = threadPools.get(poolName);
    }
    return pool;
}
從緩存中以threadPoolKey獲取線程池,獲取不到則調用 new HystrixThreadPoolDefault
新建
/**
 * Creates the default thread-pool wrapper for a key: resolves its properties, obtains the
 * executor from the concurrency strategy (via the metrics instance), and registers a
 * metrics publisher for the pool.
 *
 * @param threadPoolKey key identifying this pool
 * @param propertiesDefaults property defaults handed to the properties factory
 */
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);

    final HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = this.properties.maxQueueSize().get();

    // The executor is created by the strategy and passed to the metrics instance;
    // this class then reads the executor back from the metrics.
    this.metrics = HystrixThreadPoolMetrics.getInstance(
            threadPoolKey, strategy.getThreadPool(threadPoolKey, this.properties), this.properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    // Publisher strategy: HystrixMetricsPublisherThreadPool
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
注意
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);
其中 concurrencyStrategy.getThreadPool,HystrixConcurrencyStrategy
就是hystrix的線程創建策略者
真正的創建線程執行
HystrixConcurrencyStrategy#getThreadPool
/**
 * Creates the {@code ThreadPoolExecutor} for a thread-pool key.
 *
 * <p>When the maximum size is allowed to diverge from the core size, the configured
 * maximum is used unless it is smaller than the core size; in that case an error is
 * logged and the core size is used for both bounds. When divergence is not allowed the
 * core size is always used for both bounds.
 *
 * @param threadPoolKey key of the pool; its name appears in the error log message
 * @param threadPoolProperties properties from which the maximum size is read
 * @return a new executor built from the resolved core and maximum sizes
 */
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
// NOTE(review): the article elides the configuration set-up on the next line ("various
// configuration, code omitted"); allowMaximumSizeToDivergeFromCoreSize, dynamicCoreSize,
// keepAliveTime, workQueue and threadFactory are defined there and are not visible in this excerpt.
.....各種配置幼苛,此處代碼省略......
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
// Misconfiguration: maximum below core. Log it and use the core size for both bounds.
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
// Divergence disabled: core and maximum are both the core size.
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
這里調用java JUC原生的 ThreadPoolExecutor 創建線程池
二、Observable 大串燒
Hystrix的執行利用RxJava,組合了很多的Observable,形成一個Observable,和傳統的調用鏈相比更加簡潔。
三、各色Observable顯神通
3.1.command 狀態位
-
toObservable
第一個observable,在下一個chain之前,會更改HystrixCommand狀態位OBSERVABLE_CHAIN_CREATED
-
toObservable
doOnTerminate,探測到terminate時,會將HystrixCommand更改為TERMINAL
-
executeCommandWithSpecifiedIsolation
在開始執行的時候會將HystrixCommand更改為USER_CODE_EXECUTED
-
toObservable
doOnUnsubscribe,探測到unsubscribe時,會將HystrixCommand更改為UNSUBSCRIBED
3.2.executeCommandWithSpecifiedIsolation
分配執行線程,維護線程狀態
/**
 * Builds the Observable that runs the user code under the configured isolation strategy.
 *
 * <p>For THREAD isolation the returned Observable is deferred. When its factory runs it:
 * <ol>
 *   <li>moves {@code commandState} from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED, or
 *       emits an IllegalStateException if that transition fails;</li>
 *   <li>emits an error if the command is already marked TIMED_OUT;</li>
 *   <li>moves {@code threadState} from NOT_USING_THREAD to STARTED and delegates to
 *       {@code getUserExecutionObservable}, or emits an error if that transition fails.</li>
 * </ol>
 * The doOnTerminate / doOnUnsubscribe callbacks move {@code threadState} to TERMINAL /
 * UNSUBSCRIBED and call {@code handleThreadEnd} only when the previous state was STARTED.
 * The chain is subscribed on the scheduler obtained from {@code threadPool}.
 *
 * <p>The non-THREAD branch is elided in this excerpt.
 *
 * @param _cmd the command being executed
 * @return the Observable wrapping the user execution
 */
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// (code elided by the article)
.....省略干擾代碼.....
// Only a command whose chain has just been created may proceed to user code.
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
try {
// (code elided by the article)
.....省略干擾代碼.....
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
// Clean up only if the thread was actually started.
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
// Mirror of the terminate handler, with UNSUBSCRIBED as the end state.
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// True only when interrupt-on-timeout is enabled and this command has timed out.
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
// (non-THREAD isolation branch elided by the article)
.....省略干擾代碼.....
}
}
具體邏輯
1.判斷隔離策略,如果是Semaphore 信號量則在當前線程上執行,否則進入線程分配邏輯
2.更改HystrixCommand的狀態為 USER_CODE_EXECUTED
3.判斷HystrixCommand超時狀態,如果已經超時則拋出異常
4.更改當前command的線程執行狀態為 STARTED
5.調用 getUserExecutionObservable 執行具體邏輯
6.doOnTerminate
當Observable執行完畢后(HystrixCommand可能失敗也可能執行成功),此時的線程狀態可能有兩種,分別是 STARTED
和 NOT_USING_THREAD
, 然后更改線程狀態為 TERMINAL
7.doOnUnsubscribe
當Observable被取消訂閱,更改線程狀態為 UNSUBSCRIBED
8.subscribeOn
指定scheduler,這里Hystrix實現了自己的scheduler,在scheduler的worker指定線程池,在配置線程之前會重新加載線程池配置(這里是Rxjava的東西,暫時大家可以粗略的認為這里就是指定線程池,然后把要執行的任務扔到這個線程池里)
/**
 * Re-applies the current pool configuration and returns a new scheduler bound to this pool.
 *
 * @param shouldInterruptThread predicate forwarded to the scheduler
 * @return a new {@code HystrixContextScheduler} for this thread pool
 */
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // Refresh pool sizing from the properties before handing out a scheduler.
    touchConfig();
    final HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    return new HystrixContextScheduler(strategy, this, shouldInterruptThread);
}
/**
 * Re-applies the current property values (core size, maximum size, keep-alive) to the
 * live executor, so that changes made through dynamic properties take effect.
 *
 * <p>If the maximum is allowed to diverge from the core size but is configured below it,
 * the core size is used as the maximum and an error is logged when the pool is resized.
 *
 * <p>The two size setters are applied in an order that keeps {@code core <= maximum} true
 * after each individual call. On JDK 9 and later {@code ThreadPoolExecutor.setCorePoolSize}
 * throws {@code IllegalArgumentException} when the new core size exceeds the current
 * maximum, so when the pool grows past its current maximum the maximum is raised first.
 */
private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        // if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        if (maxTooLow) {
            logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ". Maximum size will be set to " +
                    dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        }

        if (dynamicCoreSize > threadPool.getMaximumPoolSize()) {
            // Growing past the current maximum: raise the maximum first so the new core
            // size is never above the executor's maximum.
            threadPool.setMaximumPoolSize(dynamicMaximumSize);
            threadPool.setCorePoolSize(dynamicCoreSize);
        } else {
            // New core fits under the current maximum: set core first so a lowered
            // maximum is never below the executor's core size.
            threadPool.setCorePoolSize(dynamicCoreSize);
            threadPool.setMaximumPoolSize(dynamicMaximumSize);
        }
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}
touchConfig
執行具體的線程池參數調整。
從上面的過程也能發現,該observable也是維護線程狀態的地方,線程的狀態變更見下圖
3.3.getUserExecutionObservable
執行具體業務邏輯
/**
 * Obtains the user's execution Observable and decorates it with the execution hooks.
 *
 * @param _cmd the command whose hooks are applied
 * @return the user Observable lifted through {@code ExecutionHookApplication} and then
 *         {@code DeprecatedOnRunHookApplication}
 */
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> source;
    try {
        source = getExecutionObservable();
    } catch (Throwable failure) {
        // User-provided code may throw directly instead of signalling onError;
        // convert the throw into an error Observable so the chain handles it uniformly.
        source = Observable.error(failure);
    }

    final Observable<R> withExecutionHook = source.lift(new ExecutionHookApplication(_cmd));
    return withExecutionHook.lift(new DeprecatedOnRunHookApplication(_cmd));
}
userObservable = getExecutionObservable();
由HystrixCommand自己實現
// Excerpt from HystrixCommand
/**
 * Wraps {@code run()} in a deferred Observable.
 *
 * @return an Observable that, when its deferred factory is invoked, emits the value
 *         returned by {@code run()} or an error carrying whatever {@code run()} threw
 */
final protected Observable<R> getExecutionObservable() {
    final Func0<Observable<R>> invokeRun = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            Observable<R> outcome;
            try {
                outcome = Observable.just(run());
            } catch (Throwable failure) {
                outcome = Observable.error(failure);
            }
            return outcome;
        }
    };

    final Action0 rememberExecutionThread = new Action0() {
        @Override
        public void call() {
            // Record the subscribing thread so it can be interrupted later if needed.
            executionThread.set(Thread.currentThread());
        }
    };

    return Observable.defer(invokeRun).doOnSubscribe(rememberExecutionThread);
}
這里看到 run()
應該就明白了,就是我們自己的業務代碼 CommandHelloWorld
去實現的。
3.4.getFallbackOrThrowException
當executeCommandWithSpecifiedIsolation探測到異常時觸發該Observable。getFallbackOrThrowException里具體fallback執行看
executeCommandAndObserve。
/**
 * Builds the execution Observable for a command and attaches the emit, completion,
 * error and per-notification callbacks to it.
 *
 * <p>When {@code executionTimeoutEnabled} is set, the isolated execution is additionally
 * lifted through {@code HystrixObservableTimeoutOperator}. Errors from the execution are
 * routed to {@code handleFallback} through {@code onErrorResumeNext}.
 *
 * <p>NOTE(review): {@code markEmits}, {@code markOnCompleted} and {@code setRequestContext}
 * are defined in parts the article elides; the body of {@code handleFallback} is elided too.
 *
 * @param _cmd the command being executed
 * @return the decorated execution Observable
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// (code elided by the article)
.....省略干擾代碼.....
// Maps an execution error to the Observable that replaces it.
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
// (code elided by the article)
.....省略干擾代碼.....
};
// (code elided by the article)
.....省略干擾代碼.....
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// Timeout enabled: wrap the isolated execution with the timeout operator.
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
onErrorResumeNext 會觸發下一個 handleFallback。
/**
 * Produces the Observable to use after an execution failure: either the fallback chain
 * or an error.
 *
 * <p>Visible flow: unrecoverable exceptions take a separate (elided) path. Otherwise, if
 * fallback is enabled, a permit is taken from {@code fallbackSemaphore}; with a permit the
 * fallback Observable is obtained and decorated with hooks, metrics callbacks, error
 * handling and semaphore release; without a permit a rejection error is emitted. If
 * fallback is disabled a "fallback disabled" error is emitted.
 *
 * <p>NOTE(review): {@code fallbackSemaphore}, {@code setRequestContext},
 * {@code markFallbackEmit}, {@code markFallbackCompleted}, {@code handleFallbackError} and
 * {@code singleSemaphoreRelease} are defined in parts the article elides.
 *
 * @param _cmd the command that failed
 * @param eventType event type describing the failure
 * @param failureType failure classification, forwarded to the fallback-disabled handler
 * @param message failure message, forwarded to the fallback-disabled handler
 * @param originalException the exception that triggered fallback handling
 * @return the fallback chain or an error Observable
 */
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
// (code elided by the article)
....省略干擾代碼....
if (isUnrecoverable(originalException)) {
// (unrecoverable-exception handling elided by the article)
....省略干擾代碼....
} else {
// (code elided by the article)
....省略干擾代碼....
if (properties.fallbackEnabled().get()) {
// (code elided by the article)
....省略干擾代碼....
Observable<R> fallbackExecutionChain;
// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
try {
if (isFallbackUserDefined()) {
// The start hook is invoked only when the user has defined a fallback.
executionHook.onFallbackStart(this);
fallbackExecutionChain = getFallbackObservable();
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
}
// singleSemaphoreRelease is registered on both the terminate and the unsubscribe path.
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
// No fallback permit available.
return handleFallbackRejectionByEmittingError();
}
} else {
return handleFallbackDisabledByEmittingError(originalException, failureType, message);
}
}
}
這里有幾個步驟
1.判斷異常是否是能走fallback處理,不能則拋出HystrixRuntimeException
2.判斷配置是否開啟允許fallback,開啟,則進入 getFallbackObservable()
,而該方法具體由HystrixCommand實現,調用的則是用戶的Command的fallback方法,如果調用方沒有覆蓋該方法,則會執行HystrixCommand的fallback方法,拋出未定義fallback方法的異常
/**
 * Default fallback: always fails, signalling that no fallback has been provided.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
protected R getFallback() {
    final String reason = "No fallback available.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Wraps {@code getFallback()} in a deferred Observable.
 *
 * @return an Observable that, when its deferred factory is invoked, emits the value
 *         returned by {@code getFallback()} or an error carrying whatever it threw
 */
@Override
final protected Observable<R> getFallbackObservable() {
    final Func0<Observable<R>> invokeFallback = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            Observable<R> outcome;
            try {
                // the caller's fallback logic
                outcome = Observable.just(getFallback());
            } catch (Throwable failure) {
                outcome = Observable.error(failure);
            }
            return outcome;
        }
    };
    return Observable.defer(invokeFallback);
}
系列文章推薦
Hystrix熔斷框架介紹
Hystrix常用功能介紹
Hystrix執行原理
Hystrix熔斷器執行機制
Hystrix超時實現機制