深入探究ZIPKIN調(diào)用鏈跟蹤——鏈路上報(bào)篇

前言:ZIPKIN作為當(dāng)前“分布式服務(wù)鏈路跟蹤”問題的流行解決方案之一,正在被越來越多的公司和個(gè)人學(xué)習(xí)使用晾剖。其中很重要的一塊,就是上報(bào)鏈路數(shù)據(jù)梯嗽。那么知道服務(wù)端如何接收數(shù)據(jù)齿尽,以及我們該怎樣上報(bào)數(shù)據(jù)到服務(wù)端就顯得十分重要。雖然ZIPKIN官方也開源了一個(gè)客戶端Brave灯节,但是本文卻并不想直接介紹Brave雕什,而是想站在一個(gè)從零開發(fā)ZIPKIN客戶端的角度,一層層分析解決如何自己寫一個(gè)ZIPKIN客戶端显晶,直到最后引出Brave贷岸。本文最終想達(dá)到的效果,就是希望通過本文磷雇,能讓大家對ZIPKIN的鏈路上報(bào)有一個(gè)詳盡的理解和認(rèn)識偿警。

零行代碼快速開始

ZIPKIN是Spring Boot服務(wù),因此啟動起來十分方便唯笙,直接運(yùn)行ZIPKIN JAR包就可以了螟蒸。ZIPKIN JAR包我們可以自行編譯ZIPKIN源碼獲得,也可以從下面的倉庫獲取崩掘,這個(gè)倉庫專門存放ZIPKIN各種編譯好的JAR包七嫌,倉庫地址為:

https://dl.bintray.com/openzipkin/maven/io/zipkin/java/

進(jìn)入zipkin-server目錄下載server jar包,比如下載2.11.5版本苞慢,運(yùn)行jar包:

java -jar zipkin-server-2.11.5-exec.jar

這樣本地就起了一個(gè)ZIPKIN服務(wù)诵原,瀏覽器中輸入http://localhost:9411/zipkin/,即可打開ZIPKIN首頁,效果如下:

zipkin首頁

接下來绍赛,我們開始上報(bào)數(shù)據(jù)蔓纠,現(xiàn)在我不想寫任何代碼,那么就用postman發(fā)起一次post請求上報(bào)數(shù)據(jù)吧:


POSTMAN 上報(bào)數(shù)據(jù)

上報(bào)后在本地ZIPKIN服務(wù)上即可看到剛上報(bào)的數(shù)據(jù)效果:


上報(bào)數(shù)據(jù)展示

沒有任何一句代碼吗蚌,一個(gè)完整的ZIPKIN數(shù)據(jù)上報(bào)存儲展示就完成了腿倚,那么我們來思考下:

1、如果自己寫一個(gè)上ZIPKIN客戶端蚯妇,該如何寫敷燎?

分析:要上傳數(shù)據(jù)給服務(wù)端,那么必須要搞清楚箩言,ZIPKIN服務(wù)端可以接受什么樣格式的數(shù)據(jù)懈叹?支持的編碼解碼協(xié)議是什么?還有支持那些通信傳輸協(xié)議分扎?

Task: 了解了服務(wù)怎么接收數(shù)據(jù)后澄成,客戶端的task也就明確了:
1)以ZIPKIN服務(wù)端支持的數(shù)據(jù)格式組織數(shù)據(jù);
2)以ZIPKIN服務(wù)端支持的編解碼協(xié)議編碼數(shù)據(jù)畏吓;
3)以ZIPKIN服務(wù)端支持的傳輸協(xié)議上報(bào)數(shù)據(jù)墨状。

2、基礎(chǔ)ZIPKIN客戶端完成后菲饼,如何適配各種組件肾砂?

描述:一個(gè)服務(wù)通常涉及多個(gè)組件,包括服務(wù)框架宏悦,消息中間件镐确,各種數(shù)據(jù)庫等,那么怎么上報(bào)這些組件的數(shù)據(jù)給服務(wù)端了饼煞,值得我們思考源葫?

分析:其實(shí)最簡單的方法就是采用一個(gè)裝飾者模式包裝一下組件接口,在其中加入上報(bào)的邏輯砖瞧,但是這種方法局限很大息堂,不靈活。除此之外块促,我們或許我們可以利用下攔截器技術(shù)荣堰,AOP技術(shù),以及Agent竭翠,探針等技術(shù)做到無侵入上報(bào)數(shù)據(jù)振坚。

服務(wù)端如何接收數(shù)據(jù)的?

要想自己寫一個(gè)ZIPKIN客戶端斋扰,必須搞清楚ZIPKIN服務(wù)端是怎么接收數(shù)據(jù)的渡八,包括數(shù)據(jù)格式協(xié)議是怎樣的啃洋?編解碼協(xié)議是怎樣的?還有支持那么傳輸協(xié)議呀狼?

ZIPKIN支持的數(shù)據(jù)格式是怎樣的?

ZIPKIN是兼容OpenTracing標(biāo)準(zhǔn)的损离,OpenTracing規(guī)定哥艇,每個(gè)Span包含如下狀態(tài):

  • 操作名稱
  • 起始時(shí)間
  • 結(jié)束時(shí)間
  • 一組 KV 值,作為階段的標(biāo)簽(Span Tags)
  • 階段日志(Span Logs)
  • 階段上下文(SpanContext)僻澎,其中包含 Trace ID 和 Span ID
  • 引用關(guān)系(References)

OpenTracing規(guī)范與zipkin對應(yīng)關(guān)系如下:

OpenTracing ZIPKIN
操作名稱 name
起始時(shí)間 timestamp
結(jié)束時(shí)間 timestamp+duration
標(biāo)簽 tags
Span上下文 traceId; id
引用關(guān)系 parentId
Span日志

除此之外貌踏,ZIPKIN SAPN還增加了其他幾個(gè)字端:

字端 描述
Kind kind Spanl類型,比如是Server還是Client
Annotaion 表示某個(gè)時(shí)間點(diǎn)發(fā)生的Event,Event類型:cs:Client Send 請求窟勃;sr:Server Receive到請求祖乳;ss:Server 處理完成、并Send Response秉氧;cr:Client Receive 到響應(yīng)
Endpoint localEndpoint 描述本地服務(wù)信息眷昆,比如服務(wù)名,ip等汁咏,方便根據(jù)服務(wù)名檢索鏈路
Endpoint remoteEndpoint RPC調(diào)用時(shí)亚斋,描述遠(yuǎn)程服務(wù)的服務(wù)信息

最終,包含兼容OpenTracing標(biāo)準(zhǔn)的字端攘滩,以及其本身的一些字端后帅刊,zipkin的span數(shù)據(jù)字端如下:


ZIPKIN Span

了解了ZIPKIN的數(shù)據(jù)字端格式后,我們再看看ZIPKIN支持的編解碼協(xié)議漂问。

ZIPKIN支持那些編解碼協(xié)議赖瞒?

Zipkin主要支持三種編解碼協(xié)議,分別為JSON, PROTO3, THRIFT蚤假。

JSON編解碼如下:

JSON_V1 {
    public Encoding encoding() {
        return Encoding.JSON;
    }

    public boolean decode(byte[] bytes, Collection<Span> out) {
        Span result = this.decodeOne(bytes);
        if (result == null) {
            return false;
        } else {
            out.add(result);
            return true;
        }
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return (new V1JsonSpanReader()).readList(spans, out);
    }

    public Span decodeOne(byte[] span) {
        V1Span v1 = (V1Span)JsonCodec.readOne(new V1JsonSpanReader(), span);
        List<Span> out = new ArrayList(1);
        V1SpanConverter.create().convert(v1, out);
        return (Span)out.get(0);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

JSON_V2 {
    public Encoding encoding() {
        return Encoding.JSON;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return JsonCodec.read(new V2SpanReader(), span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return JsonCodec.readList(new V2SpanReader(), spans, out);
    }

    @Nullable
    public Span decodeOne(byte[] span) {
        return (Span)JsonCodec.readOne(new V2SpanReader(), span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

THRIFT編解碼如下:

THRIFT {
    public Encoding encoding() {
        return Encoding.THRIFT;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return ThriftCodec.read(span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return ThriftCodec.readList(spans, out);
    }

    public Span decodeOne(byte[] span) {
        return ThriftCodec.readOne(span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

PROTO3編解碼如下:

PROTO3 {
    public Encoding encoding() {
        return Encoding.PROTO3;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return Proto3Codec.read(span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return Proto3Codec.readList(spans, out);
    }

    @Nullable
    public Span decodeOne(byte[] span) {
        return Proto3Codec.readOne(span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

其中Json是默認(rèn)支持的栏饮,也是使用起來最方便的,除此之外磷仰,還包括Thirft和Proto3可供開發(fā)者選擇抡爹。

ZIPKIN支持那些傳輸協(xié)議?

Zipkin默認(rèn)支持Http協(xié)議芒划,除此之外冬竟,它還支持kafka,rabbitmq以及scribe協(xié)議:


ZIPKIN支持的協(xié)議

他們的初始化過程如下:


Collector初始化g

傳輸協(xié)議支持的編解碼協(xié)議如下:


傳輸協(xié)議支持的編解碼協(xié)議情況

其中Scribe限定了只支持Thirft協(xié)議,而HTTP民逼、Kafka和RabbitMQ則是三種協(xié)議都支持泵殴。

如何做到支持所有的編碼解碼協(xié)議了?ZIPKIN中提供了一個(gè)自動探測編解碼的類SpanBytesDecoderDetector拼苍,其中核心方法如下:

static BytesDecoder<Span> detectDecoder(byte[] bytes) {
    if (bytes[0] <= 16) { // binary format
    if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
        return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
    } else if (bytes[0] != '[' && bytes[0] != '{') {
        throw new IllegalArgumentException("Could not detect the span format");
    }
    if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
    if (contains(bytes, TAGS_FIELD)) return SpanBytesDecoder.JSON_V2;
    return SpanBytesDecoder.JSON_V1;
}

現(xiàn)在我們已經(jīng)知道了zipkin服務(wù)接收的數(shù)據(jù)格式以及編解碼協(xié)議和傳輸協(xié)議笑诅,那么接下來就可以寫一個(gè)客戶端了调缨!

自己寫一個(gè)ZIPKIN客戶端

對于服務(wù)端如何接收數(shù)據(jù),有了一個(gè)全面的認(rèn)識后吆你,我們就可以著手開始寫一個(gè)ZIPKIN客戶端了弦叶。

那么,首先定義客戶端上報(bào)的數(shù)據(jù)格式妇多,最簡單的方式就是定義一個(gè)跟ZIPKIN服務(wù)端一樣數(shù)據(jù)格式的Span就可以了:

@Setter
@Getter
public static class MySapn {
    private String traceId;
    private String parentId;
    private String id;
    private String name;
    private long timestamp;
    private long duration;
    private Map<String, String> tags;
    String kind;
    Endpoint localEndpoint, remoteEndpoint;

    public static enum Kind {
        CLIENT,
        SERVER,
        PRODUCER,
        CONSUMER
    }

    public static class Endpoint {
        String serviceName, ipv4, ipv6;
        byte[] ipv4Bytes, ipv6Bytes;
        int port; // zero means null

        public Endpoint(String serviceName) {
            this.serviceName = serviceName;
        }
    }
}

數(shù)據(jù)格式確定后伤哺,接著就編碼數(shù)據(jù),ZIPKIN支持三種編碼方式者祖,JSON立莉、THIFT和PROTO3,為了簡單方便七问,我們選擇JSON協(xié)議編碼Span數(shù)據(jù)。注意械巡,ZIPKIN JSON字符串前后需要加括號刹淌。

數(shù)據(jù)編碼后,接著上報(bào)數(shù)據(jù)讥耗,ZIPKIN默認(rèn)支持HTTP協(xié)議方式芦鳍,JAVA HTTP請求包很多,我們隨便選擇一種葛账,比如選擇Apach的HttpClient jar包柠衅,代碼如下:

public class App {

    private static final String serverUrl = "http://localhost:9411/api/v2/spans";

    public static void main(String[] args) {
        MySapn span = new MySapn();
        span.traceId = "1ae1e4f435814744";
        span.parentId = "1ae1e4f435814744";
        span.id = "d1ab9cd2c50d13d1";
        span.kind = MySapn.Kind.SERVER.toString();
        span.name = "my client test";
        span.timestamp = 1565933251470428L;
        span.duration = 8286;
        span.localEndpoint = new MySapn.Endpoint("My client");
        Map<String, String> tags = new HashMap<>();
        tags.put("name", "pioneeryi");
        tags.put("lover", "dandan");
        span.tags = tags;

        doPost(serverUrl, span);
        System.out.println("Hello World!");
    }

    public static void doPost(String url, MySapn span) {
        try {
            HttpClient httpClient = new DefaultHttpClient();

            HttpPost post = new HttpPost(url);
            post.setHeader("Content-Type", "application/json");
            post.setHeader("charset", "UTF-8");

            String body = new Gson().toJson(span);
            body = "[" + body + "]";
            System.out.print(body);

            StringEntity entity = new StringEntity(body);
            post.setEntity(entity);

            HttpResponse httpResponse = httpClient.execute(post);
            System.out.print(httpResponse);
        } catch (Exception exception) {
            System.out.print("do post request fail");
        }
    }
}

maven pom如下:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>

運(yùn)行上面main方法,即可上報(bào)Span數(shù)據(jù)籍琳,打開zipkin首頁:http://localhost:9411/zipkin/菲宴,搜索剛上報(bào)的Span的TraceId,展示效果如下:

上報(bào)數(shù)據(jù)展示

一個(gè)最簡單的趋急,采用JSON編碼數(shù)據(jù)喝峦,HTTP協(xié)議上傳數(shù)據(jù)的客戶端我們就完成了。為了用戶調(diào)用方便呜达,我們可以將上面的代碼封裝為一個(gè)接口供用戶使用:

public void reportSpan(MySpan span)

但是這個(gè)太簡陋了谣蠢,我們只支持了一種一個(gè)編解碼協(xié)議,一種傳輸協(xié)議查近,開始優(yōu)化:

優(yōu)化一:支持多種編解碼眉踱,支持多種傳輸協(xié)議。這個(gè)時(shí)候就需要定一個(gè)上報(bào)接口類了霜威,根據(jù)不同傳輸協(xié)議提供不同實(shí)現(xiàn)谈喳。編解碼也是同樣的,定義編碼接口戈泼,根據(jù)不同編碼協(xié)議提供不同實(shí)現(xiàn)婿禽。

此外赏僧,當(dāng)前這個(gè)客戶端是同步上報(bào)的,性能很差扭倾,因此必須改成異步上報(bào)淀零,接著優(yōu)化:

優(yōu)化二:上報(bào)改成異步上報(bào),隊(duì)列+線程膛壹。

有了這兩步優(yōu)化后驾中,client大體框架就初步成型了,寫好了客戶端后恢筝,怎么適配各個(gè)組件哀卫,做到無侵入上報(bào)也很重要巨坊,繼續(xù)優(yōu)化:

優(yōu)化三:適配各個(gè)組件撬槽,比如Spring Boot,Kafak,MySql等等趾撵。

一個(gè)完整的Client侄柔,還是有很多工作要做的,這里咱就不繼續(xù)深入優(yōu)化開發(fā)了占调,直接看看官方的Brave怎么做的暂题!

探究ZIPKIN客戶端Brave

為了說明Brave的使用和上報(bào)過程,我們先寫一個(gè)很簡單的上報(bào)Demo究珊,進(jìn)行演示薪者。Demo將上報(bào)一個(gè)“一父Span,兩個(gè)子Span“的鏈路,demo如下:

public class TraceDemo {
    public static void main(String[] args) {
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        AsyncReporter asyncReporter = AsyncReporter.create(sender);
        Tracing tracing = Tracing.newBuilder()
            .localServiceName("my-service")
            .spanReporter(asyncReporter)
            .build();
        Tracer tracer = tracing.tracer();

        Span parentSpan = tracer.newTrace().name("parent span").start();
        
        Span childSpan1 = tracer.newChild(parentSpan.context()).name("child span1").start();
        sleep(500);
        childSpan1.finish();

        Span childSpan2 = tracer.newChild(parentSpan.context()).name("child span2").start();
        sleep(500);
        childSpan2.finish();
        
        parentSpan.finish();
    }
}

其中 maven pom 引入如下包:

<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave</artifactId>
    <version>5.3.3</version>
</dependency>

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>2.6.0</version>
</dependency>

啟動zipkin 服務(wù):

java -jar zipkin-server-2.11.5-exec.jar

然后在瀏覽器中輸入:http://localhost:9411 剿涮,即可打開zipkin首頁言津,看到示例代碼上報(bào)的效果圖如下:

上報(bào)數(shù)據(jù)展示

一個(gè)最簡單的但是卻是很完整的一個(gè)ZIPKIN鏈路上報(bào)演示,就如上面Demo所示取试,那么接下來分析一下整個(gè)鏈路的上報(bào)過程悬槽!

鏈路上報(bào)解析

自己開發(fā)一個(gè)客戶端時(shí),我們首先會封裝一個(gè)Span類瞬浓,Brave也不例外初婆,它也定義了Span數(shù)據(jù)結(jié)構(gòu);那么定義好Span后猿棉,誰來負(fù)責(zé)構(gòu)造Span了磅叛?

Brave中定義了一個(gè)類叫Tracer來完成構(gòu)造Span的工作;Brave生成好了Span后萨赁,此時(shí)需要編碼發(fā)送了宪躯,那么誰又來發(fā)送了?

Brave定義了Reporter組件位迂,它支持異步發(fā)送以及多傳輸協(xié)議以及多編碼協(xié)議發(fā)送Span數(shù)據(jù)访雪。

看起來详瑞,各個(gè)組件已經(jīng)完備了,組件有點(diǎn)多臣缀!此時(shí)需要那么一個(gè)人將這些組件組織起來坝橡,同時(shí)與服務(wù)端取得聯(lián)系,開始打通整個(gè)流程了精置,這就是Tracing類的功能计寇。

接下來詳細(xì)講解一下各個(gè)組件的功能,并根據(jù)Demo代碼脂倦,將各個(gè)組件串起來番宁,最終梳理清楚Brave上報(bào)的流程和原理。

Brave Span

Brave中Span相關(guān)類有:SpanCustomizer赖阻、NoopSpanCustomizer蝶押、CurrentSpanCustomizer、RealSpanCustomizer火欧、Span棋电、NoopSpan、RealSpan苇侵。

示例代碼中赶盔,關(guān)于span的操作如下:

Span span = tracer.newTrace().name("encode").start();
try {
  doSomethingExpensive();
} finally {
  span.finish();
}

首先通過tracer生成一個(gè)span,最后榆浓,調(diào)用span.finish()上報(bào)于未,接下來就來看看span杨名,以及這個(gè)finish干了什么蓉冈。

咱們用的Span的實(shí)現(xiàn)子類為RealSpan,RealSpan兩個(gè)核心方法start, finish:

@Override 
public Span start() {
    return start(clock.currentTimeMicroseconds());
}

@Override 
public Span start(long timestamp) {
    synchronized (state) {
        state.startTimestamp(timestamp);
    }
    return this;
}

start方法主要記錄開始時(shí)間填渠,接下來看看finish 方法:

@Override 
public void finish(long timestamp) {
    if (!pendingSpans.remove(context)) return;
    synchronized (state) {
        state.finishTimestamp(timestamp);
    }
    finishedSpanHandler.handle(context, state);
}

這里交給FinishedSpanHandler來處理杉适。FinishedSpanHandler是一個(gè)抽象類谎倔,他有如下子類實(shí)現(xiàn):

  • ZipkinFinishedSpanHandler
  • MetricsFinishedSpanHandler
  • NoopAwareFinishedSpan
  • CompositeFinishedSpanHandler

我們主要關(guān)注ZipkinFinishedSpanHandler實(shí)現(xiàn):

public final class ZipkinFinishedSpanHandler extends FinishedSpanHandler {
    final Reporter<zipkin2.Span> spanReporter;
    final MutableSpanConverter converter;

    public ZipkinFinishedSpanHandler(Reporter<zipkin2.Span> spanReporter,
    ErrorParser errorParser, String serviceName, String ip, int port) {
        this.spanReporter = spanReporter;
        this.converter = new MutableSpanConverter(errorParser, serviceName, ip, port);
    }

    @Override public boolean handle(TraceContext context, MutableSpan span) {
        if (!Boolean.TRUE.equals(context.sampled())) return true;

        Span.Builder builderWithContextData = Span.newBuilder()
            .traceId(context.traceIdString())
            .parentId(context.parentIdString())
            .id(context.spanIdString());
        if (context.debug()) builderWithContextData.debug(true);

        converter.convert(span, builderWithContextData);
        spanReporter.report(builderWithContextData.build());
        return true;
    }

    @Override public String toString() {
        return spanReporter.toString();
    }
}

可見,上面最終是通過Reporter組件來上報(bào)數(shù)據(jù)的猿推。那么Report是如何上報(bào)的了片习?

Brave Reporter

因?yàn)樯蠄?bào)組件要支持多種編碼協(xié)議以及多種傳輸協(xié)議,因此邏輯比較復(fù)雜蹬叭,官方專門建了一個(gè)項(xiàng)目:zipkin-reporter-java

我們首先看看Reporter接口類定義:

public interface Reporter<S> {
    Reporter<Span> NOOP = new Reporter<Span>() {
        @Override public void report(Span span) {
        }

        @Override public String toString() {
        return "NoopReporter{}";
        }
    };
    Reporter<Span> CONSOLE = new Reporter<Span>() {
        @Override public void report(Span span) {
             System.out.println(span.toString());
        }

        @Override public String toString() {
            return "ConsoleReporter{}";
        }
    };

    /**
    * Schedules the span to be sent onto the transport.
    *
    * @param span Span, should not be <code>null</code>.
    */
    void report(S span);
}

Reporter有三個(gè)子類實(shí)現(xiàn)藕咏,分別是CONSOLE,NOOP秽五,和AsyncReporter孽查。CONSOLE就是直接控制臺打印出Span數(shù)據(jù),一般用來調(diào)試差不多坦喘;NOOP是一個(gè)空實(shí)現(xiàn)盲再,啥也不干西设;AsyncReporter是我們平時(shí)上報(bào)用的Reporter,他提供異步上報(bào)數(shù)據(jù)能力答朋。

AsyncReporter

AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second before flushes any pending spans out of process via a Sender.

根據(jù)不同協(xié)議贷揽,Ayncreporter執(zhí)行相應(yīng)的build邏輯:

public AsyncReporter<Span> build() {
    switch(this.sender.encoding()) {
        case JSON:
            return this.build(SpanBytesEncoder.JSON_V2);
        case PROTO3:
            return this.build(SpanBytesEncoder.PROTO3);
        case THRIFT:
            return this.build(SpanBytesEncoder.THRIFT);
        default:
            throw new UnsupportedOperationException(this.sender.encoding().name());
    }
}

build方法詳細(xì)邏輯,如下:

public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
    if (encoder == null) throw new NullPointerException("encoder == null");

    if (encoder.encoding() != sender.encoding()) {
        throw new IllegalArgumentException(String.format(
    "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
    }

    final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

    if (messageTimeoutNanos > 0) { 
        // Start a thread that flushes the queue in a loop.
        final BufferNextMessage<S> consumer =
        BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
        Thread flushThread = threadFactory.newThread(new Flusher<>(result, consumer));
        flushThread.setName("AsyncReporter{" + sender + "}");
        flushThread.setDaemon(true);
        flushThread.start();
    }
    return result;
}

可以看到AsyncReporter的build方法中梦碗,啟動了一個(gè)守護(hù)線程flushThread禽绪,一直循環(huán)調(diào)用BoundedAsyncReporter的flush方法:

void flush(BufferNextMessage<S> bundler) {
    if (closed.get()) throw new IllegalStateException("closed");
    pending.drainTo(bundler, bundler.remainingNanos());

    // record after flushing reduces the amount of gauge events vs on doing this on report
    metrics.updateQueuedSpans(pending.count);
    metrics.updateQueuedBytes(pending.sizeInBytes);

    // loop around if we are running, and the bundle isn't full
    // if we are closed, try to send what's pending
    if (!bundler.isReady() && !closed.get()) return;

    // Signal that we are about to send a message of a known size in bytes
    metrics.incrementMessages();
    metrics.incrementMessageBytes(bundler.sizeInBytes());
    ArrayList<byte[]> nextMessage = new ArrayList<>(bundler.count());
    bundler.drain(new SpanWithSizeConsumer<S>() {
        @Override public boolean offer(S next, int nextSizeInBytes) {
            nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
            if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
                // if we overran the message size, remove the encoded message.
                nextMessage.remove(nextMessage.size() - 1);
                return false;
            }
            return true;
        }
    });

    try {
        sender.sendSpans(nextMessage).execute();
    } catch (IOException | RuntimeException | Error t) {
        ......
    }
}

Flush方法的主要邏輯如下:

  • 將隊(duì)列pending中的數(shù)據(jù),提取到nextMessage鏈表中洪规;
  • 調(diào)用Sender的sendSpans方法印屁,發(fā)送到nextMessage鏈表中的Span數(shù)據(jù)到Zipkin;

這樣斩例,Reporter即做到了異步發(fā)送雄人!

Sender

Sender組件完成發(fā)送Span到zipkin服務(wù)端的最后一步,即利用某個(gè)傳輸協(xié)議樱拴,將數(shù)據(jù)發(fā)送到zipkin服務(wù)端柠衍。

public abstract class Sender extends Component {
    public Sender() {
    }
    
    public abstract Encoding encoding();
    
    public abstract int messageMaxBytes();
    
    public abstract int messageSizeInBytes(List<byte[]> var1);
    
    public int messageSizeInBytes(int encodedSizeInBytes) {
        return this.messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
    }
    
    public abstract Call<Void> sendSpans(List<byte[]> var1);
}

其中核心的方法sendSpans是一個(gè)抽象方法洋满,不同傳輸協(xié)議的Sender會提供具體的實(shí)現(xiàn)邏輯晶乔,其子類有:
ActiveMQSender、FakeSender牺勾、KafkaSender正罢、LibthriftSender、OkHttpSender驻民、RabbitMQSender翻具、URLConnectionSender。

不同協(xié)議均按照自身協(xié)議規(guī)范執(zhí)行發(fā)送邏輯回还,因?yàn)槲覀兊腄emo中用的是OkHttpSender裆泳,所以我們主要看看OkHttpSender是如何實(shí)現(xiàn)的。Demo中使用如下:

Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);

這里柠硕,通過AsyncReporter.create方法工禾,我們將OkHttpSender注入到了Reporter中,那么接下來看看OkHttpSender的sendSpans方法實(shí)現(xiàn):

@Override 
public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
    if (closeCalled) throw new IllegalStateException("closed");
    Request request;
    try {
        request = newRequest(encoder.encode(encodedSpans));
    } catch (IOException e) {
        throw zipkin2.internal.Platform.get().uncheckedIOException(e);
    }
    return new HttpCall(client.newCall(request));
}

執(zhí)行完這個(gè)方法后蝗柔,會返回一個(gè)HttpCall闻葵,Reporter的flush方法中會調(diào)用HttpCall的execute方法,完成Http請求發(fā)送癣丧。

Brave Tracer

Span數(shù)據(jù)結(jié)構(gòu)槽畔,包括發(fā)送Span的組件我們搞清楚了,那么誰來負(fù)責(zé)創(chuàng)建Span了胁编?這就是Tracer的工作厢钧,他負(fù)責(zé)創(chuàng)建Span及提供Span的各種操作鳞尔,主要方法如下表所示:

方法名 描述
Span newTrace() 創(chuàng)建一個(gè)Root Span
Span joinSpan(TraceContext context) 公用一個(gè)SpanId,主要存在于RPC場景中
Span newChild(TraceContext parent) 創(chuàng)建一個(gè)子Span
Span nextSpan(TraceContextOrSamplingFlags extracted) 基于請求的參數(shù)信息創(chuàng)建一個(gè)新的Span
Span toSpan(TraceContext context) 通過TraceContext創(chuàng)建一個(gè)Span
Span currentSpan() 獲取當(dāng)前Span
Span nextSpan() 基于當(dāng)前Span生成一個(gè)子Span

Brave Tracing

現(xiàn)在Span有了,創(chuàng)建Span的組件有了早直,發(fā)送Span的組件也有了铅檩,那就只需要一個(gè)把他們組合起來的類似工廠的角色了,那就是Tracing,他的主要工作就是連接服務(wù)器莽鸿,然后利用Tracer創(chuàng)建出Span昧旨,接著發(fā)送Span到zipkin服務(wù)端。

Tracing源碼采用的Builder模式祥得,再看看我們Demo中創(chuàng)建Tracing的代碼:

Tracing tracing = Tracing.newBuilder()
            .localServiceName("my-service")
            .spanReporter(asyncReporter)
            .build();

我們Tracing.newBuilder()創(chuàng)建了一個(gè)Tracing的Builder兔沃,然后指定了這個(gè)Tracing的服務(wù)名,使用什么Reporter级及,接著調(diào)用了Builder的build方法乒疏,我們看看build方法代碼:

public Tracing build() {
    if (clock == null) clock = Platform.get().clock();
    if (localIp == null) localIp = Platform.get().linkLocalIp();
    if (spanReporter == null) spanReporter = new LoggingReporter();
    return new Default(this);
}

它調(diào)用了Tracing的默認(rèn)實(shí)現(xiàn),默認(rèn)實(shí)現(xiàn)子類如下:

static final class Default extends Tracing {
    final Tracer tracer;
    final Propagation.Factory propagationFactory;
    final Propagation<String> stringPropagation;
    final CurrentTraceContext currentTraceContext;
    final Sampler sampler;
    final Clock clock;
    final ErrorParser errorParser;

    Default(Builder builder) {
      this.clock = builder.clock;
      this.errorParser = builder.errorParser;
      this.propagationFactory = builder.propagationFactory;
      this.stringPropagation = builder.propagationFactory.create(Propagation.KeyFactory.STRING);
      this.currentTraceContext = builder.currentTraceContext;
      this.sampler = builder.sampler;
      zipkin2.Endpoint localEndpoint = zipkin2.Endpoint.newBuilder()
          .serviceName(builder.localServiceName)
          .ip(builder.localIp)
          .port(builder.localPort)
          .build();
      SpanReporter reporter = new SpanReporter(localEndpoint, builder.reporter, noop);
      this.tracer = new Tracer(
          builder.clock,
          builder.propagationFactory,
          reporter,
          new PendingSpans(localEndpoint, clock, reporter, noop),
          builder.sampler,
          builder.errorParser,
          builder.currentTraceContext,
          builder.traceId128Bit || propagationFactory.requires128BitTraceId(),
          builder.supportsJoin && propagationFactory.supportsJoin(),
          noop
      );
      maybeSetCurrent();
    }

從上面可以看到饮焦,主要干的工作有:

  • 根據(jù)Spanreporter,生成FinishedSpanHandler怕吴,發(fā)送Span用;
  • 根據(jù)FinishedSpanHandler以及其他默認(rèn)信息生成Tracer;

OK县踢,現(xiàn)在對于DEMO中的Brave的上報(bào)數(shù)據(jù)流程和原理是不是清楚了不少转绷!

鏈路上報(bào)總結(jié)

Zipkin鏈路上報(bào)看起來很復(fù)雜,其實(shí)剝離各種封裝硼啤,去除各種組件议经,其主線邏輯就是如下三步:

  • 構(gòu)造span對象,包括traceId谴返,parentId,以及其自身的spanId等參數(shù)煞肾;
  • 選一種編解碼協(xié)議,比如JSON嗓袱,或者THRIF,或者PROTO3對Span進(jìn)行編碼籍救;
  • 將編碼后的span,通過利用一種傳輸協(xié)議上報(bào)到服務(wù)端渠抹;

還有一塊未分析:Brave是如何Support各個(gè)組件的蝙昙。因?yàn)楸疚膬?nèi)容較多,放到以后的文章分析逼肯!

后記

本文為我的調(diào)用鏈系列文章之一耸黑,已有文章如下:

OK,ZIPKIN 鏈路上報(bào)分析到此為止篮幢,祝大家工作順利大刊,天天開心!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市缺菌,隨后出現(xiàn)的幾起案子葫辐,更是在濱河造成了極大的恐慌,老刑警劉巖伴郁,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耿战,死亡現(xiàn)場離奇詭異,居然都是意外死亡焊傅,警方通過查閱死者的電腦和手機(jī)剂陡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狐胎,“玉大人鸭栖,你說我怎么就攤上這事∥粘玻” “怎么了晕鹊?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長暴浦。 經(jīng)常有香客問我溅话,道長,這世上最難降的妖魔是什么歌焦? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任飞几,我火速辦了婚禮,結(jié)果婚禮上同规,老公的妹妹穿的比我還像新娘循狰。我一直安慰自己窟社,他們只是感情好券勺,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著灿里,像睡著了一般关炼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上匣吊,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天儒拂,我揣著相機(jī)與錄音,去河邊找鬼色鸳。 笑死社痛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的命雀。 我是一名探鬼主播蒜哀,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼吏砂!你這毒婦竟也來了撵儿?” 一聲冷哼從身側(cè)響起乘客,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎淀歇,沒想到半個(gè)月后易核,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡浪默,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年牡直,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纳决。...
    茶點(diǎn)故事閱讀 38,059評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡井氢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出岳链,到底是詐尸還是另有隱情花竞,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布掸哑,位于F島的核電站约急,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏苗分。R本人自食惡果不足惜厌蔽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望摔癣。 院中可真熱鬧奴饮,春花似錦、人聲如沸择浊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽琢岩。三九已至投剥,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間担孔,已是汗流浹背江锨。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留糕篇,地道東北人啄育。 一個(gè)月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像拌消,于是被迫代替她去往敵國和親挑豌。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評論 2 345