

本文繼續(xù)分析dubbo的cluster層松却,此層封裝多個提供者的路由及負(fù)載均衡奠货,并橋接注冊中心封孙,以Invoker為中心主慰,擴展接口為Cluster, Directory, Router, LoadBalance;






Cluster將Directory中的多個Invoker偽裝成一個 Invoker抱究,對上層透明,偽裝過程包含了容錯邏輯带斑,調(diào)用失敗后鼓寺,重試另一個;





public interface Cluster {

? ? /**

? ? * Merge the directory invokers to a virtual invoker.

? ? *

? ? * @param <T>

? ? * @param directory

? ? * @return cluster invoker

? ? * @throws RpcException

? ? */

? ? @Adaptive

? ? <T> Invoker<T> join(Directory<T> directory) throws RpcException;



Failover Cluster:失敗自動切換习勤,當(dāng)出現(xiàn)失敗踪栋,重試其它服務(wù)器 [1]。通常用于讀操作图毕,但重試會帶來更長延遲夷都。可通過 retries=”2″ 來設(shè)置重試次數(shù)(不含第一次)予颤。

Failfast Cluster:快速失敗囤官,只發(fā)起一次調(diào)用冬阳,失敗立即報錯。通常用于非冪等性的寫操作党饮,比如新增記錄肝陪。

Failsafe Cluster:失敗安全,出現(xiàn)異常時劫谅,直接忽略见坑。通常用于寫入審計日志等操作嚷掠。

Failback Cluster:失敗自動恢復(fù)捏检,后臺記錄失敗請求,定時重發(fā)不皆。通常用于消息通知操作贯城。

Forking Cluster:并行調(diào)用多個服務(wù)器,只要一個成功即返回霹娄。通常用于實時性要求較高的讀操作能犯,但需要浪費更多服務(wù)資源∪埽可通過 forks=”2″ 來設(shè)置最大并行數(shù)踩晶。

Broadcast Cluster:廣播調(diào)用所有提供者,逐個調(diào)用枕磁,任意一臺報錯則報錯 [2]渡蜻。通常用于通知所有提供者更新緩存或日志等本地資源信息。


public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

? ? ? ? List<Invoker<T>> copyinvokers = invokers;

? ? ? ? checkInvokers(copyinvokers, invocation);

? ? ? ? int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;

? ? ? ? if (len <= 0) {

? ? ? ? ? ? len = 1;

? ? ? ? }

? ? ? ? // retry loop.

? ? ? ? RpcException le = null; // last exception.

? ? ? ? List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.

? ? ? ? Set<String> providers = new HashSet<String>(len);

? ? ? ? for (int i = 0; i < len; i++) {

? ? ? ? ? ? //Reselect before retry to avoid a change of candidate `invokers`.

? ? ? ? ? ? //NOTE: if `invokers` changed, then `invoked` also lose accuracy.

? ? ? ? ? ? if (i > 0) {

? ? ? ? ? ? ? ? checkWhetherDestroyed();

? ? ? ? ? ? ? ? copyinvokers = list(invocation);

? ? ? ? ? ? ? ? // check again

? ? ? ? ? ? ? ? checkInvokers(copyinvokers, invocation);

? ? ? ? ? ? }

? ? ? ? ? ? Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);

? ? ? ? ? ? invoked.add(invoker);

? ? ? ? ? ? RpcContext.getContext().setInvokers((List) invoked);

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Result result = invoker.invoke(invocation);

? ? ? ? ? ? ? ? if (le != null && logger.isWarnEnabled()) {

? ? ? ? ? ? ? ? ? ? logger.warn("Although retry the method " + invocation.getMethodName()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " in the service " + getInterface().getName()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " was successful by the provider " + invoker.getUrl().getAddress()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + ", but there have been failed providers " + providers

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " (" + providers.size() + "/" + copyinvokers.size()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + ") from the registry " + directory.getUrl().getAddress()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " on the consumer " + NetUtils.getLocalHost()

? ? ? ? ? ? ? ? ? ? ? ? ? ? + " using the dubbo version " + Version.getVersion() + ". Last error is: "

? ? ? ? ? ? ? ? ? ? ? ? ? ? + le.getMessage(), le);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? return result;

? ? ? ? ? ? } catch (RpcException e) {

? ? ? ? ? ? ? ? if (e.isBiz()) { // biz exception.

? ? ? ? ? ? ? ? ? ? throw e;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? le = e;

? ? ? ? ? ? } catch (Throwable e) {

? ? ? ? ? ? ? ? le = new RpcException(e.getMessage(), e);

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? providers.add(invoker.getUrl().getAddress());

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "

? ? ? ? ? ? ? ? + invocation.getMethodName() + " in the service " + getInterface().getName()

? ? ? ? ? ? ? ? + ". Tried " + len + " times of the providers " + providers

? ? ? ? ? ? ? ? + " (" + providers.size() + "/" + copyinvokers.size()

? ? ? ? ? ? ? ? + ") from the registry " + directory.getUrl().getAddress()

? ? ? ? ? ? ? ? + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "

? ? ? ? ? ? ? ? + Version.getVersion() + ". Last error is: "

? ? ? ? ? ? ? ? + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);

? ? }



public Result invoke(final Invocation invocation) throws RpcException {

? ? ? ? checkWhetherDestroyed();

? ? ? ? LoadBalance loadbalance = null;

? ? ? ? List<Invoker<T>> invokers = list(invocation);

? ? ? ? if (invokers != null && !invokers.isEmpty()) {

? ? ? ? ? ? loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()

? ? ? ? ? ? ? ? ? ? .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));

? ? ? ? }

? ? ? ? RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

? ? ? ? return doInvoke(invocation, invokers, loadbalance);

? ? }

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {

? ? ? ? List<Invoker<T>> invokers = directory.list(invocation);

? ? ? ? return invokers;

? ? }




public interface Directory<T> extends Node {

? ? /**

? ? * get service type.

? ? *

? ? * @return service type.

? ? */

? ? Class<T> getInterface();

? ? /**

? ? * list invokers.

? ? *

? ? * @return invokers

? ? */

? ? List<Invoker<T>> list(Invocation invocation) throws RpcException;



public List<Invoker<T>> list(Invocation invocation) throws RpcException {

? ? ? ? if (destroyed) {

? ? ? ? ? ? throw new RpcException("Directory already destroyed .url: " + getUrl());

? ? ? ? }

? ? ? ? List<Invoker<T>> invokers = doList(invocation);

? ? ? ? List<Router> localRouters = this.routers; // local reference

? ? ? ? if (localRouters != null && !localRouters.isEmpty()) {

? ? ? ? ? ? for (Router router : localRouters) {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {

? ? ? ? ? ? ? ? ? ? ? ? invokers = router.route(invokers, getConsumerUrl(), invocation);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } catch (Throwable t) {

? ? ? ? ? ? ? ? ? ? logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return invokers;

? ? }




public synchronized void notify(List<URL> urls) {

? ? ? ? List<URL> invokerUrls = new ArrayList<URL>();

? ? ? ? List<URL> routerUrls = new ArrayList<URL>();

? ? ? ? List<URL> configuratorUrls = new ArrayList<URL>();

? ? ? ? for (URL url : urls) {

? ? ? ? ? ? String protocol = url.getProtocol();

? ? ? ? ? ? String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

? ? ? ? ? ? if (Constants.ROUTERS_CATEGORY.equals(category)

? ? ? ? ? ? ? ? ? ? || Constants.ROUTE_PROTOCOL.equals(protocol)) {

? ? ? ? ? ? ? ? routerUrls.add(url);

? ? ? ? ? ? } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)

? ? ? ? ? ? ? ? ? ? || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {

? ? ? ? ? ? ? ? configuratorUrls.add(url);

? ? ? ? ? ? } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {

? ? ? ? ? ? ? ? invokerUrls.add(url);

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // configurators

? ? ? ? if (configuratorUrls != null && !configuratorUrls.isEmpty()) {

? ? ? ? ? ? this.configurators = toConfigurators(configuratorUrls);

? ? ? ? }

? ? ? ? // routers

? ? ? ? if (routerUrls != null && !routerUrls.isEmpty()) {

? ? ? ? ? ? List<Router> routers = toRouters(routerUrls);

? ? ? ? ? ? if (routers != null) { // null - do nothing

? ? ? ? ? ? ? ? setRouters(routers);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? List<Configurator> localConfigurators = this.configurators; // local reference

? ? ? ? // merge override parameters

? ? ? ? this.overrideDirectoryUrl = directoryUrl;

? ? ? ? if (localConfigurators != null && !localConfigurators.isEmpty()) {

? ? ? ? ? ? for (Configurator configurator : localConfigurators) {

? ? ? ? ? ? ? ? this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // providers

? ? ? ? refreshInvoker(invokerUrls);

? ? }



配置規(guī)則:向注冊中心寫入動態(tài)配置覆蓋規(guī)則 [1]。該功能通常由監(jiān)控中心或治理中心的頁面完成佛猛;



public List<Invoker<T>> doList(Invocation invocation) {

? ? ? ? if (forbidden) {

? ? ? ? ? ? // 1. No service provider 2. Service providers are disabled

? ? ? ? ? ? throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,

? ? ? ? ? ? ? ? "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +? NetUtils.getLocalHost()

? ? ? ? ? ? ? ? ? ? ? ? + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");

? ? ? ? }

? ? ? ? List<Invoker<T>> invokers = null;

? ? ? ? Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference

? ? ? ? if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {

? ? ? ? ? ? String methodName = RpcUtils.getMethodName(invocation);

? ? ? ? ? ? Object[] args = RpcUtils.getArguments(invocation);

? ? ? ? ? ? if (args != null && args.length > 0 && args[0] != null

? ? ? ? ? ? ? ? ? ? && (args[0] instanceof String || args[0].getClass().isEnum())) {

? ? ? ? ? ? ? ? invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter

? ? ? ? ? ? }

? ? ? ? ? ? if (invokers == null) {

? ? ? ? ? ? ? ? invokers = localMethodInvokerMap.get(methodName);

? ? ? ? ? ? }

? ? ? ? ? ? if (invokers == null) {

? ? ? ? ? ? ? ? invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);

? ? ? ? ? ? }

? ? ? ? ? ? if (invokers == null) {

? ? ? ? ? ? ? ? Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();

? ? ? ? ? ? ? ? if (iterator.hasNext()) {

? ? ? ? ? ? ? ? ? ? invokers = iterator.next();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;

? ? }





protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

? ? ? ? return invokers;

? ? }




public interface Router extends Comparable<Router> {

? ? /**

? ? * get the router url.

? ? *

? ? * @return url

? ? */

? ? URL getUrl();

? ? /**

? ? * route.

? ? *

? ? * @param invokers

? ? * @param url? ? ? ? refer url

? ? * @param invocation

? ? * @return routed invokers

? ? * @throws RpcException

? ? */

? ? <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;



ScriptRouter:腳本路由規(guī)則支持 JDK 腳本引擎的所有腳本寓涨,比如:javascript, jruby, groovy 等盯串,通過type=javascript參數(shù)設(shè)置腳本類型,缺省為javascript戒良;

ConditionRouter:基于條件表達(dá)式的路由規(guī)則体捏,如:host = => host =;=> 之前的為消費者匹配條件糯崎,所有參數(shù)和消費者的 URL 進行對比几缭,=> 之后為提供者地址列表的過濾條件,所有參數(shù)和提供者的 URL 進行對比沃呢;



public ScriptRouter(URL url) {

? ? ? ? this.url = url;

? ? ? ? String type = url.getParameter(Constants.TYPE_KEY);

? ? ? ? this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);

? ? ? ? String rule = url.getParameterAndDecoded(Constants.RULE_KEY);

? ? ? ? if (type == null || type.length() == 0) {

? ? ? ? ? ? type = Constants.DEFAULT_SCRIPT_TYPE_KEY;

? ? ? ? }

? ? ? ? if (rule == null || rule.length() == 0) {

? ? ? ? ? ? throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule));

? ? ? ? }

? ? ? ? ScriptEngine engine = engines.get(type);

? ? ? ? if (engine == null) {

? ? ? ? ? ? engine = new ScriptEngineManager().getEngineByName(type);

? ? ? ? ? ? if (engine == null) {

? ? ? ? ? ? ? ? throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule));

? ? ? ? ? ? }

? ? ? ? ? ? engines.put(type, engine);

? ? ? ? }

? ? ? ? this.engine = engine;

? ? ? ? this.rule = rule;

? ? }


"script://" + URL.encode("(function route(invokers) { ... } (invokers))")


(function route(invokers) {

? ? var result = new java.util.ArrayList(invokers.size());

? ? for (i = 0; i < invokers.size(); i ++) {

? ? ? ? if ("".equals(invokers.get(i).getUrl().getHost())) {

? ? ? ? ? ? result.add(invokers.get(i));

? ? ? ? }

? ? }

? ? return result;

} (invokers)); // 表示立即執(zhí)行方法


public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {

? ? ? ? try {

? ? ? ? ? ? List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);

? ? ? ? ? ? Compilable compilable = (Compilable) engine;

? ? ? ? ? ? Bindings bindings = engine.createBindings();

? ? ? ? ? ? bindings.put("invokers", invokersCopy);

? ? ? ? ? ? bindings.put("invocation", invocation);

? ? ? ? ? ? bindings.put("context", RpcContext.getContext());

? ? ? ? ? ? CompiledScript function = compilable.compile(rule);

? ? ? ? ? ? Object obj = function.eval(bindings);

? ? ? ? ? ? if (obj instanceof Invoker[]) {

? ? ? ? ? ? ? ? invokersCopy = Arrays.asList((Invoker<T>[]) obj);

? ? ? ? ? ? } else if (obj instanceof Object[]) {

? ? ? ? ? ? ? ? invokersCopy = new ArrayList<Invoker<T>>();

? ? ? ? ? ? ? ? for (Object inv : (Object[]) obj) {

? ? ? ? ? ? ? ? ? ? invokersCopy.add((Invoker<T>) inv);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? invokersCopy = (List<Invoker<T>>) obj;

? ? ? ? ? ? }

? ? ? ? ? ? return invokersCopy;

? ? ? ? } catch (ScriptException e) {

? ? ? ? ? ? //fail then ignore rule .invokers.

? ? ? ? ? ? logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);

? ? ? ? ? ? return invokers;

? ? ? ? }

? ? }





public interface LoadBalance {

? ? /**

? ? * select one invoker in list.

? ? *

? ? * @param invokers? invokers.

? ? * @param url? ? ? ? refer url

? ? * @param invocation invocation.

? ? * @return selected invoker.

? ? */

? ? @Adaptive("loadbalance")

? ? <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;



Random LoadBalance:隨機敛熬,按權(quán)重設(shè)置隨機概率,在一個截面上碰撞的概率高第股,但調(diào)用量越大分布越均勻应民,而且按概率使用權(quán)重后也比較均勻,有利于動態(tài)調(diào)整提供者權(quán)重夕吻;

RoundRobin LoadBalance:輪詢诲锹,按公約后的權(quán)重設(shè)置輪詢比率;存在慢的提供者累積請求的問題涉馅,比如:第二臺機器很慢归园,但沒掛,當(dāng)請求調(diào)到第二臺時就卡在那稚矿,


LeastActive LoadBalance:最少活躍調(diào)用數(shù)晤揣,相同活躍數(shù)的隨機桥爽,活躍數(shù)指調(diào)用前后計數(shù)差;使慢的提供者收到更少請求昧识,因為越慢的提供者的調(diào)用前后計數(shù)差會越大钠四;

ConsistentHash LoadBalance:一致性 Hash,相同參數(shù)的請求總是發(fā)到同一提供者滞诺;當(dāng)某一臺提供者掛時形导,原本發(fā)往該提供者的請求,基于虛擬節(jié)點习霹,平攤到其它提供者朵耕,不會引起劇烈變動;


protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

? ? ? ? int length = invokers.size(); // Number of invokers

? ? ? ? int totalWeight = 0; // The sum of weights

? ? ? ? boolean sameWeight = true; // Every invoker has the same weight?

? ? ? ? for (int i = 0; i < length; i++) {

? ? ? ? ? ? int weight = getWeight(invokers.get(i), invocation);

? ? ? ? ? ? totalWeight += weight; // Sum

? ? ? ? ? ? if (sameWeight && i > 0

? ? ? ? ? ? ? ? ? ? && weight != getWeight(invokers.get(i - 1), invocation)) {

? ? ? ? ? ? ? ? sameWeight = false;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? if (totalWeight > 0 && !sameWeight) {

? ? ? ? ? ? // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.

? ? ? ? ? ? int offset = random.nextInt(totalWeight);

? ? ? ? ? ? // Return a invoker based on the random value.

? ? ? ? ? ? for (int i = 0; i < length; i++) {

? ? ? ? ? ? ? ? offset -= getWeight(invokers.get(i), invocation);

? ? ? ? ? ? ? ? if (offset < 0) {

? ? ? ? ? ? ? ? ? ? return invokers.get(i);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // If all invokers have the same weight value or totalWeight=0, return evenly.

? ? ? ? return invokers.get(random.nextInt(length));

? ? }




