一坑鱼、前言
Dubbo作為高性能RPC框架膘流,已經(jīng)進(jìn)入Apache卵化器項(xiàng)目,雖然官方給出了dubbo使用的用戶手冊(cè)鲁沥,但是大多是一概而過呼股,使用dubbo時(shí)候要盡量了解源碼,不然會(huì)很容易入坑画恰。
二 彭谁、服務(wù)消費(fèi)端ReferenceConfig需要自行緩存
ReferenceConfig實(shí)例是個(gè)很重的實(shí)例,每個(gè)ReferenceConfig實(shí)例里面都維護(hù)了與服務(wù)注冊(cè)中心的一個(gè)長(zhǎng)鏈允扇,并且維護(hù)了與所有服務(wù)提供者的的長(zhǎng)鏈缠局。假設(shè)有一個(gè)服務(wù)注冊(cè)中心和N個(gè)服務(wù)提供者,那么每個(gè)ReferenceConfig實(shí)例里面維護(hù)了N+1個(gè)長(zhǎng)鏈考润,如果頻繁的生成ReferenceConfig實(shí)例狭园,可能會(huì)造成性能問題,甚至產(chǎn)生內(nèi)存或者連接泄露的風(fēng)險(xiǎn)糊治。特別是使用dubbo api編程時(shí)候容易忽略這個(gè)問題唱矛。
為了解決這個(gè)問題,之前都是自行緩存井辜,但是自從dubbo2.4.0版本后揖赴,dubbo 提供了簡(jiǎn)單的工具類 ReferenceConfigCache 用于緩存ReferenceConfig 實(shí)例。使用如下:
//創(chuàng)建服務(wù)消費(fèi)實(shí)例
ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
reference.setInterface(XxxService.class);
reference.setVersion("1.0.0");
......
//獲取dubbo提供的緩存
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
// cache.get方法中會(huì)緩存 reference對(duì)象抑胎,并且調(diào)用reference.get方法啟動(dòng)ReferenceConfig,并返回經(jīng)過代理后的服務(wù)接口的對(duì)象
XxxService xxxService = cache.get(reference);
// 使用xxxService對(duì)象
xxxService.sayHello();
需要注意的是 Cache內(nèi)持有ReferenceConfig對(duì)象的引用渐北,不要在外部再調(diào)用ReferenceConfig的destroy方法了阿逃,這會(huì)導(dǎo)致Cache內(nèi)的ReferenceConfig失效!
如果要銷毀 Cache 中的 ReferenceConfig ,將銷毀 ReferenceConfig 并釋放對(duì)應(yīng)的資源恃锉,具體使用下面方法來銷毀:
ReferenceConfigCache cache = ReferenceConfigCache.getCache()搀菩;
cache.destroy(reference);
另外以服務(wù) Group、接口破托、版本為緩存的 Key肪跋,ReferenceConfig實(shí)例為對(duì)應(yīng)的value。如果你需要使用自定義的key土砂,可以在創(chuàng)建cache時(shí)候調(diào)用ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator );方法傳遞自定義的keyGenerator州既。
三、 并發(fā)控制
3.1 服務(wù)消費(fèi)方并發(fā)控制
在服務(wù)消費(fèi)方法進(jìn)行并發(fā)控制需要設(shè)置actives參數(shù)萝映,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
group="dubbo" version="1.0.0" timeout="3000" actives="10"/>
設(shè)置com.test.UserServiceBo接口中所有方法吴叶,每個(gè)方法最多同時(shí)并發(fā)請(qǐng)求10個(gè)請(qǐng)求。
也可以使用下面方法設(shè)置接口中的單個(gè)方法的并發(fā)請(qǐng)求個(gè)數(shù)序臂,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
group="dubbo" version="1.0.0" timeout="3000">
<dubbo:method name="sayHello" actives="10" />
</dubbo:reference>
如上設(shè)置sayHello方法的并發(fā)請(qǐng)求數(shù)量最大為10蚌卤,如果客戶端請(qǐng)求該方法并發(fā)超過了10則客戶端會(huì)被阻塞,等客戶端并發(fā)請(qǐng)求數(shù)量少于10的時(shí)候奥秆,該請(qǐng)求才會(huì)被發(fā)送到服務(wù)提供方服務(wù)器逊彭。在dubbo中客戶端并發(fā)控制是使用ActiveLimitFilter過濾器來控制的,代碼如下:
public class ActiveLimitFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
//獲取設(shè)置的acvites的值构订,默認(rèn)為0
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
//獲取當(dāng)前方法目前并發(fā)請(qǐng)求數(shù)量
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {//說明設(shè)置了actives變量
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
//如果該方法并發(fā)請(qǐng)求數(shù)量大于設(shè)置值侮叮,則掛起當(dāng)前線程。
if (active >= max) {
synchronized (count) {
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
//如果等待時(shí)間超時(shí)鲫咽,則拋出異常
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
//沒有限流時(shí)候签赃,正常調(diào)用
try {
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if (max > 0) {
synchronized (count) {
count.notify();
}
}
}
}
}
可知客戶端并發(fā)控制,是如果當(dāng)并發(fā)量達(dá)到指定值后分尸,當(dāng)前客戶端請(qǐng)求線程會(huì)被掛起锦聊,如果在等待超時(shí)期間并發(fā)請(qǐng)求量少了,那么阻塞的線程會(huì)被激活箩绍,然后發(fā)送請(qǐng)求到服務(wù)提供方孔庭,如果等待超時(shí)了,則直接拋出異常材蛛,這時(shí)候服務(wù)根本都沒有發(fā)送到服務(wù)提供方服務(wù)器圆到。
四、 改進(jìn)的廣播策略
前面我們講解集群容錯(cuò)時(shí)候談到有一個(gè)廣播策略卑吭,該策略主要用于對(duì)所有服務(wù)提供者進(jìn)行廣播消息芽淡,那么有個(gè)問題需要思考,廣播是是說你在客戶端調(diào)用接口一次豆赏,內(nèi)部就是輪詢調(diào)用所有服務(wù)提供者的機(jī)器的服務(wù)挣菲,那么你調(diào)用一次該接口富稻,返回值是什么那?比如內(nèi)部輪詢了10臺(tái)機(jī)器白胀,每個(gè)機(jī)器應(yīng)該都有一個(gè)返回值椭赋,那么你調(diào)用的這一次返回值是10個(gè)返回值的組成?其實(shí)不是或杠,返回的是輪詢調(diào)用的最后一個(gè)機(jī)器結(jié)果哪怔,那么如果我們想把所有的機(jī)器返回的結(jié)果聚合起來如何做的? 使用dubbo中更多需要注意的事情 單擊我查看文章 向抢, 單擊我觀看視頻即可知曉认境。