spring cloud ribbon學(xué)習(xí)三:負(fù)載均衡器ILoadBalancer接口及其實(shí)現(xiàn)

看負(fù)載均衡器這源碼,好繞兔魂,看的好累。

雖然Spring Cloud中定義了LoadBalancerClient作為負(fù)載均衡器的通用接口,并且針對(duì)Ribbon實(shí)現(xiàn)了RibbonLoadBalancerClient初肉,但是它作為具體實(shí)現(xiàn)客戶端負(fù)載均衡時(shí),是通過(guò)Ribbon的com.netflix.loadbalancer.ILoadBalancer接口實(shí)現(xiàn)的郊愧。

總結(jié)一下:
ILoadBalancer接口實(shí)現(xiàn)類(lèi)做了以下的一些事情:
1.維護(hù)了存儲(chǔ)服務(wù)實(shí)例Server對(duì)象的二個(gè)列表朴译。一個(gè)用于存儲(chǔ)所有服務(wù)實(shí)例的清單,一個(gè)用于存儲(chǔ)正常服務(wù)的實(shí)例清單
2.初始化得到可用的服務(wù)列表属铁,啟動(dòng)定時(shí)任務(wù)去實(shí)時(shí)的檢測(cè)服務(wù)列表中的服務(wù)的可用性眠寿,并且間斷性的去更新服務(wù)列表,結(jié)合注冊(cè)中心焦蘑。
3.選擇可用的服務(wù)進(jìn)行調(diào)用(這個(gè)一般交給IRule去實(shí)現(xiàn)盯拱,不同的輪詢策略)

三個(gè)很重要的概念

  • ServerList接口:定義用于獲取服務(wù)器列表的方法的接口,主要實(shí)現(xiàn)DomainExtractingServerList接口,每隔30s種執(zhí)行g(shù)etUpdatedListOfServers方法進(jìn)行服務(wù)列表的更新例嘱。
  • ServerListUpdater接口:主要實(shí)現(xiàn)類(lèi)EurekaNotificationServerListUpdater和PollingServerListUpdater(默認(rèn)使用的是PollingServerListUpdater狡逢,結(jié)合Eureka注冊(cè)中心,定時(shí)任務(wù)的方式進(jìn)行服務(wù)列表的更新)
  • ServerListFilter接口:根據(jù)LoadBalancerStats然后根據(jù)一些規(guī)則去過(guò)濾部分服務(wù)拼卵,比如根據(jù)zone(區(qū)域感知)去過(guò)濾奢浑。(主要實(shí)現(xiàn)類(lèi)ZonePreferenceServerListFilter的getFilteredListOfServers會(huì)在更新服務(wù)列表的時(shí)候去執(zhí)行)。
ILoadBalancer及其實(shí)現(xiàn)

com.netflix.loadbalancer.AbstractLoadBalancer

AbstractLoadBalancer contains features required for most loadbalancing
implementations.
An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
that are potentially bucketed based on a specific criteria. 2. A Class that
defines and implements a LoadBalacing Strategy via IRule 3. A
Class that defines and implements a mechanism to determine the
suitability/availability of the nodes/servers in the List.
AbstractLoadBalancer包含大多數(shù)負(fù)載均衡實(shí)現(xiàn)的特征腋腮。
典型的LoadBalancer(負(fù)載均衡器)包括
1.一個(gè)基于某些特征的服務(wù)列表雀彼。
2.一個(gè)通過(guò)IRule定義和實(shí)現(xiàn)負(fù)載均衡戰(zhàn)略的類(lèi)。
3.一個(gè)用來(lái)確定列表節(jié)點(diǎn)/服務(wù)是否可用的類(lèi)即寡。

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
public enum ServerGroup{
       
   //所有服務(wù)實(shí)例     
   ALL,
   //正常服務(wù)的實(shí)例
   STATUS_UP,
   //停止服務(wù)的實(shí)例
   STATUS_NOT_UP        
}
        
/**
 * 選擇具體的服務(wù)實(shí)例徊哑,key為null,忽略key的條件判斷
 */
public Server chooseServer() {
    return chooseServer(null);
}
/**
 * 定義了根據(jù)分組類(lèi)型來(lái)獲取不同的服務(wù)實(shí)例的列表聪富。
 */
 public abstract List<Server> getServerList(ServerGroup serverGroup);
    
/**
 * 定義了獲取LoadBalancerStats對(duì)象的方法莺丑,LoadBalancerStats對(duì)象被用來(lái)存儲(chǔ)負(fù)載均衡器中
 * 各個(gè)服務(wù)實(shí)例當(dāng)前的屬性和統(tǒng)計(jì)信息。這些信息非常有用墩蔓,我們可以利用這些信息來(lái)觀察負(fù)載均衡
 * 的運(yùn)行情況梢莽,同時(shí)這些信息也是用來(lái)制定負(fù)載均衡策略的重要依據(jù)萧豆。
 */
public abstract LoadBalancerStats getLoadBalancerStats();
    
}

com.netflix.loadbalancer.BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer類(lèi)是Ribbon負(fù)載均衡器的基礎(chǔ)實(shí)現(xiàn)類(lèi),在該類(lèi)中定義了很多關(guān)于負(fù)載均衡器相關(guān)的基礎(chǔ)內(nèi)容蟹漓。

定義并維護(hù)了兩種存儲(chǔ)服務(wù)實(shí)例Server對(duì)象的列表炕横。一個(gè)用于存儲(chǔ)所有服務(wù)實(shí)例的清單,一個(gè)用于存儲(chǔ)正常服務(wù)的實(shí)例清單葡粒。

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
         .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
         .synchronizedList(new ArrayList<Server>());
  • 定義了之前我們提到的用來(lái)存儲(chǔ)負(fù)載均衡實(shí)例屬性和統(tǒng)計(jì)信息的LoadBalancerStates對(duì)象份殿。
  • 定義了檢查服務(wù)實(shí)例是否正常服務(wù)的IPing對(duì)象,在BaseLoadBalancer中默認(rèn)為null嗽交,需要在構(gòu)造時(shí)注入它的具體實(shí)現(xiàn)卿嘲。
protected IPing ping = null;
  • 定義了檢查服務(wù)實(shí)例操作的執(zhí)行策略對(duì)象IPingStrategy,在BaseLoadBalancer中默認(rèn)使用了該類(lèi)中定義的靜態(tài)內(nèi)部類(lèi)SerialPingStrategy實(shí)現(xiàn)夫壁。根據(jù)源碼拾枣,我們可以看到該策略采用線性遍歷ping服務(wù)實(shí)例的方式實(shí)現(xiàn)檢查。該策略在當(dāng)IPing對(duì)象的實(shí)現(xiàn)速度不理想盒让,或者是Server列表過(guò)大時(shí)梅肤,可能會(huì)影響到系統(tǒng)性能,這時(shí)候需要通過(guò)實(shí)現(xiàn)IPingStrategy接口并重寫(xiě)pingServer(IPing ping Server[] servers)函數(shù)去擴(kuò)展ping的執(zhí)行策略邑茄。
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
SerialPingStrategy實(shí)現(xiàn)
  • 定義了負(fù)載均衡的處理原則IRule對(duì)象姨蝴,從BaseLoadBalancerchooseServer(Object key)的實(shí)現(xiàn)源碼可以知道,負(fù)載均衡器實(shí)際將服務(wù)實(shí)例選擇的任務(wù)委托給IRule實(shí)例中的choose函數(shù)來(lái)實(shí)現(xiàn)肺缕。
    默認(rèn)初始化了RoundRobinRule實(shí)現(xiàn)左医,RoundRobinRule實(shí)現(xiàn)了最基本且常用的線性負(fù)載均衡規(guī)則。
protected IRule rule = DEFAULT_RULE;
private final static IRule DEFAULT_RULE = new RoundRobinRule();
BaseLoadBalancer的chooseServer方法
  • 啟動(dòng)ping任務(wù):在BaseLoadBalancer的默認(rèn)構(gòu)造函數(shù)中同木,會(huì)直接啟動(dòng)一個(gè)用于定時(shí)檢查Server是否健康的任務(wù)浮梢。該任務(wù)默認(rèn)的執(zhí)行間隔式10s。
/**
  * Default constructor which sets name as "default", sets null ping, and
  * {@link RoundRobinRule} as the rule.
  * <p>
  * This constructor is mainly used by {@link ClientFactory}. Calling this
  * constructor must be followed by calling {@link #init()} or
  * {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
  * This constructor is provided for reflection. When constructing
  * programatically, it is recommended to use other constructors.
  */
public BaseLoadBalancer() {
     this.name = DEFAULT_NAME;
     this.ping = null;
     setRule(DEFAULT_RULE);
     setupPingTask();
     lbStats = new LoadBalancerStats(DEFAULT_NAME);
}

定時(shí)任務(wù)

void setupPingTask() {
   if (canSkipPing()) {
       return;
   }
   if (lbTimer != null) {
      lbTimer.cancel();
   }
   lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
         true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
  • 實(shí)現(xiàn)了ILoadBalancer接口定義的一系列的基本操作:

addServers(List<Server> newServers):向負(fù)載均衡器中增加新的服務(wù)實(shí)例列表彤路,該實(shí)現(xiàn)將原本已經(jīng)維護(hù)的所有服務(wù)實(shí)例清單allServerList和新傳入的服務(wù)實(shí)例清單newServers都加入了newList中秕硝,然后通過(guò)調(diào)用setServersList函數(shù)對(duì)newList進(jìn)行處理,在BaseLoadBalancer中實(shí)現(xiàn)的時(shí)候會(huì)使用新的列表覆蓋舊的列表洲尊。

    /**
     * Add a list of servers to the 'allServer' list; does not verify
     * uniqueness, so you could give a server a greater share by adding it more
     * than once
     */
    @Override
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("Exception while adding Servers", e);
            }
        }
    }

chooseServer(Object key) :挑選一個(gè)具體的服務(wù)實(shí)例缝裤,

  public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Throwable t) {
                return null;
            }
        }
    }

markServerDown(Server server):標(biāo)記某個(gè)服務(wù)實(shí)例為暫停服務(wù)。

public void markServerDown(Server server) {
   if (server == null) {
      return;
   }

   if (!server.isAlive()) {
       return;
   }

   logger.error("LoadBalancer:  markServerDown called on ["
        + server.getId() + "]");
   server.setAlive(false);
   //forceQuickPing();

   notifyServerStatusChangeListener(singleton(server));
}

getReachableServers():獲取可用的服務(wù)實(shí)例列表颊郎。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個(gè)正常服務(wù)的實(shí)例清單,所以直接返回即可

@Override
public List<Server> getReachableServers() {
   return Collections.unmodifiableList(upServerList);
}

getAllServers():獲取所有的服務(wù)實(shí)例列表霎苗。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個(gè)正常服務(wù)的實(shí)例清單姆吭,所以直接返回即可。

@Override
public List<Server> getAllServers() {
   return Collections.unmodifiableList(allServerList);
}

com.netflix.loadbalancer.DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer類(lèi)繼承com.netflix.loadbalancer.BaseLoadBalancer類(lèi)唁盏,它是對(duì)基礎(chǔ)負(fù)載均衡器的擴(kuò)展内狸。
該負(fù)載均衡器中检眯,實(shí)現(xiàn)了服務(wù)實(shí)例清單在運(yùn)行期的動(dòng)態(tài)更新能力;同時(shí)昆淡,它還具備了對(duì)服務(wù)實(shí)例清單的過(guò)濾功能锰瘸,也就是說(shuō),我們可以通過(guò)過(guò)濾器來(lái)選擇性的獲取一批服務(wù)實(shí)例清單昂灵。

ServerList
DynamicServerListLoadBalancer的成員定義中避凝,我們馬上可以發(fā)現(xiàn)新增了一個(gè)關(guān)于服務(wù)列表的操作對(duì)象ServerList<T> serverListImpl。從類(lèi)名DynamicServerListLoadBalancer<T extends Server>發(fā)現(xiàn)T泛型是Server子類(lèi)眨补,即代表了一個(gè)具體的服務(wù)實(shí)例的擴(kuò)展類(lèi)管削,而ServerList接口定義如下:

volatile ServerList<T> serverListImpl;

ServerList接口定義如下

/**
 * Interface that defines the methods sed to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {

    //用于獲取初始化的服務(wù)實(shí)例清單
    public List<T> getInitialListOfServers();
    
    //獲取更新的服務(wù)實(shí)例清單,每隔30s更新一次
    public List<T> getUpdatedListOfServers();   

}

其實(shí)現(xiàn)類(lèi):

DynamicServerListLoadBalancer中的ServerList默認(rèn)配置到底使用了哪些具體的實(shí)現(xiàn)呢撑螺?既然是該負(fù)載均衡器中實(shí)現(xiàn)服務(wù)實(shí)例的動(dòng)態(tài)更新含思,那么勢(shì)必需要Ribbon訪問(wèn)Eureka來(lái)獲取服務(wù)實(shí)例的能力,可以從Ribbon整合Eureka的包下去尋找甘晤,
org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration中的含潘,

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
}

可以看出創(chuàng)建了DomainExtractingServerList實(shí)例,其內(nèi)部也維護(hù)了ServerList list线婚,同時(shí)DomainExtractingServerList類(lèi)中對(duì)getInitialListOfServersgetUpdatedListOfServers的具體實(shí)現(xiàn)遏弱,其實(shí)是委托給內(nèi)部定義的ServerList<DiscoveryEnabledServer> list對(duì)象,而該對(duì)象是通過(guò)創(chuàng)建DiscoveryEnabledNIWSServerList實(shí)例傳遞進(jìn)去的

org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList的源碼:

那么DiscoveryEnabledNIWSServerList是如何實(shí)現(xiàn)這兩個(gè)服務(wù)實(shí)例獲取的呢酌伊?從DiscoveryEnabledNIWSServerList其源碼的私有方法obtainServersViaDiscovery通過(guò)服務(wù)發(fā)現(xiàn)機(jī)制來(lái)實(shí)現(xiàn)服務(wù)實(shí)例的獲取的腾窝,
com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList的obtainServersViaDiscovery方法,

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
}

EurekaClient eurekaClient = eurekaClientProvider.get();
        //vipAddresses可以理解為邏輯上的服務(wù)名居砖,對(duì)這些服務(wù)名進(jìn)行遍歷奏候,將狀態(tài)為UP(正常服務(wù))的實(shí)例轉(zhuǎn)換成DiscoveryEnabledServer對(duì)象
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        //將這些實(shí)例組織成列表返回。
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
}

ServerListUpdater
com.netflix.loadbalancer.DynamicServerListLoadBalancer負(fù)載均衡器使用不同的策略進(jìn)行列表更新的策略蔗草。

上面分析了如何從Eureka Server中獲取服務(wù)實(shí)例清單,那么它又是如何觸發(fā)向Eureka Server去獲取服務(wù)實(shí)例清單以及如何在獲取到服務(wù)實(shí)例清單后更新本地實(shí)例清單呢?

回到com.netflix.loadbalancer.DynamicServerListLoadBalancer

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            //實(shí)現(xiàn)對(duì)服務(wù)列表的更新
            updateListOfServers();
        }
    };
    
    protected volatile ServerListUpdater serverListUpdater;

updateListOfServers方法

     @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

ServerListUpdater的二種實(shí)現(xiàn):

ServerListUpdater的二種實(shí)現(xiàn)

PollingServerListUpdater:動(dòng)態(tài)服務(wù)列表更新的默認(rèn)策略,也就是說(shuō),DynamicServerListLoadBalancer負(fù)載均衡器中的默認(rèn)實(shí)現(xiàn)就是它邮弹,它通過(guò)定時(shí)任務(wù)的方式進(jìn)行服務(wù)列表的更新。

EurekaNotificationServerListUpdater:該更新器也可以服務(wù)于DynamicServerListLoadBalancer負(fù)載均衡器勺阐,但是它的觸發(fā)機(jī)制與PollingServerListUpdater不同,它需要利用Eureka的事件監(jiān)聽(tīng)來(lái)驅(qū)動(dòng)服務(wù)列表的更新操作织阅。

查看PollingServerListUpdater的實(shí)現(xiàn),

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

    public PollingServerListUpdater() {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }

    public PollingServerListUpdater(IClientConfig clientConfig) {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
    }

    public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

initialDelayMsrefreshIntervalMs的默認(rèn)定義是1000和30*1000震捣,單位是毫秒荔棉。就是說(shuō),更新服務(wù)實(shí)例在初始化之后延遲1s后開(kāi)始執(zhí)行蒿赢,并以30s位周期重復(fù)執(zhí)行润樱,還會(huì)記錄最后更新時(shí)間,是否存活等信息羡棵。

ServerListFilter

volatile ServerListFilter<T> filter;

ServerListFilter接口的定義:
This interface allows for filtering the configured or dynamically obtained List of candidate servers with desirable characteristics.
該接口允許用配置或動(dòng)態(tài)獲取的具有所需特性的候選服務(wù)器列表進(jìn)行過(guò)濾壹若。

public interface ServerListFilter<T extends Server> {

    public List<T> getFilteredListOfServers(List<T> servers);

}
ServerListFilter接口的實(shí)現(xiàn)

其中,除了org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter的實(shí)現(xiàn)是Spring Cloud Ribbon中對(duì)Netflix Ribbon的擴(kuò)展實(shí)現(xiàn)外皂冰,其他均是Netflix Ribbon中的原生實(shí)現(xiàn)類(lèi)店展,

  • com.netflix.loadbalancer.AbstractServerListFilter:這是一個(gè)抽象的接口,接收一個(gè)重要的依據(jù)對(duì)象LoadBalancerStats
AbstractServerListFilter
  • com.netflix.loadbalancer.ZoneAffinityServerListFilter:該過(guò)濾器基于"區(qū)域感知(Zone Affinity)"的方式實(shí)現(xiàn)服務(wù)實(shí)例的過(guò)濾秃流,也就說(shuō)赂蕴,它會(huì)根據(jù)提供服務(wù)的實(shí)例所處于的區(qū)域(Zone)與消費(fèi)者自身所處區(qū)域(Zone)進(jìn)行比較,過(guò)濾掉那些不是同處一個(gè)區(qū)域的實(shí)例
@Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
}
  • com.netflix.niws.loadbalancer.DefaultNIWSServerListFilter
    該過(guò)濾器完全繼承自ZoneAffinityServerListFilter舶胀,是默認(rèn)的NIWS(Netfilx Internal Web Service)過(guò)濾器概说。

  • com.netflix.loadbalancer.ServerListSubsetFilter
    該過(guò)濾器也繼承自ZoneAffinityServerListFilter,它非常適用于擁有大規(guī)模服務(wù)器集群(上百或者更多)的系統(tǒng)嚣伐。因?yàn)樗梢援a(chǎn)生一個(gè)“區(qū)域感知”

  • org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter
    spring cloud整合時(shí)新增的過(guò)濾器糖赔,若使用Spring Cloud整合EurekaRibbon時(shí)會(huì)默認(rèn)使用該過(guò)濾器,它實(shí)現(xiàn)了通過(guò)配置或者Eureka實(shí)例元數(shù)據(jù)的所屬區(qū)域(Zone)來(lái)過(guò)濾同區(qū)域的服務(wù)實(shí)例纤控,它的實(shí)現(xiàn)非常簡(jiǎn)單挂捻,首先通過(guò)ZoneAffinityServerListFilter的過(guò)濾器來(lái)獲得"區(qū)域感知"的服務(wù)實(shí)例列表,然后遍歷這個(gè)結(jié)果船万,取出根據(jù)消費(fèi)者配置預(yù)設(shè)的區(qū)域Zone來(lái)進(jìn)行過(guò)濾刻撒,如果過(guò)濾的結(jié)果是空就直接返回父類(lèi)的結(jié)果,如果不為空就返回通過(guò)消費(fèi)者的Zone過(guò)濾后的結(jié)果耿导。

@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<Server>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
}

com.netflix.loadbalancer.ZoneAwareLoadBalancer

ZoneAwareLoadBalancer負(fù)載均衡器是對(duì)DynamicServerListLoadBalancer的擴(kuò)展声怔。在DynamicServerListLoadBalancer中,我們可以看到它并沒(méi)有重寫(xiě)選擇具體服務(wù)實(shí)例的chooseServer函數(shù)舱呻,所以它依然會(huì)采用在BaseLoadBalancer中實(shí)現(xiàn)的算法醋火。使用RoundRobinRule規(guī)則悠汽,以線性輪詢的方式來(lái)選擇調(diào)用的服務(wù)實(shí)例,該算法實(shí)現(xiàn)簡(jiǎn)單并沒(méi)有區(qū)域(Zone)的概念芥驳,所以它會(huì)把所有實(shí)例視為一個(gè)Zone下的節(jié)點(diǎn)來(lái)看待柿冲,這樣就會(huì)周期性的跨區(qū)域(Zone)訪問(wèn)的情況,由于跨區(qū)域會(huì)產(chǎn)生更高的延遲兆旬,這些實(shí)例主要以防止區(qū)域性故障實(shí)現(xiàn)高可用為目的而不能作為常規(guī)訪問(wèn)的實(shí)例跑杭,所以在多區(qū)域部署的情況會(huì)出現(xiàn)一定的性能問(wèn)題普监,而該負(fù)載均衡器則可以規(guī)避這樣的問(wèn)題剔宪。

ZoneAwareLoadBalancer中胀瞪,并沒(méi)有重寫(xiě)setServersList方法,說(shuō)明實(shí)現(xiàn)服務(wù)實(shí)例清單的更新的主要邏輯沒(méi)有變化脚祟。但是重寫(xiě)了setServerListForZones方法谬以,DynamicServerListLoadBalancer中的定義:

DynamicServerListLoadBalancer

setServerListForZones函數(shù)的調(diào)用位于更新服務(wù)實(shí)例清單函數(shù)setServersList最后,根據(jù)區(qū)域Zone分組的實(shí)例列表由桌,為負(fù)載均衡器中的LoadBalancerStats對(duì)象創(chuàng)建ZoneStats并放入Map zoneServersMap集合中为黎,每一個(gè)區(qū)域Zone對(duì)應(yīng)一個(gè)ZoneStats,它用于存儲(chǔ)每個(gè)Zone的一些狀態(tài)和統(tǒng)計(jì)信息沥寥。

ZoneAwareLoadBalancer中對(duì)setServerListForZones重寫(xiě)如下:

@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);碍舍,它存儲(chǔ)每個(gè)Zone區(qū)域?qū)?yīng)的負(fù)載均衡器。
        if (balancers == null) {
            //創(chuàng)建一個(gè)ConcurrentHashMap類(lèi)型的balancers對(duì)象
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
        //具體的負(fù)載均衡器在getLoadBalancer函數(shù)中完成邑雅,同時(shí)在創(chuàng)建負(fù)載均衡器的時(shí)候會(huì)創(chuàng)建它的規(guī)則
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
              //創(chuàng)建完負(fù)載均衡器的時(shí)候會(huì)馬上調(diào)用setServersList函數(shù)為其設(shè)置對(duì)應(yīng)Zone區(qū)域的實(shí)例清單
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        //對(duì)Zone區(qū)域中實(shí)例清單的檢查片橡,看看是否有Zone區(qū)域的實(shí)例清單已經(jīng)沒(méi)有實(shí)例了,是的話就將balancers中對(duì)應(yīng)的Xone區(qū)域的實(shí)例列表清空淮野,該操作
        //的作用是為了后續(xù)選擇節(jié)點(diǎn)時(shí)捧书,防止過(guò)多的Zone區(qū)域統(tǒng)計(jì)信息干擾具體實(shí)例的選擇算法
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }    

getLoadBalancer方法

    //如果當(dāng)前實(shí)現(xiàn)中沒(méi)有IRule的實(shí)例,就創(chuàng)建一個(gè)AvailabilityFilteringRule規(guī)則骤星,如果有實(shí)現(xiàn)克隆一個(gè)
    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

在了解了負(fù)載均衡器如何擴(kuò)展服務(wù)實(shí)例清單的時(shí)候经瓷,看其怎樣挑選服務(wù)實(shí)例,來(lái)實(shí)現(xiàn)對(duì)區(qū)域的識(shí)別的洞难,

@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        //只有當(dāng)負(fù)載均衡器中維護(hù)的實(shí)例所屬的Zone區(qū)域的個(gè)數(shù)大于1的時(shí)候才會(huì)執(zhí)行這里的策略
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            //調(diào)用ZoneAvoidanceRule.createSnapshot方法舆吮,當(dāng)前的負(fù)載均衡器中所有的Zone區(qū)域分布創(chuàng)建快照,保存在Map zoneSnapshot中
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            //調(diào)用ZoneAvoidanceRule.getAvailableZones方法队贱,來(lái)獲取可用Zone區(qū)域集合色冀,在該函數(shù)中會(huì)通過(guò)Zone區(qū)域快照的統(tǒng)計(jì)數(shù)據(jù)實(shí)現(xiàn)可用區(qū)的挑選。
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            //當(dāng)獲得的可用Zone區(qū)域集合不為空柱嫌,并且個(gè)數(shù)小于Zone區(qū)域總數(shù)锋恬,就隨機(jī)選擇一個(gè)Zone區(qū)域
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                //在確定了某個(gè)Zone區(qū)域后,則獲取了對(duì)應(yīng)Zone區(qū)域服務(wù)均衡器编丘,并調(diào)用zoneLoadBalancer.chooseServer來(lái)選擇具體的服務(wù)實(shí)例与学,而在
                //zoneLoadBalancer.chooseServer中將使用IRule接口的choose函數(shù)來(lái)選擇具體的服務(wù)實(shí)例彤悔,在這里,IRule接口的實(shí)現(xiàn)會(huì)使用ZoneAvoidanceRule來(lái)挑選具體的服務(wù)實(shí)例索守。
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Throwable e) {
            logger.error("Unexpected exception when choosing server using zone aware logic", e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            //否則實(shí)現(xiàn)父類(lèi)的策略
            return super.chooseServer(key);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末晕窑,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子卵佛,更是在濱河造成了極大的恐慌幕屹,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件级遭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡渺尘,警方通過(guò)查閱死者的電腦和手機(jī)挫鸽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)鸥跟,“玉大人丢郊,你說(shuō)我怎么就攤上這事∫阶桑” “怎么了枫匾?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)拟淮。 經(jīng)常有香客問(wèn)我干茉,道長(zhǎng),這世上最難降的妖魔是什么很泊? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任角虫,我火速辦了婚禮,結(jié)果婚禮上委造,老公的妹妹穿的比我還像新娘戳鹅。我一直安慰自己,他們只是感情好昏兆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布枫虏。 她就那樣靜靜地躺著,像睡著了一般爬虱。 火紅的嫁衣襯著肌膚如雪隶债。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,679評(píng)論 1 305
  • 那天饮潦,我揣著相機(jī)與錄音燃异,去河邊找鬼。 笑死继蜡,一個(gè)胖子當(dāng)著我的面吹牛回俐,可吹牛的內(nèi)容都是我干的逛腿。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼仅颇,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼单默!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起忘瓦,我...
    開(kāi)封第一講書(shū)人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤搁廓,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后耕皮,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體境蜕,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年凌停,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了粱年。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡罚拟,死狀恐怖台诗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情赐俗,我是刑警寧澤拉队,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站阻逮,受9級(jí)特大地震影響粱快,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜叔扼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一皆尔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧币励,春花似錦慷蠕、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至仅胞,卻和暖如春每辟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背干旧。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工渠欺, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人椎眯。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓挠将,卻偏偏與公主長(zhǎng)得像胳岂,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子舔稀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容