Opentracing基本模型
如圖,在跟蹤鏈中有以下幾個比較重要的數據結構和概念:
span:標識一次分布式調用,其自身包含了id,parentId(指向上級Span的id),traceId,服務名稱等重要屬性,其應盡量保持精簡;
trace:標識整個請求鏈,即一系列Span的組合。其自身的ID將貫穿整個調用鏈,其中的每個Span都必須攜帶這個traceId,因此traceId將在整個調用鏈中傳遞;
cs:客戶端發起請求,標志Span的開始;
sr:服務端接收到請求,并開始處理內部事務,其中sr - cs則為網絡延遲和時鐘抖動;
ss:服務端處理完請求,返回響應內容,其中ss - sr則為服務端處理請求耗時;
cr:客戶端接收到服務端響應內容,標志著Span的結束,其中cr - ss則為網絡延遲和時鐘抖動。
客戶端調用時間=cr-cs
服務端處理時間=ss-sr
分布式系統調用跟蹤的基本架構要求
低侵入性,高性能,高可用容錯,低丟失率等。
基于GRPC的分布式系統調用跟蹤實踐
創建TraceContext
TraceContext通過Threadlocal對span進行保存,并且將traceid和spanid向底層服務傳遞,zebra對線程上下文傳遞進行了封裝,具體參照《GRPC如何實現公共參數與業務參數分離傳遞》。下面是TraceContext具體代碼:
/**
 * Static helper that keeps the tracing state of the current call (trace id,
 * span id and the list of collected spans) inside the current
 * {@code RpcContext}.
 *
 * <p>All state is read from and written to {@code RpcContext.getContext()};
 * this class holds no state of its own and cannot be instantiated.
 */
public class TraceContext {
    /** Context key under which the list of collected spans is stored. */
    private static final String SPAN_LIST_KEY = "spanList";
    /** Context key of the trace id shared by every span of one call chain. */
    public static final String TRACE_ID_KEY = "traceId";
    /** Context key of the span id stored for the current call. */
    public static final String SPAN_ID_KEY = "spanId";
    /** Annotation value: client send. */
    public static final String ANNO_CS = "cs";
    /** Annotation value: client receive. */
    public static final String ANNO_CR = "cr";
    /** Annotation value: server receive. */
    public static final String ANNO_SR = "sr";
    /** Annotation value: server send. */
    public static final String ANNO_SS = "ss";

    private TraceContext() {
    }

    /** Stores {@code traceId} in the current context. */
    public static void setTraceId(String traceId) {
        RpcContext.getContext().set(TRACE_ID_KEY, traceId);
    }

    /** @return the trace id stored in the current context, or {@code null} if none is set */
    public static String getTraceId() {
        return (String) RpcContext.getContext().get(TRACE_ID_KEY);
    }

    /** @return the span id stored in the current context, or {@code null} if none is set */
    public static String getSpanId() {
        return (String) RpcContext.getContext().get(SPAN_ID_KEY);
    }

    /** Stores {@code spanId} in the current context. */
    public static void setSpanId(String spanId) {
        RpcContext.getContext().set(SPAN_ID_KEY, spanId);
    }

    /**
     * Appends {@code span} to the span list of the current context.
     *
     * <p>If {@link #start()} has not been called for this context yet, the list
     * is created on demand instead of failing with a NullPointerException.
     */
    @SuppressWarnings("unchecked")
    public static void addSpan(Span span) {
        List<Span> spans = (List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY);
        if (spans == null) {
            spans = new ArrayList<Span>();
            RpcContext.getContext().set(SPAN_LIST_KEY, spans);
        }
        spans.add(span);
    }

    /**
     * @return the span list of the current context; an empty, immutable list
     *         (never {@code null}) when no list has been created
     */
    @SuppressWarnings("unchecked")
    public static List<Span> getSpans() {
        List<Span> spans = (List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY);
        if (spans == null) {
            return java.util.Collections.<Span>emptyList();
        }
        return spans;
    }

    /** Removes trace id, span id and span list from the current context. */
    public static void clear() {
        RpcContext.getContext().remove(TRACE_ID_KEY);
        RpcContext.getContext().remove(SPAN_ID_KEY);
        RpcContext.getContext().remove(SPAN_LIST_KEY);
    }

    /** Resets the tracing state and installs a fresh, empty span list. */
    public static void start() {
        clear();
        RpcContext.getContext().set(SPAN_LIST_KEY, new ArrayList<Span>());
    }
}
創建TraceAgent
TraceAgent將span信息上傳至kafka,代碼如下:
/**
 * Reports spans to Kafka (topic {@code zipkin}, JSON encoding) through a
 * zipkin {@code AsyncReporter}.
 *
 * <p>Callers in this code base create a new {@code TraceAgent} for every
 * finished call. Building a new {@code KafkaSender} and {@code AsyncReporter}
 * each time, and never closing them, leaks those resources, so a single
 * reporter is built lazily on first use and shared by all instances.
 * NOTE(review): the Kafka address is therefore read from
 * {@code GrpcProperties} only once; confirm it is not expected to change at
 * runtime.
 */
public class TraceAgent {
    /** Guards the one-time construction of {@link #sharedReporter}. */
    private static final Object INIT_LOCK = new Object();
    /** Reporter shared by every TraceAgent; created on first construction. */
    private static volatile AsyncReporter<zipkin2.Span> sharedReporter;

    private final AsyncReporter<zipkin2.Span> report;

    /**
     * Creates an agent bound to the shared reporter, building the reporter
     * from {@code GrpcProperties#getCallChainUpdAddr()} if this is the first
     * agent.
     */
    public TraceAgent() {
        report = obtainReporter();
    }

    /** Returns the shared reporter, constructing it exactly once. */
    private static AsyncReporter<zipkin2.Span> obtainReporter() {
        AsyncReporter<zipkin2.Span> reporter = sharedReporter;
        if (reporter == null) {
            synchronized (INIT_LOCK) {
                reporter = sharedReporter;
                if (reporter == null) {
                    GrpcProperties grpcProperties = SpringContextUtils.getBean(GrpcProperties.class);
                    KafkaSender sender = KafkaSender.newBuilder()
                            .bootstrapServers(grpcProperties.getCallChainUpdAddr())
                            .topic("zipkin")
                            .encoding(Encoding.JSON)
                            .build();
                    reporter = AsyncReporter.builder(sender).build();
                    sharedReporter = reporter;
                }
            }
        }
        return reporter;
    }

    /**
     * Hands every span in {@code spans} to the reporter.
     *
     * @param spans spans to report; {@code null}, an empty list and
     *              {@code null} elements are ignored
     */
    public void send(final List<Span> spans) {
        if (spans == null || spans.isEmpty()) {
            return;
        }
        for (Span item : spans) {
            if (item != null) {
                report.report(item);
            }
        }
    }
}
創建ZebraClientTracing
ZebraClientTracing用于記錄調用端的span信息,具體代碼如下:
@Component
public class ZebraClientTracing {
public Span startTrace(String method) {
String id = IdUtils.get() + "";
String traceId = null;
if (null == TraceContext.getTraceId()) {
TraceContext.start();
traceId = id;
} else {
traceId = TraceContext.getTraceId();
}
long timestamp = System.currentTimeMillis() * 1000;
// 注冊本地信息
Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
.port(50003).build();
// 初始化span
Span consumerSpan = Span.newBuilder().localEndpoint(endpoint).id(id).traceId(traceId)
.parentId(TraceContext.getSpanId() + "").name(EtcdRegistry.serviceName).timestamp(timestamp)
.addAnnotation(timestamp, TraceContext.ANNO_CS).putTag("method", method)
.putTag("pkgId", RpcContext.getContext().getAttachment("pkg")).build();
// 將tracing id和spanid放到上下文
RpcContext.getContext().get().put(TraceContext.TRACE_ID_KEY, consumerSpan.traceId());
RpcContext.getContext().get().put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.id()));
return consumerSpan;
}
public void endTrace(Span span, Stopwatch watch,int code) {
span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_CR)
.duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
TraceAgent traceAgent = new TraceAgent();
traceAgent.send(TraceContext.getSpans());
}
}
創建ZebraServerTracing
ZebraServerTracing用于記錄服務端的span信息,具體代碼如下:
@Component
public class ZebraServerTracing {
public Span startTrace(String method) {
String traceId = (String) RpcContext.getContext().get(TraceContext.TRACE_ID_KEY);
String parentSpanId = (String) RpcContext.getContext().get(TraceContext.SPAN_ID_KEY);
String id = IdUtils.get() + "";
TraceContext.start();
TraceContext.setTraceId(traceId);
TraceContext.setSpanId(parentSpanId);
long timestamp = System.currentTimeMillis() * 1000;
Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
.port(50003).build();
Span providerSpan = Span.newBuilder().id(id).parentId(parentSpanId).traceId(traceId)
.name(EtcdRegistry.serviceName).timestamp(timestamp).localEndpoint(endpoint)
.addAnnotation(timestamp, TraceContext.ANNO_SR).putTag("method", method)
.putTag("pkgId", RpcContext.getContext().getAttachment("pkg"))
.build();
TraceContext.addSpan(providerSpan);
return providerSpan;
}
public void endTrace(Span span, Stopwatch watch,int code) {
span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_SS)
.duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
TraceAgent traceAgent = new TraceAgent();
traceAgent.send(TraceContext.getSpans());
}
}
創建grpc client攔截器
/**
 * gRPC client interceptor that copies the {@code RpcContext} attachments and
 * values into the outgoing request headers and, when tracing is requested,
 * wraps the call in a client span.
 */
public class HeaderClientInterceptor implements ClientInterceptor {
    private static final Logger log = LogManager.getLogger(HeaderClientInterceptor.class);
    private final ZebraClientTracing clientTracing;

    private HeaderClientInterceptor() {
        this.clientTracing = SpringContextUtils.getBean(ZebraClientTracing.class);
    }

    /** @return a new interceptor instance */
    public static ClientInterceptor instance() {
        return new HeaderClientInterceptor();
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            // Decided once, when the call object is created.
            private final boolean tracingEnabled = tracingRequested();
            private Stopwatch timer = null;
            private Span activeSpan = null;

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if (tracingEnabled) {
                    activeSpan = clientTracing.startTrace(method.getFullMethodName());
                    timer = Stopwatch.createStarted();
                }
                // Runs after startTrace so the ids it stored in the context are sent too.
                copyThreadLocalToMetadata(headers);
                Listener<RespT> tracingListener = new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        super.onClose(status, trailers);
                        if (tracingEnabled) {
                            clientTracing.endTrace(activeSpan, timer, status.getCode().value());
                        }
                    }
                };
                super.start(tracingListener, headers);
            }
        };
    }

    /**
     * Tells whether this call must be traced: either the API gateway asked for
     * call-chain tracing through the {@code ZEBRA_OPEN_TRACING} attachment, or
     * the current context already carries a trace id.
     */
    private static boolean tracingRequested() {
        boolean gatewayTracing = "1"
                .equals(RpcContext.getContext().getAttachment(ZebraConstants.ZEBRA_OPEN_TRACING));
        boolean subTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null;
        return subTracing || gatewayTracing;
    }

    /**
     * Serialises the context attachments and values to JSON and stores each
     * non-empty map under its header key. Failures are logged, not rethrown.
     */
    private void copyThreadLocalToMetadata(Metadata headers) {
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        Map<String, Object> contextValues = RpcContext.getContext().get();
        try {
            if (!contextAttachments.isEmpty()) {
                String attachmentsJson = SerializerUtil.toJson(contextAttachments);
                headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, attachmentsJson);
            }
            if (!contextValues.isEmpty()) {
                String valuesJson = SerializerUtil.toJson(contextValues);
                headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, valuesJson);
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}
創建grpc server攔截器
/**
 * gRPC server interceptor that restores the caller's {@code RpcContext}
 * attachments and values from the request headers and, when the caller sent a
 * trace id, wraps the call in a server span.
 */
public class HeaderServerInterceptor implements ServerInterceptor {
    private static final Logger log = LogManager.getLogger(HeaderServerInterceptor.class);
    private final ZebraServerTracing serverTracing;

    /** @return a new interceptor instance */
    public static ServerInterceptor instance() {
        return new HeaderServerInterceptor();
    }

    private HeaderServerInterceptor() {
        serverTracing = SpringContextUtils.getBean(ZebraServerTracing.class);
    }

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, final Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
            // Decided on the first request(), after the headers have been
            // copied into the context. Deciding at construction time would
            // only see whatever an earlier call left in the context.
            boolean isSubTracing = false;
            boolean tracingDecided = false;
            Stopwatch watch = null;
            Span span = null;

            @Override
            public void request(int numMessages) {
                if (!tracingDecided) {
                    // Drop trace ids left in the context by an earlier call so
                    // that only the ids of this request's headers count.
                    // NOTE(review): assumes nothing upstream of this interceptor
                    // populates the trace context on purpose; confirm.
                    TraceContext.clear();
                }
                // The remote address may be absent or not an inet address
                // (e.g. in-process transport), so do not cast blindly.
                Object remote = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                String remoteHost = null;
                if (remote instanceof InetSocketAddress) {
                    remoteHost = ((InetSocketAddress) remote).getHostString();
                    RpcContext.getContext().setAttachment(ZebraConstants.REMOTE_ADDRESS, remoteHost);
                }
                copyMetadataToThreadLocal(headers);
                if (!tracingDecided) {
                    // request() can be invoked several times for one call;
                    // start at most one span.
                    tracingDecided = true;
                    isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null;
                    if (isSubTracing) {
                        span = serverTracing.startTrace(call.getMethodDescriptor().getFullMethodName());
                        watch = Stopwatch.createStarted();
                    }
                }
                log.debug("FullMethodName:{},RemoteAddress={},attachments={},context={}",
                        call.getMethodDescriptor().getFullMethodName(), remoteHost,
                        headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS), headers.get(GrpcUtil.GRPC_CONTEXT_VALUES));
                super.request(numMessages);
            }

            @Override
            public void close(Status status, Metadata trailers) {
                delegate().close(status, trailers);
                // span is null when the call is closed before request() ran.
                if (isSubTracing && span != null) {
                    serverTracing.endTrace(span, watch, status.getCode().value());
                }
            }
        }, headers);
    }

    /**
     * Parses the JSON attachment and value headers, when present, and stores
     * their entries in the current {@code RpcContext}. Failures are logged,
     * not rethrown.
     */
    private void copyMetadataToThreadLocal(Metadata headers) {
        String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);
        String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);
        try {
            if (attachments != null) {
                Map<String, String> attachmentsMap = SerializerUtil.fromJson(attachments,
                        new TypeToken<Map<String, String>>() {
                        }.getType());
                RpcContext.getContext().setAttachments(attachmentsMap);
            }
            if (values != null) {
                Map<String, Object> valuesMap = SerializerUtil.fromJson(values, new TypeToken<Map<String, Object>>() {
                }.getType());
                for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
                    RpcContext.getContext().set(entry.getKey(), entry.getValue());
                }
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}