Overall logic of trace collection
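This post does not cover how SkyWalking collects data automatically. It describes how the agent stores the data after collection and sends it to the collector. This part of the architecture determines the agent's performance overhead and its impact on the monitored service.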
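The overall logic is as follows: inside the agent, an in-memory buffer implements a producer-consumer structure. Collected data is placed into the buffer by partition. Multiple consumer threads then read the buffered data, wrap it into gRPC objects, and send them to the collector.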
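To make the overall flow concrete, here is a minimal sketch of a partitioned producer-consumer buffer. `MiniCarrier` and its methods are hypothetical names, not SkyWalking classes, and bounded `ArrayBlockingQueue`s stand in for the agent's own Buffer implementation. `offer` fails instead of blocking when a partition is full, which roughly mirrors the drop-when-full behavior discussed below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical MiniCarrier: a simplified partitioned producer-consumer pipeline,
// not SkyWalking's DataCarrier. produce() never blocks; it drops data when the
// chosen partition is full.
class MiniCarrier<T> {
    private final List<ArrayBlockingQueue<T>> partitions = new ArrayList<>();
    private final AtomicLong next = new AtomicLong();
    private final AtomicLong dropped = new AtomicLong();

    MiniCarrier(int partitionCount, int capacityPerPartition) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayBlockingQueue<>(capacityPerPartition));
        }
    }

    // Producer side: pick a partition round-robin and try to enqueue without blocking.
    boolean produce(T data) {
        int index = (int) Math.floorMod(next.getAndIncrement(), (long) partitions.size());
        boolean saved = partitions.get(index).offer(data);
        if (!saved) {
            dropped.incrementAndGet();
        }
        return saved;
    }

    // Consumer side: move everything currently in one partition into a batch,
    // which a real consumer would wrap into a gRPC request.
    List<T> drain(int partitionIndex) {
        List<T> batch = new ArrayList<>();
        partitions.get(partitionIndex).drainTo(batch);
        return batch;
    }

    int partitionCount() {
        return partitions.size();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

In a real agent, each consumer thread would call `drain` periodically on the partitions assigned to it and send every non-empty batch to the collector.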
Receiving and sending trace data
Receiving and sending data is handled mainly in the TraceSegmentServiceClient class.
One of its key fields is a DataCarrier, which implements the producer-consumer pattern:
```java
private volatile DataCarrier<TraceSegment> carrier;
```
Its structure is described below.
DataCarrier
Its fields are:
```java
// size of each buffer
private final int bufferSize;
// number of channels (buffers)
private final int channelSize;
private Channels<T> channels;
// wrapper around the consumer thread pool
private ConsumerPool<T> consumerPool;
private String name;
```
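The produce method adds data. If a consumer pool exists but is no longer running, the data is rejected; otherwise it is handed to Channels:
```java
public boolean produce(T data) {
    if (consumerPool != null) {
        if (!consumerPool.isRunning()) {
            return false;
        }
    }
    return this.channels.save(data);
}
```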
The save method of Channels:
```java
public boolean save(T data) {
    // choose which Buffer in the channel array receives the data
    int index = dataPartitioner.partition(bufferChannels.length, data);
    // number of attempts
    int retryCountDown = 1;
    if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
        int maxRetryCount = dataPartitioner.maxRetryCount();
        if (maxRetryCount > 1) {
            retryCountDown = maxRetryCount;
        }
    }
    for (; retryCountDown > 0; retryCountDown--) {
        // return true as soon as a save succeeds
        if (bufferChannels[index].save(data)) {
            return true;
        }
    }
    return false;
}
```
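The partition-then-retry flow of `Channels.save` can be sketched as follows. `Partitioner`, `RollingPartitioner`, and `RetryingChannels` are hypothetical names chosen for this illustration. They follow the shape of `IDataPartitioner` (`partition` and `maxRetryCount`) shown above but are not SkyWalking's code, and the retry count of 3 is an assumed value. Each channel is modeled as a `Predicate` that reports whether a save succeeded.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical partitioner contract, shaped like partition()/maxRetryCount() above.
interface Partitioner<T> {
    int partition(int total, T data);

    int maxRetryCount();
}

// Spreads successive writes across channels in round-robin order.
class RollingPartitioner<T> implements Partitioner<T> {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(int total, T data) {
        // floorMod keeps the index non-negative even after the counter overflows
        return Math.floorMod(counter.getAndIncrement(), total);
    }

    @Override
    public int maxRetryCount() {
        return 3; // assumed value for illustration
    }
}

// Each channel is a Predicate: test(data) == true means the data was stored.
class RetryingChannels<T> {
    private final List<Predicate<T>> channels;
    private final Partitioner<T> partitioner;
    private final boolean ifPossible; // true mimics BufferStrategy.IF_POSSIBLE

    RetryingChannels(List<Predicate<T>> channels, Partitioner<T> partitioner, boolean ifPossible) {
        this.channels = channels;
        this.partitioner = partitioner;
        this.ifPossible = ifPossible;
    }

    boolean save(T data) {
        int index = partitioner.partition(channels.size(), data);
        int attempts = 1;
        if (ifPossible && partitioner.maxRetryCount() > 1) {
            attempts = partitioner.maxRetryCount();
        }
        // Every attempt targets the same channel chosen above.
        for (; attempts > 0; attempts--) {
            if (channels.get(index).test(data)) {
                return true;
            }
        }
        return false;
    }
}
```

Note that the original loop also reuses the same `index`, so all retries go to the same Buffer. Retrying still makes sense there because `Buffer.save` advances its own slot cursor on every call, so each retry probes a different slot.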
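This leads to the save method of Buffer. TraceSegmentServiceClient uses the IF_POSSIBLE strategy: if the target slot still holds data, save returns false immediately. Once the retries in Channels.save are used up, the data is discarded, so when the consumers cannot keep up, some data is lost.
```java
boolean save(T data) {
    // take the next slot index (the index wraps around)
    int i = index.getAndIncrement();
    // the slot still holds data that has not been consumed yet
    if (buffer[i] != null) {
        switch (strategy) {
            case BLOCKING:
                // notify the callbacks once, then wait until a consumer frees the slot
                boolean isFirstTimeBlocking = true;
                while (buffer[i] != null) {
                    if (isFirstTimeBlocking) {
                        isFirstTimeBlocking = false;
                        for (QueueBlockingCallback<T> callback : callbacks) {
                            callback.notify(data);
                        }
                    }
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                }
                break;
            case IF_POSSIBLE:
                // give up immediately; the caller may retry
                return false;
            case OVERRIDE:
            default:
                // fall through and overwrite the old data
        }
    }
    // write the data into the slot
    buffer[i] = data;
    return true;
}
```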
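The difference between IF_POSSIBLE and OVERRIDE can be shown with a small ring buffer. `SimpleBuffer` and `Strategy` are hypothetical names, and the BLOCKING strategy is omitted for brevity. One deliberate deviation: where the original checks `buffer[i] != null` and then writes, this sketch uses `AtomicReferenceArray.compareAndSet`, so two producers that land on the same slot cannot both believe they stored their data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

enum Strategy { IF_POSSIBLE, OVERRIDE }

// Hypothetical fixed-size ring buffer; a non-null slot holds data not yet consumed.
class SimpleBuffer<T> {
    private final AtomicReferenceArray<T> slots;
    private final AtomicInteger cursor = new AtomicInteger();
    private final Strategy strategy;

    SimpleBuffer(int size, Strategy strategy) {
        this.slots = new AtomicReferenceArray<>(size);
        this.strategy = strategy;
    }

    boolean save(T data) {
        // Next slot, wrapping around the end of the array.
        int i = Math.floorMod(cursor.getAndIncrement(), slots.length());
        if (strategy == Strategy.IF_POSSIBLE) {
            // Only fill an empty slot; an occupied slot means consumers are behind, so drop.
            return slots.compareAndSet(i, null, data);
        }
        // OVERRIDE: replace whatever is there, losing any unconsumed older data.
        slots.set(i, data);
        return true;
    }

    // Consumer side: take and clear every occupied slot, in slot order.
    List<T> obtain() {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < slots.length(); i++) {
            T value = slots.getAndSet(i, null);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }
}
```

With IF_POSSIBLE, a slow consumer costs lost segments rather than blocked application threads, which is the trade-off described above.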
The consume method of DataCarrier initializes the consumer thread pool:
```java
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
    // close any existing pool before creating a new one
    if (consumerPool != null) {
        consumerPool.close();
    }
    consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
    consumerPool.begin();
    return this;
}
```
The consumer here is TraceSegmentServiceClient itself, which implements the actual consume method. As soon as the pool is created, begin() starts its threads.
The begin method of ConsumerPool:
```java
public void begin() {
    if (running) {
        return;
    }
    try {
        lock.lock();
        // assign the channels to the consumer threads
        this.allocateBuffer2Thread();
        for (ConsumerThread consumerThread : consumerThreads) {
            consumerThread.start();
        }
        running = true;
    } finally {
        lock.unlock();
    }
}
```
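`allocateBuffer2Thread` is not shown above. A common way to split channels among consumer threads is round-robin by index; the sketch below assumes that scheme (`BufferAllocation.allocate` is a hypothetical helper), so that each channel is drained by exactly one thread and no two threads compete for the same Buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assigns channel indexes to consumer threads round-robin.
final class BufferAllocation {
    static List<List<Integer>> allocate(int channelCount, int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            perThread.add(new ArrayList<>());
        }
        // Channel c goes to thread c % threadCount.
        for (int c = 0; c < channelCount; c++) {
            perThread.get(c % threadCount).add(c);
        }
        return perThread;
    }
}
```

Under this scheme, threads beyond the number of channels receive nothing and only poll empty lists, so adding consumer threads helps only up to the channel count.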
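The run method of ConsumerThread:
```java
@Override
public void run() {
    running = true;
    while (running) {
        boolean hasData = consume();
        // sleep for consumeCycle ms only when nothing was consumed
        if (!hasData) {
            try {
                Thread.sleep(consumeCycle);
            } catch (InterruptedException e) {
            }
        }
    }
    // drain whatever is left once the thread is stopped, then notify the consumer
    consume();
    consumer.onExit();
}
```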
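The polling loop in `run` can be sketched with plain JDK classes. `PollingConsumerThread` is a hypothetical name, and `Queue`s stand in for the Buffers assigned to the thread. As in the original, the thread sleeps for `consumeCycle` milliseconds only when a pass finds no data, and drains once more after the loop ends so nothing already buffered is lost on shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical consumer thread: polls its assigned queues and hands each batch to a sink.
class PollingConsumerThread<T> extends Thread {
    private final List<Queue<T>> sources;
    private final Consumer<List<T>> sink;
    private final long consumeCycle;
    private volatile boolean running = true;

    PollingConsumerThread(List<Queue<T>> sources, Consumer<List<T>> sink, long consumeCycle) {
        this.sources = sources;
        this.sink = sink;
        this.consumeCycle = consumeCycle;
    }

    @Override
    public void run() {
        while (running) {
            if (!consumeOnce()) {
                try {
                    Thread.sleep(consumeCycle); // idle: wait before polling again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    running = false; // treat interruption as a shutdown request
                }
            }
        }
        consumeOnce(); // final drain, like the consume() call after the loop
    }

    // One pass over all assigned queues; returns true if anything was consumed.
    boolean consumeOnce() {
        List<T> batch = new ArrayList<>();
        for (Queue<T> queue : sources) {
            T item;
            while ((item = queue.poll()) != null) {
                batch.add(item);
            }
        }
        if (batch.isEmpty()) {
            return false;
        }
        sink.accept(batch);
        return true;
    }

    void shutdown() {
        running = false;
    }
}
```

Unlike the original, which sets `running = true` at the top of `run`, this sketch initializes the flag in its field, so a `shutdown()` issued before the thread actually starts is not overwritten.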
Eventually this reaches the consume method of the consumer, TraceSegmentServiceClient, which converts each TraceSegment into a gRPC object (UpstreamSegment) and streams it to the collector. If the gRPC channel is not connected, the batch is only counted as abandoned:
```java
@Override
public void consume(List<TraceSegment> data) {
    // only send when the gRPC channel is connected
    if (CONNECTED.equals(status)) {
        final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
        StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
            @Override
            public void onNext(Downstream downstream) {
            }

            @Override
            public void onError(Throwable throwable) {
                status.finished();
                if (logger.isErrorEnable()) {
                    logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                }
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
            }

            @Override
            public void onCompleted() {
                status.finished();
            }
        });

        // convert and stream every segment of the batch
        for (TraceSegment segment : data) {
            try {
                UpstreamSegment upstreamSegment = segment.transform();
                upstreamSegmentStreamObserver.onNext(upstreamSegment);
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
        }
        upstreamSegmentStreamObserver.onCompleted();

        // wait until the collector side closes the stream
        status.wait4Finish();
        segmentUplinkedCounter += data.size();
    } else {
        segmentAbandonedCounter += data.size();
    }

    printUplinkStatus();
}
```
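`consume` blocks on `status.wait4Finish()` until the collector ends the stream through `onCompleted` or `onError`. `GRPCStreamServiceStatus` itself is not shown here, so the sketch below is an assumption about one way to build such a status object: a hypothetical `StreamStatus` backed by a `CountDownLatch`, with a timeout so a stalled collector cannot block the consumer thread indefinitely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical status object: one thread waits until another reports the stream is done.
class StreamStatus {
    private final CountDownLatch done = new CountDownLatch(1);

    // Called from the stream callbacks (onCompleted / onError).
    void finished() {
        done.countDown();
    }

    // Returns true if finished() was called within the timeout.
    boolean waitForFinish(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this flow, `finished()` would be called from the `onCompleted` and `onError` callbacks, and the sending thread would wait only after calling `onCompleted()` on its own side of the stream.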
Channels
Channels holds an array of Buffers:
```java
// array of Buffers
private final Buffer<T>[] bufferChannels;
// data partitioning strategy
private IDataPartitioner<T> dataPartitioner;
// Buffer strategy (BLOCKING, IF_POSSIBLE or OVERRIDE)
private BufferStrategy strategy;
```
The Buffer class
```java
public class Buffer<T> {
    // object array holding the buffered data
    private final Object[] buffer;
    private BufferStrategy strategy;
    // position marker: the next slot to write
    private AtomicRangeInteger index;
    private List<QueueBlockingCallback<T>> callbacks;
    ...
```
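The `index` field is an `AtomicRangeInteger`, which hands out slot positions and wraps back to the start of the array. Its source is not shown here; the hypothetical `RangeCounter` below is a minimal CAS-based sketch of such a wrapping counter over the half-open range [start, end), not SkyWalking's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapping counter: getAndIncrement() cycles through start, start+1, ..., end-1.
class RangeCounter {
    private final AtomicInteger value;
    private final int start;
    private final int end;

    RangeCounter(int start, int end) {
        if (end <= start) {
            throw new IllegalArgumentException("end must be greater than start");
        }
        this.start = start;
        this.end = end;
        this.value = new AtomicInteger(start);
    }

    int getAndIncrement() {
        while (true) {
            int current = value.get();
            int next = (current + 1 >= end) ? start : current + 1;
            // Retry if another thread moved the counter between get() and compareAndSet().
            if (value.compareAndSet(current, next)) {
                return current;
            }
        }
    }
}
```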
Sampling service: SamplingService
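SamplingService samples TraceSegments. Every tracing system has to consider sampling: under heavy load, collecting every trace consumes CPU, memory, network bandwidth and other resources.
The agent reads the agent.sample_n_per_3_secs setting from its agent.config configuration file. It sets how many TraceSegments are collected every three seconds. A value greater than 0 turns sampling on; by default, everything is collected.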
boot initializes a scheduled task that runs every 3 seconds:
```java
@Override
public void boot() throws Throwable {
    if (scheduledFuture != null) {
        scheduledFuture.cancel(true);
    }
    // sampling is enabled only when the value is greater than 0
    if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
        on = true;
        this.resetSamplingFactor();
        ScheduledExecutorService service = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
        // scheduled task: reset the sampling counter every 3 seconds
        scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
            @Override
            public void run() {
                resetSamplingFactor();
            }
        }, new RunnableWithExceptionProtection.CallbackWhenException() {
            @Override public void handle(Throwable t) {
                logger.error("unexpected exception.", t);
            }
        }), 0, 3, TimeUnit.SECONDS);
        logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
    }
}
```
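The sampling logic: while sampling is on, a segment is kept only if fewer than SAMPLE_N_PER_3_SECS segments have been sampled in the current 3-second window.
```java
public boolean trySampling() {
    if (on) {
        int factor = samplingFactorHolder.get();
        // keep the segment only while the per-window quota is not used up
        if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
            boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
            return success;
        } else {
            return false;
        }
    }
    // sampling is off: every segment is kept
    return true;
}
```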
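The counting scheme behind `boot` and `trySampling` (at most N segments per 3-second window, with a scheduled reset) can be sketched as below. `WindowSampler` is a hypothetical name and `reset` stands in for `resetSamplingFactor`. One deliberate difference: the original `trySampling` makes a single `compareAndSet` attempt and returns `false` if another thread changed the counter first, so under contention a window can sample fewer than N segments. This sketch retries the CAS until the limit is actually reached.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical fixed-window sampler: at most `limit` samples per window.
class WindowSampler {
    private final int limit;
    private final AtomicInteger counter = new AtomicInteger();

    WindowSampler(int limit) {
        this.limit = limit;
    }

    boolean trySampling() {
        if (limit <= 0) {
            return true; // sampling disabled: keep everything
        }
        while (true) {
            int current = counter.get();
            if (current >= limit) {
                return false; // quota for this window is used up
            }
            // Retry on contention instead of giving up after one failed CAS.
            if (counter.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Starts a new window.
    void reset() {
        counter.set(0);
    }

    // Schedules reset() every windowSeconds; the caller must shut the executor down.
    ScheduledExecutorService start(long windowSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::reset, windowSeconds, windowSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Calling `start(3)` would reproduce the agent's 3-second window.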
Unfortunately, the SkyWalking agent (at least in the version analyzed here) does not support sampling by percentage.