SpringCloud 命中CAP理論中的AP,
當(dāng)調(diào)用某個服務(wù)接口時友酱,Hystrix 會創(chuàng)建一個接口線程池來進(jìn)行隔離相關(guān)服務(wù)褪储。需要調(diào)用該接口的服務(wù)莲镣,都是在消費當(dāng)前服務(wù)的線程池。
1 命令模式降級:
需要繼承HystrixCommand<>方法
public class CommandForIndex extends HystrixCommand<Object> {
private final RestTemplate restTemplate;
public CommandForIndex(RestTemplate restTemplate) {
super(Setter
//這個是必填項踏堡,指定命令分組名,主要意義是用于統(tǒng)計(比如這是商城系統(tǒng)中的商品服務(wù))
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Prpduct-Group"))
//依賴名稱(如果是服務(wù)調(diào)用,這里就寫具體的接口名,如果是自定的操作别渔,就自己命令),默認(rèn)是command實現(xiàn)類的類名惧互,熔斷就是根據(jù)這個名稱
.andCommandKey(HystrixCommandKey.Factory.asKey("ClientController"))
//線程池命名哎媚,默認(rèn)就是HystrixCommandGroupKey的名稱,線程池配置就是根據(jù)這個名稱
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserThreadPool"))
//command 熔斷相關(guān)參數(shù)配置 超時時間1000毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000))
//設(shè)置線程池參數(shù)
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
//線程池大小為8
.withCoreSize(8)
//允許緩沖區(qū)大小1024
.withMaxQueueSize(1024))
);
this.restTemplate = restTemplate;
}
@Override
protected Object run() throws Exception {
System.out.println("-------------command-----------" + Thread.currentThread().getId());
String result = restTemplate.getForObject("http://localhost:8080/server/todo", String.class);
System.out.println("--------------command finish------------result:" + result);
return result;
}
@Override
protected Object getFallback(){
System.out.println("我要降級了");
return "超時降級了";
}
}
如圖所示 商城系統(tǒng)中用戶在瀏覽商品喊儡,下單商品拨与,支付時,都會有不同的線程池進(jìn)行隔離艾猜。線程池的名稱就是分組名买喧,統(tǒng)計主要是指接口的調(diào)用情況判斷是否需要降級或熔斷捻悯。
設(shè)置線程池參數(shù)實現(xiàn)簡單秒殺系統(tǒng),比如將線程池參數(shù)設(shè)置中coreSize設(shè)置為1,maxQueyeSize設(shè)置成2時,每次只能有一個線程請求改服務(wù)淤毛,多的話放到阻塞隊列中阻塞隊列可以2個今缚。
//設(shè)置線程池參數(shù)
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
//線程池大小為8
.withCoreSize(8)
//允許緩沖區(qū)大小1024
.withMaxQueueSize(1024))
通過測試用列模擬十個線程并發(fā)執(zhí)行時,只有三個線程進(jìn)了command 方法低淡,其余的都被線程池直接拒絕執(zhí)行了fallback方法姓言。(是不是可以自定義線程池拒絕時的方法)
hystrix 配置項
一般只需要在resources下新增一個config.properties配置文件就可以了
# Hystrix 默認(rèn)加載的配置文件 - 限流、 熔斷示例
# 線程池大小
hystrix.threadpool.default.coreSize=1
# 緩沖區(qū)大小蔗蹋, 如果為-1何荚,則不緩沖,直接進(jìn)行降級 fallback
hystrix.threadpool.default.maxQueueSize=200
# 緩沖區(qū)大小超限的閾值猪杭,超限就直接降級
hystrix.threadpool.default.queueSizeRejectionThreshold=2
# 執(zhí)行策略
# 資源隔離模式餐塘,默認(rèn)thread。 還有一種叫信號量
hystrix.command.default.execution.isolation.strategy=THREAD
# 是否打開超時
hystrix.command.default.execution.timeout.enabled=true
# 超時時間胁孙,默認(rèn)1000毫秒
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000
# 超時時中斷線程(只是中斷當(dāng)前的服務(wù)唠倦,并沒有中斷調(diào)用端的服務(wù))
hystrix.command.default.execution.isolation.thread.interruptOnTimeout=true
# 取消時候中斷線程
hystrix.command.default.execution.isolation.thread.interruptOnFutureCancel=false
# 信號量模式下,最大并發(fā)量
hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests=2
# 降級策略
# 是否開啟服務(wù)降級
hystrix.command.default.fallback.enabled=true
# fallback執(zhí)行并發(fā)量
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests=100
# 熔斷策略
# 啟用/禁用熔斷機制
hystrix.command.default.circuitBreaker.enabled=true
# 強制開啟熔斷(設(shè)置為true時涮较,表示所有的請求都會熔斷調(diào))
hystrix.command.default.circuitBreaker.forceOpen=false
# 強制關(guān)閉熔斷(設(shè)置為true時,表示所有的請求都降級了也不會熔斷)
hystrix.command.default.circuitBreaker.forceClosed=false
# 前提條件冈止,一定時間內(nèi)發(fā)起一定數(shù)量的請求狂票。 也就是5秒鐘內(nèi)(這個5秒對應(yīng)下面的滾動窗口長度)至少請求3次,熔斷器才發(fā)揮起作用熙暴」胧簦總數(shù) 默認(rèn)20(一定時間內(nèi)有3個請求)
hystrix.command.default.circuitBreaker.requestVolumeThreshold=3
# 錯誤百分比。達(dá)到或超過這個百分比周霉,熔斷器打開掂器。 比如:5秒內(nèi)有100請求,60個請求超時或者失敗俱箱,就會自動開啟熔斷(一定的時間內(nèi)有 50%的請求失敗了)
hystrix.command.default.circuitBreaker.errorThresholdPercentage=50
# 10秒后国瓮,進(jìn)入半打開狀態(tài)(熔斷開啟,間隔一段時間后狞谱,會讓一部分的命令去請求服務(wù)提供者乃摹,如果結(jié)果依舊是失敗,則又會進(jìn)入熔斷狀態(tài)跟衅,如果成功孵睬,就關(guān)閉熔斷)。 默認(rèn)5秒
hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds=10000
# 度量策略
# 5秒為一次統(tǒng)計周期伶跷,術(shù)語描述:滾動窗口的長度為5秒
hystrix.command.default.metrics.rollingStats.timeInMilliseconds=5000
# 統(tǒng)計周期內(nèi) 度量桶的數(shù)量掰读,必須被timeInMilliseconds整除秘狞。作用:(5秒之內(nèi)請求數(shù)量分成10份,也就相當(dāng)于統(tǒng)計500ms內(nèi)請求的數(shù)量蹈集,失敗的比率)
hystrix.command.default.metrics.rollingStats.numBuckets=10
# 是否收集執(zhí)行時間烁试,并計算各個時間段的百分比
hystrix.command.default.metrics.rollingPercentile.enabled=true
# 設(shè)置執(zhí)行時間統(tǒng)計周期為多久,用來計算百分比
hystrix.command.default.metrics.rollingPercentile.timeInMilliseconds=60000
# 執(zhí)行時間統(tǒng)計周期內(nèi)雾狈,每個度量桶最多統(tǒng)計多少條記錄廓潜。設(shè)置為50,有100次請求善榛,則只會統(tǒng)計最近的10次
hystrix.command.default.metrics.rollingPercentile.bucketSize=100
# 數(shù)據(jù)取樣時間間隔
hystrix.command.default.metrics.healthSnapshot.intervalInMilliseconds=500
# 設(shè)置是否緩存請求辩蛋,request-scope內(nèi)緩存
hystrix.command.default.requestCache.enabled=false
# 設(shè)置HystrixCommand執(zhí)行和事件是否打印到HystrixRequestLog中
hystrix.command.default.requestLog.enabled=false
######DnUser-ThreadPool特定配置
# hystrix.threadpool.DnUser-ThreadPool.coreSize=20
# hystrix.threadpool.DnUser-ThreadPool.maxQueueSize=1000
# 超過就報錯
# hystrix.threadpool.DnUser-ThreadPool.queueSizeRejectionThreshold=800
3 采用注解的方式來實現(xiàn)服務(wù)降級
@RequestMapping("todo")
@HystrixCommand(
//線程池相關(guān)配置
threadPoolProperties = {@HystrixProperty(name = "coreSize",value = "1"),@HystrixProperty(name="queueSizeRejectionThreshold",value = "1")},
//配置超時時間
commandProperties = {@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value = "100")})
public String todo() {
}
Hystrix 執(zhí)行流程
代碼實現(xiàn):
在AbstractCommand主要的三個初始化分別為:
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
1 初始化度量桶initMetrics():主要起到聚合作用,將一段時間內(nèi)的執(zhí)行結(jié)果進(jìn)行匯總也就是上圖report metrics的過程移盆,具體設(shè)置見配置文件中的度量策略的配置項悼院。
2 初始化短路器initCircuitBreaker: HystrixCircuitBreaker接口三個抽象方法分別為:
public boolean allowRequest();
public boolean isOpen();
void markSuccess();
該接口的實現(xiàn)類主要有:HystrixCircuitBreakerImpl 和 NoOpCircuitBreaker,其中NoOpCircuitBreaker實現(xiàn)的是一個固定值,沒有具體做任何判斷
static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
public boolean allowRequest() {
return true;
}
@Override
public boolean isOpen() {
return false;
}
@Override
public void markSuccess() {
}
}
主要實現(xiàn)熔斷機制是通過 HystrixCircuitBreakerImpl
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}
@Override
public boolean allowRequest() {
// 首先判斷斷路器是否強制打開咒循,如果強制打開就會永遠(yuǎn)處于斷路狀態(tài)
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 首先判斷斷路器是否強制關(guān)閉据途,如果強制打開就不會永遠(yuǎn)處于斷路狀態(tài)
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
return !isOpen() || allowSingleTest();
}
//半開啟狀態(tài)判斷
public boolean allowSingleTest() {
// 獲取上一次斷路器打開的時間
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
//斷路器是否打開&&斷路器上一次打開的時間+系統(tǒng)配置斷路器多長時間進(jìn)入半開啟狀態(tài)>當(dāng)前時間
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// CAS操作重新設(shè)置半打開的時間
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
//是否打開的判斷
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
return true;
}
HealthCounts health = metrics.getHealthCounts();
//度量桶中統(tǒng)計的失敗的請求小于配置請求量時,是沒有打開的
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
//度量桶中的錯誤百分比小于配置時叙甸,也不會打開
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
//如果錯誤百分比大于配置時颖医,就要使用CASj將斷路器打開,設(shè)置斷路器打開的時間
if (circuitOpen.compareAndSet(false, true)) {
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
HystrixCommandAspect:
hystrix是對方法的增強裆蒸,入口就是在該類上熔萧。環(huán)繞增加的方法:
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}