一個基于Netflix Ribbon的負載均衡與服務(wù)發(fā)現(xiàn)方案

背景

筆者接觸到的一個項目里邊有很多微服務(wù)莉测,但受限于項目創(chuàng)立之初時間和團隊技術(shù)條件所限高诺,都是最基本的SpringBoot工程挨措,沒有使用Spring Cloud Netflix或alibaba這類“全家桶”來搭建减牺,所有沒有負載均衡是嗜、服務(wù)注冊解取、服務(wù)發(fā)現(xiàn)步责、服務(wù)配置、熔斷等這些微服務(wù)治理層面的東西禀苦。每個微服務(wù)采用多節(jié)點部署蔓肯,使用nginx或阿里云slb來做負載均衡,大體上是"微服務(wù)A -> nginx/slb -> 微服務(wù)B"這樣一種方式振乏,每當重要線上活動來臨蔗包、服務(wù)的擴容需要人工的方式去配置nginx來增加新節(jié)點的路由代理,開發(fā)運維人員一直這樣初步的做著每個服務(wù)的流量在多節(jié)點進行分攤昆码。除了運維方式比較原始之外气忠,另外,當多節(jié)點中的某個節(jié)點出現(xiàn)故障時赋咽,由于缺乏自動故障轉(zhuǎn)移機制仍然會有一部分請求進入到故障節(jié)點旧噪,對應(yīng)的異常錯誤為前端用戶所感知。更為糟糕的是脓匿,高并發(fā)時間由于節(jié)點故障異常導(dǎo)致的上游服務(wù)(這里上游服務(wù)指的是出現(xiàn)節(jié)點故障的服務(wù)的調(diào)用者)超時請求異常淘钟,通常微服務(wù)之間這類http調(diào)用超時設(shè)置為5-30秒之間,這就導(dǎo)致了上游服務(wù)的QPS性能驟減,請求層層積壓,出現(xiàn)了傳說中的級聯(lián)故障撩穿。

針對以上種種钉答,筆者給這個項目或者說這個公司同樣類型架構(gòu)的一眾項目做架構(gòu)升級方案,考慮到實施成本和開發(fā)團隊的接受程度摧玫,原則是盡量減少架構(gòu)升級的改造工作量,盡量減少團隊的學(xué)習(xí)成本,同時又要解決上面所講的所有痛點慧耍,方案不求高大上,但求使用為主丐谋。

本文是當時思路的一些整理芍碧,時間也有些遠,加之筆者不是對Spring Cloud或netflix Ribbon所有的細節(jié)都了解号俐,所以行文邏輯不是很有條理泌豆,或許還有錯誤的地方。如有不當吏饿,請大家指正踪危。

分析

先定義問題蔬浙,確定問題的邊界,然后再解決問題陨倡。

1敛滋、我們需要的是一個能夠有自動負載路由功能的東西,幫助服務(wù)之間rest調(diào)用進行多節(jié)點路由兴革,路由可以采用輪詢規(guī)則绎晃、這是負載均衡,同時要能及時的發(fā)現(xiàn)被調(diào)服務(wù)中的故障節(jié)點杂曲、并及時的下線節(jié)點使得調(diào)用者能夠快速失敗庶艾、當故障恢復(fù)之后及時上線節(jié)點,這屬于熔斷機制擎勘。

2咱揍、路由配置能過改變現(xiàn)狀的人工去主機配置文件,要有一個統(tǒng)一的服務(wù)路由配置中心棚饵。

3煤裙、盡可能少的引入第三方的組件減少學(xué)習(xí)成本,盡可能少的減少改造工作量噪漾,方案力求使用為主硼砰。

筆者經(jīng)過思考,和研究各種負載均衡方案(有服務(wù)端負載均衡欣硼,和客戶端負載均衡)题翰,想到了一個方案:

如果利用Ribbon做客戶端負載均衡,結(jié)合配置中心比如Apollo或者Spring Cloud Config诈胜,一定程度上也可以實現(xiàn)動態(tài)配置新節(jié)點上線豹障,健康檢查識別故障服務(wù)節(jié)點并下線等功能:

Ribbon本身是可以選擇從本地配置文件中的或者從Eureka拉取的服務(wù)列表中選擇節(jié)點進行路由的,默認的負載均衡規(guī)則是輪詢焦匈,如果選擇引入Eureka血公,那么就是Spring Cloud標準的解決方案了:服務(wù)注冊中心Eureka負責收集各個應(yīng)用啟動的時候主動上報上來的自己的服務(wù)信息,比如服務(wù)名缓熟、地址坞笙、端口等,然后調(diào)用的應(yīng)用就可以通過eureka獲得所謂遠端服務(wù)的服務(wù)列表了荚虚,調(diào)用端的Ribbon做客戶端負載均衡從拉取的服務(wù)列表中選擇可用的服務(wù)按照負載均衡策略選一個調(diào)用。

但這樣一來需要團隊進行Eureka的搭建和維護籍茧,同時需要每一個服務(wù)都引入Eureka相關(guān)依賴并進行代碼和配置版述、使之能夠進行服務(wù)注冊,需要動員人力進行各個服務(wù)的代碼進行改造寞冯。而經(jīng)過了解渴析,團隊目前服務(wù)之間調(diào)用使用的是RestTemplate(實現(xiàn)配置的是Apache HttpClient)晚伙,做的面向接口的開發(fā),代碼類似如下:

@Autowired
private RestTemplate restTemplate;

restTemplate.getForObject(serviceFullUrl, clazz, new Object[0]); //GET調(diào)用后端服務(wù)

restTemplate.postForObject(serviceFullUrl, requestBody, clazz, new Object[0]);//POST調(diào)用

托Spring RestTemplate接口與實現(xiàn)分離的福俭茧,那么如果能僅僅改造RestTemplate的實現(xiàn)層咆疗,使之具備客戶端負載均衡、服務(wù)發(fā)現(xiàn)母债、被調(diào)服務(wù)故障下線午磁、被調(diào)故障恢復(fù)上線等功能那便是極好的。而且由于少了調(diào)用服務(wù)與被調(diào)服務(wù)之間的nginx毡们,也減少了運維人員的工作量迅皇。

解決方案思路

現(xiàn)在就是想辦法要讓Ribbon本地的服務(wù)列表動態(tài)化,Ribbon可以讀取本地Spring文件的服務(wù)列表衙熔,而配置文件是可以通過Config配置中心進行統(tǒng)一管理登颓、并且搭配Spring bus機制可以做到配置發(fā)生變化時主動通知各服務(wù)來實時更新本地的配置,這樣一來就實現(xiàn)了本地服務(wù)列表的動態(tài)化红氯。應(yīng)用可以免去各個服務(wù)主動上報給注冊中心這個動作(改為使用配置中心去配置)框咙。然后節(jié)點健康原來是由Eureka告訴Ribbon的,現(xiàn)在要ribbon主動根據(jù)服務(wù)列表去心跳檢查痢甘。一圖勝千言喇嘱,邏輯架構(gòu)如下所示:

進入正題,下面結(jié)合源代碼分析一下Ribbon的原理产阱,同時說明一下需要對Ribbon源碼進行哪些改造婉称、擴展和封裝,使之滿足我們上面的需求构蹬。

使用的主要開源項目的版本:

spring boot 2.1.13.RELEASE

spring-cloud-netflix-ribbon 2.1.5.RELEASE

spring cloud Greenwich.SR6

Ribbon主要代碼分析與改造
Ribbon的自動裝配

spring-cloud-netflix-ribbon-2.1.5.RELEASE.jar包里的spring.factories文件:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration

是spring cloud為了繼承ribbon而開發(fā)的自動裝配類王暗,Ribbon自動裝配的Bean都在這。

同時庄敛,

/**
 *  所有RibbonClient的默認配置
 *  author: linyang
 * */
@Configuration
public class DefaultRibbonConfig {
        private Logger logger = LoggerFactory.getLogger(DefaultRibbonConfig.class);
        //在可用服務(wù)中輪詢選擇
        @Bean
        public IRule ribbonRule() {
            logger.info("全局IRule實現(xiàn)為AvailabilityFilteringRule俗壹,實例化...");
            return new AvailabilityFilteringRule();
        }
        //定時健康檢查服務(wù)列表中的應(yīng)用,置可用狀態(tài)
        @Bean
        public IPing ribbonPing() {
            logger.info("全局IPing實現(xiàn)為HealthCheckPing藻烤,實例化...");
            return new HealthCheckPing();
        }
}

然后

/**
 * 統(tǒng)一配置所有ribbon client
 * author: linyang
 * */
@RibbonClients(defaultConfiguration = DefaultRibbonConfig.class)
public class DefaultRibbonClientsConfig {
    
    public static class BazServiceList extends ConfigurationBasedServerList {

        public BazServiceList(IClientConfig config) {
            super.initWithNiwsConfig(config);
        }

    }
}

上面三個類結(jié)合Spring自動裝配绷雏,會進到org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration這個配置類里邊,然后Spring對里邊的Bean進行自動注入:

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

Ok怖亭,我們看到默認的ILoadBalancer是ZoneAwareLoadBalancer這個實現(xiàn)涎显,它的父類是DynamicServerListLoadBalancer,再上一級父類是BaseLoadBalancer兴猩,同時通過源碼可以看到期吓,這三個類實例化的時候都會先super()自己父類的構(gòu)造方法∏阒ィ可以看到讨勤,我們之前提到的Ribbon的“服務(wù)列表”就緩存在BaseLoadBalancer里:

protected volatile List<Server> allServerList;
protected volatile List<Server> upServerList;
protected ReadWriteLock allServerLock;
protected ReadWriteLock upServerLock;

除了上面的3個類之外箭跳,還有兩個定時任務(wù)也很關(guān)鍵:

  • PollingServerListUpdater 定時更新緩存的server list

  • PingTask 定時執(zhí)行ping來確定server節(jié)點的狀態(tài)

下面分“Ribbon的啟動流程”和“LoadBalancerClient根據(jù)服務(wù)名自動根據(jù)Rule選擇服務(wù)”兩條線來進行分析:

Ribbon的啟動流程

前文提到ZoneAwareLoadBalancer實例化

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater){
    //...
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
}

構(gòu)造是傳入的參數(shù)ServerListUpdater也是在這個類配置注入的:

@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
}

然后new ZoneAwareLoadBalancer會調(diào)用父類DynamicServerListLoadBalancer的構(gòu)造方法:

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.isSecure = false;
    this.useTunnel = false;
    this.serverListUpdateInProgress = new AtomicBoolean(false);
    this.updateAction = new NamelessClass_1();
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());
    }

    this.restOfInit(clientConfig);
    this.setIntValue();
}

其中this.restOfInit(clientConfig)會調(diào)用this.enableAndInitLearnNewServersFeature()

public void enableAndInitLearnNewServersFeature() {
    this.serverListUpdater.start(this.updateAction);
}

所以這里執(zhí)行的就是PollingServerListUpdater的start方法了:

public synchronized void start(final UpdateAction updateAction) {
    if (this.isActive.compareAndSet(false, true)) {
        Runnable wrapperRunnable = new Runnable() {
            public void run() {
                if (!PollingServerListUpdater.this.isActive.get()) {
                    if (PollingServerListUpdater.this.scheduledFuture != null) {
                        PollingServerListUpdater.this.scheduledFuture.cancel(true);
                    }
                } else {
                    try {
                        updateAction.doUpdate();
                        PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
                    } catch (Exception var2) {
                        PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
                    }

                }
            }
        };
        this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
        //logger.info("polling定時任務(wù)啟動,首次延遲" + this.initialDelayMs + "ms執(zhí)行潭千,之后每" + this.refreshIntervalMs + "ms執(zhí)行一次");
    } else {
        logger.info("Already active, no-op");
    }

}

其中g(shù)etRefreshExecutor()會得到一個ScheduledThreadPoolExecutor谱姓,用來定時執(zhí)行wrapperRunnable,Runnable里邊業(yè)務(wù)邏輯主要是執(zhí)行updateAction.doUpdate()

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
public void updateListOfServers() {
 ...
 servers = serverListImpl.getUpdatedListOfServers(); //得到服務(wù)列表
 ...
 updateAllServerList(servers); //更新服務(wù)列表
}

關(guān)于Ribbon如何從遠程獲取配置信息:

從DynamicServerListLoadBalancer里獲取服務(wù)列表配置serverListImpl.getUpdatedListOfServers()開始跟刨晴,最后找到package com.netflix.config.sources , URLConfigurationSource類:

@Override
public PollResult poll(boolean initial, Object checkPoint)
       throws IOException {
   if (configUrls == null || configUrls.length == 0) {
       return PollResult.createFull(null);
   }
   Map<String, Object> map = new HashMap<String, Object>();
   for (URL url: configUrls) {
       InputStream fin = url.openStream();
       Properties props = ConfigurationUtils.loadPropertiesFromInputStream(fin); //對遠程url的流中加載Properties
       for (Entry<Object, Object> entry: props.entrySet()) {
           map.put((String) entry.getKey(), entry.getValue());
       }
   }
   return PollResult.createFull(map);
}

package com.netflix.config.util;
ConfigurationUtils:

public static Properties loadPropertiesFromInputStream(InputStream fin) throws IOException {
   Properties props = new Properties();
   InputStreamReader reader = new InputStreamReader(fin, "UTF-8");
   try {
       props.load(reader);    //加載java Properties
       return props;
   } finally {
       if (reader != null) {
           reader.close();
       }
       if (fin != null) {
           fin.close();
       }
   }
}

之后屉来,把獲得的服務(wù)列表里的服務(wù)alive全置為true,調(diào)用setServersList(ls)割捅,然后做forceQuickPing:

protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }

                setServersList(ls);
                super.forceQuickPing();

            } finally {
                serverListUpdateInProgress.set(false);
            }
        }
    }

調(diào)用父類BaseLoadBalancer的setServersList奶躯,然后setServerListForZones(serversInZones)

@Override
public void setServersList(List lsrv) {
        super.setServersList(lsrv); //第一次BaseLoadBalancer.setServersList(lsrv)
        //...
       
        setServerListForZones(serversInZones);//有幾個zone執(zhí)行幾次BaseLoadBalancer.setServersList(List lsrv)
       
    }

這里插一句,筆者之前通過jmap觀察JVM堆里發(fā)現(xiàn)居然有兩個BaseLoadBalancer實例亿驾,這讓筆者很疑惑嘹黔,因為按照之前的理解,BaseLoadBalancer應(yīng)該是只有一個實例才對莫瞬,仔細分析源碼發(fā)現(xiàn)了原因:

setServerListForZones這個方法是被override的儡蔓,所以最終會執(zhí)行回ZoneAwareLoadBalancer的setServerListForZones邏輯,在這里邊會通過getLoadBalancer方法完成對成員變量balancers的填充:

loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);

也就是說疼邀,在父類DynamicServerListLoadBalancer的構(gòu)造方法完成的時候喂江,是會實例化一個new BaseLoadBalancer并且放到balancers里的(上面的setServerListForZones)。但是旁振,當從父類構(gòu)造方法返回获询,子類ZoneAwareLoadBalancer的構(gòu)造方法接下來會去執(zhí)行子類的成員變量的初始化,類似這樣一個關(guān)系:

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap(); //構(gòu)造方法結(jié)束拐袜,成員變量balancers初始化吉嚣,置為new ConcurrentHashMap()
    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
        //父類DynamicServerListLoadBalancer的構(gòu)造方法,最后會通過setServerListForZones調(diào)用回子類的setServerListForZones方法蹬铺,然后填充了balancers
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }
}

所以盡管在父類里對balancers進行了填充尝哆,但這時候因為子類構(gòu)造方法返回前又相當于執(zhí)行了一次balancers = new ConcurrentHashMap,所以balancers又變空了甜攀。當下一次polling任務(wù)執(zhí)行的時候秋泄,會認為balancer未初始化,所以還會new一個BaseLoadBalancer填充進去规阀。 這就是兩個BaseLoadBalancer的由來恒序。

其實可以認為,第一個BaseLoadBalancer實例這時候已經(jīng)成為了垃圾對象谁撼、失去了引用奸焙,會在下一次ygc的時候被gc回收。

DynamicServerListLoadBalancer.java

//這個方法在初始化和之后的每次polling任務(wù)都會執(zhí)行

public void setServersList(List lsrv) {
  super.setServersList(lsrv); //第一次BaseLoadBalancer.setServersList(List lsrv)

  setServerListForZones(serversInZones); //有幾個zone執(zhí)行幾次BaseLoadBalancer.setServersList(List lsrv)
}

其次,上述setServerListForZones邏輯new出來的BaseLoadBalancer用的構(gòu)造方法最后ping都是null与帆,如下:

public BaseLoadBalancer(String lbName, IRule rule, LoadBalancerStats lbStats) {
    this(lbName, rule, lbStats, (IPing)null);
}

所以執(zhí)行下一次polling任務(wù),BaseLoadBalancer.setServersList(lsrv)的時候會進到canSkipPing()墨榄、把全部server都置成alive玄糟!筆者的解決辦法是把這段邏輯注釋掉了的。

兩個BaseLoadBalancer實例其中一個成為了垃圾對象袄秩,交給gc去處理就好了阵翎。zone對應(yīng)的BaseLoadBalancer執(zhí)行setServersList時因為ping=null會默認把所有server的狀態(tài)置為alive,需要去掉這部分邏輯之剧。

最后郭卫,看下BaseLoadBalancer.setServersList(lsrv)方法,也就是設(shè)置服務(wù)列表的核心方法:

public void setServersList(List lsrv) {
    Lock writeLock = allServerLock.writeLock();
    logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
    
    ArrayList<Server> newServers = new ArrayList<Server>();
    writeLock.lock();
    try {
        ArrayList<Server> allServers = new ArrayList<Server>();
        for (Object server : lsrv) {
            if (server == null) {
                continue;
            }

            if (server instanceof String) {
                server = new Server((String) server);
            }

            if (server instanceof Server) {
                logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                /* 
                 *
                   allServers里邊新加載的服務(wù)列表原來默認都是alive的背稼,
                   這里改為如果服務(wù)在當前的allServerList里存在贰军、先以當前的為準;如果不存在則先置為alive蟹肘,我們認為這樣更合理词疼。
                    這樣當pinger超時阻塞而暫未返回的時候,不會得出錯誤的服務(wù)狀態(tài)
                */
                for(Server s : allServerList) {
                    if(((Server) server).getId().equals(s.getId())) {
                        ((Server) server).setAlive(s.isAlive());
                    }
                }

                allServers.add((Server) server);
            } else {
                throw new IllegalArgumentException(
                        "Type String or Server expected, instead found:"
                                + server.getClass());
            }

        }
        boolean listChanged = false;
        if (!allServerList.equals(allServers)) {
            listChanged = true;
            if (changeListeners != null && changeListeners.size() > 0) {
               List<Server> oldList = ImmutableList.copyOf(allServerList);
               List<Server> newList = ImmutableList.copyOf(allServers);                   
               for (ServerListChangeListener l: changeListeners) {
                   try {
                       l.serverListChanged(oldList, newList);
                   } catch (Exception e) {
                       logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                   }
               }
            }
        }
        if (isEnablePrimingConnections()) {
            for (Server server : allServers) {
                if (!allServerList.contains(server)) {
                    server.setReadyToServe(false);
                    newServers.add((Server) server);
                }
            }
            if (primeConnections != null) {
                primeConnections.primeConnectionsAsync(newServers, this);
            }
        }
        // This will reset readyToServe flag to true on all servers
        // regardless whether
        // previous priming connections are success or not
        
        allServerList = allServers; //這時候重置了allServerList的引用
        
        if (canSkipPing()) {
            /**
              *  這段注釋掉的原因:
              *  每個zone對應(yīng)的BaseLoadBalancer的ping=null帘腹,將會把所有server都再置為alive贰盗,
              *  與筆者認為的“已經(jīng)存在的server先默認認為與原來狀態(tài)一樣、新server才默認置為alive”的原則相左
             *  
             * */
            /*
            for (Server s : allServerList) {
                s.setAlive(true);
            }
            upServerList = allServerList;
           */
        } else if (listChanged) {
            forceQuickPing();
        }

        logger.debug("服務(wù)列表已更新BaseLoadBalancer.setServersList() 阳欲,allServerList :" + JSON.toJSONString(allServerList));

    } finally {
        writeLock.unlock();
    }
}
LoadBalancerClient根據(jù)服務(wù)名自動根據(jù)Rule選擇服務(wù)

前面花了很多時間分析Ribbon本地內(nèi)存里的服務(wù)列表是怎么更新的舵盈,有了服務(wù)列表那么就可以按照一定的Rule規(guī)則來選擇1個服務(wù)來執(zhí)行調(diào)用了,所以接下來關(guān)鍵是規(guī)則球化。筆者這里使用Ribbon的AvailabilityFilteringRule秽晚,即在當前可用服務(wù)中輪詢選擇,所以關(guān)鍵就在于如何判斷各服務(wù)的可用狀態(tài)Alive赊窥,Ribbon使用IPing接口來判斷每個服務(wù)節(jié)點的可用狀態(tài)爆惧,其默認實現(xiàn)是使用的DummyPing,就是每個服務(wù)isAlive都返回true锨能,因為Ribbon一般是搭配Eureka使用的扯再,由后者負責維護各服務(wù)節(jié)點的可用狀態(tài),Ribbon默認認為從Eureka獲得的節(jié)點就都是可用的址遇。所以我們這里要自己實現(xiàn)一個自定義的Ping熄阻,來判斷每個節(jié)點的狀態(tài):

@Configuration
public class DefaultRibbonConfig {
    private Logger logger = LoggerFactory.getLogger(DefaultRibbonConfig.class);

    public DefaultRibbonConfig() {
    }

    @Bean
    public IRule ribbonRule() {
        this.logger.info("全局IRule實現(xiàn)為AvailabilityFilteringRule,實例化...");
        return new AvailabilityFilteringRule();
    }

    @Bean
    public IPing ribbonPing() {
        this.logger.info("全局IPing實現(xiàn)為HealthCheckPing倔约,實例化...");
        return new HealthCheckPing();
    }
}

HealthCheckPing:

/**
    自定義的Ping秃殉,向各服務(wù)節(jié)點統(tǒng)一的一個健康檢查接口發(fā)送Http HEAD請求來判斷節(jié)點的可用狀態(tài)
*/
public class HealthCheckPing implements IPing {
    private Logger logger = LoggerFactory.getLogger(HealthCheckPing.class);
    private static CloseableHttpClient httpClient;

    public HealthCheckPing() {
    }

    private CloseableHttpClient getHttpClient() {
        if (httpClient == null) {
            httpClient = HttpClientBuilder.create().build();
        }

        return httpClient;
    }

    public boolean isAlive(Server server) {
        boolean isAlive = true;
        String url = "http://" + server.getId() + "/" + server.getServiceName() + "/healthcheck/checkAlive";
        HttpHead headRequest = new HttpHead(url);
        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(2000).setSocketTimeout(2000).setConnectionRequestTimeout(2000).build();
        headRequest.setConfig(requestConfig);

        try {
            HttpResponse response = this.getHttpClient().execute(headRequest);
            isAlive = response.getStatusLine().getStatusCode() == 200;
        } catch (Exception var10) {
            this.logger.error(var10.getMessage());
            isAlive = false;
        } finally {
            this.logger.info("心跳檢測結(jié)果:節(jié)點" + url + "  ,狀態(tài):[" + isAlive + "] ");
            headRequest.abort();
        }

        return isAlive;
    }
}

在BaseLoadBalancer實例化的時候會啟動定時任務(wù)PingTask,默認10秒一次:

/**    
    ShutdownEnabledTimer是個java Timer钾军,Runntime.shutdown的時候會調(diào)用cancel()結(jié)束定時,
    而定時執(zhí)行的邏輯就是PingTask的run()方法了
*/
void setupPingTask() {
    if (!this.canSkipPing()) {
        if (this.lbTimer != null) {
            this.lbTimer.cancel();
        }

        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        this.lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + uuid + "-" + this.name, true);
        this.lbTimer.schedule(new BaseLoadBalancer.PingTask(), 0L, (long)(this.pingIntervalSeconds * 1000));
        this.forceQuickPing();
    }
}

定時執(zhí)行PingTask鳄袍,task里邊邏輯是new Pinger(pingStrategy).runPinger();

public void runPinger() throws Exception {
    if (!pingInProgress.compareAndSet(false, true)) { 
        logger.debug("runPinger pingInProgress " + pingInProgress.get());
        return; // Ping in progress - nothing to do
    }
    
    // we are "in" - we get to Ping

    Server[] allServers = null;
    boolean[] results = null;

    Lock allLock = null;
    Lock upLock = null;
    String uuid = UUID.randomUUID().toString().replaceAll("-","");
    try {
        /*
         * The readLock should be free unless an addServer operation is
         * going on...
         */
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        
        allLock.unlock();

        int numCandidates = allServers.length;
        results = pingerStrategy.pingServers(ping, allServers);

        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();
        final Map<String, Server> allServersMap = new HashMap<String, Server>(); //暫存allServers,里邊的Server狀態(tài)是正確的
        
        //根據(jù)ping結(jié)果吏恭,設(shè)置allServers里Server的狀態(tài)
        for (int i = 0; i < numCandidates; i++) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();
            svr.setAlive(isAlive);

            allServersMap.put(svr.getId(), svr);
            
            if (oldIsAlive != isAlive) {
                changedServers.add(svr);    //狀態(tài)發(fā)生變化的
                logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                    name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }

            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
        upServerList = newUpList;
        upLock.unlock();
        
        changeServerStatusInAllServerList(allServersMap); //補償邏輯
        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}

runPinger()的關(guān)鍵代碼是results = pingerStrategy.pingServers(ping, allServers);

里邊就是對每個server都用iPing接口實現(xiàn)的ping邏輯進行驗證一遍拗小,獲得每個server的最新狀態(tài)。

記錄一下本次changedServers都有哪些樱哼。注意哀九,這里會去持有一個真正的allServerList的引用并去更新狀態(tài),upServerList也會去更新搅幅。 allServers = allServerList.toArray(new Server[allServerList.size()]); 所以定時線程PingTask的作用是用iPing里的邏輯驗證服務(wù)器的狀態(tài)阅束,然后更新服務(wù)器列表狀態(tài)。

但是茄唐,坑爹的是ping持有的這個allServerList的引用會“失效”息裸!

ping接口持有的allServerList的引用為什么會失效,原因是PollingServerListUpdater線程每次加載配置的serverlist的時候琢融,默認都是alive的界牡、且用的是allServerList = serverList這種方式,這樣就算allServerList定義為volatile類型也無濟于事漾抬,畢竟整個引用指向了另一個對象(這時候之前的對象正在被Ping接口持有)宿亡。

而這時候Ping接口里邊的邏輯因為在阻塞超時2秒,沒有執(zhí)行完纳令,所以相當于PIng接口里邊持有的allServerList指向的還是之前的那個對象挽荠。所以這時候Ping接口根據(jù)當前檢查結(jié)果去更新服務(wù)列表也是沒用的了,因為持有的服務(wù)列表是過期的對象平绩。

簡單來說就是polling和ping兩個定時任務(wù)同時去修改服務(wù)列表圈匆、加上polling默認服務(wù)alive、且直接修改allServerList引用捏雌,導(dǎo)致的并發(fā)問題跃赚。(源碼里allServerList有個讀寫鎖,但是對這個場景來說并沒有用)

我的解決辦法是在BaseLoadBalancer里邊在runPinger最后加了補償邏輯性湿,最后再更新一遍allServerList纬傲;

 /**
   * 在runPinger之后,再修改一次allServerList肤频,防止runPinger的過程中allServerList已經(jīng)被修改了引用
   * 這時候runPinger持有的已經(jīng)是過期引用叹括,修改的server狀態(tài)也是沒法更新到真正的allServerList了。
 * */
private void changeServerStatusInAllServerList(Map<String, Server> allServersMap) {

        Lock allLock = null;
        allLock = allServerLock.writeLock();
        allLock.lock();
        try {
            for(Server s : allServerList) {
                Server serverWithRightStatus = allServersMap.get(s.getId());
                if(null != serverWithRightStatus) {
                    s.setAlive(serverWithRightStatus.isAlive());
                    logger.debug("修改了Server {}的狀態(tài)宵荒,最新狀態(tài)為alive={}" , s.getId(), s.isAlive());
                }
            }
        }finally {
            allLock.unlock();
        }
    }

但這樣還不夠汁雷,如果一個節(jié)點down掉净嘀,在下一次ping循環(huán)時,超時2秒之內(nèi)pinger還沒返回的時候侠讯,如果正好這2s的時候發(fā)生了調(diào)用挖藏,本該down的節(jié)點仍然被設(shè)置的是alive,所以關(guān)鍵問題是要改一下polling線程每次“都設(shè)置為alive”的邏輯:回顧一下前文中BaseLoadBalancer的setServersList(List lsrv)方法中的兩段注釋:

/* 
 *
    allServers里邊新加載的服務(wù)列表原來默認都是alive的厢漩,
    這里改為如果服務(wù)在當前的allServerList里存在熬苍、先以當前的為準;
    如果不存在則先置為alive袁翁,我們認為這樣更合理。
    這樣當pinger超時阻塞而暫未返回的時候婿脸,不會得出錯誤的服務(wù)狀態(tài)
 */
 for(Server s : allServerList) {
     if(((Server) server).getId().equals(s.getId())) {
         ((Server) server).setAlive(s.isAlive());
     }
 }

然后又發(fā)現(xiàn)了新的問題:down的節(jié)點在2秒的時候又被設(shè)置回alive了粱胜!

檢查代碼發(fā)現(xiàn)是第二次setServerList的時候又給改回來了,也就是zone的setServerList的時候狐树;(見上文焙压,setServerList會被掉兩次,后面也會有解釋)

 if (canSkipPing()) {
        /**
        *  這段注釋掉的原因:
        *  每個zone對應(yīng)的BaseLoadBalancer的ping=null抑钟,將會把所有server都再置為alive涯曲,
        *  與筆者認為的“已經(jīng)存在的server先默認認為與原來狀態(tài)一樣、新server才默認置為alive”的原則相左
        * */
     
        /*
            for (Server s : allServerList) {
                s.setAlive(true);
            }
            upServerList = allServerList;
        */
  } else if (listChanged) {
            forceQuickPing();
  }

上面的代碼會走到注釋的地方在塔,所以還是zone對應(yīng)的balancer有兩個的原因幻件,其中一個ping為null

暫時注釋掉這段解決了問題。

BaseLoadBalancer的setServersList方法蛔溃,執(zhí)行兩次

PollingServerListUpdater-0或者PollingServerListUpdater-1線程組成個線程池绰沥,負責執(zhí)行30秒一次的定時任務(wù)。這個定時任務(wù)執(zhí)行BaseLoadBalancer的setServersList方法贺待,也執(zhí)行了兩次徽曲。

原因是在DynamicServerListLoadBalancer初始化的時候會初始化PollingServerListUpdater并調(diào)用其start()啟動定時任務(wù),定時執(zhí)行updateAction.doUpdate()麸塞,里邊也就是setServersList(List lsrv)秃臣,它接連調(diào)用了BaseLoadBalancer的setServersList方法,和一個setServerListForZones方法哪工,而后者的實現(xiàn)是在子類ZoneAwareLoadBalancer里奥此,getLoadBalancer(zone).setServersList(entry.getValue());就又會調(diào)用一次setServersList方法了。這就是為啥日志里看是PollingServerListUpdater線程執(zhí)行了兩次BaseLoadBalancer的setServersList的邏輯正勒。

這個執(zhí)行兩次得院,感覺是一種業(yè)務(wù)邏輯:服務(wù)列表更新,相當于是先更新當前banancer的章贞,然后更新當前所屬zone的祥绞;也就是說每30秒只執(zhí)行一次polling線程的邏輯非洲,然后setServersList執(zhí)行了兩次。

對com.netflix.loadbalancer.Server類的字段進行擴充蜕径,增加應(yīng)用名serviceName两踏,Ribbon配置服務(wù)列表的時候是IP+Port,我們這次要求加上服務(wù)名兜喻,即ip:port/serviceName

package com.netflix.loadbalancer;

import com.netflix.util.Pair;

/**
 * Class that represents a typical Server (or an addressable Node) i.e. a
 * Host:port identifier
 * 
 * @author stonse
 * 
 *  在原來的基礎(chǔ)上增加了應(yīng)用名serviceName
 *  刷新服務(wù)列表的同時保存應(yīng)用名梦染,給自定義Ping做心跳檢測用
 */
public class Server {

    //...
    
    public static final String UNKNOWN_ZONE = "UNKNOWN";
    private String host;
    private int port = 80;
    private String scheme;
    private volatile String id;
    private volatile boolean isAliveFlag;
    private String zone = UNKNOWN_ZONE;
    private volatile boolean readyToServe = true;
    private String serviceName; //服務(wù)名,與serviceName.ribbon.xxx中的serviceName一樣朴皆,是服務(wù)提供者的實際應(yīng)用名稱

    // ...

    public void setId(String id) {
        Pair<String, Integer> hostPort = getHostPort(id);
        if (hostPort != null) {
            this.id = hostPort.first() + ":" + hostPort.second();
            this.host = hostPort.first();
            this.port = hostPort.second();
            this.scheme = getScheme(id);
        } else {
            this.id = null;
        }
        this.serviceName = parseServiceName(id); //add by liny
    }
    
    //獲取服務(wù)名
    public String getServiceName() {
        return this.serviceName;
    }
    
    //解析服務(wù)名
    private String parseServiceName(String id) {
        if (id != null) {
            if (id.toLowerCase().startsWith("http://")) {
                id = id.substring(7);
            } else if (id.toLowerCase().startsWith("https://")) {
                id = id.substring(8);
            }

            if (id.contains("/")) {
                int slash_idx = id.indexOf("/");
                id = id.substring(slash_idx);
                if(id.length()>1)
                    return id.substring(1);
                else
                    return "";
            }else {
                return "";
            }
        }else {
            return null;
        }

    }
    
  // ...  
}

到此帕识,我們了解了定時更新服務(wù)列表與定時Ping來判斷服務(wù)節(jié)點的可用性。接下來看下組件提供的工具類的代碼:

/**
 * 創(chuàng)建一個有負載均衡功能的restTemplate遂铡,最終提供出去的工具類
 * 
 * */
public class LbRestTemplate {
    private Logger logger = LoggerFactory.getLogger(LbRestTemplate.class);
    
//  @Autowired
    private RestTemplate restTemplate;
    
//  @Autowired
    private LoadBalancerClient loadBalancer;
    
    public LbRestTemplate(RestTemplate restTemplate, LoadBalancerClient loadBalancer){
        this.restTemplate = restTemplate;
        this.loadBalancer = loadBalancer;
    }
    
    public <T extends Object> T getForBean(String serviceName, String interfaceUrl, Class<T> clazz){
        ServiceInstance instance = loadBalancer.choose(serviceName);
        String contextPath ;
        if(null == instance)
            throw new RuntimeException(serviceName + "當前無可用節(jié)點");
        URI serviceAddress = instance.getUri();
        if(instance instanceof RibbonServer) {
            contextPath = ((RibbonServer) instance).getServer().getServiceName();
        }else {
            contextPath = serviceName;
        }

        String serviceFullUrl = serviceAddress + "/" + contextPath + interfaceUrl;

        logger.debug("serviceFullUrl:{}", serviceFullUrl);

        return restTemplate.getForObject(serviceFullUrl, clazz);
    }

    public <T extends Object> T postForBean(String serviceName, String interfaceUrl, Object requestBody, Class<T> clazz){
        ServiceInstance instance = loadBalancer.choose(serviceName);
        String contextPath ;
        if(null == instance)
            throw new RuntimeException(serviceName + "當前無可用節(jié)點");
        URI serviceAddress = instance.getUri();
        if(instance instanceof RibbonServer) {
            contextPath = ((RibbonServer) instance).getServer().getServiceName();
        }else {
            contextPath = serviceName;
        }

        String serviceFullUrl = serviceAddress + "/" + contextPath + interfaceUrl;

        logger.debug("serviceFullUrl:{}", serviceFullUrl);

        return restTemplate.postForObject(serviceFullUrl, requestBody, clazz);

    }

}

LbRestTemplate通過Spring SPI機制注入到Spring IOC容器后肮疗,LoadBalancerClient會Ribbon的RibbonAutoConfiguration也通過SPI自動注入:

    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }

RibbonLoadBalancerClient.choose(serviceName)最終會走到ZoneAwareLoadBalancer以及BaseLoadBalancer的chooseServer方法里,后者掉用this.rule.choose()方法來根據(jù)Rule來選服務(wù)扒接。

總結(jié):

至此伪货,Ribbon的整個流程就串起來了:polling動態(tài)更新服務(wù)列表、服務(wù)狀態(tài)判定ping钾怔、動態(tài)路由根據(jù)rule選取服務(wù)節(jié)點碱呼。

Ribbon一般是配合Eureka使用,節(jié)點健康狀態(tài)從Eureka獲取到本地然后直接取用宗侦、不會發(fā)生阻塞愚臀,我們使用了自定義的Ping根據(jù)心跳檢查來判定對端服務(wù)節(jié)點的可用狀態(tài),當節(jié)點故障時會有超時阻塞凝垛。所以針對阻塞的情況我們對Ribbon源代碼中節(jié)點狀態(tài)邏輯做了一系列的優(yōu)化懊悯。

參考:

Spring Cloud Netflix

Netflix/ribbon: Ribbon is a Inter Process Communication (remote procedure calls) library with built in software load balancers. The primary usage model involves REST calls with various serialization scheme support. (github.com)

Ribbon的應(yīng)用 - 知乎 (zhihu.com)

深入理解Ribbon原理 - 知乎 (zhihu.com)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市梦皮,隨后出現(xiàn)的幾起案子炭分,更是在濱河造成了極大的恐慌,老刑警劉巖剑肯,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件捧毛,死亡現(xiàn)場離奇詭異,居然都是意外死亡让网,警方通過查閱死者的電腦和手機呀忧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來溃睹,“玉大人而账,你說我怎么就攤上這事∫蚱” “怎么了泞辐?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵笔横,是天一觀的道長。 經(jīng)常有香客問我咐吼,道長吹缔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任锯茄,我火速辦了婚禮厢塘,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘肌幽。我一直安慰自己晚碾,他們只是感情好,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布喂急。 她就那樣靜靜地躺著迄薄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪煮岁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天涣易,我揣著相機與錄音画机,去河邊找鬼。 笑死新症,一個胖子當著我的面吹牛步氏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播徒爹,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼荚醒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了隆嗅?” 一聲冷哼從身側(cè)響起界阁,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胖喳,沒想到半個月后泡躯,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡丽焊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年较剃,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片技健。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡写穴,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出雌贱,到底是詐尸還是另有隱情啊送,我是刑警寧澤偿短,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站删掀,受9級特大地震影響翔冀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜披泪,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一纤子、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧款票,春花似錦控硼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至缚够,卻和暖如春幔妨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谍椅。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工误堡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人雏吭。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓锁施,卻偏偏與公主長得像,于是被迫代替她去往敵國和親杖们。 傳聞我的和親對象是個殘疾皇子悉抵,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容