一、推薦閱讀
可能有部分同學(xué)對(duì) Hystrix 的特性了解的不是很清晰歇式,推薦如下文章奏赘,寫(xiě)的真的好;
二幕屹、如何使用Hystrix
目前Hystrix最新版本是1.5.13蓝丙,在項(xiàng)目的pom文件中加上依賴(lài)
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.13</version>
</dependency>
1.Command方式
要想使用hystrix,只需要繼承HystrixCommand
或HystrixObservableCommand
望拖,兩者主要區(qū)別是:
- 前者的命令邏輯寫(xiě)在
run()
渺尘;后者的命令邏輯寫(xiě)在construct()
- 前者的
run()
是由新創(chuàng)建的線(xiàn)程執(zhí)行;后者的construct()
是由調(diào)用程序線(xiàn)程執(zhí)行 - 前者一個(gè)實(shí)例只能向調(diào)用程序發(fā)送(emit)單條數(shù)據(jù)说敏,鸥跟;后者一個(gè)實(shí)例可以順序發(fā)送多條數(shù)據(jù)
1.1執(zhí)行命令
execute()
、queue()
盔沫、observe()
医咨、toObservable()
這4個(gè)方法用來(lái)觸發(fā)執(zhí)行run()/construct()
,一個(gè)實(shí)例只能執(zhí)行一次這4個(gè)方法架诞,特別說(shuō)明的是HystrixObservableCommand
沒(méi)有execute()
和queue()
拟淮。
4個(gè)方法的主要區(qū)別:
-
execute()
:以同步堵塞方式執(zhí)行run()
。調(diào)用execute()
后谴忧,hystrix先創(chuàng)建一個(gè)新線(xiàn)程運(yùn)行run()
很泊,接著調(diào)用程序 要在execute()
調(diào)用處一直堵塞著角虫,直到run()
運(yùn)行完成 -
queue()
:以異步非堵塞方式執(zhí)行run()
。一調(diào)用queue()
就直接返回一個(gè)Future對(duì)象委造,同時(shí)hystrix創(chuàng)建一個(gè)新線(xiàn)程運(yùn)行run()
戳鹅,調(diào)用程序通過(guò)Future.get()
拿到run()
的返回結(jié)果,而Future.get()
是堵塞執(zhí)行的 -
observe()
:事件注冊(cè)前執(zhí)行run()/construct()
昏兆。第一步是事件注冊(cè)前枫虏,先調(diào)用observe()
自動(dòng)觸發(fā) 執(zhí)行run()/construct()
(如果繼承的是HystrixCommand
,hystrix將創(chuàng)建新線(xiàn)程非堵塞執(zhí)行run()
亮垫;如果繼承的是HystrixObservableCommand
模软,將以調(diào)用程序線(xiàn)程堵塞執(zhí)行construct()
),第二步是從observe()
返回后調(diào)用程序調(diào)用subscribe()
完成事件注冊(cè)饮潦,如果run()/construct()
執(zhí)行成功則觸發(fā)onNext()
和onCompleted()
燃异,如果執(zhí)行異常則觸發(fā)onError()
-
toObservable()
:事件注冊(cè)后執(zhí)行run()/construct()
。第一步是事件注冊(cè)前继蜡,一調(diào)用toObservable()
就直接返回一個(gè)Observable<String>
對(duì)象回俐,第二步調(diào)用subscribe()
完成事件注冊(cè)后自動(dòng)觸發(fā)執(zhí)行run()/construct()
(如果繼承的是HystrixCommand
,hystrix將創(chuàng)建新線(xiàn)程非堵塞執(zhí)行run()
稀并,調(diào)用程序不必等待run()
仅颇;如果繼承的是HystrixObservableCommand
,將以調(diào)用程序線(xiàn)程堵塞執(zhí)行construct()
碘举,調(diào)用程序等待construct()
執(zhí)行完才能繼續(xù)往下走)忘瓦,如果run()/construct()
執(zhí)行成功則觸發(fā)onNext()
和onCompleted()
,如果執(zhí)行異常則觸發(fā)onError()
1.2 HystrixCommand
public class CommandHelloWorld2 extends HystrixCommand<String> {
private final String name;
protected CommandHelloWorld2(String name) {
super(//命令分組用于對(duì)依賴(lài)操作分組,便于統(tǒng)計(jì),匯總等.
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
//配置依賴(lài)超時(shí)時(shí)間,500毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(500))
//HystrixCommondKey工廠(chǎng)定義依賴(lài)名稱(chēng)
.andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
//使用HystrixThreadPoolKey工廠(chǎng)定義線(xiàn)程池名稱(chēng)
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
@Override
protected String getFallback() {
return "execute Falled";
}
@Override
protected String run() throws Exception {
//sleep 2秒 ,調(diào)用會(huì)超時(shí)
// TimeUnit.MILLISECONDS.sleep(2000);
return "Hello " + name + " thread : " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
CommandHelloWorld2 commandHelloWorld2 = new CommandHelloWorld2("test-Fallback");
String s = commandHelloWorld2.execute();
System.out.println(" 同步 ====== " + s);
/*Future<String> queue = commandHelloWorld2.queue();
String s1 = queue.get();*/
}
}
-
execute() 執(zhí)行
HystrixCommand
內(nèi)部的execute方法引颈,可以實(shí)現(xiàn)run方法的同步執(zhí)行String s = commandHelloWorld2.execute();
-
queue() 執(zhí)行
HystrixCommand
內(nèi)部的queue方法耕皮,可以實(shí)現(xiàn)run方法的異步執(zhí)行,如果依賴(lài)多個(gè)下游接口 ,通過(guò)異步方式蝙场,可以同時(shí)執(zhí)行凌停,提高接口性能。Future<String> queue = commandHelloWorld2.queue(); String s1 = queue.get()
-
通過(guò)執(zhí)行的結(jié)果發(fā)現(xiàn)
run()
是由新創(chuàng)建的線(xiàn)程執(zhí)行售滤,結(jié)果如下同步 ====== Hello test-Fallback thread : hystrix-HelloWorldPool-1
構(gòu)造方法
主要是相關(guān)參數(shù)設(shè)置罚拟,具體的參數(shù)的作用后續(xù)會(huì)介紹
protected CommandHelloWorld2(String name) {
super(//命令分組用于對(duì)依賴(lài)操作分組,便于統(tǒng)計(jì),匯總等.
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
//配置依賴(lài)超時(shí)時(shí)間,500毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(500))
//HystrixCommondKey工廠(chǎng)定義依賴(lài)名稱(chēng)
.andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
//使用HystrixThreadPoolKey工廠(chǎng)定義線(xiàn)程池名稱(chēng)
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
以下是封裝的具體點(diǎn)的參數(shù)設(shè)置,具體的參數(shù)設(shè)置根據(jù)業(yè)務(wù)需求而定完箩;
public CommandHelloWorld4(Integer id) {
super(setter());
this.id = id;
}
private static Setter setter() {
return ApiSetter.setter("getNum");
}
public class ApiSetter {
public static HystrixCommand.Setter setter(String commandKeyName, String threadPoolKeyName) {
return setter("ApiGroup",commandKeyName,threadPoolKeyName);
}
public static HystrixCommand.Setter setter(String commandKeyName) {
return setter(commandKeyName,"Api-Pool");
}
/**
* @author liweihan
* @time 2017/12/20 16:57
* @description 相關(guān)參數(shù)設(shè)置
* @param groupKeyName 服務(wù)分組名
* @param commandKeyName 服務(wù)標(biāo)識(shí)名稱(chēng)
* @param threadPoolKeyName 線(xiàn)程池名稱(chēng)
* @return
*/
public static HystrixCommand.Setter setter(String groupKeyName, String commandKeyName, String threadPoolKeyName) {
//服務(wù)分組
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupKeyName);
//服務(wù)標(biāo)識(shí)
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandKeyName);
//線(xiàn)程池名稱(chēng)
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(threadPoolKeyName);
//線(xiàn)程配置
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
.withCoreSize(25)
.withKeepAliveTimeMinutes(5)
.withMaxQueueSize(Integer.MAX_VALUE)
.withQueueSizeRejectionThreshold(10000);
//命令屬性的配置
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withExecutionTimeoutInMilliseconds(3000) //設(shè)置超時(shí)時(shí)間為3秒時(shí)自動(dòng)熔斷
.withCircuitBreakerErrorThresholdPercentage(20);//失敗率達(dá)到20%自動(dòng)熔斷
//返回
return HystrixCommand.Setter
.withGroupKey(groupKey)
.andCommandKey(commandKey)
.andThreadPoolKey(threadPoolKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties)
.andCommandPropertiesDefaults(commandProperties);
}
}
1.3 HystrixObservableCommand
代碼如下:
public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String> {
private final String name;
public HelloWorldHystrixObservableCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
// @Override
// protected String getFallback() {
// System.out.println("觸發(fā)了降級(jí)!");
// return "exeucute fallback";
// }
@Override
protected Observable<String> construct() {
System.out.println("in construct! thread:" + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
System.out.println("in call of construct! thread:" + Thread.currentThread().getName());
if (!observer.isUnsubscribed()) {
// observer.onError(getExecutionException()); // 直接拋異常退出赐俗,不會(huì)往下執(zhí)行
observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());
observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());
observer.onNext(name + " thread:" + Thread.currentThread().getName());
System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());
observer.onCompleted(); // 不會(huì)往下執(zhí)行observer的任何方法
System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());
observer.onNext("abc"); // 不會(huì)執(zhí)行到
}
} catch (Exception e) {
observer.onError(e);
}
}
} );
}
public static void main(String[] args) throws Exception{
Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").observe();
// Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").toObservable();
Thread.sleep(2000);
System.out.println("睡眠中。弊知。秃励。。吉捶。");
// 注冊(cè)觀(guān)察者事件
// subscribe()是非堵塞的
hotObservable.subscribe(new Observer<String>() {
// 先執(zhí)行onNext再執(zhí)行onCompleted
@Override
public void onCompleted() {
System.out.println("hotObservable of ObservableCommand completed");
}
@Override
public void onError(Throwable e) {
System.out.println("hotObservable of ObservableCommand error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println("hotObservable of ObservableCommand onNext: " + v);
}
});
}
}
執(zhí)行結(jié)果如下:
in construct! thread:main
in call of construct! thread:main
complete before------ thread:main
complete after------ thread:main
睡眠中夺鲜。皆尔。。币励。慷蠕。
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
hotObservable of ObservableCommand completed
- 一個(gè)實(shí)例通過(guò)observer.onNext可以順序發(fā)送多條數(shù)據(jù)
- 通過(guò)執(zhí)行結(jié)果可知調(diào)用
observe()
自動(dòng)觸發(fā)執(zhí)行construct()
方法 - 通過(guò)執(zhí)行結(jié)果 thread:main 可知
construct()
是由調(diào)用程序線(xiàn)程執(zhí)行
toObservable()方法
將toObservable()的注釋放開(kāi),observe()注釋掉食呻,執(zhí)行main方法流炕,結(jié)果如下:
睡眠中。仅胞。每辟。。干旧。
in construct! thread:main
in call of construct! thread:main
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
complete before------ thread:main
hotObservable of ObservableCommand completed
complete after------ thread:main
- 直接調(diào)用
toObservable()
并不會(huì)執(zhí)行construct()
方法渠欺,調(diào)用subscribe()
完成事件注冊(cè)后自動(dòng)觸發(fā)執(zhí)行construct()
2.fallback(降級(jí))
使用fallback機(jī)制很簡(jiǎn)單,繼承HystrixCommand
只需重寫(xiě)getFallback()
椎眯,繼承HystrixObservableCommand
只需重寫(xiě)resumeWithFallback()
挠将,比如HelloWorldHystrixCommand
加上下面代碼片段:
@Override
protected String getFallback() {
return "execute Falled";
}
fallback實(shí)際流程是當(dāng)run()/construct()
被觸發(fā)執(zhí)行時(shí)或執(zhí)行中發(fā)生錯(cuò)誤時(shí),將轉(zhuǎn)向執(zhí)行getFallback()/resumeWithFallback()
编整。結(jié)合下圖舔稀,4種錯(cuò)誤情況將觸發(fā)fallback:
- 當(dāng)
construct()
或者run()
方法執(zhí)行過(guò)程中拋出異常。 - 當(dāng)回路器打開(kāi)掌测,命令的執(zhí)行進(jìn)入了熔斷狀態(tài)内贮。
- 當(dāng)命令執(zhí)行的線(xiàn)程池和隊(duì)列或者信號(hào)量已經(jīng)滿(mǎn)容。
- 命令執(zhí)行超時(shí)汞斧。
若失敗回退方法執(zhí)行失敗贺归,或者用戶(hù)未提供失敗回退方法,Hystrix 會(huì)根據(jù)調(diào)用執(zhí)行命令的方法的不同而產(chǎn)生不同的行為:
-
execute()
—— 拋出異常 -
queue()
—— 成功返回Future
對(duì)象断箫,但其get()
方法被調(diào)用時(shí),會(huì)拋出異常 -
observe()
—— 返回Observable
對(duì)象秋冰,當(dāng)你訂閱它的時(shí)候仲义,會(huì)立即調(diào)用 subscriber 的onError
方法中止請(qǐng)求 -
toObservable()
—— 返回Observable
對(duì)象,當(dāng)你訂閱它的時(shí)候剑勾,會(huì)立即調(diào)用 subscriber 的onError
方法中止請(qǐng)求
途中大致的執(zhí)行順序如下:
1埃撵、構(gòu)建 HystrixCommand 或者 HystrixObservableCommand 對(duì)象
2、執(zhí)行命令(即上述 Command 對(duì)象包裝的邏輯)
3虽另、結(jié)果是否有緩存
4暂刘、請(qǐng)求線(xiàn)路(類(lèi)似電路)是否是開(kāi)路
5、線(xiàn)程池/請(qǐng)求隊(duì)列/信號(hào)量占滿(mǎn)時(shí)會(huì)發(fā)生什么
6捂刺、使用 HystrixObservableCommand.construct() 還是 HystrixCommand.run()
7谣拣、計(jì)算鏈路健康度
8募寨、失敗回退邏輯
9、返回正成回應(yīng)
接下來(lái)拔鹰,我們一一驗(yàn)證4中情況
1、當(dāng)construct()
或者run()
方法執(zhí)行過(guò)程中拋出異常贵涵,代碼如下:
public class HystrixFallbackException extends HystrixCommand<String> {
private final String name;
public HystrixFallbackException(String name) {
super(HystrixCommandGroupKey.Factory.asKey("FallbackGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------會(huì)觸發(fā)fallback的case-------------------*/
//1.主動(dòng)拋出異常
// throw new HystrixTimeoutException();
// throw new RuntimeException("this command will trigger fallback");
// throw new Exception("this command will trigger fallback");
// throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, commandClass, message, cause, fallbackException);
// 2.除零異常
//int i = 1/0;
/*---------------不會(huì)觸發(fā)fallback的case-------------------*/
// 3.被捕獲的異常不會(huì)觸發(fā)fallback
/*try {
throw new RuntimeException("this command never trigger fallback");
} catch(Exception e) {
e.printStackTrace();
}*/
// 4.HystrixBadRequestException異常由非法參數(shù)或非系統(tǒng)錯(cuò)誤引起列肢,不會(huì)觸發(fā)fallback,也不會(huì)被計(jì)入熔斷器
throw new HystrixBadRequestException("HystrixBadRequestException is never trigger fallback");
//return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
HystrixFallbackException hlx = new HystrixFallbackException("Hlx");
try {
String execute = hlx.execute();
System.out.println(execute);
} catch (Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí)宾茂,會(huì)被捕獲到這里" + e.getCause());
}
}
}
幾種異常情況已經(jīng)在代碼中注釋了瓷马,可直接去嘗試每種情況
非HystrixBadRequestException異常:當(dāng)拋出HystrixBadRequestException時(shí),調(diào)用程序可以捕獲異常跨晴,沒(méi)有觸發(fā)
getFallback()
欧聘,而其他異常則會(huì)觸發(fā)getFallback()
,調(diào)用程序?qū)@得getFallback()
的返回
2坟奥、命令執(zhí)行超時(shí)
代碼如下:
public class HystrixFallbackTimeOut extends HystrixCommand<String> {
private final String name;
protected HystrixFallbackTimeOut(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String getFallback() {
return "execute Falled";
}
@Override
protected String run() throws Exception {
//sleep 2秒 ,調(diào)用會(huì)超時(shí)
TimeUnit.MILLISECONDS.sleep(2000);
return "Hello " + name + " thread : " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
HystrixFallbackTimeOut hystrixFallbackTimeOut = new HystrixFallbackTimeOut("Fallback-timeout");
String s = hystrixFallbackTimeOut.execute();
System.out.println(" 同步 ====== " + s);
}
}
- run()方法由于睡眠了2s,Hystrix的默認(rèn)的執(zhí)行超時(shí)時(shí)間為1000ms,所以執(zhí)行會(huì)超時(shí)
3树瞭、當(dāng)回路器打開(kāi),命令的執(zhí)行進(jìn)入了熔斷狀態(tài)
代碼如下:
/**
*
* CircuitBreakerRequestVolumeThreshold設(shè)置為3爱谁,意味著10s內(nèi)請(qǐng)求超過(guò)3次才會(huì)觸發(fā)熔斷器
* circuitBreakerErrorThresholdPercentage設(shè)置為80晒喷,錯(cuò)誤率是為%80才會(huì)觸發(fā)熔斷器
* 必須兩個(gè)參數(shù)同時(shí)滿(mǎn)足才會(huì)才會(huì)觸發(fā)熔斷器
* run()中使命令超時(shí)進(jìn)入fallback,執(zhí)行4次run后访敌,將被熔斷凉敲,進(jìn)入降級(jí),即不進(jìn)入run()而直接進(jìn)入fallback
*
*/
public class HystrixFallbackCircuitBreaker extends HystrixCommand<String> {
private Integer id;
public HystrixFallbackCircuitBreaker(Integer id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerTest"))
.andThreadPoolPropertiesDefaults( // 配置線(xiàn)程池
HystrixThreadPoolProperties.Setter()
.withCoreSize(200) // 配置線(xiàn)程池里的線(xiàn)程數(shù)寺旺,設(shè)置足夠多線(xiàn)程爷抓,以防未熔斷卻打滿(mǎn)threadpool
)
.andCommandPropertiesDefaults( // 配置熔斷器
HystrixCommandProperties.Setter()
//開(kāi)啟熔斷器
.withCircuitBreakerEnabled(true)
//滑動(dòng)窗口內(nèi)(10s)的請(qǐng)求數(shù)閾值,只有達(dá)到了這個(gè)閾值阻塑,才有可能熔斷蓝撇。默認(rèn)是 20,如果這個(gè)時(shí)間段只有19個(gè)請(qǐng)求陈莽,就算全部失敗了渤昌,也不會(huì)自動(dòng)熔斷。
.withCircuitBreakerRequestVolumeThreshold(3)
//錯(cuò)誤率閾值走搁,默認(rèn) 50%独柑,比如(10s)內(nèi)有100個(gè)請(qǐng)求,其中有60個(gè)發(fā)生異常私植,那么這段時(shí)間的錯(cuò)誤率是 60忌栅,已經(jīng)超過(guò)了錯(cuò)誤率閾值,熔斷器會(huì)自動(dòng)打開(kāi)曲稼。
.withCircuitBreakerErrorThresholdPercentage(80)
// .withCircuitBreakerForceOpen(true) // 置為true時(shí)索绪,所有請(qǐng)求都將被拒絕湖员,直接到fallback
// .withCircuitBreakerForceClosed(true) // 置為true時(shí),將忽略錯(cuò)誤
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) // 信號(hào)量隔離
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println("running run():" + id);
if (id % 2 == 0 && id <= 10) { //讓小于等于10的偶數(shù)返回
return "running run():" + id;
} else { //讓奇數(shù)或大于10的數(shù)進(jìn)入fallback
TimeUnit.MILLISECONDS.sleep(2000);
return id+"";
}
}
@Override
protected String getFallback() {
return " ====== CircuitBreaker fallback" + id + " ,是否進(jìn)入熔斷:" + super.isCircuitBreakerOpen();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i++) {
try {
System.out.println("===========" + new HystrixFallbackCircuitBreaker(i).execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí)者春,被捕獲到這里" + e.getCause());
}
}
}
- 我們配置10s內(nèi)請(qǐng)求數(shù)大于3個(gè)時(shí)就啟動(dòng)熔斷器破衔,請(qǐng)求錯(cuò)誤率大于80%時(shí)就熔斷,然后for循環(huán)發(fā)起請(qǐng)求钱烟,當(dāng)請(qǐng)求符合熔斷條件時(shí)將觸發(fā)
getFallback()
晰筛。
4、當(dāng)命令執(zhí)行的線(xiàn)程池和隊(duì)列或者信號(hào)量已經(jīng)滿(mǎn)容
代碼如下:
package com.jimingqiang.study.hystrix.hystrixbegin.failback;
import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Created by QDHL on 2018/10/11.
*
* @author mingqiang ji
*/
public class HystrixFallbackThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixFallbackThreadPool(String name) {
// super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置線(xiàn)程池里的線(xiàn)程數(shù)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.MILLISECONDS.sleep(3000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
try {
Future<String> future = new HystrixFallbackThreadPool("Hlx"+i).queue();
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí)拴袭,被捕獲到這里" + e.getCause());
}
}
for(int i = 0; i < 20; i++) {
try {
System.out.println("===========" + new HystrixFallbackThreadPool("Hlx").execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí)读第,被捕獲到這里" + e.getCause());
}
}
}
}
-
我們配置線(xiàn)程池?cái)?shù)目為3,然后先用一個(gè)for循環(huán)執(zhí)行
queue()
拥刻,觸發(fā)的run()
sleep 3s怜瞒,然后再用第2個(gè)for循環(huán)執(zhí)行execute()
,發(fā)現(xiàn)所有execute()
都觸發(fā)了fallback般哼,這是因?yàn)榈?個(gè)for的線(xiàn)程還在sleep吴汪,占用著線(xiàn)程池所有線(xiàn)程,導(dǎo)致第2個(gè)for的所有命令都無(wú)法獲取到線(xiàn)程蒸眠。3.隔離策略
hystrix提供了兩種隔離策略:線(xiàn)程池隔離和信號(hào)量隔離漾橙。hystrix默認(rèn)采用線(xiàn)程池隔離。
- 線(xiàn)程池隔離:不同服務(wù)通過(guò)使用不同線(xiàn)程池楞卡,彼此間將不受影響霜运,達(dá)到隔離效果。我們通過(guò)andThreadPoolKey配置使用命名為
ThreadPoolTest
的線(xiàn)程池蒋腮,實(shí)現(xiàn)與其他命名的線(xiàn)程池天然隔離淘捡,如果不配置andThreadPoolKey則使用withGroupKey配置來(lái)命名線(xiàn)程池 - 信號(hào)量隔離:線(xiàn)程隔離會(huì)帶來(lái)線(xiàn)程開(kāi)銷(xiāo),有些場(chǎng)景(比如無(wú)網(wǎng)絡(luò)請(qǐng)求場(chǎng)景)可能會(huì)因?yàn)橛瞄_(kāi)銷(xiāo)換隔離得不償失池摧,為此hystrix提供了信號(hào)量隔離焦除,當(dāng)服務(wù)的并發(fā)數(shù)大于信號(hào)量閾值時(shí)將進(jìn)入fallback。通過(guò)
withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
配置為信號(hào)量隔離作彤,通過(guò)withExecutionIsolationSemaphoreMaxConcurrentRequests
配置執(zhí)行并發(fā)數(shù)膘魄。
3.1 線(xiàn)程池隔離
在該模式下,用戶(hù)請(qǐng)求會(huì)被提交到各自的線(xiàn)程池中執(zhí)行宦棺,把執(zhí)行每個(gè)下游服務(wù)的線(xiàn)程分離,這樣黔帕,某個(gè)依賴(lài)服務(wù)的高延遲只會(huì)拖慢這個(gè)依賴(lài)服務(wù)對(duì)應(yīng)的線(xiàn)程池代咸,從而達(dá)到資源隔離的作用。當(dāng)線(xiàn)程池來(lái)不及處理并且請(qǐng)求隊(duì)列塞滿(mǎn)時(shí)成黄,新進(jìn)來(lái)的請(qǐng)求將快速失敗呐芥,可以避免依賴(lài)問(wèn)題擴(kuò)散逻杖。
優(yōu)勢(shì)
- 減少所依賴(lài)服務(wù)發(fā)生故障時(shí)的影響面,比如ServiceA服務(wù)發(fā)生異常思瘟,導(dǎo)致請(qǐng)求大量超時(shí)荸百,對(duì)應(yīng)的線(xiàn)程池被打滿(mǎn),這時(shí)并不影響ServiceB滨攻、ServiceC的調(diào)用够话。
- 如果接口性能有變動(dòng),可以方便的動(dòng)態(tài)調(diào)整線(xiàn)程池的參數(shù)或者是超時(shí)時(shí)間光绕,前提是Hystrix參數(shù)實(shí)現(xiàn)了動(dòng)態(tài)調(diào)整女嘲。
缺點(diǎn)
- 請(qǐng)求在線(xiàn)程池中執(zhí)行,肯定會(huì)帶來(lái)任務(wù)調(diào)度诞帐、排隊(duì)和上下文切換帶來(lái)的開(kāi)銷(xiāo)欣尼。
- 因?yàn)樯婕暗娇缇€(xiàn)程,那么就存在ThreadLocal數(shù)據(jù)的傳遞問(wèn)題停蕉,比如在主線(xiàn)程初始化的ThreadLocal變量愕鼓,在線(xiàn)程池線(xiàn)程中無(wú)法獲取
注意
因?yàn)镠ystrix默認(rèn)使用了線(xiàn)程池模式,所以對(duì)于每個(gè)Command慧起,在初始化的時(shí)候菇晃,會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的線(xiàn)程池,如果項(xiàng)目中需要進(jìn)行降級(jí)的接口非常多完慧,比如有上百個(gè)的話(huà)谋旦,不太了解Hystrix內(nèi)部機(jī)制的同學(xué),按照默認(rèn)配置直接使用屈尼,可能就會(huì)造成線(xiàn)程資源的大量浪費(fèi)册着。
- 線(xiàn)程池隔離:不同服務(wù)通過(guò)使用不同線(xiàn)程池楞卡,彼此間將不受影響霜运,達(dá)到隔離效果。我們通過(guò)andThreadPoolKey配置使用命名為
線(xiàn)程池的使用示意圖如下圖所示,當(dāng)n個(gè)請(qǐng)求線(xiàn)程并發(fā)對(duì)某個(gè)接口請(qǐng)求調(diào)用時(shí)脾歧,會(huì)先從hystrix管理的線(xiàn)程池里面獲得一個(gè)線(xiàn)程甲捏,然后將參數(shù)傳遞給這個(gè)線(xiàn)程去執(zhí)行真正調(diào)用。線(xiàn)程池的大小有限鞭执,默認(rèn)是10個(gè)線(xiàn)程司顿,可以使用maxConcurrentRequests參數(shù)配置,如果并發(fā)請(qǐng)求數(shù)多于線(xiàn)程池線(xiàn)程個(gè)數(shù)兄纺,就有線(xiàn)程需要進(jìn)入隊(duì)列排隊(duì)大溜,但排隊(duì)隊(duì)列也有上限,默認(rèn)是 5估脆,如果排隊(duì)隊(duì)列也滿(mǎn)钦奋,則必定有請(qǐng)求線(xiàn)程會(huì)走fallback流程。
線(xiàn)程池模式可以支持異步調(diào)用,支持超時(shí)調(diào)用付材,支持直接熔斷朦拖,存在線(xiàn)程切換,開(kāi)銷(xiāo)大厌衔。
代碼如下:
public class HystrixThreadPoolLsolation extends HystrixCommand<String> {
private final String name;
public HystrixThreadPoolLsolation(String name,String threadPoolName) {
// super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolName))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置線(xiàn)程池里的線(xiàn)程數(shù)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.MILLISECONDS.sleep(3000);
return name+"-"+Thread.currentThread().getName();
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
try {
Future<String> future = new HystrixThreadPoolLsolation("Hlx"+i,"thread-pool-1").queue();
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí)璧帝,被捕獲到這里" + e.getCause());
}
}
for(int i = 0; i < 20; i++) {
try {
System.out.println("===========" + new HystrixThreadPoolLsolation("Hlx","thread-pool-2").execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),被捕獲到這里" + e.getCause());
}
}
}
}
- 我們配置線(xiàn)程池?cái)?shù)目為3富寿,然后先用一個(gè)for循環(huán)執(zhí)行
queue()
睬隶,觸發(fā)的run()
sleep 3s,然后再用第2個(gè)for循環(huán)執(zhí)行execute()
作喘,發(fā)現(xiàn)所有execute()
沒(méi)有觸發(fā)fallback理疙,而是繼續(xù)執(zhí)行,這是因?yàn)閮蓚€(gè)命令配置了不同的線(xiàn)程池泞坦。
3.2 信號(hào)量
如果要使用信號(hào)量模式窖贤,需要配置參數(shù)execution.isolation.strategy=ExecutionIsolationStrategy.SEMAPHORE
;
另外贰锁,為了限制對(duì)下游依賴(lài)的并發(fā)調(diào)用量赃梧,可以配置Hystrix的
execution.isolation.semaphore.maxConcurrentRequests
當(dāng)并發(fā)請(qǐng)求數(shù)達(dá)到閾值時(shí),請(qǐng)求線(xiàn)程可以快速失敗豌熄,執(zhí)行降級(jí)授嘀。
信號(hào)量的使用示意圖如下圖所示,當(dāng)n個(gè)并發(fā)請(qǐng)求去調(diào)用一個(gè)目標(biāo)服務(wù)接口時(shí)锣险,都要獲取一個(gè)信號(hào)量才能真正去調(diào)用目標(biāo)服務(wù)接口蹄皱,但信號(hào)量有限,默認(rèn)是10個(gè)芯肤,可以使用maxConcurrentRequests參數(shù)配置巷折,如果并發(fā)請(qǐng)求數(shù)多于信號(hào)量個(gè)數(shù),信號(hào)量在達(dá)到上限時(shí)崖咨,會(huì)拒絕后續(xù)請(qǐng)求的訪(fǎng)問(wèn)锻拘,則必定有請(qǐng)求線(xiàn)程會(huì)走fallback流程,從而達(dá)到限流和防止雪崩的目的击蹲。
信號(hào)量模式從始至終都只有請(qǐng)求線(xiàn)程自身署拟,是同步調(diào)用模式,不支持超時(shí)調(diào)用歌豺,由于沒(méi)有線(xiàn)程的切換推穷,開(kāi)銷(xiāo)非常小。
在該模式下类咧,接收請(qǐng)求和執(zhí)行下游依賴(lài)在同一個(gè)線(xiàn)程內(nèi)完成馒铃,
比如一個(gè)接口中依賴(lài)了3個(gè)下游:serviceA谴咸、serviceB、serviceC骗露,且這3個(gè)服務(wù)返回的數(shù)據(jù)互相不依賴(lài),這種情況下如果針對(duì)A血巍、B萧锉、C的熔斷降級(jí)使用信號(hào)量模式,那么接口耗時(shí)就等于請(qǐng)求A述寡、B柿隙、C服務(wù)耗時(shí)的總和,無(wú)疑這不是好的方案鲫凶。
代碼如下:
package com.jimingqiang.study.hystrix.hystrixbegin.Isolationstrategy;
import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 測(cè)試信號(hào)量隔離
* 默認(rèn)執(zhí)行run()用的是主線(xiàn)程禀崖,為了模擬并行執(zhí)行command,這里我們自己創(chuàng)建多個(gè)線(xiàn)程來(lái)執(zhí)行command
* 設(shè)置ExecutionIsolationSemaphoreMaxConcurrentRequests為3螟炫,意味著信號(hào)量最多允許執(zhí)行run的并發(fā)數(shù)為3波附,超過(guò)則觸發(fā)降級(jí),即不執(zhí)行run而執(zhí)行g(shù)etFallback
* 設(shè)置FallbackIsolationSemaphoreMaxConcurrentRequests為1昼钻,意味著信號(hào)量最多允許執(zhí)行fallback的并發(fā)數(shù)為1掸屡,超過(guò)則拋異常fallback execution rejected
*/
public class HystrixSemaphorelLsolation extends HystrixCommand<String> {
private final String name;
public HystrixSemaphorelLsolation(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemaphoreTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreTestKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SemaphoreTestThreadPoolKey"))
.andCommandPropertiesDefaults( // 配置信號(hào)量隔離
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // 信號(hào)量隔離
.withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(1)
)
// 設(shè)置了信號(hào)量隔離后,線(xiàn)程池配置將變無(wú)效
// .andThreadPoolPropertiesDefaults(
// HystrixThreadPoolProperties.Setter()
// .withCoreSize(13) // 配置線(xiàn)程池里的線(xiàn)程數(shù)
// )
);
this.name = name;
}
@Override
protected String run() throws Exception {
return "run(): name="+name+"然评,線(xiàn)程名是" + Thread.currentThread().getName();
}
@Override
protected String getFallback() {
return "getFallback(): name="+name+"仅财,線(xiàn)程名是" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
try {
Thread.sleep(2000);
for(int i = 0; i < 5; i++) {
final int j = i;
// 自主創(chuàng)建線(xiàn)程來(lái)執(zhí)行command,創(chuàng)造并發(fā)場(chǎng)景
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("===========" + new HystrixSemaphorelLsolation("HLX" + j).execute());
}
});
thread.start();
}
} catch(Exception e) {
e.printStackTrace();
}
System.in.read();
}
}
-
由于并發(fā)執(zhí)行5個(gè)線(xiàn)程碗淌,ExecutionIsolationSemaphoreMaxConcurrentRequests為3,設(shè)置FallbackIsolationSemaphoreMaxConcurrentRequests為1,最終執(zhí)行結(jié)果是3個(gè)線(xiàn)程可以并發(fā)執(zhí)行
run()
盏求,1個(gè)線(xiàn)程執(zhí)行
getFallback()
,一個(gè)線(xiàn)程拋出異常亿眠;
4.熔斷器
在生活中碎罚,如果電路的負(fù)載過(guò)高,保險(xiǎn)箱會(huì)自動(dòng)跳閘缕探,以保護(hù)家里的各種電器魂莫,這就是熔斷器的一個(gè)活生生例子。在Hystrix中也存在這樣一個(gè)熔斷器爹耗,當(dāng)所依賴(lài)的服務(wù)不穩(wěn)定時(shí)耙考,能夠自動(dòng)熔斷,并提供有損服務(wù)潭兽,保護(hù)服務(wù)的穩(wěn)定性倦始。在運(yùn)行過(guò)程中,Hystrix會(huì)根據(jù)接口的執(zhí)行狀態(tài)(成功山卦、失敗鞋邑、超時(shí)和拒絕)诵次,收集并統(tǒng)計(jì)這些數(shù)據(jù),根據(jù)這些信息來(lái)實(shí)時(shí)決策是否進(jìn)行熔斷枚碗。
線(xiàn)路的開(kāi)路閉路詳細(xì)邏輯如下:
- 假設(shè)線(xiàn)路內(nèi)的容量(請(qǐng)求QPS)達(dá)到一定閾值(通過(guò)
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
配置) - 同時(shí)逾一,假設(shè)線(xiàn)路內(nèi)的錯(cuò)誤率達(dá)到一定閾值(通過(guò)
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
配置) - 熔斷器將從『閉路』轉(zhuǎn)換成『開(kāi)路』
- 若此時(shí)是『開(kāi)路』狀態(tài),熔斷器將短路后續(xù)所有經(jīng)過(guò)該熔斷器的請(qǐng)求肮雨,這些請(qǐng)求直接走『失敗回退邏輯』
- 經(jīng)過(guò)一定時(shí)間(即『休眠窗口』遵堵,通過(guò)
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()
配置),后續(xù)第一個(gè)請(qǐng)求將會(huì)被允許通過(guò)熔斷器(此時(shí)熔斷器處于『半開(kāi)』狀態(tài))怨规,若該請(qǐng)求失敗陌宿,熔斷器將又進(jìn)入『開(kāi)路』狀態(tài),且在休眠窗口內(nèi)保持此狀態(tài)波丰;若該請(qǐng)求成功壳坪,熔斷器將進(jìn)入『閉路』狀態(tài),回到邏輯1循環(huán)往復(fù)掰烟。
? 代碼可以參考 【2.fallback(降級(jí))中的 3爽蝴、當(dāng)回路器打開(kāi),命令的執(zhí)行進(jìn)入了熔斷狀態(tài)的代碼片段 】
5.請(qǐng)求緩存
5.1 請(qǐng)求緩存有如下好處:
-
不同請(qǐng)求路徑上針對(duì)同一個(gè)依賴(lài)服務(wù)進(jìn)行的重復(fù)請(qǐng)求(有同一個(gè)緩存 Key)纫骑,不會(huì)真實(shí)請(qǐng)求多次
這個(gè)特性在企業(yè)級(jí)系統(tǒng)中非常有用霜瘪,在這些系統(tǒng)中,開(kāi)發(fā)者往往開(kāi)發(fā)的只是系統(tǒng)功能的一部分惧磺。(注:這樣颖对,開(kāi)發(fā)者彼此隔離,不太可能使用同樣的方法或者策略去請(qǐng)求同一個(gè)依賴(lài)服務(wù)提供的資源)
例如磨隘,請(qǐng)求一個(gè)用戶(hù)的
Account
的邏輯如下所示缤底,這個(gè)邏輯往往在系統(tǒng)不同地方被用到:Account account = new UserGetAccount(accountId).execute(); //or Observable<Account> accountObservable = new UserGetAccount(accountId).observe();
Hystrix 的
RequestCache
只會(huì)在內(nèi)部執(zhí)行run()
方法一次,上面兩個(gè)線(xiàn)程在執(zhí)行HystrixCommand
命令時(shí)番捂,會(huì)得到相同的結(jié)果个唧,即使這兩個(gè)命令是兩個(gè)不同的實(shí)例。 -
數(shù)據(jù)獲取具有一致性
因?yàn)榫彺娴拇嬖谏柙ぃ说谝淮握?qǐng)求需要真正訪(fǎng)問(wèn)依賴(lài)服務(wù)以外徙歼,后續(xù)請(qǐng)求全部從緩存中獲取,可以保證在同一個(gè)用戶(hù)請(qǐng)求內(nèi)鳖枕,不會(huì)出現(xiàn)依賴(lài)服務(wù)返回不同的回應(yīng)的情況魄梯。
-
避免不必要的線(xiàn)程執(zhí)行
在
construct()
或run()
方法執(zhí)行之前,會(huì)先從請(qǐng)求緩存中獲取數(shù)據(jù)宾符,因此酿秸,Hystrix 能利用這個(gè)特性避免不必要的線(xiàn)程執(zhí)行,減小系統(tǒng)開(kāi)銷(xiāo)。若 Hystrix 沒(méi)有實(shí)現(xiàn)請(qǐng)求緩存,那么
HystrixCommand
和HystrixObservableCommand
的實(shí)現(xiàn)者需要自己在construct()
或run()
方法中實(shí)現(xiàn)緩存,這種方式無(wú)法避免不必要的線(xiàn)程執(zhí)行開(kāi)銷(xiāo)绵载。
5.2 緩存的使用
要使用hystrix cache功能稀蟋,第一個(gè)要求是重寫(xiě)getCacheKey()
煌张,用來(lái)構(gòu)造cache key;第二個(gè)要求是構(gòu)建context退客,如果請(qǐng)求B要用到請(qǐng)求A的結(jié)果緩存唱矛,A和B必須同處一個(gè)context。通過(guò)HystrixRequestContext.initializeContext()
和context.shutdown()
可以構(gòu)建一個(gè)context井辜,這兩條語(yǔ)句間的所有請(qǐng)求都處于同一個(gè)context。
代碼如下:
public class CommandHelloWorld3 extends HystrixCommand<String> {
private final int id;
protected CommandHelloWorld3(int id) {
super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName() + " execute id = " + id);
return "execute=" + id;
}
//重寫(xiě)getCacheKey管闷,實(shí)現(xiàn)區(qū)分不同請(qǐng)求的邏輯
@Override
protected String getCacheKey() {
System.out.println(" --- ");
return String.valueOf(id);
}
public static void main(String[] args) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
CommandHelloWorld3 commandHelloWorld3_a = new CommandHelloWorld3(22);
CommandHelloWorld3 commandHelloWorld3_b = new CommandHelloWorld3(22);
System.out.println("a執(zhí)行結(jié)果:" + commandHelloWorld3_a.execute());
System.out.println("a執(zhí)行結(jié)果是否從緩存中獲戎嘟拧:" + commandHelloWorld3_a.isResponseFromCache);
System.out.println("b執(zhí)行結(jié)果:" + commandHelloWorld3_b.execute());
System.out.println("b執(zhí)行結(jié)果是否從緩存中獲取:" + commandHelloWorld3_b.isResponseFromCache);
} finally {
context.shutdown();
}
context = HystrixRequestContext.initializeContext();
try {
CommandHelloWorld3 commandHelloWorld3_c = new CommandHelloWorld3(22);
System.out.println("c執(zhí)行結(jié)果:" + commandHelloWorld3_c.execute());
System.out.println("c執(zhí)行結(jié)果是否從緩存中獲劝觥:" + commandHelloWorld3_c.isResponseFromCache);
} finally {
context.shutdown();
}
}
}
執(zhí)行結(jié)果如下:
---
---
hystrix-RequestCacheCommand-1 execute id = 22
a執(zhí)行結(jié)果:execute=22
a執(zhí)行結(jié)果是否從緩存中獲人⒃省:false
---
---
b執(zhí)行結(jié)果:execute=22
b執(zhí)行結(jié)果是否從緩存中獲取:true
---
---
hystrix-RequestCacheCommand-2 execute id = 22
c執(zhí)行結(jié)果:execute=22
c執(zhí)行結(jié)果是否從緩存中獲缺棠摇:false
6.合并請(qǐng)求collapsing
下圖展示了在兩種場(chǎng)景(未增加『請(qǐng)求合并器』和增加『請(qǐng)求合并器』)下树灶,線(xiàn)程和網(wǎng)絡(luò)連接數(shù)量(假設(shè)所有請(qǐng)求在一個(gè)很小的時(shí)間窗口內(nèi),例如 10ms糯而,是『并發(fā)』的):
為什么要使用請(qǐng)求合并天通?
例如,這里有一個(gè)包含 300 個(gè)視頻對(duì)象的列表熄驼,需要遍歷這個(gè)列表像寒,并對(duì)每一個(gè)對(duì)象調(diào)用 getSomeAttribute()
方法,但如果簡(jiǎn)單處理的話(huà)瓜贾,可能會(huì)導(dǎo)致 300 次的網(wǎng)絡(luò)請(qǐng)求(假設(shè) getSomeAttribute()
方法內(nèi)需要發(fā)出網(wǎng)絡(luò)請(qǐng)求)诺祸,每一個(gè)網(wǎng)絡(luò)請(qǐng)求可能都會(huì)花上幾毫秒(顯然,這種方式非常容易拖慢系統(tǒng))祭芦。
通過(guò)使用 HystrixCollapser
筷笨,Hystrix 能自動(dòng)完成請(qǐng)求的合并,可以將300個(gè)網(wǎng)絡(luò)請(qǐng)求降低為只發(fā)送一次網(wǎng)絡(luò)請(qǐng)求龟劲,大大的減少了網(wǎng)絡(luò)的耗時(shí)胃夏。
Hystrix中的請(qǐng)求合并,就是利用一個(gè)合并處理器昌跌,將對(duì)同一個(gè)服務(wù)發(fā)起的連續(xù)請(qǐng)求合并成一個(gè)請(qǐng)求進(jìn)行處理(這些連續(xù)請(qǐng)求的時(shí)間窗默認(rèn)為10ms)构订,可以有效節(jié)省網(wǎng)絡(luò)帶寬和線(xiàn)程池資源
請(qǐng)求合并帶來(lái)的額外開(kāi)銷(xiāo)
請(qǐng)求合并會(huì)導(dǎo)致依賴(lài)服務(wù)的請(qǐng)求延遲增高(該延遲為等待請(qǐng)求的延遲),延遲的最大值為合并時(shí)間窗口大小避矢。
若某個(gè)請(qǐng)求耗時(shí)的可能是 5ms悼瘾,合并時(shí)間窗口為 10ms囊榜,但是現(xiàn)在必須再等10ms看看還有沒(méi)有其他的請(qǐng)求一起的,這樣一個(gè)請(qǐng)求的耗時(shí)就從5ms增加到15ms了亥宿。
請(qǐng)求合并帶來(lái)的額外開(kāi)銷(xiāo)是否值得卸勺,取決于將要執(zhí)行的命令,高延遲的命令相比較而言不會(huì)有太大的影響,因?yàn)檫@個(gè)時(shí)候時(shí)間窗的時(shí)間消耗就顯得微不足道了
另外烫扼,如果一個(gè)命令具有高并發(fā)度曙求,并且能批量處理多個(gè),甚至上百個(gè)的話(huà)映企,請(qǐng)求合并帶來(lái)的性能開(kāi)銷(xiāo)會(huì)因?yàn)橥掏铝康臉O大提升而基本可以忽略悟狱,因?yàn)?Hystrix 會(huì)減少這些請(qǐng)求所需的線(xiàn)程和網(wǎng)絡(luò)連接數(shù)量。如果一個(gè)合并時(shí)間窗口內(nèi)只有 1~2 個(gè)請(qǐng)求堰氓,將請(qǐng)求合并顯然不是明智的選擇挤渐。
代碼如下:
package com.jimingqiang.study.hystrix.hystrixbegin.collapsing;
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class HystrixCollapsing extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public HystrixCollapsing(Integer key) {
this.key = key;
}
/**
* 請(qǐng)求參數(shù)
* 如果有多個(gè)參數(shù)需要被綁定,創(chuàng)建一個(gè)單獨(dú)的對(duì)象來(lái)包含它們双絮,或者使用Tuple浴麻。
* @return
*/
@Override
public Integer getRequestArgument() {
return key;
}
/**
* 創(chuàng)建一個(gè)批量請(qǐng)求命令
* @param requests 保存了延遲時(shí)間窗中收集到的所有單個(gè)的請(qǐng)求的參數(shù) String是返回結(jié)果類(lèi)型,Integer是請(qǐng)求參數(shù)類(lèi)型
* @return
*/
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
return new BatchCommand(requests); // 把批量請(qǐng)求傳給command類(lèi)
}
//
/**
* 把批量請(qǐng)求的結(jié)果和對(duì)應(yīng)的請(qǐng)求一一對(duì)應(yīng)起來(lái)
* @param batchResponse 響應(yīng)的結(jié)果
* @param requests 請(qǐng)求參數(shù)
*/
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
request.setResponse(batchResponse.get(count++));
}
}
// command類(lèi)
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollepsingGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CollepsingKey")
));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
// 處理每個(gè)請(qǐng)求囤攀,返回結(jié)果
for (CollapsedRequest<String, Integer> request : requests) {
// artificial response for each argument received in the batch
response.add("ValueForKey: " + request.getArgument() + " thread:" + Thread.currentThread().getName());
}
return response;
}
}
public static void main(String[] args) throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new HystrixCollapsing(1).queue();
Future<String> f2 = new HystrixCollapsing(2).queue();
Future<String> f3 = new HystrixCollapsing(3).queue();
Future<String> f4 = new HystrixCollapsing(4).queue();
Future<String> f5 = new HystrixCollapsing(5).queue();
// f5和f6软免,如果sleep時(shí)間夠小則會(huì)合并,如果sleep時(shí)間夠大則不會(huì)合并焚挠,默認(rèn)10ms
TimeUnit.MILLISECONDS.sleep(1000);
Future<String> f6 = new HystrixCollapsing(6).queue();
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
System.out.println(f5.get());
System.out.println(f6.get());
// note:numExecuted表示共有幾個(gè)命令執(zhí)行膏萧,1個(gè)批量多命令請(qǐng)求算一個(gè),這個(gè)實(shí)際值可能比代碼寫(xiě)的要多蝌衔,因?yàn)閐ue to non-determinism of scheduler since this example uses the real timer
int numExecuted = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size();
System.out.println("num executed: " + numExecuted);
int numLogs = 0;
for (HystrixInvokableInfo<?> command : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
numLogs++;
System.err.println(command.getCommandKey().name() + " => command.getExecutionEvents(): " + command.getExecutionEvents());
}
System.out.println(numLogs==numExecuted);
} finally {
context.shutdown();
}
}
}
重要一點(diǎn)向抢,兩個(gè)請(qǐng)求能自動(dòng)合并的前提是兩者足夠“近”,即合并時(shí)間窗口,兩者啟動(dòng)執(zhí)行的間隔時(shí)長(zhǎng)要足夠小胚委,默認(rèn)為10ms挟鸠,即超過(guò)10ms將不自動(dòng)合并。
-
我們連續(xù)發(fā)起多個(gè)queue請(qǐng)求亩冬,依次返回f1~f6共6個(gè)Future對(duì)象艘希,根據(jù)打印結(jié)果可知f2~f5同處一個(gè)線(xiàn)程,說(shuō)明這4個(gè)請(qǐng)求被合并了硅急,而f6由另一個(gè)線(xiàn)程執(zhí)行覆享,這是因?yàn)?em>f5和f6中間隔了一個(gè)sleep,超過(guò)了合并要求的最大間隔時(shí)長(zhǎng)营袜,f1為什么沒(méi)有合并到一起撒顿,我很疑惑
執(zhí)行結(jié)果如下:
ValueForKey: 1 thread:hystrix-CollepsingGroup-1 ValueForKey: 2 thread:hystrix-CollepsingGroup-2 ValueForKey: 3 thread:hystrix-CollepsingGroup-2 ValueForKey: 4 thread:hystrix-CollepsingGroup-2 ValueForKey: 5 thread:hystrix-CollepsingGroup-2 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] ValueForKey: 6 thread:hystrix-CollepsingGroup-3 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] num executed: 3 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] true
7. 配置策略
? 具體的配置可以查看官網(wǎng) 官網(wǎng)配置地址,下邊僅是簡(jiǎn)單的整理
-
HystrixCommandProperties
/* --------------統(tǒng)計(jì)相關(guān)------------------*/ // 統(tǒng)計(jì)滾動(dòng)的時(shí)間窗口,默認(rèn):5000毫秒(取自circuitBreakerSleepWindowInMilliseconds) private final HystrixProperty metricsRollingStatisticalWindowInMilliseconds; // 統(tǒng)計(jì)窗口的Buckets的數(shù)量,默認(rèn):10個(gè),每秒一個(gè)Buckets統(tǒng)計(jì) private final HystrixProperty metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow // 是否開(kāi)啟監(jiān)控統(tǒng)計(jì)功能,默認(rèn):true private final HystrixProperty metricsRollingPercentileEnabled; /* --------------熔斷器相關(guān)------------------*/ // 熔斷器在整個(gè)統(tǒng)計(jì)時(shí)間內(nèi)是否開(kāi)啟的閥值荚板,默認(rèn)20凤壁。也就是在metricsRollingStatisticalWindowInMilliseconds(默認(rèn)10s)內(nèi)至少請(qǐng)求20次吩屹,熔斷器才發(fā)揮起作用 private final HystrixProperty circuitBreakerRequestVolumeThreshold; // 熔斷時(shí)間窗口,默認(rèn):5秒.熔斷器中斷請(qǐng)求5秒后會(huì)進(jìn)入半打開(kāi)狀態(tài),放下一個(gè)請(qǐng)求進(jìn)來(lái)重試拧抖,如果該請(qǐng)求成功就關(guān)閉熔斷器煤搜,否則繼續(xù)等待一個(gè)熔斷時(shí)間窗口 private final HystrixProperty circuitBreakerSleepWindowInMilliseconds; //是否啟用熔斷器,默認(rèn)true. 啟動(dòng) private final HystrixProperty circuitBreakerEnabled; //默認(rèn):50%。當(dāng)出錯(cuò)率超過(guò)50%后熔斷器啟動(dòng) private final HystrixProperty circuitBreakerErrorThresholdPercentage; //是否強(qiáng)制開(kāi)啟熔斷器阻斷所有請(qǐng)求,默認(rèn):false,不開(kāi)啟唧席。置為true時(shí)擦盾,所有請(qǐng)求都將被拒絕,直接到fallback private final HystrixProperty circuitBreakerForceOpen; //是否允許熔斷器忽略錯(cuò)誤,默認(rèn)false, 不開(kāi)啟 private final HystrixProperty circuitBreakerForceClosed; /* --------------信號(hào)量相關(guān)------------------*/ //使用信號(hào)量隔離時(shí)淌哟,命令調(diào)用最大的并發(fā)數(shù),默認(rèn):10 private final HystrixProperty executionIsolationSemaphoreMaxConcurrentRequests; //使用信號(hào)量隔離時(shí)迹卢,命令fallback(降級(jí))調(diào)用最大的并發(fā)數(shù),默認(rèn):10 private final HystrixProperty fallbackIsolationSemaphoreMaxConcurrentRequests; /* --------------其他------------------*/ //使用命令調(diào)用隔離方式,默認(rèn):采用線(xiàn)程隔離,ExecutionIsolationStrategy.THREAD private final HystrixProperty executionIsolationStrategy; //使用線(xiàn)程隔離時(shí),調(diào)用超時(shí)時(shí)間徒仓,默認(rèn):1秒 private final HystrixProperty executionIsolationThreadTimeoutInMilliseconds; //線(xiàn)程池的key,用于決定命令在哪個(gè)線(xiàn)程池執(zhí)行 private final HystrixProperty executionIsolationThreadPoolKeyOverride; //是否開(kāi)啟fallback降級(jí)策略 默認(rèn):true private final HystrixProperty fallbackEnabled; // 使用線(xiàn)程隔離時(shí)腐碱,是否對(duì)命令執(zhí)行超時(shí)的線(xiàn)程調(diào)用中斷(Thread.interrupt())操作.默認(rèn):true private final HystrixProperty executionIsolationThreadInterruptOnTimeout; // 是否開(kāi)啟請(qǐng)求日志,默認(rèn):true private final HystrixProperty requestLogEnabled; //是否開(kāi)啟請(qǐng)求緩存,默認(rèn):true private final HystrixProperty requestCacheEnabled; // Whether request caching is enabled.
-
HystrixCollapserProperties
//請(qǐng)求合并是允許的最大請(qǐng)求數(shù),默認(rèn): Integer.MAX_VALUE private final HystrixProperty maxRequestsInBatch; //批處理過(guò)程中每個(gè)命令延遲的時(shí)間,默認(rèn):10毫秒 private final HystrixProperty timerDelayInMilliseconds; //批處理過(guò)程中是否開(kāi)啟請(qǐng)求緩存,默認(rèn):開(kāi)啟 private final HystrixProperty requestCacheEnabled;
-
HystrixThreadPoolProperties
/* 配置線(xiàn)程池大小,默認(rèn)值10個(gè). 建議值:請(qǐng)求高峰時(shí)99.5%的平均響應(yīng)時(shí)間 + 向上預(yù)留一些即可 */ private final HystrixProperty corePoolSize; /* 配置線(xiàn)程值等待隊(duì)列長(zhǎng)度,默認(rèn)值:-1 建議值:-1表示不等待直接拒絕,測(cè)試表明線(xiàn)程池使用直接決絕策略+ 合適大小的非回縮線(xiàn)程池效率最高.所以不建議修改此值。 當(dāng)使用非回縮線(xiàn)程池時(shí)蓬衡,queueSizeRejectionThreshold,keepAliveTimeMinutes 參數(shù)無(wú)效 */ private final HystrixProperty maxQueueSize;
參考: