Dubbo源碼分析(十五) Merger實(shí)現(xiàn)

下面我們來說一下Dubbo的Merger實(shí)現(xiàn)万细。在開發(fā)中粪般,有這么一種情況尝哆,先定義了一個UserService接口频蛔,有UserServiceImpl和CategoryUserServiceImpl兩種實(shí)現(xiàn),它們又分別屬于user和category兩個組照雁,consumer將調(diào)用這兩個服務(wù)蚕愤,并按照自定義策略合并返回結(jié)果,作為最終結(jié)果饺蚊。這就需要Dubbo的Merger來實(shí)現(xiàn)了萍诱。我們先來看一下MergeableClusterInvoker的invoke方法

public Result invoke(final Invocation invocation) throws RpcException {
    // 獲得 Invoker 集合
    List<Invoker<T>> invokers = directory.list(invocation);
    // 獲得 Merger 拓展名
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    // 若果未配置拓展,直接調(diào)用首個可用的 Invoker 對象
    if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
        for (final Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        return invokers.iterator().next().invoke(invocation);
    }

    // 通過反射污呼,獲得返回類型
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }

    // 提交線程池裕坊,并行執(zhí)行,發(fā)起 RPC 調(diào)用曙求,并添加到 results 中
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for (final Invoker<T> invoker : invokers) {
        Future<Result> future = executor.submit(new Callable<Result>() {
            public Result call() {
                // RPC 調(diào)用
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        results.put(invoker.getUrl().getServiceKey(), future);
    }

    // 阻塞等待執(zhí)行執(zhí)行結(jié)果碍庵,并添加到 resultList 中
    List<Result> resultList = new ArrayList<Result>(results.size());
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) { // 異常 Result 映企,打印錯誤日志悟狱,忽略
                log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());
            } else { // 正常 Result ,添加到 resultList 中
                resultList.add(r);
            }
        } catch (Exception e) { // 異常堰氓,拋出 RpcException 異常
            throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);
        }
    }

    // 結(jié)果大小為空挤渐,返回空的 RpcResult
    if (resultList.isEmpty()) {
        return new RpcResult((Object) null);
    // 結(jié)果大小為 1 ,返回首個 RpcResult
    } else if (resultList.size() == 1) {
        return resultList.iterator().next();
    }
    // 返回類型為 void 双絮,返回空的 RpcResult
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }

    Object result;
    // 【第 1 種】基于合并方法
    if (merger.startsWith(".")) {
        // 獲得合并方法 Method
        merger = merger.substring(1);
        Method method;
        try {
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
        }
        // 有 Method 浴麻,進(jìn)行合并
        if (method != null) {
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                // 方法返回類型匹配得问,合并時,修改 result
                if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                // 方法返回類型不匹配软免,合并時宫纬,不修改 result
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);
            }
        // 無 Method ,拋出 RpcException 異常
        } else {
            throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
        }
    // 【第 2 種】基于 Merger
    } else {
        Merger resultMerger;
        // 【第 2.1 種】根據(jù)返回值類型自動匹配 Merger
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        // 【第 2.2 種】指定 Merger
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        // 有 Merger 膏萧,進(jìn)行合并
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        // 無 Merger 漓骚,拋出 RpcException 異常
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    // 返回 RpcResult 結(jié)果
    return new RpcResult(result);
}

調(diào)用了Merger進(jìn)行結(jié)果的合并處理。我們再來看一個類MergerFactory榛泛,這個類是生產(chǎn)Merger的工廠類

 public static <T> Merger<T> getMerger(Class<T> returnType) {
    Merger result;
    // 數(shù)組類型
    if (returnType.isArray()) {
        Class type = returnType.getComponentType();
        // 從緩存中獲得 Merger 對象
        result = mergerCache.get(type);
        if (result == null) {
            loadMergers();
            result = mergerCache.get(type);
        }
        // 獲取不到蝌蹂,使用 ArrayMerger
        if (result == null && !type.isPrimitive()) {
            result = ArrayMerger.INSTANCE;
        }
    // 普通類型
    } else {
        // 從緩存中獲得 Merger 對象
        result = mergerCache.get(returnType);
        if (result == null) {
            loadMergers();
            result = mergerCache.get(returnType);
        }
    }
    return result;
}

我們再來看一個類ArrayMerger,這個類是數(shù)組結(jié)果合并的處理類曹锨,實(shí)現(xiàn)了merge方法

if (others.length == 0) {
        return null;
    }
    int totalLen = 0;
    for (int i = 0; i < others.length; i++) {
        Object item = others[i];
        if (item != null && item.getClass().isArray()) {
            totalLen += Array.getLength(item);
        } else {
            throw new IllegalArgumentException(
                    new StringBuilder(32).append(i + 1)
                            .append("th argument is not an array").toString());
        }
    }

    if (totalLen == 0) {
        return null;
    }

    Class<?> type = others[0].getClass().getComponentType();

    Object result = Array.newInstance(type, totalLen);
    int index = 0;
    for (Object array : others) {
        for (int i = 0; i < Array.getLength(array); i++) {
            Array.set(result, index++, Array.get(array, i));
        }
    }
    return (Object[]) result;

再來看一個類MapMerger孤个,這個類是Map結(jié)果合并的處理類

if (items.length == 0) {
        return null;
    }
    // 創(chuàng)建結(jié)果 Map
    Map<Object, Object> result = new HashMap<Object, Object>();
    // 合并多個 Map
    for (Map<?, ?> item : items) {
        if (item != null) {
            result.putAll(item);
        }
    }
    return result;

還有很多數(shù)據(jù)類型的處理類,比如BooleanArrayMerger,ByteArrayMerger,CharArrayMerger等沛简,就不在這里具體說了齐鲤。
通過Merger實(shí)現(xiàn)了對多個返回結(jié)果的處理。
Dubbo的Merger機(jī)制就介紹到這里了椒楣。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末佳遂,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子撒顿,更是在濱河造成了極大的恐慌丑罪,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,548評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凤壁,死亡現(xiàn)場離奇詭異吩屹,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)拧抖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評論 3 399
  • 文/潘曉璐 我一進(jìn)店門煤搜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人唧席,你說我怎么就攤上這事擦盾。” “怎么了淌哟?”我有些...
    開封第一講書人閱讀 167,990評論 0 360
  • 文/不壞的土叔 我叫張陵迹卢,是天一觀的道長。 經(jīng)常有香客問我徒仓,道長腐碱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,618評論 1 296
  • 正文 為了忘掉前任掉弛,我火速辦了婚禮症见,結(jié)果婚禮上喂走,老公的妹妹穿的比我還像新娘。我一直安慰自己谋作,他們只是感情好芋肠,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著遵蚜,像睡著了一般业栅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谬晕,一...
    開封第一講書人閱讀 52,246評論 1 308
  • 那天碘裕,我揣著相機(jī)與錄音,去河邊找鬼攒钳。 笑死帮孔,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的不撑。 我是一名探鬼主播文兢,決...
    沈念sama閱讀 40,819評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼焕檬!你這毒婦竟也來了姆坚?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,725評論 0 276
  • 序言:老撾萬榮一對情侶失蹤实愚,失蹤者是張志新(化名)和其女友劉穎兼呵,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體腊敲,經(jīng)...
    沈念sama閱讀 46,268評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡击喂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了碰辅。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懂昂。...
    茶點(diǎn)故事閱讀 40,488評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖没宾,靈堂內(nèi)的尸體忽然破棺而出凌彬,到底是詐尸還是另有隱情,我是刑警寧澤循衰,帶...
    沈念sama閱讀 36,181評論 5 350
  • 正文 年R本政府宣布铲敛,位于F島的核電站,受9級特大地震影響羹蚣,放射性物質(zhì)發(fā)生泄漏原探。R本人自食惡果不足惜乱凿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評論 3 333
  • 文/蒙蒙 一顽素、第九天 我趴在偏房一處隱蔽的房頂上張望咽弦。 院中可真熱鬧,春花似錦胁出、人聲如沸型型。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽闹蒜。三九已至,卻和暖如春抑淫,著一層夾襖步出監(jiān)牢的瞬間绷落,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評論 1 272
  • 我被黑心中介騙來泰國打工始苇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留砌烁,地道東北人。 一個月前我還...
    沈念sama閱讀 48,897評論 3 376
  • 正文 我出身青樓催式,卻偏偏與公主長得像函喉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子荣月,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評論 2 359

推薦閱讀更多精彩內(nèi)容