Hystrix的用途以及使用場景就不在這里贅述了耗绿,這里只關(guān)注源碼級別的實現(xiàn)原理杨箭。
1膝蜈、AbstractCommand鹅心、HystrixCommand吕粗、HystrixObservableCommand
1.1 AbstractCommand
Hystrix會把對服務(wù)的請求封裝成命令并通過執(zhí)行命令的方式進行請求調(diào)用,抽象類AbstractCommand即是這些命令的核心父類接口旭愧,此接口完成了大部分的Hystrix的邏輯實現(xiàn)颅筋。
1.1.1 核心屬性
HystrixCircuitBreaker circuitBreaker
Hystrix在執(zhí)行具體的調(diào)用操作之前會調(diào)用HystrixCircuitBreaker的相關(guān)方法判斷熔斷器是否打開,如果熔斷器已經(jīng)打開則直接fallback快速失敗输枯,否則才會進行實際的調(diào)用议泵。下面的章節(jié)會詳細(xì)介紹HystrixCircuitBreaker接口。HystrixThreadPool threadPool
Hystrix線程池桃熄,Hystrix命令執(zhí)行隔離策略有兩種:線程池和信號量先口。默認(rèn)是線程池,即命令會異步運行在線程池中瞳收。信號量則使用原有的線程碉京。使用線程池有一定的延遲,但是可以有較高的吞吐量螟深。HystrixCommandKey commandKey
命令唯一標(biāo)識谐宙,Hystrix會為每一個命令緩存一個HystrixCircuitBreaker對象以及一個HystrixCommandMetrics對象,這些對象被緩存在的靜態(tài)屬性Map中血崭,命令通過HystrixCommandKey標(biāo)識其唯一性卧惜,在使用Feign接口時,命令唯一標(biāo)識即為方法簽名夹纫。HystrixCommandGroupKey commandGroup
命令組唯一標(biāo)識咽瓷,每一個依賴服務(wù)對應(yīng)一個命令組,每一個命令組對應(yīng)一個線程池舰讹,例如茅姜,某應(yīng)用依賴了S1和S2兩個服務(wù),則會有兩個線程池,服務(wù)的調(diào)用運行在自己的線程池中钻洒,在使用Feign接口時奋姿,命令組唯一標(biāo)識即為服務(wù)的名稱。HystrixThreadPoolKey threadPoolKey
線程池唯一標(biāo)識素标,和命令組唯一標(biāo)識保持一致称诗。HystrixThreadPool threadPool
用于執(zhí)行Hystrix命令的線程池。此線程池是服務(wù)級別的头遭,即每一個服務(wù)對應(yīng)一個線程池寓免,那么對這個服務(wù)的所有調(diào)用請求都會在此線程池中執(zhí)行。HystrixCommandMetrics metrics
命令執(zhí)行情況的統(tǒng)計度量计维,Hystrix通過HystrixCommandMetrics記錄和統(tǒng)計命令執(zhí)行的總次數(shù)袜香、失敗次數(shù)等信息。ExecutionResult executionResult
命令執(zhí)行結(jié)果鲫惶,命令執(zhí)行過程中會不斷的把執(zhí)行情況匯總至ExecutionResult蜈首,當(dāng)命令完成后會把ExecutionResult交給HystrixCommandMetrics做統(tǒng)計之用。
1.1.2 核心方法
Observable<R> toObservable()
注冊命令執(zhí)行完成事件以及觸發(fā)事件源欠母。此方法返回一個Cold Observable對象欢策,即在沒有訂閱者時不進行事件發(fā)布,而是等待艺蝴,直到有訂閱者時才發(fā)布事件猬腰。Observable<R> observable()
注冊命令執(zhí)行完成事件以及觸發(fā)事件源。此方法返回一個Hot Observable對象猜敢,即不管是否有訂閱者都會發(fā)布事件姑荷,這種情況下有可能出現(xiàn)訂閱者只觀察到整個過程的一部分的現(xiàn)象。-
Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd)
執(zhí)行熔斷語義缩擂,是熔斷判斷的核心方法鼠冕,Hystrix首先會從緩存中獲取結(jié)果,如果從緩存中獲取結(jié)果失敗則調(diào)用此方法執(zhí)行熔斷語義胯盯,此方法首先調(diào)用HystrixCircuitBreaker#allowRequest()判斷是否允許請求懈费,如果不允許則直接調(diào)用handleShortCircuitViaFallback()方法執(zhí)行快速失敗的邏輯,如果允許則判斷信號量資源是否能獲取博脑,如果獲取失敗則調(diào)用handleSemaphoreRejectionViaFallback()方法執(zhí)行信號量資源拒絕快速失敗邏輯憎乙,如果資源獲取成功則執(zhí)行命令。其源碼如下:private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 熔斷器是否允許請求 if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 信號量獲取資源是否成功 if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // 成功則執(zhí)行命令 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 執(zhí)行信號量獲取資源拒絕快速失敗邏輯 return handleSemaphoreRejectionViaFallback(); } } else { // 執(zhí)行熔斷器開啟是的快速失敗邏輯 return handleShortCircuitViaFallback(); } }
Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd)
注冊熔斷相關(guān)事件叉趣,用于收集執(zhí)行結(jié)果情況泞边,執(zhí)行結(jié)果情況記錄在ExecutionResult中。Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd)
此方法會判斷是需要在線程池中執(zhí)行命令還是使用原有線程執(zhí)行命令疗杉,如果是前者則注冊線程完成相關(guān)的事件阵谚。void handleCommandEnd(boolean commandExecutionStarted)
命令執(zhí)行結(jié)束會調(diào)用此方法,此方法會將ExecutionResult對象交由HystrixCommandMetrics做執(zhí)行情況的統(tǒng)計。Observable<R> handleSemaphoreRejectionViaFallback()
信號量資源拒絕時的快速失敗邏輯實現(xiàn)梢什,最終會執(zhí)行用戶指定的fallback邏輯奠蹬。Observable<R> handleShortCircuitViaFallback()
熔斷器打開時的快速失敗邏輯實現(xiàn),最終會執(zhí)行用戶指定的fallback邏輯嗡午。
1.2. HystrixCommand囤躁、HystrixObservableCommand
Hystrix的兩個核心命令,在使用Hystrix時第一步是創(chuàng)建命令翼馆,那么這里說的命令即是HystrixCommand和HystrixObservableCommand或者是他們的子類割以。
1.2.1 HystrixCommand
AbstractCommand的子類金度,用于返回一個單一的結(jié)果应媚,同時提供同步和異步兩種方式執(zhí)行命令:
- Future<R> queue() 異步方式執(zhí)行命令。
- public R execute() 同步方式執(zhí)行命令猜极。
HystrixCommand是一個抽象類中姜,其中run()方法為抽象方法,在使用時一般需要實現(xiàn)run()和getFallback()兩個方法跟伏,如下:
- protected abstract R run() throws Exception
- protected R getFallback()
這兩個方法分別對應(yīng)正常的結(jié)果返回和快速失敗的結(jié)果返回丢胚,在調(diào)用queue()和execute()時會回調(diào)到這兩個方法。
1.2.2 HystrixObservableCommand
AbstractCommand的子類受扳,用于返回多個結(jié)果携龟,同時提供Hot Observable和Cold Observable兩種執(zhí)行方式:
- Observable<R> observable() 返回一個Hot Observable。
- Observable<R> toObservable() 返回一個Cold Observable勘高。
值得注意的是這兩個方法并不是HystrixObservableCommand中的峡蟋,而是其父類AbstractCommand中的方法。
在使用HystrixObservableCommand時需要實現(xiàn)construct()和resumeWithFallback()兩個方法:
- protected Observable<String> construct()
- protected Observable<String> resumeWithFallback()
這兩個方法分別返回正常的Observable以及快速失敗的Observable华望。在調(diào)用observable()和toObservable()時會回調(diào)到這兩個方法蕊蝗。
所謂返回多個結(jié)果,是說可以通過Observable的相關(guān)方法獲取到多個結(jié)果的迭代器赖舟,如下是一個使用HystrixObservableCommand的小例子:
package com.zws.cloud.consumer;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Emitter;
import rx.Observable;
import java.util.Iterator;
/**
* @Author wensh.zhu
* @Date 2019/5/7 16:55
*/
public class HystrixObservableCommandTest {
public static void main(String[] args) {
HystrixObservableCommandHello commandHello = new HystrixObservableCommandHello();
// 得到結(jié)果的迭代器
Iterator<String> iterator = commandHello.observe().toBlocking().getIterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
}
}
class HystrixObservableCommandHello extends HystrixObservableCommand<String> {
public HystrixObservableCommandHello() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("Hello")));
}
@Override
protected Observable<String> construct() {
return Observable.<String>create(emitter -> {
emitter.onNext("abc");
/**
// 模擬異常發(fā)生
String name = null;
System.out.println(name.length());
**/
emitter.onNext("efg");
emitter.onCompleted();
}, Emitter.BackpressureMode.NONE).doOnError(throwable -> System.out.println("哎呀蓬戚,觀察到異常啦"));
}
@Override
protected Observable<String> resumeWithFallback() {
return Observable.create(emitter -> {
emitter.onNext("error");
emitter.onNext("ERROR");
emitter.onCompleted();
}, Emitter.BackpressureMode.NONE);
}
}
2、HystrixCircuitBreaker
HystrixCircuitBreaker是Hystrix的核心接口宾抓,其實現(xiàn)類為HystrixCircuitBreaker.HystrixCircuitBreakerImpl子漩,重要方法如下:
-
boolean isOpen()
熔斷器是否打開,類HystrixCircuitBreakerImpl中有一類型為AtomicBoolean的屬性circuitOpen石洗,表示熔斷器是否打開幢泼。此方法首先判斷circuitOpen是否為true,如果為true則直接返回true劲腿。否則判斷請求次數(shù)是否達到熔斷請求次數(shù)閾值旭绒,此閾值默認(rèn)為20,如果沒有達到則直接返回false,如果請求次數(shù)大于等于20挥吵,則判斷這些請求中失敗的請求次數(shù)占比是否小于50%(默認(rèn))重父,如果小于則返回false,否則將circuitOpen設(shè)置為true忽匈,并更新熔斷時間戳房午,熔斷時間戳記錄在類HystrixCircuitBreakerImpl的屬性circuitOpenedOrLastTestedTime中,類型為AtomicLong丹允。所以綜上可以得出熔斷器打開需要滿足兩個條件:- 請求次數(shù)大于等于20(默認(rèn))
- 失敗請求次數(shù)占比大于等于50%(默認(rèn))
-
boolean allowSingleTest()
熔斷器是否處于半打開狀態(tài)郭厌。此方法的邏輯是:如果熔斷器沒有打開(即circuitOpen為false)則直接返回false,如果熔斷器已經(jīng)打開則判斷當(dāng)前時間戳是否大于熔斷器打開的時間戳+熔斷窗口時間(默認(rèn)5秒)雕蔽,如果是則更新熔斷器打開的時間戳為當(dāng)前時間并返回true折柠。也就是說:- 熔斷器打開后需要等待至少5秒才可以再次嘗試請求。
- 如果此次嘗試失敗則至少再等待5秒才可以嘗試批狐。
-
void markSuccess()
此方法會在熔斷器打開的狀態(tài)下:-
關(guān)閉熔斷器
即把circuitOpen設(shè)置為false扇售。 -
并將統(tǒng)計數(shù)據(jù)置零
調(diào)用HystrixCommandMetrics的resetStream()方法。
-
關(guān)閉熔斷器
-
boolean allowRequest()
是否允許請求嚣艇,Hystrix在執(zhí)行具體的調(diào)用之前通過調(diào)用此方法判斷是否允許請求承冰,如果不允許則直接fullback快速失敗。此方法調(diào)用isOpen()和allowSingleTest()返回熔斷器是否關(guān)閉或半開食零,具體邏輯:首先判斷調(diào)用isOpen()方法判斷熔斷器是否打開困乒,如果熔斷器打開則直接返回false,否則調(diào)用allowSingleTest()方法判斷是否允許嘗試請求贰谣,如果是則返回true娜搂,否則返回false。HystrixCircuitBreakerImpl源碼如下:
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; // 保存熔斷器狀態(tài):開啟or關(guān)閉 private AtomicBoolean circuitOpen = new AtomicBoolean(false); // 保存熔斷器開啟或最后更新時間戳 private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; } public void markSuccess() { if (circuitOpen.get()) { // 熔斷器關(guān)閉并重置統(tǒng)計數(shù)據(jù) if (circuitOpen.compareAndSet(true, false)) { metrics.resetStream(); } } } @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { isOpen(); return true; } // 熔斷器是否關(guān)閉或者半開 return !isOpen() || allowSingleTest(); } public boolean allowSingleTest() { long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 熔斷器開啟且開啟時長大于等于5秒則為半開狀態(tài) if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { // 更新熔斷器開啟時間戳 if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { return true; } } return false; } @Override public boolean isOpen() { if (circuitOpen.get()) { return true; } HealthCounts health = metrics.getHealthCounts(); // 請求次數(shù)小于20直接返回false if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { return false; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // 請求失敗次數(shù)占比大于等于50%開啟熔斷器冈爹、記錄開啟時間戳 if (circuitOpen.compareAndSet(false, true)) { circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { return true; } } } }
熔斷器對象是命令級別的守呜,也就是說每一個命令都會對應(yīng)一個HystrixCircuitBreaker對象椿访,這些HystrixCircuitBreaker對象被緩存在類HystrixCircuitBreaker.Factory的屬性circuitBreakersByCommand中听系,如下:
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
3术浪、HystrixCommandMetrics、HealthCountsStream憋肖、HystrixEventStream因痛、HystrixCommandMetrics.HealthCounts
這些都是和Hystrix的命令執(zhí)行情況統(tǒng)計相關(guān)的類,這些統(tǒng)計信息直接影響到Hystrix的熔斷斷判岸更。
3.1 HystrixCommandMetrics.HealthCounts
此類保存了命令執(zhí)行的總次數(shù)鸵膏、失敗次數(shù)以及失敗次數(shù)的占比,plus()方法則用于更新這些數(shù)據(jù)怎炊,HystrixCircuitBreaker就是根據(jù)此類的統(tǒng)計信息做熔斷器的打開和半開判斷的谭企,源碼如下:
public static class HealthCounts {
private final long totalCount;
private final long errorCount;
private final int errorPercentage;
HealthCounts(long total, long error) {
this.totalCount = total;
this.errorCount = error;
if (totalCount > 0) {
this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
} else {
this.errorPercentage = 0;
}
}
// 略
public HealthCounts plus(long[] eventTypeCounts) {
long updatedTotalCount = totalCount;
long updatedErrorCount = errorCount;
long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
}
3.2 HealthCountsStream
從上面我們知道HystrixCommandMetrics.HealthCounts保存了命令執(zhí)行的總次數(shù)廓译、失敗次數(shù),那么這些統(tǒng)計數(shù)字是怎么來的呢债查?答案就是HealthCountsStream類非区,首先此類的父類BucketedCounterStream中有一個BehaviorSubject<Output>類型的屬性counterSubject,Output是泛型盹廷,這里即是HystrixCommandMetrics.HealthCounts征绸。HealthCountsStream保存了一個回調(diào)函數(shù),用于通過調(diào)用HystrixCommandMetrics.HealthCounts的plus()方法累計命令執(zhí)行的總次數(shù)和失敗次數(shù)俄占。每當(dāng)HealthCountsStream接受到事件時都會回調(diào)此函數(shù)管怠。那么事件時在哪里觸發(fā)的呢?答案是HystrixEventStream缸榄。
3.3 HystrixEventStream
HystrixEventStream用于發(fā)布命令相關(guān)事件渤弛,其實現(xiàn)類有HystrixCommandCompletionStream、HystrixThreadPoolStartStream碰凶、HystrixCollapserEventStream等暮芭,每一個實現(xiàn)類都有一個write()方法,此方法發(fā)布一種命令相關(guān)事件欲低,例如:HystrixCommandCompletionStream負(fù)責(zé)發(fā)布命令完成事件,HystrixThreadPoolStartStream發(fā)布線程池開始事件畜晰。這些實現(xiàn)類發(fā)布的事件會被上面說的類HealthCountsStream中的一個回調(diào)函數(shù)監(jiān)聽到砾莱。
3.4 HystrixCommandMetrics
此類用于重置統(tǒng)計信息以及接受命令執(zhí)行完成和開始的通知。當(dāng)命令開始執(zhí)行和執(zhí)行結(jié)束時分別調(diào)用此類的markCommandStart()方法和markCommandDone()方法凄鼻,這兩個方法分別通過HystrixCommandStartStream和HystrixCommandCompletionStream發(fā)布命令開始和結(jié)束事件腊瑟。另外HystrixCircuitBreaker的markSuccess()方法被調(diào)用時會調(diào)用HystrixCommandMetrics#resetStream()方法,此方法會刪除緩存中命令對應(yīng)的HystrixCommandMetrics并創(chuàng)建一個新的HystrixCommandMetrics块蚌。相關(guān)源碼如下:
void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
int currentCount = concurrentExecutionCount.incrementAndGet();
HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
}
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
if (executionStarted) {
concurrentExecutionCount.decrementAndGet();
}
}
synchronized void resetStream() {
healthCountsStream.unsubscribe();
HealthCountsStream.removeByKey(key);
healthCountsStream = HealthCountsStream.getInstance(key, properties);
}
4闰非、Feign與Hystrix
4.1 Hystrix依賴與配置
在我們使用Spring Cloud時,更多是在使用Feign接口時使用Hystrix峭范,那么在Feign接口中使用Hystrix需要那些配置呢财松?很簡單:
- 引入Hystrix依賴,如下:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
- 配置項eign.hystrix.enabled設(shè)置為true纱控。
feign: hystrix: enabled: true
4.2 HystrixInvocationHandler
當(dāng)配置項eign.hystrix.enabled設(shè)置為true時辆毡,配置類FeignClientsConfiguration的內(nèi)部靜態(tài)類HystrixFeignConfiguration會向Spring上下文中注入類型為HystrixFeign.Builder的Bean,如下:
@Configuration
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}
public final class HystrixFeign {
public static Builder builder() {
return new Builder();
}
public static final class Builder extends Feign.Builder {
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target,
Map<Method, MethodHandler> dispatch) {
// 這里返回HystrixInvocationHandler對象
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
// 略
}
}
HystrixFeign.builder()方法返回HystrixFeign的內(nèi)部類HystrixFeign.Builder甜害,當(dāng)為Feign接口生成代理對象時舶掖,HystrixFeign.Builder類的build()方法被調(diào)用,此方法會創(chuàng)建執(zhí)行處理器HystrixInvocationHandler尔店,當(dāng)Feign接口執(zhí)行時就會調(diào)用HystrixInvocationHandler的invoke()方法眨攘,那么接下來就簡單了主慰,invoke()方法創(chuàng)建HystrixCommand對象并執(zhí)行其execute()方法,如下:
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// 略
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else {
return result;
}
} catch (IllegalAccessException e) {
throw new AssertionError(e);
} catch (InvocationTargetException e) {
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
}
return hystrixCommand.execute();
}
5 注解的方式使用Hystrix
當(dāng)不使用Feign接口時我們還可以使用注解@HystrixCommand和@HystrixCollapse的方式使用Hystrix鲫售。前者執(zhí)行單一命令河哑,后者則可以聚合多個請求最終以批量的方式執(zhí)行命令。
5.1 注解的配置與使用
-
配置
在SpringBoot啟動類添加@EnableCircuitBreaker注解龟虎。 -
使用
下面是一個使用注解的小例子:@Service public class HelloService { @Resource private RestTemplate restTemplate; @HystrixCommand(groupKey = "gTest", commandKey = "hi", fallbackMethod = "fallBack") public String hi() { return restTemplate.getForEntity("http://Eureka-Producer/hi", String.class).getBody(); } public String fallBack() { return "熔斷發(fā)生"; } @HystrixCollapser(batchMethod = "getMembers", collapserProperties = { // 收集1秒內(nèi)的請求 @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000") }) public Future<String> getMember(Integer id) { System.out.println("執(zhí)行單個查詢的方法"); return null; } @HystrixCommand public List<String> getMembers(List<Integer> ids) { System.out.println("執(zhí)行合并操作調(diào)用"); List<String> mems = new ArrayList<>(); for(Integer id : ids) { mems.add("name" + id); } return mems; } } @RestController public class HelloController { @Resource private HelloService helloService; @GetMapping("/hi") public String hi() { return helloService.hi(); } @GetMapping("/coll") public String coll() throws ExecutionException, InterruptedException { HystrixRequestContext.initializeContext(); // 這里模擬短時間內(nèi)3次調(diào)用 Future<String> f1 = helloService.getMember(1); Future<String> f2 = helloService.getMember(2); Future<String> f3 = helloService.getMember(3); return f1.get() + ", " + f2.get() + ", " + f3.get(); } }
5.2 注解的實現(xiàn)原理
@EnableCircuitBreaker注解會使配置類HystrixCircuitBreakerConfiguration會被加載璃谨,此配置類向Spring上下文中注入類一個切面Bean:HystrixCommandAspect,源碼如下:
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
// 略
}
此切面會攔截標(biāo)記了@HystrixCommand和@HystrixCollapse的方法鲤妥,并根據(jù)不同注解以及相關(guān)配置選擇對應(yīng)的命令并執(zhí)行佳吞,如下:
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
// 略
}
public class HystrixCommandFactory {
private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();
private HystrixCommandFactory() {
}
public static HystrixCommandFactory getInstance() {
return INSTANCE;
}
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
// 選擇創(chuàng)建對應(yīng)的命令
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
// 略
}
6、總結(jié)
Hystrix會把請求封裝成命令(HystrixCommand或HystrixObservableCommand)棉安,并以執(zhí)行命令的方式進行請求的調(diào)用底扳,整個過程如下:
- 創(chuàng)建命令HystrixCommand或者HystrixObservableCommand。
- 調(diào)用命令對應(yīng)的方法贡耽,HystrixCommand:execute()或queue()衷模,HystrixObservableCommand:toObservable()或observe()。
- 判斷緩存是否開啟緩存以及緩存中是否已經(jīng)緩存了請求結(jié)果蒲赂,如果有則直接返回阱冶。
- HystrixCircuitBreaker熔斷器是否允許請求,如果不允許則執(zhí)行快速失敗邏輯滥嘴。
- 信號量/線程池資源獲取是否成功木蹬,如果失敗則執(zhí)行快速失敗邏輯。
- 執(zhí)行run()方法或者construct()方法若皱。
- 返回結(jié)果更新統(tǒng)計數(shù)據(jù)镊叁。
放個彩蛋: