CircuitBreaker
CircuitBreaker主要由以下幾個部分組成:配置她奥、注冊器怔蚌、熔斷器敷硅、度量指標(biāo)功咒、事件發(fā)布及熔斷器的工作原理愉阎。接下來會逐一介紹。
CircuitBreaker配置
基本配置
首先是CircuitBreaker的一些可配置項力奋,在CircuitBreakerConfig中:
這是一個獨立的類榜旦,里面包含熔斷器的可配置項,提供了一個內(nèi)部類Builder來構(gòu)建配置景殷,主要通過三個方法實現(xiàn):
- ofDefaults():使用默認配置
- custom():返回一個Builder對象
- from(CircuitBreakerConfig baseConfig):返回一個包含baseConfig的Builder對象
得到Builder對象后就可以根據(jù)Builder提供的方法構(gòu)建配置
SpringBoot配置
CircuitBreakerProperties
首先是CircuitBreakerProperties溅呢,整體繼承關(guān)系如下圖:
在CircuitBreakerProperties類中添加了@ConfigurationProperties(prefix = "resilience4j.circuitbreaker")注解,是讀取yml或properties文件中配置的入口滨彻。
CircuitBreakerConfigurationProperties添加了@Configuration注解藕届,并在其中定義了Aspect的順序挪蹭,即之前提到的注解使用熔斷器亭饵、限流器、重試組件和隔板組件的切入順序梁厉。
基類的CircuitBreakerConfigurationProperties包含了兩個Map<String, InstanceProperties> instances辜羊、Map<String, InstanceProperties> configs以及一個內(nèi)部靜態(tài)類InstanceProperties。InstanceProperties中除了熔斷器的各項配置外词顾,還有一個baseConfig的可配置項八秃,在yml和properties文件中的配置最終都會進入instances、configs這兩個map中肉盹。例如如下的yml文件:
resilience4j:
circuitbreaker:
configs:
myDefault:
ringBufferSizeInClosedState: 10 # 熔斷器關(guān)閉時的緩沖區(qū)大小
ringBufferSizeInHalfOpenState: 5 # 熔斷器半開時的緩沖區(qū)大小
waitDurationInOpenState: 10000 # 熔斷器從打開到半開需要的時間
failureRateThreshold: 60 # 熔斷器打開的失敗閾值
eventConsumerBufferSize: 10 # 事件緩沖區(qū)大小
registerHealthIndicator: true # 健康監(jiān)測
automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自動從打開到半開
recordFailurePredicate: com.example.resilience4j.predicate.RecordFailurePredicate # 謂詞設(shè)置異常是否為失敗
recordExceptions: # 記錄的異常
- com.example.resilience4j.exceptions.BusinessBException
ignoreExceptions: # 忽略的異常
- com.example.resilience4j.exceptions.BusinessAException
instances:
backendA:
baseConfig: myDefault
waitDurationInOpenState: 5000
failureRateThreshold: 20
myDefault將作為key昔驱,failureRateThreshold等將作為InstanceProperties對象中的屬性一起存入configs這個Map中,instances也是如此上忍。
CircuitBreakerConfigurationProperties提供了一個createCircuitBreakerConfig方法用于創(chuàng)建CircuitBreakerConfig:
public CircuitBreakerConfig createCircuitBreakerConfig(InstanceProperties instanceProperties) {
// 如果配置項中有baseConfig,就去configs中找到baseConfig
if (StringUtils.isNotEmpty(instanceProperties.getBaseConfig())) {
InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
if (baseProperties == null) {
throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
}
//調(diào)用buildConfigFromBaseConfig方法使用instanceProperties覆蓋baseProperties
return buildConfigFromBaseConfig(instanceProperties, baseProperties);
}
//如果沒有調(diào)用buildConfig方法返回CircuitBreakerConfig
return buildConfig(CircuitBreakerConfig.custom(), instanceProperties);
}
private CircuitBreakerConfig buildConfigFromBaseConfig(InstanceProperties instanceProperties, InstanceProperties baseProperties) {
ConfigUtils.mergePropertiesIfAny(instanceProperties, baseProperties);
//覆蓋的實現(xiàn)調(diào)用了CircuitBreakerConfig的custom()方法和from()方法
CircuitBreakerConfig baseConfig = buildConfig(CircuitBreakerConfig.custom(), baseProperties);
return buildConfig(CircuitBreakerConfig.from(baseConfig), instanceProperties);
}
CircuitBreakerConfiguration
接下來看看這些配置是如何配置到注冊器中的骤肛,在CircuitBreakerConfiguration中:
CircuitBreakerConfiguration中的circuitBreakerRegistry方法使用@Bean注解注入了注冊器,方法中主要做了3件事:
-
創(chuàng)建注冊器
public CircuitBreakerRegistry createCircuitBreakerRegistry(CircuitBreakerConfigurationProperties circuitBreakerProperties) { //把Map<String, InstanceProperties> 轉(zhuǎn)換為Map<String, CircuitBreakerConfig> Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs() .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue()))); //調(diào)用registry的靜態(tài)of方法新建一個InMemoryCircuitBreakerRegistry,放入configurations,Map作為傳入?yún)?shù) return CircuitBreakerRegistry.of(configs); }
注冊事件消費者:說到事件的時候再提
初始化注冊器
public void initCircuitBreakerRegistry(CircuitBreakerRegistry circuitBreakerRegistry) {
//把circuitBreakerProperties中的instances中的配置先轉(zhuǎn)換,再用注冊器構(gòu)建實例放入entryMap
circuitBreakerProperties.getInstances().forEach(
(name, properties) -> circuitBreakerRegistry.circuitBreaker(name, circuitBreakerProperties.createCircuitBreakerConfig(properties)));
}
動態(tài)配置
基于以上結(jié)果窍蓝,若使用配置中心完成配置的動態(tài)刷新腋颠,刷新的內(nèi)容是CircuitBreakerConfigurationProperties中的內(nèi)容,若要使配置生效需要把CircuitBreakerConfigurationProperties中的配置添加到circuitBreakerRegistry中吓笙,并且替換之前的注冊器實例淑玫,如下:
//1.獲取更新的配置
Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs()
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue())));
//2.將新配置加入注冊器
configs.forEach(circuitBreakerRegistry::addConfiguration);
//3.替換注冊器中的實例
circuitBreakerProperties.getInstances().forEach(
(name, properties) -> circuitBreakerRegistry.replace(name, CircuitBreaker.of(
name, circuitBreakerProperties.createCircuitBreakerConfig(properties))));
CircuitBreaker注冊器
InMemoryCircuitBreakerRegistry是注冊器的實現(xiàn)類,繼承及實現(xiàn)關(guān)系如下圖:
CircuitBreakerRegistry的創(chuàng)建
承接之前配置讀入的過程我們看一下InMemoryCircuitBreakerRegistry的構(gòu)造方法面睛,因為registry的靜態(tài)of方法其實就是調(diào)用了InMemoryCircuitBreakerRegistry的構(gòu)造方法絮蒿,3個of方法對應(yīng)3個構(gòu)造方法,如下:
//無參構(gòu)造方法,其實就是使用默認配置調(diào)用第三個構(gòu)造方法,這個默認配置是內(nèi)置的,不受spring配置注入的影響
public InMemoryCircuitBreakerRegistry() {
this(CircuitBreakerConfig.ofDefaults());
}
//批量配置,首先會查看configs中是否有名為"default"的配置,如果有則將該配置放入configurations中,沒有則放入內(nèi)置的默認配置
public InMemoryCircuitBreakerRegistry(Map<String, CircuitBreakerConfig> configs) {
this(configs.getOrDefault(DEFAULT_CONFIG, CircuitBreakerConfig.ofDefaults()));
//然后把其他配置放入configurations中
this.configurations.putAll(configs);
}
//使用指定的配置作為默認配置
public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
super(defaultConfig);
}
假設(shè)目前我們有3個配置叁鉴,分別為:
- 內(nèi)置的默認配置:default
- 在配置文件中配置名為default的配置:myDefault
- 在配置文件中配置名為configA的配置:configA
如果我們配置了2和3歌径,從Bean中拿到的注冊器配置屬性如下:
如果我們只配置了3,從Bean中拿到的注冊器配置屬性如下:
如果我們不從Bean中拿亲茅,而是直接new CircuitBreakerRegistry()回铛,則注冊器配置屬性如下:
上面3張圖說明了三個問題:
- 通過配置文件注入到Bean的注冊器狗准,默認的配置可以被人為覆蓋
- 如果不添加名為default的配置,Bean中的注冊器會把內(nèi)置的默認配置放入注冊器中
- 直接新建一個注冊器會取內(nèi)置的配置而不會從配置文件中去拿
從CircuitBreakerRegistry中獲取CircuitBreaker
從注冊器中獲取CircuitBreaker有4種方式:
//從entryMap中獲取所有的熔斷器
@Override
public Seq<CircuitBreaker> getAllCircuitBreakers() {
return Array.ofAll(entryMap.values());
}
//根據(jù)名字獲取熔斷器茵肃,實際是使用默認的配置調(diào)用下面的方法
@Override
public CircuitBreaker circuitBreaker(String name) {
return circuitBreaker(name, getDefaultConfig());
}
//根據(jù)名稱及配置獲取熔斷器
@Override
public CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config) {
return computeIfAbsent(name, () -> CircuitBreaker
.of(name, Objects.requireNonNull(config, CONFIG_MUST_NOT_BE_NULL)));
}
//根據(jù)名稱及配置名稱獲取熔斷器
@Override
public CircuitBreaker circuitBreaker(String name, String configName) {
return computeIfAbsent(name, () -> CircuitBreaker.of(name, getConfiguration(configName)
.orElseThrow(() -> new ConfigurationNotFoundException(configName))));
}
這里有個名叫computeIfAbsent的方法腔长,這個方法內(nèi)部簡單來說做了以下的事:如果entryMap中存在名為name的熔斷器,就獲取該熔斷器验残,如果不存在該熔斷器捞附,就使用config這個配置新建一個名為name的熔斷器放入entryMap中,然后返回您没。所以以下三種方法熔斷器的邏輯如下:
還是假設(shè)目前我們有3個配置鸟召,分別為:
- 內(nèi)置的默認配置:default
- 在配置文件中配置名為default的配置:myDefault
- 在配置文件中配置名為configA的配置:configA
還配置了一個熔斷器實例:backendA,并且baseConfig為configA
如果我們配置了2氨鹏、3欧募,并使用3個方法分別去Bean中的注冊器拿名為backendA的熔斷器,得到的熔斷器如下:
同樣的情況下使用3個方法分別去Bean中的注冊器拿名為backendB的熔斷器仆抵,得到的熔斷器如下:
而對于新建的注冊器跟继,分別用3個方法去注冊器中拿名為backend的熔斷器,得到的熔斷器如下:
CircuitBreaker狀態(tài)
CircuitBreaker共有5個狀態(tài)镣丑,通過實現(xiàn)CircuitBreakerState接口實現(xiàn)各狀態(tài)的功能:
CircuitBreakerState共有7個方法和一個default方法:
// 用于返回是否允許通過熔斷器
boolean tryAcquirePermission();
// 主要用于打開狀態(tài)時熔斷器的觸發(fā)轉(zhuǎn)換和半開狀態(tài)時限制線程數(shù)
void acquirePermission();
// 主要用于半開狀態(tài)時線程數(shù)釋放
void releasePermission();
// 調(diào)用失敗舔糖,計算失敗率
void onError(Throwable throwable);
// 調(diào)用成功,計算失敗率
void onSuccess();
// 返回當(dāng)前狀態(tài)在CircuitBreaker的枚舉值
CircuitBreaker.State getState();
// 返回當(dāng)前封裝了度量指標(biāo)的類實例
CircuitBreakerMetrics getMetrics();
// 決定是否會發(fā)布事件
default boolean shouldPublishEvents(CircuitBreakerEvent event){
return event.getEventType().forcePublish || getState().allowPublish;
}
關(guān)閉狀態(tài)
關(guān)閉狀態(tài)比較簡單莺匠,所有請求都會通過金吗,在成功或失敗的時候分別調(diào)用度量指標(biāo)的onSuccess()或onError()方法,然后檢查失敗率進行狀態(tài)裝換趣竣。
@Override
public boolean tryAcquirePermission() {
return true;
}
@Override
public void onError(Throwable throwable) {
// CircuitBreakerMetrics是線程安全的
checkFailureRate(circuitBreakerMetrics.onError());
}
@Override
public void onSuccess() {
// CircuitBreakerMetrics是線程安全的
checkFailureRate(circuitBreakerMetrics.onSuccess());
}
private void checkFailureRate(float currentFailureRate) {
if (currentFailureRate >= failureRateThreshold) {
transitionToOpenState();
}
}
打開狀態(tài)
打開狀態(tài)的構(gòu)造方法會拿取isAutomaticTransitionFromOpenToHalfOpenEnabled配置摇庙,如果為true,就會調(diào)用scheduledExecutorService的schedule方法時間達到的時候自動轉(zhuǎn)換期贫,如果為false跟匆,就必須等到別的請求調(diào)用了acquirePermission方法:
OpenState(CircuitBreakerMetrics circuitBreakerMetrics) {
final Duration waitDurationInOpenState = circuitBreakerConfig.getWaitDurationInOpenState();
this.retryAfterWaitDuration = clock.instant().plus(waitDurationInOpenState);
this.circuitBreakerMetrics = circuitBreakerMetrics;
// 檢查是否自動轉(zhuǎn)換
if (circuitBreakerConfig.isAutomaticTransitionFromOpenToHalfOpenEnabled()) {
ScheduledExecutorService scheduledExecutorService = schedulerFactory.getScheduler();
scheduledExecutorService.schedule(CircuitBreakerStateMachine.this::transitionToHalfOpenState, waitDurationInOpenState.toMillis(), TimeUnit.MILLISECONDS);
}
}
@Override
public boolean tryAcquirePermission() {
// 請求進來時檢查時間,如果時間到了通砍,就狀態(tài)轉(zhuǎn)換并且放請求通過
if (clock.instant().isAfter(retryAfterWaitDuration)) {
transitionToHalfOpenState();
return true;
}
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
@Override
public void acquirePermission() {
if(!tryAcquirePermission()){
throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
}
}
// 成功和失敗仍然調(diào)用Metrics的方法增加成功或失敗次數(shù)玛臂,并且計算失敗率
@Override
public void onError(Throwable throwable) {
circuitBreakerMetrics.onError();
}
@Override
public void onSuccess() {
circuitBreakerMetrics.onSuccess();
}
半開狀態(tài)
半開狀態(tài)和關(guān)閉狀態(tài)大體相似,但略有不同封孙,請求通過時并不是全都通過迹冤,而是使用原子變量testRequestCounter,來限制請求數(shù)虎忌,testRequestCounter的大小為半開狀態(tài)環(huán)的大小泡徙,若超過允許的請求數(shù)則直接拒絕:
@Override
public boolean tryAcquirePermission() {
// 查看半開狀態(tài)環(huán)是否裝滿,沒裝滿就進入膜蠢,裝滿就拒絕
if (testRequestCounter.getAndUpdate(current -> current == 0 ? current : --current) > 0) {
return true;
}
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
@Override
public void acquirePermission() {
if(!tryAcquirePermission()){
throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
}
}
// 請求完成會釋放拿著的原子變量
@Override
public void releasePermission() {
testRequestCounter.incrementAndGet();
}
// 根據(jù)失敗率完成兩種狀態(tài)的轉(zhuǎn)換
private void checkFailureRate(float currentFailureRate) {
if(currentFailureRate != -1){
if(currentFailureRate >= failureRateThreshold) {
transitionToOpenState();
}else{
transitionToClosedState();
}
}
}
ForcedOpenState和DisabledState兩種狀態(tài)比較簡單堪藐,強制開啟就直接熔斷也不計算失敗率莉兰,禁止?fàn)顟B(tài)什么都不干,只能拿到狀態(tài)枚舉和度量指標(biāo)礁竞。
CircuitBreaker狀態(tài)機
有了狀態(tài)還需要狀態(tài)機來控制狀態(tài)的轉(zhuǎn)換糖荒,CircuitBreakerStateMachine實現(xiàn)了CircuitBreaker接口,同時又一個內(nèi)部類CircuitBreakerEventProcessor來處理事件模捂,是熔斷器的核心部分捶朵,繼承實現(xiàn)關(guān)系如下:
State和StateTransition是兩個CircuitBreaker接口中的內(nèi)部枚舉
State
定義了幾個狀態(tài)的枚舉類型,allowPublish設(shè)置了每種狀態(tài)是否允許發(fā)布事件
enum State {
DISABLED(3, false),
CLOSED(0, true),
OPEN(1, true),
FORCED_OPEN(4, false),
HALF_OPEN(2, true);
private final int order;
public final boolean allowPublish;
State(int order, boolean allowPublish){
this.order = order;
this.allowPublish = allowPublish;
}
public int getOrder(){
return order;
}
}
StateTransition
定義了狀態(tài)轉(zhuǎn)換的枚舉狂男,存在一個元組<State, State>為鍵综看,枚舉值為StateTransition的Map中,同時提供一個狀態(tài)轉(zhuǎn)換的方法岖食,能夠根據(jù)轉(zhuǎn)換的兩個狀態(tài)的枚舉值返回StateTransition红碑,主要便于發(fā)布事件。
enum StateTransition {
CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);
private final State fromState;
private final State toState;
// 將枚舉中的值全放入Map中
private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP = Arrays.stream(StateTransition.values())
.collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));
// 提供方法根據(jù)兩個狀態(tài)返回狀態(tài)轉(zhuǎn)換的枚舉
public static StateTransition transitionBetween(State fromState, State toState){
final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
if(stateTransition == null) {
throw new IllegalStateException(
String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
}
return stateTransition;
}
}
進行狀態(tài)轉(zhuǎn)換時先用原子變量的操作更新引用县耽,然后根據(jù)前后狀態(tài)是否相同決定是否發(fā)布事件句喷。
private void stateTransition(State newState, UnaryOperator<CircuitBreakerState> newStateGenerator) {
// 原子操作獲取之前的狀態(tài)并更新狀態(tài)
CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
if (currentState.getState() == newState) {
return currentState;
}
return newStateGenerator.apply(currentState);
});
// 如果兩個狀態(tài)不同則發(fā)布事件
if (previousState.getState() != newState) {
// 通過兩個不同狀態(tài)的枚舉獲取StateTransition
publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
}
}
CircuitBreakerMetrics
度量標(biāo)的繼承實現(xiàn)關(guān)系如下:
CircuitBreakerMetrics實現(xiàn)了Metrics接口镣典,并持有RingBitSet兔毙,當(dāng)各個狀態(tài)下的熔斷器調(diào)用成功或失敗就調(diào)用Metrics相應(yīng)的方法寫入環(huán)的bit位并計算失敗率。
float onError() {
int currentNumberOfFailedCalls = ringBitSet.setNextBit(true);
return getFailureRate(currentNumberOfFailedCalls);
}
float onSuccess() {
int currentNumberOfFailedCalls = ringBitSet.setNextBit(false);
return getFailureRate(currentNumberOfFailedCalls);
}
void onCallNotPermitted() {
numberOfNotPermittedCalls.increment();
}
RingBitSet
RingBitSet內(nèi)部有一個按位存儲的Ring Bit Bufffer(環(huán)形緩存區(qū))數(shù)據(jù)結(jié)構(gòu)BitSetMod兄春,具體實現(xiàn)不用特別關(guān)心澎剥,只需知道它的大小是由long[]和size決定,邏輯上是如下的環(huán)狀:
成功的調(diào)用會寫0赶舆,失敗的寫1哑姚,當(dāng)環(huán)滿了就計算失敗率。例如芜茵,如果Ring Bit Buffer的大小設(shè)置為10叙量,如果前9次的請求調(diào)用都失敗也不會計算請求調(diào)用失敗率。
以CircuitBreaker從初始狀態(tài)轉(zhuǎn)換到打開狀態(tài)為例九串,熔斷器的轉(zhuǎn)換如下:
CircuitBreaker事件
CircuitBreaker的事件采用觀察者模式绞佩,事件框架如下:
- EventConsumer<T>是事件消費者接口(觀察者),是函數(shù)式接口猪钮,使用event->{......}來創(chuàng)建事件消費函數(shù)品山。
- EventPublisher<T>是事件發(fā)布者接口(被觀察者),只有一個方法onEvent(EventConsumer<T> onEventConsumer)用于設(shè)置事件消費者烤低。
- EventProcessor<T>是EventPublisher<T>的通用實現(xiàn)類肘交,主要完成消費者注冊以及調(diào)用消費者消費事件。
EventProcessor
EventProcessor主要完成消費者的注冊和事件消費扑馁,主要代碼如下:
// 這列表中的消費者是通用事件消費者涯呻,任何類型的事件都會觸發(fā)列表里的消費者消費
List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
// ConcurrentMap中存儲著特定的消費者凉驻,特定類型事件觸發(fā)時會調(diào)用特定消費者
ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();
@Override
// 注冊通用消費者的方法
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
this.consumerRegistered = true;
this.onEventConsumers.add(onEventConsumer);
}
// 注冊特定的消費者
public synchronized void registerConsumer(String className, EventConsumer<? extends T> eventConsumer){
this.consumerRegistered = true;
this.eventConsumerMap.compute(className, (k, consumers) -> {
if(consumers == null){
consumers = new ArrayList<>();
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}else{
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}
});
}
// 調(diào)用消費者消費事件
public <E extends T> boolean processEvent(E event) {
boolean consumed = false;
// 通用消費者消費事件
if(!onEventConsumers.isEmpty()){
onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
consumed = true;
}
// 特定消費者消費事件
if(!eventConsumerMap.isEmpty()){
List<EventConsumer<T>> eventConsumers = this.eventConsumerMap.get(event.getClass().getSimpleName());
if(eventConsumers != null && !eventConsumers.isEmpty()){
eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
consumed = true;
}
}
return consumed;
}
CircuitBreaker的事件類型
CircuitBreaker共有6種事件類型
- CircuitBreakerOnResetEvent:在熔斷器重置時發(fā)布的事件。
- CircuitBreakerOnSuccessEvent:在調(diào)用成功時發(fā)布的事件复罐。
- CircuitBreakerOnErrorEvent:在調(diào)用失敗時發(fā)布的事件沿侈。
- CircuitBreakerOnIgnoredErrorEvent:在異常被忽略時發(fā)布的事件。
- CircuitBreakerOnCallNotPermittedEvent:在熔斷器熔斷時發(fā)布的事件市栗。
- CircuitBreakerOnStateTranstionEvent:在熔斷器狀態(tài)轉(zhuǎn)換時發(fā)布的事件迹鹅。
CircuitBreakerEvent接口中聲明了與具體事件對應(yīng)的枚舉類Type,用于表示事件類型勋乾,已經(jīng)是應(yīng)該否強制發(fā)布:
enum Type {
/** A CircuitBreakerEvent which informs that an error has been recorded */
ERROR(false),
/** A CircuitBreakerEvent which informs that an error has been ignored */
IGNORED_ERROR(false),
/** A CircuitBreakerEvent which informs that a success has been recorded */
SUCCESS(false),
/** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
NOT_PERMITTED(false),
/** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
STATE_TRANSITION(true),
/** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
RESET(true),
/** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
FORCED_OPEN(false),
/** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
DISABLED(false);
public final boolean forcePublish;
Type(boolean forcePublish) {
this.forcePublish = forcePublish;
}
}
CircuitBreakerEventProcessor
熔斷器事件處理的繼承實現(xiàn)關(guān)系如下:
其中EventPublisher繼承了EventPublisher<T>唧躲,并設(shè)置了幾種事件的消費者:
interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {
EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);
EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);
EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);
EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);
EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);
EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
}
CircuitBreakerEventProcessor分別實現(xiàn)了事件注冊接口和消費接口并繼承EventProcessor完成整個事件的處理。
private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
// 完成事件消費者的注冊
@Override
public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
registerConsumer(CircuitBreakerOnSuccessEvent.class.getSimpleName(), onSuccessEventConsumer);
return this;
}
/*...*/
// 調(diào)用父類方法處理事件
@Override
public void consumeEvent(CircuitBreakerEvent event) {
super.processEvent(event);
}
}
CircuitBreaker熔斷
有了前面的所有工作篡腌,CircuitBreaker的使用就十分簡單了褐荷,只需要使用CircuitBreaker提供的函數(shù)包裝 Callable
, Supplier
, Runnable
, Consumer
, CheckedRunnable
, CheckedSupplier
, CheckedConsumer
或者 CompletionStage
,這些包裝函數(shù)都大同小異嘹悼,看一下其中一個:
static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0<T> supplier){
return () -> {
// 1.先檢查是否可以通過熔斷器
circuitBreaker.acquirePermission();
long start = System.nanoTime();
try {
// 2.正常運行叛甫,拿取返回值
T returnValue = supplier.apply();
// 3.計算運行成功,調(diào)用onSuccess()
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Exception exception) {
// 如果拋出異常杨伙,調(diào)用onError()其监,onError()會對異常依據(jù)黑白名單進行進行判斷
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, exception);
// 不管是否在黑白名單,異常都會拋出
throw exception;
}
};
}
onSuccess():
@Override
public void onSuccess(long durationInNanos) {
// 發(fā)布成功事件
publishSuccessEvent(durationInNanos);
// 調(diào)用狀態(tài)檢查是否需要狀態(tài)轉(zhuǎn)換
stateReference.get().onSuccess();
}
onError():
@Override
public void onError(long durationInNanos, Throwable throwable) {
// 拿取判斷異常的謂詞函數(shù)
Predicate<Throwable> recordFailurePredicate = circuitBreakerConfig.getRecordFailurePredicate();
// 解包CompletionException異常
if (throwable instanceof CompletionException) {
Throwable cause = throwable.getCause();
handleThrowable(durationInNanos, recordFailurePredicate, cause);
}else{
handleThrowable(durationInNanos, recordFailurePredicate, throwable);
}
}
private void handleThrowable(long durationInNanos, Predicate<Throwable> recordFailurePredicate, Throwable throwable) {
// 用謂詞函數(shù)判斷是否應(yīng)該被忽略
if (recordFailurePredicate.test(throwable)) {
LOG.debug("CircuitBreaker '{}' recorded a failure:", name, throwable);
// 提交失敗事件限匣,調(diào)用狀態(tài)檢查是否需要狀態(tài)轉(zhuǎn)換
publishCircuitErrorEvent(name, durationInNanos, throwable);
stateReference.get().onError(throwable);
} else {
// 允許通過抖苦,提交忽略異常的事件
releasePermission();
publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
}
}
CircuitBreaker總結(jié)
綜上,熔斷器的總體工作流程如下:
END