Spring Cloud Ribbon 源碼分析

Spring Cloud Ribbon 源碼分析

前言

原理介紹

ribbon提供了http請(qǐng)求負(fù)載均衡的能力葛账,既然要擴(kuò)展調(diào)度能力梧却,就需要在請(qǐng)求之前,通過(guò)某種調(diào)度策略選擇合適的server來(lái)調(diào)用集歇。
想要實(shí)現(xiàn)這個(gè)效果需要以下幾步:

  1. 對(duì)http的請(qǐng)求已經(jīng)攔截绍刮,對(duì)其進(jìn)行擴(kuò)展
  2. 通過(guò)調(diào)度策略選擇合適的server
  3. 改寫(xiě)http的ip和port來(lái)定向調(diào)用

源碼分析

1. http請(qǐng)求攔截

首先介紹ribbon是如何進(jìn)行給請(qǐng)求增加攔截器温圆,需要用到LoadBalancerAutoConfiguration

LoadBalancerAutoConfiguration
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }

        @Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

LoadBalancerAutoConfiguration的restTemplates通過(guò)@LoadBalanced注解,將所有帶有此注解的RestTemplate的對(duì)象都注入到list中孩革。@LoadBalanced內(nèi)部其實(shí)是繼承@Qualifier實(shí)現(xiàn)岁歉。

通過(guò)RestTemplateCustomizer來(lái)對(duì)每個(gè)restTemplate實(shí)現(xiàn)擴(kuò)展,將LoadBalancerInterceptor也加入到攔截器列表中

在整個(gè)restTemplate的調(diào)用過(guò)程中膝蜈,會(huì)從getForObject -> execute -> doExecute -> request.execute 中來(lái)對(duì)攔截器進(jìn)行處理锅移。

    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        // ...
        ClientHttpResponse response = null;
        try {
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }
        catch (IOException ex) {
            // ...
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }
    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
  
        private final Iterator<ClientHttpRequestInterceptor> iterator;

        public InterceptingRequestExecution() {
            this.iterator = interceptors.iterator();
        }

        @Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
    }

一直調(diào)用到InterceptingRequestExecution的execute方法中熔掺,這里先判斷當(dāng)前是否有攔截器,如果存在則調(diào)用intercept非剃,我們這里通過(guò)LoadBalancerAutoConfiguration注入了LoadBalancerInterceptor置逻,具體看下LoadBalancerInterceptor的實(shí)現(xiàn)。

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
}

這里的loadBalancer是在創(chuàng)建Interceptor時(shí)备绽,注入的RibbonLoadBalancerClient券坞,自動(dòng)注入的過(guò)程在RibbonAutoConfiguration中

RibbonAutoConfiguration
public class RibbonAutoConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory factory = new SpringClientFactory();
        factory.setConfigurations(this.configurations);
        return factory;
    }

    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }
}

至此完成了前期的請(qǐng)求調(diào)用攔截的部分,開(kāi)始進(jìn)入ribbon調(diào)度策略的選擇環(huán)節(jié)疯坤。

2. 調(diào)度策略

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

首先實(shí)例化ILoadBalancer的bean报慕,默認(rèn)會(huì)注入ZoneAwareLoadBalancer的bean,ZoneAwareLoadBalancer的繼承關(guān)系如下圖压怠。


ILoadBalancer的注入實(shí)現(xiàn)在RibbonClientConfiguration中。

RibbonClientConfiguration
    // client的請(qǐng)求的默認(rèn)配置
    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
        return config;
    }
    // 調(diào)度規(guī)則
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
    // 維持心跳ping
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }
    // 服務(wù)端提供的server列表
    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, name)) {
            return this.propertiesFactory.get(ServerList.class, config, name);
        }
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(config);
        return serverList;
    }
    // server列表動(dòng)態(tài)更新
    @Bean
    @ConditionalOnMissingBean
    public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
    }
    // loadbalance的客戶端對(duì)象
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    // server列表選擇過(guò)濾器
    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
            return this.propertiesFactory.get(ServerListFilter.class, config, name);
        }
        ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
        filter.initWithNiwsConfig(config);
        return filter;
    }
    //  loadbalancer的上下文配置
    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
            IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }
    // 重試處理器
    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        return new DefaultLoadBalancerRetryHandler(config);
    }
    // server 檢查器(內(nèi)史晌)菌瘫,檢查是否安全端口,獲取基本信息等布卡。
    @Bean
    @ConditionalOnMissingBean
    public ServerIntrospector serverIntrospector() {
        return new DefaultServerIntrospector();
    }

ZoneAwareLoadBalancer的實(shí)例化還依賴的其他的bean雨让,如ZoneAvoidanceRuleDummyPing忿等、ConfigurationBasedServerList栖忠、DefaultLoadBalancerRetryHandler

ZoneAwareLoadBalancer

ZoneAwareLoadBalancer 在構(gòu)造方法中,會(huì)完成一系列的初始化動(dòng)作贸街,如serverList的設(shè)置庵寞、開(kāi)啟server刷新任務(wù),初始化loadBalanceStat薛匪,配置負(fù)載均衡策略捐川、ping策略、filter等操作逸尖。

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }
restOfInit()
void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

在restOfInit方法中古沥,有兩個(gè)關(guān)鍵方法:enableAndInitLearnNewServersFeatureupdateListOfServers

enableAndInitLearnNewServersFeature()
public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }
 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

可以看到這個(gè)方法開(kāi)啟了一個(gè)schedule,這個(gè)schedule的用途是每隔一段時(shí)間刷新serverList娇跟。這里傳入了一個(gè)UpdateAction岩齿,這個(gè)UpdateAction是

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

通過(guò)調(diào)用updateListOfServers來(lái)刷新serverList,而刷新的時(shí)間間隔refreshIntervalMs默認(rèn)是30s苞俘。

updateListOfServers
@VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

這里的servers其實(shí)就是我們配置的listOfServers列表盹沈,在獲取到最新的servers之后,會(huì)調(diào)用updateAllServerList來(lái)更新緩存苗胀。

 protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                serverListUpdateInProgress.set(false);
            }
        }
    }

@Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

首先在super.setSersList中襟诸,將全局的server列表瓦堵,也就是allServerList做更新,
在for循環(huán)中歌亲,調(diào)用getSingleServerStat來(lái)刷新一下內(nèi)部cache菇用,最后設(shè)置zone。

這里需要說(shuō)明一下LoadBalancerStats

LoadBalancerStats

public class LoadBalancerStats implements IClientConfigAware {
    
    private static final String PREFIX = "LBStats_";
    
    String name;
    
    // Map<Server,ServerStats> serverStatsMap = new ConcurrentHashMap<Server,ServerStats>();
    volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
    volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
    
    private volatile CachedDynamicIntProperty connectionFailureThreshold;
        
    private volatile CachedDynamicIntProperty circuitTrippedTimeoutFactor;

    private volatile CachedDynamicIntProperty maxCircuitTrippedTimeout;

    private static final DynamicIntProperty SERVERSTATS_EXPIRE_MINUTES = 
        DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.serverStats.expire.minutes", 30);
    
    private final LoadingCache<Server, ServerStats> serverStatsCache = 
        CacheBuilder.newBuilder()
            .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
            .removalListener(new RemovalListener<Server, ServerStats>() {
                @Override
                public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
                    notification.getValue().close();
                }
            })
            .build(
                new CacheLoader<Server, ServerStats>() {
                    public ServerStats load(Server server) {
                        return createServerStats(server);
                    }
                });

ribbon通過(guò)這個(gè)對(duì)象來(lái)記錄每個(gè)server的運(yùn)行特征和統(tǒng)計(jì)數(shù)據(jù)陷揪,如各種連接時(shí)間惋鸥,響應(yīng)時(shí)間,熔斷配置悍缠,請(qǐng)求次數(shù)卦绣,serverStat,zoneStat等飞蚓。

LoadBalancerStats的這些參數(shù)滤港,會(huì)作為負(fù)載均衡選擇策略的重要參考依據(jù)。

至此趴拧,ZoneAwareLoadBalancer的bean實(shí)例化完成溅漾。

回到調(diào)度選擇的開(kāi)始處,在完成loadBalance的instance之后著榴,開(kāi)始選擇Server添履。

getServer
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }
chooseServer
 @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

選擇分兩部分

  1. 如果沒(méi)有配置zone,即只有一個(gè)zone脑又,直接調(diào)用super.chooseServer暮胧。
  2. 如果有一個(gè)以上的zone, 則根據(jù)zone的算法選擇一個(gè)server。

先看一下不使用zone算法的實(shí)現(xiàn)问麸。

public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

增加調(diào)用次數(shù)往衷,并通過(guò)role的實(shí)現(xiàn)選擇一個(gè)server。
這里的zone是ZoneAvoidanceRule

@Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

內(nèi)部通過(guò)輪詢的機(jī)制口叙,來(lái)選擇server炼绘,除了ZoneAvoidanceRule的實(shí)現(xiàn)之外,還有提供了很多的其他rule策略妄田,如下圖:


比如WeighedResponseTimeRule就是通過(guò)響應(yīng)時(shí)間來(lái)動(dòng)態(tài)選擇server俺亮,具體算法的數(shù)據(jù)就是依賴LoadBalanceStat的數(shù)據(jù)統(tǒng)計(jì)。

3. uri改寫(xiě)和調(diào)用

繼續(xù)回到調(diào)度的最開(kāi)始疟呐,選擇好server之后脚曾,就這些對(duì)象封裝成RibbonServer,就繼續(xù)執(zhí)行execute方法启具。

@Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance,
            LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if (serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer) serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

這個(gè)方法主要就是request.apply的調(diào)用本讥。
request是在前面的調(diào)用中,this.requestFactory.createRequest(request, body, execution) 來(lái)構(gòu)建。

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
            if (this.transformers != null) {
                for (LoadBalancerRequestTransformer transformer : this.transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }

可以看到這里通過(guò)定義了一個(gè)匿名內(nèi)部類拷沸,定義了apply這個(gè)方法的實(shí)現(xiàn)色查。
封裝了一個(gè)ServiceRequestWrapper,并提供了一個(gè)擴(kuò)展口撞芍,可以自定義LoadBalancerRequestTransformer來(lái)改變?cè)械囊恍?shí)現(xiàn)秧了,如可以改變之前的server選擇。
轉(zhuǎn)化完畢后序无,繼續(xù)執(zhí)行execute验毡。

private class InterceptingRequestExecution implements ClientHttpRequestExecution {

        private final Iterator<ClientHttpRequestInterceptor> iterator;

        public InterceptingRequestExecution() {
            this.iterator = interceptors.iterator();
        }

        @Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
    }

這里重新創(chuàng)建了ClientHttpRequest,在getURI中帝嗡,重構(gòu)了uri的地址晶通,將之前uri中的服務(wù)名,改成了選擇的ip和端口哟玷,重新設(shè)置headers后狮辽,發(fā)起真正的request調(diào)用。

至此碗降,一個(gè)完成的請(qǐng)求的ribbon負(fù)載過(guò)程執(zhí)行完成隘竭。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市讼渊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌尊剔,老刑警劉巖爪幻,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異须误,居然都是意外死亡挨稿,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)京痢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)奶甘,“玉大人,你說(shuō)我怎么就攤上這事祭椰〕艏遥” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵方淤,是天一觀的道長(zhǎng)钉赁。 經(jīng)常有香客問(wèn)我,道長(zhǎng)携茂,這世上最難降的妖魔是什么你踩? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上带膜,老公的妹妹穿的比我還像新娘吩谦。我一直安慰自己,他們只是感情好膝藕,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布式廷。 她就那樣靜靜地躺著,像睡著了一般束莫。 火紅的嫁衣襯著肌膚如雪懒棉。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天览绿,我揣著相機(jī)與錄音策严,去河邊找鬼。 笑死饿敲,一個(gè)胖子當(dāng)著我的面吹牛妻导,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播怀各,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼倔韭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了瓢对?” 一聲冷哼從身側(cè)響起寿酌,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎硕蛹,沒(méi)想到半個(gè)月后蝶防,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體馏颂,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡观蓄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年谭企,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片埃仪。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡乙濒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出卵蛉,到底是詐尸還是另有隱情颁股,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布毙玻,位于F島的核電站豌蟋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏桑滩。R本人自食惡果不足惜梧疲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一允睹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧幌氮,春花似錦缭受、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至宇智,卻和暖如春蔓搞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背随橘。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工喂分, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人机蔗。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓蒲祈,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親萝嘁。 傳聞我的和親對(duì)象是個(gè)殘疾皇子梆掸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容