Skywalking收集與發(fā)送鏈路數(shù)據(jù)部分源碼解析

鏈路收集大體邏輯

這里先不分析skywalking是如何自動收集數(shù)據(jù)的,而是說一下agent在收集后如何存儲與發(fā)送給collector,這部分的架構(gòu)關(guān)系到性能開銷與對服務(wù)的影響

大體邏輯如下:

agent內(nèi)部緩存維護(hù)了一個生產(chǎn)消費(fèi)者拱撵,收集數(shù)據(jù)時將生產(chǎn)的數(shù)據(jù)按分區(qū)放到緩存中芍锚,消費(fèi)者用多線程消費(fèi)數(shù)據(jù),將緩存的數(shù)據(jù)封裝成grpc對象發(fā)送給collector

鏈路數(shù)據(jù)接收與發(fā)送

數(shù)據(jù)的接收與發(fā)送主要在類TraceSegmentServiceClient中處理
其中的一個重要屬性是DataCarrier泥张,它來實(shí)現(xiàn)的生產(chǎn)消費(fèi)模式

private volatile DataCarrier<TraceSegment> carrier;

大致結(jié)構(gòu)如下


DataCarrier.png

DataCarrier

屬性如下:

    //一個buffer的大小
    private final int bufferSize;
    //channel的大小
    private final int channelSize;
    private Channels<T> channels;
    //消費(fèi)者線程池封裝
    private ConsumerPool<T> consumerPool;
    private String name;
    

方法#produce生產(chǎn)數(shù)據(jù)

    public boolean produce(T data) {
        if (consumerPool != null) {
            if (!consumerPool.isRunning()) {
                return false;
            }
        }

        return this.channels.save(data);
    }

channel的save方法

public boolean save(T data) {
        //計算放在channel哪個位置
        int index = dataPartitioner.partition(bufferChannels.length, data);
        //重試次數(shù)
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }
        for (; retryCountDown > 0; retryCountDown--) {
            //保存成功返回true
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }

進(jìn)入到Buffer的save方法莹痢,TraceSegmentServiceClient用的策略是IF_POSSIBLE种蘸,緩存位置還有值直接返回,所以消費(fèi)不過來會丟失部分?jǐn)?shù)據(jù)

    boolean save(T data) {
        //數(shù)組位置自增
        int i = index.getAndIncrement();
        //不為空的處理
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    boolean isFirstTimeBlocking = true;
                    while (buffer[i] != null) {
                        if (isFirstTimeBlocking) {
                            isFirstTimeBlocking = false;
                            for (QueueBlockingCallback<T> callback : callbacks) {
                                callback.notify(data);
                            }
                        }
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false;
                case OVERRIDE:
                default:
            }
        }
        //寫入緩存
        buffer[i] = data;
        return true;
    }

DataCarrier的consume方法初始化消費(fèi)者線程池

    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
        if (consumerPool != null) {
            consumerPool.close();
        }
        consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
        consumerPool.begin();
        return this;
    }

參數(shù)consumerClass就是TraceSegmentServiceClient類自己竞膳,實(shí)現(xiàn)具體的消費(fèi)方法航瞭,初始化以后線程池就啟動了

consumerPool方法begin

    public void begin() {
        if (running) {
            return;
        }
        try {
            lock.lock();
            //把channel分給不同的thread
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.start();
            }
            running = true;
        } finally {
            lock.unlock();
        }
    }

cusumerThread的run方法

    @Override
    public void run() {
        running = true;

        while (running) {
            boolean hasData = consume();

            if (!hasData) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                }
            }
        }

        consume();

        consumer.onExit();
    }

最終會調(diào)到的TraceSegmentServiceClient這個消費(fèi)者的consume方法,將TraceSegment轉(zhuǎn)換成grpc對象發(fā)送給collector

@Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
                @Override
                public void onNext(Downstream downstream) {

                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            for (TraceSegment segment : data) {
                try {
                    UpstreamSegment upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                } catch (Throwable t) {
                    logger.error(t, "Transform and send UpstreamSegment to collector fail.");
                }
            }
            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
    }

channel

channel中包含一個Buffer數(shù)組:

    //Buffer數(shù)組
    private final Buffer<T>[] bufferChannels;
    //數(shù)據(jù)分區(qū)策略
    private IDataPartitioner<T> dataPartitioner;
    //Buffer策略
    private BufferStrategy strategy;

Buffer對象

    public class Buffer<T> {
    //對象數(shù)組
    private final Object[] buffer;
    private BufferStrategy strategy;
    //位置標(biāo)記
    private AtomicRangeInteger index;
    private List<QueueBlockingCallback<T>> callbacks;
    ... 
    

抽樣服務(wù)SamplingService

作用是對TraceSegment進(jìn)行抽樣坦辟,鏈路跟蹤必須要考慮的功能刊侯,服務(wù)壓力大時全量收集會占用cpu、內(nèi)存长窄、網(wǎng)絡(luò)等資源

agent通過agent.config配置檔中的agent.sample_n_per_3_secs設(shè)置每三秒收集的TraceSegment的個數(shù)滔吠,大于0為開啟狀態(tài),默認(rèn)全量收集

初始化一個3秒的定時任務(wù)

    @Override
    public void boot() throws Throwable {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        //大于0開啟抽樣
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
            //定時任務(wù)
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                @Override
                public void run() {
                    resetSamplingFactor();
                }
            }, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, 3, TimeUnit.SECONDS);
            logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
        }
    }

抽樣邏輯

    public boolean trySampling() {
        if (on) {
            int factor = samplingFactorHolder.get();
            if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
                boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
                return success;
            } else {
                return false;
            }
        }
        return true;
    }

skywalking的agent不支持按比例抽樣挠日,比較遺憾

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市翰舌,隨后出現(xiàn)的幾起案子嚣潜,更是在濱河造成了極大的恐慌,老刑警劉巖椅贱,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件懂算,死亡現(xiàn)場離奇詭異,居然都是意外死亡庇麦,警方通過查閱死者的電腦和手機(jī)计技,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來山橄,“玉大人,你說我怎么就攤上這事≈ゴ耍” “怎么了?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵萌衬,是天一觀的道長。 經(jīng)常有香客問我它抱,道長秕豫,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任观蓄,我火速辦了婚禮混移,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘侮穿。我一直安慰自己沫屡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布撮珠。 她就那樣靜靜地躺著沮脖,像睡著了一般。 火紅的嫁衣襯著肌膚如雪芯急。 梳的紋絲不亂的頭發(fā)上勺届,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天,我揣著相機(jī)與錄音娶耍,去河邊找鬼免姿。 笑死,一個胖子當(dāng)著我的面吹牛榕酒,可吹牛的內(nèi)容都是我干的胚膊。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼想鹰,長吁一口氣:“原來是場噩夢啊……” “哼紊婉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起辑舷,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤喻犁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后何缓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體肢础,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年碌廓,在試婚紗的時候發(fā)現(xiàn)自己被綠了传轰。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡谷婆,死狀恐怖慨蛙,靈堂內(nèi)的尸體忽然破棺而出辽聊,到底是詐尸還是另有隱情,我是刑警寧澤股淡,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布身隐,位于F島的核電站,受9級特大地震影響唯灵,放射性物質(zhì)發(fā)生泄漏贾铝。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一埠帕、第九天 我趴在偏房一處隱蔽的房頂上張望垢揩。 院中可真熱鬧,春花似錦敛瓷、人聲如沸叁巨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽锋勺。三九已至,卻和暖如春狡蝶,著一層夾襖步出監(jiān)牢的瞬間庶橱,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工贪惹, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留苏章,地道東北人。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓奏瞬,卻偏偏與公主長得像枫绅,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子硼端,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容