Ribbon原理解析

一. 核心接口

  1. ILoadBalancer
    Ribbon通過ILoadBalancer接口對外提供統(tǒng)一的選擇服務(wù)器(Server)的功能,此接口會根據(jù)不同的負(fù)載均衡策略(IRule)選擇合適的Server返回給使用者。其核心方法如下:

    public interface ILoadBalancer {
    
        public void addServers(List<Server> newServers);
    
        public Server chooseServer(Object key);
        
        public void markServerDown(Server server);
        
        public List<Server> getReachableServers();
    
        public List<Server> getAllServers();
    }
    

    此接口默認(rèn)實(shí)現(xiàn)類為ZoneAwareLoadBalancer,相關(guān)類關(guān)系圖如下:


    image
  2. IRule
    IRule是負(fù)載均衡策略的抽象,ILoadBalancer通過調(diào)用IRule的choose()方法返回Server聘裁,其核心方法如下:

    public interface IRule{
        
        public Server choose(Object key);
        
        public void setLoadBalancer(ILoadBalancer lb);
        
        public ILoadBalancer getLoadBalancer();    
    }
    

    實(shí)現(xiàn)類有:

    • BestAviableRule
      跳過熔斷的Server,在剩下的Server中選擇并發(fā)請求最低的Server
    • ClientConfigEnabledRoundRobinRule、RoundRobinRule
      輪詢
    • RandomRule
      隨機(jī)選擇
    • RetryRule
      可重試的策略乱顾,可以對其他策略進(jìn)行重試,默認(rèn)輪詢重試
    • WeightedResponseTimeRule
      根據(jù)響應(yīng)時間加權(quán)宫静,響應(yīng)時間越短權(quán)重越大
    • AvailabilityFilteringRule
      剔除因為連續(xù)鏈接走净、讀失敗或鏈接超過最大限制導(dǎo)致熔斷的Server券时,在剩下讀Server中進(jìn)行輪詢。

    相關(guān)類圖如下:


    image
  3. IPing
    IPing用來檢測Server是否可用伏伯,ILoadBalancer的實(shí)現(xiàn)類維護(hù)一個Timer每隔10s檢測一次Server的可用狀態(tài)橘洞,其核心方法有:

    public interface IPing {
    
        public boolean isAlive(Server server);
    }
    

    其實(shí)現(xiàn)類有:


    image
  4. IClientConfig
    IClientConfig主要定義了用于初始化各種客戶端和負(fù)載均衡器的配置信息,器實(shí)現(xiàn)類為DefaultClientConfigImpl说搅。

二. 負(fù)載均衡的邏輯實(shí)現(xiàn)

1. Server的選擇

ILoadBalancer接口的主要實(shí)現(xiàn)類為BaseLoadBalancer和ZoneAwareLoadBalancer炸枣,ZoneAwareLoadBalancer為BaseLoadBalancer的子類并且其也重寫了chooseServer方法,ZoneAwareLoadBalancer從其名稱可以看出這個實(shí)現(xiàn)類是和Spring Cloud的分區(qū)有關(guān)的弄唧,當(dāng)分區(qū)的數(shù)量為1(默認(rèn)配置)時它直接調(diào)用父類BaseLoadBalancer的chooseServer()方法适肠,源碼如下:

@Override
public Server chooseServer(Object key) {
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        // 調(diào)用父類BaseLoadBalancer的chooseServer()方法
        return super.chooseServer(key);
    }
    
    // 略
}

類BaseLoadBalancer的chooseServer()方法直接調(diào)用IRule接口的choose()方法,源碼如下:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

這里IRule的實(shí)現(xiàn)類為ZoneAvoidanceRule候引,choose()方法的實(shí)現(xiàn)在其父類PredicateBasedRule中侯养,如下:

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

從上面源碼可以看出,其先調(diào)用ILoadBalancer的getAllServers()方法獲取所有Server列表背伴,getAllServers()方法的實(shí)現(xiàn)在BaseLoadBalancer類中沸毁,此類維護(hù)了一個List<Server>類型的屬性allServerList,所有Server都緩存至此集合中傻寂。獲取Server列表后調(diào)用chooseRoundRobinAfterFiltering()方法返回Server對象息尺。chooseRoundRobinAfterFiltering()方法會根據(jù)loadBalancerKey篩選出候選的Server,然后通過輪詢的負(fù)載均衡策略選出Server疾掰,相關(guān)源碼如下:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

可以看到其輪詢選擇Server的策略為獲取次數(shù)加1然后對Server數(shù)量取余得到搂誉。

2. Server的狀態(tài)檢測

BaseLoadBalancer類的集合allServerList緩存了所有Server信息,但是這些Server的狀態(tài)有可能發(fā)生變化静檬,比如Server不可用了炭懊,Ribbon就需要及時感知到,那么Ribbon是如何感知Server可用不可用的呢拂檩?
BaseLoadBalancer的構(gòu)造函數(shù)中初始化了一個任務(wù)調(diào)度器Timer侮腹,這個調(diào)度器每隔10s執(zhí)行一次PingTask任務(wù),相關(guān)源碼如下:

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
    
    this.name = name;
    this.ping = ping;
    this.pingStrategy = pingStrategy;
    setRule(rule);
    setupPingTask();
    lbStats = stats;
    init();
}
    
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

深入Pinger和SerialPingStrategy的源碼可知稻励,最終通過NIWSDiscoveryPing這一IPing實(shí)現(xiàn)類判斷Server是否可用父阻,NIWSDiscoveryPing的isAlive()方法通過判斷與Server關(guān)聯(lián)的InstanceInfo的status是否為UP來判斷Server是否可用,其isAlive()方法源碼如下:

public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){                    
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                // 其狀態(tài)是否為UP
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}

三望抽、Ribbon的使用姿勢

1. RestTemplate + @LoadBalanced

  • 使用
    提供一個標(biāo)記@LoadBalanced的RestTemplate Bean加矛,然后直接使用此Bean發(fā)起請求即可,如下:

    @Configuration
    public class Config {
    
        @Bean
        @LoadBalanced
        RestTemplate restTemplate() {
            // 提供一個標(biāo)記@LoadBalanced的RestTemplat Bean
            return new RestTemplate();
        }
    }
    
    @RestController
    public class HelloController {
    
        @Resource
        private RestTemplate restTemplate;
        
        @GetMapping("/hi")
        public String hi() {
            // 直接使用即可
            return restTemplate.getForEntity("http://Eureka-Producer/hello", String.class).getBody();
        }
    }
    
  • 實(shí)現(xiàn)原理
    當(dāng)實(shí)例化LoadBalancerAutoConfiguration時煤篙,給所有標(biāo)記了@LoadBalanced的RestTemplate Bean設(shè)置了攔截器LoadBalancerInterceptor斟览,此實(shí)例保存在了RestTemplate的父類InterceptingHttpAccessor的集合List<ClientHttpRequestInterceptor> interceptors中。RestTemplate相關(guān)類圖如下:

    image

    設(shè)置攔截器LoadBalancerInterceptor源碼如下:

    @Configuration
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
    
        // 1. 收集到所有標(biāo)記了@LoadBalanced的RestTemplate
        @LoadBalanced
        @Autowired(required = false)
        private List<RestTemplate> restTemplates = Collections.emptyList();
    
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
                final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
            return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        // 3. 對于每一個RestTemplate執(zhí)行customize()方法
                        customizer.customize(restTemplate);
                    }
                }
            });
        }
    
        @Bean
        @ConditionalOnMissingBean
        public LoadBalancerRequestFactory loadBalancerRequestFactory(
                LoadBalancerClient loadBalancerClient) {
            return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
        }
    
        @Configuration
        @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
        static class LoadBalancerInterceptorConfig {
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                // 2. 注入LoadBalancerInterceptor
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return restTemplate -> {
                    // 4. customize()方法給RestTemplate設(shè)置LoadBalancerInterceptor
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
            }
        }
        // 略
    }
    

    從上面源碼可以看出LoadBalancerInterceptor的構(gòu)造函數(shù)接受兩個參數(shù):LoadBalancerClient和LoadBalancerRequestFactory辑奈,LoadBalancerRequestFactory的實(shí)例在此Configuration中被注入類苛茂,而LoadBalancerClient的實(shí)例卻沒有已烤。那么LoadBalancerClient的實(shí)例是在哪里實(shí)例化的呢?答案是RibbonAutoConfiguration味悄,這個Configuration注入了LoadBalancerClient的實(shí)現(xiàn)類RibbonLoadBalancerClient的實(shí)例和SpringClientFactory的實(shí)例草戈,相關(guān)源碼如下:

    @Bean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory factory = new SpringClientFactory();
        factory.setConfigurations(this.configurations);
        return factory;
    }
    
    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }
    

    至此攔截器LoadBalancerInterceptor創(chuàng)建完成并且保存在了RestTemplate的集合屬性中,那么RestTemplate是如何利用此攔截器的呢侍瑟?當(dāng)我們使用RestTemplate發(fā)起請求時最終會調(diào)用到RestTemplate的doExecute()方法唐片,此方法會創(chuàng)建ClientHttpRequest對象并調(diào)用其execute()方法發(fā)起請求,源碼如下:

    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
    
        ClientHttpResponse response = null;
        try {
            // 1. 創(chuàng)建ClientHttpRequest涨颜。
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            // 2. 執(zhí)行其execute()方法獲取結(jié)果费韭。
            response = request.execute();
            handleResponse(url, method, response);
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }
        catch (IOException ex) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
            throw new ResourceAccessException("I/O error on " + method.name() +
                    " request for \"" + resource + "\": " + ex.getMessage(), ex);
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }
    
    protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        ClientHttpRequest request = getRequestFactory().createRequest(url, method);
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP " + method.name() + " " + url);
        }
        return request;
    }
    
    @Override
    public ClientHttpRequestFactory getRequestFactory() {
        List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
        if (!CollectionUtils.isEmpty(interceptors)) {
            ClientHttpRequestFactory factory = this.interceptingRequestFactory;
            if (factory == null) {
                factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
                this.interceptingRequestFactory = factory;
            }
            return factory;
        }
        else {
            return super.getRequestFactory();
        }
    }
    

    從上面的getRequestFactory()方法可以看到當(dāng)集合interceptors不為空的時候ClientHttpRequest對象是由類InterceptingClientHttpRequestFactory的createRequest()方法創(chuàng)建出來的,并且集合interceptors作為參數(shù)傳遞到了InterceptingClientHttpRequestFactory中庭瑰,深入InterceptingClientHttpRequestFactory的createRequest()方法星持,如下:

    public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
    
        private final List<ClientHttpRequestInterceptor> interceptors;
    
        public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
                @Nullable List<ClientHttpRequestInterceptor> interceptors) {
    
            super(requestFactory);
            this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
        }
    
        @Override
        protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
            // 直接返回InterceptingClientHttpRequest對象。
            return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
        }
    
    }
    

    可以看到攔截器最終傳遞到了InterceptingClientHttpRequest中弹灭,上面說了RestTemplate的doExecute()方法創(chuàng)建了InterceptingClientHttpRequest對象且調(diào)用了其execute()方法獲取響應(yīng)結(jié)果督暂,深入其execute()方法發(fā)現(xiàn)在execute()中直接調(diào)用了攔截器的intercept()方法,也即InterceptingClientHttpRequest的intercept()方法穷吮,源碼如下:

    public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
        if (this.iterator.hasNext()) {
            ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
            // 這里調(diào)用InterceptingClientHttpRequest的intercept()方法
            return nextInterceptor.intercept(request, body, this);
        }
        // 略
    }
    

    也就是說RestTemplate的請求最終是委托給InterceptingClientHttpRequest來處理逻翁。那么InterceptingClientHttpRequest是如何利用Ribbon相關(guān)接口處理請求的呢?且看InterceptingClientHttpRequest的intercept()方法:

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            // 直接調(diào)用LoadBalancerClient的execute()方法捡鱼。
            return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
        }
    }
    

    可以看到InterceptingClientHttpRequest的intercept()方法直接調(diào)用LoadBalancerClient的execute()方法八回,LoadBalancerClient是一個接口,這里其實(shí)現(xiàn)類為RibbonLoadBalancerClient驾诈,上面創(chuàng)建InterceptingClientHttpRequest時提到LoadBalancerAutoConfiguration注入了RibbonLoadBalancerClient Bean缠诅,此Bean通過構(gòu)造函數(shù)保存在了LoadBalancerClient中。那么接下來就是LoadBalancerClient的execute()方法了乍迄,類是LoadBalancerClient非常有意思管引,先看下其類圖:

    image

    LoadBalancerClient的execute()方法首先會通過調(diào)用SpringClientFactory的getLoadBalancer()方法獲取ILoadBalancer,那么此方法是如何返回ILoadBalancer呢闯两?很簡單褥伴,就是從Spring上下文中獲取,那么Spring上下文中的ILoadBalancer時何時注入的呢生蚁?答案是RibbonClientConfiguration噩翠,此Configuration向Spring上下文注入了以下Bean:

    • ILoadBalancer的實(shí)現(xiàn)類ZoneAwareLoadBalancer戏自。
    • IRule的實(shí)現(xiàn)類ZoneAvoidanceRule邦投。
    • IClientConfig的實(shí)現(xiàn)類DefaultClientConfigImpl。

    另外EurekaRibbonClientConfiguration還注入了:

    • ServerList的實(shí)現(xiàn)類DomainExtractingServerList和DiscoveryEnabledNIWSServerList擅笔。
    • IPing的實(shí)現(xiàn)類NIWSDiscoveryPing志衣。

    源碼如下:

    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
        return config;
    }
    
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
    
    @Bean
    @ConditionalOnMissingBean
    public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, name)) {
            return this.propertiesFactory.get(ServerList.class, config, name);
        }
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(config);
        return serverList;
    }
    
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
            return this.propertiesFactory.get(IPing.class, config, serviceId);
        }
        NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
        ping.initWithNiwsConfig(config);
        return ping;
    }
    
    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }
    

    ZoneAwareLoadBalancer的構(gòu)造函數(shù)通過調(diào)用DiscoveryEnabledNIWSServerList的getUpdatedListOfServers()方法獲取Server集合屯援,DiscoveryEnabledNIWSServerList維護(hù)了一個Provider<EurekaClient>類型的屬性eurekaClientProvider,eurekaClientProvider緩存了EurekaClient的實(shí)現(xiàn)類CloudEurekaClient的實(shí)例念脯,getUpdatedListOfServers()方法通過調(diào)用CloudEurekaClient的getInstancesByVipAddress()方法從Eureka Client緩存中獲取應(yīng)用對應(yīng)的所有InstanceInfo列表狞洋。源碼如下:

    // 緩存了EurekaClient的實(shí)現(xiàn)類CloudEurekaClient的實(shí)例
    private final Provider<EurekaClient> eurekaClientProvider;
    
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }
    
    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }
    
        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {
    
                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }
    
                            InstanceInfo copy = new InstanceInfo(ii);
    
                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }
    
                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }
    

    LoadBalancerClient的execute()方法在通過調(diào)用SpringClientFactory的getLoadBalancer()方法獲取ILoadBalancer后調(diào)用其chooseServer()返回一個Server對象,如下:

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        // 1. 獲取ILoadBalancer
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        
        // 2. 通過ILoadBalancer選擇一個Server
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));
    
        // 3. 對Server發(fā)起請求
        return execute(serviceId, ribbonServer, request);
    }
    
    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }
    

    ZoneAwareLoadBalancer的chooseServer()方法會通過調(diào)用ZoneAvoidanceRule的choose()方法返回一個Server绿店,ZoneAvoidanceRule繼承類ClientConfigEnabledRoundRobinRule吉懊,所以其會根據(jù)ZoneAwareLoadBalancer獲取的Server列表采用輪詢的負(fù)載均衡策略選擇一個Server返回;最后根據(jù)此Server的地址等向其發(fā)起請求假勿。
    相關(guān)類圖如下:

    image

2. Feign接口

相對于RestTemplate+@Loadbalance的方式借嗽,我們在使用Spring Cloud的時候使用更多的是Feign接口,因為Feign接口使用起來會更加簡單转培,下面就是一個使用Feign接口調(diào)用服務(wù)的例子:

// 定義Feign接口
@FeignClient(value = "Eureka-Producer", fallbackFactory = HelloClientFallbackFactory.class)
public interface HelloClient {

    @GetMapping("/hello")
    String hello();
}

// 訂單熔斷快速失敗回調(diào)
@Component
public class HelloClientFallbackFactory implements FallbackFactory<HelloClient>, HelloClient {

    @Override
    public HelloClient create(Throwable throwable) {
        return this;
    }

    @Override
    public String hello() {
        return "熔斷";
    }
}

// 使用
@RestController
public class HelloController {

    @Resource
    private HelloClient helloClient;

    @GetMapping("/hello")
    public String hello() {
        return helloClient.hello();
    }
}

與RestTemplate的通過RibbonLoadBalancerClient獲取Server并執(zhí)行請求類似恶导,F(xiàn)eign接口通過LoadBalancerFeignClient獲取Server并執(zhí)行請求。DefaultFeignLoadBalancedConfiguration會注入LoadBalancerFeignClient Bean浸须,源碼如下:

@Configuration
class DefaultFeignLoadBalancedConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
                              SpringClientFactory clientFactory) {
        return new LoadBalancerFeignClient(new Client.Default(null, null),
                cachingFactory, clientFactory);
    }
}

那么Feign接口是如何通過LoadBalancerFeignClient實(shí)現(xiàn)負(fù)載均衡調(diào)用的呢惨寿?在《Feign源碼解析》一文中介紹到Feign接口的代理實(shí)現(xiàn)類由FeignClientFactoryBean負(fù)責(zé)生成,F(xiàn)eignClientFactoryBean實(shí)現(xiàn)了FactoryBean删窒,所以其getObject()方法會返回Feign接口的代理實(shí)現(xiàn)裂垦,getObject()方法會從Spring上下文中獲取到LoadBalancerFeignClient,源碼如下:

@Override
public Object getObject() throws Exception {
    return getTarget();
}

<T> T getTarget() {
    FeignContext context = applicationContext.getBean(FeignContext.class);
    Feign.Builder builder = feign(context);

    if (!StringUtils.hasText(this.url)) {
        if (!this.name.startsWith("http")) {
            url = "http://" + this.name;
        }
        else {
            url = this.name;
        }
        url += cleanPath();
        return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
                this.name, url));
    }
    if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
        this.url = "http://" + this.url;
    }
    String url = this.url + cleanPath();
    // 從Spring上下文中獲取LoadBalancerFeignClient
    Client client = getOptional(context, Client.class);
    if (client != null) {
        if (client instanceof LoadBalancerFeignClient) {
            // not load balancing because we have a url,
            // but ribbon is on the classpath, so unwrap
            client = ((LoadBalancerFeignClient)client).getDelegate();
        }
        builder.client(client);
    }
    Targeter targeter = get(context, Targeter.class);
    return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
            this.type, this.name, url));
}

LoadBalancerFeignClient對外提供服務(wù)的接口是execute()方法易稠,那么此方法是何時被Feign接口調(diào)用的呢缸废?從《Feign源碼解析》一文中可知SynchronousMethodHandler作為MethodHandler的實(shí)現(xiàn)在調(diào)用Feign接口時進(jìn)行攔截并執(zhí)行其invoke()方法,invoke()方法則調(diào)用了LoadBalancerFeignClient的execute()方法發(fā)起網(wǎng)絡(luò)請求驶社,相關(guān)源碼如下:

@Override
public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
        // 略
        continue;
      }
    }
}

Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);
    
    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }
    
    Response response;
    long start = System.nanoTime();
    try {
      // 調(diào)用LoadBalancerFeignClient的execute()方法獲取響應(yīng)企量。
      response = client.execute(request, options);
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
      throw errorExecuting(request, e);
    }
    
    // 略
}

那么LoadBalancerFeignClient的execute()方法又是如何利用Ribbon做負(fù)載均衡的呢?其通過調(diào)用CachingSpringLoadBalancerFactory的create()方法獲取FeignLoadBalancer對象亡电,F(xiàn)eignLoadBalancer對象持有一個ILoadBalancer的對象實(shí)例届巩,此ILoadBalancer對象實(shí)例是CachingSpringLoadBalancerFactory通過調(diào)用SpringClientFactory的getLoadBalancer()方法從Spring上下文中獲取的,源碼如下:

public FeignLoadBalancer create(String clientName) {
    FeignLoadBalancer client = this.cache.get(clientName);
    if(client != null) {
        return client;
    }
    IClientConfig config = this.factory.getClientConfig(clientName);
    ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
    ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
    client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
        loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
    this.cache.put(clientName, client);
    return client;
}

創(chuàng)建完FeignLoadBalancer后緊接著接著調(diào)用了FeignLoadBalancer的executeWithLoadBalancer()方法份乒,如下:

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    URI asUri = URI.create(request.url());
    String clientName = asUri.getHost();
    URI uriWithoutHost = cleanUrl(request.url(), clientName);
    FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
            this.delegate, request, uriWithoutHost);

    IClientConfig requestConfig = getClientConfig(options, clientName);
    // 執(zhí)行FeignLoadBalancer的executeWithLoadBalancer()方法恕汇。
    return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
            requestConfig).toResponse();
    // 略
}
// 創(chuàng)建FeignLoadBalancer對象并返回
private FeignLoadBalancer lbClient(String clientName) {
    return this.lbClientFactory.create(clientName);
}

executeWithLoadBalancer()方法的具體實(shí)現(xiàn)在類FeignLoadBalancer的父類AbstractLoadBalancerAwareClient中,如下:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        // 略
    }
}

executeWithLoadBalancer()方法創(chuàng)建了LoadBalancerCommand對象并且向提交(submit()方法)了一個ServerOperation對象或辖,跟蹤LoadBalancerCommand的submit()方法發(fā)現(xiàn)其調(diào)用了selectServer()方法獲取Server瘾英,而selectServer()方法則委托給了FeignLoadBalancer的父類LoadBalancerContext的getServerFromLoadBalancer()方法獲取Server,如下:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
    
}

public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
    // 略
    
    // 這里當(dāng)server為null時調(diào)用selectServer()獲取Server颂暇。
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    
                    // Called for each attempt and retry
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    // 略
                }
            });
        // 略
}

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                // 調(diào)用LoadBalancerContext的getServerFromLoadBalancer()獲取Server
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

FeignLoadBalancer和LoadBalancerCommand互相依賴缺谴、彼此調(diào)用,最終FeignLoadBalancer的父類LoadBalancerContext的getServerFromLoadBalancer()方法返回了Server耳鸯,此方法通過調(diào)用其持有的ILoadBalancer對象的chooseServer()方法獲取Server湿蛔,源碼如下:

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
    int port = -1;
    if (original != null) {
        host = original.getHost();
    }
    if (original != null) {
        Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
        port = schemeAndPort.second();
    }
    // 獲取ILoadBalancer
    ILoadBalancer lb = getLoadBalancer();
    // 調(diào)用ILoadBalancer的chooseServer()方法獲取Server膀曾。
    Server svc = lb.chooseServer(loadBalancerKey);
    if (svc == null){
        throw new ClientException(ClientException.ErrorType.GENERAL,
                "Load balancer does not have available server for client: "
                        + clientName);
    }
    host = svc.getHost();
    if (host == null){
        throw new ClientException(ClientException.ErrorType.GENERAL,
                "Invalid Server for :" + svc);
    }
    logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
    return svc;
}

至此終于看到了通過ILoadBalancer獲取Server的代碼了,相關(guān)類圖如下:

image

四阳啥、總結(jié)

Ribbon通過ILoadBalancer接口提供負(fù)載均衡服務(wù)添谊,其實(shí)現(xiàn)原理為:

  • ILoadBalancer依賴ServerList通過DiscoveryClient從Eureka Client處獲取Server列表并緩存這些Server列表。
  • IPing接口定時對ILoadBalancer緩存的Server列表進(jìn)行檢測察迟,判斷其是否可用斩狱。
  • IRule接口是負(fù)載均衡策略的抽象,ILoadBalancer通過IRule選出一個Server扎瓶。

當(dāng)使用RestTemplate+@LoadBalanced的方式進(jìn)行服務(wù)調(diào)用時喊废,LoadBalancerInterceptor和RibbonLoadBalancerClient作為橋梁結(jié)合Ribbon提供負(fù)載均衡服務(wù)。

當(dāng)使用Feign接口調(diào)用服務(wù)時栗弟,LoadBalancerFeignClient和FeignLoadBalancer作為調(diào)用Ribbon的入口為Feign接口提供負(fù)載均衡服務(wù)污筷。

不管使用那種姿勢,最終都會通過Ribbon的ILoadBalancer接口實(shí)現(xiàn)負(fù)載均衡乍赫。

最后放兩個彩蛋

  1. Ribbon相關(guān)Configuration以及注入的Bean:

    • RibbonAutoConfiguration

      • 注入了 LoadBalancerClient的實(shí)現(xiàn)類RibbonLoadBalancerClient瓣蛀。
      • 注入了SpringClientFactory。
    • LoadBalancerAutoConfiguration

      • 注入了LoadBalancerInterceptor雷厂。
      • 給RestTemplate設(shè)置LoadBalancerInterceptor惋增。
    • RibbonClientConfiguration

      • 注入了ILoadBalancer的實(shí)現(xiàn)類ZoneAwareLoadBalancer。
      • 注入了IRule的實(shí)現(xiàn)類ZoneAvoidanceRule改鲫。
      • 注入了IClientConfig的實(shí)現(xiàn)類DefaultClientConfigImpl诈皿。
    • EurekaRibbonClientConfiguration

      • 注入了IPing的實(shí)現(xiàn)類NIWSDiscoveryPing。
      • 注入了ServerList的實(shí)現(xiàn)類DiscoveryEnabledNIWSServerList像棘。
  2. Ribbon相關(guān)類關(guān)系圖:

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末稽亏,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子缕题,更是在濱河造成了極大的恐慌截歉,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烟零,死亡現(xiàn)場離奇詭異瘪松,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)锨阿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進(jìn)店門宵睦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人墅诡,你說我怎么就攤上這事壳嚎。” “怎么了?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵诬辈,是天一觀的道長。 經(jīng)常有香客問我荐吉,道長焙糟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任样屠,我火速辦了婚禮穿撮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘痪欲。我一直安慰自己悦穿,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布业踢。 她就那樣靜靜地躺著栗柒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪知举。 梳的紋絲不亂的頭發(fā)上瞬沦,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天,我揣著相機(jī)與錄音雇锡,去河邊找鬼逛钻。 笑死,一個胖子當(dāng)著我的面吹牛锰提,可吹牛的內(nèi)容都是我干的曙痘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼立肘,長吁一口氣:“原來是場噩夢啊……” “哼边坤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起谅年,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤惩嘉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后踢故,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體文黎,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年殿较,在試婚紗的時候發(fā)現(xiàn)自己被綠了耸峭。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡淋纲,死狀恐怖劳闹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤本涕,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布业汰,位于F島的核電站,受9級特大地震影響菩颖,放射性物質(zhì)發(fā)生泄漏样漆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一晦闰、第九天 我趴在偏房一處隱蔽的房頂上張望放祟。 院中可真熱鬧,春花似錦呻右、人聲如沸跪妥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽眉撵。三九已至,卻和暖如春落塑,著一層夾襖步出監(jiān)牢的瞬間执桌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工芜赌, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留枣抱,地道東北人律姨。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親挑胸。 傳聞我的和親對象是個殘疾皇子妓雾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容