0 前言
對(duì)于Java WEB應(yīng)用來(lái)說(shuō),Spring的Filter可以攔截WEB接口調(diào)用严拒,但對(duì)于Dubbo接口,Spring的Filter就不起作用了。
Dubbo中的Filter實(shí)現(xiàn)是 專(zhuān)門(mén)為服務(wù)提供方和服務(wù)消費(fèi)方調(diào)用過(guò)程進(jìn)行攔截募疮,Dubbo本身的大多功能均基于此擴(kuò)展點(diǎn)實(shí)現(xiàn),每次遠(yuǎn)程方法執(zhí)行僻弹,該攔截都會(huì)被執(zhí)行阿浓,但請(qǐng)注意其對(duì)性能的影響。
所以蹋绽,在實(shí)際業(yè)務(wù)開(kāi)發(fā)中芭毙,使用最多的可能就是對(duì)Filter接口進(jìn)行擴(kuò)展,在服務(wù)調(diào)用鏈路中嵌入我們自身的處理邏輯卸耘,如日志打印退敦、調(diào)用耗時(shí)統(tǒng)計(jì)等。
Dubbo官方針對(duì)Filter做了很多的原生支持蚣抗,目前大致有20來(lái)個(gè)吧侈百,包括我們熟知的RpcContext,accesslog功能都是通過(guò)filter來(lái)實(shí)現(xiàn)了,下面一起詳細(xì)看一下Filter的實(shí)現(xiàn)钝域。
1 構(gòu)造Filter鏈
Dubbo的Filter實(shí)現(xiàn)入口是 在ProtocolFilterWrapper讽坏,因?yàn)镻rotocolFilterWrapper是Protocol的包裝類(lèi),所以會(huì)在加載的Extension的時(shí)候被自動(dòng)包裝進(jìn)來(lái)(理解這里的前提是 理解Dubbo的SPI機(jī)制 )例证,該封裝器實(shí)現(xiàn)了Protocol接口路呜,并提供了一個(gè)參數(shù)類(lèi)型為Protocol的構(gòu)造方法。Dubbo依據(jù)這個(gè)構(gòu)造方法識(shí)別出封裝器织咧,并將該封裝器作為其他Protocol接口實(shí)現(xiàn)的代理胀葱。
接下來(lái),我們看一下ProtocolFilterWrapper中是如何構(gòu)造Filter鏈:
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
// 帶參數(shù)構(gòu)造器烦感,ExtensionLoad通過(guò)該構(gòu)造器識(shí)別封裝器
public ProtocolFilterWrapper(Protocol protocol){
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
public int getDefaultPort() {
return protocol.getDefaultPort();
}
// 對(duì)提供方服務(wù)暴露進(jìn)行封裝巡社,組裝filter調(diào)用鏈
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 向注冊(cè)中心發(fā)布服務(wù)的時(shí)候并不會(huì)進(jìn)行filter調(diào)用鏈
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
// 對(duì)消費(fèi)方服務(wù)引用進(jìn)行封裝,組裝filter調(diào)用鏈
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 向注冊(cè)中心引用服務(wù)的時(shí)候并不會(huì)進(jìn)行filter調(diào)用鏈
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
public void destroy() {
protocol.destroy();
}
// 構(gòu)造filter調(diào)用鏈
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 獲得所有激活的Filter(已經(jīng)排好序的)
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
// 復(fù)制引用手趣,構(gòu)建filter調(diào)用鏈
final Invoker<T> next = last;
// 這里只是構(gòu)造一個(gè)最簡(jiǎn)化的Invoker作為調(diào)用鏈的載體Invoker
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
// 關(guān)鍵代碼晌该,單向鏈表指針傳遞
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
這里的關(guān)鍵代碼在buildInvokerChain方法,參數(shù)invoker為實(shí)際的服務(wù)( 對(duì)于消費(fèi)方而言绿渣,就是服務(wù)的動(dòng)態(tài)代理 )朝群。從ExtensionLoader獲取到已經(jīng)過(guò)排序的Filter列表(排序規(guī)則可參見(jiàn)ActivateComparator),然后開(kāi)始倒序組裝中符。
這里是個(gè)典型的裝飾器模式姜胖,不過(guò)裝飾器鏈條上的每個(gè)節(jié)點(diǎn)都是一個(gè)匿名內(nèi)部類(lèi)Invoker實(shí)例。
- 每個(gè)節(jié)點(diǎn)invoker持有一個(gè)Filter引用淀散,一個(gè)下級(jí)invoker節(jié)點(diǎn)引用以及實(shí)際調(diào)用的invoker實(shí)例(雖然持有但并不實(shí)際調(diào)用右莱,僅僅是提供獲取實(shí)際invoker相關(guān)參數(shù)的功能,如getInterface档插,getUrl等方法)慢蜓;
- 通過(guò)invoke方法,invoker節(jié)點(diǎn)將下級(jí)節(jié)點(diǎn)傳遞給當(dāng)前的filter進(jìn)行調(diào)用郭膛;
- filter在執(zhí)行invoke方法時(shí)晨抡,就會(huì)觸發(fā)下級(jí)節(jié)點(diǎn)invoker調(diào)用其invoke方法,實(shí)現(xiàn)調(diào)用的向下傳遞则剃;
- 當(dāng)?shù)竭_(dá)最后一級(jí)invoker節(jié)點(diǎn)耘柱,即實(shí)際服務(wù)invoker,即可執(zhí)行真實(shí)業(yè)務(wù)邏輯棍现;
這條調(diào)用鏈的每個(gè)節(jié)點(diǎn)都為真實(shí)的invoker增加了自定義的功能调煎,在整個(gè)鏈條上不斷豐富功能,是典型的裝飾器模式轴咱。
看到上面的內(nèi)容汛蝙,我們大致能明白實(shí)現(xiàn)是這樣子的烈涮,通過(guò)獲取所有可以被激活的Filter鏈,然后根據(jù)一定順序構(gòu)造出一個(gè)Filter的調(diào)用鏈窖剑,最后的調(diào)用鏈大致是這樣子:Filter1->Filter2->Filter3->......->Invoker坚洽,這個(gè)構(gòu)造Filter鏈的邏輯非常簡(jiǎn)單,重點(diǎn)就在于如何獲取被激活的Filter鏈西土。
// 將key在url中對(duì)應(yīng)的配置值切換成字符串信息數(shù)組
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
// 所有用戶自己配置的filter信息(有些Filter是默認(rèn)激活的讶舰,有些是配置激活的,這里這里的names就指的配置激活的filter信息)
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
// 如果這些名稱(chēng)里不包括去除default的標(biāo)志(-default)需了,換言之就是加載Dubbo提供的默認(rèn)Filter
if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
// 加載extension信息
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
// name指的是SPI讀取的配置文件的key
String name = entry.getKey();
Activate activate = entry.getValue();
// group主要是區(qū)分是在provider端生效還是consumer端生效
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
// 這里以Filter為例:三個(gè)判斷條件的含義依次是:
// 1. 用戶配置的filter列表中不包含當(dāng)前ext
// 2. 用戶配置的filter列表中不包含當(dāng)前ext的加-的key
// 3. 如果用戶的配置信息(url中體現(xiàn))中有可以激活的配置key并且數(shù)據(jù)不為0,false,null跳昼,N/A,也就是說(shuō)有正常的使用
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
// 根據(jù)Activate注解上的order排序
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
// 進(jìn)行到此步驟的時(shí)候Dubbo提供的原生的Filter已經(jīng)被添加完畢了肋乍,下面處理用戶自己擴(kuò)展的Filter
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i ++) {
String name = names.get(i);
// 如果單個(gè)name不是以-開(kāi)頭并且所有的key里面并不包含-'name'(也就是說(shuō)如果配置成了"dubbo,-dubbo"這種的可以鹅颊,這個(gè)if是進(jìn)不去的)
if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
// 可以通過(guò)default關(guān)鍵字替換Dubbo原生的Filter鏈,主要用來(lái)控制調(diào)用鏈順序
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
// 加入用戶自己定義的擴(kuò)展Filter
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}
基本上到這里就能看到Filter鏈?zhǔn)侨绾伪患虞d進(jìn)來(lái)的墓造,這里設(shè)計(jì)的非常靈活堪伍,忍不住要感嘆一下:通過(guò)簡(jiǎn)單的配置‘-’可以手動(dòng)剔除Dubbo原生的一定加載Filter,通過(guò)default來(lái)代替Dubbo原生的一定會(huì)加載的Filter從而來(lái)控制順序觅闽。這些設(shè)計(jì)雖然都是很小的功能點(diǎn)帝雇,但是總體的感覺(jué)是十分靈活,考慮的比較周到蛉拙。
所以尸闸,從上面源碼分析得知:
默認(rèn)filter鏈,先執(zhí)行原生filter孕锄,再依次執(zhí)行自定義filter吮廉,繼而回溯到原點(diǎn)。
知道了Filter構(gòu)造的過(guò)程之后畸肆,我們就詳細(xì)看幾個(gè)比較重要的Filter信息茧痕。首先,看一下com.alibaba.dubbo.rpc.Filter接口的源碼恼除,如下:
@SPI
public interface Filter {
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}
Dubbo原生的filter定義在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.filter文件中,具體如下:
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
Dubbo自帶超時(shí)過(guò)濾器TimeoutFilter實(shí)現(xiàn)如下:
@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
long elapsed = System.currentTimeMillis() - start;
if (invoker.getUrl() != null
&& elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
"timeout", Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName()
+ "arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
+ invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
}
}
return result;
}
}
-
注解@Activate是否是Dubbo Filter必須的曼氛,其上的group和order分別扮演什么樣的角色?
對(duì)于Dubbo原生自帶的filter豁辉,注解@Activate是必須,其group用于provider/consumer的站隊(duì)舀患,而order值是filter順序的依據(jù)徽级。但是對(duì)于自定義filter而言,注解@Activate沒(méi)被用到聊浅,其分組和順序餐抢,完全由用戶手工配置指定现使。如果自定義filter添加了@Activate注解,并指定了group了旷痕,則這些自定義filter將升級(jí)為原生filter組碳锈。
-
Filter的順序是否可以調(diào)整, 如何實(shí)現(xiàn)?
可以調(diào)整,通過(guò)'-'符號(hào)可以去除某些filter欺抗,而default代表默認(rèn)激活的原生filter子鏈售碳,通過(guò)重排default和自定義filter的順序,達(dá)到實(shí)現(xiàn)順序控制的目的绞呈。
讓我們來(lái)構(gòu)建幾個(gè)case贸人,來(lái)看看如何配置能滿足。假定自定義filter的對(duì)象為filter1佃声,filter2:
case 1: 其執(zhí)行順序?yàn)? 原生filter子鏈->filter1->filter2
<dubbo:reference filter="filter1,filter2"/>
case 2: 其執(zhí)行順序?yàn)? filter1->filter2->原生filter子鏈
<dubbo:reference filter="filter1,filter2,default"/>
case 3: 其執(zhí)行順序?yàn)? filter1->原生filter子鏈->filter2, 同時(shí)去掉原生的TokenFilter(token)
<dubbo:service filter="filter1,default,filter2,-token"/>
Filter在作用端區(qū)分的話主要是區(qū)分為consumer和provider艺智,下面我們就以這個(gè)為區(qū)分來(lái)分別介紹一些重點(diǎn)的Filter。
2 Consumer
2.1 ConsumerContextFilter
package com.alibaba.dubbo.rpc.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
/**
* ConsumerContextInvokerFilter(默認(rèn)觸發(fā))
*/
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 在當(dāng)前的RpcContext中記錄本地調(diào)用的一次狀態(tài)信息
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
其實(shí)簡(jiǎn)單來(lái)看這個(gè)Filter的話是十分簡(jiǎn)單圾亏,它又是怎么將客戶端設(shè)置的隱式參數(shù)傳遞給服務(wù)端呢十拣?
載體就是Invocation對(duì)象,在客戶端調(diào)用Invoker.invoke方法時(shí)候召嘶,會(huì)去取當(dāng)前狀態(tài)記錄器RpcContext中的attachments屬性父晶,然后設(shè)置到RpcInvocation對(duì)象中,在RpcInvocation傳遞到provider的時(shí)候會(huì)通過(guò)另外一個(gè)過(guò)濾器ContextFilter將RpcInvocation對(duì)象重新設(shè)置回RpcContext中供服務(wù)端邏輯重新獲取隱式參數(shù)弄跌。
這就是為什么RpcContext只能記錄一次請(qǐng)求的狀態(tài)信息甲喝,因?yàn)樵诘诙握{(diào)用的時(shí)候參數(shù)已經(jīng)被新的RpcInvocation覆蓋掉,第一次的請(qǐng)求信息對(duì)于第二次執(zhí)行是不可見(jiàn)的铛只。
2.2 ActiveLimitFilter
package com.alibaba.dubbo.rpc.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;
/**
* LimitInvokerFilter(當(dāng)配置了actives并且值不為0的時(shí)候觸發(fā))
*/
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 主要記錄每臺(tái)機(jī)器針對(duì)某個(gè)方法的并發(fā)數(shù)量
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
if (active >= max) {
synchronized (count) {
// 這個(gè)while循環(huán)是必要的埠胖,因?yàn)樵谝淮蝫ait結(jié)束后,可能線程調(diào)用已經(jīng)結(jié)束了淳玩,騰出來(lái)consumer的空間
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
// 如果wait方法被中斷的話直撤,remain這時(shí)候有可能大于0
// 如果其中一個(gè)線程運(yùn)行結(jié)束后自動(dòng)調(diào)用notify方法的話,也有可能remain大于0
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
// 調(diào)用開(kāi)始和結(jié)束后增減并發(fā)數(shù)量
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if (max > 0) {
// 這里很關(guān)鍵蜕着,因?yàn)橐粋€(gè)調(diào)用完成后要通知正在等待執(zhí)行的隊(duì)列
synchronized (count) {
count.notify();
}
}
}
}
}
ActiveLimitFilter主要用于 限制同一個(gè)客戶端對(duì)于一個(gè)服務(wù)端方法的并發(fā)調(diào)用量(客戶端限流)谋竖。
2.3 FutureFilter
Future主要是處理事件信息,主要有以下幾個(gè)事件:
- oninvoke 在方法調(diào)用前觸發(fā)(如果調(diào)用出現(xiàn)異常則會(huì)直接觸發(fā)onthrow方法)
- onreturn 在方法返回會(huì)觸發(fā)(如果調(diào)用出現(xiàn)異常則會(huì)直接觸發(fā)onthrow方法)
- onthrow 調(diào)用出現(xiàn)異常時(shí)候觸發(fā)
package com.alibaba.dubbo.rpc.protocol.dubbo.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.StaticContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
/**
* EventFilter
*/
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// 這里主要處理回調(diào)邏輯承匣,主要區(qū)分三個(gè)時(shí)間:oninvoke:調(diào)用前觸發(fā)蓖乘,onreturn:調(diào)用后觸發(fā) onthrow:出現(xiàn)異常情況時(shí)候觸發(fā)
fireInvokeCallback(invoker, invocation);
// 需要在調(diào)用前配置好是否有返回值,已供invoker判斷是否需要返回future.
Result result = invoker.invoke(invocation);
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
}
return result;
}
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
/**
* 同步異步的主要處理區(qū)別:
* 1. 同步調(diào)用的話韧骗,事件觸發(fā)是直接調(diào)用的嘉抒,沒(méi)有任何邏輯;
* 2. 異步的話就是首先獲取到調(diào)用產(chǎn)生的Future對(duì)象袍暴,然后復(fù)寫(xiě)Future的done()方法些侍,
* 將fireThrowCallback和fireReturnCallback邏輯引入即可隶症。
*/
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
///must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
// 由于JDK的安全檢查耗時(shí)較多.所以通過(guò)setAccessible(true)的方式關(guān)閉安全檢查就可以達(dá)到提升反射速度的目的
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
// 從下面代碼可以看出oninvoke的方法參數(shù)要與調(diào)用的方法參數(shù)一致
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
// fireReturnCallback的邏輯與fireThrowCallback基本一樣,所以不用看了
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
//not set onreturn callback
if (onReturnMethod == null && onReturnInst == null) {
return;
}
if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onReturnMethod.isAccessible()) {
onReturnMethod.setAccessible(true);
}
Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = result;
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = result;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
params = new Object[]{result};
}
try {
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
// fireReturnCallback的邏輯與fireThrowCallback基本一樣岗宣,所以不用看了
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
//onthrow callback not configured
if (onthrowMethod == null && onthrowInst == null) {
return;
}
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
try {
// 因?yàn)閛nthrow方法的參數(shù)第一個(gè)值必須為異常信息蚂会,所以這里需要構(gòu)造參數(shù)列表
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length > 1) {
// 回調(diào)方法只有一個(gè)參數(shù)而且這個(gè)參數(shù)是數(shù)組(單獨(dú)拎出來(lái)計(jì)算的好處是這樣可以少?gòu)?fù)制一個(gè)數(shù)組)
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
params[1] = args;
} else {
// 回調(diào)方法有多于一個(gè)參數(shù)
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
// 回調(diào)方法沒(méi)有參數(shù)
params = new Object[]{exception};
}
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
}
3 Provider
3.1 ContextFilter
ContextFilter和ConsumerContextFilter是結(jié)合使用的,之前的介紹中已經(jīng)看了ConsumerContextFilter狈定,下面再簡(jiǎn)單看一下ContextFilter颂龙,來(lái)驗(yàn)證上面講到的邏輯。
package com.alibaba.dubbo.rpc.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import java.util.HashMap;
import java.util.Map;
/**
* ContextInvokerFilter
*/
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
// 隱式參數(shù)重剔除一些核心消息
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
}
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
// mreged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
if (attachments != null) {
// 這里又重新將invocation和attachments信息設(shè)置到RpcContext纽什,
// 這里設(shè)置以后provider的代碼就可以獲取到consumer端傳遞的一些隱式參數(shù)了
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.removeContext();
}
}
}
3.2 EchoFilter
回響測(cè)試主要用來(lái)檢測(cè)服務(wù)是否正常(網(wǎng)絡(luò)狀態(tài))措嵌,單純的檢測(cè)網(wǎng)絡(luò)情況的話其實(shí)不需要執(zhí)行真正的業(yè)務(wù)邏輯的,所以通過(guò)Filter驗(yàn)證一下即可芦缰。
package com.alibaba.dubbo.rpc.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
/**
* EchoInvokerFilter
*/
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
return new RpcResult(inv.getArguments()[0]);
return invoker.invoke(inv);
}
}
3.3 ExecuteLimitFilter
服務(wù)端接口限制限流的具體執(zhí)行邏輯就是在ExecuteLimitFilter中企巢,因?yàn)榉?wù)端不需要考慮重試等待邏輯,一旦當(dāng)前執(zhí)行的線程數(shù)量大于指定數(shù)量让蕾,就直接返回失敗了浪规,所以實(shí)現(xiàn)邏輯相對(duì)于ActiveLimitFilter倒是簡(jiǎn)便了不少。
package com.alibaba.dubbo.rpc.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;
import java.util.concurrent.Semaphore;
/**
* ThreadLimitInvokerFilter
*/
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
executesLimit = count.getSemaphore(max);
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if(acquireResult) {
executesLimit.release();
}
}
}
}
3.4 ExceptionFilter
Dubbo 對(duì)于異常的處理有自己的一套規(guī)則:
- 如果是 checked異常 則直接拋出探孝;
- 如果是unchecked異常 但是在接口上有聲明笋婿,也會(huì)直接拋出;
- 如果異常類(lèi)和接口類(lèi)在同一jar包里顿颅,直接拋出缸濒;
- 如果是 JDK自帶的異常 ,直接拋出粱腻;
- 如果是 Dubbo的異常 庇配,直接拋出;
- 其余的都包裝成RuntimeException然后拋出(避免異常在Client不能反序列化問(wèn)題)绍些;
package com.alibaba.dubbo.rpc.filter;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericService;
import java.lang.reflect.Method;
/**
* ExceptionInvokerFilter
* <p>
* Functions:
* <ol>
* <li>unexpected exception will be logged in ERROR level on provider side. Unexpected exception are unchecked
* exception not declared on the interface</li>
* <li>Wrap the exception not introduced in API package into RuntimeException. Framework will serialize the outer exception but stringnize its cause in order to avoid of possible serialization problem on client side</li>
* </ol>
*/
@Activate(group = Constants.PROVIDER)
public class ExceptionFilter implements Filter {
private final Logger logger;
public ExceptionFilter() {
this(LoggerFactory.getLogger(ExceptionFilter.class));
}
public ExceptionFilter(Logger logger) {
this.logger = logger;
}
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
Result result = invoker.invoke(invocation);
if (result.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = result.getException();
// directly throw if it's checked exception
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return result;
}
// directly throw if the exception appears in the signature
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return result;
}
}
} catch (NoSuchMethodException e) {
return result;
}
// for the exception not found in method's signature, print ERROR message in server's log.
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// directly throw if exception class and interface class are in the same jar file.
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return result;
}
// directly throw if it's JDK exception
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return result;
}
// directly throw if it's dubbo exception
if (exception instanceof RpcException) {
return result;
}
// otherwise, wrap with RuntimeException and throw back to the client
return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
return result;
}
}
return result;
} catch (RuntimeException e) {
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
throw e;
}
}
}
到這捞慌,Dubbo中的幾個(gè)核心Filter已經(jīng)講完,F(xiàn)ilter其實(shí)沒(méi)有那么復(fù)雜柬批,在開(kāi)發(fā)過(guò)程中啸澡,也可以參考此思路實(shí)現(xiàn)自己的Filter鏈。