前言:ZIPKIN作為當(dāng)前“分布式服務(wù)鏈路跟蹤”問題的流行解決方案之一,正在被越來越多的公司和個(gè)人學(xué)習(xí)使用晾剖。其中很重要的一塊,就是上報(bào)鏈路數(shù)據(jù)梯嗽。那么知道服務(wù)端如何接收數(shù)據(jù)齿尽,以及我們該怎樣上報(bào)數(shù)據(jù)到服務(wù)端就顯得十分重要。雖然ZIPKIN官方也開源了一個(gè)客戶端Brave灯节,但是本文卻并不想直接介紹Brave雕什,而是想站在一個(gè)從零開發(fā)ZIPKIN客戶端的角度,一層層分析解決如何自己寫一個(gè)ZIPKIN客戶端显晶,直到最后引出Brave贷岸。本文最終想達(dá)到的效果,就是希望通過本文磷雇,能讓大家對ZIPKIN的鏈路上報(bào)有一個(gè)詳盡的理解和認(rèn)識偿警。
零行代碼快速開始
ZIPKIN是Spring Boot服務(wù),因此啟動起來十分方便唯笙,直接運(yùn)行ZIPKIN JAR包就可以了螟蒸。ZIPKIN JAR包我們可以自行編譯ZIPKIN源碼獲得,也可以從下面的倉庫獲取崩掘,這個(gè)倉庫專門存放ZIPKIN各種編譯好的JAR包七嫌,倉庫地址為:
https://dl.bintray.com/openzipkin/maven/io/zipkin/java/
進(jìn)入zipkin-server目錄下載server jar包,比如下載2.11.5版本苞慢,運(yùn)行jar包:
java -jar zipkin-server-2.11.5-exec.jar
這樣本地就起了一個(gè)ZIPKIN服務(wù)诵原,瀏覽器中輸入http://localhost:9411/zipkin/,即可打開ZIPKIN首頁,效果如下:
接下來绍赛,我們開始上報(bào)數(shù)據(jù)蔓纠,現(xiàn)在我不想寫任何代碼,那么就用postman發(fā)起一次post請求上報(bào)數(shù)據(jù)吧:
上報(bào)后在本地ZIPKIN服務(wù)上即可看到剛上報(bào)的數(shù)據(jù)效果:
沒有任何一句代碼吗蚌,一個(gè)完整的ZIPKIN數(shù)據(jù)上報(bào)存儲展示就完成了腿倚,那么我們來思考下:
1、如果自己寫一個(gè)上ZIPKIN客戶端蚯妇,該如何寫敷燎?
分析:要上傳數(shù)據(jù)給服務(wù)端,那么必須要搞清楚箩言,ZIPKIN服務(wù)端可以接受什么樣格式的數(shù)據(jù)懈叹?支持的編碼解碼協(xié)議是什么?還有支持那些通信傳輸協(xié)議分扎?
Task: 了解了服務(wù)怎么接收數(shù)據(jù)后澄成,客戶端的task也就明確了:
1)以ZIPKIN服務(wù)端支持的數(shù)據(jù)格式組織數(shù)據(jù);
2)以ZIPKIN服務(wù)端支持的編解碼協(xié)議編碼數(shù)據(jù)畏吓;
3)以ZIPKIN服務(wù)端支持的傳輸協(xié)議上報(bào)數(shù)據(jù)墨状。
2、基礎(chǔ)ZIPKIN客戶端完成后菲饼,如何適配各種組件肾砂?
描述:一個(gè)服務(wù)通常涉及多個(gè)組件,包括服務(wù)框架宏悦,消息中間件镐确,各種數(shù)據(jù)庫等,那么怎么上報(bào)這些組件的數(shù)據(jù)給服務(wù)端了饼煞,值得我們思考源葫?
分析:其實(shí)最簡單的方法就是采用一個(gè)裝飾者模式包裝一下組件接口,在其中加入上報(bào)的邏輯砖瞧,但是這種方法局限很大息堂,不靈活。除此之外块促,我們或許我們可以利用下攔截器技術(shù)荣堰,AOP技術(shù),以及Agent竭翠,探針等技術(shù)做到無侵入上報(bào)數(shù)據(jù)振坚。
服務(wù)端如何接收數(shù)據(jù)的?
要想自己寫一個(gè)ZIPKIN客戶端斋扰,必須搞清楚ZIPKIN服務(wù)端是怎么接收數(shù)據(jù)的渡八,包括數(shù)據(jù)格式協(xié)議是怎樣的啃洋?編解碼協(xié)議是怎樣的?還有支持那么傳輸協(xié)議呀狼?
ZIPKIN支持的數(shù)據(jù)格式是怎樣的?
ZIPKIN是兼容OpenTracing標(biāo)準(zhǔn)的损离,OpenTracing規(guī)定哥艇,每個(gè)Span包含如下狀態(tài):
- 操作名稱
- 起始時(shí)間
- 結(jié)束時(shí)間
- 一組 KV 值,作為階段的標(biāo)簽(Span Tags)
- 階段日志(Span Logs)
- 階段上下文(SpanContext)僻澎,其中包含 Trace ID 和 Span ID
- 引用關(guān)系(References)
OpenTracing規(guī)范與zipkin對應(yīng)關(guān)系如下:
OpenTracing | ZIPKIN |
---|---|
操作名稱 | name |
起始時(shí)間 | timestamp |
結(jié)束時(shí)間 | timestamp+duration |
標(biāo)簽 | tags |
Span上下文 | traceId; id |
引用關(guān)系 | parentId |
Span日志 | 無 |
除此之外貌踏,ZIPKIN SAPN還增加了其他幾個(gè)字端:
字端 | 描述 |
---|---|
Kind kind | Spanl類型,比如是Server還是Client |
Annotaion | 表示某個(gè)時(shí)間點(diǎn)發(fā)生的Event,Event類型:cs:Client Send 請求窟勃;sr:Server Receive到請求祖乳;ss:Server 處理完成、并Send Response秉氧;cr:Client Receive 到響應(yīng) |
Endpoint localEndpoint | 描述本地服務(wù)信息眷昆,比如服務(wù)名,ip等汁咏,方便根據(jù)服務(wù)名檢索鏈路 |
Endpoint remoteEndpoint | RPC調(diào)用時(shí)亚斋,描述遠(yuǎn)程服務(wù)的服務(wù)信息 |
最終,包含兼容OpenTracing標(biāo)準(zhǔn)的字端攘滩,以及其本身的一些字端后帅刊,zipkin的span數(shù)據(jù)字端如下:
了解了ZIPKIN的數(shù)據(jù)字端格式后,我們再看看ZIPKIN支持的編解碼協(xié)議漂问。
ZIPKIN支持那些編解碼協(xié)議赖瞒?
Zipkin主要支持三種編解碼協(xié)議,分別為JSON, PROTO3, THRIFT蚤假。
JSON編解碼如下:
JSON_V1 {
public Encoding encoding() {
return Encoding.JSON;
}
public boolean decode(byte[] bytes, Collection<Span> out) {
Span result = this.decodeOne(bytes);
if (result == null) {
return false;
} else {
out.add(result);
return true;
}
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return (new V1JsonSpanReader()).readList(spans, out);
}
public Span decodeOne(byte[] span) {
V1Span v1 = (V1Span)JsonCodec.readOne(new V1JsonSpanReader(), span);
List<Span> out = new ArrayList(1);
V1SpanConverter.create().convert(v1, out);
return (Span)out.get(0);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
JSON_V2 {
public Encoding encoding() {
return Encoding.JSON;
}
public boolean decode(byte[] span, Collection<Span> out) {
return JsonCodec.read(new V2SpanReader(), span, out);
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return JsonCodec.readList(new V2SpanReader(), spans, out);
}
@Nullable
public Span decodeOne(byte[] span) {
return (Span)JsonCodec.readOne(new V2SpanReader(), span);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
THRIFT編解碼如下:
THRIFT {
public Encoding encoding() {
return Encoding.THRIFT;
}
public boolean decode(byte[] span, Collection<Span> out) {
return ThriftCodec.read(span, out);
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return ThriftCodec.readList(spans, out);
}
public Span decodeOne(byte[] span) {
return ThriftCodec.readOne(span);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
PROTO3編解碼如下:
PROTO3 {
public Encoding encoding() {
return Encoding.PROTO3;
}
public boolean decode(byte[] span, Collection<Span> out) {
return Proto3Codec.read(span, out);
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return Proto3Codec.readList(spans, out);
}
@Nullable
public Span decodeOne(byte[] span) {
return Proto3Codec.readOne(span);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
其中Json是默認(rèn)支持的栏饮,也是使用起來最方便的,除此之外磷仰,還包括Thirft和Proto3可供開發(fā)者選擇抡爹。
ZIPKIN支持那些傳輸協(xié)議?
Zipkin默認(rèn)支持Http協(xié)議芒划,除此之外冬竟,它還支持kafka,rabbitmq以及scribe協(xié)議:
他們的初始化過程如下:
傳輸協(xié)議支持的編解碼協(xié)議如下:
其中Scribe限定了只支持Thirft協(xié)議,而HTTP民逼、Kafka和RabbitMQ則是三種協(xié)議都支持泵殴。
如何做到支持所有的編碼解碼協(xié)議了?ZIPKIN中提供了一個(gè)自動探測編解碼的類SpanBytesDecoderDetector拼苍,其中核心方法如下:
static BytesDecoder<Span> detectDecoder(byte[] bytes) {
if (bytes[0] <= 16) { // binary format
if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
} else if (bytes[0] != '[' && bytes[0] != '{') {
throw new IllegalArgumentException("Could not detect the span format");
}
if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
if (contains(bytes, TAGS_FIELD)) return SpanBytesDecoder.JSON_V2;
return SpanBytesDecoder.JSON_V1;
}
現(xiàn)在我們已經(jīng)知道了zipkin服務(wù)接收的數(shù)據(jù)格式以及編解碼協(xié)議和傳輸協(xié)議笑诅,那么接下來就可以寫一個(gè)客戶端了调缨!
自己寫一個(gè)ZIPKIN客戶端
對于服務(wù)端如何接收數(shù)據(jù),有了一個(gè)全面的認(rèn)識后吆你,我們就可以著手開始寫一個(gè)ZIPKIN客戶端了弦叶。
那么,首先定義客戶端上報(bào)的數(shù)據(jù)格式妇多,最簡單的方式就是定義一個(gè)跟ZIPKIN服務(wù)端一樣數(shù)據(jù)格式的Span就可以了:
@Setter
@Getter
public static class MySapn {
private String traceId;
private String parentId;
private String id;
private String name;
private long timestamp;
private long duration;
private Map<String, String> tags;
String kind;
Endpoint localEndpoint, remoteEndpoint;
public static enum Kind {
CLIENT,
SERVER,
PRODUCER,
CONSUMER
}
public static class Endpoint {
String serviceName, ipv4, ipv6;
byte[] ipv4Bytes, ipv6Bytes;
int port; // zero means null
public Endpoint(String serviceName) {
this.serviceName = serviceName;
}
}
}
數(shù)據(jù)格式確定后伤哺,接著就編碼數(shù)據(jù),ZIPKIN支持三種編碼方式者祖,JSON立莉、THIFT和PROTO3,為了簡單方便七问,我們選擇JSON協(xié)議編碼Span數(shù)據(jù)。注意械巡,ZIPKIN JSON字符串前后需要加括號刹淌。
數(shù)據(jù)編碼后,接著上報(bào)數(shù)據(jù)讥耗,ZIPKIN默認(rèn)支持HTTP協(xié)議方式芦鳍,JAVA HTTP請求包很多,我們隨便選擇一種葛账,比如選擇Apach的HttpClient jar包柠衅,代碼如下:
public class App {
private static final String serverUrl = "http://localhost:9411/api/v2/spans";
public static void main(String[] args) {
MySapn span = new MySapn();
span.traceId = "1ae1e4f435814744";
span.parentId = "1ae1e4f435814744";
span.id = "d1ab9cd2c50d13d1";
span.kind = MySapn.Kind.SERVER.toString();
span.name = "my client test";
span.timestamp = 1565933251470428L;
span.duration = 8286;
span.localEndpoint = new MySapn.Endpoint("My client");
Map<String, String> tags = new HashMap<>();
tags.put("name", "pioneeryi");
tags.put("lover", "dandan");
span.tags = tags;
doPost(serverUrl, span);
System.out.println("Hello World!");
}
public static void doPost(String url, MySapn span) {
try {
HttpClient httpClient = new DefaultHttpClient();
HttpPost post = new HttpPost(url);
post.setHeader("Content-Type", "application/json");
post.setHeader("charset", "UTF-8");
String body = new Gson().toJson(span);
body = "[" + body + "]";
System.out.print(body);
StringEntity entity = new StringEntity(body);
post.setEntity(entity);
HttpResponse httpResponse = httpClient.execute(post);
System.out.print(httpResponse);
} catch (Exception exception) {
System.out.print("do post request fail");
}
}
}
maven pom如下:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
運(yùn)行上面main方法,即可上報(bào)Span數(shù)據(jù)籍琳,打開zipkin首頁:http://localhost:9411/zipkin/菲宴,搜索剛上報(bào)的Span的TraceId,展示效果如下:
一個(gè)最簡單的趋急,采用JSON編碼數(shù)據(jù)喝峦,HTTP協(xié)議上傳數(shù)據(jù)的客戶端我們就完成了。為了用戶調(diào)用方便呜达,我們可以將上面的代碼封裝為一個(gè)接口供用戶使用:
public void reportSpan(MySpan span)
但是這個(gè)太簡陋了谣蠢,我們只支持了一種一個(gè)編解碼協(xié)議,一種傳輸協(xié)議查近,開始優(yōu)化:
優(yōu)化一:支持多種編解碼眉踱,支持多種傳輸協(xié)議。這個(gè)時(shí)候就需要定一個(gè)上報(bào)接口類了霜威,根據(jù)不同傳輸協(xié)議提供不同實(shí)現(xiàn)谈喳。編解碼也是同樣的,定義編碼接口戈泼,根據(jù)不同編碼協(xié)議提供不同實(shí)現(xiàn)婿禽。
此外赏僧,當(dāng)前這個(gè)客戶端是同步上報(bào)的,性能很差扭倾,因此必須改成異步上報(bào)淀零,接著優(yōu)化:
優(yōu)化二:上報(bào)改成異步上報(bào),隊(duì)列+線程膛壹。
有了這兩步優(yōu)化后驾中,client大體框架就初步成型了,寫好了客戶端后恢筝,怎么適配各個(gè)組件哀卫,做到無侵入上報(bào)也很重要巨坊,繼續(xù)優(yōu)化:
優(yōu)化三:適配各個(gè)組件撬槽,比如Spring Boot,Kafak,MySql等等趾撵。
一個(gè)完整的Client侄柔,還是有很多工作要做的,這里咱就不繼續(xù)深入優(yōu)化開發(fā)了占调,直接看看官方的Brave怎么做的暂题!
探究ZIPKIN客戶端Brave
為了說明Brave的使用和上報(bào)過程,我們先寫一個(gè)很簡單的上報(bào)Demo究珊,進(jìn)行演示薪者。Demo將上報(bào)一個(gè)“一父Span,兩個(gè)子Span“的鏈路,demo如下:
public class TraceDemo {
public static void main(String[] args) {
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);
Tracing tracing = Tracing.newBuilder()
.localServiceName("my-service")
.spanReporter(asyncReporter)
.build();
Tracer tracer = tracing.tracer();
Span parentSpan = tracer.newTrace().name("parent span").start();
Span childSpan1 = tracer.newChild(parentSpan.context()).name("child span1").start();
sleep(500);
childSpan1.finish();
Span childSpan2 = tracer.newChild(parentSpan.context()).name("child span2").start();
sleep(500);
childSpan2.finish();
parentSpan.finish();
}
}
其中 maven pom 引入如下包:
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
<version>2.6.0</version>
</dependency>
啟動zipkin 服務(wù):
java -jar zipkin-server-2.11.5-exec.jar
然后在瀏覽器中輸入:http://localhost:9411 剿涮,即可打開zipkin首頁言津,看到示例代碼上報(bào)的效果圖如下:
一個(gè)最簡單的但是卻是很完整的一個(gè)ZIPKIN鏈路上報(bào)演示,就如上面Demo所示取试,那么接下來分析一下整個(gè)鏈路的上報(bào)過程悬槽!
鏈路上報(bào)解析
自己開發(fā)一個(gè)客戶端時(shí),我們首先會封裝一個(gè)Span類瞬浓,Brave也不例外初婆,它也定義了Span數(shù)據(jù)結(jié)構(gòu);那么定義好Span后猿棉,誰來負(fù)責(zé)構(gòu)造Span了磅叛?
Brave中定義了一個(gè)類叫Tracer來完成構(gòu)造Span的工作;Brave生成好了Span后萨赁,此時(shí)需要編碼發(fā)送了宪躯,那么誰又來發(fā)送了?
Brave定義了Reporter組件位迂,它支持異步發(fā)送以及多傳輸協(xié)議以及多編碼協(xié)議發(fā)送Span數(shù)據(jù)访雪。
看起來详瑞,各個(gè)組件已經(jīng)完備了,組件有點(diǎn)多臣缀!此時(shí)需要那么一個(gè)人將這些組件組織起來坝橡,同時(shí)與服務(wù)端取得聯(lián)系,開始打通整個(gè)流程了精置,這就是Tracing類的功能计寇。
接下來詳細(xì)講解一下各個(gè)組件的功能,并根據(jù)Demo代碼脂倦,將各個(gè)組件串起來番宁,最終梳理清楚Brave上報(bào)的流程和原理。
Brave Span
Brave中Span相關(guān)類有:SpanCustomizer赖阻、NoopSpanCustomizer蝶押、CurrentSpanCustomizer、RealSpanCustomizer火欧、Span棋电、NoopSpan、RealSpan苇侵。
示例代碼中赶盔,關(guān)于span的操作如下:
Span span = tracer.newTrace().name("encode").start();
try {
doSomethingExpensive();
} finally {
span.finish();
}
首先通過tracer生成一個(gè)span,最后榆浓,調(diào)用span.finish()上報(bào)于未,接下來就來看看span杨名,以及這個(gè)finish干了什么蓉冈。
咱們用的Span的實(shí)現(xiàn)子類為RealSpan,RealSpan兩個(gè)核心方法start, finish:
@Override
public Span start() {
return start(clock.currentTimeMicroseconds());
}
@Override
public Span start(long timestamp) {
synchronized (state) {
state.startTimestamp(timestamp);
}
return this;
}
start方法主要記錄開始時(shí)間填渠,接下來看看finish 方法:
@Override
public void finish(long timestamp) {
if (!pendingSpans.remove(context)) return;
synchronized (state) {
state.finishTimestamp(timestamp);
}
finishedSpanHandler.handle(context, state);
}
這里交給FinishedSpanHandler來處理杉适。FinishedSpanHandler是一個(gè)抽象類谎倔,他有如下子類實(shí)現(xiàn):
- ZipkinFinishedSpanHandler
- MetricsFinishedSpanHandler
- NoopAwareFinishedSpan
- CompositeFinishedSpanHandler
我們主要關(guān)注ZipkinFinishedSpanHandler實(shí)現(xiàn):
public final class ZipkinFinishedSpanHandler extends FinishedSpanHandler {
final Reporter<zipkin2.Span> spanReporter;
final MutableSpanConverter converter;
public ZipkinFinishedSpanHandler(Reporter<zipkin2.Span> spanReporter,
ErrorParser errorParser, String serviceName, String ip, int port) {
this.spanReporter = spanReporter;
this.converter = new MutableSpanConverter(errorParser, serviceName, ip, port);
}
@Override public boolean handle(TraceContext context, MutableSpan span) {
if (!Boolean.TRUE.equals(context.sampled())) return true;
Span.Builder builderWithContextData = Span.newBuilder()
.traceId(context.traceIdString())
.parentId(context.parentIdString())
.id(context.spanIdString());
if (context.debug()) builderWithContextData.debug(true);
converter.convert(span, builderWithContextData);
spanReporter.report(builderWithContextData.build());
return true;
}
@Override public String toString() {
return spanReporter.toString();
}
}
可見,上面最終是通過Reporter組件來上報(bào)數(shù)據(jù)的猿推。那么Report是如何上報(bào)的了片习?
Brave Reporter
因?yàn)樯蠄?bào)組件要支持多種編碼協(xié)議以及多種傳輸協(xié)議,因此邏輯比較復(fù)雜蹬叭,官方專門建了一個(gè)項(xiàng)目:zipkin-reporter-java
我們首先看看Reporter接口類定義:
public interface Reporter<S> {
Reporter<Span> NOOP = new Reporter<Span>() {
@Override public void report(Span span) {
}
@Override public String toString() {
return "NoopReporter{}";
}
};
Reporter<Span> CONSOLE = new Reporter<Span>() {
@Override public void report(Span span) {
System.out.println(span.toString());
}
@Override public String toString() {
return "ConsoleReporter{}";
}
};
/**
* Schedules the span to be sent onto the transport.
*
* @param span Span, should not be <code>null</code>.
*/
void report(S span);
}
Reporter有三個(gè)子類實(shí)現(xiàn)藕咏,分別是CONSOLE,NOOP秽五,和AsyncReporter孽查。CONSOLE就是直接控制臺打印出Span數(shù)據(jù),一般用來調(diào)試差不多坦喘;NOOP是一個(gè)空實(shí)現(xiàn)盲再,啥也不干西设;AsyncReporter是我們平時(shí)上報(bào)用的Reporter,他提供異步上報(bào)數(shù)據(jù)能力答朋。
AsyncReporter
AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second before flushes any pending spans out of process via a Sender.
根據(jù)不同協(xié)議贷揽,Ayncreporter執(zhí)行相應(yīng)的build邏輯:
public AsyncReporter<Span> build() {
switch(this.sender.encoding()) {
case JSON:
return this.build(SpanBytesEncoder.JSON_V2);
case PROTO3:
return this.build(SpanBytesEncoder.PROTO3);
case THRIFT:
return this.build(SpanBytesEncoder.THRIFT);
default:
throw new UnsupportedOperationException(this.sender.encoding().name());
}
}
build方法詳細(xì)邏輯,如下:
public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
if (encoder == null) throw new NullPointerException("encoder == null");
if (encoder.encoding() != sender.encoding()) {
throw new IllegalArgumentException(String.format(
"Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
}
final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);
if (messageTimeoutNanos > 0) {
// Start a thread that flushes the queue in a loop.
final BufferNextMessage<S> consumer =
BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
Thread flushThread = threadFactory.newThread(new Flusher<>(result, consumer));
flushThread.setName("AsyncReporter{" + sender + "}");
flushThread.setDaemon(true);
flushThread.start();
}
return result;
}
可以看到AsyncReporter的build方法中梦碗,啟動了一個(gè)守護(hù)線程flushThread禽绪,一直循環(huán)調(diào)用BoundedAsyncReporter的flush方法:
void flush(BufferNextMessage<S> bundler) {
if (closed.get()) throw new IllegalStateException("closed");
pending.drainTo(bundler, bundler.remainingNanos());
// record after flushing reduces the amount of gauge events vs on doing this on report
metrics.updateQueuedSpans(pending.count);
metrics.updateQueuedBytes(pending.sizeInBytes);
// loop around if we are running, and the bundle isn't full
// if we are closed, try to send what's pending
if (!bundler.isReady() && !closed.get()) return;
// Signal that we are about to send a message of a known size in bytes
metrics.incrementMessages();
metrics.incrementMessageBytes(bundler.sizeInBytes());
ArrayList<byte[]> nextMessage = new ArrayList<>(bundler.count());
bundler.drain(new SpanWithSizeConsumer<S>() {
@Override public boolean offer(S next, int nextSizeInBytes) {
nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
// if we overran the message size, remove the encoded message.
nextMessage.remove(nextMessage.size() - 1);
return false;
}
return true;
}
});
try {
sender.sendSpans(nextMessage).execute();
} catch (IOException | RuntimeException | Error t) {
......
}
}
Flush方法的主要邏輯如下:
- 將隊(duì)列pending中的數(shù)據(jù),提取到nextMessage鏈表中洪规;
- 調(diào)用Sender的sendSpans方法印屁,發(fā)送到nextMessage鏈表中的Span數(shù)據(jù)到Zipkin;
這樣斩例,Reporter即做到了異步發(fā)送雄人!
Sender
Sender組件完成發(fā)送Span到zipkin服務(wù)端的最后一步,即利用某個(gè)傳輸協(xié)議樱拴,將數(shù)據(jù)發(fā)送到zipkin服務(wù)端柠衍。
public abstract class Sender extends Component {
public Sender() {
}
public abstract Encoding encoding();
public abstract int messageMaxBytes();
public abstract int messageSizeInBytes(List<byte[]> var1);
public int messageSizeInBytes(int encodedSizeInBytes) {
return this.messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
}
public abstract Call<Void> sendSpans(List<byte[]> var1);
}
其中核心的方法sendSpans是一個(gè)抽象方法洋满,不同傳輸協(xié)議的Sender會提供具體的實(shí)現(xiàn)邏輯晶乔,其子類有:
ActiveMQSender、FakeSender牺勾、KafkaSender正罢、LibthriftSender、OkHttpSender驻民、RabbitMQSender翻具、URLConnectionSender。
不同協(xié)議均按照自身協(xié)議規(guī)范執(zhí)行發(fā)送邏輯回还,因?yàn)槲覀兊腄emo中用的是OkHttpSender裆泳,所以我們主要看看OkHttpSender是如何實(shí)現(xiàn)的。Demo中使用如下:
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);
這里柠硕,通過AsyncReporter.create方法工禾,我們將OkHttpSender注入到了Reporter中,那么接下來看看OkHttpSender的sendSpans方法實(shí)現(xiàn):
@Override
public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new IllegalStateException("closed");
Request request;
try {
request = newRequest(encoder.encode(encodedSpans));
} catch (IOException e) {
throw zipkin2.internal.Platform.get().uncheckedIOException(e);
}
return new HttpCall(client.newCall(request));
}
執(zhí)行完這個(gè)方法后蝗柔,會返回一個(gè)HttpCall闻葵,Reporter的flush方法中會調(diào)用HttpCall的execute方法,完成Http請求發(fā)送癣丧。
Brave Tracer
Span數(shù)據(jù)結(jié)構(gòu)槽畔,包括發(fā)送Span的組件我們搞清楚了,那么誰來負(fù)責(zé)創(chuàng)建Span了胁编?這就是Tracer的工作厢钧,他負(fù)責(zé)創(chuàng)建Span及提供Span的各種操作鳞尔,主要方法如下表所示:
方法名 | 描述 |
---|---|
Span newTrace() | 創(chuàng)建一個(gè)Root Span |
Span joinSpan(TraceContext context) | 公用一個(gè)SpanId,主要存在于RPC場景中 |
Span newChild(TraceContext parent) | 創(chuàng)建一個(gè)子Span |
Span nextSpan(TraceContextOrSamplingFlags extracted) | 基于請求的參數(shù)信息創(chuàng)建一個(gè)新的Span |
Span toSpan(TraceContext context) | 通過TraceContext創(chuàng)建一個(gè)Span |
Span currentSpan() | 獲取當(dāng)前Span |
Span nextSpan() | 基于當(dāng)前Span生成一個(gè)子Span |
Brave Tracing
現(xiàn)在Span有了,創(chuàng)建Span的組件有了早直,發(fā)送Span的組件也有了铅檩,那就只需要一個(gè)把他們組合起來的類似工廠的角色了,那就是Tracing,他的主要工作就是連接服務(wù)器莽鸿,然后利用Tracer創(chuàng)建出Span昧旨,接著發(fā)送Span到zipkin服務(wù)端。
Tracing源碼采用的Builder模式祥得,再看看我們Demo中創(chuàng)建Tracing的代碼:
Tracing tracing = Tracing.newBuilder()
.localServiceName("my-service")
.spanReporter(asyncReporter)
.build();
我們Tracing.newBuilder()創(chuàng)建了一個(gè)Tracing的Builder兔沃,然后指定了這個(gè)Tracing的服務(wù)名,使用什么Reporter级及,接著調(diào)用了Builder的build方法乒疏,我們看看build方法代碼:
public Tracing build() {
if (clock == null) clock = Platform.get().clock();
if (localIp == null) localIp = Platform.get().linkLocalIp();
if (spanReporter == null) spanReporter = new LoggingReporter();
return new Default(this);
}
它調(diào)用了Tracing的默認(rèn)實(shí)現(xiàn),默認(rèn)實(shí)現(xiàn)子類如下:
static final class Default extends Tracing {
final Tracer tracer;
final Propagation.Factory propagationFactory;
final Propagation<String> stringPropagation;
final CurrentTraceContext currentTraceContext;
final Sampler sampler;
final Clock clock;
final ErrorParser errorParser;
Default(Builder builder) {
this.clock = builder.clock;
this.errorParser = builder.errorParser;
this.propagationFactory = builder.propagationFactory;
this.stringPropagation = builder.propagationFactory.create(Propagation.KeyFactory.STRING);
this.currentTraceContext = builder.currentTraceContext;
this.sampler = builder.sampler;
zipkin2.Endpoint localEndpoint = zipkin2.Endpoint.newBuilder()
.serviceName(builder.localServiceName)
.ip(builder.localIp)
.port(builder.localPort)
.build();
SpanReporter reporter = new SpanReporter(localEndpoint, builder.reporter, noop);
this.tracer = new Tracer(
builder.clock,
builder.propagationFactory,
reporter,
new PendingSpans(localEndpoint, clock, reporter, noop),
builder.sampler,
builder.errorParser,
builder.currentTraceContext,
builder.traceId128Bit || propagationFactory.requires128BitTraceId(),
builder.supportsJoin && propagationFactory.supportsJoin(),
noop
);
maybeSetCurrent();
}
從上面可以看到饮焦,主要干的工作有:
- 根據(jù)Spanreporter,生成FinishedSpanHandler怕吴,發(fā)送Span用;
- 根據(jù)FinishedSpanHandler以及其他默認(rèn)信息生成Tracer;
OK县踢,現(xiàn)在對于DEMO中的Brave的上報(bào)數(shù)據(jù)流程和原理是不是清楚了不少转绷!
鏈路上報(bào)總結(jié)
Zipkin鏈路上報(bào)看起來很復(fù)雜,其實(shí)剝離各種封裝硼啤,去除各種組件议经,其主線邏輯就是如下三步:
- 構(gòu)造span對象,包括traceId谴返,parentId,以及其自身的spanId等參數(shù)煞肾;
- 選一種編解碼協(xié)議,比如JSON嗓袱,或者THRIF,或者PROTO3對Span進(jìn)行編碼籍救;
- 將編碼后的span,通過利用一種傳輸協(xié)議上報(bào)到服務(wù)端渠抹;
還有一塊未分析:Brave是如何Support各個(gè)組件的蝙昙。因?yàn)楸疚膬?nèi)容較多,放到以后的文章分析逼肯!
后記
本文為我的調(diào)用鏈系列文章之一耸黑,已有文章如下:
OK,ZIPKIN 鏈路上報(bào)分析到此為止篮幢,祝大家工作順利大刊,天天開心!