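0 Preface
With the current trend toward microservices, a single call involves multiple service nodes, and the resulting logs are scattered across different servers. The ELK stack can collect these scattered logs into Elasticsearch (es), but how to tie the logs of one call together remains a key problem.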
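To view the full-link logs of a single call, the usual approach is to generate a traceId at the system boundary and pass it to the downstream services in the call chain. Each downstream service prints its logs with that traceId and passes it on to any further downstream services. This process is called traceId pass-through.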
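The boundary step described above can be sketched as follows. This is only an illustration: TraceContext and its methods are hypothetical names that do not appear in Dubbo or in the rest of this article, and using a UUID as the traceId is an assumption.

```java
import java.util.UUID;

/** Hypothetical helper (not part of Dubbo): holds the traceId of the current thread. */
final class TraceContext {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

    private TraceContext() {
    }

    /** At the system boundary: reuse an incoming traceId, or create a new one (assumed to be a UUID). */
    static String begin(String incomingTraceId) {
        String traceId = (incomingTraceId == null || incomingTraceId.isEmpty())
                ? UUID.randomUUID().toString().replace("-", "")
                : incomingTraceId;
        TRACE_ID.set(traceId);
        return traceId;
    }

    /** The traceId bound to the current thread, or null if none was set. */
    static String current() {
        return TRACE_ID.get();
    }

    /** Prefix a log message with the current traceId so that logs can be joined later. */
    static String format(String message) {
        return "[traceId=" + TRACE_ID.get() + "] " + message;
    }

    /** Call when the request is finished so a pooled thread does not keep a stale traceId. */
    static void end() {
        TRACE_ID.remove();
    }
}
```

A service would call begin(...) when a request enters, use format(...) for every log line, and call end() when the request completes.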
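In a system that uses HTTP as its service protocol, a shared, pre-wrapped http client can handle traceId pass-through uniformly. With dubbo, it is somewhat more complicated. As covered in the previous article, "Talking about Dubbo (6): Core Source Code - How the Filter Chain Works", the usual approach is a custom Filter, but there are two more unusual implementations: (1) re-implementing the relevant dubbo internal classes; (2) using RpcContext.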
1 Implementation by rewriting Dubbo classes
1.1 Source code analysis
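Proxy is the dynamic proxy instance that Dubbo generates with javassist for a service on the consumer side.
Implement is the service implementation instance on the provider side.
traceId pass-through requires Proxy and Implement to see the same traceId. Dubbo is cleanly layered, and the object handed to the transport layer is RpcInvocation.
So the core of the rewrite is this: Proxy puts the traceId into the RpcInvocation, the Client serializes it and sends it over TCP, the Server deserializes the RpcInvocation and takes out the traceId, and the traceId is then handed to Implement.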
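The flow just described (attach, serialize, transmit, deserialize, restore) can be imitated without Dubbo. In this rough sketch, DemoInvocation is a simplified stand-in for RpcInvocation, JDK serialization stands in for Dubbo's codec, and a byte array stands in for the TCP connection; all of these names and choices are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

/** Simplified stand-in for RpcInvocation; NOT the real Dubbo class. */
final class DemoInvocation implements Serializable {
    private static final long serialVersionUID = 1L;
    final String methodName;
    final HashMap<String, String> attachments = new HashMap<String, String>();

    DemoInvocation(String methodName) {
        this.methodName = methodName;
    }
}

final class PassThroughDemo {
    /** Holds the traceId of the current thread (consumer or provider thread). */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

    /** Consumer side: copy the traceId into the invocation, then serialize it. */
    static byte[] consumerSend(String methodName) {
        DemoInvocation inv = new DemoInvocation(methodName);
        inv.attachments.put("traceId", TRACE_ID.get());
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(inv);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    /** Provider side: deserialize the invocation and restore the traceId for this thread. */
    static String providerReceive(byte[] wire) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
            DemoInvocation inv = (DemoInvocation) in.readObject();
            TRACE_ID.set(inv.attachments.get("traceId"));
            return TRACE_ID.get();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The traceId travels as part of the invocation rather than as a method argument, so the business interface does not need to change.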
Below is an analysis of the consumer-side JavassistProxyFactory code:
public class JavassistProxyFactory extends AbstractProxyFactory {
/**
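* When the Spring container starts, this proxy factory method generates the Service proxy class for the Consumer.
* Both invoker and interfaces are read from the Spring configuration file.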
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// The bytecode of every method in the generated Service proxy class calls InvokerInvocationHandler.invoke(...),
// which does the actual RpcInvocation wrapping, serialization, TCP transport, and result deserialization
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot correctly handle class names that contain $
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
Below is an analysis of the consumer-side InvokerInvocationHandler code:
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler){
this.invoker = handler;
}
/**
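* When an RPC is actually made, the bytecode of each Service proxy calls this generic invoke.
* proxy is the proxy object generated earlier, the second parameter is the method, and the third is the argument list.
* Knowing (1) which interface, (2) which method, and (3) which arguments is enough to map the call to the Provider-side implementation and get the return value.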
*/
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
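// At this point we are still on the consumer's business thread, so the traceId can be read from a ThreadLocal here
// and put into the attachment of the RpcInvocation; the Provider can then take the traceId out of the RpcInvocation it receives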
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
Below is an analysis of the provider-side DubboProtocol code:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// For a callback, handle the case of a newer version calling an older version
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
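// After the Provider receives the message, it takes a thread from the thread pool, deserializes the RpcInvocation, and calls the corresponding method of the implementation class.
// So this is the thread that runs the Provider-side implementation: take out the traceId here and put it into a ThreadLocal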
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
1.2 Implementation
package com.alibaba.dubbo.rpc.proxy;
/**
* traceId utility class; this class is newly added
*/
public class TraceIdUtil {
private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();
public static String getTraceId() {
return TRACE_ID.get();
}
public static void setTraceId(String traceId) {
TRACE_ID.set(traceId);
}
}
/**
* InvokerInvocationHandler: this class is modified
*/
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler){
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// Put the consumer-side traceId into the RpcInvocation
RpcInvocation rpcInvocation = new RpcInvocation(method, args);
rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
return invoker.invoke(rpcInvocation).recreate();
}
}
package com.alibaba.dubbo.rpc.protocol.dubbo;
/**
* dubbo protocol support: re-implemented DubboProtocol
*
*/
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// For a callback, handle the case of a newer version calling an older version
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// Put the traceId received from the consumer into the provider-side thread local
TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
}
}
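2 Implementation based on RpcContext
Before going into the custom filter approach to traceId pass-through, let us first look at the RpcContext object. RpcContext is essentially a ThreadLocal object that maintains the context information of one rpc interaction. Its source is as follows: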
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;
/**
* Thread local context. (API, ThreadLocal, ThreadSafe)
*
* Note: RpcContext is a temporary state recorder. Its state changes whenever an RPC request is received or sent.
* For example, A calls B and B then calls C. On machine B, before B calls C, RpcContext records the A-to-B call; after B calls C, it records the B-to-C call.
*
* @see com.alibaba.dubbo.rpc.filter.ContextFilter
* @author qian.lei
* @author william.liangf
* @export
*/
public class RpcContext {
private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
/**
* get context.
*
* @return context
*/
public static RpcContext getContext() {
return LOCAL.get();
}
/**
* remove context.
*
* @see com.alibaba.dubbo.rpc.filter.ContextFilter
*/
public static void removeContext() {
LOCAL.remove();
}
private Future<?> future;
private List<URL> urls;
private URL url;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] arguments;
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
private final Map<String, String> attachments = new HashMap<String, String>();
private final Map<String, Object> values = new HashMap<String, Object>();
// now we don't use the 'values' map to hold these objects
// we want these objects to be as generic as possible
private Object request;
private Object response;
@Deprecated
private List<Invoker<?>> invokers;
@Deprecated
private Invoker<?> invoker;
@Deprecated
private Invocation invocation;
protected RpcContext() {
}
/**
* Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
*
* @return null if the underlying protocol doesn't provide support for getting request
*/
public Object getRequest() {
return request;
}
/**
* Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
*
* @return null if the underlying protocol doesn't provide support for getting request or the request is not of the specified type
*/
@SuppressWarnings("unchecked")
public <T> T getRequest(Class<T> clazz) {
return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
}
public void setRequest(Object request) {
this.request = request;
}
/**
* Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
*
* @return null if the underlying protocol doesn't provide support for getting response
*/
public Object getResponse() {
return response;
}
/**
* Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
*
* @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type
*/
@SuppressWarnings("unchecked")
public <T> T getResponse(Class<T> clazz) {
return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
}
public void setResponse(Object response) {
this.response = response;
}
/**
* is provider side.
*
* @return provider side.
*/
public boolean isProviderSide() {
URL url = getUrl();
if (url == null) {
return false;
}
InetSocketAddress address = getRemoteAddress();
if (address == null) {
return false;
}
String host;
if (address.getAddress() == null) {
host = address.getHostName();
} else {
host = address.getAddress().getHostAddress();
}
return url.getPort() != address.getPort() ||
! NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
}
/**
* is consumer side.
*
* @return consumer side.
*/
public boolean isConsumerSide() {
URL url = getUrl();
if (url == null) {
return false;
}
InetSocketAddress address = getRemoteAddress();
if (address == null) {
return false;
}
String host;
if (address.getAddress() == null) {
host = address.getHostName();
} else {
host = address.getAddress().getHostAddress();
}
return url.getPort() == address.getPort() &&
NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
}
/**
* get future.
*
* @param <T>
* @return future
*/
@SuppressWarnings("unchecked")
public <T> Future<T> getFuture() {
return (Future<T>) future;
}
/**
* set future.
*
* @param future
*/
public void setFuture(Future<?> future) {
this.future = future;
}
public List<URL> getUrls() {
return urls == null && url != null ? (List<URL>) Arrays.asList(url) : urls;
}
public void setUrls(List<URL> urls) {
this.urls = urls;
}
public URL getUrl() {
return url;
}
public void setUrl(URL url) {
this.url = url;
}
/**
* get method name.
*
* @return method name.
*/
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* get parameter types.
*
* @serial
*/
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
/**
* get arguments.
*
* @return arguments.
*/
public Object[] getArguments() {
return arguments;
}
public void setArguments(Object[] arguments) {
this.arguments = arguments;
}
/**
* set local address.
*
* @param address
* @return context
*/
public RpcContext setLocalAddress(InetSocketAddress address) {
this.localAddress = address;
return this;
}
/**
* set local address.
*
* @param host
* @param port
* @return context
*/
public RpcContext setLocalAddress(String host, int port) {
if (port < 0) {
port = 0;
}
this.localAddress = InetSocketAddress.createUnresolved(host, port);
return this;
}
/**
* get local address.
*
* @return local address
*/
public InetSocketAddress getLocalAddress() {
return localAddress;
}
public String getLocalAddressString() {
return getLocalHost() + ":" + getLocalPort();
}
/**
* get local host name.
*
* @return local host name
*/
public String getLocalHostName() {
String host = localAddress == null ? null : localAddress.getHostName();
if (host == null || host.length() == 0) {
return getLocalHost();
}
return host;
}
/**
* set remote address.
*
* @param address
* @return context
*/
public RpcContext setRemoteAddress(InetSocketAddress address) {
this.remoteAddress = address;
return this;
}
/**
* set remote address.
*
* @param host
* @param port
* @return context
*/
public RpcContext setRemoteAddress(String host, int port) {
if (port < 0) {
port = 0;
}
this.remoteAddress = InetSocketAddress.createUnresolved(host, port);
return this;
}
/**
* get remote address.
*
* @return remote address
*/
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
/**
* get remote address string.
*
* @return remote address string.
*/
public String getRemoteAddressString() {
return getRemoteHost() + ":" + getRemotePort();
}
/**
* get remote host name.
*
* @return remote host name
*/
public String getRemoteHostName() {
return remoteAddress == null ? null : remoteAddress.getHostName();
}
/**
* get local host.
*
* @return local host
*/
public String getLocalHost() {
String host = localAddress == null ? null :
localAddress.getAddress() == null ? localAddress.getHostName()
: NetUtils.filterLocalHost(localAddress.getAddress().getHostAddress());
if (host == null || host.length() == 0) {
return NetUtils.getLocalHost();
}
return host;
}
/**
* get local port.
*
* @return port
*/
public int getLocalPort() {
return localAddress == null ? 0 : localAddress.getPort();
}
/**
* get remote host.
*
* @return remote host
*/
public String getRemoteHost() {
return remoteAddress == null ? null :
remoteAddress.getAddress() == null ? remoteAddress.getHostName()
: NetUtils.filterLocalHost(remoteAddress.getAddress().getHostAddress());
}
/**
* get remote port.
*
* @return remote port
*/
public int getRemotePort() {
return remoteAddress == null ? 0 : remoteAddress.getPort();
}
/**
* get attachment.
*
* @param key
* @return attachment
*/
public String getAttachment(String key) {
return attachments.get(key);
}
/**
* set attachment.
*
* @param key
* @param value
* @return context
*/
public RpcContext setAttachment(String key, String value) {
if (value == null) {
attachments.remove(key);
} else {
attachments.put(key, value);
}
return this;
}
/**
* remove attachment.
*
* @param key
* @return context
*/
public RpcContext removeAttachment(String key) {
attachments.remove(key);
return this;
}
/**
* get attachments.
*
* @return attachments
*/
public Map<String, String> getAttachments() {
return attachments;
}
/**
* set attachments
*
* @param attachment
* @return context
*/
public RpcContext setAttachments(Map<String, String> attachment) {
this.attachments.clear();
if (attachment != null && attachment.size() > 0) {
this.attachments.putAll(attachment);
}
return this;
}
public void clearAttachments() {
this.attachments.clear();
}
/**
* get values.
*
* @return values
*/
public Map<String, Object> get() {
return values;
}
/**
* set value.
*
* @param key
* @param value
* @return context
*/
public RpcContext set(String key, Object value) {
if (value == null) {
values.remove(key);
} else {
values.put(key, value);
}
return this;
}
/**
* remove value.
*
* @param key
* @return value
*/
public RpcContext remove(String key) {
values.remove(key);
return this;
}
/**
* get value.
*
* @param key
* @return value
*/
public Object get(String key) {
return values.get(key);
}
public RpcContext setInvokers(List<Invoker<?>> invokers) {
this.invokers = invokers;
if (invokers != null && invokers.size() > 0) {
List<URL> urls = new ArrayList<URL>(invokers.size());
for (Invoker<?> invoker : invokers) {
urls.add(invoker.getUrl());
}
setUrls(urls);
}
return this;
}
public RpcContext setInvoker(Invoker<?> invoker) {
this.invoker = invoker;
if (invoker != null) {
setUrl(invoker.getUrl());
}
return this;
}
public RpcContext setInvocation(Invocation invocation) {
this.invocation = invocation;
if (invocation != null) {
setMethodName(invocation.getMethodName());
setParameterTypes(invocation.getParameterTypes());
setArguments(invocation.getArguments());
}
return this;
}
/**
* @deprecated Replace to isProviderSide()
*/
@Deprecated
public boolean isServerSide() {
return isProviderSide();
}
/**
* @deprecated Replace to isConsumerSide()
*/
@Deprecated
public boolean isClientSide() {
return isConsumerSide();
}
/**
* @deprecated Replace to getUrls()
*/
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public List<Invoker<?>> getInvokers() {
return invokers == null && invoker != null ? (List)Arrays.asList(invoker) : invokers;
}
/**
* @deprecated Replace to getUrl()
*/
@Deprecated
public Invoker<?> getInvoker() {
return invoker;
}
/**
* @deprecated Replace to getMethodName(), getParameterTypes(), getArguments()
*/
@Deprecated
public Invocation getInvocation() {
return invocation;
}
/**
* Asynchronous call that needs a return value. Even if Future.get is never called, the call timeout is still handled.
* @param callable
* @return the result is obtained via future.get().
*/
@SuppressWarnings("unchecked")
public <T> Future<T> asyncCall(Callable<T> callable) {
try {
try {
setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
final T o = callable.call();
// A local call returns the result directly.
if (o != null) {
FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
public T call() throws Exception {
return o;
}
});
f.run();
return f;
} else {
}
} catch (Exception e) {
throw new RpcException(e);
} finally {
removeAttachment(Constants.ASYNC_KEY);
}
} catch (final RpcException e) {
return new Future<T>() {
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
return true;
}
public T get() throws InterruptedException, ExecutionException {
throw new ExecutionException(e.getCause());
}
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
return get();
}
};
}
return ((Future<T>)getContext().getFuture());
}
/**
* oneway call: only sends the request and does not receive a result.
* @param callable
*/
public void asyncCall(Runnable runable) {
try {
setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
runable.run();
} catch (Throwable e) {
//FIXME should the exception be placed in the future?
throw new RpcException("oneway call error ." + e.getMessage(), e);
} finally {
removeAttachment(Constants.RETURN_KEY);
}
}
}
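The thread-local behaviour of RpcContext seen in the LOCAL field above can be illustrated with the JDK alone. This is a minimal sketch, not Dubbo code; ThreadLocalDemo and its members are hypothetical names. It shows that a value set on one thread is invisible to another thread, which instead gets its own initial value.

```java
final class ThreadLocalDemo {
    /** Each thread gets its own copy, initialised to "empty" (mirrors the LOCAL field of RpcContext). */
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return "empty";
        }
    };

    /** Reads CONTEXT from a freshly started thread and returns what that thread saw. */
    static String readFromOtherThread() {
        final String[] seen = new String[1];
        Thread worker = new Thread(new Runnable() {
            public void run() {
                seen[0] = CONTEXT.get();
            }
        });
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

This is also why a traceId stored in a thread-local must be handed over explicitly whenever work moves to another thread or another process.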
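Note: the attachments in RpcContext are copied into the RpcInvocation object and transmitted together with it.
For this reason, some people suggest simply putting the traceId into RpcContext to get traceId pass-through. Let's try it out and see whether that really works.
Define the Dubbo interface: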
public interface IEchoService {
String echo(String name);
}
Write the server code (Provider):
@Service("echoService")
public class EchoServiceImpl implements IEchoService {
@Override
public String echo(String name) {
String traceId = RpcContext.getContext().getAttachment("traceId");
System.out.println("name = " + name + ", traceId = " + traceId);
return name;
}
public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext("spring-dubbo-test-producer.xml");
System.out.println("server start");
while (true) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
}
}
}
Write the client code (Consumer):
public class EchoServiceConsumer {
public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext("spring-dubbo-test-consumer.xml");
IEchoService service = (IEchoService) applicationContext
.getBean("echoService");
// *) set the traceId
RpcContext.getContext().setAttachment("traceId", "100001");
System.out.println(RpcContext.getContext().getAttachments());
// *) first call
service.echo("lilei");
// *) second call
System.out.println(RpcContext.getContext().getAttachments());
service.echo("hanmeimei");
}
}
The results are as follows:
Server output:
name = lilei, traceId = 100001
name = hanmeimei, traceId = null
Client output:
{traceId=100001}
{}
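The server output shows that the traceId was indeed passed through, but only on the first call and not on the second. The client's printout of the RpcContext contents confirms this. The root cause is that the attachments of the RpcContext object are cleared after each rpc interaction.
Setting a breakpoint in RpcContext's clearAttachments method and reproducing the problem gives the following call stack: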
java.lang.Thread.State: RUNNABLE
at com.alibaba.dubbo.rpc.RpcContext.clearAttachments(RpcContext.java:438)
at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:91)
at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:227)
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
at com.alibaba.dubbo.common.bytecode.proxy0.echo(proxy0.java:-1)
at com.test.dubbo.EchoServiceConsumer.main(EchoServiceConsumer.java:20)
The most direct caller is Dubbo's built-in ConsumerContextFilter. Let's look at its code:
@Activate(
group = {"consumer"},
order = -10000
)
public class ConsumerContextFilter implements Filter {
public ConsumerContextFilter() {
}
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getContext().setInvoker(invoker).setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
if(invocation instanceof RpcInvocation) {
((RpcInvocation)invocation).setInvoker(invoker);
}
Result var3;
try {
var3 = invoker.invoke(invocation);
} finally {
RpcContext.getContext().clearAttachments();
}
return var3;
}
}
Indeed, in the finally block, the RpcContext attachments are cleared after every rpc call.
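The effect of that finally block can be reproduced in isolation. In the sketch below, ClearAfterCallDemo is a hypothetical stand-in: its attachments map plays the role of the RpcContext attachments and its received list plays the role of the provider's output. It is not the real Dubbo filter chain.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the consumer side; not Dubbo code. */
final class ClearAfterCallDemo {
    /** Plays the role of the RpcContext attachments. */
    final Map<String, String> attachments = new HashMap<String, String>();
    /** Records what the "provider" would have seen for each call. */
    final List<String> received = new ArrayList<String>();

    /** Mirrors the shape of ConsumerContextFilter.invoke: do the call, then always clear. */
    void invoke(String name) {
        try {
            received.add(name + ":" + attachments.get("traceId"));
        } finally {
            attachments.clear();
        }
    }
}
```

Setting the traceId once and then calling invoke twice records the traceId for the first call and null for the second, which matches the server output shown earlier.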
Now that we know the root cause, one fix is to set the traceId again before every call, like this (which admittedly looks rather clumsy):
// *) first call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
// *) second call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("hanmeimei");
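One way to avoid repeating the set call by hand is to wrap the service reference in a JDK dynamic proxy that re-sets the traceId before each method call. The sketch below is only an illustration under stated assumptions: ReattachProxyDemo, ATTACHMENTS, EchoService, and FakeRemoteEcho are hypothetical stand-ins, and in real code the put would be RpcContext.getContext().setAttachment("traceId", traceId).

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

final class ReattachProxyDemo {
    /** Stand-in for the RpcContext attachments (assumption; not the Dubbo API). */
    static final Map<String, String> ATTACHMENTS = new HashMap<String, String>();

    /** Stand-in for the article's IEchoService. */
    public interface EchoService {
        String echo(String name);
    }

    /** Stand-in for the remote reference: reads the attachment, then clears it like ConsumerContextFilter. */
    public static final class FakeRemoteEcho implements EchoService {
        public String echo(String name) {
            try {
                return name + ":" + ATTACHMENTS.get("traceId");
            } finally {
                ATTACHMENTS.clear();
            }
        }
    }

    /** Wraps target so that the traceId is re-attached before every method call. */
    @SuppressWarnings("unchecked")
    static <T> T withTraceId(final T target, Class<T> type, final String traceId) {
        InvocationHandler handler = new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                ATTACHMENTS.put("traceId", traceId);
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause(); // rethrow the original exception of the target
                }
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] { type }, handler);
    }
}
```

The caller then works with the wrapped reference and no longer touches the attachments directly. The Filter approach in the next section achieves the same effect inside Dubbo itself.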
3 Implementation based on a Filter
First, introduce a utility class:
public class TraceIdUtils {
private static final ThreadLocal<String> traceIdCache
= new ThreadLocal<String>();
public static String getTraceId() {
return traceIdCache.get();
}
public static void setTraceId(String traceId) {
traceIdCache.set(traceId);
}
public static void clear() {
traceIdCache.remove();
}
}
Then define a Filter class:
package com.test.dubbo;
public class TraceIdFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String traceId = RpcContext.getContext().getAttachment("traceId");
if ( !StringUtils.isEmpty(traceId) ) {
// *) get the traceId from RpcContext and save it
TraceIdUtils.setTraceId(traceId);
} else {
// *) re-set the traceId before the interaction so that it is not lost
RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
}
// *) the actual rpc call
return invoker.invoke(invocation);
}
}
Under the resource directory, add a META-INF/dubbo directory, and in it add a file named com.alibaba.dubbo.rpc.Filter.
Edit the file (com.alibaba.dubbo.rpc.Filter) so that it contains:
traceIdFilter=com.test.dubbo.TraceIdFilter
Then configure the filter on both the dubbo producer and the consumer:
Server:
<dubbo:service interface="com.test.dubbo.IEchoService" ref="echoService" version="1.0.0"
filter="traceIdFilter"/>
Client:
<dubbo:reference interface="com.test.dubbo.IEchoService" id="echoService" version="1.0.0"
filter="traceIdFilter"/>
Change the server test code slightly, as follows:
@Service("echoService")
public class EchoServiceImpl implements IEchoService {
@Override
public String echo(String name) {
String traceId = TraceIdUtils.getTraceId();
System.out.println("name = " + name + ", traceId = " + traceId);
return name;
}
}
The client test code snippet is:
// *) first call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
// *) second call
service.echo("hanmeimei");
Running the same test, the results are as follows:
Server output:
name = lilei, traceId = 100001
name = hanmeimei, traceId = 100001
Client output:
{traceId=100001}
{}
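This matches expectations, and the approach feels much more elegant. The RpcContext attachments are still cleared (ConsumerContextFilter's cleanup runs after the custom Filter returns), but the traceId is injected again before each rpc interaction, so the trace is passed through successfully.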
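The consumer-side interplay between the custom Filter and the cleanup can be checked with a small stand-alone simulation. In this sketch, TraceIdFilterDemo is a hypothetical stand-in: attachments plays the role of the RpcContext attachments, cache plays the role of TraceIdUtils, and sent records what would be transmitted. It imitates the logic of TraceIdFilter.invoke followed by the clearing step, and is not the real Dubbo filter chain.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the consumer side; not Dubbo code. */
final class TraceIdFilterDemo {
    /** Plays the role of the RpcContext attachments. */
    final Map<String, String> attachments = new HashMap<String, String>();
    /** Plays the role of TraceIdUtils. */
    final ThreadLocal<String> cache = new ThreadLocal<String>();
    /** Records what would be sent to the provider for each call. */
    final List<String> sent = new ArrayList<String>();

    void call(String name) {
        String traceId = attachments.get("traceId");
        if (traceId != null && !traceId.isEmpty()) {
            // traceId present in the context: remember it for later calls
            cache.set(traceId);
        } else if (cache.get() != null) {
            // context was cleared by the previous call: inject the remembered traceId again
            attachments.put("traceId", cache.get());
        }
        try {
            sent.add(name + ":" + attachments.get("traceId"));
        } finally {
            attachments.clear(); // same cleanup as ConsumerContextFilter
        }
    }
}
```

With the traceId set once before the first call, both calls carry it, even though the attachments map is empty after each call.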