Hystrix源碼解析

Hystrix的用途以及使用場景就不在這里贅述了耗绿,這里只關(guān)注源碼級別的實現(xiàn)原理杨箭。

1膝蜈、AbstractCommand鹅心、HystrixCommand吕粗、HystrixObservableCommand

1.1 AbstractCommand

Hystrix會把對服務(wù)的請求封裝成命令并通過執(zhí)行命令的方式進行請求調(diào)用,抽象類AbstractCommand即是這些命令的核心父類接口旭愧,此接口完成了大部分的Hystrix的邏輯實現(xiàn)颅筋。

1.1.1 核心屬性
  • HystrixCircuitBreaker circuitBreaker
    Hystrix在執(zhí)行具體的調(diào)用操作之前會調(diào)用HystrixCircuitBreaker的相關(guān)方法判斷熔斷器是否打開,如果熔斷器已經(jīng)打開則直接fallback快速失敗输枯,否則才會進行實際的調(diào)用议泵。下面的章節(jié)會詳細(xì)介紹HystrixCircuitBreaker接口。

  • HystrixThreadPool threadPool
    Hystrix線程池桃熄,Hystrix命令執(zhí)行隔離策略有兩種:線程池和信號量先口。默認(rèn)是線程池,即命令會異步運行在線程池中瞳收。信號量則使用原有的線程碉京。使用線程池有一定的延遲,但是可以有較高的吞吐量螟深。

  • HystrixCommandKey commandKey
    命令唯一標(biāo)識谐宙,Hystrix會為每一個命令緩存一個HystrixCircuitBreaker對象以及一個HystrixCommandMetrics對象,這些對象被緩存在的靜態(tài)屬性Map中血崭,命令通過HystrixCommandKey標(biāo)識其唯一性卧惜,在使用Feign接口時,命令唯一標(biāo)識即為方法簽名夹纫。

  • HystrixCommandGroupKey commandGroup
    命令組唯一標(biāo)識咽瓷,每一個依賴服務(wù)對應(yīng)一個命令組,每一個命令組對應(yīng)一個線程池舰讹,例如茅姜,某應(yīng)用依賴了S1和S2兩個服務(wù),則會有兩個線程池,服務(wù)的調(diào)用運行在自己的線程池中钻洒,在使用Feign接口時奋姿,命令組唯一標(biāo)識即為服務(wù)的名稱。

  • HystrixThreadPoolKey threadPoolKey
    線程池唯一標(biāo)識素标,和命令組唯一標(biāo)識保持一致称诗。

  • HystrixThreadPool threadPool
    用于執(zhí)行Hystrix命令的線程池。此線程池是服務(wù)級別的头遭,即每一個服務(wù)對應(yīng)一個線程池寓免,那么對這個服務(wù)的所有調(diào)用請求都會在此線程池中執(zhí)行。

  • HystrixCommandMetrics metrics
    命令執(zhí)行情況的統(tǒng)計度量计维,Hystrix通過HystrixCommandMetrics記錄和統(tǒng)計命令執(zhí)行的總次數(shù)袜香、失敗次數(shù)等信息。

  • ExecutionResult executionResult
    命令執(zhí)行結(jié)果鲫惶,命令執(zhí)行過程中會不斷的把執(zhí)行情況匯總至ExecutionResult蜈首,當(dāng)命令完成后會把ExecutionResult交給HystrixCommandMetrics做統(tǒng)計之用。

1.1.2 核心方法
  • Observable<R> toObservable()
    注冊命令執(zhí)行完成事件以及觸發(fā)事件源欠母。此方法返回一個Cold Observable對象欢策,即在沒有訂閱者時不進行事件發(fā)布,而是等待艺蝴,直到有訂閱者時才發(fā)布事件猬腰。

  • Observable<R> observable()
    注冊命令執(zhí)行完成事件以及觸發(fā)事件源。此方法返回一個Hot Observable對象猜敢,即不管是否有訂閱者都會發(fā)布事件姑荷,這種情況下有可能出現(xiàn)訂閱者只觀察到整個過程的一部分的現(xiàn)象。

  • Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd)
    執(zhí)行熔斷語義缩擂,是熔斷判斷的核心方法鼠冕,Hystrix首先會從緩存中獲取結(jié)果,如果從緩存中獲取結(jié)果失敗則調(diào)用此方法執(zhí)行熔斷語義胯盯,此方法首先調(diào)用HystrixCircuitBreaker#allowRequest()判斷是否允許請求懈费,如果不允許則直接調(diào)用handleShortCircuitViaFallback()方法執(zhí)行快速失敗的邏輯,如果允許則判斷信號量資源是否能獲取博脑,如果獲取失敗則調(diào)用handleSemaphoreRejectionViaFallback()方法執(zhí)行信號量資源拒絕快速失敗邏輯憎乙,如果資源獲取成功則執(zhí)行命令。其源碼如下:

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // 熔斷器是否允許請求
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
    
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            // 信號量獲取資源是否成功
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // 成功則執(zhí)行命令
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                // 執(zhí)行信號量獲取資源拒絕快速失敗邏輯
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            // 執(zhí)行熔斷器開啟是的快速失敗邏輯
            return handleShortCircuitViaFallback();
        }
    }
    
  • Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd)
    注冊熔斷相關(guān)事件叉趣,用于收集執(zhí)行結(jié)果情況泞边,執(zhí)行結(jié)果情況記錄在ExecutionResult中。

  • Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd)
    此方法會判斷是需要在線程池中執(zhí)行命令還是使用原有線程執(zhí)行命令疗杉,如果是前者則注冊線程完成相關(guān)的事件阵谚。

  • void handleCommandEnd(boolean commandExecutionStarted)
    命令執(zhí)行結(jié)束會調(diào)用此方法,此方法會將ExecutionResult對象交由HystrixCommandMetrics做執(zhí)行情況的統(tǒng)計。

  • Observable<R> handleSemaphoreRejectionViaFallback()
    信號量資源拒絕時的快速失敗邏輯實現(xiàn)梢什,最終會執(zhí)行用戶指定的fallback邏輯奠蹬。

  • Observable<R> handleShortCircuitViaFallback()
    熔斷器打開時的快速失敗邏輯實現(xiàn),最終會執(zhí)行用戶指定的fallback邏輯嗡午。

1.2. HystrixCommand囤躁、HystrixObservableCommand

Hystrix的兩個核心命令,在使用Hystrix時第一步是創(chuàng)建命令翼馆,那么這里說的命令即是HystrixCommand和HystrixObservableCommand或者是他們的子類割以。

1.2.1 HystrixCommand

AbstractCommand的子類金度,用于返回一個單一的結(jié)果应媚,同時提供同步和異步兩種方式執(zhí)行命令:

  1. Future<R> queue() 異步方式執(zhí)行命令。
  2. public R execute() 同步方式執(zhí)行命令猜极。

HystrixCommand是一個抽象類中姜,其中run()方法為抽象方法,在使用時一般需要實現(xiàn)run()和getFallback()兩個方法跟伏,如下:

  • protected abstract R run() throws Exception
  • protected R getFallback()

這兩個方法分別對應(yīng)正常的結(jié)果返回和快速失敗的結(jié)果返回丢胚,在調(diào)用queue()和execute()時會回調(diào)到這兩個方法。

1.2.2 HystrixObservableCommand

AbstractCommand的子類受扳,用于返回多個結(jié)果携龟,同時提供Hot Observable和Cold Observable兩種執(zhí)行方式:

  1. Observable<R> observable() 返回一個Hot Observable。
  2. Observable<R> toObservable() 返回一個Cold Observable勘高。

值得注意的是這兩個方法并不是HystrixObservableCommand中的峡蟋,而是其父類AbstractCommand中的方法。

在使用HystrixObservableCommand時需要實現(xiàn)construct()和resumeWithFallback()兩個方法:

  • protected Observable<String> construct()
  • protected Observable<String> resumeWithFallback()

這兩個方法分別返回正常的Observable以及快速失敗的Observable华望。在調(diào)用observable()和toObservable()時會回調(diào)到這兩個方法蕊蝗。

所謂返回多個結(jié)果,是說可以通過Observable的相關(guān)方法獲取到多個結(jié)果的迭代器赖舟,如下是一個使用HystrixObservableCommand的小例子:

package com.zws.cloud.consumer;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Emitter;
import rx.Observable;

import java.util.Iterator;

/**
 * @Author wensh.zhu
 * @Date 2019/5/7 16:55
 */
public class HystrixObservableCommandTest {

    public static void main(String[] args) {
        HystrixObservableCommandHello commandHello = new HystrixObservableCommandHello();
        
        // 得到結(jié)果的迭代器
        Iterator<String> iterator = commandHello.observe().toBlocking().getIterator();

        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
    }
}

class HystrixObservableCommandHello extends HystrixObservableCommand<String> {

    public HystrixObservableCommandHello() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("Hello")));
    }

    @Override
    protected Observable<String> construct() {
        return Observable.<String>create(emitter -> {
            emitter.onNext("abc");
            
            /** 
             // 模擬異常發(fā)生
            String name = null;
            System.out.println(name.length());
             **/

            emitter.onNext("efg");

            emitter.onCompleted();
        }, Emitter.BackpressureMode.NONE).doOnError(throwable -> System.out.println("哎呀蓬戚,觀察到異常啦"));
    }

    @Override
    protected Observable<String> resumeWithFallback() {
        return Observable.create(emitter -> {
            emitter.onNext("error");

            emitter.onNext("ERROR");

            emitter.onCompleted();
        }, Emitter.BackpressureMode.NONE);
    }
}

2、HystrixCircuitBreaker

HystrixCircuitBreaker是Hystrix的核心接口宾抓,其實現(xiàn)類為HystrixCircuitBreaker.HystrixCircuitBreakerImpl子漩,重要方法如下:

  • boolean isOpen()
    熔斷器是否打開,類HystrixCircuitBreakerImpl中有一類型為AtomicBoolean的屬性circuitOpen石洗,表示熔斷器是否打開幢泼。此方法首先判斷circuitOpen是否為true,如果為true則直接返回true劲腿。否則判斷請求次數(shù)是否達到熔斷請求次數(shù)閾值旭绒,此閾值默認(rèn)為20,如果沒有達到則直接返回false,如果請求次數(shù)大于等于20挥吵,則判斷這些請求中失敗的請求次數(shù)占比是否小于50%(默認(rèn))重父,如果小于則返回false,否則將circuitOpen設(shè)置為true忽匈,并更新熔斷時間戳房午,熔斷時間戳記錄在類HystrixCircuitBreakerImpl的屬性circuitOpenedOrLastTestedTime中,類型為AtomicLong丹允。所以綜上可以得出熔斷器打開需要滿足兩個條件:

    • 請求次數(shù)大于等于20(默認(rèn))
    • 失敗請求次數(shù)占比大于等于50%(默認(rèn))
  • boolean allowSingleTest()
    熔斷器是否處于半打開狀態(tài)郭厌。此方法的邏輯是:如果熔斷器沒有打開(即circuitOpen為false)則直接返回false,如果熔斷器已經(jīng)打開則判斷當(dāng)前時間戳是否大于熔斷器打開的時間戳+熔斷窗口時間(默認(rèn)5秒)雕蔽,如果是則更新熔斷器打開的時間戳為當(dāng)前時間并返回true折柠。也就是說:

    1. 熔斷器打開后需要等待至少5秒才可以再次嘗試請求。
    2. 如果此次嘗試失敗則至少再等待5秒才可以嘗試批狐。
  • void markSuccess()
    此方法會在熔斷器打開的狀態(tài)下:

    1. 關(guān)閉熔斷器
      即把circuitOpen設(shè)置為false扇售。
    2. 并將統(tǒng)計數(shù)據(jù)置零
      調(diào)用HystrixCommandMetrics的resetStream()方法。
  • boolean allowRequest()
    是否允許請求嚣艇,Hystrix在執(zhí)行具體的調(diào)用之前通過調(diào)用此方法判斷是否允許請求承冰,如果不允許則直接fullback快速失敗。此方法調(diào)用isOpen()和allowSingleTest()返回熔斷器是否關(guān)閉或半開食零,具體邏輯:首先判斷調(diào)用isOpen()方法判斷熔斷器是否打開困乒,如果熔斷器打開則直接返回false,否則調(diào)用allowSingleTest()方法判斷是否允許嘗試請求贰谣,如果是則返回true娜搂,否則返回false。

    HystrixCircuitBreakerImpl源碼如下:

    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;
    
        // 保存熔斷器狀態(tài):開啟or關(guān)閉
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);
        
        // 保存熔斷器開啟或最后更新時間戳
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
    
        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }
    
        public void markSuccess() {
            if (circuitOpen.get()) {
                // 熔斷器關(guān)閉并重置統(tǒng)計數(shù)據(jù)
                if (circuitOpen.compareAndSet(true, false)) {
                    metrics.resetStream();
                }
            }
        }
    
        @Override
        public boolean allowRequest() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                isOpen();
                return true;
            }
            // 熔斷器是否關(guān)閉或者半開
            return !isOpen() || allowSingleTest();
        }
    
        public boolean allowSingleTest() {
            long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            // 熔斷器開啟且開啟時長大于等于5秒則為半開狀態(tài)
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                // 更新熔斷器開啟時間戳
                if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    return true;
                }
            }
            return false;
        }
    
        @Override
        public boolean isOpen() {
            if (circuitOpen.get()) {
                return true;
            }
    
            HealthCounts health = metrics.getHealthCounts();
            // 請求次數(shù)小于20直接返回false
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                return false;
            }
    
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // 請求失敗次數(shù)占比大于等于50%開啟熔斷器冈爹、記錄開啟時間戳
                if (circuitOpen.compareAndSet(false, true)) {
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    return true;
                }
            }
        }
    }
    

熔斷器對象是命令級別的守呜,也就是說每一個命令都會對應(yīng)一個HystrixCircuitBreaker對象椿访,這些HystrixCircuitBreaker對象被緩存在類HystrixCircuitBreaker.Factory的屬性circuitBreakersByCommand中听系,如下:

private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

3术浪、HystrixCommandMetrics、HealthCountsStream憋肖、HystrixEventStream因痛、HystrixCommandMetrics.HealthCounts

這些都是和Hystrix的命令執(zhí)行情況統(tǒng)計相關(guān)的類,這些統(tǒng)計信息直接影響到Hystrix的熔斷斷判岸更。

3.1 HystrixCommandMetrics.HealthCounts

此類保存了命令執(zhí)行的總次數(shù)鸵膏、失敗次數(shù)以及失敗次數(shù)的占比,plus()方法則用于更新這些數(shù)據(jù)怎炊,HystrixCircuitBreaker就是根據(jù)此類的統(tǒng)計信息做熔斷器的打開和半開判斷的谭企,源碼如下:

public static class HealthCounts {
    private final long totalCount;
    private final long errorCount;
    private final int errorPercentage;

    HealthCounts(long total, long error) {
        this.totalCount = total;
        this.errorCount = error;
        if (totalCount > 0) {
            this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
        } else {
            this.errorPercentage = 0;
        }
    }
    // 略
    
    public HealthCounts plus(long[] eventTypeCounts) {
        long updatedTotalCount = totalCount;
        long updatedErrorCount = errorCount;

        long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
        long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
        long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
        long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
        long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

        updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        return new HealthCounts(updatedTotalCount, updatedErrorCount);
    }
}

3.2 HealthCountsStream

從上面我們知道HystrixCommandMetrics.HealthCounts保存了命令執(zhí)行的總次數(shù)廓译、失敗次數(shù),那么這些統(tǒng)計數(shù)字是怎么來的呢债查?答案就是HealthCountsStream類非区,首先此類的父類BucketedCounterStream中有一個BehaviorSubject<Output>類型的屬性counterSubject,Output是泛型盹廷,這里即是HystrixCommandMetrics.HealthCounts征绸。HealthCountsStream保存了一個回調(diào)函數(shù),用于通過調(diào)用HystrixCommandMetrics.HealthCounts的plus()方法累計命令執(zhí)行的總次數(shù)和失敗次數(shù)俄占。每當(dāng)HealthCountsStream接受到事件時都會回調(diào)此函數(shù)管怠。那么事件時在哪里觸發(fā)的呢?答案是HystrixEventStream缸榄。

3.3 HystrixEventStream

HystrixEventStream用于發(fā)布命令相關(guān)事件渤弛,其實現(xiàn)類有HystrixCommandCompletionStream、HystrixThreadPoolStartStream碰凶、HystrixCollapserEventStream等暮芭,每一個實現(xiàn)類都有一個write()方法,此方法發(fā)布一種命令相關(guān)事件欲低,例如:HystrixCommandCompletionStream負(fù)責(zé)發(fā)布命令完成事件,HystrixThreadPoolStartStream發(fā)布線程池開始事件畜晰。這些實現(xiàn)類發(fā)布的事件會被上面說的類HealthCountsStream中的一個回調(diào)函數(shù)監(jiān)聽到砾莱。

3.4 HystrixCommandMetrics

此類用于重置統(tǒng)計信息以及接受命令執(zhí)行完成和開始的通知。當(dāng)命令開始執(zhí)行和執(zhí)行結(jié)束時分別調(diào)用此類的markCommandStart()方法和markCommandDone()方法凄鼻,這兩個方法分別通過HystrixCommandStartStream和HystrixCommandCompletionStream發(fā)布命令開始和結(jié)束事件腊瑟。另外HystrixCircuitBreaker的markSuccess()方法被調(diào)用時會調(diào)用HystrixCommandMetrics#resetStream()方法,此方法會刪除緩存中命令對應(yīng)的HystrixCommandMetrics并創(chuàng)建一個新的HystrixCommandMetrics块蚌。相關(guān)源碼如下:

void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
    int currentCount = concurrentExecutionCount.incrementAndGet();
    HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
}

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
    if (executionStarted) {
        concurrentExecutionCount.decrementAndGet();
    }
}

synchronized void resetStream() {
    healthCountsStream.unsubscribe();
    HealthCountsStream.removeByKey(key);
    healthCountsStream = HealthCountsStream.getInstance(key, properties);
}

4闰非、Feign與Hystrix

4.1 Hystrix依賴與配置

在我們使用Spring Cloud時,更多是在使用Feign接口時使用Hystrix峭范,那么在Feign接口中使用Hystrix需要那些配置呢财松?很簡單:

  1. 引入Hystrix依賴,如下:
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
    
  2. 配置項eign.hystrix.enabled設(shè)置為true纱控。
    feign:
      hystrix:
        enabled: true
    

4.2 HystrixInvocationHandler

當(dāng)配置項eign.hystrix.enabled設(shè)置為true時辆毡,配置類FeignClientsConfiguration的內(nèi)部靜態(tài)類HystrixFeignConfiguration會向Spring上下文中注入類型為HystrixFeign.Builder的Bean,如下:

@Configuration
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
    @Bean
    @Scope("prototype")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = "feign.hystrix.enabled")
    public Feign.Builder feignHystrixBuilder() {
        return HystrixFeign.builder();
    }
}

public final class HystrixFeign {

  public static Builder builder() {
    return new Builder();
  }

  public static final class Builder extends Feign.Builder {
      Feign build(final FallbackFactory<?> nullableFallbackFactory) {
        super.invocationHandlerFactory(new InvocationHandlerFactory() {
            @Override
            public InvocationHandler create(Target target,
                                            Map<Method, MethodHandler> dispatch) {
              // 這里返回HystrixInvocationHandler對象
              return new HystrixInvocationHandler(target, dispatch, setterFactory,
                  nullableFallbackFactory);
            }
         });
        super.contract(new HystrixDelegatingContract(contract));
        return super.build();
      }
      
      // 略
  }
}

HystrixFeign.builder()方法返回HystrixFeign的內(nèi)部類HystrixFeign.Builder甜害,當(dāng)為Feign接口生成代理對象時舶掖,HystrixFeign.Builder類的build()方法被調(diào)用,此方法會創(chuàng)建執(zhí)行處理器HystrixInvocationHandler尔店,當(dāng)Feign接口執(zhí)行時就會調(diào)用HystrixInvocationHandler的invoke()方法眨攘,那么接下來就簡單了主慰,invoke()方法創(chuàng)建HystrixCommand對象并執(zhí)行其execute()方法,如下:

public Object invoke(final Object proxy, final Method method, final Object[] args)
    throws Throwable {
  // 略

  HystrixCommand<Object> hystrixCommand =
      new HystrixCommand<Object>(setterMethodMap.get(method)) {
        @Override
        protected Object run() throws Exception {
          try {
            return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
          } catch (Exception e) {
            throw e;
          } catch (Throwable t) {
            throw (Error) t;
          }
        }

        @Override
        protected Object getFallback() {
          if (fallbackFactory == null) {
            return super.getFallback();
          }
          try {
            Object fallback = fallbackFactory.create(getExecutionException());
            Object result = fallbackMethodMap.get(method).invoke(fallback, args);
            if (isReturnsHystrixCommand(method)) {
              return ((HystrixCommand) result).execute();
            } else if (isReturnsObservable(method)) {
              // Create a cold Observable
              return ((Observable) result).toBlocking().first();
            } else if (isReturnsSingle(method)) {
              // Create a cold Observable as a Single
              return ((Single) result).toObservable().toBlocking().first();
            } else if (isReturnsCompletable(method)) {
              ((Completable) result).await();
              return null;
            } else {
              return result;
            }
          } catch (IllegalAccessException e) {
            throw new AssertionError(e);
          } catch (InvocationTargetException e) {
            throw new AssertionError(e.getCause());
          }
        }
      };

  if (Util.isDefault(method)) {
    return hystrixCommand.execute();
  } else if (isReturnsHystrixCommand(method)) {
    return hystrixCommand;
  } else if (isReturnsObservable(method)) {
    return hystrixCommand.toObservable();
  } else if (isReturnsSingle(method)) {
    return hystrixCommand.toObservable().toSingle();
  } else if (isReturnsCompletable(method)) {
    return hystrixCommand.toObservable().toCompletable();
  }
  return hystrixCommand.execute();
}

5 注解的方式使用Hystrix

當(dāng)不使用Feign接口時我們還可以使用注解@HystrixCommand和@HystrixCollapse的方式使用Hystrix鲫售。前者執(zhí)行單一命令河哑,后者則可以聚合多個請求最終以批量的方式執(zhí)行命令。

5.1 注解的配置與使用

  • 配置
    在SpringBoot啟動類添加@EnableCircuitBreaker注解龟虎。
  • 使用
    下面是一個使用注解的小例子:
    @Service
    public class HelloService {
    
        @Resource
        private RestTemplate restTemplate;
    
        @HystrixCommand(groupKey = "gTest", commandKey = "hi", fallbackMethod = "fallBack")
        public String hi() {
            return restTemplate.getForEntity("http://Eureka-Producer/hi", String.class).getBody();
        }
    
        public String fallBack() {
            return "熔斷發(fā)生";
        }
    
        @HystrixCollapser(batchMethod = "getMembers", collapserProperties = {
                // 收集1秒內(nèi)的請求
                @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000")
        })
        public Future<String> getMember(Integer id) {
            System.out.println("執(zhí)行單個查詢的方法");
            return null;
        }
    
        @HystrixCommand
        public List<String> getMembers(List<Integer> ids) {
            System.out.println("執(zhí)行合并操作調(diào)用");
            List<String> mems = new ArrayList<>();
            for(Integer id : ids) {
                mems.add("name" + id);
            }
            return mems;
        }
    }
    
    @RestController
    public class HelloController {
    
        @Resource
        private HelloService helloService;
    
        @GetMapping("/hi")
        public String hi() {
            return helloService.hi();
        }
    
        @GetMapping("/coll")
        public String coll() throws ExecutionException, InterruptedException {
            HystrixRequestContext.initializeContext();
    
            // 這里模擬短時間內(nèi)3次調(diào)用
            Future<String> f1 = helloService.getMember(1);
            Future<String> f2 = helloService.getMember(2);
            Future<String> f3 = helloService.getMember(3);
            return f1.get() + ", " + f2.get() + ", " + f3.get();
        }
    
    }
    

5.2 注解的實現(xiàn)原理

@EnableCircuitBreaker注解會使配置類HystrixCircuitBreakerConfiguration會被加載璃谨,此配置類向Spring上下文中注入類一個切面Bean:HystrixCommandAspect,源碼如下:

@Configuration
public class HystrixCircuitBreakerConfiguration {

    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }
    // 略
}

此切面會攔截標(biāo)記了@HystrixCommand和@HystrixCollapse的方法鲤妥,并根據(jù)不同注解以及相關(guān)配置選擇對應(yīng)的命令并執(zhí)行佳吞,如下:

@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")

    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    // 略
}

public class HystrixCommandFactory {

    private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();

    private HystrixCommandFactory() {

    }

    public static HystrixCommandFactory getInstance() {
        return INSTANCE;
    }

    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        // 選擇創(chuàng)建對應(yīng)的命令
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }

    // 略
}

6、總結(jié)

Hystrix會把請求封裝成命令(HystrixCommand或HystrixObservableCommand)棉安,并以執(zhí)行命令的方式進行請求的調(diào)用底扳,整個過程如下:

  1. 創(chuàng)建命令HystrixCommand或者HystrixObservableCommand。
  2. 調(diào)用命令對應(yīng)的方法贡耽,HystrixCommand:execute()或queue()衷模,HystrixObservableCommand:toObservable()或observe()。
  3. 判斷緩存是否開啟緩存以及緩存中是否已經(jīng)緩存了請求結(jié)果蒲赂,如果有則直接返回阱冶。
  4. HystrixCircuitBreaker熔斷器是否允許請求,如果不允許則執(zhí)行快速失敗邏輯滥嘴。
  5. 信號量/線程池資源獲取是否成功木蹬,如果失敗則執(zhí)行快速失敗邏輯。
  6. 執(zhí)行run()方法或者construct()方法若皱。
  7. 返回結(jié)果更新統(tǒng)計數(shù)據(jù)镊叁。

放個彩蛋:

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市走触,隨后出現(xiàn)的幾起案子晦譬,更是在濱河造成了極大的恐慌,老刑警劉巖互广,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敛腌,死亡現(xiàn)場離奇詭異,居然都是意外死亡兜辞,警方通過查閱死者的電腦和手機迎瞧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來逸吵,“玉大人凶硅,你說我怎么就攤上這事∩ㄖ澹” “怎么了足绅?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵捷绑,是天一觀的道長。 經(jīng)常有香客問我氢妈,道長粹污,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任首量,我火速辦了婚禮壮吩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘加缘。我一直安慰自己鸭叙,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布拣宏。 她就那樣靜靜地躺著沈贝,像睡著了一般。 火紅的嫁衣襯著肌膚如雪勋乾。 梳的紋絲不亂的頭發(fā)上宋下,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天,我揣著相機與錄音辑莫,去河邊找鬼学歧。 笑死,一個胖子當(dāng)著我的面吹牛摆昧,可吹牛的內(nèi)容都是我干的撩满。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼绅你,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了昭躺?” 一聲冷哼從身側(cè)響起忌锯,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎领炫,沒想到半個月后偶垮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡帝洪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年似舵,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片葱峡。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡砚哗,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出砰奕,到底是詐尸還是另有隱情蛛芥,我是刑警寧澤提鸟,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站仅淑,受9級特大地震影響称勋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜涯竟,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一赡鲜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧庐船,春花似錦银酬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至盗棵,卻和暖如春壮韭,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纹因。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工喷屋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瞭恰。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓屯曹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親惊畏。 傳聞我的和親對象是個殘疾皇子恶耽,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,960評論 2 355