GRPC java 分布式調(diào)用鏈跟蹤實踐

Opentracing基本模型

image.png

如圖甸鸟,在跟蹤鏈中有以下幾個比較重要的數(shù)據(jù)結(jié)構(gòu)和概念:

span:標(biāo)識一次分布式調(diào)用朵你,其自身包含了id舆瘪,parentId(指向上級Span的id)朽肥, traceIds梗脾,服務(wù)名稱等重要屬性,其應(yīng)盡量保持精簡繁涂;
trace:標(biāo)識整個請求鏈拱她,即一些列Span的組合。其自身的ID將貫穿整個調(diào)用鏈扔罪,其中的每個Span都必須攜帶這個traceId秉沼,因此traceId將在整個調(diào)用鏈中傳遞;
cs:客戶端發(fā)起請求矿酵,標(biāo)志Span的開始唬复;
sr:服務(wù)端接收到請求,并開始處理內(nèi)部事務(wù)全肮,其中sr - cs則為網(wǎng)絡(luò)延遲和時鐘抖動敞咧;
ss:服務(wù)端處理完請求,返回響應(yīng)內(nèi)容辜腺,其中ss - sr則為服務(wù)端處理請求耗時休建;
cr:客戶端接收到服務(wù)端響應(yīng)內(nèi)容,標(biāo)志著Span的結(jié)束评疗,其中cr - ss則為網(wǎng)絡(luò)延遲和時鐘抖動测砂。

客戶端調(diào)用時間=cr-cs
服務(wù)端處理時間=sr-ss

分布式系統(tǒng)調(diào)用跟蹤的基本架構(gòu)要求

低侵入性,高性能壤巷,高可用容錯邑彪,低丟失率等。

基于GRPC的分布式系統(tǒng)調(diào)用跟蹤實踐

創(chuàng)建TraceContext

TraceContext通過Threadlocal對span進行保存胧华,并且將traceid和spanid向底層服務(wù)傳遞寄症,zebra對線程上下文傳遞進行了封裝,具體參照GRPC如何實現(xiàn)公共參數(shù)與業(yè)務(wù)參數(shù)分離傳遞下面是TraceContext具體代碼

public class TraceContext{

   private static final String SPAN_LIST_KEY = "spanList";

   public static final String TRACE_ID_KEY = "traceId";

   public static final String SPAN_ID_KEY = "spanId";

   public static final String ANNO_CS = "cs";

   public static final String ANNO_CR = "cr";

   public static final String ANNO_SR = "sr";

   public static final String ANNO_SS = "ss";

   private TraceContext(){}

   public static void setTraceId(String traceId) {
       RpcContext.getContext().set(TRACE_ID_KEY, traceId);
   }

   public static String getTraceId() {
       return (String) RpcContext.getContext().get(TRACE_ID_KEY);
   }

   public static String getSpanId() {
       return (String) RpcContext.getContext().get(SPAN_ID_KEY);
   }

   public static void setSpanId(String spanId) {
       RpcContext.getContext().set(SPAN_ID_KEY, spanId);
   }

   @SuppressWarnings("unchecked")
   public static void addSpan(Span span){
       ((List<Span>)RpcContext.getContext().get(SPAN_LIST_KEY)).add(span);
   }

   @SuppressWarnings("unchecked")
   public static List<Span> getSpans(){
       return (List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY);
   }

   public static void clear(){
       RpcContext.getContext().remove(TRACE_ID_KEY);
       RpcContext.getContext().remove(SPAN_ID_KEY);
       RpcContext.getContext().remove(SPAN_LIST_KEY);
   }

   public static void start(){
       clear();
       RpcContext.getContext().set(SPAN_LIST_KEY, new ArrayList<Span>());
   }
}

創(chuàng)建TraceAgent

TraceAgent將span信息上傳至kafka矩动,代碼如下:

public class TraceAgent {
   private GrpcProperties grpcProperties;
   private KafkaSender sender;
   private AsyncReporter<zipkin2.Span> report;

   public TraceAgent() {
       grpcProperties = SpringContextUtils.getBean(GrpcProperties.class);
       sender = KafkaSender.newBuilder().bootstrapServers(grpcProperties.getCallChainUpdAddr()).topic("zipkin").encoding(Encoding.JSON).build();
       report = AsyncReporter.builder(sender).build();
   }

   public void send(final List<Span> spans){
       spans.forEach(item ->{
           report.report(item);
       });
   }
}

創(chuàng)建ZebraClientTracing

ZebraClientTracing用于記錄調(diào)用端的span信息有巧,具體代碼如下:

@Component
public class ZebraClientTracing {
   public Span startTrace(String method) {
       String id = IdUtils.get() + "";
       String traceId = null;
       if (null == TraceContext.getTraceId()) {
           TraceContext.start();
           traceId = id;
       } else {
           traceId = TraceContext.getTraceId();
       }
       long timestamp = System.currentTimeMillis() * 1000;
       // 注冊本地信息
       Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
               .port(50003).build();
       // 初始化span
       Span consumerSpan = Span.newBuilder().localEndpoint(endpoint).id(id).traceId(traceId)
               .parentId(TraceContext.getSpanId() + "").name(EtcdRegistry.serviceName).timestamp(timestamp)
               .addAnnotation(timestamp, TraceContext.ANNO_CS).putTag("method", method)
               .putTag("pkgId", RpcContext.getContext().getAttachment("pkg")).build();
       // 將tracing id和spanid放到上下文
       RpcContext.getContext().get().put(TraceContext.TRACE_ID_KEY, consumerSpan.traceId());
       RpcContext.getContext().get().put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.id()));
       return consumerSpan;
   }

   public void endTrace(Span span, Stopwatch watch,int code) {
       span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_CR)
               .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
       TraceAgent traceAgent = new TraceAgent();
       traceAgent.send(TraceContext.getSpans());
   }
}

創(chuàng)建ZebraServerTracing

ZebraServerTracing用于記錄服務(wù)端的span信息,具體代碼如下:

@Component
public class ZebraServerTracing {
   public Span startTrace(String method) {
       String traceId = (String) RpcContext.getContext().get(TraceContext.TRACE_ID_KEY);
       String parentSpanId = (String) RpcContext.getContext().get(TraceContext.SPAN_ID_KEY);

       String id = IdUtils.get() + "";
       TraceContext.start();
       TraceContext.setTraceId(traceId);
       TraceContext.setSpanId(parentSpanId);

       long timestamp = System.currentTimeMillis() * 1000;
       Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
               .port(50003).build();
       Span providerSpan = Span.newBuilder().id(id).parentId(parentSpanId).traceId(traceId)
               .name(EtcdRegistry.serviceName).timestamp(timestamp).localEndpoint(endpoint)
               .addAnnotation(timestamp, TraceContext.ANNO_SR).putTag("method", method)
               .putTag("pkgId", RpcContext.getContext().getAttachment("pkg"))
               .build();
       TraceContext.addSpan(providerSpan);
       return providerSpan;
   }

   public void endTrace(Span span, Stopwatch watch,int code) {
       span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_SS)
               .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
       TraceAgent traceAgent = new TraceAgent();
       traceAgent.send(TraceContext.getSpans());
   }
}

創(chuàng)建grpc client攔截器

public class HeaderClientInterceptor implements ClientInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderClientInterceptor.class);
    private final ZebraClientTracing clientTracing;
    
    public static ClientInterceptor instance() {
        return new HeaderClientInterceptor();
    }

    private HeaderClientInterceptor() {
        clientTracing = SpringContextUtils.getBean(ZebraClientTracing.class);
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            //判斷API網(wǎng)關(guān)是否要打開調(diào)用鏈
            boolean isGatewayTracing = "1".equals(RpcContext.getContext().getAttachment(ZebraConstants.ZEBRA_OPEN_TRACING))?true:false;
            boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY)!=null?true:false;
            Stopwatch watch =null;
            Span span =null;
            
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if(isSubTracing||isGatewayTracing){
                    span =clientTracing.startTrace(method.getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                copyThreadLocalToMetadata(headers);
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        super.onHeaders(headers);
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        super.onClose(status, trailers);
                        if(isSubTracing||isGatewayTracing)
                            clientTracing.endTrace(span, watch,status.getCode().value());
                    }
                }, headers);
            }
        };
    }

    private void copyThreadLocalToMetadata(Metadata headers) {
        Map<String, String> attachments = RpcContext.getContext().getAttachments();
        Map<String, Object> values = RpcContext.getContext().get();
        try {
            if (!attachments.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtil.toJson(attachments));
            }
            if (!values.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, SerializerUtil.toJson(values));
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}

創(chuàng)建grpc server攔截器

public class HeaderServerInterceptor implements ServerInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderServerInterceptor.class);

    private final ZebraServerTracing serverTracing;

    public static ServerInterceptor instance() {
        return new HeaderServerInterceptor();
    }

    private HeaderServerInterceptor() {
        serverTracing = SpringContextUtils.getBean(ZebraServerTracing.class);
    }

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, final Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
            boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null ? true : false;
            Stopwatch watch = null;
            Span span = null;

            @Override
            public void request(int numMessages) {
                if (isSubTracing) {
                    span = serverTracing.startTrace(call.getMethodDescriptor().getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                InetSocketAddress remoteAddress = (InetSocketAddress) call.getAttributes()
                        .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                RpcContext.getContext().setAttachment(ZebraConstants.REMOTE_ADDRESS, remoteAddress.getHostString());
                copyMetadataToThreadLocal(headers);
                log.debug("FullMethodName:{},RemoteAddress={},attachments={},context={}",
                        call.getMethodDescriptor().getFullMethodName(), remoteAddress.getHostString(),
                        headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS), headers.get(GrpcUtil.GRPC_CONTEXT_VALUES));
                super.request(numMessages);
            }

            @Override
            public void close(Status status, Metadata trailers) {
                delegate().close(status, trailers);
                if(isSubTracing)
                    serverTracing.endTrace(span, watch,status.getCode().value());
            }

        }, headers);
    }

    private void copyMetadataToThreadLocal(Metadata headers) {
        String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);
        String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);
        try {
            if (attachments != null) {
                Map<String, String> attachmentsMap = SerializerUtil.fromJson(attachments,
                        new TypeToken<Map<String, String>>() {
                        }.getType());
                RpcContext.getContext().setAttachments(attachmentsMap);
            }
            if (values != null) {
                Map<String, Object> valuesMap = SerializerUtil.fromJson(values, new TypeToken<Map<String, Object>>() {
                }.getType());
                for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
                    RpcContext.getContext().set(entry.getKey(), entry.getValue());
                }
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悲没,一起剝皮案震驚了整個濱河市篮迎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌示姿,老刑警劉巖甜橱,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異栈戳,居然都是意外死亡岂傲,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門子檀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來镊掖,“玉大人,你說我怎么就攤上這事褂痰∧督” “怎么了?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵缩歪,是天一觀的道長归薛。 經(jīng)常有香客問我,道長匪蝙,這世上最難降的妖魔是什么苟翻? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮骗污,結(jié)果婚禮上崇猫,老公的妹妹穿的比我還像新娘。我一直安慰自己需忿,他們只是感情好诅炉,可當(dāng)我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著屋厘,像睡著了一般涕烧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上汗洒,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天议纯,我揣著相機與錄音,去河邊找鬼溢谤。 笑死瞻凤,一個胖子當(dāng)著我的面吹牛憨攒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播阀参,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼肝集,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蛛壳?” 一聲冷哼從身側(cè)響起杏瞻,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎衙荐,沒想到半個月后捞挥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡忧吟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年砌函,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瀑罗。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡胸嘴,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出斩祭,到底是詐尸還是另有隱情劣像,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布摧玫,位于F島的核電站耳奕,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏诬像。R本人自食惡果不足惜屋群,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坏挠。 院中可真熱鬧芍躏,春花似錦、人聲如沸降狠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽榜配。三九已至否纬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛋褥,已是汗流浹背临燃。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人膜廊。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓乏沸,卻偏偏與公主長得像,于是被迫代替她去往敵國和親溃论。 傳聞我的和親對象是個殘疾皇子屎蜓,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容