Resilience4j-輕量級(jí)熔斷框架

Resilience4j

簡介

Resilience4j是一款輕量級(jí)佃扼,易于使用的容錯(cuò)庫拆融,其靈感來自于Netflix Hystrix,但是專為Java 8和函數(shù)式編程而設(shè)計(jì)。輕量級(jí),因?yàn)閹熘皇褂昧?strong>Vavr敞贡,它沒有任何其他外部依賴下。相比之下,Netflix HystrixArchaius具有編譯依賴性嗤堰,Archaius具有更多的外部庫依賴性,例如GuavaApache Commons Configuration

要使用Resilience4j踢匣,不需要引入所有依賴告匠,只需要選擇你需要的。

Resilience4j提供了以下的核心模塊和拓展模塊:

核心模塊:

  • resilience4j-circuitbreaker: Circuit breaking
  • resilience4j-ratelimiter: Rate limiting
  • resilience4j-bulkhead: Bulkheading
  • resilience4j-retry: Automatic retrying (sync and async)
  • resilience4j-cache: Result caching
  • resilience4j-timelimiter: Timeout handling

Circuitbreaker

簡介

CircuitBreaker通過具有三種正常狀態(tài)的有限狀態(tài)機(jī)實(shí)現(xiàn):CLOSED离唬,OPENHALF_OPEN以及兩個(gè)特殊狀態(tài)DISABLEDFORCED_OPEN后专。當(dāng)熔斷器關(guān)閉時(shí),所有的請求都會(huì)通過熔斷器输莺。如果失敗率超過設(shè)定的閾值戚哎,熔斷器就會(huì)從關(guān)閉狀態(tài)轉(zhuǎn)換到打開狀態(tài),這時(shí)所有的請求都會(huì)被拒絕嫂用。當(dāng)經(jīng)過一段時(shí)間后型凳,熔斷器會(huì)從打開狀態(tài)轉(zhuǎn)換到半開狀態(tài),這時(shí)僅有一定數(shù)量的請求會(huì)被放入嘱函,并重新計(jì)算失敗率甘畅,如果失敗率超過閾值,則變?yōu)榇蜷_狀態(tài)往弓,如果失敗率低于閾值疏唾,則變?yōu)殛P(guān)閉狀態(tài)。

Circuitbreaker狀態(tài)機(jī)

Resilience4j記錄請求狀態(tài)的數(shù)據(jù)結(jié)構(gòu)和Hystrix不同函似,Hystrix是使用滑動(dòng)窗口來進(jìn)行存儲(chǔ)的槐脏,而Resilience4j采用的是Ring Bit Buffer(環(huán)形緩沖區(qū))。Ring Bit Buffer在內(nèi)部使用BitSet這樣的數(shù)據(jù)結(jié)構(gòu)來進(jìn)行存儲(chǔ)撇寞,BitSet的結(jié)構(gòu)如下圖所示:

環(huán)形緩沖區(qū)

每一次請求的成功或失敗狀態(tài)只占用一個(gè)bit位顿天,與boolean數(shù)組相比更節(jié)省內(nèi)存。BitSet使用long[]數(shù)組來存儲(chǔ)這些數(shù)據(jù)重抖,意味著16個(gè)值(64bit)的數(shù)組可以存儲(chǔ)1024個(gè)調(diào)用狀態(tài)露氮。

計(jì)算失敗率需要填滿環(huán)形緩沖區(qū)。例如钟沛,如果環(huán)形緩沖區(qū)的大小為10畔规,則必須至少請求滿10次,才會(huì)進(jìn)行故障率的計(jì)算恨统,如果僅僅請求了9次叁扫,即使9個(gè)請求都失敗,熔斷器也不會(huì)打開畜埋。但是CLOSE狀態(tài)下的緩沖區(qū)大小設(shè)置為10并不意味著只會(huì)進(jìn)入10個(gè) 請求莫绣,在熔斷器打開之前的所有請求都會(huì)被放入。

當(dāng)故障率高于設(shè)定的閾值時(shí)悠鞍,熔斷器狀態(tài)會(huì)從由CLOSE變?yōu)?strong>OPEN对室。這時(shí)所有的請求都會(huì)拋出CallNotPermittedException異常。當(dāng)經(jīng)過一段時(shí)間后,熔斷器的狀態(tài)會(huì)從OPEN變?yōu)?strong>HALF_OPEN掩宜,HALF_OPEN狀態(tài)下同樣會(huì)有一個(gè)Ring Bit Buffer蔫骂,用來計(jì)算HALF_OPEN狀態(tài)下的故障率,如果高于配置的閾值牺汤,會(huì)轉(zhuǎn)換為OPEN辽旋,低于閾值則裝換為CLOSE。與CLOSE狀態(tài)下的緩沖區(qū)不同的地方在于檐迟,HALF_OPEN狀態(tài)下的緩沖區(qū)大小會(huì)限制請求數(shù)补胚,只有緩沖區(qū)大小的請求數(shù)會(huì)被放入。

除此以外追迟,熔斷器還會(huì)有兩種特殊狀態(tài):DISABLED(始終允許訪問)和FORCED_OPEN(始終拒絕訪問)溶其。這兩個(gè)狀態(tài)不會(huì)生成熔斷器事件(除狀態(tài)裝換外),并且不會(huì)記錄事件的成功或者失敗敦间。退出這兩個(gè)狀態(tài)的唯一方法是觸發(fā)狀態(tài)轉(zhuǎn)換或者重置熔斷器握联。

熔斷器關(guān)于線程安全的保證措施有以下幾個(gè)部分:

  • 熔斷器的狀態(tài)使用AtomicReference保存的
  • 更新熔斷器狀態(tài)是通過無狀態(tài)的函數(shù)或者原子操作進(jìn)行的
  • 更新事件的狀態(tài)用synchronized關(guān)鍵字保護(hù)

意味著同一時(shí)間只有一個(gè)線程能夠修改熔斷器狀態(tài)或者記錄事件的狀態(tài)。

可配置參數(shù)

配置參數(shù) 默認(rèn)值 描述
failureRateThreshold 50 熔斷器關(guān)閉狀態(tài)和半開狀態(tài)使用的同一個(gè)失敗率閾值
ringBufferSizeInHalfOpenState 10 熔斷器半開狀態(tài)的緩沖區(qū)大小每瞒,會(huì)限制線程的并發(fā)量,例如緩沖區(qū)為10則每次只會(huì)允許10個(gè)請求調(diào)用后端服務(wù)
ringBufferSizeInClosedState 100 熔斷器關(guān)閉狀態(tài)的緩沖區(qū)大小纯露,不會(huì)限制線程的并發(fā)量剿骨,在熔斷器發(fā)生狀態(tài)轉(zhuǎn)換前所有請求都會(huì)調(diào)用后端服務(wù)
waitDurationInOpenState 60(s) 熔斷器從打開狀態(tài)轉(zhuǎn)變?yōu)榘腴_狀態(tài)等待的時(shí)間
automaticTransitionFromOpenToHalfOpenEnabled false 如果置為true,當(dāng)?shù)却龝r(shí)間結(jié)束會(huì)自動(dòng)由打開變?yōu)榘腴_埠褪,若置為false浓利,則需要一個(gè)請求進(jìn)入來觸發(fā)熔斷器狀態(tài)轉(zhuǎn)換
recordExceptions empty 需要記錄為失敗的異常列表
ignoreExceptions empty 需要忽略的異常列表
recordFailure throwable -> true 自定義的謂詞邏輯用于判斷異常是否需要記錄或者需要忽略,默認(rèn)所有異常都進(jìn)行記錄

測試前準(zhǔn)備

pom.xml

測試使用的IDEidea钞速,使用的springboot進(jìn)行學(xué)習(xí)測試贷掖,首先引入maven依賴:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot</artifactId>
    <version>0.9.0</version>
</dependency>

resilience4j-spring-boot集成了circuitbeakerretry渴语、bulkhead苹威、ratelimiter幾個(gè)模塊,因?yàn)楹罄m(xù)還要學(xué)習(xí)其他模塊驾凶,就直接引入resilience4j-spring-boot依賴牙甫。

application.yml配置

resilience4j:
  circuitbreaker:
    configs:
      default:
        ringBufferSizeInClosedState: 5 # 熔斷器關(guān)閉時(shí)的緩沖區(qū)大小
        ringBufferSizeInHalfOpenState: 2 # 熔斷器半開時(shí)的緩沖區(qū)大小
        waitDurationInOpenState: 10000 # 熔斷器從打開到半開需要的時(shí)間
        failureRateThreshold: 60 # 熔斷器打開的失敗閾值
        eventConsumerBufferSize: 10 # 事件緩沖區(qū)大小
        registerHealthIndicator: true # 健康監(jiān)測
        automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自動(dòng)從打開到半開,不需要觸發(fā)
        recordFailurePredicate:    com.example.resilience4j.exceptions.RecordFailurePredicate # 謂詞設(shè)置異常是否為失敗
        recordExceptions: # 記錄的異常
          - com.example.resilience4j.exceptions.BusinessBException
          - com.example.resilience4j.exceptions.BusinessAException
        ignoreExceptions: # 忽略的異常
          - com.example.resilience4j.exceptions.BusinessAException
    instances:
      backendA:
        baseConfig: default
        waitDurationInOpenState: 5000
        failureRateThreshold: 20
      backendB:
        baseConfig: default

可以配置多個(gè)熔斷器實(shí)例调违,使用不同配置或者覆蓋配置窟哺。

需要保護(hù)的后端服務(wù)

以一個(gè)查找用戶列表的后端服務(wù)為例,利用熔斷器保護(hù)該服務(wù)技肩。

interface RemoteService {
    List<User> process() throws TimeoutException, InterruptedException;
}

連接器調(diào)用該服務(wù)

這是調(diào)用遠(yuǎn)端服務(wù)的連接器且轨,我們通過調(diào)用連接器中的方法來調(diào)用后端服務(wù)。

public RemoteServiceConnector{
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }
}

用于監(jiān)控熔斷器狀態(tài)及事件的工具類

要想學(xué)習(xí)各個(gè)配置項(xiàng)的作用,需要獲取特定時(shí)候的熔斷器狀態(tài)旋奢,寫一個(gè)工具類:

@Log4j2
public class CircuitBreakerUtil {

    /**
     * @Description: 獲取熔斷器的狀態(tài)
     */
    public static void getCircuitBreakerStatus(String time, CircuitBreaker circuitBreaker){
        CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
        // Returns the failure rate in percentage.
        float failureRate = metrics.getFailureRate();
        // Returns the current number of buffered calls.
        int bufferedCalls = metrics.getNumberOfBufferedCalls();
        // Returns the current number of failed calls.
        int failedCalls = metrics.getNumberOfFailedCalls();
        // Returns the current number of successed calls.
        int successCalls = metrics.getNumberOfSuccessfulCalls();
        // Returns the max number of buffered calls.
        int maxBufferCalls = metrics.getMaxNumberOfBufferedCalls();
        // Returns the current number of not permitted calls.
        long notPermittedCalls = metrics.getNumberOfNotPermittedCalls();

        log.info(time + "state=" +circuitBreaker.getState() + " , metrics[ failureRate=" + failureRate +
                ", bufferedCalls=" + bufferedCalls +
                ", failedCalls=" + failedCalls +
                ", successCalls=" + successCalls +
                ", maxBufferCalls=" + maxBufferCalls +
                ", notPermittedCalls=" + notPermittedCalls +
                " ]"
        );
    }

    /**
     * @Description: 監(jiān)聽熔斷器事件
     */
    public static void addCircuitBreakerListener(CircuitBreaker circuitBreaker){
        circuitBreaker.getEventPublisher()
                .onSuccess(event -> log.info("服務(wù)調(diào)用成功:" + event.toString()))
                .onError(event -> log.info("服務(wù)調(diào)用失斢净印:" + event.toString()))
                .onIgnoredError(event -> log.info("服務(wù)調(diào)用失敗,但異常被忽略:" + event.toString()))
                .onReset(event -> log.info("熔斷器重置:" + event.toString()))
                .onStateTransition(event -> log.info("熔斷器狀態(tài)改變:" + event.toString()))
                .onCallNotPermitted(event -> log.info(" 熔斷器已經(jīng)打開:" + event.toString()))
        ;
    }

調(diào)用方法

CircuitBreaker目前支持兩種方式調(diào)用黄绩,一種是程序式調(diào)用羡洁,一種是AOP使用注解的方式調(diào)用。

程序式的調(diào)用方法

CircuitService中先注入注冊器爽丹,然后用注冊器通過熔斷器名稱獲取熔斷器筑煮。如果不需要使用降級(jí)函數(shù),可以直接調(diào)用熔斷器的executeSupplier方法或executeCheckedSupplier方法:

public class CircuitBreakerServiceImpl{
    
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    public List<User> circuitBreakerNotAOP() throws Throwable {
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
        CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行開始前:", circuitBreaker);
        circuitBreaker.executeCheckedSupplier(remotServiceConnector::process);
    }
}

如果需要使用降級(jí)函數(shù)粤蝎,則要使用decorate包裝服務(wù)的方法真仲,再使用Try.of().recover()進(jìn)行降級(jí)處理,同時(shí)也可以根據(jù)不同的異常使用不同的降級(jí)方法:

public class CircuitBreakerServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    public List<User> circuitBreakerNotAOP(){
        // 通過注冊器獲取熔斷器的實(shí)例
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
        CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行開始前:", circuitBreaker);
        // 使用熔斷器包裝連接器的方法
        CheckedFunction0<List<User>> checkedSupplier = CircuitBreaker.
            decorateCheckedSupplier(circuitBreaker, remoteServiceConnector::process);
        // 使用Try.of().recover()調(diào)用并進(jìn)行降級(jí)處理
        Try<List<User>> result = Try.of(checkedSupplier).
                    recover(CallNotPermittedException.class, throwable -> {
                        log.info("熔斷器已經(jīng)打開初澎,拒絕訪問被保護(hù)方法~");
                        CircuitBreakerUtil
                        .getCircuitBreakerStatus("熔斷器打開中:", circuitBreaker);
                        List<User> users = new ArrayList();
                        return users;
                    })
                    .recover(throwable -> {
                        log.info(throwable.getLocalizedMessage() + ",方法被降級(jí)了~~");
                        CircuitBreakerUtil
                        .getCircuitBreakerStatus("降級(jí)方法中:",circuitBreaker);
                        List<User> users = new ArrayList();
                        return users;
                    });
            CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行結(jié)束后:", circuitBreaker);
            return result.get();
    }
}

AOP式的調(diào)用方法

首先在連接器方法上使用@CircuitBreaker(name="",fallbackMethod="")注解秸应,其中name是要使用的熔斷器的名稱,fallbackMethod是要使用的降級(jí)方法碑宴,降級(jí)方法必須和原方法放在同一個(gè)類中软啼,且降級(jí)方法的返回值需要和原方法相同,輸入?yún)?shù)需要添加額外的exception參數(shù)延柠,類似這樣:

public RemoteServiceConnector{
    
    @CircuitBreaker(name = "backendA", fallbackMethod = "fallBack")
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }
    
    private List<User> fallBack(Throwable throwable){
        log.info(throwable.getLocalizedMessage() + ",方法被降級(jí)了~~");
        CircuitBreakerUtil.getCircuitBreakerStatus("降級(jí)方法中:", circuitBreakerRegistry.circuitBreaker("backendA"));
        List<User> users = new ArrayList();
        return users;
    }
    
    private List<User> fallBack(CallNotPermittedException e){
        log.info("熔斷器已經(jīng)打開祸挪,拒絕訪問被保護(hù)方法~");
        CircuitBreakerUtil.getCircuitBreakerStatus("熔斷器打開中:", circuitBreakerRegistry.circuitBreaker("backendA"));
        List<User> users = new ArrayList();
        return users;
    }
    
} 

可使用多個(gè)降級(jí)方法,保持方法名相同贞间,同時(shí)滿足的條件的降級(jí)方法會(huì)觸發(fā)最接近的一個(gè)(這里的接近是指類型的接近贿条,先會(huì)觸發(fā)離它最近的子類異常),例如如果process()方法拋出CallNotPermittedException增热,將會(huì)觸發(fā)fallBack(CallNotPermittedException e)方法而不會(huì)觸發(fā)fallBack(Throwable throwable)方法整以。

之后直接調(diào)用方法就可以了:

public class CircuitBreakerServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;
    
    public List<User> circuitBreakerAOP() throws TimeoutException, InterruptedException {
        CircuitBreakerUtil
            .getCircuitBreakerStatus("執(zhí)行開始前:",circuitBreakerRegistry.circuitBreaker("backendA"));
        List<User> result = remoteServiceConnector.process();
        CircuitBreakerUtil
            .getCircuitBreakerStatus("執(zhí)行結(jié)束后:", circuitBreakerRegistry.circuitBreaker("backendA"));
        return result;
    }
}

使用測試

接下來進(jìn)入測試,首先我們定義了兩個(gè)異常峻仇,異常A同時(shí)在黑白名單中公黑,異常B只在黑名單中:

recordExceptions: # 記錄的異常
    - com.example.resilience4j.exceptions.BusinessBException
    - com.example.resilience4j.exceptions.BusinessAException
ignoreExceptions: # 忽略的異常
    - com.example.resilience4j.exceptions.BusinessAException

然后對被保護(hù)的后端接口進(jìn)行如下的實(shí)現(xiàn):

public class RemoteServiceImpl implements RemoteService {
    
    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        if (num % 4 == 1){
            throw new BusinessAException("異常A,不需要被記錄");
        }
        if (num % 4 == 2 || num % 4 == 3){
            throw new BusinessBException("異常B础浮,需要被記錄");
        }
        log.info("服務(wù)正常運(yùn)行帆调,獲取用戶列表");
        // 模擬數(shù)據(jù)庫的正常查詢
        return repository.findAll();
    }
}

使用CircuitBreakerServiceImpl中的AOP或者程序式調(diào)用方法進(jìn)行單元測試,循環(huán)調(diào)用10次:

public class CircuitBreakerServiceImplTest{
    
    @Autowired
    private CircuitBreakerServiceImpl circuitService;
    
    @Test
    public void circuitBreakerTest() {
        for (int i=0; i<10; i++){
            // circuitService.circuitBreakerAOP();
            circuitService.circuitBreakerNotAOP();
        }
    }
}

看下運(yùn)行結(jié)果:

執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=0, failedCalls=0, successCalls=0, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 0
服務(wù)正常運(yùn)行豆同,獲取用戶列表
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, 
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 1
異常A番刊,不需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 2
異常B,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=1, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=1, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=1, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 3
異常B影锈,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 4
服務(wù)正常運(yùn)行芹务,獲取用戶列表
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=4, failedCalls=2, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=4, failedCalls=2, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 5
異常A蝉绷,不需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=4, failedCalls=2, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=4, failedCalls=2, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=4, failedCalls=2, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 6
異常B,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
熔斷器已經(jīng)打開枣抱,拒絕訪問被保護(hù)方法~
熔斷器打開中:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=1 ]
執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=1 ]

注意到異常A發(fā)生的前后bufferedCalls熔吗、failedCallssuccessCalls三個(gè)參數(shù)的值都沒有沒有發(fā)生變化佳晶,說明白名單的優(yōu)先級(jí)高于黑名單桅狠,源碼中也有提到Ignoring an exception has priority over recording an exception

/**
* @see #ignoreExceptions(Class[]) ). Ignoring an exception has priority over recording an exception.
* <p>
* Example:
* recordExceptions(Throwable.class) and ignoreExceptions(RuntimeException.class)
* would capture all Errors and checked Exceptions, and ignore unchecked
* <p>
*/

同時(shí)也可以看出白名單所謂的忽略,是指不計(jì)入緩沖區(qū)中(即不算成功也不算失斀窝怼)中跌,有降級(jí)方法會(huì)調(diào)用降級(jí)方法,沒有降級(jí)方法會(huì)拋出異常菇篡,和其他異常無異漩符。

執(zhí)行開始前:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
熔斷器已經(jīng)打開,拒絕訪問被保護(hù)方法~
熔斷器打開中:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=1 ]
執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=1 ]

當(dāng)環(huán)形緩沖區(qū)大小被填滿時(shí)會(huì)計(jì)算失敗率驱还,這時(shí)請求會(huì)被拒絕獲取不到count的值嗜暴,且notPermittedCalls會(huì)增加。


接下來我們實(shí)驗(yàn)一下多線程下熔斷器關(guān)閉和熔斷器半開兩種情況下緩沖環(huán)的區(qū)別议蟆,我們先開15個(gè)線程進(jìn)行調(diào)用測試熔斷器關(guān)閉時(shí)的緩沖環(huán)闷沥,熔斷之后等10s再開15個(gè)線程進(jìn)行調(diào)用測試熔斷器半開時(shí)的緩沖環(huán):

public class CircuitBreakerServiceImplTest{
    
    @Autowired
    private CircuitBreakerServiceImpl circuitService;
    
    @Test
    public void circuitBreakerThreadTest() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i=0; i<15; i++){
            pool.submit(
                // circuitService::circuitBreakerAOP
                circuitService::circuitBreakerNotAOP);
        }
        pool.shutdown();

        while (!pool.isTerminated());

        Thread.sleep(10000);
        log.info("熔斷器狀態(tài)已轉(zhuǎn)為半開");
        pool = Executors.newCachedThreadPool();
        for (int i=0; i<15; i++){
            pool.submit(
                // circuitService::circuitBreakerAOP
                circuitService::circuitBreakerNotAOP);
        }
        pool.shutdown();

        while (!pool.isTerminated());
        for (int i=0; i<10; i++){
            
        }
    }
}

15個(gè)線程都通過了熔斷器,由于正常返回需要查數(shù)據(jù)庫咐容,所以會(huì)慢很多狐赡,失敗率很快就達(dá)到了100%,而且觀察到如下的記錄:

異常B疟丙,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=OPEN , metrics[ failureRate=100.0, bufferedCalls=5, failedCalls=5, successCalls=0, maxBufferCalls=5, notPermittedCalls=0 ]

可以看出,雖然熔斷器已經(jīng)打開了鸟雏,可是異常B還是進(jìn)入了降級(jí)方法享郊,拋出的異常不是notPermittedCalls數(shù)量為0,說明在熔斷器轉(zhuǎn)換成打開之前所有請求都通過了熔斷器孝鹊,緩沖環(huán)不會(huì)控制線程的并發(fā)炊琉。

執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=80.0, bufferedCalls=5, failedCalls=4, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=60.0, bufferedCalls=5, failedCalls=3, successCalls=2, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=40.0, bufferedCalls=5, failedCalls=2, successCalls=3, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=OPEN , metrics[ failureRate=20.0, bufferedCalls=5, failedCalls=1, successCalls=4, maxBufferCalls=5, notPermittedCalls=0 ]

同時(shí)以上幾條正常執(zhí)行的服務(wù)完成后,熔斷器的失敗率在下降又活,說明熔斷器打開狀態(tài)下還是會(huì)計(jì)算失敗率苔咪,由于環(huán)形緩沖區(qū)大小為5,初步推斷成功的狀態(tài)會(huì)依次覆蓋最開始的幾個(gè)狀態(tài)柳骄,所以得到了上述結(jié)果团赏。

接下來分析后15個(gè)線程的結(jié)果

熔斷器狀態(tài)已轉(zhuǎn)為半開
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 16
服務(wù)正常運(yùn)行,獲取用戶列表
執(zhí)行開始前:state=OPEN , metrics[ failureRate=0.0, bufferedCalls=5, failedCalls=0, successCalls=5, maxBufferCalls=5, notPermittedCalls=0 ]
熔斷器狀態(tài)改變:2019-07-29T17:19:19.959+08:00[Asia/Shanghai]: CircuitBreaker 'backendA' changed state from OPEN to HALF_OPEN
count的值 = 18
count的值 = 17
服務(wù)正常運(yùn)行耐薯,獲取用戶列表
count的值 = 19
count的值 = 15

熔斷器狀態(tài)從打開到半開我設(shè)置的是5s舔清,前15個(gè)線程調(diào)用之后我等待了10s丝里,熔斷器應(yīng)該已經(jīng)變?yōu)榘腴_了,但是執(zhí)行開始前熔斷器的狀態(tài)卻是OPEN体谒,這是因?yàn)槟J(rèn)的配置項(xiàng)automaticTransitionFromOpenToHalfOpenEnabled=false杯聚,時(shí)間到了也不會(huì)自動(dòng)轉(zhuǎn)換,需要有新的請求來觸發(fā)熔斷器的狀態(tài)轉(zhuǎn)換抒痒。同時(shí)我們發(fā)現(xiàn)幌绍,好像狀態(tài)改變后還是進(jìn)了超過4個(gè)請求,似乎半開狀態(tài)的環(huán)并不能限制線程數(shù)故响?這是由于這些進(jìn)程是在熔斷器打開時(shí)一起進(jìn)來的傀广。為了更好的觀察環(huán)半開時(shí)候環(huán)大小是否限制線程數(shù),我們修改一下配置:

resilience4j:
  circuitbreaker:
    configs:
      myDefault:
        automaticTransitionFromOpenToHalfOpenEnabled: true # 是否自動(dòng)從打開到半開

我們再試一次:

熔斷器狀態(tài)已轉(zhuǎn)為半開
執(zhí)行開始前:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=0, failedCalls=0, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
執(zhí)行開始前:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=0, failedCalls=0, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
執(zhí)行開始前:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=0, failedCalls=0, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
count的值 = 15
count的值 = 16
服務(wù)正常運(yùn)行被去,獲取用戶列表
 異常B主儡,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=1, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=1, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
count的值 = 17
異常A,不需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=2, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
執(zhí)行開始前:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=2, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
count的值 = 18
執(zhí)行開始前:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=2, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
異常B惨缆,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=3, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=3, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
熔斷器已經(jīng)打開:2019-07-29T17:36:14.189+08:00[Asia/Shanghai]: CircuitBreaker 'backendA' recorded a call which was not permitted.
執(zhí)行開始前:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=2, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=HALF_OPEN , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=2, successCalls=0, maxBufferCalls=4, notPermittedCalls=0 ]
熔斷器已經(jīng)打開糜值,拒絕訪問被保護(hù)方法~

結(jié)果只有4個(gè)請求進(jìn)去了,可以看出雖然熔斷器狀態(tài)還是半開坯墨,但是已經(jīng)熔斷了寂汇,說明在半開狀態(tài)下,超過環(huán)大小的請求會(huì)被直接拒絕捣染。

綜上骄瓣,circuitbreaker的機(jī)制已經(jīng)被證實(shí),且十分清晰耍攘,以下為幾個(gè)需要注意的點(diǎn):

  • 失敗率的計(jì)算必須等環(huán)裝滿才會(huì)計(jì)算
  • 白名單優(yōu)先級(jí)高于黑名單且白名單上的異常會(huì)被忽略榕栏,不會(huì)占用緩沖環(huán)位置,即不會(huì)計(jì)入失敗率計(jì)算
  • 熔斷器打開時(shí)同樣會(huì)計(jì)算失敗率蕾各,當(dāng)狀態(tài)轉(zhuǎn)換為半開時(shí)重置為-1
  • 只要出現(xiàn)異常都可以調(diào)用降級(jí)方法扒磁,不論是在白名單還是黑名單
  • 熔斷器的緩沖環(huán)有兩個(gè),一個(gè)關(guān)閉時(shí)的緩沖環(huán)式曲,一個(gè)打開時(shí)的緩沖環(huán)
  • 熔斷器關(guān)閉時(shí)妨托,直至熔斷器狀態(tài)轉(zhuǎn)換前所有請求都會(huì)通過,不會(huì)受到限制
  • 熔斷器半開時(shí)吝羞,限制請求數(shù)為緩沖環(huán)的大小兰伤,其他請求會(huì)等待
  • 熔斷器從打開到半開的轉(zhuǎn)換默認(rèn)還需要請求進(jìn)行觸發(fā),也可通過automaticTransitionFromOpenToHalfOpenEnabled=true設(shè)置為自動(dòng)觸發(fā)

TimeLimiter

簡介

Hystrix不同钧排,Resilience4j將超時(shí)控制器從熔斷器中獨(dú)立出來敦腔,成為了一個(gè)單獨(dú)的組件,主要的作用就是對方法調(diào)用進(jìn)行超時(shí)控制恨溜。實(shí)現(xiàn)的原理和Hystrix相似会烙,都是通過調(diào)用Futureget方法來進(jìn)行超時(shí)控制负懦。

可配置參數(shù)

配置參數(shù) 默認(rèn)值 描述
timeoutDuration 1(s) 超時(shí)時(shí)間限定
cancelRunningFuture true 當(dāng)超時(shí)時(shí)是否關(guān)閉取消線程

測試前準(zhǔn)備

pom.xml

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-timelimiter</artifactId>
    <version>0.16.0</version>
</dependency>

TimeLimiter沒有整合進(jìn)resilience4j-spring-boot中,需要單獨(dú)添加依賴

application.yml配置

timelimiter:
    timeoutDuration: 3000 # 超時(shí)時(shí)長
    cancelRunningFuture: true # 發(fā)生異常是否關(guān)閉線程

TimeLimiter沒有配置自動(dòng)注入柏腻,需要自己進(jìn)行注入纸厉,寫下面兩個(gè)文件進(jìn)行配置自動(dòng)注入:

TimeLimiterProperties

用于將application.yml中的配置轉(zhuǎn)換為TimeLimiterProperties對象:

@Data
@Component
@ConfigurationProperties(prefix = "resilience4j.timelimiter")
public class TimeLimiterProperties {

    private Duration timeoutDuration;

    private boolean cancelRunningFuture;
}

TimeLimiterConfiguration

TimeLimiterProperties對象寫入到TimeLimiter的配置中:

@Configuration
public class TimeLimiterConfiguration {

    @Autowired
    private TimeLimiterProperties timeLimiterProperties;

    @Bean
    public TimeLimiter timeLimiter(){
        return TimeLimiter.of(timeLimiterConfig());
    }

    private TimeLimiterConfig timeLimiterConfig(){
        return TimeLimiterConfig.custom()
                .timeoutDuration(timeLimiterProperties.getTimeoutDuration())
                .cancelRunningFuture(timeLimiterProperties.isCancelRunningFuture()).build();
    }
}

調(diào)用方法

還是以之前查詢用戶列表的后端服務(wù)為例。TimeLimiter目前僅支持程序式調(diào)用五嫂,還不能使用AOP的方式調(diào)用颗品。

因?yàn)?strong>TimeLimiter通常與CircuitBreaker聯(lián)合使用,很少單獨(dú)使用沃缘,所以直接介紹聯(lián)合使用的步驟躯枢。

TimeLimiter沒有注冊器,所以通過@Autowired注解自動(dòng)注入依賴直接使用槐臀,因?yàn)?strong>TimeLimter是基于Futureget方法的锄蹂,所以需要?jiǎng)?chuàng)建線程池,然后通過線程池的submit方法獲取Future對象:

public class CircuitBreakerServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;
    
    @Autowired
    private TimeLimiter timeLimiter;

    public List<User> circuitBreakerTimeLimiter(){
        // 通過注冊器獲取熔斷器的實(shí)例
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
        CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行開始前:", circuitBreaker);
        // 創(chuàng)建單線程的線程池
        ExecutorService pool = Executors.newSingleThreadExecutor();
        //將被保護(hù)方法包裝為能夠返回Future的supplier函數(shù)
        Supplier<Future<List<User>>> futureSupplier = () -> pool.submit(remoteServiceConnector::process);
        // 先用限時(shí)器包裝水慨,再用熔斷器包裝
        Callable<List<User>> restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
        Callable<List<User>> chainedCallable = CircuitBreaker.decorateCallable(circuitBreaker, restrictedCall);
        // 使用Try.of().recover()調(diào)用并進(jìn)行降級(jí)處理
        Try<List<User>> result = Try.of(chainedCallable::call)
            .recover(CallNotPermittedException.class, throwable ->{
                log.info("熔斷器已經(jīng)打開得糜,拒絕訪問被保護(hù)方法~");
                CircuitBreakerUtil.getCircuitBreakerStatus("熔斷器打開中", circuitBreaker);
                List<User> users = new ArrayList();
                return users;
            })
            .recover(throwable -> {
                log.info(throwable.getLocalizedMessage() + ",方法被降級(jí)了~~");
                CircuitBreakerUtil.getCircuitBreakerStatus("降級(jí)方法中:",circuitBreaker);
                List<User> users = new ArrayList();
                return users;
            });
        CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行結(jié)束后:", circuitBreaker);
        return result.get();
    }
}

使用測試

異常ABapplication.yml文件中沒有修改:

recordExceptions: # 記錄的異常
    - com.example.resilience4j.exceptions.BusinessBException
    - com.example.resilience4j.exceptions.BusinessAException
ignoreExceptions: # 忽略的異常
    - com.example.resilience4j.exceptions.BusinessAException

使用另一個(gè)遠(yuǎn)程服務(wù)接口的實(shí)現(xiàn),將num%4==3的情況讓線程休眠5s晰洒,大于我們TimeLimiter的限制時(shí)間:

public class RemoteServiceImpl implements RemoteService {
    
    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        if (num % 4 == 1){
            throw new BusinessAException("異常A朝抖,不需要被記錄");
        }
        if (num % 4 == 2){
            throw new BusinessBException("異常B,需要被記錄");
        }
        if (num % 4 == 3){
            Thread.sleep(5000);
        }
        log.info("服務(wù)正常運(yùn)行谍珊,獲取用戶列表");
        // 模擬數(shù)據(jù)庫的正常查詢
        return repository.findAll();
    }
}

把調(diào)用方法進(jìn)行單元測試治宣,循環(huán)10遍:

public class CircuitBreakerServiceImplTest{
    
    @Autowired
    private CircuitBreakerServiceImpl circuitService;
    
    @Test
    public void circuitBreakerTimeLimiterTest() {
        for (int i=0; i<10; i++){
            circuitService.circuitBreakerTimeLimiter();
        }
    }
}

看下運(yùn)行結(jié)果:

執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=0, failedCalls=0, successCalls=0, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 0
服務(wù)正常運(yùn)行,獲取用戶列表
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 1
com.example.resilience4j.exceptions.BusinessAException: 異常A砌滞,不需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 2
com.example.resilience4j.exceptions.BusinessBException: 異常B侮邀,需要被記錄,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行開始前:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 3
null,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]

發(fā)現(xiàn)熔斷器任何異常和超時(shí)都沒有失敗。贝润。完全不會(huì)觸發(fā)熔斷豌拙,這是為什么呢?我們把異常toString()看一下:

java.util.concurrent.ExecutionException: com.example.resilience4j.exceptions.BusinessBException: 異常B题暖,需要被記錄,方法被降級(jí)了~~
java.util.concurrent.TimeoutException,方法被降級(jí)了~~

這下原因就很明顯了,線程池會(huì)將線程中的任何異常包裝為ExecutionException捉超,而熔斷器沒有把異常解包胧卤,由于我們設(shè)置了黑名單,而熔斷器又沒有找到黑名單上的異常拼岳,所以失效了枝誊。這是一個(gè)已知的bug,會(huì)在下個(gè)版本(0.16.0之后)中修正惜纸,目前來說如果需要同時(shí)使用TimeLimiterCircuitBreaker的話叶撒,黑白名單的設(shè)置是不起作用的绝骚,需要自定義自己的謂詞邏輯,并在test()方法中將異常解包進(jìn)行判斷祠够,比如像下面這樣:

public class RecordFailurePredicate implements Predicate<Throwable> {

    @Override
    public boolean test(Throwable throwable) {
        if (throwable.getCause() instanceof BusinessAException) return false;
        else return true;
    }
}

然后在application.yml文件中指定這個(gè)類作為判斷類:

circuitbreaker:
    configs:
      default:
        recordFailurePredicate: com.example.resilience4j.predicate.RecordFailurePredicate

就能自定義自己的黑白名單了压汪,我們再運(yùn)行一次試試:

java.util.concurrent.TimeoutException,方法被降級(jí)了~~
降級(jí)方法中:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
執(zhí)行結(jié)束后:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=3, failedCalls=2, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]

可以看出,TimeLimiter已經(jīng)生效了古瓤,同時(shí)CircuitBreaker也正常工作止剖。

Note:

最新版0.17.0,該bug已經(jīng)修復(fù)落君,黑白名單可以正常使用穿香。

Retry

簡介

同熔斷器一樣,重試組件也提供了注冊器绎速,可以通過注冊器獲取實(shí)例來進(jìn)行重試皮获,同樣可以跟熔斷器配合使用。

可配置參數(shù)

配置參數(shù) 默認(rèn)值 描述
maxAttempts 3 最大重試次數(shù)
waitDuration 500[ms] 固定重試間隔
intervalFunction numberOfAttempts -> waitDuration 用來改變重試時(shí)間間隔纹冤,可以選擇指數(shù)退避或者隨機(jī)時(shí)間間隔
retryOnResultPredicate result -> false 自定義結(jié)果重試規(guī)則洒宝,需要重試的返回true
retryOnExceptionPredicate throwable -> true 自定義異常重試規(guī)則,需要重試的返回true
retryExceptions empty 需要重試的異常列表
ignoreExceptions empty 需要忽略的異常列表

測試前準(zhǔn)備

pom.xml

不需要引入新的依賴赵哲,已經(jīng)集成在resilience4j-spring-boot中了

application.yml配置

resilience4j:
  retry:
    configs:
      default:
      maxRetryAttempts: 3
      waitDuration: 10s
      enableExponentialBackoff: true    # 是否允許使用指數(shù)退避算法進(jìn)行重試間隔時(shí)間的計(jì)算
      expontialBackoffMultiplier: 2     # 指數(shù)退避算法的乘數(shù)
      enableRandomizedWait: false       # 是否允許使用隨機(jī)的重試間隔
      randomizedWaitFactor: 0.5         # 隨機(jī)因子
      resultPredicate: com.example.resilience4j.predicate.RetryOnResultPredicate    
      retryExceptionPredicate: com.example.resilience4j.predicate.RetryOnExceptionPredicate
      retryExceptions:
        - com.example.resilience4j.exceptions.BusinessBException
        - com.example.resilience4j.exceptions.BusinessAException
        - io.github.resilience4j.circuitbreaker.CallNotPermittedException
      ignoreExceptions:
        - io.github.resilience4j.circuitbreaker.CallNotPermittedException
      instances:
        backendA:
          baseConfig: default
          waitDuration: 5s
        backendB:
          baseConfig: default
          maxRetryAttempts: 2   

application.yml可以配置的參數(shù)多出了幾個(gè)enableExponentialBackoff待德、expontialBackoffMultiplierenableRandomizedWait枫夺、randomizedWaitFactor将宪,分別代表是否允許指數(shù)退避間隔時(shí)間,指數(shù)退避的乘數(shù)橡庞、是否允許隨機(jī)間隔時(shí)間较坛、隨機(jī)因子,注意指數(shù)退避和隨機(jī)間隔不能同時(shí)啟用扒最。

用于監(jiān)控重試組件狀態(tài)及事件的工具類

同樣為了監(jiān)控重試組件丑勤,寫一個(gè)工具類:

@Log4j2
public class RetryUtil {

    /**
     * @Description: 獲取重試的狀態(tài)
     */
    public static void getRetryStatus(String time, Retry retry){
        Retry.Metrics metrics = retry.getMetrics();
        long failedRetryNum = metrics.getNumberOfFailedCallsWithRetryAttempt();
        long failedNotRetryNum = metrics.getNumberOfFailedCallsWithoutRetryAttempt();
        long successfulRetryNum = metrics.getNumberOfSuccessfulCallsWithRetryAttempt();
        long successfulNotyRetryNum = metrics.getNumberOfSuccessfulCallsWithoutRetryAttempt();

        log.info(time + "state=" + " metrics[ failedRetryNum=" + failedRetryNum +
                ", failedNotRetryNum=" + failedNotRetryNum +
                ", successfulRetryNum=" + successfulRetryNum +
                ", successfulNotyRetryNum=" + successfulNotyRetryNum +
                " ]"
        );
    }

    /**
     * @Description: 監(jiān)聽重試事件
     */
    public static void addRetryListener(Retry retry){
        retry.getEventPublisher()
                .onSuccess(event -> log.info("服務(wù)調(diào)用成功:" + event.toString()))
                .onError(event -> log.info("服務(wù)調(diào)用失敗:" + event.toString()))
                .onIgnoredError(event -> log.info("服務(wù)調(diào)用失敗吧趣,但異常被忽略:" + event.toString()))
                .onRetry(event -> log.info("重試:第" + event.getNumberOfRetryAttempts() + "次"))
        ;
    }
}

調(diào)用方法

還是以之前查詢用戶列表的服務(wù)為例法竞。Retry支持AOP和程序式兩種方式的調(diào)用.

程序式的調(diào)用方法

CircuitBreaker的調(diào)用方式差不多,和熔斷器配合使用有兩種調(diào)用方式强挫,一種是先用重試組件裝飾岔霸,再用熔斷器裝飾,這時(shí)熔斷器的失敗需要等重試結(jié)束才計(jì)算俯渤,另一種是先用熔斷器裝飾呆细,再用重試組件裝飾,這時(shí)每次調(diào)用服務(wù)都會(huì)記錄進(jìn)熔斷器的緩沖環(huán)中八匠,需要注意的是絮爷,第二種方式需要把CallNotPermittedException放進(jìn)重試組件的白名單中趴酣,因?yàn)槿蹟嗥鞔蜷_時(shí)重試是沒有意義的:

public class CircuitBreakerServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;
    
    @Autowired
    private RetryRegistry retryRegistry;

    public List<User> circuitBreakerRetryNotAOP(){
        // 通過注冊器獲取熔斷器的實(shí)例
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
        // 通過注冊器獲取重試組件實(shí)例
        Retry retry = retryRegistry.retry("backendA");
        CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行開始前:", circuitBreaker);
        // 先用重試組件包裝,再用熔斷器包裝
        CheckedFunction0<List<User>> checkedSupplier = Retry.decorateCheckedSupplier(retry, remoteServiceConnector::process);
        CheckedFunction0<List<User>> chainedSupplier = CircuitBreaker .decorateCheckedSupplier(circuitBreaker, checkedSupplier);
        // 使用Try.of().recover()調(diào)用并進(jìn)行降級(jí)處理
        Try<List<User>> result = Try.of(chainedSupplier).
                recover(CallNotPermittedException.class, throwable -> {
                    log.info("已經(jīng)被熔斷坑夯,停止重試");
                    return new ArrayList<>();
                })
                .recover(throwable -> {
                    log.info("重試失敗: " + throwable.getLocalizedMessage());
                    return new ArrayList<>();
                });
        RetryUtil.getRetryStatus("執(zhí)行結(jié)束: ", retry);
        CircuitBreakerUtil.getCircuitBreakerStatus("執(zhí)行結(jié)束:", circuitBreaker);
        return result.get();
    }
}

AOP式的調(diào)用方法

首先在連接器方法上使用@Retry(name="",fallbackMethod="")注解岖寞,其中name是要使用的重試器實(shí)例的名稱,fallbackMethod是要使用的降級(jí)方法:

public RemoteServiceConnector{
    
    @CircuitBreaker(name = "backendA", fallbackMethod = "fallBack")
    @Retry(name = "backendA", fallbackMethod = "fallBack")
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }
} 

要求和熔斷器一致渊涝,但是需要注意同時(shí)注解重試組件和熔斷器的話慎璧,是按照第二種方案來的,即每一次請求都會(huì)被熔斷器記錄跨释。

之后直接調(diào)用方法:

public class CircuitBreakerServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;
    
    @Autowired
    private RetryRegistry retryRegistry;

    public List<User> circuitBreakerRetryAOP() throws TimeoutException, InterruptedException {
        List<User> result = remoteServiceConnector.process();
        RetryUtil.getRetryStatus("執(zhí)行結(jié)束:", retryRegistry.retry("backendA"));
        CircuitBreakerUtil
            .getCircuitBreakerStatus("執(zhí)行結(jié)束:", circuitBreakerRegistry.circuitBreaker("backendA"));
        return result;
    }
}

使用測試

異常ABapplication.yml文件中設(shè)定為都需要重試胸私,因?yàn)槭褂玫谝环N方案,所以不需要將CallNotPermittedException設(shè)定在重試組件的白名單中鳖谈,同時(shí)為了測試重試過程中的異常是否會(huì)被熔斷器記錄岁疼,將異常A從熔斷器白名單中去除:

recordExceptions: # 記錄的異常
    - com.example.resilience4j.exceptions.BusinessBException
    - com.example.resilience4j.exceptions.BusinessAException
ignoreExceptions: # 忽略的異常
#   - com.example.resilience4j.exceptions.BusinessAException
# ...
resultPredicate: com.example.resilience4j.predicate.RetryOnResultPredicate
retryExceptions:
    - com.example.resilience4j.exceptions.BusinessBException
    - com.example.resilience4j.exceptions.BusinessAException
    - io.github.resilience4j.circuitbreaker.CallNotPermittedException
ignoreExceptions:
#   - io.github.resilience4j.circuitbreaker.CallNotPermittedException

使用另一個(gè)遠(yuǎn)程服務(wù)接口的實(shí)現(xiàn),將num%4==2的情況返回null缆娃,測試根據(jù)返回結(jié)果進(jìn)行重試的功能:

public class RemoteServiceImpl implements RemoteService {

    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        if (num % 4 == 1){
            throw new BusinessAException("異常A捷绒,需要重試");
        }
        if (num % 4 == 2){
            return null;
        }
        if (num % 4 == 3){
            throw new BusinessBException("異常B,需要重試");
        }
        log.info("服務(wù)正常運(yùn)行贯要,獲取用戶列表");
        // 模擬數(shù)據(jù)庫的正常查詢
        return repository.findAll();
    }
}

同時(shí)添加一個(gè)類自定義哪些返回值需要重試暖侨,設(shè)定為返回值為空就進(jìn)行重試,這樣num % 4 == 2時(shí)就可以測試不拋異常崇渗,根據(jù)返回結(jié)果進(jìn)行重試了:

public class RetryOnResultPredicate implements Predicate {

    @Override
    public boolean test(Object o) {
        return o == null ? true : false;
    }
}

使用CircuitBreakerServiceImpl中的AOP或者程序式調(diào)用方法進(jìn)行單元測試字逗,循環(huán)調(diào)用10次:

public class CircuitBreakerServiceImplTest{
    
    @Autowired
    private CircuitBreakerServiceImpl circuitService;
    
    @Test
    public void circuitBreakerRetryTest() {
        for (int i=0; i<10; i++){
            // circuitService.circuitBreakerRetryAOP();
            circuitService.circuitBreakerRetryNotAOP();
        }
    }
}

看一下運(yùn)行結(jié)果:

count的值 = 0
服務(wù)正常運(yùn)行尚揣,獲取用戶列表
執(zhí)行結(jié)束: state= metrics[ failedRetryNum=0, failedNotRetryNum=0, successfulRetryNum=0, successfulNotyRetryNum=1 ]
執(zhí)行結(jié)束:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 1
重試:第1次
count的值 = 2
重試:第2次
count的值 = 3
服務(wù)調(diào)用失旛锏酢:2019-07-09T19:06:59.705+08:00[Asia/Shanghai]: Retry 'backendA' recorded a failed retry attempt. Number of retry attempts: '3', Last exception was: 'com.example.resilience4j.exceptions.BusinessBException: 異常B,需要重試'.
重試失敗: 異常B刨摩,需要重試
執(zhí)行結(jié)束: state= metrics[ failedRetryNum=1, failedNotRetryNum=0, successfulRetryNum=0, successfulNotyRetryNum=1 ]
執(zhí)行結(jié)束:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=2, failedCalls=1, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]

這部分結(jié)果可以看出來跟狱,重試最大次數(shù)設(shè)置為3結(jié)果其實(shí)只重試了2次俭厚,服務(wù)共執(zhí)行了3次,重試3次后熔斷器只記錄了1次驶臊。而且返回值為null時(shí)也確實(shí)進(jìn)行重試了挪挤。

服務(wù)正常運(yùn)行,獲取用戶列表
執(zhí)行結(jié)束: state= metrics[ failedRetryNum=2, failedNotRetryNum=0, successfulRetryNum=0, successfulNotyRetryNum=3 ]
執(zhí)行結(jié)束:state=OPEN , metrics[ failureRate=40.0, bufferedCalls=5, failedCalls=2, successCalls=3, maxBufferCalls=5, notPermittedCalls=0 ]
已經(jīng)被熔斷关翎,停止重試
執(zhí)行結(jié)束: state= metrics[ failedRetryNum=2, failedNotRetryNum=0, successfulRetryNum=0, successfulNotyRetryNum=3 ]
執(zhí)行結(jié)束:state=OPEN , metrics[ failureRate=40.0, bufferedCalls=5, failedCalls=2, successCalls=3, maxBufferCalls=5, notPermittedCalls=1 ]

當(dāng)熔斷之后不會(huì)再進(jìn)行重試扛门。

接下來我修改一下調(diào)用服務(wù)的實(shí)現(xiàn):

public class RemoteServiceImpl implements RemoteService {

    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        if (num % 4 == 1){
            throw new BusinessAException("異常A,需要重試");
        }
        if (num % 4 == 3){
            return null;
        }
        if (num % 4 == 2){
            throw new BusinessBException("異常B笤休,需要重試");
        }
        log.info("服務(wù)正常運(yùn)行,獲取用戶列表");
        // 模擬數(shù)據(jù)庫的正常查詢
        return repository.findAll();
    }
}

num%4==2變成異常B症副,num%4==3變成返回null店雅,看一下最后一次重試返回值為null屬于重試成功還是重試失敗政基。

運(yùn)行結(jié)果如下:

count的值 = 0
服務(wù)正常運(yùn)行,獲取用戶列表
執(zhí)行結(jié)束: state= metrics[ failedRetryNum=0, failedNotRetryNum=0, successfulRetryNum=0, successfulNotyRetryNum=1 ]
執(zhí)行結(jié)束:state=CLOSED , metrics[ failureRate=-1.0, bufferedCalls=1, failedCalls=0, successCalls=1, maxBufferCalls=5, notPermittedCalls=0 ]
count的值 = 1
重試:第1次
count的值 = 2
重試:第2次
count的值 = 3
服務(wù)調(diào)用成功:2019-07-09T19:17:35.836+08:00[Asia/Shanghai]: Retry 'backendA' recorded a successful retry attempt. Number of retry attempts: '3', Last exception was: 'com.example.resilience4j.exceptions.BusinessBException: 異常B闹啦,需要重試'.

如上可知如果最后一次重試不拋出異常就算作重試成功沮明,不管結(jié)果是否需要繼續(xù)重試。

Bulkhead

簡介

Resilence4jBulkhead提供兩種實(shí)現(xiàn)窍奋,一種是基于信號(hào)量的荐健,另一種是基于有等待隊(duì)列的固定大小的線程池的,由于基于信號(hào)量的Bulkhead能很好地在多線程和I/O模型下工作琳袄,所以選擇介紹基于信號(hào)量的Bulkhead的使用江场。

可配置參數(shù)

配置參數(shù) 默認(rèn)值 描述
maxConcurrentCalls 25 可允許的最大并發(fā)線程數(shù)
maxWaitDuration 0 嘗試進(jìn)入飽和艙壁時(shí)應(yīng)阻止線程的最大時(shí)間

測試前準(zhǔn)備

pom.xml

不需要引入新的依賴,已經(jīng)集成在resilience4j-spring-boot中了

application.yml配置

resilience4j:
  bulkhead:
    configs:
      default:
        maxConcurrentCalls: 10
        maxWaitDuration: 1000
    instances:
      backendA:
        baseConfig: default
        maxConcurrentCalls: 3
      backendB:
        baseConfig: default
        maxWaitDuration: 100

CircuitBreaker差不多窖逗,都是可以通過繼承覆蓋配置設(shè)定實(shí)例的址否。

用于監(jiān)控Bulkhead狀態(tài)及事件的工具類

同樣為了監(jiān)控Bulkhead組件,寫一個(gè)工具類:

@Log4j2
public class BulkhdadUtil {

    /**
     * @Description: 獲取bulkhead的狀態(tài)
     */
    public static void getBulkheadStatus(String time, Bulkhead bulkhead){
        Bulkhead.Metrics metrics = bulkhead.getMetrics();
        // Returns the number of parallel executions this bulkhead can support at this point in time.
        int availableConcurrentCalls =  metrics.getAvailableConcurrentCalls();
        // Returns the configured max amount of concurrent calls
        int maxAllowedConcurrentCalls = metrics.getMaxAllowedConcurrentCalls();

        log.info(time  + ", metrics[ availableConcurrentCalls=" + availableConcurrentCalls +
                ", maxAllowedConcurrentCalls=" + maxAllowedConcurrentCalls + " ]");
    }

    /**
     * @Description: 監(jiān)聽bulkhead事件
     */
    public static void addBulkheadListener(Bulkhead bulkhead){
        bulkhead.getEventPublisher()
                .onCallFinished(event -> log.info(event.toString()))
                .onCallPermitted(event -> log.info(event.toString()))
                .onCallRejected(event -> log.info(event.toString()));
    }
}

調(diào)用方法

還是以之前查詢用戶列表的服務(wù)為例碎紊。Bulkhead支持AOP和程序式兩種方式的調(diào)用佑附。

程序式的調(diào)用方法

調(diào)用方法都類似,裝飾方法之后用Try.of().recover()來執(zhí)行:

public class BulkheadServiceImpl {

    @Autowired
    private RemoteServiceConnector remoteServiceConnector;

    @Autowired
    private BulkheadRegistry bulkheadRegistry;
    
    public List<User> bulkheadNotAOP(){
        // 通過注冊器獲得Bulkhead實(shí)例
        Bulkhead bulkhead = bulkheadRegistry.bulkhead("backendA");
        BulkhdadUtil.getBulkheadStatus("開始執(zhí)行前: ", bulkhead);
        // 通過Try.of().recover()調(diào)用裝飾后的服務(wù)
        Try<List<User>> result = Try.of(
            Bulkhead.decorateCheckedSupplier(bulkhead, remoteServiceConnector::process))
            .recover(BulkheadFullException.class, throwable -> {
                log.info("服務(wù)失敗: " + throwable.getLocalizedMessage());
                return new ArrayList();
            });
        BulkhdadUtil.getBulkheadStatus("執(zhí)行結(jié)束: ", bulkhead);
        return result.get();
    }
}

AOP式的調(diào)用方法

首先在連接器方法上使用@Bulkhead(name="", fallbackMethod="", type="")注解仗考,其中name是要使用的Bulkhead實(shí)例的名稱音同,fallbackMethod是要使用的降級(jí)方法,type是選擇信號(hào)量或線程池的Bulkhead

public RemoteServiceConnector{
    
    @Bulkhead(name = "backendA", fallbackMethod = "fallback", type = Bulkhead.Type.SEMAPHORE)
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }
    
    private List<User> fallback(BulkheadFullException e){
        log.info("服務(wù)失敗: " + e.getLocalizedMessage());
        return new ArrayList();
    }
} 

如果Retry秃嗜、CircuitBreaker权均、Bulkhead同時(shí)注解在方法上,默認(rèn)的順序是Retry>CircuitBreaker>Bulkhead痪寻,即先控制并發(fā)再熔斷最后重試螺句,之后直接調(diào)用方法:

public class BulkheadServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private BulkheadRegistry bulkheadRegistry;

    public List<User> bulkheadAOP() throws TimeoutException, InterruptedException {
        List<User> result = remoteServiceConnector.process();
        BulkheadUtil.getBulkheadStatus("執(zhí)行結(jié)束:", bulkheadRegistry.retry("backendA"));
        return result;
    }
}

使用測試

application.yml文件中將backenA線程數(shù)限制為1,便于觀察橡类,最大等待時(shí)間為1s蛇尚,超過1s的會(huì)走降級(jí)方法:

instances:
    backendA:
        baseConfig: default
        maxConcurrentCalls: 1

使用另一個(gè)遠(yuǎn)程服務(wù)接口的實(shí)現(xiàn),不拋出異常顾画,當(dāng)做正常服務(wù)進(jìn)行:

public class RemoteServiceImpl implements RemoteService {

    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        log.info("服務(wù)正常運(yùn)行取劫,獲取用戶列表");
        // 模擬數(shù)據(jù)庫正常查詢
        return repository.findAll();
    }
}

用線程池調(diào)5個(gè)線程去請求服務(wù):

public class BulkheadServiceImplTest{
    
    @Autowired
    private BulkheadServiceImpl bulkheadService;
    
    @Autowired
    private BulkheadRegistry bulkheadRegistry;
    
    @Test
    public void bulkheadTest() {
        BulkhdadUtil.addBulkheadListener(bulkheadRegistry.bulkhead("backendA"));
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i=0; i<5; i++){
            pool.submit(() -> {
                // bulkheadService.bulkheadAOP();
                bulkheadService.bulkheadNotAOP();
            });
        }
        pool.shutdown();

        while (!pool.isTerminated());
        }
    }
}

看一下運(yùn)行結(jié)果:

開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
count的值 = 0
服務(wù)正常運(yùn)行,獲取用戶列表
開始執(zhí)行前: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
Bulkhead 'backendA' rejected a call.
服務(wù)失敗: Bulkhead 'backendA' is full and does not permit further calls
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服務(wù)失敗: Bulkhead 'backendA' is full and does not permit further calls
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服務(wù)失敗: Bulkhead 'backendA' is full and does not permit further calls
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
服務(wù)失敗: Bulkhead 'backendA' is full and does not permit further calls
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' has finished a call.
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]

由上可以看出研侣,5個(gè)請求只有一個(gè)進(jìn)入谱邪,其余觸發(fā)rejected事件,然后自動(dòng)進(jìn)入降級(jí)方法庶诡。接下來我們把等待時(shí)間稍微加長一些:

instances:
    backendA:
        baseConfig: default
        maxConcurrentCalls: 1
        maxWaitDuration: 5000

再運(yùn)行一次:

開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
開始執(zhí)行前: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.
count的值 = 0
服務(wù)正常運(yùn)行惦银,獲取用戶列表
Bulkhead 'backendA' permitted a call.
count的值 = 1
Bulkhead 'backendA' has finished a call.
服務(wù)正常運(yùn)行,獲取用戶列表
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' has finished a call.
執(zhí)行結(jié)束: , metrics[ availableConcurrentCalls=1, maxAllowedConcurrentCalls=1 ]
Bulkhead 'backendA' permitted a call.

前面的線程沒有馬上被拒絕,而是等待了一段時(shí)間再執(zhí)行扯俱。

RateLimiter

簡介

高頻控制是可以限制服務(wù)調(diào)用頻率书蚪,Resilience4jRateLimiter可以對頻率進(jìn)行納秒級(jí)別的控制,在每一個(gè)周期刷新可以調(diào)用的次數(shù)迅栅,還可以設(shè)定線程等待權(quán)限的時(shí)間殊校。

可配置參數(shù)

配置參數(shù) 默認(rèn)值 描述
timeoutDuration 5[s] 線程等待權(quán)限的默認(rèn)等待時(shí)間
limitRefreshPeriod 500[ns] 權(quán)限刷新的時(shí)間,每個(gè)周期結(jié)束后读存,RateLimiter將會(huì)把權(quán)限計(jì)數(shù)設(shè)置為limitForPeriod的值
limiteForPeriod 50 一個(gè)限制刷新期間的可用權(quán)限數(shù)

測試前準(zhǔn)備

pom.xml

不需要引入新的依賴为流,已經(jīng)集成在resilience4j-spring-boot中了

application.yml配置

resilience4j:
 ratelimiter:
    configs:
      default:
        limitForPeriod: 5
        limitRefreshPeriod: 1s
        timeoutDuration: 5s
    instances:
      backendA:
        baseConfig: default
        limitForPeriod: 1
      backendB:
        baseConfig: default
        timeoutDuration: 0s

用于監(jiān)控RateLimiter狀態(tài)及事件的工具類

同樣為了監(jiān)控RateLimiter組件,寫一個(gè)工具類:

@Log4j2
public class RateLimiterUtil {

    /**
     * @Description: 獲取rateLimiter的狀態(tài)
     */
    public static void getRateLimiterStatus(String time, RateLimiter rateLimiter){
        RateLimiter.Metrics metrics = rateLimiter.getMetrics();
        // Returns the number of availablePermissions in this duration.
        int availablePermissions =  metrics.getAvailablePermissions();
        // Returns the number of WaitingThreads
        int numberOfWaitingThreads = metrics.getNumberOfWaitingThreads();

        log.info(time  + ", metrics[ availablePermissions=" + availablePermissions +
                ", numberOfWaitingThreads=" + numberOfWaitingThreads + " ]");
    }

    /**
     * @Description: 監(jiān)聽rateLimiter事件
     */
    public static void addRateLimiterListener(RateLimiter rateLimiter){
        rateLimiter.getEventPublisher()
                .onSuccess(event -> log.info(event.toString()))
                .onFailure(event -> log.info(event.toString()));
    }
}

調(diào)用方法

還是以之前查詢用戶列表的服務(wù)為例让簿。RateLimiter支持AOP和程序式兩種方式的調(diào)用敬察。

程序式的調(diào)用方法

調(diào)用方法都類似,裝飾方法之后用Try.of().recover()來執(zhí)行:

public class RateLimiterServiceImpl {

    @Autowired
    private RemoteServiceConnector remoteServiceConnector;

    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;
    
    public List<User> ratelimiterNotAOP(){
        // 通過注冊器獲得RateLimiter實(shí)例
        RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("backendA");
        RateLimiterUtil.getRateLimiterStatus("開始執(zhí)行前: ", rateLimiter);
        // 通過Try.of().recover()調(diào)用裝飾后的服務(wù)
        Try<List<User>> result = Try.of(
            Bulkhead.decorateCheckedSupplier(rateLimiter, remoteServiceConnector::process))
            .recover(BulkheadFullException.class, throwable -> {
                log.info("服務(wù)失敗: " + throwable.getLocalizedMessage());
                return new ArrayList();
            });
        RateLimiterUtil.getRateLimiterStatus("執(zhí)行結(jié)束: ", rateLimiter);
        return result.get();
    }
}

AOP式的調(diào)用方法

首先在連接器方法上使用@RateLimiter(name="", fallbackMethod="")注解拜英,其中name是要使用的RateLimiter實(shí)例的名稱静汤,fallbackMethod是要使用的降級(jí)方法:

public RemoteServiceConnector{
    
    @RateLimiter(name = "backendA", fallbackMethod = "fallback")
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteServic.process();
        return users;
    }
    
    private List<User> fallback(BulkheadFullException e){
        log.info("服務(wù)失敗: " + e.getLocalizedMessage());
        return new ArrayList();
    }
} 

如果RetryCircuitBreaker居凶、Bulkhead虫给、RateLimiter同時(shí)注解在方法上,默認(rèn)的順序是Retry>CircuitBreaker>RateLimiter>Bulkhead侠碧,即先控制并發(fā)再限流然后熔斷最后重試

接下來直接調(diào)用方法:

public class RateLimiterServiceImpl {
    
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    
    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;

    public List<User> rateLimiterAOP() throws TimeoutException, InterruptedException {
        List<User> result = remoteServiceConnector.process();
        BulkheadUtil.getBulkheadStatus("執(zhí)行結(jié)束:", rateLimiterRegistry.retry("backendA"));
        return result;
    }
}

使用測試

application.yml文件中將backenA設(shè)定為20s只能處理1個(gè)請求抹估,為便于觀察,刷新時(shí)間設(shè)定為20s弄兜,等待時(shí)間設(shè)定為5s

configs:
      default:
        limitForPeriod: 5
        limitRefreshPeriod: 20s
        timeoutDuration: 5s
    instances:
      backendA:
        baseConfig: default
        limitForPeriod: 1

使用另一個(gè)遠(yuǎn)程服務(wù)接口的實(shí)現(xiàn)药蜻,不拋出異常,當(dāng)做正常服務(wù)進(jìn)行替饿,為了讓結(jié)果明顯一些语泽,讓方法sleep 5秒:

public class RemoteServiceImpl implements RemoteService {

    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() throws InterruptedException  {
        int num = count.getAndIncrement();
        log.info("count的值 = " + num);
        Thread.sleep(5000);
        log.info("服務(wù)正常運(yùn)行,獲取用戶列表");
        // 模擬數(shù)據(jù)庫正常查詢
        return repository.findAll();
    }
}

用線程池調(diào)5個(gè)線程去請求服務(wù):

public class RateLimiterServiceImplTest{
    
    @Autowired
    private RateLimiterServiceImpl rateLimiterService;
    
    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;
    
    @Test
    public void rateLimiterTest() {
        RateLimiterUtil.addRateLimiterListener(rateLimiterRegistry.rateLimiter("backendA"));
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i=0; i<5; i++){
            pool.submit(() -> {
                // rateLimiterService.rateLimiterAOP();
                rateLimiterService.rateLimiterNotAOP();
            });
        }
        pool.shutdown();

        while (!pool.isTerminated());
        }
    }
}

看一下測試結(jié)果:

開始執(zhí)行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:15.735+08:00[Asia/Shanghai]}
count的值 = 0
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.737+08:00[Asia/Shanghai]}
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.739+08:00[Asia/Shanghai]}
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.740+08:00[Asia/Shanghai]}
服務(wù)失敗: RateLimiter 'backendA' does not permit further calls
服務(wù)失敗: RateLimiter 'backendA' does not permit further calls
執(zhí)行結(jié)束: , metrics[ availablePermissions=0, numberOfWaitingThreads=1 ]
執(zhí)行結(jié)束: , metrics[ availablePermissions=0, numberOfWaitingThreads=1 ]
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T17:06:20.745+08:00[Asia/Shanghai]}
服務(wù)正常運(yùn)行视卢,獲取用戶列表
服務(wù)失敗: RateLimiter 'backendA' does not permit further calls
執(zhí)行結(jié)束: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
服務(wù)失敗: RateLimiter 'backendA' does not permit further calls
執(zhí)行結(jié)束: , metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
執(zhí)行結(jié)束: , metrics[ availablePermissions=1, numberOfWaitingThreads=0 ]

只有一個(gè)服務(wù)調(diào)用成功踱卵,其他都執(zhí)行失敗了。現(xiàn)在我們把刷新時(shí)間調(diào)成1s

configs:
      default:
        limitForPeriod: 5
        limitRefreshPeriod: 1s
        timeoutDuration: 5s
    instances:
      backendA:
        baseConfig: default
        limitForPeriod: 1

重新執(zhí)行据过,結(jié)果如下:

開始執(zhí)行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
開始執(zhí)行前: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:18.894+08:00[Asia/Shanghai]}
 count的值 = 0
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:18.894+08:00[Asia/Shanghai]}
count的值 = 1
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:19.706+08:00[Asia/Shanghai]}
count的值 = 2
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:19.706+08:00[Asia/Shanghai]}
count的值 = 3
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='backendA', creationTime=2019-07-10T18:25:20.703+08:00[Asia/Shanghai]}
count的值 = 4
服務(wù)正常運(yùn)行惋砂,獲取用戶列表
服務(wù)正常運(yùn)行,獲取用戶列表
服務(wù)正常運(yùn)行绳锅,獲取用戶列表
服務(wù)正常運(yùn)行西饵,獲取用戶列表
執(zhí)行結(jié)束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
執(zhí)行結(jié)束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
執(zhí)行結(jié)束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
 執(zhí)行結(jié)束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]
服務(wù)正常運(yùn)行,獲取用戶列表
執(zhí)行結(jié)束: , metrics[ availablePermissions=2, numberOfWaitingThreads=0 ]

可以看出鳞芙,幾個(gè)服務(wù)都被放入并正常執(zhí)行了眷柔,即使上個(gè)服務(wù)還沒完成期虾,依然可以放入,只與時(shí)間有關(guān)驯嘱,而與線程無關(guān)彻消。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市宙拉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌丙笋,老刑警劉巖谢澈,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異御板,居然都是意外死亡锥忿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門怠肋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來敬鬓,“玉大人,你說我怎么就攤上這事笙各《ご穑” “怎么了?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵杈抢,是天一觀的道長数尿。 經(jīng)常有香客問我,道長惶楼,這世上最難降的妖魔是什么右蹦? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮歼捐,結(jié)果婚禮上何陆,老公的妹妹穿的比我還像新娘。我一直安慰自己豹储,他們只是感情好贷盲,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著颂翼,像睡著了一般晃洒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上朦乏,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天球及,我揣著相機(jī)與錄音,去河邊找鬼呻疹。 笑死吃引,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播镊尺,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼朦佩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了庐氮?” 一聲冷哼從身側(cè)響起语稠,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎弄砍,沒想到半個(gè)月后仙畦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡音婶,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年慨畸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衣式。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡寸士,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出碴卧,到底是詐尸還是另有隱情弱卡,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布住册,位于F島的核電站谐宙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏界弧。R本人自食惡果不足惜凡蜻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望垢箕。 院中可真熱鬧划栓,春花似錦、人聲如沸条获。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽帅掘。三九已至委煤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間修档,已是汗流浹背碧绞。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吱窝,地道東北人讥邻。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓迫靖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親兴使。 傳聞我的和親對象是個(gè)殘疾皇子系宜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344