Dubbo分析之Cluster層

前言

本文繼續(xù)分析dubbo的cluster層松却,此層封裝多個提供者的路由及負(fù)載均衡奠货,并橋接注冊中心封孙,以Invoker為中心主慰,擴展接口為Cluster, Directory, Router, LoadBalance;

Cluster接口

整個cluster層可以使用如下圖片概括:

各節(jié)點關(guān)系:

這里的Invoker是Provider的一個可調(diào)用Service的抽象亿乳,Invoker封裝了Provider地址及Service接口信息硝拧;

Directory代表多個Invoker,可以把它看成List葛假,但與List不同的是障陶,它的值可能是動態(tài)變化的,比如注冊中心推送變更聊训;

Cluster將Directory中的多個Invoker偽裝成一個 Invoker抱究,對上層透明,偽裝過程包含了容錯邏輯带斑,調(diào)用失敗后鼓寺,重試另一個;

Router負(fù)責(zé)從多個Invoker中按路由規(guī)則選出子集遏暴,比如讀寫分離侄刽,應(yīng)用隔離等;

LoadBalance負(fù)責(zé)從多個Invoker中選出具體的一個用于本次調(diào)用朋凉,選的過程包含了負(fù)載均衡算法州丹,調(diào)用失敗后,需要重選杂彭;

Cluster經(jīng)過目錄墓毒,路由,負(fù)載均衡獲取到一個可用的Invoker亲怠,交給上層調(diào)用所计,接口如下:

@SPI(FailoverCluster.NAME)

public interface Cluster {

? ? /**

? ? * Merge the directory invokers to a virtual invoker.

? ? *

? ? * @param <T>

? ? * @param directory

? ? * @return cluster invoker

? ? * @throws RpcException

? ? */

? ? @Adaptive

? ? <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

Cluster是一個集群容錯接口,經(jīng)過路由团秽,負(fù)載均衡之后獲取的Invoker主胧,由容錯機制來處理,dubbo提供了多種容錯機制包括:

Failover Cluster:失敗自動切換习勤,當(dāng)出現(xiàn)失敗踪栋,重試其它服務(wù)器 [1]。通常用于讀操作图毕,但重試會帶來更長延遲夷都。可通過 retries=”2″ 來設(shè)置重試次數(shù)(不含第一次)予颤。

Failfast Cluster:快速失敗囤官,只發(fā)起一次調(diào)用冬阳,失敗立即報錯。通常用于非冪等性的寫操作党饮,比如新增記錄肝陪。

Failsafe Cluster:失敗安全,出現(xiàn)異常時劫谅,直接忽略见坑。通常用于寫入審計日志等操作嚷掠。

Failback Cluster:失敗自動恢復(fù)捏检,后臺記錄失敗請求,定時重發(fā)不皆。通常用于消息通知操作贯城。

Forking Cluster:并行調(diào)用多個服務(wù)器,只要一個成功即返回霹娄。通常用于實時性要求較高的讀操作能犯,但需要浪費更多服務(wù)資源∪埽可通過 forks=”2″ 來設(shè)置最大并行數(shù)踩晶。

Broadcast Cluster:廣播調(diào)用所有提供者,逐個調(diào)用枕磁,任意一臺報錯則報錯 [2]渡蜻。通常用于通知所有提供者更新緩存或日志等本地資源信息。

默認(rèn)使用了FailoverCluster计济,失敗的時候會默認(rèn)重試其他服務(wù)器茸苇,默認(rèn)為兩次;當(dāng)然也可以擴展其他的容錯機制沦寂;看一下默認(rèn)的FailoverCluster容錯機制学密,具體源碼在FailoverClusterInvoker中:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

? ? ? ? List<Invoker<T>> copyinvokers = invokers;

? ? ? ? checkInvokers(copyinvokers, invocation);

? ? ? ? int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;

? ? ? ? if (len <= 0) {

? ? ? ? ? ? len = 1;

? ? ? ? }

? ? ? ? // retry loop.

? ? ? ? RpcException le = null; // last exception.

? ? ? ? List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.

? ? ? ? Set<String> providers = new HashSet<String>(len);

? ? ? ? for (int i = 0; i < len; i++) {

? ? ? ? ? ? //Reselect before retry to avoid a change of candidate `invokers`.

? ? ? ? ? ? //NOTE: if `invokers` changed, then `invoked` also lose accuracy.

? ? ? ? ? ? if (i > 0) {

? ? ? ? ? ? ? ? checkWhetherDestroyed();

? ? ? ? ? ? ? ? copyinvokers = list(invocation);

? ? ? ? ? ? ? ? // check again

? ? ? ? ? ? ? ? checkInvokers(copyinvokers, invocation);

? ? ? ? ? ? }

? ? ? ? ? ? Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);

? ? ? ? ? ? invoked.add(invoker);

? ? ? ? ? ? RpcContext.getContext().setInvokers((List) invoked);

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Result result = invoker.invoke(invocation);

? ? ? ? ? ? ? ? if (le != null && logger.isWarnEnabled()) {

? ? ? ? ? ? ? ? ? ? logger.warn("Although retry the method " + invocation.getMethodName()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " in the service " + getInterface().getName()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " was successful by the provider " + invoker.getUrl().getAddress()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + ", but there have been failed providers " + providers

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " (" + providers.size() + "/" + copyinvokers.size()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + ") from the registry " + directory.getUrl().getAddress()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " on the consumer " + NetUtils.getLocalHost()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " using the dubbo version " + Version.getVersion() + ". Last error is: "

? ? ? ? ? ? ? ? ? ? ? ? ? ? + le.getMessage(), le);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? return result;

? ? ? ? ? ? } catch (RpcException e) {

? ? ? ? ? ? ? ? if (e.isBiz()) { // biz exception.

? ? ? ? ? ? ? ? ? ? throw e;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? le = e;

? ? ? ? ? ? } catch (Throwable e) {

? ? ? ? ? ? ? ? le = new RpcException(e.getMessage(), e);

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? providers.add(invoker.getUrl().getAddress());

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "

? ? ? ? ? ? ? ? + invocation.getMethodName() + " in the service " + getInterface().getName()

? ? ? ? ? ? ? ? + ". Tried " + len + " times of the providers " + providers

? ? ? ? ? ? ? ? + " (" + providers.size() + "/" + copyinvokers.size()

? ? ? ? ? ? ? ? + ") from the registry " + directory.getUrl().getAddress()

? ? ? ? ? ? ? ? + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "

? ? ? ? ? ? ? ? + Version.getVersion() + ". Last error is: "

? ? ? ? ? ? ? ? + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);

? ? }

invocation是客戶端傳給服務(wù)器的相關(guān)參數(shù)包括(方法名稱,方法參數(shù)传藏,參數(shù)值腻暮,附件信息),invokers是經(jīng)過路由之后的服務(wù)器列表毯侦,loadbalance是指定的負(fù)載均衡策略哭靖;首先檢查invokers是否為空,為空直接拋異常叫惊,然后獲取重試的次數(shù)默認(rèn)為2次款青,接下來就是循環(huán)調(diào)用指定次數(shù),如果不是第一次調(diào)用(表示第一次調(diào)用失敗)霍狰,會重新加載服務(wù)器列表抡草,然后通過負(fù)載均衡策略獲取唯一的Invoker饰及,最后就是通過Invoker把invocation發(fā)送給服務(wù)器,返回結(jié)果Result康震;

具體的doInvoke方法是在抽象類AbstractClusterInvoker中被調(diào)用的:

public Result invoke(final Invocation invocation) throws RpcException {

? ? ? ? checkWhetherDestroyed();

? ? ? ? LoadBalance loadbalance = null;

? ? ? ? List<Invoker<T>> invokers = list(invocation);

? ? ? ? if (invokers != null && !invokers.isEmpty()) {

? ? ? ? ? ? loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()

? ? ? ? ? ? ? ? ? ? .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));

? ? ? ? }

? ? ? ? RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

? ? ? ? return doInvoke(invocation, invokers, loadbalance);

? ? }

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {

? ? ? ? List<Invoker<T>> invokers = directory.list(invocation);

? ? ? ? return invokers;

? ? }

首先通過Directory獲取Invoker列表燎含,同時在Directory中也會做路由處理,然后獲取負(fù)載均衡策略腿短,最后調(diào)用具體的容錯策略屏箍;下面具體看一下Directory;

Directory接口

接口定義如下:

public interface Directory<T> extends Node {

? ? /**

? ? * get service type.

? ? *

? ? * @return service type.

? ? */

? ? Class<T> getInterface();

? ? /**

? ? * list invokers.

? ? *

? ? * @return invokers

? ? */

? ? List<Invoker<T>> list(Invocation invocation) throws RpcException;

}

目錄服務(wù)作用就是獲取指定接口的服務(wù)列表橘忱,具體實現(xiàn)有兩個:StaticDirectory和RegistryDirectory赴魁,同時都繼承于AbstractDirectory;從名字可以大致知道StaticDirectory是一個固定的目錄服務(wù),表示里面的Invoker列表不會動態(tài)改變;RegistryDirectory是一個動態(tài)的目錄服務(wù)末贾,通過注冊中心動態(tài)更新服務(wù)列表;list實現(xiàn)在抽象類中:

public List<Invoker<T>> list(Invocation invocation) throws RpcException {

? ? ? ? if (destroyed) {

? ? ? ? ? ? throw new RpcException("Directory already destroyed .url: " + getUrl());

? ? ? ? }

? ? ? ? List<Invoker<T>> invokers = doList(invocation);

? ? ? ? List<Router> localRouters = this.routers; // local reference

? ? ? ? if (localRouters != null && !localRouters.isEmpty()) {

? ? ? ? ? ? for (Router router : localRouters) {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {

? ? ? ? ? ? ? ? ? ? ? ? invokers = router.route(invokers, getConsumerUrl(), invocation);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } catch (Throwable t) {

? ? ? ? ? ? ? ? ? ? logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return invokers;

? ? }

首先檢查目錄是否被銷毀潘拱,然后調(diào)用doList,具體在實現(xiàn)類中定義拧略,最后調(diào)用路由功能芦岂,下面重點看一下StaticDirectory和RegistryDirectory中的doList方法

1.RegistryDirectory

是一個動態(tài)的目錄服務(wù),所有可以看到RegistryDirectory同時也繼承了NotifyListener接口垫蛆,是一個通知接口禽最,注冊中心有服務(wù)列表更新的時候,同時通知RegistryDirectory月褥,通知邏輯如下:

public synchronized void notify(List<URL> urls) {

? ? ? ? List<URL> invokerUrls = new ArrayList<URL>();

? ? ? ? List<URL> routerUrls = new ArrayList<URL>();

? ? ? ? List<URL> configuratorUrls = new ArrayList<URL>();

? ? ? ? for (URL url : urls) {

? ? ? ? ? ? String protocol = url.getProtocol();

? ? ? ? ? ? String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

? ? ? ? ? ? if (Constants.ROUTERS_CATEGORY.equals(category)

? ? ? ? ? ? ? ? ? ? || Constants.ROUTE_PROTOCOL.equals(protocol)) {

? ? ? ? ? ? ? ? routerUrls.add(url);

? ? ? ? ? ? } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)

? ? ? ? ? ? ? ? ? ? || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {

? ? ? ? ? ? ? ? configuratorUrls.add(url);

? ? ? ? ? ? } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {

? ? ? ? ? ? ? ? invokerUrls.add(url);

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // configurators

? ? ? ? if (configuratorUrls != null && !configuratorUrls.isEmpty()) {

? ? ? ? ? ? this.configurators = toConfigurators(configuratorUrls);

? ? ? ? }

? ? ? ? // routers

? ? ? ? if (routerUrls != null && !routerUrls.isEmpty()) {

? ? ? ? ? ? List<Router> routers = toRouters(routerUrls);

? ? ? ? ? ? if (routers != null) { // null - do nothing

? ? ? ? ? ? ? ? setRouters(routers);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? List<Configurator> localConfigurators = this.configurators; // local reference

? ? ? ? // merge override parameters

? ? ? ? this.overrideDirectoryUrl = directoryUrl;

? ? ? ? if (localConfigurators != null && !localConfigurators.isEmpty()) {

? ? ? ? ? ? for (Configurator configurator : localConfigurators) {

? ? ? ? ? ? ? ? this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // providers

? ? ? ? refreshInvoker(invokerUrls);

? ? }

此通知接口會接受三種類別的url包括:router(路由)弛随,configurator(配置),provider(服務(wù)提供方)宁赤;

路由規(guī)則:決定一次dubbo服務(wù)調(diào)用的目標(biāo)服務(wù)器舀透,分為條件路由規(guī)則和腳本路由規(guī)則,并且支持可擴展决左,向注冊中心寫入路由規(guī)則的操作通常由監(jiān)控中心或治理中心的頁面完成愕够;

配置規(guī)則:向注冊中心寫入動態(tài)配置覆蓋規(guī)則 [1]。該功能通常由監(jiān)控中心或治理中心的頁面完成佛猛;

provider:動態(tài)提供的服務(wù)列表

路由規(guī)則和配置規(guī)則其實就是對provider服務(wù)列表更新和過濾處理惑芭,refreshInvoker方法就是根據(jù)三種url類別刷新本地的invoker列表,下面看一下RegistryDirectory實現(xiàn)的doList接口:

public List<Invoker<T>> doList(Invocation invocation) {

? ? ? ? if (forbidden) {

? ? ? ? ? ? // 1. No service provider 2. Service providers are disabled

? ? ? ? ? ? throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,

? ? ? ? ? ? ? ? "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +? NetUtils.getLocalHost()

? ? ? ? ? ? ? ? ? ? ? ? + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");

? ? ? ? }

? ? ? ? List<Invoker<T>> invokers = null;

? ? ? ? Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference

? ? ? ? if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {

? ? ? ? ? ? String methodName = RpcUtils.getMethodName(invocation);

? ? ? ? ? ? Object[] args = RpcUtils.getArguments(invocation);

? ? ? ? ? ? if (args != null && args.length > 0 && args[0] != null

? ? ? ? ? ? ? ? ? ? && (args[0] instanceof String || args[0].getClass().isEnum())) {

? ? ? ? ? ? ? ? invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter

? ? ? ? ? ? }

? ? ? ? ? ? if (invokers == null) {

? ? ? ? ? ? ? ? invokers = localMethodInvokerMap.get(methodName);

? ? ? ? ? ? }

? ? ? ? ? ? if (invokers == null) {

? ? ? ? ? ? ? ? invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);

? ? ? ? ? ? }

? ? ? ? ? ? if (invokers == null) {

? ? ? ? ? ? ? ? Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();

? ? ? ? ? ? ? ? if (iterator.hasNext()) {

? ? ? ? ? ? ? ? ? ? invokers = iterator.next();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;

? ? }

refreshInvoker處理之后继找,服務(wù)列表已methodInvokerMap存在遂跟,一個方法對應(yīng)服務(wù)列表Map>>;

通過Invocation中指定的方法獲取對應(yīng)的服務(wù)列表,如果具體的方法沒有對應(yīng)的服務(wù)列表幻锁,則獲取”*”對應(yīng)的服務(wù)列表凯亮;處理完之后就在父類中進行路由處理,路由規(guī)則同樣是通過通知接口獲取的哄尔,路由規(guī)則在下章介紹假消;

2.StaticDirectory

這是一個靜態(tài)的目錄服務(wù),里面的服務(wù)列表在初始化的時候就已經(jīng)存在岭接,并且不會改變富拗;StaticDirectory用得比較少,主要用在服務(wù)對多注冊中心的引用;

protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

? ? ? ? return invokers;

? ? }

因為是靜態(tài)的鸣戴,所有doList方法也很簡單啃沪,直接返回內(nèi)存中的服務(wù)列表即可;

Router接口

路由規(guī)則決定一次dubbo服務(wù)調(diào)用的目標(biāo)服務(wù)器葵擎,分為條件路由規(guī)則和腳本路由規(guī)則谅阿,并且支持可擴展,接口如下:

public interface Router extends Comparable<Router> {

? ? /**

? ? * get the router url.

? ? *

? ? * @return url

? ? */

? ? URL getUrl();

? ? /**

? ? * route.

? ? *

? ? * @param invokers

? ? * @param url? ? ? ? refer url

? ? * @param invocation

? ? * @return routed invokers

? ? * @throws RpcException

? ? */

? ? <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

接口中提供的route方法通過一定的規(guī)則過濾出invokers的一個子集酬滤;提供了三個實現(xiàn)類:ScriptRouter,ConditionRouter和MockInvokersSelector

ScriptRouter:腳本路由規(guī)則支持 JDK 腳本引擎的所有腳本寓涨,比如:javascript, jruby, groovy 等盯串,通過type=javascript參數(shù)設(shè)置腳本類型,缺省為javascript戒良;

ConditionRouter:基于條件表達(dá)式的路由規(guī)則体捏,如:host = 10.20.153.10 => host = 10.20.153.11;=> 之前的為消費者匹配條件糯崎,所有參數(shù)和消費者的 URL 進行對比几缭,=> 之后為提供者地址列表的過濾條件,所有參數(shù)和提供者的 URL 進行對比沃呢;

MockInvokersSelector:是否被配置為使用mock年栓,此路由器保證只有具有協(xié)議MOCK的調(diào)用者出現(xiàn)在最終的調(diào)用者列表中,所有其他調(diào)用者將被排除薄霜;

下面重點看一下ScriptRouter源碼

public ScriptRouter(URL url) {

? ? ? ? this.url = url;

? ? ? ? String type = url.getParameter(Constants.TYPE_KEY);

? ? ? ? this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);

? ? ? ? String rule = url.getParameterAndDecoded(Constants.RULE_KEY);

? ? ? ? if (type == null || type.length() == 0) {

? ? ? ? ? ? type = Constants.DEFAULT_SCRIPT_TYPE_KEY;

? ? ? ? }

? ? ? ? if (rule == null || rule.length() == 0) {

? ? ? ? ? ? throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule));

? ? ? ? }

? ? ? ? ScriptEngine engine = engines.get(type);

? ? ? ? if (engine == null) {

? ? ? ? ? ? engine = new ScriptEngineManager().getEngineByName(type);

? ? ? ? ? ? if (engine == null) {

? ? ? ? ? ? ? ? throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule));

? ? ? ? ? ? }

? ? ? ? ? ? engines.put(type, engine);

? ? ? ? }

? ? ? ? this.engine = engine;

? ? ? ? this.rule = rule;

? ? }

構(gòu)造器分別初始化腳本引擎(engine)和腳本代碼(rule)某抓,默認(rèn)的腳本引擎是javascript;看一個具體的url:

"script://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("(function route(invokers) { ... } (invokers))")

script協(xié)議表示一個腳本協(xié)議惰瓜,rule后面是一段javascript腳本否副,傳入的參數(shù)是invokers;

(function route(invokers) {

? ? var result = new java.util.ArrayList(invokers.size());

? ? for (i = 0; i < invokers.size(); i ++) {

? ? ? ? if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) {

? ? ? ? ? ? result.add(invokers.get(i));

? ? ? ? }

? ? }

? ? return result;

} (invokers)); // 表示立即執(zhí)行方法

如上這段腳本過濾出host為10.20.153.10崎坊,具體是如何執(zhí)行這段腳本的备禀,在route方法中:

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {

? ? ? ? try {

? ? ? ? ? ? List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);

? ? ? ? ? ? Compilable compilable = (Compilable) engine;

? ? ? ? ? ? Bindings bindings = engine.createBindings();

? ? ? ? ? ? bindings.put("invokers", invokersCopy);

? ? ? ? ? ? bindings.put("invocation", invocation);

? ? ? ? ? ? bindings.put("context", RpcContext.getContext());

? ? ? ? ? ? CompiledScript function = compilable.compile(rule);

? ? ? ? ? ? Object obj = function.eval(bindings);

? ? ? ? ? ? if (obj instanceof Invoker[]) {

? ? ? ? ? ? ? ? invokersCopy = Arrays.asList((Invoker<T>[]) obj);

? ? ? ? ? ? } else if (obj instanceof Object[]) {

? ? ? ? ? ? ? ? invokersCopy = new ArrayList<Invoker<T>>();

? ? ? ? ? ? ? ? for (Object inv : (Object[]) obj) {

? ? ? ? ? ? ? ? ? ? invokersCopy.add((Invoker<T>) inv);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? invokersCopy = (List<Invoker<T>>) obj;

? ? ? ? ? ? }

? ? ? ? ? ? return invokersCopy;

? ? ? ? } catch (ScriptException e) {

? ? ? ? ? ? //fail then ignore rule .invokers.

? ? ? ? ? ? logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);

? ? ? ? ? ? return invokers;

? ? ? ? }

? ? }

首先通過腳本引擎編譯腳本,然后執(zhí)行腳本,同時傳入Bindings參數(shù)曲尸,這樣在腳本中就可以獲取invokers呻待,然后進行過濾;最后來看一下負(fù)載均衡策略

LoadBalance接口

在集群負(fù)載均衡時队腐,Dubbo提供了多種均衡策略蚕捉,缺省為random隨機調(diào)用,可以自行擴展負(fù)載均衡策略柴淘;接口類如下:

@SPI(RandomLoadBalance.NAME)

public interface LoadBalance {

? ? /**

? ? * select one invoker in list.

? ? *

? ? * @param invokers? invokers.

? ? * @param url? ? ? ? refer url

? ? * @param invocation invocation.

? ? * @return selected invoker.

? ? */

? ? @Adaptive("loadbalance")

? ? <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

SPI定義了默認(rèn)的策略為RandomLoadBalance迫淹,提供了一個select方法,通過策略從服務(wù)列表中選擇一個invoker为严;dubbo默認(rèn)提供了多種策略:

Random LoadBalance:隨機敛熬,按權(quán)重設(shè)置隨機概率,在一個截面上碰撞的概率高第股,但調(diào)用量越大分布越均勻应民,而且按概率使用權(quán)重后也比較均勻,有利于動態(tài)調(diào)整提供者權(quán)重夕吻;

RoundRobin LoadBalance:輪詢诲锹,按公約后的權(quán)重設(shè)置輪詢比率;存在慢的提供者累積請求的問題涉馅,比如:第二臺機器很慢归园,但沒掛,當(dāng)請求調(diào)到第二臺時就卡在那稚矿,

久而久之庸诱,所有請求都卡在調(diào)到第二臺上;

LeastActive LoadBalance:最少活躍調(diào)用數(shù)晤揣,相同活躍數(shù)的隨機桥爽,活躍數(shù)指調(diào)用前后計數(shù)差;使慢的提供者收到更少請求昧识,因為越慢的提供者的調(diào)用前后計數(shù)差會越大钠四;

ConsistentHash LoadBalance:一致性 Hash,相同參數(shù)的請求總是發(fā)到同一提供者滞诺;當(dāng)某一臺提供者掛時形导,原本發(fā)往該提供者的請求,基于虛擬節(jié)點习霹,平攤到其它提供者朵耕,不會引起劇烈變動;

下面重點看一下默認(rèn)的RandomLoadBalance源碼

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

? ? ? ? int length = invokers.size(); // Number of invokers

? ? ? ? int totalWeight = 0; // The sum of weights

? ? ? ? boolean sameWeight = true; // Every invoker has the same weight?

? ? ? ? for (int i = 0; i < length; i++) {

? ? ? ? ? ? int weight = getWeight(invokers.get(i), invocation);

? ? ? ? ? ? totalWeight += weight; // Sum

? ? ? ? ? ? if (sameWeight && i > 0

? ? ? ? ? ? ? ? ? ? && weight != getWeight(invokers.get(i - 1), invocation)) {

? ? ? ? ? ? ? ? sameWeight = false;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? if (totalWeight > 0 && !sameWeight) {

? ? ? ? ? ? // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.

? ? ? ? ? ? int offset = random.nextInt(totalWeight);

? ? ? ? ? ? // Return a invoker based on the random value.

? ? ? ? ? ? for (int i = 0; i < length; i++) {

? ? ? ? ? ? ? ? offset -= getWeight(invokers.get(i), invocation);

? ? ? ? ? ? ? ? if (offset < 0) {

? ? ? ? ? ? ? ? ? ? return invokers.get(i);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // If all invokers have the same weight value or totalWeight=0, return evenly.

? ? ? ? return invokers.get(random.nextInt(length));

? ? }

首先計算總權(quán)重淋叶,同時檢查是否每一個服務(wù)都有相同的權(quán)重阎曹;如果總權(quán)重大于0并且服務(wù)的權(quán)重都不相同,則通過權(quán)重來隨機選擇,否則直接通過Random函數(shù)來隨機处嫌;

總結(jié)

本文圍繞Cluster層中的幾個重要的接口從上到下來分別介紹栅贴,并重點介紹了其中的某些實現(xiàn)類;結(jié)合官方提供的調(diào)用圖熏迹,還是很容易理解此層的檐薯。

歡迎工作一到五年的Java工程師朋友們加入Java架構(gòu)開發(fā): 855835163

群內(nèi)提供免費的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)注暗、高性能及分布式坛缕、Jvm性能調(diào)優(yōu)、Spring源碼捆昏,MyBatis赚楚,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構(gòu)資料)合理利用自己每一分每一秒的時間來學(xué)習(xí)提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰骗卜!趁年輕宠页,使勁拼,給未來的自己一個交代寇仓!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末举户,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子焚刺,更是在濱河造成了極大的恐慌敛摘,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乳愉,死亡現(xiàn)場離奇詭異,居然都是意外死亡屯远,警方通過查閱死者的電腦和手機蔓姚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來慨丐,“玉大人坡脐,你說我怎么就攤上這事》拷遥” “怎么了备闲?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長捅暴。 經(jīng)常有香客問我恬砂,道長,這世上最難降的妖魔是什么蓬痒? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任泻骤,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘狱掂。我一直安慰自己演痒,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布趋惨。 她就那樣靜靜地躺著鸟顺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪器虾。 梳的紋絲不亂的頭發(fā)上讯嫂,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天,我揣著相機與錄音曾撤,去河邊找鬼端姚。 笑死,一個胖子當(dāng)著我的面吹牛挤悉,可吹牛的內(nèi)容都是我干的渐裸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼装悲,長吁一口氣:“原來是場噩夢啊……” “哼昏鹃!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起诀诊,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤洞渤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后属瓣,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體载迄,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年抡蛙,在試婚紗的時候發(fā)現(xiàn)自己被綠了护昧。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡粗截,死狀恐怖惋耙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情熊昌,我是刑警寧澤绽榛,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站婿屹,受9級特大地震影響灭美,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜选泻,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一冲粤、第九天 我趴在偏房一處隱蔽的房頂上張望美莫。 院中可真熱鬧,春花似錦梯捕、人聲如沸厢呵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽襟铭。三九已至,卻和暖如春短曾,著一層夾襖步出監(jiān)牢的瞬間寒砖,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工嫉拐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留哩都,地道東北人。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓婉徘,卻偏偏與公主長得像漠嵌,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子盖呼,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,446評論 2 359

推薦閱讀更多精彩內(nèi)容