我們知道拷淘,Zipkin這個(gè)工具可以幫助我們收集分布式系統(tǒng)中各個(gè)系統(tǒng)之間的調(diào)用連關(guān)系,而且除了Servlet之外還能收集:MQ茄猫、線程池、WebSocket困肩、Feign划纽、Hystrix、RxJava锌畸、WebFlux等等組件之間的調(diào)用關(guān)系勇劣。本篇文章就來分析一下Zipkin是如何完成這些功能的
我們先以最常用的Servlet接受請(qǐng)求為例來分析
在spring-cloud-sleuth的spring.factories文件中注入的很多類中包含了一個(gè)類:TraceWebServletAutoConfiguration
,一看就知道潭枣,這是為Servlet環(huán)境量身定制的一個(gè)自動(dòng)裝配類
在這個(gè)類中比默,創(chuàng)建了一個(gè)Filter,這個(gè)Filter就是攔截web請(qǐng)求卸耘,完成Servlet請(qǐng)求鏈路的收集的利器
@Bean
@ConditionalOnMissingBean
public TracingFilter tracingFilter(HttpTracing tracing) {
return (TracingFilter) TracingFilter.create(tracing);
}
我們直接來看這個(gè)攔截器都是做了一些什么東西吧
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest)request;
HttpServletResponse httpResponse = this.servlet.httpResponse(response);
TraceContext context = (TraceContext)request.getAttribute(TraceContext.class.getName());
if (context != null) {
Scope scope = this.currentTraceContext.maybeScope(context);
try {
chain.doFilter(request, response);
} finally {
scope.close();
}
} else {
Span span = this.handler.handleReceive(this.extractor, httpRequest);
request.setAttribute(SpanCustomizer.class.getName(), span.customizer());
request.setAttribute(TraceContext.class.getName(), span.context());
Throwable error = null;
Scope scope = this.currentTraceContext.newScope(span.context());
try {
chain.doFilter(httpRequest, httpResponse);
} catch (ServletException | RuntimeException | Error | IOException var19) {
error = var19;
throw var19;
} finally {
scope.close();
if (this.servlet.isAsync(httpRequest)) {
this.servlet.handleAsync(this.handler, httpRequest, span);
} else {
this.handler.handleSend(ADAPTER.adaptResponse(httpRequest, httpResponse), error, span);
}
}
}
}
Span的創(chuàng)建
第一步退敦,嘗試從request中獲取TraceContext,TraceContext包含了本次請(qǐng)求的鏈路信息蚣抗,假如這個(gè)請(qǐng)求是從上游系統(tǒng)過來的話侈百,那么這里就會(huì)存在這個(gè)信息瓮下。
我們先重點(diǎn)看不存在上游系統(tǒng)時(shí)的分支,這個(gè)時(shí)候钝域,第一步就應(yīng)該去創(chuàng)建一個(gè)span讽坏。關(guān)于span和trace的概念上篇文章已經(jīng)提到過了,這里就不再展開了例证。
public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
Span span = nextSpan(extractor.extract(carrier), request);
span.kind(Span.Kind.SERVER);
return handleStart(request, span);
}
Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {
if (extracted.sampled() == null) { // Otherwise, try to make a new decision
extracted = extracted.sampled(sampler.trySample(adapter, request));
}
return extracted.context() != null
? tracer.joinSpan(extracted.context())
: tracer.nextSpan(extracted);
}
這個(gè)三目表達(dá)式的意思是看當(dāng)前環(huán)境中是否存在span路呜,如果存在則加入當(dāng)前環(huán)境的span,否則繼續(xù)進(jìn)入創(chuàng)建span的邏輯
public Span nextSpan(TraceContextOrSamplingFlags extracted) {
TraceContext parent = extracted.context();
if (extracted.samplingFlags() != null) {
TraceContext implicitParent = currentTraceContext.get();
if (implicitParent == null) {
return toSpan(newContextBuilder(null, extracted.samplingFlags())
.extra(extracted.extra()).build());
}
// fall through, with an implicit parent, not an extracted one
parent = appendExtra(implicitParent, extracted.extra());
}
if (parent != null) {
TraceContext.Builder builder;
if (extracted.samplingFlags() != null) {
builder = newContextBuilder(parent, extracted.samplingFlags());
} else {
builder = newContextBuilder(parent, sampler);
}
return toSpan(builder.build());
}
TraceIdContext traceIdContext = extracted.traceIdContext();
if (extracted.traceIdContext() != null) {
Boolean sampled = traceIdContext.sampled();
if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());
return toSpan(TraceContext.newBuilder()
.sampled(sampled)
.debug(traceIdContext.debug())
.traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())
.spanId(nextId())
.extra(extracted.extra()).build());
}
// TraceContextOrSamplingFlags is a union of 3 types, we've checked all three
throw new AssertionError("should not reach here");
}
首先會(huì)嘗試獲取trace织咧,因?yàn)槭堑谝淮握?qǐng)求胀葱,所以這個(gè)時(shí)候trace也不存在所以會(huì)進(jìn)入到toSpan
方法
public Span toSpan(TraceContext context) {
if (context == null) throw new NullPointerException("context == null");
TraceContext decorated = propagationFactory.decorate(context);
if (!noop.get() && Boolean.TRUE.equals(decorated.sampled())) {
return RealSpan.create(decorated, recorder, errorParser);
}
return NoopSpan.create(decorated);
}
這里如果我們沒有特殊指定的話會(huì)使用RealSpan來創(chuàng)建span,這個(gè)span的最終實(shí)現(xiàn)類是AutoValue_RealSpan
接著返回最開始的handleReceive
方法
public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
Span span = nextSpan(extractor.extract(carrier), request);
span.kind(Span.Kind.SERVER);
return handleStart(request, span);
}
span創(chuàng)建完畢后就會(huì)設(shè)置kind笙蒙,這個(gè)kand代表了服務(wù)類型抵屿,這里就是設(shè)置了服務(wù)類型為服務(wù)端。
接下來就是去開啟記錄鏈路信息
Span handleStart(Req request, Span span) {
if (span.isNoop()) return span;
Scope ws = currentTraceContext.maybeScope(span.context());
try {
parser.request(adapter, request, span.customizer());
Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();
if (parseRemoteEndpoint(request, remoteEndpoint)) {
span.remoteEndpoint(remoteEndpoint.build());
}
} finally {
ws.close();
}
return span.start();
}
開啟過程中記錄了幾個(gè)信息
public <Req> void request(HttpAdapter<Req, ?> adapter, Req req, SpanCustomizer customizer) {
customizer.name(spanName(adapter, req));
String method = adapter.method(req);
if (method != null) customizer.tag("http.method", method);
String path = adapter.path(req);
if (path != null) customizer.tag("http.path", path);
}
public Span start() {
return start(clock.currentTimeMicroseconds());
}
synchronized MutableSpan start(long timestamp) {
span.timestamp(this.timestamp = timestamp);
return this;
}
接著在回到文章最開始提到的Filter方法中
在span和trace創(chuàng)建完成后捅位,會(huì)把它們添加到request中
Scope的創(chuàng)建
然后是一個(gè)scope的創(chuàng)建轧葛,這個(gè)scope和日志組件說息息相關(guān)的。簡(jiǎn)單來說艇搀,它會(huì)把traceId尿扯、parentId、spanId打印到當(dāng)前系統(tǒng)打印的每一行日志中
public Scope newScope(@Nullable TraceContext currentSpan) {
final String previousTraceId = MDC.get("traceId");
final String previousParentId = MDC.get("parentId");
final String previousSpanId = MDC.get("spanId");
final String spanExportable = MDC.get("spanExportable");
final String legacyPreviousTraceId = MDC.get(LEGACY_TRACE_ID_NAME);
final String legacyPreviousParentId = MDC.get(LEGACY_PARENT_ID_NAME);
final String legacyPreviousSpanId = MDC.get(LEGACY_SPAN_ID_NAME);
final String legacySpanExportable = MDC.get(LEGACY_EXPORTABLE_NAME);
if (currentSpan != null) {
String traceIdString = currentSpan.traceIdString();
MDC.put("traceId", traceIdString);
MDC.put(LEGACY_TRACE_ID_NAME, traceIdString);
String parentId = currentSpan.parentId() != null ?
HexCodec.toLowerHex(currentSpan.parentId()) :
null;
replace("parentId", parentId);
replace(LEGACY_PARENT_ID_NAME, parentId);
String spanId = HexCodec.toLowerHex(currentSpan.spanId());
MDC.put("spanId", spanId);
MDC.put(LEGACY_SPAN_ID_NAME, spanId);
String sampled = String.valueOf(currentSpan.sampled());
MDC.put("spanExportable", sampled);
MDC.put(LEGACY_EXPORTABLE_NAME, sampled);
log("Starting scope for span: {}", currentSpan);
if (currentSpan.parentId() != null) {
if (log.isTraceEnabled()) {
log.trace("With parent: {}", currentSpan.parentId());
}
}
}
else {
MDC.remove("traceId");
MDC.remove("parentId");
MDC.remove("spanId");
MDC.remove("spanExportable");
MDC.remove(LEGACY_TRACE_ID_NAME);
MDC.remove(LEGACY_PARENT_ID_NAME);
MDC.remove(LEGACY_SPAN_ID_NAME);
MDC.remove(LEGACY_EXPORTABLE_NAME);
}
Scope scope = this.delegate.newScope(currentSpan);
class ThreadContextCurrentTraceContextScope implements Scope {
@Override public void close() {
log("Closing scope for span: {}", currentSpan);
scope.close();
replace("traceId", previousTraceId);
replace("parentId", previousParentId);
replace("spanId", previousSpanId);
replace("spanExportable", spanExportable);
replace(LEGACY_TRACE_ID_NAME, legacyPreviousTraceId);
replace(LEGACY_PARENT_ID_NAME, legacyPreviousParentId);
replace(LEGACY_SPAN_ID_NAME, legacyPreviousSpanId);
replace(LEGACY_EXPORTABLE_NAME, legacySpanExportable);
}
}
return new ThreadContextCurrentTraceContextScope();
}
Span的上送
接下來當(dāng)剩下的執(zhí)行鏈執(zhí)行完畢后焰雕,本次請(qǐng)求也就該結(jié)束了衷笋。在請(qǐng)求結(jié)束時(shí),span就會(huì)被上送到Zipkin服務(wù)端中
public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
handleFinish(response, error, span);
}
void handleFinish(@Nullable Resp response, @Nullable Throwable error, Span span) {
if (span.isNoop()) return;
try {
Scope ws = currentTraceContext.maybeScope(span.context());
try {
parser.response(adapter, response, error, span.customizer());
} finally {
ws.close(); // close the scope before finishing the span
}
} finally {
finishInNullScope(span);
}
}
首先在span中記錄本次調(diào)用的相應(yīng)信息
public <Resp> void response(HttpAdapter<?, Resp> adapter, @Nullable Resp res,
@Nullable Throwable error, SpanCustomizer customizer) {
int statusCode = 0;
if (res != null) {
statusCode = adapter.statusCodeAsInt(res);
String nameFromRoute = spanNameFromRoute(adapter, res, statusCode);
if (nameFromRoute != null) customizer.name(nameFromRoute);
String maybeStatus = maybeStatusAsString(statusCode, 299);
if (maybeStatus != null) customizer.tag("http.status_code", maybeStatus);
}
error(statusCode, error, customizer);
}
接著清空Scope
void finishInNullScope(Span span) {
Scope ws = currentTraceContext.maybeScope(null);
try {
span.finish();
} finally {
ws.close();
}
}
之后說span的上傳
public void finish(TraceContext context) {
MutableSpan span = spanMap.remove(context);
if (span == null || noop.get()) return;
synchronized (span) {
span.finish(span.clock.currentTimeMicroseconds());
reporter.report(span.toSpan());
}
}
具體上傳的實(shí)現(xiàn)是由Sender
接口的實(shí)現(xiàn)類實(shí)現(xiàn)的矩屁,它的實(shí)現(xiàn)類默認(rèn)情況下是這三個(gè)
屏幕快照 2019-11-18 下午10.31.01
而一個(gè)span內(nèi)容則是這樣的
RabbitMQ鏈路追蹤
當(dāng)看完SpringMVC鏈路追蹤的實(shí)現(xiàn)方式之后右莱,再去看其他的方式,我想肯定是非常簡(jiǎn)單的档插。這里我們以RabbitMQ為例:
首先查找spring-cloud-sleuth的spring.factories文件,看到關(guān)于消息中間件的追蹤配置類是這個(gè)TraceMessagingAutoConfiguration
看這個(gè)類關(guān)于RabbitMQ的東西
@Configuration
@ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
@ConditionalOnClass(RabbitTemplate.class)
protected static class SleuthRabbitConfiguration {
@Bean
@ConditionalOnMissingBean
SpringRabbitTracing springRabbitTracing(Tracing tracing,
SleuthMessagingProperties properties) {
return SpringRabbitTracing.newBuilder(tracing)
.remoteServiceName(properties.getMessaging().getRabbit().getRemoteServiceName())
.build();
}
@Bean
@ConditionalOnMissingBean
static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
return new SleuthRabbitBeanPostProcessor(beanFactory);
}
}
這里其實(shí)大致就可以猜測(cè)出來了亚再,肯定是使用了SleuthRabbitBeanPostProcessor
在構(gòu)造RabbitTemplate
的使用做了一些改造郭膛,比如說加個(gè)攔截器啥的,然后當(dāng)使用RabbitTemplate
發(fā)送消息時(shí)自動(dòng)添加Header等東西就完成了整個(gè)流程了