Hystrix簡(jiǎn)介
??在分布式系統(tǒng)中,難免有對(duì)外部接口的依賴痊项,而外部接口有可能出現(xiàn)響應(yīng)緩慢痛垛,大量請(qǐng)求超時(shí)燃乍,大量訪問(wèn)出現(xiàn)異常等情況。出現(xiàn)上面所說(shuō)的情況有可能是由很多原因?qū)е频呐赡苁蔷W(wǎng)絡(luò)抖動(dòng),外部系統(tǒng)有沒(méi)有測(cè)出的bug,系統(tǒng)遭遇黑客攻擊等吼驶。因?yàn)橐粋€(gè)接口的異常,有可能導(dǎo)制線程阻塞店煞,影響到其它接口的服務(wù)蟹演,甚至整個(gè)系統(tǒng)的服務(wù)給拖跨,對(duì)外部系統(tǒng)依賴的模塊越多顷蟀,出現(xiàn)的風(fēng)險(xiǎn)也就會(huì)越高酒请,Hystrix正是用于解決這樣的問(wèn)題。Hystrix同樣是Netflix公司開(kāi)源的用于解決分布式問(wèn)題而開(kāi)源的框架鸣个。源碼網(wǎng)址為:https://github.com/Netflix/Hystrix羞反。Hystrix提供了如下幾種解決方案應(yīng)對(duì)上面說(shuō)的問(wèn)題,分別為:
- 線程池隔離
- 信號(hào)量隔離
- 熔斷
- 降級(jí)回退
Hystrix 版的 Hello World
- 在pom.xml文件里引入Hystrix依賴的類
<dependencies>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.13</version>
</dependency>
</dependencies>
- 編寫(xiě)業(yè)務(wù)Command
package com.ivan.client.hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
public class HelloCommand extends HystrixCommand<String> {
protected HelloCommand() {
super(HystrixCommandGroupKey.Factory.asKey("test"));
}
@Override
protected String run() throws Exception {
//模擬請(qǐng)求外部接口需要的時(shí)間長(zhǎng)度
Thread.sleep(500);
return "sucess";
}
@Override
protected String getFallback() {
//當(dāng)外部請(qǐng)求超時(shí)后囤萤,會(huì)執(zhí)行fallback里的業(yè)務(wù)邏輯
System.out.println("執(zhí)行了回退方法");
return "error";
}
}
- 模擬系統(tǒng)調(diào)用
package com.ivan.client.hystrix;
public class App {
public static void main(String[] args) {
HelloCommand command = new HelloCommand();
String result = command.execute();
System.out.println(result);
}
}
當(dāng)我們?cè)龃?HelloCommand run方法里Thread.sleep()方法的時(shí)長(zhǎng)時(shí)昼窗,我們可以看到 command.execute()方法調(diào)用返回了error。在實(shí)際的使用中涛舍,當(dāng)發(fā)現(xiàn)第三方接口調(diào)用不通的情況下澄惊,會(huì)調(diào)用fallback方法進(jìn)行降級(jí)處理,比如可以返回一段錯(cuò)誤提示富雅。
Hystrix線程池隔離
?? 在分布式的系統(tǒng)里掸驱,系統(tǒng)可能對(duì)多個(gè)外部系統(tǒng)都有依賴關(guān)系,比同訂單系統(tǒng)同時(shí)對(duì)會(huì)員系統(tǒng)没佑,庫(kù)存系統(tǒng)統(tǒng)毕贼,優(yōu)惠券系統(tǒng)都有依賴。假如優(yōu)惠券系統(tǒng)出現(xiàn)訪問(wèn)異常的時(shí)候蛤奢,會(huì)超成線程的堆積鬼癣,對(duì)于系統(tǒng)調(diào)用庫(kù)存系統(tǒng)與會(huì)員系統(tǒng)的業(yè)務(wù)也不可用。而通過(guò)線程池能夠?qū)⒉煌臉I(yè)務(wù)由不同的線程池處理远剩,從而做到保護(hù)其它業(yè)務(wù)能夠正常訪問(wèn)扣溺。下面就來(lái)看看Hystrix是根據(jù)什么來(lái)創(chuàng)建線程的。
- 找到HystrixCommand的父類AbstractCommand瓜晤, 里面有個(gè)構(gòu)造方法锥余,從構(gòu)造方法可以看出里這里定義了 threadPool對(duì)象。代碼如下痢掠,關(guān)鍵代碼都有做相應(yīng)的注釋
/**
這個(gè)方法是AbstractCommand的構(gòu)造方法驱犹,里面用于初使化AbstractCommand嘲恍,包括circuitBreaker 與線程池對(duì)象都在這里進(jìn)行構(gòu)造
**/
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
//commandGroup對(duì)象,用于組織一類業(yè)務(wù)相關(guān)的對(duì)象
this.commandGroup = initGroupKey(group);
// commandKey默認(rèn)是以類為為名稱的
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
//這個(gè)方法里定義了TheradPool里的關(guān)鍵字雄驹,默認(rèn)以傳入的commandGroup 的name做為key的名稱
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
//這里就是線程池對(duì)象啦佃牛。
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
/**
這個(gè)方法用于得到HystrixThreadPoolKey 對(duì)象, Hystrix內(nèi)部有大量的Key對(duì)象医舆,可以簡(jiǎn)單理解這些 Key都是相應(yīng)對(duì)象的唯一標(biāo)識(shí)俘侠。從代碼里可以看出,默認(rèn)情況下Hystrix采用的是commandGroup 的name做為T(mén)hread Pool的key值蔬将。
**/
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else {
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
/**
在這里將調(diào)用具體的構(gòu)造線程池的方法爷速。
**/
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
從上面的代碼分析我們知道線程池的構(gòu)造最終會(huì)落到HystrixThreadPool.Factory這個(gè)類上面。這個(gè)類內(nèi)存持有一個(gè)ConcurrentHashMap用于緩存線程池對(duì)象霞怀,當(dāng)傳入的HystrixThreadPoolKey已經(jīng)構(gòu)造過(guò)了相應(yīng)的ThreadPool惫东,將會(huì)直接從ConcurrentHashMap里返回已經(jīng)生成的ThreadPool。如果傳入的HystrixThreadPoolKey沒(méi)有相應(yīng)的ThreadPool毙石,將構(gòu)造新的ThreadPool并放入到ConcurrentHashMap這個(gè)緩存對(duì)象上廉沮。下面是關(guān)鍵代碼:
static class Factory {
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
//這里從緩存取
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
//這里需要保證線程安全,加上了相應(yīng)的鎖
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
//具體的線程池是由HystrixThreadPoolDefault進(jìn)行構(gòu)造的
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
}
HystrixThreadPoolDefault 內(nèi)部通過(guò)HystrixConcurrencyStrategy這個(gè)對(duì)象進(jìn)行線程池的構(gòu)造徐矩,里面根據(jù)傳入的properties信息來(lái)構(gòu)造線程池對(duì)象滞时。 關(guān)鍵代碼如下:
static class HystrixThreadPoolDefault implements HystrixThreadPool {
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);
private final HystrixThreadPoolProperties properties;
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final int queueSize;
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
}
HystrixConcurrencyStrategy 類里我們可以看到采用的我們熟悉的ThreadPoolExecutor對(duì)象來(lái)構(gòu)造線程池。 里面需要傳入核心線程池的大小滤灯,最大線程數(shù)漂洋,隊(duì)列等關(guān)鍵信息。關(guān)鍵代碼如下:
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
從上面代碼的分析我們可以得出力喷,線程池是以HystrixCommandGroupKey進(jìn)行劃分的,不同的CommandGroup有不同的線程池來(lái)處理演训,而這個(gè)CommandGroup在我們的分布式系統(tǒng)中弟孟,可以把相關(guān)的業(yè)務(wù)處理放到一個(gè)CommandGroup中。
Hystrix熔斷
??熔斷器样悟,現(xiàn)實(shí)生活中有一個(gè)很好的類比拂募,就是家庭電路中都會(huì)安裝一個(gè)保險(xiǎn)盒,當(dāng)電流過(guò)大的時(shí)候保險(xiǎn)盒里面的保險(xiǎn)絲會(huì)自動(dòng)斷掉窟她,來(lái)保護(hù)家里的各種電器及電路陈症。Hystrix中的熔斷器(Circuit Breaker)也是起到這樣的作用,Hystrix在運(yùn)行過(guò)程中會(huì)向每個(gè)CommandKey對(duì)應(yīng)的熔斷器報(bào)告成功震糖、失敗录肯、超時(shí)和拒絕的狀態(tài),熔斷器維護(hù)計(jì)算統(tǒng)計(jì)的數(shù)據(jù)吊说,根據(jù)這些統(tǒng)計(jì)的信息來(lái)確定熔斷器是否打開(kāi)论咏。如果打開(kāi)优炬,后續(xù)的請(qǐng)求都會(huì)被截?cái)啵ú辉賵?zhí)行run方法里的內(nèi)容了,直接執(zhí)行fallback方法里的內(nèi)容)厅贪。然后會(huì)隔一段時(shí)間默認(rèn)是5s蠢护,嘗試半開(kāi),放入一部分流量請(qǐng)求進(jìn)來(lái)养涮,相當(dāng)于對(duì)依賴服務(wù)進(jìn)行一次健康檢查葵硕,如果恢復(fù),熔斷器關(guān)閉贯吓,隨后完全恢復(fù)調(diào)用懈凹。改造HelloCommand類代碼如下:
package com.ivan.client.hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
public class HelloCommand extends HystrixCommand<String> {
protected HelloCommand() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
//開(kāi)啟熔斷模式
.withCircuitBreakerEnabled(true)
//出現(xiàn)錯(cuò)誤的比率超過(guò)30%就開(kāi)啟熔斷
.withCircuitBreakerErrorThresholdPercentage(30)
//至少有10個(gè)請(qǐng)求才進(jìn)行errorThresholdPercentage錯(cuò)誤百分比計(jì)算
.withCircuitBreakerRequestVolumeThreshold(10)
//半開(kāi)試探休眠時(shí)間,這里設(shè)置為3秒
.withCircuitBreakerSleepWindowInMilliseconds(3000)
)
);
}
@Override
protected String run() throws Exception {
//模擬外部請(qǐng)求需要的時(shí)間長(zhǎng)度
System.out.println("執(zhí)行了run方法");
Thread.sleep(2000);
return "sucess";
}
@Override
protected String getFallback() {
//當(dāng)外部請(qǐng)求超時(shí)后宣决,會(huì)執(zhí)行fallback里的業(yè)務(wù)邏輯
System.out.println("執(zhí)行了回退方法");
return "error";
}
}
改造App的代碼蘸劈,通過(guò)執(zhí)行30次請(qǐng)求,可以看出剛開(kāi)始的時(shí)候(前10次請(qǐng)求)會(huì)執(zhí)行run方法里的邏輯尊沸,一量熔斷器打開(kāi)后威沫,將不執(zhí)行run方法里的內(nèi)容,而是直接執(zhí)行g(shù)etFallback方法里的邏輯洼专,直到過(guò)了設(shè)置的3秒后才又有流量執(zhí)行run方法的邏輯棒掠。改造App后的代碼如下:
package com.ivan.client.hystrix;
public class App {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 30; i++) {
HelloCommand command = new HelloCommand();
String result = command.execute();
System.out.println("circuit Breaker is open : " + command.isCircuitBreakerOpen());
if(command.isCircuitBreakerOpen()){
Thread.currentThread().sleep(500);
}
}
}
}
Hystrix熔斷源碼分析
??找到AbstractCommand類的initCircuitBreaker方法,這是熔斷器的構(gòu)造方法入口屁商。首先判斷是否打開(kāi)了熔斷器烟很,只有在打開(kāi)了熔斷器后才會(huì)通過(guò)HystrixCircuitBreaker.Factory工廠新建一個(gè)熔斷器,源碼如下:
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
HystrixCircuitBreaker.Factory 類里對(duì)熔斷器根據(jù)CommandKey進(jìn)行了緩存蜡镶,如果存在直接取緩存里的key,不存在則新建HystrixCircuitBreakerImpl對(duì)象用于熔斷操作雾袱。源代碼如下:
class Factory {
//circuitBreakersByCommand 是個(gè)ConcurrentHashMap, 這里緩存了系統(tǒng)的所有熔斷器
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// this should find it for all but the first time
//先從緩存里取
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
// Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
// 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
// If 2 threads hit here only one will get added and the other will get a non-null response instead.
//取不到對(duì)象才會(huì)創(chuàng)建個(gè)HystrixCircuitBreakerImpl對(duì)象并放入緩存Map中
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
// this means a race occurred and while attempting to 'put' another one got there before
// and we instead retrieved it and will now return it
return cbForCommand;
}
}
/**
* Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists.
*
* @param key
* {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
* @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
*/
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
return circuitBreakersByCommand.get(key.name());
}
/**
* Clears all circuit breakers. If new requests come in instances will be recreated.
*/
/* package */static void reset() {
circuitBreakersByCommand.clear();
}
}
??HystrixCircuitBreakerImpl 這個(gè)類里定義了一個(gè)狀態(tài)變量官还,斷路由有三種狀態(tài) 芹橡,分別為關(guān)閉,打開(kāi)望伦,半開(kāi)狀態(tài)林说。重點(diǎn)關(guān)注下allowRequest方法,在allowRequest里首先判斷forceOpen屬性是否打開(kāi)屯伞,如果打開(kāi)則不允許有請(qǐng)求進(jìn)入腿箩,然后forceClosed屬性,如果這個(gè)屬性為true,剛對(duì)所有的求求放行劣摇,相當(dāng)于熔斷器不起作用珠移。之后就是狀態(tài)判斷了。isAfterSleepWindow()方法用于放行超過(guò)了指定時(shí)間后的流量,剑梳。具體代碼如下唆貌,關(guān)鍵部分有相應(yīng)的注釋:
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
//三種狀態(tài)通過(guò)枚舉來(lái)定義
enum Status {
CLOSED, OPEN, HALF_OPEN;
}
//狀態(tài)變時(shí),默認(rèn)是關(guān)閉的狀態(tài)
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
//最后一次訪問(wèn)的時(shí)間垢乙,用于試探請(qǐng)求是否恢復(fù)
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
private Subscription subscribeToStream() {
/*
* This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
*/
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
//將熔斷器置于關(guān)閉狀態(tài)锨咙,并重置統(tǒng)計(jì)數(shù)據(jù)
@Override
public void markSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}
//將熔斷器置于打開(kāi)狀態(tài)
@Override
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}
//用于判斷熔斷器是否打開(kāi)
@Override
public boolean isOpen() {
if (properties.circuitBreakerForceOpen().get()) {
return true;
}
if (properties.circuitBreakerForceClosed().get()) {
return false;
}
return circuitOpened.get() >= 0;
}
//用于判斷是否放行流量
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
//第一次請(qǐng)求肯定就放行了
if (circuitOpened.get() == -1) {
return true;
} else {
//半開(kāi)狀態(tài)將不放行
if (status.get().equals(Status.HALF_OPEN)) {
return false;
} else {
return isAfterSleepWindow();
}
}
}
//根據(jù)當(dāng)前時(shí)間與最后一次請(qǐng)求的時(shí)候進(jìn)行比較,當(dāng)超過(guò)了設(shè)置的SleepWindowInMilliseconds追逮,將放行請(qǐng)求用于試探服務(wù)訪問(wèn)是否OK
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
//用于試探服務(wù)是否OK的方法
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
//only the first request after sleep window should execute
//if the executing command succeeds, the status will transition to CLOSED
//if the executing command fails, the status will transition to OPEN
//if the executing command gets unsubscribed, the status will transition to OPEN
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
Hystrix降級(jí)處理
??所謂降級(jí)酪刀,就是指在在Hystrix執(zhí)行非核心鏈路功能失敗的情況下,我們?nèi)绾翁幚砼シ酰热缥覀兎祷啬J(rèn)值等骂倘。如果我們要回退或者降級(jí)處理,代碼上需要實(shí)現(xiàn)HystrixCommand.getFallback()方法或者是HystrixObservableCommand. resumeWithFallback()巴席。
Hystrix與Spring Cloud整合
在實(shí)際項(xiàng)目的開(kāi)發(fā)中历涝,都會(huì)用到Fegin,所以這里的集成是在Feign的基礎(chǔ)上進(jìn)行的漾唉。
- 首先還是需要引入包荧库,pom.xml文件如下:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Edgware.SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
</dependencies>
- 啟動(dòng)類需要加上EnableCircuitBreaker注解代碼如下:
package com.ivan.client.feign;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableCircuitBreaker
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
- FeignClient 注解需有fallback屬性,屬性的值是個(gè)class,這個(gè) class在是斷路器打開(kāi)后赵刑,會(huì)執(zhí)行的業(yè)務(wù)邏輯分衫,一般在項(xiàng)目里返回一個(gè)默認(rèn)值。這個(gè)類需要實(shí)現(xiàn)與FeignClient注釋上相同的接口
package com.ivan.client.feign.service;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import com.ivan.client.feign.entity.User;
import com.ivan.client.feign.hystrix.fallback.UserServiceFallback;
@FeignClient(value="provider", fallback=UserServiceFallback.class)
public interface UserService {
@RequestMapping(method = RequestMethod.GET, value = "/user/{id}")
public User getUser(@PathVariable("id") Integer id);
@RequestMapping(method = RequestMethod.POST, value = "/user/create", consumes = MediaType.APPLICATION_JSON_VALUE)
public User create(User user);
}
- fallback類的代碼如下:
package com.ivan.client.feign.hystrix.fallback;
import org.springframework.stereotype.Component;
import com.ivan.client.feign.entity.User;
import com.ivan.client.feign.service.UserService;
@Component
public class UserServiceFallback implements UserService {
public User getUser(Integer id) {
System.out.println(Thread.currentThread().getName());
System.out.println("=====執(zhí)行到了fallback方法=======");
User user = new User();
user.setId(0);
return user;
}
public User create(User user) {
// TODO Auto-generated method stub
return null;
}
}
- application.properties文件里需要把feign.hystrix.enabled=true 這個(gè)屬性打開(kāi)般此。配置文件如下:
server.port=9000
spring.application.name=consumer-feign-hystrix
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
spring.cloud.circuit.breaker.enabled=true
ribbon.ReadTimeout=5000
feign.hystrix.enabled=true
#command相關(guān)
hystrix.command.default.execution.isolation.strategy=THREAD
#設(shè)置調(diào)用者的超時(shí)時(shí)間
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=6000
#是否開(kāi)啟超時(shí)設(shè)置
hystrix.command.default.execution.timeout.enabled=true
#表示是否在執(zhí)行超時(shí)時(shí)蚪战,中斷HystrixCommand.run() 的執(zhí)行
hystrix.command.default.execution.isolation.thread.interruptOnTimeout=true
#fallback相關(guān)
#是否開(kāi)啟fallback功能
hystrix.command.default.fallback.enabled=true
#斷路器相關(guān)
#是否開(kāi)啟斷路器
hystrix.command.default.circuitBreaker.enabled=true
#窗口時(shí)間內(nèi)打開(kāi)斷路器最小的請(qǐng)求量
hystrix.command.default.circuitBreaker.requestVolumeThreshold=5
#斷路器跳閘后,在此值的時(shí)間的內(nèi)铐懊,hystrix會(huì)拒絕新的請(qǐng)求邀桑,只有過(guò)了這個(gè)時(shí)間斷路器才會(huì)打開(kāi)閘門(mén)
hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds=5
#失敗百分比的閾值
hystrix.command.default.circuitBreaker.errorThresholdPercentage=20
#線程相關(guān)配置
#核心線程數(shù)
hystrix.threadpool.default.coreSize=5
#最大線程數(shù)
hystrix.threadpool.default.maximumSize=5
#隊(duì)列的大小
hystrix.threadpool.default.maxQueueSize=1024
#因?yàn)閙axQueueSize值不能被動(dòng)態(tài)修改,所有通過(guò)設(shè)置此值可以實(shí)現(xiàn)動(dòng)態(tài)修改等待隊(duì)列長(zhǎng)度科乎。即等待的隊(duì)列的數(shù)量大于queueSizeRejectionThreshold時(shí)(但是沒(méi)有達(dá)到maxQueueSize值)概漱,則開(kāi)始拒絕后續(xù)的請(qǐng)求進(jìn)入隊(duì)列
hystrix.threadpool.default.queueSizeRejectionThreshold=128
#設(shè)置線程多久沒(méi)有服務(wù)后,需要釋放(maximumSize-coreSize )個(gè)線程
hystrix.threadpool.default.keepAliveTimeMinutes=60
上面的屬性基本上包括了大部分會(huì)在項(xiàng)目中使用的屬性喜喂,有以下幾點(diǎn)需要重點(diǎn)關(guān)注一下:
- 上面屬性的default可以改成ComandKey,這樣就可以對(duì)特定的接口進(jìn)行配置了竿裂,F(xiàn)eign中CommandKey的值為:接口名#方法名(參數(shù)類型)玉吁,如上的CommandKey為UserService#getUser(Integer)
- 在測(cè)試hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds 屬性的時(shí)候,服務(wù)端如果在指定的時(shí)間返回了結(jié)果腻异,但系統(tǒng)還是調(diào)用了fallback里的邏輯进副,需要指定ribbon.ReadTimeout的時(shí)間。