Jaeger 提供了一整套的分布式鏈路追蹤方案,也是最早實(shí)現(xiàn)Opentracing協(xié)議的框架之一勤篮。今天我們來簡(jiǎn)單分析一下java客戶端的Span異步發(fā)送機(jī)制
1.異步發(fā)送需求簡(jiǎn)述
我們知道辑鲤,在Java應(yīng)用程序中鞭盟,如果有一些數(shù)據(jù)不是特別重要涤妒,但是又產(chǎn)生得比較多的時(shí)候搔扁,我們?cè)跒榱吮WC程序性能的情況下就會(huì)選擇采用異步的方式來保存和發(fā)送數(shù)據(jù)渣触。比如日志羡棵,Trace數(shù)據(jù)就是屬于這一類的數(shù)據(jù)。所以這一類的數(shù)據(jù)最適合暫時(shí)保存在內(nèi)存中然后異步存儲(chǔ)昵观。
那么要實(shí)現(xiàn)這類異步的需求有什么具體的要求呢晾腔?
我覺得至少應(yīng)該實(shí)現(xiàn)以下幾點(diǎn)
- 占用的資源不能太多。這其中包括線程資源啊犬,內(nèi)存資源等灼擂。
- 可以定時(shí)將內(nèi)存中的數(shù)據(jù)保存起來。
- 可以批量保存數(shù)據(jù)觉至,提升性能剔应。
- 參數(shù)可配置化
- 最重要的一點(diǎn),無(wú)論如何不能影響主業(yè)務(wù)语御。
2.Jaeger 實(shí)現(xiàn)原理分析
少?gòu)U話峻贮,先上原理圖,所謂一圖勝千言应闯。
下面我們來簡(jiǎn)單分析流程
1.在每一個(gè)線程中都會(huì)產(chǎn)生很多Span數(shù)據(jù)纤控,當(dāng)Span結(jié)束的時(shí)候會(huì)調(diào)用Tracer對(duì)象的reportSpans方法,然后Tracer就會(huì)委托給Reporter對(duì)象來發(fā)送Span數(shù)據(jù)碉纺。
public class JaegerSpan implements Span{
@Override
public void finish() {
if (computeDurationViaNanoTicks) {
long nanoDuration = tracer.clock().currentNanoTicks() - startTimeNanoTicks;
finishWithDuration(nanoDuration / 1000);
} else {
finish(tracer.clock().currentTimeMicros());
}
}
@Override
public void finish(long finishMicros) {
finishWithDuration(finishMicros - startTimeMicroseconds);
}
private void finishWithDuration(long durationMicros) {
synchronized (this) {
if (finished) {
log.warn("Span has already been finished; will not be reported again.");
return;
}
finished = true;
this.durationMicroseconds = durationMicros;
}
// 只有需要采樣的時(shí)候才發(fā)送Span
if (context.isSampled()) {
// 委托給Tracer發(fā)送
tracer.reportSpan(this);
}
}
// other functions
// ....
}
public class JaegerTracer implements Tracer, Closeable{
void reportSpan(JaegerSpan span) {
// 委托給Reporter發(fā)送
reporter.report(span);
metrics.spansFinished.inc(1);
}
// other functions
// ....
}
2.然后呢船万,Reporter對(duì)象比較會(huì)騙人,它實(shí)際上呢骨田,并沒有真正的發(fā)送給出去耿导,而是將Span數(shù)據(jù)包裹一下,以Command的形式加到了線程安全的阻塞隊(duì)列中态贤。然后Reporter對(duì)象又開啟了兩個(gè)異步線程:
jaeger.RemoteReporter-QueueProcessor 負(fù)責(zé)不斷地消費(fèi)阻塞對(duì)象的Command對(duì)象舱呻,然后丟給Sender的緩沖區(qū) 。
jaeger.RemoteReporter-FlushTimer 定時(shí)地發(fā)送FlushCommand悠汽,然后讓Sender Flush自己的緩存區(qū)
所以箱吕,我們看到隊(duì)列中有多種Command,如果隊(duì)列畫成圖的話柿冲,就類似于下面這個(gè)樣子殖氏。
其實(shí)還有另外一種Command(CloseCommand),后面我們?cè)僦v
public class RemoteReporter implements Reporter {
private RemoteReporter(Sender sender, int flushInterval, int maxQueueSize, int closeEnqueueTimeout,
Metrics metrics) {
this.sender = sender;
this.metrics = metrics;
this.closeEnqueueTimeout = closeEnqueueTimeout;
commandQueue = new ArrayBlockingQueue<Command>(maxQueueSize);
// start a thread to append spans
queueProcessor = new QueueProcessor();
queueProcessorThread = new Thread(queueProcessor, "jaeger.RemoteReporter-QueueProcessor");
queueProcessorThread.setDaemon(true);
queueProcessorThread.start();
flushTimer = new Timer("jaeger.RemoteReporter-FlushTimer", true /* isDaemon */);
flushTimer.schedule(
new TimerTask() {
@Override
public void run() {
flush();
}
},
flushInterval,
flushInterval);
}
@Override
public void report(JaegerSpan span) {
// Its better to drop spans, than to block here
// 注意這里用的是offer方法姻采。如果超過了隊(duì)列大小,那么就會(huì)丟棄后來的span數(shù)據(jù)。
// 而且這里不是簡(jiǎn)單地把span加入到隊(duì)列中慨亲,而是用Command包裝了一下婚瓜。這就是實(shí)現(xiàn)定時(shí)flush數(shù)據(jù)的秘訣。我們下面來分析
boolean added = commandQueue.offer(new AppendCommand(span));
if (!added) {
metrics.reporterDropped.inc(1);
}
}
public interface Command {
void execute() throws SenderException;
}
class AppendCommand implements Command {
private final JaegerSpan span;
public AppendCommand(JaegerSpan span) {
this.span = span;
}
@Override
public void execute() throws SenderException {
// 單純地委托給send的append方法
sender.append(span);
}
}
/**
* 刷新命令
**/
class FlushCommand implements Command {
@Override
public void execute() throws SenderException {
int n = sender.flush();
metrics.reporterSuccess.inc(n);
}
}
/**
* 阻塞隊(duì)列消費(fèi)者刑棵,不斷地從隊(duì)列中獲取命令巴刻,然后執(zhí)行Command
**/
class QueueProcessor implements Runnable {
private boolean open = true;
@Override
public void run() {
while (open) {
try {
RemoteReporter.Command command = commandQueue.take();
try {
command.execute();
} catch (SenderException e) {
metrics.reporterFailure.inc(e.getDroppedSpanCount());
}
} catch (InterruptedException e) {
log.error("QueueProcessor error:", e);
// Do nothing, and try again on next span.
}
}
}
public void close() {
open = false;
}
}
}
我們可以看到,兩種不同的Command實(shí)際上就是調(diào)用Sender的不同方法
- AppendCommand調(diào)用Sender的append方法
- FlushCommand調(diào)用Sender的flush方法
- 下面我們來看一下Sender的兩個(gè)重要方法append和fush
public abstract class ThriftSender extends ThriftSenderBase implements Sender {
@Override
public int append(JaegerSpan span) throws SenderException {
if (process == null) {
process = new Process(span.getTracer().getServiceName());
process.setTags(JaegerThriftSpanConverter.buildTags(span.getTracer().tags()));
processBytesSize = calculateProcessSize(process);
byteBufferSize += processBytesSize;
}
io.jaegertracing.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
int spanSize = calculateSpanSize(thriftSpan);
// 單個(gè)Span過大就報(bào)錯(cuò)蛉签,并且丟棄這個(gè)Span
if (spanSize > getMaxSpanBytes()) {
throw new SenderException(String.format("ThriftSender received a span that was too large, size = %d, max = %d",
spanSize, getMaxSpanBytes()), null, 1);
}
byteBufferSize += spanSize;
// 如果當(dāng)前的byteBufferSize 小于等于maxSpanBytes小胡陪,則直接加入緩沖區(qū),然后更新一下byteBufferSize
// 如果當(dāng)前的byteBufferSize 大于maxSpanBytes碍舍,則批量發(fā)送數(shù)據(jù)
if (byteBufferSize <= getMaxSpanBytes()) {
spanBuffer.add(thriftSpan);
if (byteBufferSize < getMaxSpanBytes()) {
return 0;
}
return flush();
}
int n;
try {
n = flush();
} catch (SenderException e) {
// +1 for the span not submitted in the buffer above
throw new SenderException(e.getMessage(), e.getCause(), e.getDroppedSpanCount() + 1);
}
spanBuffer.add(thriftSpan);
byteBufferSize = processBytesSize + spanSize;
return n;
}
@Override
public int flush() throws SenderException {
if (spanBuffer.isEmpty()) {
return 0;
}
int n = spanBuffer.size();
try {
// 抽象方法柠座,由具體的協(xié)議發(fā)送者實(shí)現(xiàn)(如udp,Http)
send(process, spanBuffer);
} catch (SenderException e) {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
// 發(fā)送完之后清空緩存區(qū)和重置緩沖區(qū)大小
spanBuffer.clear();
byteBufferSize = processBytesSize;
}
return n;
}
}
從Sender的框架實(shí)現(xiàn)看,如果在發(fā)送的過程報(bào)錯(cuò)了片橡,也不會(huì)重試的妈经。
從Jaeger的實(shí)現(xiàn)來看,到處都對(duì)Span充斥著冷酷無(wú)情啊捧书,能丟就丟吹泡,毫不猶豫。其實(shí)Span也不能怪別人经瓷,因?yàn)閺乃錾捅粧焐狭丝蓙G的標(biāo)簽了爆哑。估計(jì)也只有日志這個(gè)哥們跟它是難兄難弟了,都是可丟棄的舆吮。
3."安全"關(guān)閉
前面我們講到阻塞隊(duì)列中的Command其實(shí)有三種揭朝。第三種其實(shí)是為平滑停機(jī)準(zhǔn)備的。第三種Command代碼如下
class CloseCommand implements Command {
@Override
public void execute() throws SenderException {
queueProcessor.close();
}
}
class QueueProcessor implements Runnable {
private boolean open = true;
@Override
public void run() {
while (open) {
try {
// 執(zhí)行命令
}
}
public void close() {
open = false;
}
}
其實(shí)很簡(jiǎn)單歪泳,就是一旦要關(guān)閉萝勤,就直接不從隊(duì)列中拿數(shù)據(jù)了。那隊(duì)列中的數(shù)據(jù)怎們辦呐伞?怎們辦敌卓,丟啊。
4.總結(jié)
從源代碼的分析過程中伶氢,大家應(yīng)該感覺到了趟径,各個(gè)Tracer組件對(duì)Span是多么的無(wú)情啊。我們?cè)倩剡^頭來癣防,看一下需求
- 占用的資源不能太多蜗巧。這其中包括線程資源,內(nèi)存資源等蕾盯。
主要占用兩個(gè)線程資源幕屹,以及一個(gè)固定大小的隊(duì)列內(nèi)存資源 - 可以定時(shí)將內(nèi)存中的數(shù)據(jù)保存起來。
后臺(tái)啟動(dòng)Timer定時(shí)刷新 - 可以批量保存數(shù)據(jù),提升性能望拖。
Sender中緩沖區(qū)渺尘,可以批量發(fā)送數(shù)據(jù) - 參數(shù)可配置化
隊(duì)列大小以及刷新間隔可以配置 - 最重要的一點(diǎn),無(wú)論如何不能影響主業(yè)務(wù)说敏。
采用異步隊(duì)列鸥跟,異步線程的方式來保證性能以及不影響業(yè)務(wù)。甚至可以使用UDPSender來保證即使服務(wù)端宕機(jī)了也不會(huì)影響客戶端