前言
MergeableClusterInvoker 中會讀取 URL 中的 merger 參數(shù)值蝠咆,如果 merger 參數(shù)以 "." 開頭探膊,則表示 "." 后的內(nèi)容是一個方法名哼御,這個方法名是遠程目標方法的返回類型中的一個方法逗余,MergeableClusterInvoker 在拿到所有 Invoker 返回的結(jié)果對象之后说榆,會遍歷每個返回結(jié)果喉誊,并調(diào)用 merger 參數(shù)指定的方法,合并這些結(jié)果值羔挡。
其實洁奈,除了上述指定 Merger 方法名稱的合并方式之外,Dubbo 內(nèi)部還提供了很多默認的 Merger 實現(xiàn)绞灼,這也就是本課時將要分析的內(nèi)容睬魂。本文將詳細介紹 MergerFactory 工廠類、Merger 接口以及針對 Java 中常見數(shù)據(jù)類型的 Merger 實現(xiàn)镀赌。
MergerFactory
在 MergeableClusterInvoker 使用默認 Merger 實現(xiàn)的時候氯哮,會通過 MergerFactory 以及服務(wù)接口返回值類型(returnType),選擇合適的 Merger 實現(xiàn)商佛。
在 MergerFactory 中維護了一個 ConcurrentHashMap 集合(即 MERGER_CACHE 字段)喉钢,用來緩存服務(wù)接口返回值類型與 Merger 實例之間的映射關(guān)系。
MergerFactory.getMerger() 方法會根據(jù)傳入的 returnType 類型良姆,從 MERGER_CACHE 緩存中查找相應(yīng)的 Merger 實現(xiàn)肠虽,下面我們來看該方法的具體實現(xiàn):
public class MergerFactory {
private static final ConcurrentMap<Class<?>, Merger<?>> MERGER_CACHE =
new ConcurrentHashMap<Class<?>, Merger<?>>();
public static <T> Merger<T> getMerger(Class<T> returnType) {
if (returnType == null) {
// returnType為空,直接拋出異常
throw new IllegalArgumentException("returnType is null");
}
Merger result;
// returnType為數(shù)組類型
if (returnType.isArray()) {
// 獲取數(shù)組中元素的類型
Class type = returnType.getComponentType();
// 獲取元素類型對應(yīng)的Merger實現(xiàn)
result = MERGER_CACHE.get(type);
if (result == null) {
loadMergers();
result = MERGER_CACHE.get(type);
}
// 如果Dubbo沒有提供元素類型對應(yīng)的Merger實現(xiàn)玛追,則返回ArrayMerger
if (result == null && !type.isPrimitive()) {
result = ArrayMerger.INSTANCE;
}
} else {
// 如果returnType不是數(shù)組類型税课,則直接從MERGER_CACHE緩存查找對應(yīng)的Merger實例
result = MERGER_CACHE.get(returnType);
if (result == null) {
loadMergers();
result = MERGER_CACHE.get(returnType);
}
}
return result;
}
}
loadMergers() 方法會通過 Dubbo SPI 方式加載 Merger 接口全部擴展實現(xiàn)的名稱,并填充到 MERGER_CACHE 集合中痊剖,具體實現(xiàn)如下:
public class MergerFactory {
private static final ConcurrentMap<Class<?>, Merger<?>> MERGER_CACHE =
new ConcurrentHashMap<Class<?>, Merger<?>>();
static void loadMergers() {
// 獲取Merger接口的所有擴展名稱
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
.getSupportedExtensions();
// 遍歷所有Merger擴展實現(xiàn)
for (String name : names) {
Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
// 將Merger實例與對應(yīng)returnType的映射關(guān)系記錄到MERGER_CACHE集合中
MERGER_CACHE.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
}
}
}
ArrayMerger
在 Dubbo 中提供了處理不同類型返回值的 Merger 實現(xiàn)韩玩,其中不僅有處理 boolean[]、byte[]陆馁、char[]找颓、double[]、float[]叮贩、int[]击狮、long[]、short[] 等基礎(chǔ)類型數(shù)組的 Merger 實現(xiàn)益老,還有處理 List彪蓬、Set、Map 等集合類的 Merger 實現(xiàn)捺萌,具體繼承關(guān)系如下圖所示:
ArrayMerger
首先來看 ArrayMerger 實現(xiàn):當服務(wù)接口的返回值為數(shù)組的時候档冬,會使用 ArrayMerger 將多個數(shù)組合并成一個數(shù)組,也就是將二維數(shù)組拍平成一維數(shù)組。ArrayMerger.merge() 方法的具體實現(xiàn)如下:
public class ArrayMerger implements Merger<Object[]> {
public static final ArrayMerger INSTANCE = new ArrayMerger();
@Override
public Object[] merge(Object[]... items) {
if (ArrayUtils.isEmpty(items)) {
// 傳入的結(jié)果集合為空捣郊,則直接返回空數(shù)組
return new Object[0];
}
int i = 0;
// 查找第一個不為null的結(jié)果
while (i < items.length && items[i] == null) {
i++;
}
// 所有items數(shù)組中全部結(jié)果都為null,則直接返回空數(shù)組
if (i == items.length) {
return new Object[0];
}
Class<?> type = items[i].getClass().getComponentType();
int totalLen = 0;
for (; i < items.length; i++) {
if (items[i] == null) {
// 忽略為null的結(jié)果
continue;
}
Class<?> itemType = items[i].getClass().getComponentType();
if (itemType != type) {
// 保證類型相同慈参,類型不同直接拋異常
throw new IllegalArgumentException("Arguments' types are different");
}
totalLen += items[i].length;
}
if (totalLen == 0) {
// 確定最終數(shù)組的長度
return new Object[0];
}
Object result = Array.newInstance(type, totalLen);
int index = 0;
// 遍歷全部的結(jié)果數(shù)組呛牲,將items二維數(shù)組中的每個元素都加到result中,形成一維數(shù)組
for (Object[] array : items) {
if (array != null) {
for (int j = 0; j < array.length; j++) {
Array.set(result, index++, array[j]);
}
}
}
return (Object[]) result;
}
}
IntArrayMerger
其他基礎(chǔ)數(shù)據(jù)類型數(shù)組的 Merger 實現(xiàn)驮配,與 ArrayMerger 的實現(xiàn)非常類似娘扩,都是將相應(yīng)類型的二維數(shù)組拍平成同類型的一維數(shù)組,這里以 IntArrayMerger 為例進行分析:
public class IntArrayMerger implements Merger<int[]> {
@Override
public int[] merge(int[]... items) {
if (ArrayUtils.isEmpty(items)) {
// 檢測傳入的多個int[]不能為空
return new int[0];
}
// 直接使用Stream的API將多個int[]數(shù)組拍平成一個int[]數(shù)組
return Arrays.stream(items).filter(Objects::nonNull)
.flatMapToInt(Arrays::stream)
.toArray();
}
}
剩余的其他基礎(chǔ)類型的 Merger 實現(xiàn)類壮锻,例如琐旁,F(xiàn)loatArrayMerger、IntArrayMerger猜绣、LongArrayMerger灰殴、BooleanArrayMerger、ByteArrayMerger掰邢、CharArrayMerger牺陶、DoubleArrayMerger 等,這里就不再贅述辣之。
MapMerger
SetMerger掰伸、ListMerger 和 MapMerger 是針對 Set 、List 和 Map 返回值的 Merger 實現(xiàn)怀估,它們會將多個 Set(或 List狮鸭、Map)集合合并成一個 Set(或 List、Map)集合多搀,核心原理與 ArrayMerger 的實現(xiàn)類似歧蕉。這里先來看 MapMerger 的核心實現(xiàn):
public class MapMerger implements Merger<Map<?, ?>> {
@Override
public Map<?, ?> merge(Map<?, ?>... items) {
if (ArrayUtils.isEmpty(items)) {
// 空結(jié)果集時,這就返回空Map
return Collections.emptyMap();
}
// 將items中所有Map集合中的KV康铭,添加到result這一個Map集合中
Map<Object, Object> result = new HashMap<Object, Object>();
Stream.of(items).filter(Objects::nonNull).forEach(result::putAll);
return result;
}
}
接下來再看 SetMerger 和 ListMerger 的核心實現(xiàn):
public class SetMerger implements Merger<Set<?>> {
@Override
public Set<Object> merge(Set<?>... items) {
if (ArrayUtils.isEmpty(items)) {
// 空結(jié)果集時廊谓,這就返回空Set集合
return Collections.emptySet();
}
// 創(chuàng)建一個新的HashSet集合,傳入的所有Set集合都添加到result中
Set<Object> result = new HashSet<Object>();
Stream.of(items).filter(Objects::nonNull).forEach(result::addAll);
return result;
}
}
public class ListMerger implements Merger<List<?>> {
@Override
public List<Object> merge(List<?>... items) {
if (ArrayUtils.isEmpty(items)) {
// 空結(jié)果集時麻削,這就返回空List集合
return Collections.emptyList();
}
// 通過Stream API將傳入的所有List集合拍平成一個List集合并返回
return Stream.of(items).filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}
自定義 Merger 擴展實現(xiàn)
介紹完 Dubbo 自帶的 Merger 實現(xiàn)之后蒸痹,下面還可以嘗試動手寫一個自己的 Merger 實現(xiàn),這里以 dubbo-demo-xml 中的 Provider 和 Consumer 為例進行修改呛哟。
首先在 dubbo-demo-xml-provider 示例模塊中發(fā)布兩個服務(wù)叠荠,分別屬于 groupA 和 groupB,相應(yīng)的 dubbo-provider.xml 配置如下:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application metadata-type="remote" name="demo-provider"/>
<dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo"/>
<!-- 配置兩個Spring Bean -->
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<bean id="demoServiceB" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<!-- 將demoService和demoServiceB兩個Spring Bean作為服務(wù)發(fā)布出去扫责,分別屬于groupA和groupB-->
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" group="groupA"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoServiceB" group="groupB"/>
</beans>
接下來榛鼎,在 dubbo-demo-xml-consumer 示例模塊中進行服務(wù)引用,dubbo-consumer.xml 配置文件的具體內(nèi)容如下:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-consumer"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- 引用DemoService,這里指定了group為*者娱,即可以引用任何group的Provider抡笼,同時merger設(shè)置為true,即需要對結(jié)果進行合并-->
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" group="*" merger="true"/>
</beans>
然后黄鳍,在 dubbo-demo-xml-consumer 示例模塊的 /resources/META-INF/dubbo 目錄下推姻,添加一個名為 org.apache.dubbo.rpc.cluster.Merger 的 Dubbo SPI 配置文件,其內(nèi)容如下:
String=org.apache.dubbo.demo.consumer.StringMerger
StringMerger 實現(xiàn)了前面介紹的 Merger 接口框沟,它會將多個 Provider 節(jié)點返回的 String 結(jié)果值拼接起來藏古,具體實現(xiàn)如下:
public class StringMerger implements Merger<String> {
@Override
public String merge(String... items) {
if (ArrayUtils.isEmpty(items)) {
// 檢測空返回值
return "";
}
String result = "";
for (String item : items) {
// 通過豎線將多個Provider的返回值拼接起來
result += item + "|";
}
return result;
}
}
最后,依次啟動 Zookeeper忍燥、dubbo-demo-xml-provider 示例模塊和 dubbo-demo-xml-consumer 示例模塊拧晕。在控制臺中會看到如下輸出:
result: Hello world, response from provider: 172.17.108.179:20880|Hello world, response from provider: 172.17.108.179:20880|
總結(jié)
本文重點介紹了 MergeableCluster 中涉及的 Merger 合并器相關(guān)的知識點。
1梅垄、首先厂捞,介紹了 MergerFactory 工廠類的核心功能,它可以配合遠程方法調(diào)用的返回值队丝,選擇對應(yīng)的 Merger 實現(xiàn)蔫敲,完成結(jié)果的合并。
2炭玫、然后奈嘿,深入分析了 Dubbo 自帶的 Merger 實現(xiàn)類,涉及 Java 中各個基礎(chǔ)類型數(shù)組的 Merger 合并器實現(xiàn)吞加,例如裙犹,IntArrayMerger、LongArrayMerger 等衔憨,它們都是將多個特定類型的一維數(shù)組拍平成相同類型的一維數(shù)組叶圃。
3、除了這些基礎(chǔ)類型數(shù)組的 Merger 實現(xiàn)践图,Dubbo 還提供了 List掺冠、Set、Map 等集合類的 Merger 實現(xiàn)码党,它們的核心是將多個集合中的元素整理到一個同類型的集合中德崭。
4、最后揖盘,還以 StringMerger 為例眉厨,介紹了如何自定義 Merger 合并器。