Spring Cloud Ribbon 分析(四)之Feign集成

前三節(jié)分析,我們主要分析和總結(jié)了Ribbon結(jié)合RestTemplate的使用,本節(jié)我們主要分析Ribbon結(jié)合Feign客戶端的使用蓖议,畢竟當(dāng)下使用Feign非常廣泛虏杰,那么下面我們就分析下Feign客戶端是如何使用Ribbon負(fù)載的,以下分析需要依賴OpenFeign依賴庫模塊


FeignRibbonClientAutoConfiguration配置文件

@ConditionalOnClass({ ILoadBalancer.class, Feign.class })
@Configuration
@AutoConfigureBefore(FeignAutoConfiguration.class)
@EnableConfigurationProperties({ FeignHttpClientProperties.class })
@Import({ HttpClientFeignLoadBalancedConfiguration.class,
        OkHttpFeignLoadBalancedConfiguration.class,
        DefaultFeignLoadBalancedConfiguration.class })
public class FeignRibbonClientAutoConfiguration {

    @Bean
    @Primary
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    public CachingSpringLoadBalancerFactory cachingLBClientFactory(
            SpringClientFactory factory) {
        return new CachingSpringLoadBalancerFactory(factory);
    }

    @Bean
    @Primary
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public CachingSpringLoadBalancerFactory retryabeCachingLBClientFactory(
        SpringClientFactory factory,
        LoadBalancedRetryFactory retryFactory) {
        return new CachingSpringLoadBalancerFactory(factory, retryFactory);
    }

    //設(shè)置默認(rèn)請求選項(xiàng)拒担,從Ribbon的SpringClientFactory獲取IClientConfig配置參數(shù)
    @Bean
    @ConditionalOnMissingBean
    public Request.Options feignRequestOptions() {
        return LoadBalancerFeignClient.DEFAULT_OPTIONS;
    }
}

當(dāng)前配置類我們關(guān)注下CachingSpringLoadBalancerFactory這個工廠類嘹屯,主要職責(zé)是獲取FeignLoadBalancer并緩存提供給LoadBalancerFeignClient使用,LoadBalancerFeignClient是Feign負(fù)載均衡客戶端的默認(rèn)實(shí)現(xiàn)


CachingSpringLoadBalancerFactory工廠類

public class CachingSpringLoadBalancerFactory {

    private final SpringClientFactory factory;
    private LoadBalancedRetryFactory loadBalancedRetryFactory = null;

    private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>();

    public CachingSpringLoadBalancerFactory(SpringClientFactory factory) {
        this.factory = factory;
    }

    public CachingSpringLoadBalancerFactory(SpringClientFactory factory, LoadBalancedRetryFactory loadBalancedRetryPolicyFactory) {
        this.factory = factory;
        this.loadBalancedRetryFactory = loadBalancedRetryPolicyFactory;
    }

    public FeignLoadBalancer create(String clientName) {
        FeignLoadBalancer client = this.cache.get(clientName);
        if(client != null) {
            return client;
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }
}

代碼片段中我們著重關(guān)注下SpringClientFactory對象,這個對象在分析一中已經(jīng)講述過从撼,通過SpringClientFactory獲取Ribbon管理配置接口(IClientConfig)與Ribbon負(fù)載均衡接口(ILoadBalancer)最終生成Feign負(fù)載均衡實(shí)現(xiàn)類(FeignLoadBalancer)州弟,最終提供給LoadBalancerFeignClient使用,LoadBalancerFeignClient內(nèi)部會調(diào)用到ILoadBalancer.chooseServer達(dá)到負(fù)載均衡的效果低零,整個調(diào)用鏈會在下文闡述


DefaultFeignLoadBalancedConfiguration

@Configuration
class DefaultFeignLoadBalancedConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
                              SpringClientFactory clientFactory) {
        return new LoadBalancerFeignClient(new Client.Default(null, null),
                cachingFactory, clientFactory);
    }
}

通過FeignRibbonClientAutoConfiguration的@Import({DefaultFeignLoadBalancedConfiguration.class.class})進(jìn)行加載婆翔,默認(rèn)的Feign客戶端接口實(shí)現(xiàn)類并且有負(fù)載均衡的效果


FeignAutoConfiguration

@Configuration
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
@Autowired(required = false)
    private List<FeignClientSpecification> configurations = new ArrayList<>();

    @Bean
    public HasFeatures feignFeature() {
        return HasFeatures.namedFeature("Feign", Feign.class);
    }

    @Bean
    public FeignContext feignContext() {
        FeignContext context = new FeignContext();
        context.setConfigurations(this.configurations);
        return context;
    }

    @Configuration
    @ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
    protected static class HystrixFeignTargeterConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public Targeter feignTargeter() {
            return new HystrixTargeter();
        }
    }
    ......
}

在FeignRibbonClientAutoConfiguration之后被裝配,這個配置類我們只大概講述下HystrixFeignTargeterConfiguration配置類下的Targeter,這個的作用就是生成一個Feign客戶端代理類掏婶,這個代理類會實(shí)現(xiàn)Hystrix熔斷的功能啃奴,然后調(diào)用到最終的LoadBalancerFeignClient進(jìn)行請求,整個生成Feign代理類的調(diào)用鏈為@EnableFeignClients->FeignClientsRegistrar.registerBeanDefinitions->FeignClientsRegistrar.registerFeignClient->FeignClientFactoryBean.getObject->FeignClientFactoryBean.loadBalance->HystrixTargeter.target->Feign.target->HystrixFeign#build->ReflectiveFeign.newInstance->HystrixInvocationHandler.invoke->SynchronousMethodHandler.invoke對于整個調(diào)用鏈本節(jié)暫不過多講述雄妥,感興趣可以通過堆棧進(jìn)行跟蹤,具體分析會放到單獨(dú)講述Feign相關(guān)的分析進(jìn)行澄清


LoadBalancerFeignClient負(fù)載均衡實(shí)現(xiàn)類

public class LoadBalancerFeignClient implements Client {

    static final Request.Options DEFAULT_OPTIONS = new Request.Options();

    private final Client delegate;
    private CachingSpringLoadBalancerFactory lbClientFactory;
    private SpringClientFactory clientFactory;

    ......

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            IClientConfig requestConfig = getClientConfig(options, clientName);
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

    IClientConfig getClientConfig(Request.Options options, String clientName) {
        IClientConfig requestConfig;
        if (options == DEFAULT_OPTIONS) {
            requestConfig = this.clientFactory.getClientConfig(clientName);
        } else {
            requestConfig = new FeignOptionsClientConfig(options);
        }
        return requestConfig;
    }

    ......

    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }

    ......
}

通過上文我們得知最終生成的Feign代理類會調(diào)用到LoadBalancerFeignClient的execute方法,我們主要看executeWithLoadBalancer這個方法最蕾,通過負(fù)載均衡獲取結(jié)果

/**
* 當(dāng)調(diào)用者想要通過負(fù)載均衡分發(fā)請求到一個服務(wù)器時候可以使用這個方法,
* 通過recostructuriWithServer計算最終的URI老厌,而不是在請求URI中指定服務(wù)器
*/
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
    ......

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        //構(gòu)建LoadBalancerCommand命令行
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            //提交命令請求,并返回一個RxJava的被觀察者對象瘟则,通過阻塞方式獲取最終的結(jié)果
            return command.submit(
                new ServerOperation<T>() {
                    //通過LoadBalancerContext.getServerFromLoadBalancer獲取負(fù)載均衡的Server,
                    //通過Server svc = lb.chooseServer(loadBalancerKey);獲取負(fù)載均衡Server枝秤,
                    //如何負(fù)載均衡獲取Server可以查看[分析三](http://www.reibang.com/p/f076ab3e4031)
                    @Override
                    public Observable<T> call(Server server) {
                        //計算最終的URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            //執(zhí)行RetryableFeignLoadBalancer.execute方法
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                //阻塞BlockingObservable
                .toBlocking()
                //返回結(jié)果
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    }
}

通過注釋我們得知時候通過阻塞方式獲取結(jié)果,下文會大致分析調(diào)用流程


LoadBalancerCommand命令類

public class LoadBalancerCommand<T> {
    ......
    //創(chuàng)建被觀察者
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    //通過負(fù)載均衡方法獲取Server
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); 
                    //發(fā)射數(shù)據(jù)
                    next.onNext(server);
                    //結(jié)束當(dāng)前被觀察者
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
    ......
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);
                                        
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
                                        
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        //執(zhí)行return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
                                            
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
            
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        //發(fā)生錯誤時繼續(xù)執(zhí)行下一個
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }
}

我們看見LoadBalancerCommand. submit方法內(nèi)部醋拧,最終執(zhí)行return operation.call(server)->RetryableFeignLoadBalancer.execute從下往上返回Observable<T>返回值,最終返回給最外層淀弹,最外層獲取得到Observable<T>然后.toBlocking().single()通過內(nèi)部阻塞方式獲取最終請求的返回結(jié)果,這個請求方式涉及RxJava的被觀察者與觀察者模式丹壕,所以更多RxJava使用方法請參考RxJava相關(guān)資料


BlockingObservable

public final class BlockingObservable<T> {
    ......
    private T blockForSingle(final Observable<? extends T> observable) {
        final AtomicReference<T> returnItem = new AtomicReference<T>();
        final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
        //線程等待其他線程各自執(zhí)行完畢后再執(zhí)行
        final CountDownLatch latch = new CountDownLatch(1);

        @SuppressWarnings("unchecked")
        //觀察者,訂閱的數(shù)據(jù)為LoadBalancerCommand. selectServer返回的薇溃,調(diào)用subscribe開啟訂閱菌赖,被觀察者開始執(zhí)行call回調(diào)
        Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(final Throwable e) {
                returnException.set(e);
                latch.countDown();
            }

            @Override
            public void onNext(final T item) {
                returnItem.set(item);
            }
        });
        //阻塞等待,直到latch計數(shù)器的值為0
        BlockingUtils.awaitForComplete(latch, subscription);

        if (returnException.get() != null) {
            Exceptions.propagate(returnException.get());
        }
        //返回最終數(shù)據(jù)
        return returnItem.get();
    }
}

RetryableFeignLoadBalancer

public class RetryableFeignLoadBalancer extends FeignLoadBalancer implements ServiceInstanceChooser {
    @Override
    public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        
        ......

        return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
            @Override
            public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
                Request feignRequest = null;
                if (retryContext instanceof LoadBalancedRetryContext) {
                    ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
                    if (service != null) {
                        feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
                    }
                }
                if (feignRequest == null) {
                    feignRequest = request.toRequest();
                }
                Response response = request.client().execute(feignRequest, options);
                if (retryPolicy.retryableStatusCode(response.status())) {
                    byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
                    response.close();
                    throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
                            byteArray, request.getUri());
                }
                //返回最終的結(jié)果
                return new RibbonResponse(request.getUri(), response);
            }
        }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
            @Override
            protected RibbonResponse createResponse(Response response, URI uri) {
                return new RibbonResponse(uri, response);
            }
        });
    }

}

通過return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));請求就得到了最終的請求結(jié)果


經(jīng)過四節(jié)分析,總結(jié)了RestTemplate與Feign在使用Ribbon時候如何進(jìn)行負(fù)載均衡相關(guān)的知識痊焊,后續(xù)會繼續(xù)分析和總結(jié)Hystrix盏袄、Feign、Zuul等相關(guān)組件薄啥!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辕羽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子垄惧,更是在濱河造成了極大的恐慌刁愿,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件到逊,死亡現(xiàn)場離奇詭異铣口,居然都是意外死亡滤钱,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門脑题,熙熙樓的掌柜王于貴愁眉苦臉地迎上來件缸,“玉大人,你說我怎么就攤上這事叔遂∷叮” “怎么了?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵已艰,是天一觀的道長痊末。 經(jīng)常有香客問我,道長哩掺,這世上最難降的妖魔是什么凿叠? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮嚼吞,結(jié)果婚禮上盒件,老公的妹妹穿的比我還像新娘。我一直安慰自己舱禽,他們只是感情好履恩,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著呢蔫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪飒筑。 梳的紋絲不亂的頭發(fā)上片吊,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天,我揣著相機(jī)與錄音协屡,去河邊找鬼俏脊。 笑死,一個胖子當(dāng)著我的面吹牛肤晓,可吹牛的內(nèi)容都是我干的爷贫。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼补憾,長吁一口氣:“原來是場噩夢啊……” “哼漫萄!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起盈匾,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤腾务,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后削饵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體岩瘦,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡未巫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了启昧。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叙凡。...
    茶點(diǎn)故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖密末,靈堂內(nèi)的尸體忽然破棺而出握爷,到底是詐尸還是另有隱情,我是刑警寧澤苏遥,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布饼拍,位于F島的核電站,受9級特大地震影響田炭,放射性物質(zhì)發(fā)生泄漏师抄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一教硫、第九天 我趴在偏房一處隱蔽的房頂上張望叨吮。 院中可真熱鬧,春花似錦瞬矩、人聲如沸茶鉴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涵叮。三九已至,卻和暖如春伞插,著一層夾襖步出監(jiān)牢的瞬間割粮,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工媚污, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舀瓢,地道東北人。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓耗美,卻偏偏與公主長得像京髓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子商架,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容