擴(kuò)展閱讀:
一聘鳞、Overview
1.1氢伟、通過(guò) Kylin 查詢
其中 olap_model_6607769716595835175.json 內(nèi)容如下:
{
"version": "1.0",
"defaultSchema": "DEFAULT",
"schemas": [
{
"type": "custom",
"name": "DEFAULT",
"factory": "org.apache.kylin.query.schema.OLAPSchemaFactory",
"operand": {
"project": "learn_kylin"
},
"functions": [
{
name: 'PERCENTILE',
className: 'org.apache.kylin.measure.percentile.PercentileAggFunc'
},
{
name: 'CONCAT',
className: 'org.apache.kylin.query.udf.ConcatUDF'
},
{
name: 'MASSIN',
className: 'org.apache.kylin.query.udf.MassInUDF'
},
{
name: 'INTERSECT_COUNT',
className: 'org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc'
},
{
name: 'VERSION',
className: 'org.apache.kylin.query.udf.VersionUDF'
},
{
name: 'PERCENTILE_APPROX',
className: 'org.apache.kylin.measure.percentile.PercentileAggFunc'
}
]
}
]
}
最主要的是指定了:
- schema factory class:
org.apache.kylin.query.schema.OLAPSchemaFactory
- project: learn_kylin
1.2、下推到其他計(jì)算引擎
二苔埋、OLAPSchemaFactory & OLAPSchema
在上文中提到块请,通過(guò) calcite jdbc 創(chuàng)建 connection 的時(shí)候,指定了 schema facotry 為 org.apache.kylin.query.schema.OLAPSchemaFactory
, 即在 validate 的過(guò)程中會(huì)使用 OLAPSchemaFactory 創(chuàng)建 Scehma馋辈。
OLAPSchemaFactory 繼承于 calcite SchemaFactory,用于 create Scehma倍谜。Scehma 主要用于獲取 table迈螟、function、subSchema 等元數(shù)據(jù)尔崔,類圖如下
OLAPSchemaFactory#create
如下答毫,創(chuàng)建的 Schema 為 OLAPSchema 類型:
public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) {
String project = (String) operand.get(SCHEMA_PROJECT);
Schema newSchema = new OLAPSchema(project, schemaName, exposeMore(project));
return newSchema;
}
所以在 validate 的過(guò)程中,會(huì)通過(guò)調(diào)用 OLAPSchema#getTable
去替換一個(gè) SqlIdentifier季春,OLAPSchema#getTable 得到的是一個(gè) OLAPTable
2.1洗搂、OLAPTable
toRel 方法如下,得到一個(gè) OLAPTableScan
public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
int fieldCount = relOptTable.getRowType().getFieldCount();
int[] fields = identityList(fieldCount);
return new OLAPTableScan(context.getCluster(), relOptTable, this, fields);
}
三、Kylin 自定義 rules 及 RelNode
以下面這條 sql 為例:
SELECT KYLIN_SALES.TRANS_ID, SUM(KYLIN_SALES.PRICE), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
ORDER BY TRANS_ID
LIMIT 10;
3.1蚕脏、SqlNode
3.2侦副、HepPlanner 優(yōu)化后的 RelNode
LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])
LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT($2)])
LogicalProject(TRANS_ID=[$0], PRICE=[$5], ACCOUNT_ID=[$13])
LogicalFilter(condition=[<>($4, 1000)])
LogicalJoin(condition=[=($7, $13)], joinType=[inner])
OLAPTableScan(table=[[DEFAULT, KYLIN_SALES]], ctx=[], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]])
OLAPTableScan(table=[[DEFAULT, KYLIN_ACCOUNT]], ctx=[], fields=[[0, 1, 2, 3, 4, 5]])
在 SqlNode 轉(zhuǎn)成 RelNode 的過(guò)程中侦锯,會(huì)調(diào)用到 SqlToRelConverter#convertFrom驼鞭,對(duì)于 SqlIdentity 會(huì)執(zhí)行:
- 通過(guò) validator 獲取該 idt 的 OLAPTable
- 調(diào)用 OLAPTable.toRel 得到 OLAPTableScan(已在上文描述)
這樣 SqlIdentity 就轉(zhuǎn)成了 OLAPTableScan,類圖如上
3.3尺碰、VolcanoPlanner 優(yōu)化后的 OLAPRelNode
在 optimize 過(guò)程中挣棕,在真正調(diào)用 VolcanoPlanner 進(jìn)行 optimize 之前,會(huì)遍歷整個(gè)樹(shù)罕偎,對(duì)于 TableScan 類型的節(jié)點(diǎn)調(diào)用其 register 方法涡驮。對(duì)于 OLAPTableScan 來(lái)說(shuō)囚衔,通過(guò) OLAPTableScan#register
將 Kylin 自定義的 OLAP rules 添加到 planner 中(并刪除一些不需要的 rules),最終這些 rules 會(huì)應(yīng)用到 RelNode 上词身。新增的 rules:
- OLAPToEnumerableConverterRule: RelNode -> OLAPToEnumerableConverter
- OLAPFilterRule: LogicalFilter -> OLAPFilterRel
- OLAPProjectRule: LogicalProject -> OLAPProjectRel
- OLAPAggregateRule: LogicalAggregate -> OLAPAggregateRel
- OLAPJoinRule: LogicalJoin -> OLAPJoinRel/OLAPFilterRel
- OLAPLimitRule: Sort -> OLAPLimitRel
- OLAPSortRule: Sort -> OLAPSortRel
- OLAPUnionRule: Union -> OLAPUnionRel
- OLAPWindowRule: Window -> OLAPWindowRel
- OLAPValuesRule: LogicalValues -> OLAPValuesRel
public void register(RelOptPlanner planner) {
// force clear the query context before traversal relational operators
OLAPContext.clearThreadLocalContexts();
// register OLAP rules
addRules(planner, kylinConfig.getCalciteAddRule());
planner.addRule(OLAPToEnumerableConverterRule.INSTANCE);
planner.addRule(OLAPFilterRule.INSTANCE);
planner.addRule(OLAPProjectRule.INSTANCE);
planner.addRule(OLAPAggregateRule.INSTANCE);
planner.addRule(OLAPJoinRule.INSTANCE);
planner.addRule(OLAPLimitRule.INSTANCE);
planner.addRule(OLAPSortRule.INSTANCE);
planner.addRule(OLAPUnionRule.INSTANCE);
planner.addRule(OLAPWindowRule.INSTANCE);
planner.addRule(OLAPValuesRule.INSTANCE);
planner.addRule(AggregateProjectReduceRule.INSTANCE);
// CalcitePrepareImpl.CONSTANT_REDUCTION_RULES
if (kylinConfig.isReduceExpressionsRulesEnabled()) {
planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
planner.addRule(ReduceExpressionsRule.FILTER_INSTANCE);
planner.addRule(ReduceExpressionsRule.CALC_INSTANCE);
planner.addRule(ReduceExpressionsRule.JOIN_INSTANCE);
}
removeRules(planner, kylinConfig.getCalciteRemoveRule());
if (!kylinConfig.isEnumerableRulesEnabled()) {
for (RelOptRule rule : CalcitePrepareImpl.ENUMERABLE_RULES) {
planner.removeRule(rule);
}
}
// since join is the entry point, we can't push filter past join
planner.removeRule(FilterJoinRule.FILTER_ON_JOIN);
planner.removeRule(FilterJoinRule.JOIN);
// since we don't have statistic of table, the optimization of join is too cost
planner.removeRule(JoinCommuteRule.INSTANCE);
planner.removeRule(JoinPushThroughJoinRule.LEFT);
planner.removeRule(JoinPushThroughJoinRule.RIGHT);
// keep tree structure like filter -> aggregation -> project -> join/table scan, implementOLAP() rely on this tree pattern
planner.removeRule(AggregateJoinTransposeRule.INSTANCE);
planner.removeRule(AggregateProjectMergeRule.INSTANCE);
planner.removeRule(FilterProjectTransposeRule.INSTANCE);
planner.removeRule(SortJoinTransposeRule.INSTANCE);
planner.removeRule(JoinPushExpressionsRule.INSTANCE);
planner.removeRule(SortUnionTransposeRule.INSTANCE);
planner.removeRule(JoinUnionTransposeRule.LEFT_UNION);
planner.removeRule(JoinUnionTransposeRule.RIGHT_UNION);
planner.removeRule(AggregateUnionTransposeRule.INSTANCE);
planner.removeRule(DateRangeRules.FILTER_INSTANCE);
planner.removeRule(SemiJoinRule.JOIN);
planner.removeRule(SemiJoinRule.PROJECT);
// distinct count will be split into a separated query that is joined with the left query
planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE);
// see Dec 26th email @ http://mail-archives.apache.org/mod_mbox/calcite-dev/201412.mbox/browser
planner.removeRule(ExpandConversionRule.INSTANCE);
}
VolcanoPlanner 優(yōu)化后的 RelNode 如下:
OLAPToEnumerableConverter
OLAPLimitRel(ctx=[], fetch=[10])
OLAPSortRel(sort0=[$0], dir0=[ASC], ctx=[])
OLAPAggregateRel(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT($2)], ctx=[])
OLAPProjectRel(TRANS_ID=[$0], PRICE=[$5], ACCOUNT_ID=[$13], ctx=[])
OLAPFilterRel(condition=[<>($4, 1000)], ctx=[])
OLAPJoinRel(condition=[=($7, $13)], joinType=[inner], ctx=[])
OLAPTableScan(table=[[DEFAULT, KYLIN_SALES]], ctx=[], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]])
OLAPTableScan(table=[[DEFAULT, KYLIN_ACCOUNT]], ctx=[], fields=[[0, 1, 2, 3, 4, 5]])
3.4、各個(gè) OLAPRule番枚、OLAPRel 剖析
四法严、選擇 Realization 邏輯
整個(gè)過(guò)程封裝在 RealizationChooser#attemptSelectRealization
中,核心流程如下圖:
4.1葫笼、Realization 分類
分為 Cube 和 HYBRID 兩類深啤,其中 HYBRID 是一個(gè)或多個(gè)其他實(shí)現(xiàn)(Cube)的組合。假設(shè)用戶有一個(gè)名為 Cube_V1 的多維數(shù)據(jù)集路星,它已經(jīng)建立了幾個(gè)月; 現(xiàn)在溯街,用戶希望添加新的維度或指標(biāo)以滿足其業(yè)務(wù)需求; 于是他創(chuàng)建了一個(gè)名為 Cube_V2 的新立方體。由于某些原因用戶想要保留 Cube_V1 洋丐,并且期望從 Cube_V1 的結(jié)束日期開(kāi)始構(gòu)建 Cube_V2 ; 可能的原因包括:
- 歷史源數(shù)據(jù)已從 Hadoop 中刪除呈昔,從一開(kāi)始就無(wú)法構(gòu)建 Cube_V2
- Cube 很大,重建需要很長(zhǎng)時(shí)間
- 新維度/指標(biāo)僅在某一天有效或應(yīng)用;
對(duì)于針對(duì)通用維度/指標(biāo)的查詢友绝,用戶期望掃描 Cube_V1 和 Cube_V2 以獲得完整的結(jié)果集; 在這樣的背景下韩肝,引入 HTBRID(混合模型)來(lái)解決這個(gè)問(wèn)題,如下:
- 混合模型沒(méi)有真正的存儲(chǔ)空間; 它就像在表格上的虛擬數(shù)據(jù)庫(kù)視圖一樣
- 混合實(shí)例充當(dāng)委托者九榔,將請(qǐng)求轉(zhuǎn)發(fā)給其子實(shí)現(xiàn)哀峻,然后在從實(shí)例返回時(shí)合并結(jié)果
- 混合模型的目的是連接歷史 Cube 和新 Cube,類似 union
- 若同時(shí)有 Cube 和 HYBRID 滿足某一個(gè)查詢哲泊,優(yōu)先使用 HYBRID剩蟀,因?yàn)槠鋽?shù)據(jù)更全
4.2、RealizationCost 的 cost 如何計(jì)算切威?
public int CubeInstance#getCost() {
// COST_WEIGHT_MEASURE = 1;
// COST_WEIGHT_DIMENSION = 10;
// COST_WEIGHT_INNER_JOIN = 100;
// 組成 rowKey 的 col 個(gè)數(shù)
int countedDimensionNum = getRowKeyColumnCount();
int c = countedDimensionNum * COST_WEIGHT_DIMENSION + getMeasures().size() * COST_WEIGHT_MEASURE;
DataModelDesc model = getModel();
for (JoinTableDesc join : model.getJoinTables()) {
if (join.getJoin().isInnerJoin())
c += CubeInstance.COST_WEIGHT_INNER_JOIN;
}
return c;
}
public int HybridInstance#getCost() {
int c = Integer.MAX_VALUE;
for (IRealization realization : getRealizations()) {
c = Math.min(realization.getCost(), c);
}
return c;
}
需要討論:
- 為什么 left join 不像 inner join 會(huì)使得 cost 變大育特?
4.3、RealizationCost 如何比較
- realization 優(yōu)先級(jí)更高的會(huì)優(yōu)先被使用(Cube 類型的 IRealization 優(yōu)先級(jí)小于 HYBRID 類型的 IRealization)
- 若兩個(gè) realization 都不存在優(yōu)先級(jí),則 cost 更小的會(huì)被優(yōu)先使用
-
RemoveBlackoutRealizationsRule
:符合以下幾種情況的 realization 會(huì)被移除:- 黑名單中的
- 當(dāng)白名單不為空缰冤,不在白名單中的
- 被配置
kylin.query.realization-filter
過(guò)濾的
-
RemoveUncapableRealizationsRule
:移除不適用的犬缨,詳見(jiàn)下文 isCapable 分析 -
RealizationSortRule
:對(duì)適用(應(yīng)用RemoveBlackoutRealizationsRule
和RemoveUncapableRealizationsRule
后還在的)的 realizations 進(jìn)行排序,排序規(guī)則是優(yōu)先級(jí)更高的 realization 排在更前面棉浸,若均不存在優(yōu)先級(jí)怀薛,則 cost 更小的排在更前面
4.4、CapabilityResult IRealization#isCapable(...)
CapabilityResult 包含:
boolean capable
int cost
IncapableCause incapableCause
CubeInstance#isCapable 主要判斷 Cube 所具備的維度和度量是否能滿足查詢需要的迷郑,只有
查詢的維度組合是 Cube 的維度組合或其子集
查詢的度量組合是 Cube 的度量組合或其子集才能滿足枝恋,否則 isCapable 均返回 false
若整個(gè) attemptSelectRealization 結(jié)束發(fā)現(xiàn)沒(méi)有滿足的 realization,則會(huì)拋 NoRealizationFoundException 異常
若獲取到了 realization嗡害,會(huì)設(shè)置為
olapContext
的 realization焚碌,會(huì)在OLAPEnumerator#queryStorage
中使用
五、Cuboid/Segment 查詢
OLAPTableScan 真正掃描時(shí)會(huì)觸發(fā) Cuboid/Segment 的查詢霸妹,核心流程如下:
5.1十电、Cuboid 選擇
在 Cuboid#findCuboid
中實(shí)現(xiàn):
public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions,
Collection<FunctionDesc> metrics) {
long cuboidID = toCuboidId(cuboidScheduler.getCubeDesc(), dimensions, metrics);
return Cuboid.findById(cuboidScheduler, cuboidID);
}
cuboidID 計(jì)算方式如下:
public static long toCuboidId(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
for (FunctionDesc metric : metrics) {
if (metric.getMeasureType().onlyAggrInBaseCuboid())
return Cuboid.getBaseCuboidId(cubeDesc);
}
long cuboidID = 0;
// dimensions 包含 group 列和 where 條件列
for (TblColRef column : dimensions) {
// 獲取維度列在 rowKey 中的 index
int index = cubeDesc.getRowkey().getColumnBitIndex(column);
// 見(jiàn)如下示例
cuboidID |= 1L << index;
}
return cuboidID;
}
下面舉個(gè)簡(jiǎn)單的例子,假設(shè)表一共有三列ABC叹螟,那么所有的 cuboid 組合就是:
5.2鹃骂、CubeSegmentScanner 內(nèi)部流程
SequentialCubeTupleIterator 最終是要調(diào)用 CubeSegmentScanner 去獲取 Cuboid 數(shù)據(jù)。
在對(duì)每個(gè) segment 進(jìn)行掃描的時(shí)候首妖,首先需要根據(jù)篩選到的 cuboid id 去獲取相應(yīng)的 region 信息(主要是起始region id 和 region數(shù))偎漫。
這樣就可以獲取每個(gè) segment 需要掃描的region,由于 Kylin 目前的數(shù)據(jù)都存儲(chǔ)在 HBase 當(dāng)中有缆,因此掃描的過(guò)程都在 HBase中進(jìn)行象踊。對(duì)于每個(gè) region,都會(huì)啟動(dòng)一個(gè)線程來(lái)向 HBase 發(fā)送掃描請(qǐng)求棚壁,然后將所有掃描的結(jié)果返回杯矩,聚合之后再返回上一層。為了加快掃描效率袖外,Kylin 還使用了 HBase 的 coprocessor 來(lái)對(duì)每個(gè)region的掃描結(jié)果進(jìn)行預(yù)聚合史隆。