Resilience4j-CircuitBreaker詳解

CircuitBreaker

CircuitBreaker主要由以下幾個部分組成:配置她奥、注冊器怔蚌、熔斷器敷硅、度量指標(biāo)功咒、事件發(fā)布及熔斷器的工作原理愉阎。接下來會逐一介紹。

CircuitBreaker配置

基本配置

首先是CircuitBreaker的一些可配置項力奋,在CircuitBreakerConfig中:

CircuitBreaker

這是一個獨立的類榜旦,里面包含熔斷器的可配置項,提供了一個內(nèi)部類Builder來構(gòu)建配置景殷,主要通過三個方法實現(xiàn):

  • ofDefaults():使用默認配置
  • custom():返回一個Builder對象
  • from(CircuitBreakerConfig baseConfig):返回一個包含baseConfigBuilder對象

得到Builder對象后就可以根據(jù)Builder提供的方法構(gòu)建配置

SpringBoot配置

CircuitBreakerProperties

首先是CircuitBreakerProperties溅呢,整體繼承關(guān)系如下圖:

properties繼承關(guān)系

CircuitBreakerProperties類中添加了@ConfigurationProperties(prefix = "resilience4j.circuitbreaker")注解,是讀取ymlproperties文件中配置的入口滨彻。

CircuitBreakerConfigurationProperties添加了@Configuration注解藕届,并在其中定義了Aspect的順序挪蹭,即之前提到的注解使用熔斷器亭饵、限流器、重試組件和隔板組件的切入順序梁厉。

CircuitBreakerConfigurationProperties

基類的CircuitBreakerConfigurationProperties包含了兩個Map<String, InstanceProperties> instances辜羊、Map<String, InstanceProperties> configs以及一個內(nèi)部靜態(tài)類InstancePropertiesInstanceProperties中除了熔斷器的各項配置外词顾,還有一個baseConfig的可配置項八秃,在ymlproperties文件中的配置最終都會進入instancesconfigs這兩個map中肉盹。例如如下的yml文件:

resilience4j:
  circuitbreaker:
    configs:
      myDefault:
        ringBufferSizeInClosedState: 10 # 熔斷器關(guān)閉時的緩沖區(qū)大小
        ringBufferSizeInHalfOpenState: 5 # 熔斷器半開時的緩沖區(qū)大小
        waitDurationInOpenState: 10000 # 熔斷器從打開到半開需要的時間
        failureRateThreshold: 60 # 熔斷器打開的失敗閾值
        eventConsumerBufferSize: 10 # 事件緩沖區(qū)大小
        registerHealthIndicator: true # 健康監(jiān)測
        automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自動從打開到半開
        recordFailurePredicate:     com.example.resilience4j.predicate.RecordFailurePredicate # 謂詞設(shè)置異常是否為失敗
        recordExceptions: # 記錄的異常
          - com.example.resilience4j.exceptions.BusinessBException
        ignoreExceptions: # 忽略的異常
          - com.example.resilience4j.exceptions.BusinessAException
    instances:
      backendA:
        baseConfig: myDefault
        waitDurationInOpenState: 5000
        failureRateThreshold: 20

myDefault將作為key昔驱,failureRateThreshold等將作為InstanceProperties對象中的屬性一起存入configs這個Map中,instances也是如此上忍。

CircuitBreakerConfigurationProperties提供了一個createCircuitBreakerConfig方法用于創(chuàng)建CircuitBreakerConfig

public CircuitBreakerConfig createCircuitBreakerConfig(InstanceProperties instanceProperties) {
        // 如果配置項中有baseConfig,就去configs中找到baseConfig
        if (StringUtils.isNotEmpty(instanceProperties.getBaseConfig())) {
            InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
            if (baseProperties == null) {
                throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
            }
            //調(diào)用buildConfigFromBaseConfig方法使用instanceProperties覆蓋baseProperties
            return buildConfigFromBaseConfig(instanceProperties, baseProperties);
        }
        //如果沒有調(diào)用buildConfig方法返回CircuitBreakerConfig
        return buildConfig(CircuitBreakerConfig.custom(), instanceProperties);
    }

    private CircuitBreakerConfig buildConfigFromBaseConfig(InstanceProperties instanceProperties, InstanceProperties baseProperties) {
        ConfigUtils.mergePropertiesIfAny(instanceProperties, baseProperties);
        //覆蓋的實現(xiàn)調(diào)用了CircuitBreakerConfig的custom()方法和from()方法
        CircuitBreakerConfig baseConfig = buildConfig(CircuitBreakerConfig.custom(), baseProperties);
        return buildConfig(CircuitBreakerConfig.from(baseConfig), instanceProperties);
    }

CircuitBreakerConfiguration

接下來看看這些配置是如何配置到注冊器中的骤肛,在CircuitBreakerConfiguration中:

CircuitBreakerConfiguration
CircuitBreakerConfiguration

CircuitBreakerConfiguration中的circuitBreakerRegistry方法使用@Bean注解注入了注冊器,方法中主要做了3件事:

  1. 創(chuàng)建注冊器

    public CircuitBreakerRegistry createCircuitBreakerRegistry(CircuitBreakerConfigurationProperties circuitBreakerProperties) {
        //把Map<String, InstanceProperties> 轉(zhuǎn)換為Map<String, CircuitBreakerConfig>
        Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs()
            .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                     entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue())));
     //調(diào)用registry的靜態(tài)of方法新建一個InMemoryCircuitBreakerRegistry,放入configurations,Map作為傳入?yún)?shù)
        return CircuitBreakerRegistry.of(configs);
    }
    
  2. 注冊事件消費者:說到事件的時候再提

  3. 初始化注冊器

public void initCircuitBreakerRegistry(CircuitBreakerRegistry circuitBreakerRegistry) {
    //把circuitBreakerProperties中的instances中的配置先轉(zhuǎn)換,再用注冊器構(gòu)建實例放入entryMap
    circuitBreakerProperties.getInstances().forEach(
        (name, properties) -> circuitBreakerRegistry.circuitBreaker(name,                                                circuitBreakerProperties.createCircuitBreakerConfig(properties)));
}

動態(tài)配置

基于以上結(jié)果窍蓝,若使用配置中心完成配置的動態(tài)刷新腋颠,刷新的內(nèi)容是CircuitBreakerConfigurationProperties中的內(nèi)容,若要使配置生效需要把CircuitBreakerConfigurationProperties中的配置添加到circuitBreakerRegistry中吓笙,并且替換之前的注冊器實例淑玫,如下:

//1.獲取更新的配置
Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs()
    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                                                  entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue())));
//2.將新配置加入注冊器
configs.forEach(circuitBreakerRegistry::addConfiguration);
//3.替換注冊器中的實例
circuitBreakerProperties.getInstances().forEach(
    (name, properties) -> circuitBreakerRegistry.replace(name, CircuitBreaker.of(
        name, circuitBreakerProperties.createCircuitBreakerConfig(properties))));

CircuitBreaker注冊器

InMemoryCircuitBreakerRegistry是注冊器的實現(xiàn)類,繼承及實現(xiàn)關(guān)系如下圖:

CircuitBreakerRegistry

CircuitBreakerRegistry的創(chuàng)建

承接之前配置讀入的過程我們看一下InMemoryCircuitBreakerRegistry的構(gòu)造方法面睛,因為registry的靜態(tài)of方法其實就是調(diào)用了InMemoryCircuitBreakerRegistry的構(gòu)造方法絮蒿,3個of方法對應(yīng)3個構(gòu)造方法,如下:

//無參構(gòu)造方法,其實就是使用默認配置調(diào)用第三個構(gòu)造方法,這個默認配置是內(nèi)置的,不受spring配置注入的影響
public InMemoryCircuitBreakerRegistry() {
    this(CircuitBreakerConfig.ofDefaults());
}
//批量配置,首先會查看configs中是否有名為"default"的配置,如果有則將該配置放入configurations中,沒有則放入內(nèi)置的默認配置
public InMemoryCircuitBreakerRegistry(Map<String, CircuitBreakerConfig> configs) {
    this(configs.getOrDefault(DEFAULT_CONFIG, CircuitBreakerConfig.ofDefaults()));
    //然后把其他配置放入configurations中
    this.configurations.putAll(configs);
}
//使用指定的配置作為默認配置
public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
    super(defaultConfig);
}

假設(shè)目前我們有3個配置叁鉴,分別為:

  1. 內(nèi)置的默認配置:default
  2. 在配置文件中配置名為default的配置:myDefault
  3. 在配置文件中配置名為configA的配置:configA

如果我們配置了23歌径,從Bean中拿到的注冊器配置屬性如下:

configurations1

如果我們只配置了3,從Bean中拿到的注冊器配置屬性如下:

configurations2

如果我們不從Bean中拿亲茅,而是直接new CircuitBreakerRegistry()回铛,則注冊器配置屬性如下:

configurations3

上面3張圖說明了三個問題:

  1. 通過配置文件注入到Bean的注冊器狗准,默認的配置可以被人為覆蓋
  2. 如果不添加名為default的配置,Bean中的注冊器會把內(nèi)置的默認配置放入注冊器中
  3. 直接新建一個注冊器會取內(nèi)置的配置而不會從配置文件中去拿

從CircuitBreakerRegistry中獲取CircuitBreaker

從注冊器中獲取CircuitBreaker4種方式:

//從entryMap中獲取所有的熔斷器
@Override
public Seq<CircuitBreaker> getAllCircuitBreakers() {
    return Array.ofAll(entryMap.values());
}

//根據(jù)名字獲取熔斷器茵肃,實際是使用默認的配置調(diào)用下面的方法
@Override
public CircuitBreaker circuitBreaker(String name) {
    return circuitBreaker(name, getDefaultConfig());
}

//根據(jù)名稱及配置獲取熔斷器
@Override
public CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config) {
    return computeIfAbsent(name, () -> CircuitBreaker
                           .of(name, Objects.requireNonNull(config, CONFIG_MUST_NOT_BE_NULL)));
}

//根據(jù)名稱及配置名稱獲取熔斷器
@Override
public CircuitBreaker circuitBreaker(String name, String configName) {
    return computeIfAbsent(name, () -> CircuitBreaker.of(name, getConfiguration(configName)
                          .orElseThrow(() -> new ConfigurationNotFoundException(configName))));
}

這里有個名叫computeIfAbsent的方法腔长,這個方法內(nèi)部簡單來說做了以下的事:如果entryMap中存在名為name的熔斷器,就獲取該熔斷器验残,如果不存在該熔斷器捞附,就使用config這個配置新建一個名為name的熔斷器放入entryMap中,然后返回您没。所以以下三種方法熔斷器的邏輯如下:

CircuitBreaker獲取

還是假設(shè)目前我們有3個配置鸟召,分別為:

  1. 內(nèi)置的默認配置:default
  2. 在配置文件中配置名為default的配置:myDefault
  3. 在配置文件中配置名為configA的配置:configA

還配置了一個熔斷器實例:backendA,并且baseConfigconfigA

如果我們配置了2氨鹏、3欧募,并使用3個方法分別去Bean中的注冊器拿名為backendA的熔斷器,得到的熔斷器如下:

backendA

同樣的情況下使用3個方法分別去Bean中的注冊器拿名為backendB的熔斷器仆抵,得到的熔斷器如下:

backendB

而對于新建的注冊器跟继,分別用3個方法去注冊器中拿名為backend的熔斷器,得到的熔斷器如下:

backend

CircuitBreaker狀態(tài)

CircuitBreaker共有5個狀態(tài)镣丑,通過實現(xiàn)CircuitBreakerState接口實現(xiàn)各狀態(tài)的功能:

CircuitBreakerState

CircuitBreakerState共有7個方法和一個default方法:

// 用于返回是否允許通過熔斷器
boolean tryAcquirePermission();
// 主要用于打開狀態(tài)時熔斷器的觸發(fā)轉(zhuǎn)換和半開狀態(tài)時限制線程數(shù)
void acquirePermission();
// 主要用于半開狀態(tài)時線程數(shù)釋放
void releasePermission();
// 調(diào)用失敗舔糖,計算失敗率
void onError(Throwable throwable);
// 調(diào)用成功,計算失敗率
void onSuccess();
// 返回當(dāng)前狀態(tài)在CircuitBreaker的枚舉值
CircuitBreaker.State getState();
// 返回當(dāng)前封裝了度量指標(biāo)的類實例
CircuitBreakerMetrics getMetrics();
// 決定是否會發(fā)布事件
default boolean shouldPublishEvents(CircuitBreakerEvent event){
    return event.getEventType().forcePublish || getState().allowPublish;
}

關(guān)閉狀態(tài)

關(guān)閉狀態(tài)比較簡單莺匠,所有請求都會通過金吗,在成功或失敗的時候分別調(diào)用度量指標(biāo)的onSuccess()或onError()方法,然后檢查失敗率進行狀態(tài)裝換趣竣。

@Override
public boolean tryAcquirePermission() {
    return true;
}

@Override
public void onError(Throwable throwable) {
    // CircuitBreakerMetrics是線程安全的
    checkFailureRate(circuitBreakerMetrics.onError());
}

@Override
public void onSuccess() {
    // CircuitBreakerMetrics是線程安全的
    checkFailureRate(circuitBreakerMetrics.onSuccess());
}

private void checkFailureRate(float currentFailureRate) {
    if (currentFailureRate >= failureRateThreshold) {
        transitionToOpenState();
    }
}

打開狀態(tài)

打開狀態(tài)的構(gòu)造方法會拿取isAutomaticTransitionFromOpenToHalfOpenEnabled配置摇庙,如果為true,就會調(diào)用scheduledExecutorServiceschedule方法時間達到的時候自動轉(zhuǎn)換期贫,如果為false跟匆,就必須等到別的請求調(diào)用了acquirePermission方法:

OpenState(CircuitBreakerMetrics circuitBreakerMetrics) {
    final Duration waitDurationInOpenState = circuitBreakerConfig.getWaitDurationInOpenState();
    this.retryAfterWaitDuration = clock.instant().plus(waitDurationInOpenState);
    this.circuitBreakerMetrics = circuitBreakerMetrics;
    // 檢查是否自動轉(zhuǎn)換
    if (circuitBreakerConfig.isAutomaticTransitionFromOpenToHalfOpenEnabled()) {
        ScheduledExecutorService scheduledExecutorService = schedulerFactory.getScheduler();
        scheduledExecutorService.schedule(CircuitBreakerStateMachine.this::transitionToHalfOpenState, waitDurationInOpenState.toMillis(), TimeUnit.MILLISECONDS);
    }
}

@Override
public boolean tryAcquirePermission() {
    // 請求進來時檢查時間,如果時間到了通砍,就狀態(tài)轉(zhuǎn)換并且放請求通過
    if (clock.instant().isAfter(retryAfterWaitDuration)) {
        transitionToHalfOpenState();
        return true;
    }
    circuitBreakerMetrics.onCallNotPermitted();
    return false;
}

@Override
public void acquirePermission() {
    if(!tryAcquirePermission()){
        throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
    }
}

// 成功和失敗仍然調(diào)用Metrics的方法增加成功或失敗次數(shù)玛臂,并且計算失敗率
@Override
public void onError(Throwable throwable) {
    circuitBreakerMetrics.onError();
}
@Override
public void onSuccess() {
    circuitBreakerMetrics.onSuccess();
}

半開狀態(tài)

半開狀態(tài)和關(guān)閉狀態(tài)大體相似,但略有不同封孙,請求通過時并不是全都通過迹冤,而是使用原子變量testRequestCounter,來限制請求數(shù)虎忌,testRequestCounter的大小為半開狀態(tài)環(huán)的大小泡徙,若超過允許的請求數(shù)則直接拒絕:

@Override
public boolean tryAcquirePermission() {
    // 查看半開狀態(tài)環(huán)是否裝滿,沒裝滿就進入膜蠢,裝滿就拒絕
    if (testRequestCounter.getAndUpdate(current -> current == 0 ? current : --current) > 0) {
        return true;
    }
    circuitBreakerMetrics.onCallNotPermitted();
    return false;
}

@Override
public void acquirePermission() {
    if(!tryAcquirePermission()){
        throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
    }
}

// 請求完成會釋放拿著的原子變量
@Override
public void releasePermission() {
    testRequestCounter.incrementAndGet();
}

// 根據(jù)失敗率完成兩種狀態(tài)的轉(zhuǎn)換
private void checkFailureRate(float currentFailureRate) {
    if(currentFailureRate != -1){
        if(currentFailureRate >= failureRateThreshold) {
            transitionToOpenState();
        }else{
            transitionToClosedState();
        }
    }
}

ForcedOpenStateDisabledState兩種狀態(tài)比較簡單堪藐,強制開啟就直接熔斷也不計算失敗率莉兰,禁止?fàn)顟B(tài)什么都不干,只能拿到狀態(tài)枚舉和度量指標(biāo)礁竞。

CircuitBreaker狀態(tài)機

有了狀態(tài)還需要狀態(tài)機來控制狀態(tài)的轉(zhuǎn)換糖荒,CircuitBreakerStateMachine實現(xiàn)了CircuitBreaker接口,同時又一個內(nèi)部類CircuitBreakerEventProcessor來處理事件模捂,是熔斷器的核心部分捶朵,繼承實現(xiàn)關(guān)系如下:

CircuitBreakerStateMachine

StateStateTransition是兩個CircuitBreaker接口中的內(nèi)部枚舉

State

定義了幾個狀態(tài)的枚舉類型,allowPublish設(shè)置了每種狀態(tài)是否允許發(fā)布事件

enum State {
    DISABLED(3, false),
    CLOSED(0, true),
    OPEN(1, true),
    FORCED_OPEN(4, false),
    HALF_OPEN(2, true);

    private final int order;
    public final boolean allowPublish;

    State(int order, boolean allowPublish){
        this.order = order;
        this.allowPublish = allowPublish;
    }

    public int getOrder(){
        return order;
    }
}

StateTransition

定義了狀態(tài)轉(zhuǎn)換的枚舉狂男,存在一個元組<State, State>為鍵综看,枚舉值為StateTransitionMap中,同時提供一個狀態(tài)轉(zhuǎn)換的方法岖食,能夠根據(jù)轉(zhuǎn)換的兩個狀態(tài)的枚舉值返回StateTransition红碑,主要便于發(fā)布事件。

enum StateTransition {
    CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
    CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
    CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
    HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
    HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
    HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
    HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
    OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
    OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
    OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
    OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
    FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
    FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
    FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
    FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
    DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
    DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
    DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
    DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);

    private final State fromState;

    private final State toState;

    // 將枚舉中的值全放入Map中
    private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP = Arrays.stream(StateTransition.values())
        .collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));

    // 提供方法根據(jù)兩個狀態(tài)返回狀態(tài)轉(zhuǎn)換的枚舉
    public static StateTransition transitionBetween(State fromState, State toState){
        final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
        if(stateTransition == null) {
            throw new IllegalStateException(
                String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
        }
        return stateTransition;
    }
}

進行狀態(tài)轉(zhuǎn)換時先用原子變量的操作更新引用县耽,然后根據(jù)前后狀態(tài)是否相同決定是否發(fā)布事件句喷。

private void stateTransition(State newState, UnaryOperator<CircuitBreakerState> newStateGenerator) {
    // 原子操作獲取之前的狀態(tài)并更新狀態(tài)
    CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
        if (currentState.getState() == newState) {
            return currentState;
        }
        return newStateGenerator.apply(currentState);
    });
    // 如果兩個狀態(tài)不同則發(fā)布事件
    if (previousState.getState() != newState) {
        // 通過兩個不同狀態(tài)的枚舉獲取StateTransition
        publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
    }
}

CircuitBreakerMetrics

度量標(biāo)的繼承實現(xiàn)關(guān)系如下:

CircuitBreakerMetrics

CircuitBreakerMetrics實現(xiàn)了Metrics接口镣典,并持有RingBitSet兔毙,當(dāng)各個狀態(tài)下的熔斷器調(diào)用成功或失敗就調(diào)用Metrics相應(yīng)的方法寫入環(huán)的bit位并計算失敗率。

float onError() {
    int currentNumberOfFailedCalls = ringBitSet.setNextBit(true);
    return getFailureRate(currentNumberOfFailedCalls);
}


float onSuccess() {
    int currentNumberOfFailedCalls = ringBitSet.setNextBit(false);
    return getFailureRate(currentNumberOfFailedCalls);
}

void onCallNotPermitted() {
    numberOfNotPermittedCalls.increment();
}

RingBitSet

RingBitSet內(nèi)部有一個按位存儲的Ring Bit Bufffer(環(huán)形緩存區(qū))數(shù)據(jù)結(jié)構(gòu)BitSetMod兄春,具體實現(xiàn)不用特別關(guān)心澎剥,只需知道它的大小是由long[]size決定,邏輯上是如下的環(huán)狀:

RingBuffer

成功的調(diào)用會寫0赶舆,失敗的寫1哑姚,當(dāng)環(huán)滿了就計算失敗率。例如芜茵,如果Ring Bit Buffer的大小設(shè)置為10叙量,如果前9次的請求調(diào)用都失敗也不會計算請求調(diào)用失敗率。

CircuitBreaker從初始狀態(tài)轉(zhuǎn)換到打開狀態(tài)為例九串,熔斷器的轉(zhuǎn)換如下:

CircuitBreakerStateMachine

CircuitBreaker事件

CircuitBreaker的事件采用觀察者模式绞佩,事件框架如下:

eventProcessor
  • EventConsumer<T>是事件消費者接口(觀察者),是函數(shù)式接口猪钮,使用event->{......}來創(chuàng)建事件消費函數(shù)品山。
  • EventPublisher<T>是事件發(fā)布者接口(被觀察者),只有一個方法onEvent(EventConsumer<T> onEventConsumer)用于設(shè)置事件消費者烤低。
  • EventProcessor<T>EventPublisher<T>的通用實現(xiàn)類肘交,主要完成消費者注冊以及調(diào)用消費者消費事件。

EventProcessor

EventProcessor主要完成消費者的注冊和事件消費扑馁,主要代碼如下:

// 這列表中的消費者是通用事件消費者涯呻,任何類型的事件都會觸發(fā)列表里的消費者消費
List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
// ConcurrentMap中存儲著特定的消費者凉驻,特定類型事件觸發(fā)時會調(diào)用特定消費者
ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();

@Override
// 注冊通用消費者的方法
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
    this.consumerRegistered = true;
    this.onEventConsumers.add(onEventConsumer);
}

// 注冊特定的消費者
public synchronized void registerConsumer(String className, EventConsumer<? extends T> eventConsumer){
    this.consumerRegistered = true;
    this.eventConsumerMap.compute(className, (k, consumers) -> {
        if(consumers == null){
            consumers = new ArrayList<>();
            consumers.add((EventConsumer<T>) eventConsumer);
            return consumers;
        }else{
            consumers.add((EventConsumer<T>) eventConsumer);
            return consumers;
        }
    });
}

// 調(diào)用消費者消費事件
public <E extends T> boolean processEvent(E event) {
    boolean consumed = false;
    // 通用消費者消費事件
    if(!onEventConsumers.isEmpty()){
        onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
        consumed = true;
    }
    // 特定消費者消費事件
    if(!eventConsumerMap.isEmpty()){
        List<EventConsumer<T>> eventConsumers = this.eventConsumerMap.get(event.getClass().getSimpleName());
        if(eventConsumers != null && !eventConsumers.isEmpty()){
            eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
            consumed = true;
        }
    }
    return consumed;
}

CircuitBreaker的事件類型

CircuitBreaker共有6種事件類型

Event
  • CircuitBreakerOnResetEvent:在熔斷器重置時發(fā)布的事件。
  • CircuitBreakerOnSuccessEvent:在調(diào)用成功時發(fā)布的事件复罐。
  • CircuitBreakerOnErrorEvent:在調(diào)用失敗時發(fā)布的事件沿侈。
  • CircuitBreakerOnIgnoredErrorEvent:在異常被忽略時發(fā)布的事件。
  • CircuitBreakerOnCallNotPermittedEvent:在熔斷器熔斷時發(fā)布的事件市栗。
  • CircuitBreakerOnStateTranstionEvent:在熔斷器狀態(tài)轉(zhuǎn)換時發(fā)布的事件迹鹅。

CircuitBreakerEvent接口中聲明了與具體事件對應(yīng)的枚舉類Type,用于表示事件類型勋乾,已經(jīng)是應(yīng)該否強制發(fā)布:

enum Type {
    /** A CircuitBreakerEvent which informs that an error has been recorded */
    ERROR(false),
    /** A CircuitBreakerEvent which informs that an error has been ignored */
    IGNORED_ERROR(false),
    /** A CircuitBreakerEvent which informs that a success has been recorded */
    SUCCESS(false),
    /** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
    NOT_PERMITTED(false),
    /** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
    STATE_TRANSITION(true),
    /** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
    RESET(true),
    /** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
    FORCED_OPEN(false),
    /** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
    DISABLED(false);

    public final boolean forcePublish;

    Type(boolean forcePublish) {
        this.forcePublish = forcePublish;
    }
}

CircuitBreakerEventProcessor

熔斷器事件處理的繼承實現(xiàn)關(guān)系如下:

CircuitBreakerEventProcessor

其中EventPublisher繼承了EventPublisher<T>唧躲,并設(shè)置了幾種事件的消費者:

interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {

    EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);

    EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);

    EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);

    EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);

    EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);

    EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
}

CircuitBreakerEventProcessor分別實現(xiàn)了事件注冊接口和消費接口并繼承EventProcessor完成整個事件的處理。

private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
    // 完成事件消費者的注冊
    @Override
    public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
        registerConsumer(CircuitBreakerOnSuccessEvent.class.getSimpleName(), onSuccessEventConsumer);
        return this;
    }
    /*...*/
    // 調(diào)用父類方法處理事件
    @Override
    public void consumeEvent(CircuitBreakerEvent event) {
        super.processEvent(event);
    }
}

CircuitBreaker熔斷

有了前面的所有工作篡腌,CircuitBreaker的使用就十分簡單了褐荷,只需要使用CircuitBreaker提供的函數(shù)包裝 Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer 或者 CompletionStage ,這些包裝函數(shù)都大同小異嘹悼,看一下其中一個:

static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0<T> supplier){
    return () -> {
        // 1.先檢查是否可以通過熔斷器
        circuitBreaker.acquirePermission();
        long start = System.nanoTime();
        try {
            // 2.正常運行叛甫,拿取返回值
            T returnValue = supplier.apply();
            // 3.計算運行成功,調(diào)用onSuccess()
            long durationInNanos = System.nanoTime() - start;
            circuitBreaker.onSuccess(durationInNanos);
            return returnValue;
        } catch (Exception exception) {
            // 如果拋出異常杨伙,調(diào)用onError()其监,onError()會對異常依據(jù)黑白名單進行進行判斷
            long durationInNanos = System.nanoTime() - start;
            circuitBreaker.onError(durationInNanos, exception);
            // 不管是否在黑白名單,異常都會拋出
            throw exception;
        }
    };
}

onSuccess()

@Override
public void onSuccess(long durationInNanos) {
    // 發(fā)布成功事件
    publishSuccessEvent(durationInNanos);
    // 調(diào)用狀態(tài)檢查是否需要狀態(tài)轉(zhuǎn)換
    stateReference.get().onSuccess();
}

onError()

@Override
public void onError(long durationInNanos, Throwable throwable) {
    // 拿取判斷異常的謂詞函數(shù)
    Predicate<Throwable> recordFailurePredicate = circuitBreakerConfig.getRecordFailurePredicate();
    // 解包CompletionException異常
    if (throwable instanceof CompletionException) {
        Throwable cause = throwable.getCause();
        handleThrowable(durationInNanos, recordFailurePredicate, cause);
    }else{
        handleThrowable(durationInNanos, recordFailurePredicate, throwable);
    }
}

private void handleThrowable(long durationInNanos, Predicate<Throwable> recordFailurePredicate, Throwable throwable) {
    // 用謂詞函數(shù)判斷是否應(yīng)該被忽略
    if (recordFailurePredicate.test(throwable)) {
        LOG.debug("CircuitBreaker '{}' recorded a failure:", name, throwable);
        // 提交失敗事件限匣,調(diào)用狀態(tài)檢查是否需要狀態(tài)轉(zhuǎn)換
        publishCircuitErrorEvent(name, durationInNanos, throwable);
        stateReference.get().onError(throwable);
    } else {
        // 允許通過抖苦,提交忽略異常的事件
        releasePermission();
        publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
    }
}

CircuitBreaker總結(jié)

綜上,熔斷器的總體工作流程如下:

CircuitBreaker

END

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末米死,一起剝皮案震驚了整個濱河市锌历,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌峦筒,老刑警劉巖究西,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異物喷,居然都是意外死亡卤材,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門脯丝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來商膊,“玉大人,你說我怎么就攤上這事宠进≡尾穑” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長实幕。 經(jīng)常有香客問我吝镣,道長,這世上最難降的妖魔是什么昆庇? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任末贾,我火速辦了婚禮,結(jié)果婚禮上整吆,老公的妹妹穿的比我還像新娘拱撵。我一直安慰自己,他們只是感情好表蝙,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布拴测。 她就那樣靜靜地躺著,像睡著了一般府蛇。 火紅的嫁衣襯著肌膚如雪集索。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天汇跨,我揣著相機與錄音务荆,去河邊找鬼。 笑死穷遂,一個胖子當(dāng)著我的面吹牛函匕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播塞颁,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼浦箱,長吁一口氣:“原來是場噩夢啊……” “哼吸耿!你這毒婦竟也來了祠锣?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤咽安,失蹤者是張志新(化名)和其女友劉穎伴网,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妆棒,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡澡腾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了糕珊。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片动分。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖红选,靈堂內(nèi)的尸體忽然破棺而出澜公,到底是詐尸還是另有隱情,我是刑警寧澤喇肋,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布坟乾,位于F島的核電站迹辐,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏甚侣。R本人自食惡果不足惜明吩,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望殷费。 院中可真熱鬧印荔,春花似錦、人聲如沸详羡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽殷绍。三九已至染苛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間主到,已是汗流浹背茶行。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留登钥,地道東北人畔师。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像牧牢,于是被迫代替她去往敵國和親看锉。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容