OAL概念
用戶自定義的描述分析過程的可擴(kuò)展, 輕量級(jí)的編譯型語言, 在運(yùn)行時(shí)編譯成 class 文件, 用于 skywalking 流計(jì)算
支持兩種
- 硬編碼定義
- OAL 定義, 用于指標(biāo)數(shù)據(jù), 針對(duì)特定服務(wù), 服務(wù)實(shí)例等進(jìn)行統(tǒng)計(jì)數(shù)據(jù)聚合計(jì)算
skywalking 流計(jì)算, 目前存在以下4中數(shù)據(jù)類型
- Record: 明細(xì)數(shù)據(jù), Trace, 日志
- Metrics: 指標(biāo)數(shù)據(jù), OAL 一般生成這個(gè)類型
- TopN: 周期采樣數(shù)據(jù), SQL 周期性采集
- ManagementData: 管理數(shù)據(jù), 目前只有 UITemplate用于配置管理后臺(tái)的頁面模板
- NoneStream: 非流數(shù)據(jù)保存, 目前用于 ProfileTaskRecord
對(duì)應(yīng)的有以下處理器, 處理器的職責(zé)為將數(shù)據(jù)添加到處理隊(duì)列, 后續(xù)用 DAO 批量保存
- RecordStreamProcessor
- MetricsStreamProcessor
- TopNStreamProcessor
- ManagementStreamProcessor
- NoneStreamStreamProcessor: 主要支撐頁面交互, 直接對(duì)db操作, 類似crud
OAL 主要用于指標(biāo)部分的自動(dòng)化生成, 對(duì)應(yīng) Metrics 和 MetricsStreamProcessor, 采用運(yùn)行時(shí)編譯成class, 因此 OAL 不會(huì)影響執(zhí)行效率
OAL 基本語法
8.0.0 版本后在 /config/*.oal 中可以直接調(diào)整配置
基礎(chǔ)語法類似于 lambda 表達(dá)式
// 聲明 Metrics
METRICS_NAME = from(CAST SCOPE.(* | [FIELD][,FIELD ...]))
[.filter(CAST FIELD OP [INT | STRING])]
.FUNCTION([PARAM][, PARAM ...])
// 禁用 Metrics
disable(METRICS_NAME);
- METRICS_NAME: 指標(biāo)名稱
- from: 定義數(shù)據(jù)源
- SCOPE: 定義為: All(全局訪問), Service(服務(wù)), ServiceInstance(服務(wù)實(shí)例), Endpoint, ServiceRelation, ServiceInstanceRelation, and EndpointRelation
- (* | [FIELD][,FIELD ...]): 需要提取的字段
- CAST: 類型轉(zhuǎn)換, 類似 lambda 的map, 例如: from((str->long)Service.tag["transmission.latency"]) 將 transmission.latency 字段轉(zhuǎn)換為 long 類型, filter, function 中都可以使用
- filter(可選): 通過定義的字段來過濾, 可以是多個(gè), 也可以沒有, 多個(gè)時(shí)為 AND 關(guān)系
- 多級(jí)過濾: service_2xx = from(Service.*).filter(responseCode >= 200).filter(responseCode < 400).cpm()
- FUNCTION: 聚集函數(shù)定義, 聚合生成新的指標(biāo), 例如: 百分比, longAvg, percent, rate, count, histogram(熱力圖), apdex 等等
官方文檔
OAL 工作階段
詞法和語法分析
通過 Antlr 定義, 源碼語法定義在 oap-server/oal-grammar 中, 分為 OALLexer.g4 和 OALParser.g4
- OALLexer: 定義詞法樹
- OALParser: 定義語法樹
基本語法結(jié)構(gòu)如下
// 定義最上級(jí)語法, Metrics 聲明 或是 禁用 Metrics
root
: (aggregationStatement | disableStatement)*
;
// Metrics 聲明則包含 "變量 = metricStatement 注釋" 的機(jī)構(gòu)
aggregationStatement
: variable (SPACE)? EQUAL (SPACE)? metricStatement DelimitedComment? LineComment? (SEMI|EOF)
;
// 聲明 disable語法, disable(METRICS_NAME);
disableStatement
: DISABLE LR_BRACKET disableSource RR_BRACKET DelimitedComment? LineComment? (SEMI|EOF)
;
// 定義 from(...).filter(...)+.function(...) 的結(jié)構(gòu)
metricStatement
: FROM LR_BRACKET source (sourceAttributeStmt+) RR_BRACKET (filterStatement+)? DOT aggregateFunction
;
動(dòng)態(tài)代碼生成
動(dòng)態(tài)代碼生成通過 Javassist 輔助生成運(yùn)行時(shí)代碼, 直接將生成好代碼注入 JVM, 代碼模板位于 oap-server/oal-rt 項(xiàng)目的 resouces 目錄中, 模板通過freemarker處理成真正的代碼
生成代碼的目標(biāo)為
- 生成帶 @Stream 注解的 XXXMetrics(抽象類) 的實(shí)現(xiàn)類, 此實(shí)現(xiàn)類和使用的 function 對(duì)應(yīng), 有 AvgFunction, AvgHistogramFunction 等等
- 生成 XXXMetrics 的 StorageBuilder , 用于 具體 Metrics 實(shí)現(xiàn)到 Map<String, Object> 相互轉(zhuǎn)換
- 生成 XXXDispather, 用于 metrics 的基本信息注冊(cè), 并調(diào)用
MetricsStreamProcessor#in
入隊(duì),稍后進(jìn)行批量處理
觸發(fā)的流程
- CoreModuleProvider#prepare 中注冊(cè) OALEngineLoaderService
- CoreModuleProvider#start 加載 oal/disable.oal
- MeshReceiverProvider#start 加載 oal/core.oal
- ...
加載代碼為 OALEngineLoaderService#load
// 加載引擎
OALEngine engine = loadOALEngine(define);
StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
engine.setStreamListener(streamAnnotationListener);
// 設(shè)置用于Metrics分配服務(wù) DispatcherManager
engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
.provider()
.getService(SourceReceiver.class)
.getDispatcherDetectorListener());
// 設(shè)置dao存儲(chǔ)模塊
engine.setStorageBuilderFactory(moduleManager.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class));
// 開始解析
engine.start(OALEngineLoaderService.class.getClassLoader());
engine.notifyAllListeners();
oalDefineSet.add(define);
OALEngine 目前實(shí)現(xiàn)類為 OALRuntime, OALRuntime 實(shí)現(xiàn)如下, 主要生成 Metrics 數(shù)據(jù), 以及 MetricsBuilder( Metrics 對(duì)象到 Map<String,Object>
的映射), 以下 XXX 對(duì)應(yīng) metrics name
- 初始化進(jìn)行 freemarker 的 Configuration 初始化, 加載 /code-templates 下模板
- 讀取配置文件 *.oal
- 通過語法分析ScriptParser 將 oal 中的配置解析成 OALScripts
- 將 OALScripts 處理成 XXXMetrics 對(duì)應(yīng)的 Class, 具體方法通過 OALRuntime#generateMetricsClass, 通過 javassit 生成 (OALRuntime#generateMetricsClass)
- 構(gòu)建空的構(gòu)造函數(shù)
- 添加 field
- 通過 freemarker 生成 method, 定義好的 method 有 id, hashCode, equals, serialize, deserialize, getMeta, toHour, toDay, 每個(gè)方法在 /code-templates 下都有對(duì)應(yīng)模板
- 添加 Stream 注解,
@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, builder = ${metricsName}Metrics.Builder.class, processor = MetricsStreamProcessor.class)
- 將 OALScripts 處理成對(duì)應(yīng)的 XXXMetricsBuilder, 實(shí)現(xiàn) entity2Storage 和 storage2Entity 方法, 用于 dao 層到 實(shí)體的轉(zhuǎn)換
- 將 OALScripts 處理成對(duì)應(yīng)的 ${scopeName}Dispatcher
生成器可以通過環(huán)境變量配置 SW_OAL_ENGINE_DEBUG 類設(shè)置是否生成對(duì)應(yīng)的
.class
文件
實(shí)例
oal 配置如下, from 定義了 Scope, function 定義了對(duì)應(yīng)的 Metrics 實(shí)現(xiàn)類
- Scope 相同的將合并到同一個(gè) SourceDispather 中進(jìn)行分發(fā)
- function 則通過不同的實(shí)現(xiàn)類進(jìn)行分發(fā)
all_percentile = from(All.latency).percentile(10); // Multiple values including p50, p75, p90, p95, p99
all_heatmap = from(All.latency).histogram(100, 20);
對(duì)應(yīng)生成代碼, 上述兩條oal 都對(duì)應(yīng)到同一個(gè) Scope 模塊, 都對(duì)應(yīng)到 ALL 上
/**
* SourceDispatcher 分發(fā)器
*/
public class AllDispatcher implements SourceDispatcher<All> {
private void doAllPercentile(All var1) {
AllPercentileMetrics var2 = new AllPercentileMetrics();
var2.setTimeBucket(var1.getTimeBucket());
var2.combine(var1.getLatency(), 10);
MetricsStreamProcessor.getInstance().in(var2);
}
private void doAllHeatmap(All var1) {
AllHeatmapMetrics var2 = new AllHeatmapMetrics();
var2.setTimeBucket(var1.getTimeBucket());
var2.combine(var1.getLatency(), 100, 20);
MetricsStreamProcessor.getInstance().in(var2);
}
public void dispatch(ISource var1) {
All var2 = (All)var1;
this.doAllPercentile(var2);
this.doAllHeatmap(var2);
}
public AllDispatcher() {
}
}
/**
* 帶 function 的Metrics
*/
@Stream(
name = "all_heatmap",
scopeId = DefaultScopeDefine.ALL,
builder = AllHeatmapMetricsBuilder.class,
processor = MetricsStreamProcessor.class
)
public class AllHeatmapMetrics extends HistogramMetrics implements WithMetadata {
public AllHeatmapMetrics() {
}
protected String id0() {
StringBuilder var1 = new StringBuilder(String.valueOf(this.getTimeBucket()));
return var1.toString();
}
public int hashCode() {
byte var1 = 17;
int var2 = 31 * var1 + (int)this.getTimeBucket();
return var2;
}
public int remoteHashCode() {
byte var1 = 17;
return var1;
}
public boolean equals(Object var1) {
if (this == var1) {
return true;
} else if (var1 == null) {
return false;
} else if (this.getClass() != var1.getClass()) {
return false;
} else {
AllHeatmapMetrics var2 = (AllHeatmapMetrics)var1;
return this.getTimeBucket() == var2.getTimeBucket();
}
}
public Builder serialize() {
Builder var1 = RemoteData.newBuilder();
var1.addDataLongs(this.getTimeBucket());
var1.addDataObjectStrings(this.getDataset().toStorageData());
return var1;
}
public void deserialize(RemoteData var1) {
this.setTimeBucket(var1.getDataLongs(0));
this.setDataset(new DataTable(var1.getDataObjectStrings(0)));
}
public MetricsMetaInfo getMeta() {
return new MetricsMetaInfo("all_heatmap", 0);
}
public Metrics toHour() {
AllHeatmapMetrics var1 = new AllHeatmapMetrics();
DataTable var2 = new DataTable();
var2.copyFrom(this.getDataset());
var1.setDataset(var2);
var1.setTimeBucket(this.toTimeBucketInHour());
return var1;
}
public Metrics toDay() {
AllHeatmapMetrics var1 = new AllHeatmapMetrics();
DataTable var2 = new DataTable();
var2.copyFrom(this.getDataset());
var1.setDataset(var2);
var1.setTimeBucket(this.toTimeBucketInDay());
return var1;
}
}
/**
* StorageBuilder 實(shí)現(xiàn)
*/
public class AllHeatmapMetricsBuilder implements StorageHashMapBuilder {
public AllHeatmapMetricsBuilder() {
}
public Map entity2Storage(StorageData var1) {
AllHeatmapMetrics var2 = (AllHeatmapMetrics)var1;
HashMap var3 = new HashMap();
var3.put((Object)"dataset", var2.getDataset());
var3.put((Object)"time_bucket", new Long(var2.getTimeBucket()));
return var3;
}
public StorageData storage2Entity(Map var1) {
AllHeatmapMetrics var2 = new AllHeatmapMetrics();
var2.setDataset(new DataTable((String)var1.get("dataset")));
var2.setTimeBucket(((Number)var1.get("time_bucket")).longValue());
return var2;
}
}
@Stream(
name = "all_percentile",
scopeId = DefaultScopeDefine.ALL,
builder = AllPercentileMetricsBuilder.class,
processor = MetricsStreamProcessor.class
)
public class AllPercentileMetrics extends PercentileMetrics implements WithMetadata {
public AllPercentileMetrics() {
}
...
}
grpc 的模塊依賴
Trace 模塊依賴如下, AnalyzerModule 會(huì)依賴 AnalyzerModule, AnalyzerModule 加載時(shí)會(huì)對(duì) core.oal 進(jìn)行加載, 因此在 grpc 接口調(diào)用時(shí) oal 已生成完成
@Override
public String[] requiredModules() {
return new String[] {
TelemetryModule.NAME,
CoreModule.NAME,
AnalyzerModule.NAME,
SharingServerModule.NAME,
ConfigurationModule.NAME
};
}