一路翻、前言
前面講解了Dubbo的服務(wù)降級狈癞,本節(jié)我們來講解dubbo中的并發(fā)控制,并發(fā)控制分為客戶端并發(fā)控制和服務(wù)端并發(fā)控制茂契。
二蝶桶、并發(fā)控制
2.1 客戶端并發(fā)控制
在服務(wù)消費方法進行并發(fā)控制需要設(shè)置actives參數(shù),如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
group="dubbo" version="1.0.0" timeout="3000" actives="10"/>
設(shè)置com.test.UserServiceBo接口中所有方法掉冶,每個方法最多同時并發(fā)請求10個請求。
也可以使用下面方法設(shè)置接口中的單個方法的并發(fā)請求個數(shù)厌小,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
group="dubbo" version="1.0.0" timeout="3000">
<dubbo:method name="sayHello" actives="10" />
</dubbo:reference>
如上設(shè)置sayHello方法的并發(fā)請求數(shù)量最大為10恢共,如果客戶端請求該方法并發(fā)超過了10則客戶端會被阻塞,等客戶端并發(fā)請求數(shù)量少于10的時候璧亚,該請求才會被發(fā)送到服務(wù)提供方服務(wù)器讨韭。在dubbo中客戶端并發(fā)控制是使用ActiveLimitFilter過濾器來控制的,代碼如下:
public class ActiveLimitFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
//獲取設(shè)置的acvites的值癣蟋,默認(rèn)為0
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
//獲取當(dāng)前方法目前并發(fā)請求數(shù)量
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {//說明設(shè)置了actives變量
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
//如果該方法并發(fā)請求數(shù)量大于設(shè)置值透硝,則掛起當(dāng)前線程。
if (active >= max) {
synchronized (count) {
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
//如果等待時間超時疯搅,則拋出異常
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
//沒有限流時候濒生,正常調(diào)用
try {
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if (max > 0) {
synchronized (count) {
count.notify();
}
}
}
}
}
可知客戶端并發(fā)控制,是如果當(dāng)并發(fā)量達到指定值后幔欧,當(dāng)前客戶端請求線程會被掛起罪治,如果在等待超時期間并發(fā)請求量少了丽声,那么阻塞的線程會被激活,然后發(fā)送請求到服務(wù)提供方规阀,如果等待超時了恒序,則直接拋出異常,這時候服務(wù)根本都沒有發(fā)送到服務(wù)提供方服務(wù)器谁撼。
2.2 服務(wù)端并發(fā)控制
在服務(wù)提供方進行并發(fā)控制需要設(shè)置executes參數(shù)歧胁,如下:
<dubbo:service interface="com.test.UserServiceBo" ref="userService"
group="dubbo" version="1.0.0" timeout="3000" executes="10"/>
設(shè)置com.test.UserServiceBo接口中所有方法,每個方法最多同時并發(fā)處理10個請求厉碟,這里并發(fā)是指同時在處理10個請求喊巍。
也可以使用下面方法設(shè)置接口中的單個方法的并發(fā)處理個數(shù),如下:
<dubbo:service interface="com.test.UserServiceBo" ref="userService"
group="dubbo" version="1.0.0" timeout="3000" >
<dubbo:method name="sayHello" executes="10" />
</dubbo:service>
如上設(shè)置sayHello方法的并發(fā)處理數(shù)量為10.
需要注意的是箍鼓,服務(wù)提供方設(shè)置并發(fā)數(shù)量后崭参,如果同時請求數(shù)量大于了設(shè)置的executes的值,則會拋出異常款咖,而不是像消費端設(shè)置actives時候何暮,會等待。服務(wù)提供方并發(fā)控制是使用ExecuteLimitFilter過濾器實現(xiàn)的铐殃,ExecuteLimitFilter代碼如下:
public class ExecuteLimitFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
//默認(rèn)不設(shè)置executes時候海洼,其值為0
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {//max>0說明設(shè)置了executes值
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//可知如果并發(fā)處理數(shù)量大于設(shè)置的值,會拋出異常
executesLimit = count.getSemaphore(max);
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
...
try {//沒有限流時候富腊,激活filter鏈
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
...
} finally {
...
}
}
}
所以當(dāng)使用executes參數(shù)時候要注意坏逢,當(dāng)并發(fā)量過大時候,多余的請求會失敗赘被。
三是整、總結(jié)
本節(jié)我們講解了dubbo中客戶端并發(fā)控制和服務(wù)端并發(fā)控制。另外另外想系統(tǒng)學(xué)dubbo的單擊我 ,想學(xué)并發(fā)的童鞋可以 單擊我