zipkin 數(shù)據(jù)上報(bào)的研究

前言:ZIPKIN作為當(dāng)前“分布式服務(wù)鏈路跟蹤”問(wèn)題的流行解決方案之一,正在被越來(lái)越多的公司和個(gè)人學(xué)習(xí)使用。其中很重要的一塊,就是上報(bào)鏈路數(shù)據(jù)秃臣。那么知道服務(wù)端如何接收數(shù)據(jù),以及我們?cè)撛鯓由蠄?bào)數(shù)據(jù)到服務(wù)端就顯得十分重要哪工。雖然ZIPKIN官方也開(kāi)源了一個(gè)客戶端Brave奥此,但是本文卻并不想直接介紹Brave弧哎,而是想站在一個(gè)從零開(kāi)發(fā)ZIPKIN客戶端的角度,一層層分析解決如何自己寫(xiě)一個(gè)ZIPKIN客戶端得院,直到最后引出Brave傻铣。本文最終想達(dá)到的效果,就是希望通過(guò)本文祥绞,能讓大家對(duì)ZIPKIN的鏈路上報(bào)有一個(gè)詳盡的理解和認(rèn)識(shí)非洲。

零行代碼快速開(kāi)始

ZIPKIN是Spring Boot服務(wù),因此啟動(dòng)起來(lái)十分方便蜕径,直接運(yùn)行ZIPKIN JAR包就可以了两踏。ZIPKIN JAR包我們可以自行編譯ZIPKIN源碼獲得,也可以從下面的倉(cāng)庫(kù)獲取兜喻,這個(gè)倉(cāng)庫(kù)專門(mén)存放ZIPKIN各種編譯好的JAR包梦染,倉(cāng)庫(kù)地址為:

https://dl.bintray.com/openzipkin/maven/io/zipkin/java/

進(jìn)入zipkin-server目錄下載server jar包,比如下載2.11.5版本朴皆,運(yùn)行jar包:

java -jar zipkin-server-2.11.5-exec.jar

這樣本地就起了一個(gè)ZIPKIN服務(wù)帕识,瀏覽器中輸入http://localhost:9411/zipkin/,即可打開(kāi)ZIPKIN首頁(yè)遂铡,效果如下:

image

接下來(lái)肮疗,我們開(kāi)始上報(bào)數(shù)據(jù),現(xiàn)在我不想寫(xiě)任何代碼扒接,那么就用postman發(fā)起一次post請(qǐng)求上報(bào)數(shù)據(jù)吧:

image

上報(bào)后在本地ZIPKIN服務(wù)上即可看到剛上報(bào)的數(shù)據(jù)效果:

image

沒(méi)有任何一句代碼伪货,一個(gè)完整的ZIPKIN數(shù)據(jù)上報(bào)存儲(chǔ)展示就完成了,那么我們來(lái)思考下:

1钾怔、如果自己寫(xiě)一個(gè)上ZIPKIN客戶端碱呼,該如何寫(xiě)?

分析:要上傳數(shù)據(jù)給服務(wù)端宗侦,那么必須要搞清楚愚臀,ZIPKIN服務(wù)端可以接受什么樣格式的數(shù)據(jù)?支持的編碼解碼協(xié)議是什么矾利?還有支持那些通信傳輸協(xié)議懊悯?

Task: 了解了服務(wù)怎么接收數(shù)據(jù)后,客戶端的task也就明確了:
1)以ZIPKIN服務(wù)端支持的數(shù)據(jù)格式組織數(shù)據(jù)梦皮;
2)以ZIPKIN服務(wù)端支持的編解碼協(xié)議編碼數(shù)據(jù);
3)以ZIPKIN服務(wù)端支持的傳輸協(xié)議上報(bào)數(shù)據(jù)桃焕。

2剑肯、基礎(chǔ)ZIPKIN客戶端完成后,如何適配各種組件观堂?

描述:一個(gè)服務(wù)通常涉及多個(gè)組件让网,包括服務(wù)框架呀忧,消息中間件,各種數(shù)據(jù)庫(kù)等溃睹,那么怎么上報(bào)這些組件的數(shù)據(jù)給服務(wù)端了而账,值得我們思考?

分析:其實(shí)最簡(jiǎn)單的方法就是采用一個(gè)裝飾者模式包裝一下組件接口因篇,在其中加入上報(bào)的邏輯泞辐,但是這種方法局限很大拳昌,不靈活称鳞。除此之外,我們或許我們可以利用下攔截器技術(shù)咳蔚,AOP技術(shù)商佑,以及Agent锯茄,探針等技術(shù)做到無(wú)侵入上報(bào)數(shù)據(jù)。

服務(wù)端如何接收數(shù)據(jù)的茶没?

要想自己寫(xiě)一個(gè)ZIPKIN客戶端肌幽,必須搞清楚ZIPKIN服務(wù)端是怎么接收數(shù)據(jù)的,包括數(shù)據(jù)格式協(xié)議是怎樣的抓半?編解碼協(xié)議是怎樣的喂急?還有支持那么傳輸協(xié)議?

ZIPKIN支持的數(shù)據(jù)格式是怎樣的琅关?

ZIPKIN是兼容OpenTracing標(biāo)準(zhǔn)的煮岁,OpenTracing規(guī)定,每個(gè)Span包含如下?tīng)顟B(tài):

  • 操作名稱
  • 起始時(shí)間
  • 結(jié)束時(shí)間
  • 一組 KV 值涣易,作為階段的標(biāo)簽(Span Tags)
  • 階段日志(Span Logs)
  • 階段上下文(SpanContext)画机,其中包含 Trace ID 和 Span ID
  • 引用關(guān)系(References)

OpenTracing規(guī)范與zipkin對(duì)應(yīng)關(guān)系如下:

OpenTracing ZIPKIN
操作名稱 name
起始時(shí)間 timestamp
結(jié)束時(shí)間 timestamp+duration
標(biāo)簽 tags
Span上下文 traceId; id
引用關(guān)系 parentId
Span日志 無(wú)

除此之外,ZIPKIN SAPN還增加了其他幾個(gè)字端:

字端 描述
Kind kind Spanl類型新症,比如是Server還是Client
Annotaion 表示某個(gè)時(shí)間點(diǎn)發(fā)生的Event,Event類型:cs:Client Send 請(qǐng)求步氏;sr:Server Receive到請(qǐng)求;ss:Server 處理完成徒爹、并Send Response荚醒;cr:Client Receive 到響應(yīng)
Endpoint localEndpoint 描述本地服務(wù)信息,比如服務(wù)名隆嗅,ip等界阁,方便根據(jù)服務(wù)名檢索鏈路
Endpoint remoteEndpoint RPC調(diào)用時(shí),描述遠(yuǎn)程服務(wù)的服務(wù)信息

最終胖喳,包含兼容OpenTracing標(biāo)準(zhǔn)的字端泡躯,以及其本身的一些字端后,zipkin的span數(shù)據(jù)字端如下:

image

了解了ZIPKIN的數(shù)據(jù)字端格式后,我們?cè)倏纯碯IPKIN支持的編解碼協(xié)議较剃。

ZIPKIN支持那些編解碼協(xié)議咕别?

Zipkin主要支持三種編解碼協(xié)議,分別為JSON, PROTO3, THRIFT写穴。

JSON編解碼如下:

JSON_V1 {
    public Encoding encoding() {
        return Encoding.JSON;
    }

    public boolean decode(byte[] bytes, Collection<Span> out) {
        Span result = this.decodeOne(bytes);
        if (result == null) {
            return false;
        } else {
            out.add(result);
            return true;
        }
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return (new V1JsonSpanReader()).readList(spans, out);
    }

    public Span decodeOne(byte[] span) {
        V1Span v1 = (V1Span)JsonCodec.readOne(new V1JsonSpanReader(), span);
        List<Span> out = new ArrayList(1);
        V1SpanConverter.create().convert(v1, out);
        return (Span)out.get(0);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

JSON_V2 {
    public Encoding encoding() {
        return Encoding.JSON;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return JsonCodec.read(new V2SpanReader(), span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return JsonCodec.readList(new V2SpanReader(), spans, out);
    }

    @Nullable
    public Span decodeOne(byte[] span) {
        return (Span)JsonCodec.readOne(new V2SpanReader(), span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

THRIFT編解碼如下:

THRIFT {
    public Encoding encoding() {
        return Encoding.THRIFT;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return ThriftCodec.read(span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return ThriftCodec.readList(spans, out);
    }

    public Span decodeOne(byte[] span) {
        return ThriftCodec.readOne(span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

PROTO3編解碼如下:

PROTO3 {
    public Encoding encoding() {
        return Encoding.PROTO3;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return Proto3Codec.read(span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return Proto3Codec.readList(spans, out);
    }

    @Nullable
    public Span decodeOne(byte[] span) {
        return Proto3Codec.readOne(span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

其中Json是默認(rèn)支持的惰拱,也是使用起來(lái)最方便的,除此之外啊送,還包括Thirft和Proto3可供開(kāi)發(fā)者選擇偿短。

ZIPKIN支持那些傳輸協(xié)議?

Zipkin默認(rèn)支持Http協(xié)議删掀,除此之外翔冀,它還支持kafka,rabbitmq以及scribe協(xié)議:

image

他們的初始化過(guò)程如下:

image

傳輸協(xié)議支持的編解碼協(xié)議如下:

image

其中Scribe限定了只支持Thirft協(xié)議,而HTTP披泪、Kafka和RabbitMQ則是三種協(xié)議都支持纤子。

如何做到支持所有的編碼解碼協(xié)議了?ZIPKIN中提供了一個(gè)自動(dòng)探測(cè)編解碼的類SpanBytesDecoderDetector款票,其中核心方法如下:

static BytesDecoder<Span> detectDecoder(byte[] bytes) {
    if (bytes[0] <= 16) { // binary format
    if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
        return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
    } else if (bytes[0] != '[' && bytes[0] != '{') {
        throw new IllegalArgumentException("Could not detect the span format");
    }
    if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
    if (contains(bytes, TAGS_FIELD)) return SpanBytesDecoder.JSON_V2;
    return SpanBytesDecoder.JSON_V1;
}

現(xiàn)在我們已經(jīng)知道了zipkin服務(wù)接收的數(shù)據(jù)格式以及編解碼協(xié)議和傳輸協(xié)議控硼,那么接下來(lái)就可以寫(xiě)一個(gè)客戶端了!

自己寫(xiě)一個(gè)ZIPKIN客戶端

對(duì)于服務(wù)端如何接收數(shù)據(jù)艾少,有了一個(gè)全面的認(rèn)識(shí)后卡乾,我們就可以著手開(kāi)始寫(xiě)一個(gè)ZIPKIN客戶端了。

那么缚够,首先定義客戶端上報(bào)的數(shù)據(jù)格式幔妨,最簡(jiǎn)單的方式就是定義一個(gè)跟ZIPKIN服務(wù)端一樣數(shù)據(jù)格式的Span就可以了:

@Setter
@Getter
public static class MySapn {
    private String traceId;
    private String parentId;
    private String id;
    private String name;
    private long timestamp;
    private long duration;
    private Map<String, String> tags;
    String kind;
    Endpoint localEndpoint, remoteEndpoint;

    public static enum Kind {
        CLIENT,
        SERVER,
        PRODUCER,
        CONSUMER
    }

    public static class Endpoint {
        String serviceName, ipv4, ipv6;
        byte[] ipv4Bytes, ipv6Bytes;
        int port; // zero means null

        public Endpoint(String serviceName) {
            this.serviceName = serviceName;
        }
    }
}

數(shù)據(jù)格式確定后,接著就編碼數(shù)據(jù)谍椅,ZIPKIN支持三種編碼方式误堡,JSON、THIFT和PROTO3雏吭,為了簡(jiǎn)單方便锁施,我們選擇JSON協(xié)議編碼Span數(shù)據(jù)。注意杖们,ZIPKIN JSON字符串前后需要加括號(hào)悉抵。

數(shù)據(jù)編碼后,接著上報(bào)數(shù)據(jù)摘完,ZIPKIN默認(rèn)支持HTTP協(xié)議方式姥饰,JAVA HTTP請(qǐng)求包很多,我們隨便選擇一種孝治,比如選擇Apach的HttpClient jar包列粪,代碼如下:

public class App {

    private static final String serverUrl = "http://localhost:9411/api/v2/spans";

    public static void main(String[] args) {
        MySapn span = new MySapn();
        span.traceId = "1ae1e4f435814744";
        span.parentId = "1ae1e4f435814744";
        span.id = "d1ab9cd2c50d13d1";
        span.kind = MySapn.Kind.SERVER.toString();
        span.name = "my client test";
        span.timestamp = 1565933251470428L;
        span.duration = 8286;
        span.localEndpoint = new MySapn.Endpoint("My client");
        Map<String, String> tags = new HashMap<>();
        tags.put("name", "pioneeryi");
        tags.put("lover", "dandan");
        span.tags = tags;

        doPost(serverUrl, span);
        System.out.println("Hello World!");
    }

    public static void doPost(String url, MySapn span) {
        try {
            HttpClient httpClient = new DefaultHttpClient();

            HttpPost post = new HttpPost(url);
            post.setHeader("Content-Type", "application/json");
            post.setHeader("charset", "UTF-8");

            String body = new Gson().toJson(span);
            body = "[" + body + "]";
            System.out.print(body);

            StringEntity entity = new StringEntity(body);
            post.setEntity(entity);

            HttpResponse httpResponse = httpClient.execute(post);
            System.out.print(httpResponse);
        } catch (Exception exception) {
            System.out.print("do post request fail");
        }
    }
}

maven pom如下:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>

運(yùn)行上面main方法栅螟,即可上報(bào)Span數(shù)據(jù),打開(kāi)zipkin首頁(yè):http://localhost:9411/zipkin/篱竭,搜索剛上報(bào)的Span的TraceId,展示效果如下:

image

一個(gè)最簡(jiǎn)單的步绸,采用JSON編碼數(shù)據(jù)掺逼,HTTP協(xié)議上傳數(shù)據(jù)的客戶端我們就完成了。為了用戶調(diào)用方便瓤介,我們可以將上面的代碼封裝為一個(gè)接口供用戶使用:

public void reportSpan(MySpan span)

但是這個(gè)太簡(jiǎn)陋了吕喘,我們只支持了一種一個(gè)編解碼協(xié)議,一種傳輸協(xié)議刑桑,開(kāi)始優(yōu)化:

優(yōu)化一:支持多種編解碼氯质,支持多種傳輸協(xié)議。這個(gè)時(shí)候就需要定一個(gè)上報(bào)接口類了祠斧,根據(jù)不同傳輸協(xié)議提供不同實(shí)現(xiàn)闻察。編解碼也是同樣的,定義編碼接口琢锋,根據(jù)不同編碼協(xié)議提供不同實(shí)現(xiàn)辕漂。

此外,當(dāng)前這個(gè)客戶端是同步上報(bào)的吴超,性能很差钉嘹,因此必須改成異步上報(bào),接著優(yōu)化:

優(yōu)化二:上報(bào)改成異步上報(bào)鲸阻,隊(duì)列+線程跋涣。

有了這兩步優(yōu)化后,client大體框架就初步成型了鸟悴,寫(xiě)好了客戶端后陈辱,怎么適配各個(gè)組件,做到無(wú)侵入上報(bào)也很重要遣臼,繼續(xù)優(yōu)化:

優(yōu)化三:適配各個(gè)組件性置,比如Spring Boot,Kafak,MySql等等揍堰。

一個(gè)完整的Client鹏浅,還是有很多工作要做的,這里咱就不繼續(xù)深入優(yōu)化開(kāi)發(fā)了屏歹,直接看看官方的Brave怎么做的隐砸!

探究ZIPKIN客戶端Brave

為了說(shuō)明Brave的使用和上報(bào)過(guò)程,我們先寫(xiě)一個(gè)很簡(jiǎn)單的上報(bào)Demo蝙眶,進(jìn)行演示季希。Demo將上報(bào)一個(gè)“一父Span,兩個(gè)子Span“的鏈路褪那,demo如下:

public class TraceDemo {
    public static void main(String[] args) {
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        AsyncReporter asyncReporter = AsyncReporter.create(sender);
        Tracing tracing = Tracing.newBuilder()
            .localServiceName("my-service")
            .spanReporter(asyncReporter)
            .build();
        Tracer tracer = tracing.tracer();

        Span parentSpan = tracer.newTrace().name("parent span").start();

        Span childSpan1 = tracer.newChild(parentSpan.context()).name("child span1").start();
        sleep(500);
        childSpan1.finish();

        Span childSpan2 = tracer.newChild(parentSpan.context()).name("child span2").start();
        sleep(500);
        childSpan2.finish();

        parentSpan.finish();
    }
}

其中 maven pom 引入如下包:

<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave</artifactId>
    <version>5.3.3</version>
</dependency>

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>2.6.0</version>
</dependency>

啟動(dòng)zipkin 服務(wù):

java -jar zipkin-server-2.11.5-exec.jar

然后在瀏覽器中輸入:http://localhost:9411 ,即可打開(kāi)zipkin首頁(yè)式塌,看到示例代碼上報(bào)的效果圖如下:

image

一個(gè)最簡(jiǎn)單的但是卻是很完整的一個(gè)ZIPKIN鏈路上報(bào)演示博敬,就如上面Demo所示,那么接下來(lái)分析一下整個(gè)鏈路的上報(bào)過(guò)程峰尝!

鏈路上報(bào)解析

自己開(kāi)發(fā)一個(gè)客戶端時(shí)偏窝,我們首先會(huì)封裝一個(gè)Span類,Brave也不例外武学,它也定義了Span數(shù)據(jù)結(jié)構(gòu)祭往;那么定義好Span后,誰(shuí)來(lái)負(fù)責(zé)構(gòu)造Span了火窒?

Brave中定義了一個(gè)類叫Tracer來(lái)完成構(gòu)造Span的工作硼补;Brave生成好了Span后,此時(shí)需要編碼發(fā)送了熏矿,那么誰(shuí)又來(lái)發(fā)送了已骇?

Brave定義了Reporter組件,它支持異步發(fā)送以及多傳輸協(xié)議以及多編碼協(xié)議發(fā)送Span數(shù)據(jù)曲掰。

看起來(lái)疾捍,各個(gè)組件已經(jīng)完備了,組件有點(diǎn)多栏妖!此時(shí)需要那么一個(gè)人將這些組件組織起來(lái)乱豆,同時(shí)與服務(wù)端取得聯(lián)系,開(kāi)始打通整個(gè)流程了吊趾,這就是Tracing類的功能宛裕。

接下來(lái)詳細(xì)講解一下各個(gè)組件的功能,并根據(jù)Demo代碼论泛,將各個(gè)組件串起來(lái)揩尸,最終梳理清楚Brave上報(bào)的流程和原理。

Brave Span

Brave中Span相關(guān)類有:SpanCustomizer屁奏、NoopSpanCustomizer岩榆、CurrentSpanCustomizer、RealSpanCustomizer坟瓢、Span勇边、NoopSpan、RealSpan折联。

示例代碼中粒褒,關(guān)于span的操作如下:

Span span = tracer.newTrace().name("encode").start();
try {
  doSomethingExpensive();
} finally {
  span.finish();
}

首先通過(guò)tracer生成一個(gè)span,最后诚镰,調(diào)用span.finish()上報(bào)奕坟,接下來(lái)就來(lái)看看span祥款,以及這個(gè)finish干了什么。

咱們用的Span的實(shí)現(xiàn)子類為RealSpan月杉,RealSpan兩個(gè)核心方法start, finish:

@Override 
public Span start() {
    return start(clock.currentTimeMicroseconds());
}

@Override 
public Span start(long timestamp) {
    synchronized (state) {
        state.startTimestamp(timestamp);
    }
    return this;
}

start方法主要記錄開(kāi)始時(shí)間刃跛,接下來(lái)看看finish 方法:

@Override 
public void finish(long timestamp) {
    if (!pendingSpans.remove(context)) return;
    synchronized (state) {
        state.finishTimestamp(timestamp);
    }
    finishedSpanHandler.handle(context, state);
}

這里交給FinishedSpanHandler來(lái)處理。FinishedSpanHandler是一個(gè)抽象類苛萎,他有如下子類實(shí)現(xiàn):

  • ZipkinFinishedSpanHandler
  • MetricsFinishedSpanHandler
  • NoopAwareFinishedSpan
  • CompositeFinishedSpanHandler

我們主要關(guān)注ZipkinFinishedSpanHandler實(shí)現(xiàn):

public final class ZipkinFinishedSpanHandler extends FinishedSpanHandler {
    final Reporter<zipkin2.Span> spanReporter;
    final MutableSpanConverter converter;

    public ZipkinFinishedSpanHandler(Reporter<zipkin2.Span> spanReporter,
    ErrorParser errorParser, String serviceName, String ip, int port) {
        this.spanReporter = spanReporter;
        this.converter = new MutableSpanConverter(errorParser, serviceName, ip, port);
    }

    @Override public boolean handle(TraceContext context, MutableSpan span) {
        if (!Boolean.TRUE.equals(context.sampled())) return true;

        Span.Builder builderWithContextData = Span.newBuilder()
            .traceId(context.traceIdString())
            .parentId(context.parentIdString())
            .id(context.spanIdString());
        if (context.debug()) builderWithContextData.debug(true);

        converter.convert(span, builderWithContextData);
        spanReporter.report(builderWithContextData.build());
        return true;
    }

    @Override public String toString() {
        return spanReporter.toString();
    }
}

可見(jiàn)首懈,上面最終是通過(guò)Reporter組件來(lái)上報(bào)數(shù)據(jù)的究履。那么Report是如何上報(bào)的了?

Brave Reporter

因?yàn)樯蠄?bào)組件要支持多種編碼協(xié)議以及多種傳輸協(xié)議泥彤,因此邏輯比較復(fù)雜吟吝,官方專門(mén)建了一個(gè)項(xiàng)目:zipkin-reporter-java

我們首先看看Reporter接口類定義:

public interface Reporter<S> {
    Reporter<Span> NOOP = new Reporter<Span>() {
        @Override public void report(Span span) {
        }

        @Override public String toString() {
        return "NoopReporter{}";
        }
    };
    Reporter<Span> CONSOLE = new Reporter<Span>() {
        @Override public void report(Span span) {
             System.out.println(span.toString());
        }

        @Override public String toString() {
            return "ConsoleReporter{}";
        }
    };

    /**
    * Schedules the span to be sent onto the transport.
    *
    * @param span Span, should not be <code>null</code>.
    */
    void report(S span);
}

Reporter有三個(gè)子類實(shí)現(xiàn),分別是CONSOLE,NOOP同仆,和AsyncReporter俗或。CONSOLE就是直接控制臺(tái)打印出Span數(shù)據(jù)蕴侣,一般用來(lái)調(diào)試差不多辱志;NOOP是一個(gè)空實(shí)現(xiàn)揩懒,啥也不干;AsyncReporter是我們平時(shí)上報(bào)用的Reporter智亮,他提供異步上報(bào)數(shù)據(jù)能力。

AsyncReporter

AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second before flushes any pending spans out of process via a Sender.

根據(jù)不同協(xié)議,Ayncreporter執(zhí)行相應(yīng)的build邏輯:

public AsyncReporter<Span> build() {
    switch(this.sender.encoding()) {
        case JSON:
            return this.build(SpanBytesEncoder.JSON_V2);
        case PROTO3:
            return this.build(SpanBytesEncoder.PROTO3);
        case THRIFT:
            return this.build(SpanBytesEncoder.THRIFT);
        default:
            throw new UnsupportedOperationException(this.sender.encoding().name());
    }
}

build方法詳細(xì)邏輯颠区,如下:

public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
    if (encoder == null) throw new NullPointerException("encoder == null");

    if (encoder.encoding() != sender.encoding()) {
        throw new IllegalArgumentException(String.format(
    "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
    }

    final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

    if (messageTimeoutNanos > 0) { 
        // Start a thread that flushes the queue in a loop.
        final BufferNextMessage<S> consumer =
        BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
        Thread flushThread = threadFactory.newThread(new Flusher<>(result, consumer));
        flushThread.setName("AsyncReporter{" + sender + "}");
        flushThread.setDaemon(true);
        flushThread.start();
    }
    return result;
}

可以看到AsyncReporter的build方法中颅夺,啟動(dòng)了一個(gè)守護(hù)線程flushThread碗啄,一直循環(huán)調(diào)用BoundedAsyncReporter的flush方法:

void flush(BufferNextMessage<S> bundler) {
    if (closed.get()) throw new IllegalStateException("closed");
    pending.drainTo(bundler, bundler.remainingNanos());

    // record after flushing reduces the amount of gauge events vs on doing this on report
    metrics.updateQueuedSpans(pending.count);
    metrics.updateQueuedBytes(pending.sizeInBytes);

    // loop around if we are running, and the bundle isn't full
    // if we are closed, try to send what's pending
    if (!bundler.isReady() && !closed.get()) return;

    // Signal that we are about to send a message of a known size in bytes
    metrics.incrementMessages();
    metrics.incrementMessageBytes(bundler.sizeInBytes());
    ArrayList<byte[]> nextMessage = new ArrayList<>(bundler.count());
    bundler.drain(new SpanWithSizeConsumer<S>() {
        @Override public boolean offer(S next, int nextSizeInBytes) {
            nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
            if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
                // if we overran the message size, remove the encoded message.
                nextMessage.remove(nextMessage.size() - 1);
                return false;
            }
            return true;
        }
    });

    try {
        sender.sendSpans(nextMessage).execute();
    } catch (IOException | RuntimeException | Error t) {
        ......
    }
}

Flush方法的主要邏輯如下:

  • 將隊(duì)列pending中的數(shù)據(jù),提取到nextMessage鏈表中瘫想;
  • 調(diào)用Sender的sendSpans方法国夜,發(fā)送到nextMessage鏈表中的Span數(shù)據(jù)到Zipkin筹裕;

這樣,Reporter即做到了異步發(fā)送抗斤!

Sender

Sender組件完成發(fā)送Span到zipkin服務(wù)端的最后一步,即利用某個(gè)傳輸協(xié)議负拟,將數(shù)據(jù)發(fā)送到zipkin服務(wù)端花吟。

public abstract class Sender extends Component {
    public Sender() {
    }

    public abstract Encoding encoding();

    public abstract int messageMaxBytes();

    public abstract int messageSizeInBytes(List<byte[]> var1);

    public int messageSizeInBytes(int encodedSizeInBytes) {
        return this.messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
    }

    public abstract Call<Void> sendSpans(List<byte[]> var1);
}

其中核心的方法sendSpans是一個(gè)抽象方法键菱,不同傳輸協(xié)議的Sender會(huì)提供具體的實(shí)現(xiàn)邏輯,其子類有:
ActiveMQSender、FakeSender纷闺、KafkaSender、LibthriftSender、OkHttpSender限嫌、RabbitMQSender撒穷、URLConnectionSender。

不同協(xié)議均按照自身協(xié)議規(guī)范執(zhí)行發(fā)送邏輯蛤奥,因?yàn)槲覀兊腄emo中用的是OkHttpSender,所以我們主要看看OkHttpSender是如何實(shí)現(xiàn)的。Demo中使用如下:

Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);

這里,通過(guò)AsyncReporter.create方法啡省,我們將OkHttpSender注入到了Reporter中,那么接下來(lái)看看OkHttpSender的sendSpans方法實(shí)現(xiàn):

@Override 
public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
    if (closeCalled) throw new IllegalStateException("closed");
    Request request;
    try {
        request = newRequest(encoder.encode(encodedSpans));
    } catch (IOException e) {
        throw zipkin2.internal.Platform.get().uncheckedIOException(e);
    }
    return new HttpCall(client.newCall(request));
}

執(zhí)行完這個(gè)方法后纵潦,會(huì)返回一個(gè)HttpCall凳干,Reporter的flush方法中會(huì)調(diào)用HttpCall的execute方法,完成Http請(qǐng)求發(fā)送泌绣。

Brave Tracer

Span數(shù)據(jù)結(jié)構(gòu),包括發(fā)送Span的組件我們搞清楚了,那么誰(shuí)來(lái)負(fù)責(zé)創(chuàng)建Span了?這就是Tracer的工作,他負(fù)責(zé)創(chuàng)建Span及提供Span的各種操作怜庸,主要方法如下表所示:

方法名 描述
Span newTrace() 創(chuàng)建一個(gè)Root Span
Span joinSpan(TraceContext context) 公用一個(gè)SpanId,主要存在于RPC場(chǎng)景中
Span newChild(TraceContext parent) 創(chuàng)建一個(gè)子Span
Span nextSpan(TraceContextOrSamplingFlags extracted) 基于請(qǐng)求的參數(shù)信息創(chuàng)建一個(gè)新的Span
Span toSpan(TraceContext context) 通過(guò)TraceContext創(chuàng)建一個(gè)Span
Span currentSpan() 獲取當(dāng)前Span
Span nextSpan() 基于當(dāng)前Span生成一個(gè)子Span

Brave Tracing

現(xiàn)在Span有了,創(chuàng)建Span的組件有了,發(fā)送Span的組件也有了,那就只需要一個(gè)把他們組合起來(lái)的類似工廠的角色了,那就是Tracing,他的主要工作就是連接服務(wù)器,然后利用Tracer創(chuàng)建出Span,接著發(fā)送Span到zipkin服務(wù)端侦副。

Tracing源碼采用的Builder模式挣棕,再看看我們Demo中創(chuàng)建Tracing的代碼:

Tracing tracing = Tracing.newBuilder()
            .localServiceName("my-service")
            .spanReporter(asyncReporter)
            .build();

我們Tracing.newBuilder()創(chuàng)建了一個(gè)Tracing的Builder细燎,然后指定了這個(gè)Tracing的服務(wù)名偿枕,使用什么Reporter,接著調(diào)用了Builder的build方法,我們看看build方法代碼:

public Tracing build() {
    if (clock == null) clock = Platform.get().clock();
    if (localIp == null) localIp = Platform.get().linkLocalIp();
    if (spanReporter == null) spanReporter = new LoggingReporter();
    return new Default(this);
}

它調(diào)用了Tracing的默認(rèn)實(shí)現(xiàn),默認(rèn)實(shí)現(xiàn)子類如下:

static final class Default extends Tracing {
    final Tracer tracer;
    final Propagation.Factory propagationFactory;
    final Propagation<String> stringPropagation;
    final CurrentTraceContext currentTraceContext;
    final Sampler sampler;
    final Clock clock;
    final ErrorParser errorParser;

    Default(Builder builder) {
      this.clock = builder.clock;
      this.errorParser = builder.errorParser;
      this.propagationFactory = builder.propagationFactory;
      this.stringPropagation = builder.propagationFactory.create(Propagation.KeyFactory.STRING);
      this.currentTraceContext = builder.currentTraceContext;
      this.sampler = builder.sampler;
      zipkin2.Endpoint localEndpoint = zipkin2.Endpoint.newBuilder()
          .serviceName(builder.localServiceName)
          .ip(builder.localIp)
          .port(builder.localPort)
          .build();
      SpanReporter reporter = new SpanReporter(localEndpoint, builder.reporter, noop);
      this.tracer = new Tracer(
          builder.clock,
          builder.propagationFactory,
          reporter,
          new PendingSpans(localEndpoint, clock, reporter, noop),
          builder.sampler,
          builder.errorParser,
          builder.currentTraceContext,
          builder.traceId128Bit || propagationFactory.requires128BitTraceId(),
          builder.supportsJoin && propagationFactory.supportsJoin(),
          noop
      );
      maybeSetCurrent();
    }

從上面可以看到涡相,主要干的工作有:

  • 根據(jù)Spanreporter,生成FinishedSpanHandler育特,發(fā)送Span用烙无;
  • 根據(jù)FinishedSpanHandler以及其他默認(rèn)信息生成Tracer;

OK迂苛,現(xiàn)在對(duì)于DEMO中的Brave的上報(bào)數(shù)據(jù)流程和原理是不是清楚了不少鼓择!

鏈路上報(bào)總結(jié)

Zipkin鏈路上報(bào)看起來(lái)很復(fù)雜三幻,其實(shí)剝離各種封裝,去除各種組件呐能,其主線邏輯就是如下三步:

  • 構(gòu)造span對(duì)象念搬,包括traceId,parentId,以及其自身的spanId等參數(shù)摆出;
  • 選一種編解碼協(xié)議朗徊,比如JSON,或者THRIF,或者PROTO3對(duì)Span進(jìn)行編碼偎漫;
  • 將編碼后的span爷恳,通過(guò)利用一種傳輸協(xié)議上報(bào)到服務(wù)端;

還有一塊未分析:Brave是如何Support各個(gè)組件的象踊。因?yàn)楸疚膬?nèi)容較多温亲,放到以后的文章分析!

后記

本文為我的調(diào)用鏈系列文章之一杯矩,已有文章如下:

轉(zhuǎn)自: http://www.reibang.com/p/17ce989e108e

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末栈虚,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子菊碟,更是在濱河造成了極大的恐慌节芥,老刑警劉巖在刺,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件逆害,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蚣驼,警方通過(guò)查閱死者的電腦和手機(jī)魄幕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)颖杏,“玉大人纯陨,你說(shuō)我怎么就攤上這事。” “怎么了翼抠?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵咙轩,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我阴颖,道長(zhǎng)活喊,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任量愧,我火速辦了婚禮钾菊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘偎肃。我一直安慰自己煞烫,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布累颂。 她就那樣靜靜地躺著滞详,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喘落。 梳的紋絲不亂的頭發(fā)上茵宪,一...
    開(kāi)封第一講書(shū)人閱讀 51,763評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音瘦棋,去河邊找鬼稀火。 笑死,一個(gè)胖子當(dāng)著我的面吹牛赌朋,可吹牛的內(nèi)容都是我干的凰狞。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼沛慢,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼赡若!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起团甲,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤逾冬,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后躺苦,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體身腻,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年匹厘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嘀趟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡愈诚,死狀恐怖她按,靈堂內(nèi)的尸體忽然破棺而出牛隅,到底是詐尸還是另有隱情,我是刑警寧澤酌泰,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布媒佣,位于F島的核電站,受9級(jí)特大地震影響陵刹,放射性物質(zhì)發(fā)生泄漏丈攒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一授霸、第九天 我趴在偏房一處隱蔽的房頂上張望巡验。 院中可真熱鬧,春花似錦碘耳、人聲如沸显设。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)捕捂。三九已至,卻和暖如春斗搞,著一層夾襖步出監(jiān)牢的瞬間指攒,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工僻焚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留允悦,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓虑啤,卻偏偏與公主長(zhǎng)得像隙弛,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子狞山,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355