SegmentTrace
This covers the whole path from Kafka initialization through receiving data, parsing and building, to storage. The core call flow in the source is:
KafkaFetcher
-> TraceSegmentHandler#handle
-> SegmentParserServiceImpl#send
-> TraceAnalyzer#doAnalysis
-> AnalysisListener#parsexxx
-> AnalysisListener#build
-> SourceReceiver#receive
-> dispatcherManager#forward
-> XXXDispatcher#dispatch
Processing data pulled from Kafka
TraceSegmentHandler#handle calls SegmentParserServiceImpl#send(segment), which creates a TraceAnalyzer for each segment:
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
traceAnalyzer.doAnalysis(segment);
}
doAnalysis
Parses the segment:
public void doAnalysis(SegmentObject segmentObject) {
    if (segmentObject.getSpansList().size() == 0) {
        return;
    }
    createSpanListeners(); // create the listeners
    notifySegmentListener(segmentObject); // process the segment as a whole (trace)
    segmentObject.getSpansList().forEach(spanObject -> {
        if (spanObject.getSpanId() == 0) {
            notifyFirstListener(spanObject, segmentObject); // extra processing based on the first span
        }
        if (SpanType.Exit.equals(spanObject.getSpanType())) {
            notifyExitListener(spanObject, segmentObject);
        } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
            notifyEntryListener(spanObject, segmentObject); // the key trace metric information is built from here
        } else if (SpanType.Local.equals(spanObject.getSpanType())) {
            notifyLocalListener(spanObject, segmentObject);
        } else {
            log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                .name());
        }
    });
    notifyListenerToBuild();
}
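The control flow of doAnalysis (notify listeners for each span by span type, give span 0 extra handling, then build once at the end) can be sketched in isolation. This is a minimal sketch, not SkyWalking code: SpanType, Span, Listener and RecordingListener are hypothetical stand-ins, and a single Listener interface replaces the separate notify* methods above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of TraceAnalyzer#doAnalysis.
// SpanType, Span, Listener and RecordingListener are stand-ins, not SkyWalking classes.
class SpanRoutingSketch {
    enum SpanType { Entry, Exit, Local }

    static final class Span {
        final int spanId;
        final SpanType type;

        Span(int spanId, SpanType type) {
            this.spanId = spanId;
            this.type = type;
        }
    }

    interface Listener {
        void onFirst(Span span);
        void onEntry(Span span);
        void onExit(Span span);
        void onLocal(Span span);
        void build();
    }

    static void doAnalysis(List<Span> spans, List<Listener> listeners) {
        if (spans.isEmpty()) {
            return; // an empty segment produces nothing, not even a build()
        }
        for (Span span : spans) {
            for (Listener listener : listeners) {
                if (span.spanId == 0) {
                    listener.onFirst(span); // the first span gets extra handling
                }
                switch (span.type) {
                    case Entry: listener.onEntry(span); break;
                    case Exit:  listener.onExit(span);  break;
                    case Local: listener.onLocal(span); break;
                }
            }
        }
        // Only after every span has been parsed do listeners turn what they collected into Sources.
        for (Listener listener : listeners) {
            listener.build();
        }
    }

    // Records the callback order, for demonstration.
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();
        public void onFirst(Span s) { events.add("first:" + s.spanId); }
        public void onEntry(Span s) { events.add("entry:" + s.spanId); }
        public void onExit(Span s)  { events.add("exit:" + s.spanId); }
        public void onLocal(Span s) { events.add("local:" + s.spanId); }
        public void build()         { events.add("build"); }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        List<Span> spans = new ArrayList<>();
        spans.add(new Span(0, SpanType.Entry));
        spans.add(new Span(1, SpanType.Exit));
        doAnalysis(spans, Collections.<Listener>singletonList(listener));
        System.out.println(listener.events); // [first:0, entry:0, exit:1, build]
    }
}
```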
notifyEntryListener
It calls MultiScopesAnalysisListener#parseEntry, which fills the upstream and downstream call information into a sourceBuilder and adds it to entrySourceBuilders. In the build phase these builders are further turned into Source data for each dimension: the trace data, as well as call statistics such as call count, pxx percentiles and response time, are all created in this step.
...
sourceReceiver.receive(entrySourceBuilder.toAll());
sourceReceiver.receive(entrySourceBuilder.toService());
sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
sourceReceiver.receive(entrySourceBuilder.toEndpoint());
sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
sourceReceiver.receive(endpointRelation);
...
The MultiScopesAnalysisListener#build snippet above produces Sources of the types All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation and EndpointRelation, and submits them to sourceReceiver. Underneath, sourceReceiver wraps a DispatcherManager, which selects the corresponding SourceDispatcher according to the Source type and calls its dispatch method for further processing.
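Routing by Source type amounts to a lookup from a scope ID to the dispatchers registered for it. The sketch below assumes a Map-based registry; DispatcherManager here is a hypothetical stand-in for the real class, the scope IDs 1 and 2 are invented for the example, and allowing several dispatchers per scope is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of scope-based routing: each Source reports a numeric scope ID and
// the manager forwards it to every dispatcher registered for that ID.
// Scope numbers and class names are illustrative, not SkyWalking's real ones.
class DispatchRoutingSketch {
    abstract static class Source {
        abstract int scope();
    }

    static final class ServiceSource extends Source {
        @Override int scope() { return 1; } // invented ID
    }

    static final class EndpointRelationSource extends Source {
        @Override int scope() { return 2; } // invented ID
    }

    interface SourceDispatcher {
        void dispatch(Source source);
    }

    static final class DispatcherManager {
        private final Map<Integer, List<SourceDispatcher>> byScope = new HashMap<>();

        void register(int scope, SourceDispatcher dispatcher) {
            byScope.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
        }

        void forward(Source source) {
            List<SourceDispatcher> dispatchers = byScope.get(source.scope());
            if (dispatchers == null) {
                return; // no dispatcher for this scope: the Source is dropped in this sketch
            }
            for (SourceDispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(source);
            }
        }
    }

    public static void main(String[] args) {
        DispatcherManager manager = new DispatcherManager();
        List<String> log = new ArrayList<>();
        manager.register(1, s -> log.add("service-traffic"));
        manager.register(1, s -> log.add("service-metrics"));
        manager.register(2, s -> log.add("endpoint-relation"));
        manager.forward(new ServiceSource());
        manager.forward(new EndpointRelationSource());
        System.out.println(log); // [service-traffic, service-metrics, endpoint-relation]
    }
}
```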
So which concrete SourceDispatcher class handles a given Source? Take EndpointRelation as an example:
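public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
    @Override
    public void dispatch(EndpointRelation source) {
        switch (source.getDetectPoint()) {
            case SERVER:
                // only SERVER-side detections are handled; other detect points are ignored
                serverSide(source);
                break;
        }
    }

    // private void serverSide(EndpointRelation source) { ... }  (body omitted)
}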
The parameter type of EndpointCallRelationDispatcher#dispatch shows that this class handles Sources of type EndpointRelation. The implementations of dispatch are:
BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
EndpointCallRelationDispatcher.dispatch(EndpointRelation)
EndpointMetaDispatcher.dispatch(EndpointMeta)
EndpointTrafficDispatcher.dispatch(Endpoint)
InstanceTrafficDispatcher.dispatch(ServiceInstance)
InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
LogRecordDispatcher.dispatch(Log)
NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
SegmentDispatcher.dispatch(Segment)
ServiceCallRelationDispatcher.dispatch(ServiceRelation)
ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
ServiceMetaDispatcher.dispatch(ServiceMeta)
ServiceTrafficDispatcher.dispatch(Service)
ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
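It is easy to see that many Sources and their Dispatchers are missing from the list above. In SkyWalking these missing classes are generated dynamically by the OAL mechanism when OAP starts. The OAL scripts live in the /config folder, and users only need to change them and restart the server for the changes to take effect. OAL is still a compiled language, though: the OAL runtime dynamically generates Java code.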
To see which classes were generated, add SW_OAL_ENGINE_DEBUG=Y to the system environment; the generated classes can then be found in the dispatcher and metrics directories under the oal-rt directory.
The primary SCOPEs of these generated metrics are All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation and EndpointRelation. There are also some auxiliary SCOPEs. The SCOPE definitions on the official website list all existing SCOPEs and their fields.
The scope() method of the Source class returns a numeric identifier that is used to select the SourceDispatcher:
public abstract class Source {
    public abstract int scope();
    // ... other members omitted
}
Finally, inside SourceDispatcher#dispatch these Sources are converted into StorageData and handed to MetricsStreamProcessor#in, which feeds them into L1 and L2 aggregation, alarm processing and export processing.
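A dispatcher's job in this step is to turn one Source into a metrics object and hand it to the stream processor. A minimal sketch shaped like EndpointCallRelationDispatcher, assuming hypothetical types: RelationCallMetrics stands in for a generated metrics class, MetricsSink stands in for MetricsStreamProcessor#in, and the id format is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Source -> Metrics step inside a dispatcher.
// All types are simplified stand-ins; MetricsSink plays the role of MetricsStreamProcessor#in.
class SourceToMetricsSketch {
    enum DetectPoint { CLIENT, SERVER }

    static final class EndpointRelation {
        final String sourceEndpoint;
        final String destEndpoint;
        final long timeBucket;
        final DetectPoint detectPoint;

        EndpointRelation(String sourceEndpoint, String destEndpoint, long timeBucket, DetectPoint detectPoint) {
            this.sourceEndpoint = sourceEndpoint;
            this.destEndpoint = destEndpoint;
            this.timeBucket = timeBucket;
            this.detectPoint = detectPoint;
        }
    }

    static final class RelationCallMetrics {
        final String id;
        final long calls;

        RelationCallMetrics(String id, long calls) {
            this.id = id;
            this.calls = calls;
        }
    }

    interface MetricsSink {
        void in(RelationCallMetrics metrics);
    }

    static final class EndpointRelationDispatcher {
        private final MetricsSink sink;

        EndpointRelationDispatcher(MetricsSink sink) {
            this.sink = sink;
        }

        void dispatch(EndpointRelation source) {
            switch (source.detectPoint) {
                case SERVER:
                    serverSide(source);
                    break;
                default:
                    break; // other detect points are ignored, as in the original switch
            }
        }

        private void serverSide(EndpointRelation source) {
            // One observed call. The id combines time bucket and relation, so metrics for the
            // same relation in the same bucket can later be merged by L1/L2 aggregation.
            String id = source.timeBucket + "_" + source.sourceEndpoint + "->" + source.destEndpoint;
            sink.in(new RelationCallMetrics(id, 1));
        }
    }

    public static void main(String[] args) {
        List<RelationCallMetrics> received = new ArrayList<>();
        EndpointRelationDispatcher dispatcher = new EndpointRelationDispatcher(received::add);
        dispatcher.dispatch(new EndpointRelation("/a", "/b", 202401011200L, DetectPoint.SERVER));
        dispatcher.dispatch(new EndpointRelation("/a", "/b", 202401011200L, DetectPoint.CLIENT));
        System.out.println(received.size() + " " + received.get(0).id); // 1 202401011200_/a->/b
    }
}
```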
L1 aggregation
Creating the workers and building the worker chain:
- When the Stream annotations are scanned at startup, StreamAnnotationListener#notify calls MetricsStreamProcessor#create, which creates a MetricsAggregateWorker for each Metrics type (L1 aggregation inside the current instance), and creates and registers a remote worker service MetricsPersistentWorker for that Metrics type (which performs L2 aggregation, alarming and storage for data sent by other instances).
- It creates a MetricsRemoteWorker and sets it as the nextWorker of the MetricsAggregateWorker (L1). Once L1 aggregation is done, the MetricsRemoteWorker passes the current data to the remote worker service MetricsPersistentWorker for L2 processing.
Data flows through the worker chain as follows: MetricsAggregateWorker (L1 in this instance) -> MetricsRemoteWorker (this instance sends to the remote MetricsPersistentWorker) -> MetricsPersistentWorker (on the remote instance, completes L2) -> store/update minute-level data -> hour aggregation -> day aggregation -> submit to AlarmWorker -> submit to ExportWorker.
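The chain can be modeled as workers that each do their step and call their next worker(s). In this sketch the hop from MetricsRemoteWorker to the remote MetricsPersistentWorker is a plain method call (no networking), and the Stage class is a hypothetical stand-in that only records which stage ran, in the order given above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the worker chain. Each stage records its name and passes the data
// to its next worker(s). The stage names are labels only; no real aggregation happens here.
class WorkerChainSketch {
    interface Worker {
        void in(String metricId);
    }

    static final class Stage implements Worker {
        private final String name;
        private final List<String> trace;
        private final List<Worker> next;

        Stage(String name, List<String> trace, Worker... next) {
            this.name = name;
            this.trace = trace;
            this.next = Arrays.asList(next);
        }

        @Override
        public void in(String metricId) {
            trace.add(name);
            for (Worker worker : next) {
                worker.in(metricId);
            }
        }
    }

    // Wires the chain in the order described in the text and returns the visited stages.
    static List<String> run(String metricId) {
        List<String> trace = new ArrayList<>();
        Worker export = new Stage("ExportWorker", trace);
        Worker alarm = new Stage("AlarmWorker", trace);
        Worker day = new Stage("day aggregation", trace);
        Worker hour = new Stage("hour aggregation", trace);
        Worker l2 = new Stage("MetricsPersistentWorker (L2, minute store)", trace, hour, day, alarm, export);
        Worker remote = new Stage("MetricsRemoteWorker", trace, l2); // simulated remote hop
        Worker l1 = new Stage("MetricsAggregateWorker (L1)", trace, remote);
        l1.in(metricId);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("endpoint_cpm"));
    }
}
```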
- Some implementation details of MetricsAggregateWorker: received Metrics are put into a DataCarrier (10000*2), and a consumer thread takes them out and puts them into MergableBufferedData for the first round of aggregation. MergableBufferedData is backed by a map; metrics with the same id are combined:
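public void accept(final METRICS data) {
    final String id = data.id();
    final METRICS existed = buffer.get(id);
    if (existed == null) {
        // first metric with this id in the current buffer
        buffer.put(id, data);
    } else {
        // same id: merge into the existing metric; combine() returning false means drop it
        final boolean isAbandoned = !existed.combine(data);
        if (isAbandoned) {
            buffer.remove(id);
        }
    }
}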
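The DataCarrier-plus-map arrangement is essentially a single-consumer queue feeding a merge map. A minimal sketch, assuming a java.util.concurrent BlockingQueue in place of DataCarrier and a toy CountMetric in place of a real Metrics class; the poison-pill shutdown and stopAndSnapshot() are hypothetical and exist only for the demo.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of L1 aggregation: producers put metrics into a bounded queue
// (standing in for DataCarrier); one consumer thread merges metrics with the same id
// in a map (standing in for MergableBufferedData).
class L1AggregationSketch {
    static final class CountMetric {
        final String id;
        long count;

        CountMetric(String id, long count) {
            this.id = id;
            this.count = count;
        }

        // Returns false if the merged metric should be dropped; a counter always keeps it.
        boolean combine(CountMetric other) {
            count += other.count;
            return true;
        }
    }

    private static final CountMetric STOP = new CountMetric("__stop__", 0); // poison pill

    private final BlockingQueue<CountMetric> queue = new ArrayBlockingQueue<>(10_000);
    private final Map<String, CountMetric> buffer = new HashMap<>(); // touched only by the consumer
    private final Thread consumer = new Thread(this::consume, "l1-consumer");

    void start() {
        consumer.start();
    }

    void in(CountMetric metric) {
        try {
            queue.put(metric); // blocks while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void consume() {
        try {
            for (CountMetric m = queue.take(); m != STOP; m = queue.take()) {
                accept(m);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same merge rule as the accept() method quoted above.
    private void accept(CountMetric data) {
        CountMetric existed = buffer.get(data.id);
        if (existed == null) {
            buffer.put(data.id, data);
        } else if (!existed.combine(data)) {
            buffer.remove(data.id);
        }
    }

    // Stops the consumer and returns id -> count (demo only).
    Map<String, Long> stopAndSnapshot() {
        in(STOP);
        try {
            consumer.join(); // join() makes the consumer's writes to buffer visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<String, Long> snapshot = new HashMap<>();
        for (CountMetric m : buffer.values()) {
            snapshot.put(m.id, m.count);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        L1AggregationSketch worker = new L1AggregationSketch();
        worker.start();
        worker.in(new CountMetric("svc-a", 1));
        worker.in(new CountMetric("svc-a", 1));
        worker.in(new CountMetric("svc-b", 1));
        Map<String, Long> result = worker.stopAndSnapshot();
        System.out.println(result.get("svc-a") + " " + result.get("svc-b")); // 2 1
    }
}
```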
L2 aggregation in MetricsPersistentWorker
MetricsPersistentWorker uses a pair of read/write buffers internally, and the buffers merge the data written into them. Processing works like this:
- Incoming data goes into the write buffer, which aggregates records as it receives them.
- A scheduled task reads the buffer: it swaps the read/write roles so that the buffer that has been receiving writes becomes the read buffer, then reads its data out and passes it to the next step.
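The read/write buffer swap can be sketched with two maps and a lock that is held only for the swap itself. This assumes a single reader thread (the timer) and uses summation as a stand-in for Metrics#combine; DoubleBufferSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a merge-on-write double buffer. Writers merge into the current
// write buffer; a single periodic reader swaps the two buffers and drains the one that
// was just written, so writers never wait for the drain itself.
class DoubleBufferSketch {
    private Map<String, Long> writeBuffer = new HashMap<>();
    private Map<String, Long> readBuffer = new HashMap<>();

    // Merge-on-write: values with the same id are summed (a stand-in for Metrics#combine).
    synchronized void write(String id, long value) {
        writeBuffer.merge(id, value, Long::sum);
    }

    // Assumes one reader thread, so each drain finishes before the next swap.
    Map<String, Long> read() {
        Map<String, Long> toDrain;
        synchronized (this) {
            toDrain = writeBuffer;    // the buffer that has been receiving writes
            writeBuffer = readBuffer; // writers switch to the other, empty buffer
            readBuffer = toDrain;
        }
        Map<String, Long> batch = new HashMap<>(toDrain); // drained outside the lock
        toDrain.clear();
        return batch;
    }

    public static void main(String[] args) {
        DoubleBufferSketch buffer = new DoubleBufferSketch();
        buffer.write("a", 1);
        buffer.write("a", 2);
        buffer.write("b", 5);
        Map<String, Long> batch = buffer.read();
        System.out.println(batch.get("a") + " " + batch.get("b")); // 3 5
    }
}
```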
The remote service behind MetricsRemoteWorker is MetricsPersistentWorker. It holds three important downstream workers whose names largely tell you what they do:
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker);
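The in method shows how data is written into the buffer. The logic that reads the buffer and carries out the follow-up steps is a bit more roundabout: PersistenceTimer#start launches a scheduled task that periodically reads the data and processes it, with an interval of persistentPeriod (3 seconds by default):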
public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
    // take a batch of data out of the read buffer
    final List<INPUT> dataList = getCache().read();
    // pre-process the batch into prepare requests
    prepareBatch(dataList, prepareRequests);
}
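The periodic read can be sketched with a ScheduledExecutorService. PersistentWorker and countRounds() are hypothetical; only the idea of running every persistentPeriod comes from the text. Catching exceptions inside the task matters because scheduleWithFixedDelay suppresses all subsequent runs once a run throws.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a persistence timer: one scheduled thread periodically asks each
// worker to read its buffer and build batch requests.
class PersistenceTimerSketch {
    interface PersistentWorker {
        void buildBatchRequests();
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start(List<PersistentWorker> workers, long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            for (PersistentWorker worker : workers) {
                try {
                    worker.buildBatchRequests();
                } catch (RuntimeException e) {
                    // An exception escaping the task would suppress all later runs, so catch it here.
                    System.err.println("persistence round failed: " + e);
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Demo helper: runs a counting worker for a while and returns how many rounds ran.
    static int countRounds(long periodMillis, long durationMillis) {
        AtomicInteger rounds = new AtomicInteger();
        PersistenceTimerSketch timer = new PersistenceTimerSketch();
        timer.start(Collections.<PersistentWorker>singletonList(rounds::incrementAndGet), periodMillis);
        try {
            Thread.sleep(durationMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.stop();
        return rounds.get();
    }

    public static void main(String[] args) {
        // The text's default period is 3 seconds; a short period keeps the demo fast.
        System.out.println(countRounds(50, 500) + " rounds");
    }
}
```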
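prepareBatch holds the core logic:
- It iterates over the Metrics in the batch.
- Every metric record is handed to transWorker for processing.
- Whenever 2000 records have been processed, they are written to ES.
- When the whole current batch has been processed, the remainder is written to ES.
- When writing to ES, if a record already exists, the old data is merged in first and then the record is updated.
- After the ES write completes, the data is offered to nextAlarmWorker and nextExportWorker.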
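The steps of prepareBatch can be sketched end to end with in-memory stand-ins. This assumes a HashMap in place of ES, summation as the merge rule, and plain lists in place of transWorker, nextAlarmWorker and nextExportWorker; only the 2000-record flush size and the ordering of the steps come from the text.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the prepareBatch steps. Field and method names are illustrative,
// not SkyWalking's.
class PrepareBatchSketch {
    static final int FLUSH_SIZE = 2000;

    final Map<String, Long> storage = new HashMap<>();  // stand-in for ES
    final List<String> transWorker = new ArrayList<>(); // ids handed to transWorker
    final List<String> alarmed = new ArrayList<>();     // ids offered to the alarm worker
    final List<String> exported = new ArrayList<>();    // ids offered to the export worker
    int flushes = 0;

    void prepareBatch(List<Map.Entry<String, Long>> metrics) {
        List<Map.Entry<String, Long>> pending = new ArrayList<>();
        for (Map.Entry<String, Long> metric : metrics) {
            transWorker.add(metric.getKey()); // every record also goes to transWorker
            pending.add(metric);
            if (pending.size() == FLUSH_SIZE) {
                flush(pending);
                pending = new ArrayList<>();
            }
        }
        if (!pending.isEmpty()) {
            flush(pending); // the remainder of the batch
        }
    }

    private void flush(List<Map.Entry<String, Long>> pending) {
        flushes++;
        for (Map.Entry<String, Long> metric : pending) {
            // If the record already exists, merge with the stored value, then update it.
            storage.merge(metric.getKey(), metric.getValue(), Long::sum);
        }
        // Only after the write completes are the records offered downstream.
        for (Map.Entry<String, Long> metric : pending) {
            alarmed.add(metric.getKey());
            exported.add(metric.getKey());
        }
    }

    public static void main(String[] args) {
        PrepareBatchSketch worker = new PrepareBatchSketch();
        worker.storage.put("m0", 5L); // pretend m0 already exists in storage
        List<Map.Entry<String, Long>> batch = new ArrayList<>();
        for (int i = 0; i < 4500; i++) {
            batch.add(new AbstractMap.SimpleEntry<>("m" + i, 1L));
        }
        worker.prepareBatch(batch);
        System.out.println(worker.flushes + " " + worker.storage.get("m0")); // 3 6
    }
}
```

With 4500 records the sketch flushes three times (2000, 2000, then the remaining 500), and the pre-existing record m0 ends up merged with the new value.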