從Server端談Tracing上報
從上篇文章我們知道,Tracing數據是使用TraceSegmentServiceClient通過GRPC發(fā)送到服務端彼妻,那么服務端是如何接收請求處理的呢?
從Server說起
前面我們說到服務端基于微內核架構初始化了各種Module嫌佑,每個Module又有很多Service,服務啟動時會找到Module進行初始化侨歉,初始化模塊時先初始化依賴模塊再初始化非依賴模塊屋摇,模塊的具體承載對象是ModuleProvider,其中CoreModule顧名思意是核心模塊幽邓,其對象的Provider是CoreModuleProvider炮温,在之前的文章我們已經介紹過CoreModuleProvider初始化過程,其中其實就包括了Server的初始化過程牵舵,其實在Server框架中Server有兩種
- GPRCServer 用于接收 SkyWalking Agent 發(fā)送的 gRPC 請求柒啤。正如前面課時介紹的那樣, SkyWalking 6.x 中的 Trace 上報畸颅、JVM 監(jiān)控上報担巩、服務以及服務實例注冊請求、心跳請求都是通過 gRPC 請求實現的没炒。
- HTTPServer 用于接收 SkyWalking Agent 以及用戶的 Http 請求涛癌。
在 GRPCServer 實現中首先會根據配置初始化 NettyServerBuilder 對象(其中會指定服務監(jiān)聽的地址和端口,以及每個連接上的并發(fā)請求數等參數)送火,然后創(chuàng)建并啟動 io.grpc.Server 接收gRPC 請求拳话。gRPC 的 Java 實現底層是依靠 Netty 實現的,Netty 是一個高性能的開源網絡庫种吸,由于 Java 本身的 NIO API 使用起來比較麻煩弃衍,而 Netty 底層封裝了 Java NIO 并對外提供了簡單易用的 API,所以很多開源軟件的網絡模塊都是使用 Netty 實現的骨稿。
NettyServerBuilder借助于ServerImplBuilder 構建ServerImpl,ServerImpl通過NettyClientTransportServersBuilder構建Server氯质,最終構造出來的Server就是io.grpc.netty.NettyServer败玉,在CoreModuleProvider初始化GRPCServer的代碼如下:
if (moduleConfig.isGRPCSslEnabled()) {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath(),
null
);
} else {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
}
我們縱觀一下CoreModuleProvider的源代碼谢鹊,發(fā)現還有幾處:
prepare方法
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
start 方法
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler());
GRPCHandlerRegisterImpl 用于實現gRPC的擴展性彻磁,和Serverlet的Filter類似,用于權限驗證或限流等橫向擴展功能的接口注冊辙浑。
這里簡單介紹一下上面start方法里面的兩個 GRPCHandler 的功能:
HealthCheckServiceHandler:在 Cluster 模塊中提供了支持多種第三方服務發(fā)展組件的實現激涤,前文介紹的 cluster-zookeeper-plugin 模塊使用的 curator-x-discovery 擴展庫底層是依賴 Zookeeper 的 Watcher 來監(jiān)聽一個 OAP 服務實例是否可用。但是有的第三方服務發(fā)現組件(例如 Consul)會依靠定期健康檢查(Health Check)來檢查一個 OAP 服務實例是否可用,此時 OAP 就需要保留一個接口來處理健康檢查請求倦踢,這就是 HealthCheckServiceHandler 的核心功能送滞。對 Consul 感興趣的同學,可以參考下面兩篇文檔:
https://www.consul.io/docs/agent/checks.html
https://github.com/grpc/grpc/blob/master/doc/health-checking.mdRemoteServiceHandler:OAP 集群中各個 OAP 實例節(jié)點之間通信的接口辱挥,在后面會詳細介紹該 GRPCHandler 的實現以及通信方式犁嗅。
其實真正的上報Tracing數據的處理是在SharingServerModule模塊提供的SharingServerModuleProvider處理的,SkyWalking OAP 需要接收外部請求的地方還是挺多的晤碘,例如 Agent 上報的監(jiān)控數據褂微、 SkyWalking Rocketbot 的查詢請求、OAP 集群中節(jié)點之間的相互通信园爷,等等宠蚂。除了 CoreModuleProvider 中會啟動 Server 組件之外,sharing-server-plugin 模塊中也可以啟動單獨的一套 Server 實例童社,并監(jiān)聽獨立的端口求厕,這套 Server 實例主要用來接收 Agent 的注冊請求、上報的 JVM 監(jiān)控數據以及 Trace 數據扰楼。
SharingServerModuleProvider初始化GRPCServer的過程和CoreModuleProvider類似呀癣,唯一的區(qū)別是SharingServerModuleProvider可以復用CoreModuleProvider創(chuàng)建的GRPCServer。
trace-receiver-plugin
trace-receiver-plugin其實是真正處理上報數據的項目灭抑,負責的模塊是TraceModule十艾,Provider是TraceModuleProvider,TraceModule依賴了5個模塊腾节,其中兩個分別是CoreModule和SharingServerModule,首先我們看一下start方法的代碼:
@Override
public void start() {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
HTTPHandlerRegister httpHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(HTTPHandlerRegister.class);
TraceSegmentReportServiceHandler traceSegmentReportServiceHandler = new TraceSegmentReportServiceHandler(getManager());
grpcHandlerRegister.addHandler(traceSegmentReportServiceHandler);
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandlerCompat(traceSegmentReportServiceHandler));
httpHandlerRegister.addHandler(new TraceSegmentReportHandler(getManager()),
Collections.singletonList(HttpMethod.POST)
);
}
可以看到start方法使用SharingServerModule的GRPCHandlerRegister服務注冊了兩個自己的處理器分別是TraceSegmentReportServiceHandler荤牍,TraceSegmentReportServiceHandlerCompat案腺,TraceSegmentReportServiceHandler 負責接收 Agent 發(fā)送來的 SegmentObject 數據,并調用 SegmentParserServiceImpl.send() 方法進行解析康吵,然后再將SegmentObject交給TraceAnalyzer.doAnalysis()方法處理劈榨。
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}
createSpanListeners();
notifySegmentListener(segmentObject);
segmentObject.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, segmentObject);
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, segmentObject);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});
notifyListenerToBuild();
}
上面的其實分為三步
- 首先注冊監(jiān)聽
- 將數據將給第一步注冊的監(jiān)聽處理
- 最后調用AnalysisListener將數據將給聚合流程處理
注冊監(jiān)聽
private void createSpanListeners() {
listenerManager.getSpanListenerFactories()
.forEach(
spanListenerFactory -> analysisListeners.add(
spanListenerFactory.create(moduleManager, config)));
}
所有監(jiān)聽接口通過SegmentParserListenerManager進行管理,由剛才的代碼我們看到監(jiān)聽分為6種
- FirstAnalysisListener
- ExitAnalysisListener
- EntryAnalysisListener
- LocalAnalysisListener
- SegmentListener
- AnalysisListener
剛才我們說到SharingServerModule依賴于5個模塊晦嵌,其中一個就是AnalyzerModule的AnalyzerModuleProvider同辣,那么AnalyzerModuleProvider就是負責SegmentParserListenerManager對象創(chuàng)建和監(jiān)聽器初始化的入口,具體方法是listenerManager
private SegmentParserListenerManager listenerManager() {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
listenerManager.add(new RPCAnalysisListener.Factory(getManager()));
listenerManager.add(new EndpointDepFromCrossThreadAnalysisListener.Factory(getManager()));
listenerManager.add(new NetworkAddressAliasMappingListener.Factory(getManager()));
}
listenerManager.add(new SegmentAnalysisListener.Factory(getManager(), moduleConfig));
return listenerManager;
}
上面注冊的監(jiān)聽實例囊括了我們剛才提到的6種監(jiān)聽接口惭载,就是說在極端情況下TraceAnalyzer.doAnalysis()方法會使用上面所有的實例進行數據加工處理旱函,其中RPCAnalysisListener,EndpointDepFromCrossThreadAnalysisListener描滔,NetworkAddressAliasMappingListener為可選流程棒妨,通過moduleConfig.isTraceAnalysis()開關開啟,限于篇幅含长,所以下面我們重點介紹SegmentAnalysisListener券腔。
FirstAnalysisListener
FirstSpanListener 的 parseFirst() 方法處理 TraceSegment 中的第一個 Span伏穆,這里只有 SegmentSpanListener 實現了該方法,具體實現分為三步:
- 檢測當前 TraceSegment 是否被成功采樣纷纫。
- 將 segmentCoreInfo 中記錄的 TraceSegment 數據拷貝到 segment 字段中枕扫。
- 將 endpointNameId 記錄到 firstEndpointId 字段,通過前面的分析我們知道辱魁,endpointNameId 在 Spring MVC 里面對應的是 URL铡原,在 Dubbo 里面對應的是 RPC 請求 path 與方法名稱的拼接。
上面說的具體實現是在SegmentAnalysisListener的parseFirst方法
@Override
public void parseFirst(SpanObject span, SegmentObject segmentObject) {
// 檢測是否采樣成功
if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
return;
}
if (StringUtil.isEmpty(serviceId)) {
serviceName = namingControl.formatServiceName(segmentObject.getService());
serviceId = IDManager.ServiceID.buildId(
serviceName,
true
);
}
long timeBucket = TimeBucket.getRecordTimeBucket(startTimestamp);
// 將 segmentCoreInfo 中記錄的 TraceSegment 數據拷貝到 segment 字段中商叹。
segment.setSegmentId(segmentObject.getTraceSegmentId());
segment.setServiceId(serviceId);
segment.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(
serviceId,
namingControl.formatInstanceName(segmentObject.getServiceInstance())
));
segment.setLatency(duration);
segment.setStartTime(startTimestamp);
segment.setTimeBucket(timeBucket);
segment.setIsError(BooleanUtils.booleanToValue(isError));
segment.setDataBinary(segmentObject.toByteArray());
// 將 endpointNameId 記錄到 firstEndpointId 字段燕刻,通過前面的分析我們知道,/endpointNameId 在 Spring MVC 里面對應的是 URL剖笙,在 Dubbo 里面對應的是 RPC 請求 path 與方法名稱的拼接卵洗。
endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
endpointId = IDManager.EndpointID.buildId(
serviceId,
endpointName
);
}
EntryAnalysisListener
EntrySpanListener 實現的 parseEntry() 方法對于 Entry 類型的 Span 進行處理。SegmentAnalysisListener.parseEntry() 方法只做了一件事弥咪,就是將 serviceName 記錄到 endpointId 字段中:
@Override
public void parseEntry(SpanObject span, SegmentObject segmentObject) {
if (StringUtil.isEmpty(serviceId)) {
serviceName = namingControl.formatServiceName(segmentObject.getService());
serviceId = IDManager.ServiceID.buildId(
serviceName, true);
}
endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
endpointId = IDManager.EndpointID.buildId(
serviceId,
endpointName
);
}
LocalAnalysisListener
主要是為了解決localspan和exitspan丟失元數據的問題过蹂,SegmentAnalysisListener并不會實現LocalAnalysisListener,但是RPCAnalysisListener和EndpointDepFromCrossThreadAnalysisListener確有實現聚至,拿EndpointDepFromCrossThreadAnalysisListener為例酷勺,其主要的實現是將SegmentObject里面的SpanObject信息緩存到RPCTrafficSourceBuilder對象,然后在build環(huán)節(jié)交給聚合流程處理扳躬,限于篇幅脆诉,這里不詳細介紹。
ExitAnalysisListener
同樣SegmentAnalysisListener并不會實現ExitAnalysisListener贷币,但是RPCAnalysisListener和EndpointDepFromCrossThreadAnalysisListener確有實現击胜,以RPCAnalysisListener為例,其主要的實現是將將SegmentObject里面的SpanObject信息緩存到List<RPCTrafficSourceBuilder> callingOutTraffic,List<DatabaseSlowStatementBuilder> dbSlowStatementBuilders兩個變量役纹,然后在build環(huán)節(jié)交給聚合流程處理偶摔,限于篇幅,這里不詳細介紹促脉。
SegmentListener
主要實現在SegmentAnalysisListener的parseSegment方法辰斋,主要用來給segment對象設置TraceId,根據Span的起始時間設置抽樣狀態(tài)sampleStatus
AnalysisListener
SegmentListener繼承了AnalysisListener瘸味,調用的是build方法
@Override
public void build() {
if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
if (log.isDebugEnabled()) {
log.debug("segment ignored, trace id: {}", segment.getTraceId());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("segment listener build, segment id: {}", segment.getSegmentId());
}
segment.setEndpointId(endpointId);
sourceReceiver.receive(segment);
addAutocompleteTags();
}
上面的方法其實就是做了一件事件宫仗,將segment對象的信息,交給sourceReceiver處理硫戈。
sourceReceiver
sourceReceiver來自于CoreModule锰什,我們在CoreModuleProvider的構造方法可以發(fā)現其實它是一個SourceReceiverImpl實例,SourceReceiver 底層封裝的 DispatcherManager 會根據 Segment 選擇相應的 SourceDispatcher 實現 —— SegmentDispatcher 進行分發(fā),DispatcherManager的scan方法會掃描所有SourceDispatcher接口實現類汁胆,并將其泛型類型的scope方法返回的內容即DefaultScopeDefine.SEGMENT作為KEY緩存到一個Map對象dispatcherMap,forward方法會根據傳入實例的類型的scope方法從dispatcherMap拿到SourceDispatcher即SegmentDispatcher調用其dispatch處理梭姓。
public void forward(ISource source) {
if (source == null) {
return;
}
List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());
/**
* Dispatcher is only generated by oal script analysis result.
* So these will/could be possible, the given source doesn't have the dispatcher,
* when the receiver is open, and oal script doesn't ask for analysis.
*/
if (dispatchers != null) {
source.prepare();
for (SourceDispatcher dispatcher : dispatchers) {
dispatcher.dispatch(source);
}
}
}
SegmentDispatcher.dispatch() 方法中會將 Segment 中的數據拷貝到 SegmentRecord 對象中。
SegmentRecord 繼承了 StorageData 接口嫩码,與前面介紹的 RegisterSource 以及 Metrics 的實現類似誉尖,通過注解指明了 Trace 數據存儲的 index 名稱的前綴(最終寫入的 index 是由該前綴以及 TimeBucket 后綴兩部分共同構成)以及各個字段對應的 field 名稱,如下所示:
// @Stream 注解的 name 屬性指定了 index 的名稱(index 前綴)铸题,processor 指定了處理該類型數據的 StreamProcessor 實現
@SuperDataset
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record {
public static final String INDEX_NAME = "segment";
public static final String SEGMENT_ID = "segment_id";
public static final String TRACE_ID = "trace_id";
public static final String SERVICE_ID = "service_id";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String ENDPOINT_ID = "endpoint_id";
public static final String START_TIME = "start_time";
public static final String LATENCY = "latency";
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String TAGS = "tags";
@Setter
@Getter
@Column(columnName = SEGMENT_ID, length = 150)
private String segmentId;
@Setter
@Getter
@Column(columnName = TRACE_ID, length = 150)
@BanyanDBGlobalIndex(extraFields = {})
private String traceId;
@Setter
@Getter
@Column(columnName = SERVICE_ID)
@BanyanDBShardingKey(index = 0)
private String serviceId;
@Setter
@Getter
@Column(columnName = SERVICE_INSTANCE_ID)
@BanyanDBShardingKey(index = 1)
private String serviceInstanceId;
@Setter
@Getter
@Column(columnName = ENDPOINT_ID)
private String endpointId;
@Setter
@Getter
@Column(columnName = START_TIME)
private long startTime;
@Setter
@Getter
@Column(columnName = LATENCY)
private int latency;
@Setter
@Getter
@Column(columnName = IS_ERROR)
@BanyanDBShardingKey(index = 2)
private int isError;
@Setter
@Getter
@Column(columnName = DATA_BINARY, storageOnly = true)
private byte[] dataBinary;
@Setter
@Getter
@Column(columnName = TAGS, indexOnly = true)
private List<String> tags;
// ...其它代碼
在 SegmentRecord 的父類 —— Record 中還定義了一個 timeBucket 字段(long 類型)铡恕,對應的 field 名稱是 "time_bucket"。
RecordStreamProcessor 的核心功能是為每個 Record 類型創(chuàng)建相應的 worker 鏈丢间。在 RecordStreamProcessor 中探熔,每個 Record 類型對應的 worker 鏈中只有一個worker 實例 —— RecordPersistentWorker。RecordPersistentWorker 負責 SegmentRecord 數據的持久化烘挫。
@Override
public void in(Record record) {
try {
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
batchDAO.insert(insertRequest);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
recordDAO由StorageDAO創(chuàng)建诀艰,StorageDAO由StorageModule初始化,StorageDAO對應的實例是 StorageEsDAO饮六,創(chuàng)建的batchDAO為RecordEsDAO其垄,batchDAO其實對應的是BatchProcessEsDAO,最后RecordPersistentWorker 生成的全部 IndexRequest 請求會交給全局唯一的 BatchProcessEsDAO 實例批量發(fā)送到 ES 卤橄,完成寫入绿满。
Metrics
有了上面的基礎知道,我們來研究一下聚合到底是怎么做的窟扑。上面我們提到在RPCAnalysisListener的parseExit方法會將Tracing上報數據緩存到dbSlowStatementBuilders集合喇颁,這個集合就是用來收集上報數據中存儲的慢查詢數據用的。收集到數據之后在build方法將數據交給sourceReceiver處理辜膝,sourceReceiver接收的其實是一個DatabaseSlowStatement對象无牵,所以其對應的處理實現類是DatabaseStatementDispatcher,它的dispatch方法其實就是做了聚合的事情厂抖,因為對于慢查詢來說,我們一般都會去關注前N個慢查詢信息克懊,
@Override
public void dispatch(DatabaseSlowStatement source) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setId(source.getId());
statement.setServiceId(source.getDatabaseServiceId());
statement.setLatency(source.getLatency());
statement.setStatement(source.getStatement());
statement.setTimeBucket(source.getTimeBucket());
statement.setTraceId(source.getTraceId());
TopNStreamProcessor.getInstance().in(statement);
}
TopNStreamProcessor 為每個 TopN 類型(其實只有 TopNDatabaseStatement)提供的 Worker 鏈中只有一個 Worker —— TopNWorker忱辅。與前文介紹的 MetricsPersistentWorker 以及 RecordPersistentWorker 類似,TopNWorker 也繼承了 PersistenceWorker 抽象類谭溉,其結構如下圖所示墙懂,TopNWorker 也是先將 TopNDatabaseStatement 暫存到 DataCarrier,然后由后臺 Consumer 線程定期讀取并調用 onWork() 方法進行處理扮念。
在 TopNWorker.onWorker() 方法中會將 TopNDatabaseStatement 暫存到 LimitedSizeBufferedData 中進行排序损搬。LimitedSizeBufferedData 使用雙隊列模式,繼承了 BufferedData 中進行排序。LimitedSizeBufferedData 底層的隊列實現是 HashMap嵌套LinkedList數組HashMap<String, LinkedList<STORAGE_DATA>>巧勤,其 data 字段(Map 類型)中維護了每個存儲服務的慢查詢(即 TopNDatabaseStatement)列表嵌灰,每個列表都是定長的(由 limitedSize 字段指定,默認 50)颅悉,在調用 ReadWriteSafeCache.write() 方法寫入的時候會按照 latency 從大到小排列沽瞭,并只保留最多 50 個元素。
那么數據是如何寫入的剩瓶?
PersistenceTimer
在CoreModuleProvider的notifyAfterCompleted方法會啟動一個PersistenceTimer實例并調用其start 方法PersistenceTimer.INSTANCE.start(getManager(), moduleConfig)處理PersistenceWorker的入庫驹溃,在它的extractDataAndSave方法主要處理兩類PersistenceWorker
- TopNStreamProcessor
- MetricsStreamProcessor
調用PersistenceWorker的buildBatchRequests拿到List<PrepareRequest>之后再調用batchDAO進行入庫操作,batchDAO就是IBatchDAO延曙,我們之前有介紹豌鹤,這里不再贅述。
MetricsStreamProcessor
MetricsStreamProcessor相比TopNStreamProcessor更為復雜的聚合枝缔,主要是解決度量數據的按時間維度的精度模糊聚合布疙,舉個例子,JVM數據一般按年月日時分秒進行記錄魂仍,我們需要按小時或者分鐘的曲線圖拐辽,所以需要將低級單位的數據向高級單位進行聚合,這樣不但可以節(jié)省存儲擦酌,也可以提高查詢性能俱诸,因為監(jiān)控數據不太需要太精確,只需要看到一個發(fā)展的趨勢就可以了赊舶,所以我們可以采用聚合的方式對數據進行模糊化處理睁搭,我們把這個過程叫做Downsampling。
MetricsStreamProcessor 中為每個 Metrics 類型維護了一個 Worker 鏈笼平,
private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
MetricsStreamProcessor 初始化 entryWorkers 集合的核心邏輯也是在 create() 方法中园骆,具體方法如下:
public void create(ModuleDefineHolder moduleDefineHolder,
StreamDefinition stream,
Class<? extends Metrics> metricsClass) throws StorageException {
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(
metricsClass, stream.getBuilder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO;
try {
metricsDAO = storageDAO.newMetricsDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(DownSamplingConfigService.class);
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsTransWorker transWorker = null;
final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
/**
* All metrics default are `supportDownSampling` and `insertAndUpdate`, unless it has explicit definition.
*/
boolean supportDownSampling = true;
boolean supportUpdate = true;
boolean timeRelativeID = true;
if (metricsExtension != null) {
supportDownSampling = metricsExtension.supportDownSampling();
supportUpdate = metricsExtension.supportUpdate();
timeRelativeID = metricsExtension.timeRelativeID();
}
if (supportDownSampling) {
if (configService.shouldToHour()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
false
);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
if (configService.shouldToDay()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day),
false
);
dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
transWorker = new MetricsTransWorker(
moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);
}
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute),
false
);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
String remoteReceiverWorkerName = stream.getName() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(IWorkerInstanceSetter.class);
workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
entryWorkers.put(metricsClass, aggregateWorker);
}
其中 minutePersistentWorker 是一定會存在的,其他 DownSampling(Hour寓调、Day锌唾、Month) 對應的 PersistentWorker 則會根據配置的創(chuàng)建并添加,在 CoreModuleProvider.prepare() 方法中有下面這行代碼夺英,會獲取 Downsampling 配置并保存于 DownsamplingConfigService 對象中配置晌涕。后續(xù)創(chuàng)建上述 Worker 時,會從中獲取配置的 DownSampling痛悯。
this.registerServiceImplementation(
DownSamplingConfigService.class, new DownSamplingConfigService(moduleConfig.getDownsampling()));
MetricsAggregateWorker
MetricsAggregateWorker為聚合入口的第一個Worker,其功能就是進行簡單的聚合,MetricsAggregateWorker 在收到 Metrics 數據的時候余黎,會先寫到內部的 DataCarrier 中緩存,然后由 Consumer 線程(都屬于名為 “METRICS_L1_AGGREGATION” 的 BulkConsumePool)消費并進行聚合载萌,并將聚合結果寫入到 MergeDataCache 中的 current 隊列暫存惧财。
同時巡扇,Consumer 會定期(默認1秒,通過 METRICS_L1_AGGREGATION_SEND_CYCLE 配置修改)觸發(fā) current 隊列和 last 隊列的切換垮衷,然后讀取 last 隊列中暫存的數據厅翔,并發(fā)送到下一個 Worker 中處理。
寫入 DataCarrier 的邏輯在前面已經分析過了帘靡,這里不再贅述知给。下面深入分析兩個點:
- Consumer 線程消費 DataCarrier 并聚合監(jiān)控數據的相關實現。
- Consumer 線程定期清理 MergeDataCache 緩沖區(qū)并發(fā)送監(jiān)控數據的相關實現描姚。
Consumer 線程在消費 DataCarrier 數據的時候涩赢,首先會進行 Metrics 聚合(即相同 Metrics 合并成一個),然后寫入 MergeDataCache 中轩勘,實現如下:
/**
* Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every work
* instance.
*
* @param metricsList from the queue.
*/
private void onWork(List<Metrics> metricsList) {
metricsList.forEach(metrics -> {
aggregationCounter.inc();
mergeDataCache.accept(metrics);
});
flush();
}
將聚合后的 Metrics 寫入 MergeDataCache 之后筒扒,Consumer 線程會每隔一秒將 MergeDataCache 中的數據發(fā)送到下一個 Worker 處理,相關實現如下:
private void flush() {
long currentTime = System.currentTimeMillis();
if (currentTime - lastSendTime > l1FlushPeriod) {
mergeDataCache.read().forEach(
data -> {
if (log.isDebugEnabled()) {
log.debug(data.toString());
}
nextWorker.in(data);
}
);
lastSendTime = currentTime;
}
}
MetricsRemoteWorker
MetricsAggregateWorker 指向的下一個 Worker 是 MetricsRemoteWorker 绊寻,底層是通過 RemoteSenderService 將監(jiān)控數據發(fā)送到遠端節(jié)點花墩。
它提供了三種不同的發(fā)送策略:
- HashCode 策略:根據 Hash 值選擇發(fā)送到目標 OAP 節(jié)點。MetricsRemoteWorker 默認使用該策略澄步。
- Rolling 策略:輪訓方式選擇目標 OAP 節(jié)點冰蘑。
- ForeverFirst 策略:始終選擇第一個 OAP 節(jié)點作為目標節(jié)點。RegisterRemoteWorker 默認使用該策略村缸。
@Override
public final void in(Metrics metrics) {
try {
remoteSender.send(remoteReceiverWorkerName, metrics, Selector.HashCode);
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
為什么要發(fā)到其他 OAP 節(jié)點進行處理呢祠肥?
在 CoreModuleProvider 啟動過程中,我們可以看到 OAP 節(jié)點的角色選擇邏輯梯皿,如下所示:
if (Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) ||
Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(
new Address(moduleConfig.getGRPCHost(),
moduleConfig.getGRPCPort(), true));
// 只有Mixed仇箱、Aggregator兩種角色的OAP節(jié)點才會通過Cluster模塊進行注冊
this.getManager().find(ClusterModule.NAME).provider()
.getService(ClusterRegister.class)
.registerRemote(gRPCServerInstance);
}
在 application.yml 配置文件中可以配置 Mixed、Receiver东羹、Aggregator 三種角色:
- Receiver 節(jié)點:負責接收 Agent 請求并進行 L1 級別的聚合處理剂桥,后續(xù)的 L2 級別的聚合操作由其他兩種類型的節(jié)點處理。
- Mixed 節(jié)點:負責接收 Agent 請求以及其他 OAP 節(jié)點 L1 聚合結果属提,進行 L1 級別和 L2 級別的聚合處理权逗。
- Aggregator 節(jié)點和:負責接收其他 OAP節(jié)點的 L1 聚合結果,進行 L2 級別的聚合處理冤议。
正是因為集群環(huán)境中不同的服務有不同的角色旬迹,不同步角色做的聚合是不一樣的,所以進行請求轉發(fā)處理求类。
MetricsTransWorker
MetricsRemoteWorker 之后的下一個 worker 是 MetricsTransWorker,其中有兩個字段分別指向兩個不同 Downsampling 粒度的 PersistenceWorker 對象屹耐,如下:
private final MetricsPersistentWorker hourPersistenceWorker;
private final MetricsPersistentWorker dayPersistenceWorker;
MetricsTransWorker.in() 方法會根據上述字段是否為空尸疆,將 Metrics 數據分別轉發(fā)到不同的 PersistenceWorker 中進行處理:
public void in(Metrics metrics) {
// 檢測 Hour椿猎、Day、Month 對應的 PersistenceWorker 是否為空寿弱,若不為空犯眠,
// 則將 Metrics 數據拷貝一份并調整時間窗口粒度,交到相應的
// PersistenceWorker 處理症革,這里省略了具體邏輯
// 最后筐咧,直接轉發(fā)給 minutePersistenceWorker 進行處理
if (Objects.nonNull(minutePersistenceWorker)) {
aggregationMinCounter.inc();
minutePersistenceWorker.in(metrics);
}
}
MetricsPersistentWorker
這個之前已經描述過,這里不再贅述
Logging
SkyWalking OAP 底層支持 ElasticSearch噪矛、H2量蕊、MySQL 等多種持久化存儲,同時也支持讀取其他分布式鏈路追蹤系統(tǒng)的數據艇挨,例如残炮,jaeger(Uber 開源的分布式跟蹤系統(tǒng))、zipkin(Twitter 開源的分布式跟蹤系統(tǒng))缩滨。
首先势就,OAP 存儲了兩種類型的數據:時間相關的數據和非時間相關的數據(與“時序”這個專有名詞區(qū)分一下)。注冊到 OAP 集群的 Service脉漏、ServiceInstance 以及同步的 EndpointName苞冯、NetworkAddress 都是非時間相關的數據,一個穩(wěn)定的服務產生的這些數據是有限的侧巨,我們可以用幾個固定的 ES 索引(或數據庫表)來存儲這些數據舅锄。
而像 JVM 監(jiān)控等都屬于時間相關的數據,它們的數據量會隨時間流逝而線性增加刃泡。如果將這些時間相關的數據存儲到幾個固定的 ES 索引中巧娱,就會導致這些 ES 索引(或數據庫表)非常大,這種顯然是不能落地的烘贴。既然一個 ES 索引(或數據庫表)存不下禁添,一般會考慮切分,常見的切分方式按照時間窗口以及 DownSampling 進行切分桨踪。
Model 與 ES 索引
常見的 ORM 框架會將數據中的表結構映射成 Java 類老翘,表中的一個行數據會映射成一個 Java Bean 對象,在 OAP 的存儲抽象中也有類似的操作锻离。SkyWalking 會將 [Metric + DownSampling] 對應的一系列 ES 索引與一個 Model 對象進行映射铺峭。
Model 對象記錄了對應 ES 索引的核心信息:
- name(String類型):Metric 名稱,在 OAP 創(chuàng)建 ES 索引時會使用
- columns(List類型):ES 索引 中的 Field 集合汽纠,一個 ModelColumn 對象記錄了一個 Field 的名稱卫键、類型等信息。
- capableOfTimeSeries(boolean類型):對應 ES 索引中存儲的數據是否為時間相關的數據虱朵。
- downsampling(Downsampling類型):如果是時間相關的數據莉炉,則需要指定其 Downsampling 單位钓账,可選值有 Second、 Minute絮宁、 Hour梆暮、 Day、 Month绍昂。對于非時間相關的數據啦粹,則該字段值為 Downsampling.NONE。
- deleteHistory(boolean類型):是否刪除歷史數據窘游。
- scopeId(int類型):對應指標的全局唯一 ID唠椭。
很明顯,ES 索引的名稱由三部分構成:Metric 名稱张峰、DownSampling泪蔫、時間窗口(后面兩部分只有時序數據才會有),而 ES 索引的別名由 Metric 名稱和 DownSampling 構成喘批。
在 CoreModuleProvider 啟動過程中撩荣,會掃描 @Stream 注解,創(chuàng)建相應的 Model 對象饶深,并由 StorageModels 統(tǒng)一管理(底層維護了 List 集合)餐曹。 @Stream 中會包含 Model 需要的信息即可。StorageModels 同時實現了 IModelManager敌厘、ModelCreator台猴、ModelManipulator 三個 Service 接口,在 CoreModuleProvider 的 prepare() 方法中會將 StorageModels 作為上述三個 Service 接口的實現注冊到 services 集合中。如果回看一下上面的代碼你就會發(fā)現在org.MetricsStreamProcessor.create從CoreModuleProvider獲取ModelCreator進行注冊俱两。
//...
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
//...
if (configService.shouldToHour()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
false
);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
//...
prepare 方法先注冊監(jiān)聽
annotationScan.registerListener(new StreamAnnotationListener(getManager()));
start 方法啟動@Stream 注解的掃描
// ...
annotationScan.scan();
// ...
下面介紹一下Model的初始化過程饱狂,主要還是調用StorageModels的add方法進行注冊,我們分析一下metricsClass來自于哪里
- AnalyzerModuleProvider 的start方法中會調用MeterProcessService的start方法
- MeterProcessService會初始化一個MetricConvert
public void start(List<MeterConfig> configs) {
final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
this.metricConverts = configs.stream().map(c -> new MetricConvert(c, meterSystem)).collect(Collectors.toList());
}
- MetricConvert會調用Analyzer的build方法
public MetricConvert(MetricRuleConfig rule, MeterSystem service) {
Preconditions.checkState(!Strings.isNullOrEmpty(rule.getMetricPrefix()));
this.analyzers = rule.getMetricsRules().stream().map(
r -> Analyzer.build(
formatMetricName(rule, r.getName()),
rule.getFilter(),
Strings.isNullOrEmpty(rule.getExpSuffix()) ?
r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
service
)
).collect(toList());
}
- Analyzer 會MeterSystem的create來初始化MetricsStreamProcessor宪彩,并調用MetricsStreamProcessor的create方法休讳。
初始化存儲結構
很多 ORM 框架(例如,Hibernate )提供了在應用啟動時根據 Java Bean 初始化表結構的功能尿孔。SkyWalking OAP 也提供了類似的功能俊柔,主要在 ModelInstaller 中完成,核心在 whenCreating() 方法中:
@Override
public void whenCreating(Model model) throws StorageException {
if (RunningMode.isNoInitMode()) {
while (!isExists(model)) {
try {
log.info(
"table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.",
model
.getName()
);
Thread.sleep(3000L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
} else {
if (!isExists(model)) {
log.info("table: {} does not exist", model.getName());
createTable(model);
}
}
}
在StorageModels的add方法中會調用List<CreatingListener> listeners來遍歷具體實現類來訪問whenCreating 來創(chuàng)建表結構活合,這個CreatingListener接口的唯一實現是ModelInstaller雏婶,峰回路轉,你有沒有發(fā)現所有流程已經串接起來白指,也就是AnalyzerModuleProvider初始化model的時候同時創(chuàng)建了表結構留晚。
createTable方法是個虛方法,主要有兩個實現
- StorageEsInstaller
- H2TableInstaller
我們主要關注StorageEsInstaller
@Override
protected void createTable(Model model) throws StorageException {
if (model.isTimeSeries()) {
createTimeSeriesTable(model);
} else {
createNormalTable(model);
}
}
代碼很清晰就是創(chuàng)建我們介紹兩種表
數據抽象
了解了索引(或是數據庫表結構)的初始化流程之后告嘲,再來看 SkyWalking OAP是如何對持久化數據進行抽象的倔丈。
SkyWalking 與 ORM 類似憨闰,會將索引中的一個 Document (或是數據庫表中的一條數據)映射為一個 StorageData 對象。StorageData 接口中定義了一個 id() 方法需五,負責返回該條數據的唯一標識。我們上面的就是TopN就是一個StorageData 對象實現轧坎。
-
Metrics:所有監(jiān)控指標的頂級抽象宏邮,其中定義了一個 timeBucket 字段(long類型),它是所有監(jiān)控指標的公共字段缸血,用于表示該監(jiān)控點所處的時間窗口蜜氨。另外,timeBucket 字段被 @Column 注解標記捎泻,在 OAP 啟動時會被掃描到轉換成 Model 中的 ModelColumn飒炎,在初始化 ES 索引時就會創(chuàng)建相應的 Field。Metrics 抽象類中還定義了計算監(jiān)控數據的一些公共方法:
calculate() 方法:大部分 Metrics 都會有一個 value 字段來記錄該監(jiān)控點的值笆豁,例如郎汪,CountMetrics 中的 value 字段記錄了時間窗口內事件的次數,MaxLongMetrics 中的 value 字段記錄時間窗口內的最大值闯狱。calculate() 方法就是用來計算該 value 值煞赢。
combine() 方法:合并兩個監(jiān)控點。對于不同含義的監(jiān)控數據哄孤,合并方式也有所不同照筑,例如,CountMetrics 中 combine() 方法的實現是將兩個監(jiān)控點的值進行求和瘦陈;MaxLongMetrics 中 combine() 方法的實現是取兩個監(jiān)控點的最大值凝危。
Record:抽象了所有記錄類型的數據,其子類如下圖所示晨逝。其中 SegmentRecord 對應的是 TraceSegment 數據蛾默、AlarmRecord 對應一條告警、TopNDatabaseStatement 對應一條慢查詢記錄咏花,這些數據都是一條條的記錄趴生。
上述記錄類型的數據也有一個公共字段 —— timeBucket(long 類型),表示該條記錄數據所在的時間窗口昏翰,也同樣被 @Column 注解標記苍匆。Record 抽象類中沒有定義其他的公共方法。
RegisterSource:抽象了服務注冊棚菊、服務實例注冊浸踩、EndpointName(以及 NetworkAddress)同步三個過程中產生的數據。其中统求,定義了三個公共字段检碗,且這三個字段都被 @Column 注解標注了:
sequence(int 類型):上述三個過程中的數據都會產生一個全局唯一的 ID据块,該全局 ID 就保存在該字段中。
registerTime(long 類型):第一注冊(或同步)時的時間戳折剃。
heartbeatTime(long 類型):心跳時間戳另假。
在這三個抽象類下還有很多具體的實現類,這些實現類會根據對應的具體數據怕犁,擴展新的字段和方法
最后边篮,我們來看 StorageBuilder 這個接口,它與 StorageData 接口的關系非常緊密奏甫,在 StorageData 的全部實現類中庸娱,都有一個內部 Builder 類實現類了 StorageBuilder 接口成畦。StorageBuilder 接口中定義了 map2Data() 和 data2Map() 兩個方法,實現了 StorageData 對象與 Map 之間的相互轉換。
DAO 層架構
在創(chuàng)建 ES 索引時使用到的 ElasticSearchClient疆液, 是對 RestHighLevelClient 進行了一層封裝闽晦,位于 library-client 模塊边锁,其中還提供了 JDBC 以及 gRPC 的 Client
SkyWalking OAP 也提供了DAO 層抽象辕坝,如下圖所示,大致可以分為三類:Receiver DAO奈梳、Cache DAO杈湾、Query DAO。
數據 TTL
Metrics攘须、Trace 等(時間相關的數據)對應的 ES 索引都是按照時間進行切分的漆撞,隨著時間的推移,ES 索引會越來越多于宙。為了解決這個問題浮驳,SkyWalking 只會在 ES 中存儲一段時間內的數據,CoreModuleProvider 會啟動 DataTTLKeeperTimer 定時清理過期數據捞魁。