1.dubbo調(diào)用過程
以dubbo官方demo為例僚纷,在provider端矩距,從netty接收到消息,遞交給業(yè)務(wù)線程池處理開始怖竭,到真正調(diào)用到業(yè)務(wù)方法sayHello()結(jié)束锥债,中間經(jīng)過了十幾個(gè)Filter的處理。見下圖
那么這些Filter是如何初始化的痊臭,調(diào)用的時(shí)候又是如何執(zhí)行的呢哮肚?接下來一步一步介紹。
2.dubbo服務(wù)導(dǎo)出過程
dubbo在進(jìn)行服務(wù)導(dǎo)出時(shí)主要做了如下一些工作
可以看到:第二步中核心工作就包括Filter的初始化广匙。見下ProtocolFilterWrapper#export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
其中有一步是buildInvokerChain允趟,從名字上也可以看出,這是初始化一個(gè)責(zé)任鏈鸦致,對應(yīng)設(shè)計(jì)模式中的責(zé)任鏈模式拼窥。接著看這個(gè)責(zé)任鏈?zhǔn)窃趺闯跏蓟模≒rotocolFilterWrapper#buildInvokerChain)
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//利用dubbo spi得到實(shí)現(xiàn)了Filter接口的所有實(shí)例,形成List<Filter>
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
//遍歷這個(gè)List<Filter>蹋凝,形成一個(gè)Invoker鏈表
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
//執(zhí)行責(zé)任鏈上某個(gè)節(jié)點(diǎn)的invoke邏輯時(shí),同時(shí)會(huì)傳入next節(jié)點(diǎn)信息总棵,以支持鏈?zhǔn)綀?zhí)行
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
可以看到這個(gè)Filter的責(zé)任鏈初始化過程
1.通過dubbo spi機(jī)制鳍寂,取得所有Filter實(shí)例形成一個(gè)ArrayList<Filter>
2.遍歷這個(gè)ArrayList,以next指針初始化一個(gè)Invoker的鏈表InvokerList
3.Invoker執(zhí)行邏輯即執(zhí)行Filter的invoke方法的同時(shí)情龄,將next指針作為參數(shù)傳入迄汛,以支持鏈?zhǔn)秸{(diào)用
整個(gè)過程結(jié)束之后,會(huì)有兩個(gè)List骤视,一個(gè)ArrayList<Filter>鞍爱,一個(gè)以NEXT指針形成的InvokerList,這兩個(gè)List就是dubbo Filter機(jī)制的基礎(chǔ)专酗。
3.Filter接口
@SPI
public interface Filter {
/**
* Does not need to override/implement this method.
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
interface Listener {
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}
Filter接口提供了一個(gè)invoke方法睹逃,另一個(gè)是Listener接口,有onResponse,onError兩個(gè)方法沉填。
這兩部分對應(yīng)著Filter對請求和響應(yīng)的處理邏輯
請求的處理以AccessLogFilter#invoke的實(shí)現(xiàn)為例,可以看到疗隶,其首先進(jìn)行AccessLog的處理,然后調(diào)用
invoker.invoke().這個(gè)invoker即之前Filter初始化的時(shí)候翼闹,以Next指針形成的那個(gè)InvokerList鏈表中的節(jié)點(diǎn)斑鼻。這樣一來,整個(gè)鏈表中的節(jié)點(diǎn)都會(huì)得到順序執(zhí)行猎荠。
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
try {
String accessLogKey = invoker.getUrl().getParameter(ACCESS_LOG_KEY);
if (ConfigUtils.isNotEmpty(accessLogKey)) {
AccessLogData logData = buildAccessLogData(invoker, inv);
log(accessLogKey, logData);
}
} catch (Throwable t) {
logger.warn("Exception in AccessLogFilter of service(" + invoker + " -> " + inv + ")", t);
}
return invoker.invoke(inv);
}
響應(yīng)的處理可以見CallbackRegistrationInvoker#invoke
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
// onResponse callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
之前Filter初始化的時(shí)候求晶,會(huì)形成兩個(gè)list,next指針形成的那個(gè)鏈表用于對請求的處理蚁孔,另一個(gè)ArrayList<Filter> 就是在此時(shí)執(zhí)行,拿到結(jié)果之后饮焦,遍歷這個(gè)ArrayList,執(zhí)行其onResponse或者onError方法拒垃,如此一來停撞,請求和響應(yīng)應(yīng)就會(huì)經(jīng)過所有生效的Filter處理。
總結(jié):dubbo的Filter機(jī)制是一種典型的責(zé)任鏈模式悼瓮,這個(gè)鏈的基礎(chǔ)即上述兩個(gè)list戈毒。如果我們在自己的業(yè)務(wù)場景中,需要對請求或者響應(yīng)做一些通用的處理横堡,那么也很簡單埋市,直接基于dubbo spi( http://www.reibang.com/p/d4d7ebc8f7bb
),自定義邏輯實(shí)現(xiàn)Filter接口的相應(yīng)方法即可命贴。