kylin3之前的TopN實(shí)現(xiàn)原理
看一個(gè)典型的 Top-N 查詢示例。該查詢是選擇在 2015 年 10 月 1 日抗楔,地址在北京惯雳,銷售商品按價(jià)格之和排序(倒序),找前 100 個(gè)妥箕。
在 Kylin v1.5 之前滥酥,SQL 中的 group by 列,需聲明成維度畦幢,所以這個(gè) Cube 的維度中要有日期坎吻,地點(diǎn)和商品名,度量是 SUM(PRICE) 宇葱。圖中展示了一個(gè)這樣設(shè)計(jì) Cube瘦真。因?yàn)樯唐返幕鶖?shù)很大刊头,計(jì)算的 cuboid 的行數(shù)會(huì)很多;而度量值 SUM(PRICE) 是非排序的诸尽,因此需要將這些紀(jì)錄都從存儲(chǔ)器讀到 Kylin 查詢引擎中(內(nèi)存)原杂, 然后再排序找出最高的紀(jì)錄;這樣的解決辦法總開銷較大弦讽。
Kylin 選擇了 Space-Saving 算法 污尉,以及它的一個(gè)衍生版 Parallel Space-Saving,并在此之上做了特定的優(yōu)化往产,有了 Top-N 之后被碗,Cube 的設(shè)計(jì)會(huì)比以前簡(jiǎn)單很多,因?yàn)橄駝偛诺纳唐访麜?huì)被挪到 Measure 中去仿村,在 Measure 里按 Sum 值做倒序锐朴,只保留最大的若干值
值得一提的是需要用多少空間運(yùn)算 Top-N。簡(jiǎn)單來(lái)說(shuō)存儲(chǔ)空間越多準(zhǔn)確率越高蔼囊。我們通過(guò)使用生成一些樣本數(shù)據(jù)然后用 Space-Saving 計(jì)算焚志,并且跟真實(shí)結(jié)果做比較,發(fā)現(xiàn) 50 倍空間對(duì)于普通的數(shù)據(jù)分布是夠用的畏鼓。也即酱酬,用戶需要 Top 100 的結(jié)果,Kylin 對(duì)于每種組合條件值云矫,保留 Top 5000 的紀(jì)錄, 并供以后再次合并膳沽。這樣即使多次合并, Top100 依然是比較接近真實(shí)結(jié)果
Top-N 的優(yōu)點(diǎn):因?yàn)樗槐A?Top 的記錄让禀,會(huì)讓 Cube 空間大幅度減少挑社,而查詢性能大大提升。在一個(gè)典型的例子里巡揍,改用 Top-N 后痛阻,Cube 的大小減少了 90%,而查詢時(shí)間則只有以前的 10% 不到腮敌。
缺點(diǎn)是它可能是近似的結(jié)果(當(dāng) 50 倍空間也無(wú)法容納所有基數(shù)的時(shí)候)阱当。如果業(yè)務(wù)場(chǎng)景需要絕對(duì)精確的話,它可能不適合糜工。
Top-N 誤差率由很多因素決定的:
數(shù)據(jù)的分布:數(shù)據(jù)分布越陡斗这,誤差越小。
算法使用的空間:如果對(duì)精度要求高的話啤斗,可以選擇用更多的空間換取更精準(zhǔn)的準(zhǔn)確率 表箭。在實(shí)際使用中,可以做一些比較以了解誤差情況。
kylin3之后的TopN實(shí)現(xiàn)原理
當(dāng)前Kylin4的TopN UDAF注冊(cè)是在org.apache.kylin.engine.spark.job.CuboidAggregator#aggInternal, 代碼如下:
def aggInternal(ss: SparkSession,
dataSet: DataFrame,
dimensions: util.Set[Integer],
measures: util.Map[Integer, FunctionDesc],
isSparkSql: Boolean): DataFrame = {
//省略
measure.expression.toUpperCase(Locale.ROOT) match {
//省略
case "TOP_N" =>
// Uses new TopN aggregate function
// located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
val schema = StructType(measure.pra.map { col =>
val dateType = col.dataType
if (col == measure) {
StructField(s"MEASURE_${col.columnName}", dateType)
} else {
StructField(s"DIMENSION_${col.columnName}", dateType)
}
})
if (reuseLayout) {
new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
.toAggregateExpression()).as(id.toString)
} else {
new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
.toAggregateExpression()).as(id.toString)
}
//省略
case _ =>
max(columns.head).as(id.toString)
}
}.toSeq
//省略
if (reuseLayout) {
val columns = NSparkCubingUtil.getColumns(dimensions) ++ measureColumns(dataSet.schema, measures)
df.select(columns: _*)
} else {
df
}
}
其實(shí)TopN最初的實(shí)現(xiàn)的在org.apache.kylin.engine.spark.job.TopNUDAF免钻,但是可以看到目前TopN的實(shí)現(xiàn)是在org.apache.spark.sql.udaf.BaseTopN.scala彼水,最新的實(shí)現(xiàn)主要針對(duì)舊的實(shí)現(xiàn)修復(fù)了性能問(wèn)題,詳情可以查看KYLIN-4760极舔。
Kylin 4.0的TopN是通過(guò)Spark UDAF的方式實(shí)現(xiàn)的凤覆,以下是實(shí)現(xiàn)類接口之間的關(guān)系,可以看到最終實(shí)現(xiàn)的是BaseTopN拆魏,繼承的是TypedImperativeAggregate盯桦。然后BaseTopN又有兩個(gè)子類,分別是EncodeTopN和ReuseTopN渤刃,當(dāng)從平表(FlatTable)開始構(gòu)建的時(shí)候拥峦,F(xiàn)latTable中沒(méi)有構(gòu)建過(guò)TopN,這里會(huì)調(diào)用EncodeTopN卖子,再次之后從已經(jīng)構(gòu)建好的cuboid構(gòu)建下一層cuboid的時(shí)候會(huì)調(diào)用ReuseTopN略号,避免重復(fù)計(jì)算,接口關(guān)系圖如下:
繼承TypedImperativeAggregate實(shí)現(xiàn)TopN洋闽,而不是UserDefineAggregateFunction主要是因?yàn)閁serDefinedAggregateFunction 是把 catalyst 內(nèi)部 internalRow 類型轉(zhuǎn)換為了 Row 類型玄柠,然后使用用戶自己的 update 方法處理,然后TypedImperativeAggregate需要自己做序列化诫舅、反序列化處理羽利,少了一層轉(zhuǎn)換。
TopNCounter介紹
前面提到Space-Saving算法是在TopNCounter中實(shí)現(xiàn)的刊懈,此處我們對(duì)TopNCounter的實(shí)現(xiàn)進(jìn)行一個(gè)簡(jiǎn)要的介紹这弧。BaseTopN對(duì)象初始化的時(shí)候會(huì)創(chuàng)建TopNCounter對(duì)象,用戶保存計(jì)算過(guò)程中符合TopN條件的行俏讹,對(duì)應(yīng)于Spark UDAF的概念是aggregate buffer。update畜吊,merge泽疆,eval都是處理的TopNCounter。TopNCounter在初始化的時(shí)候需要指定容量, 大小建議為N * TopNCounter.EXTRA_SPACE_RATE, 其中N為TopN定義的大小玲献,EXTRA_SPACE_RATE為建議額外空間調(diào)整參數(shù)殉疼,默認(rèn)為10, 也就是說(shuō)如果定義的topn(10,4), 那么TopNCounter的初始化大小則為10 * 10 = 100 捌年。
TopN的處理流程可以見下圖:
update()主要將傳入的行通過(guò)TopNCounter.offer() 將一行的內(nèi)容插入到TopNCounter對(duì)象中瓢娜,merge則是對(duì)兩個(gè)經(jīng)過(guò)update()操作的group進(jìn)行去重合并,最后在eval()的時(shí)候調(diào)用TopNCounter.sortAndRetain()來(lái)排序和調(diào)整TopNCounter大小礼预,最終得到聚合結(jié)果眠砾。
Kylin 4.0目前使用的是parquet進(jìn)行存儲(chǔ),我們定義topn(10,4)托酸, TopNCounter.EXTRA_SPACE_RATE 設(shè)置為1褒颈。cuboid中維度和度量列明的映射關(guān)系為:
0 -> seller_id
1 -> item_id
2 -> id
3 -> price
4 -> Count
5 -> TopN
如下是只有TopN和只有SUM的cuboid內(nèi)容:
值得注意的是第二行柒巫,Count為11,但是實(shí)際上TopN列只存儲(chǔ)了10個(gè)值谷丸,這是因?yàn)門opNCounter的容量只有10 * EXTRA_SPACE_RATE = 10堡掏, 超過(guò)10的內(nèi)容不會(huì)被存儲(chǔ),這也是當(dāng)前TopN存在誤差的原因所在刨疼∪洌可以看到TopN將計(jì)算的維度和group by的維度放到了一起,然后用數(shù)組的形式進(jìn)行存儲(chǔ)揩慕。
對(duì)于sum度量亭畜,kylin則是直接存儲(chǔ)的sum后的聚合值。
參考漩绵,其實(shí)是抄了一邊
Apache Kylin的Top-N近似預(yù)計(jì)算-InfoQ
Kylin 4.0 TopN Introduction CN - Kylin 4.0 TopN Introduction CN - Apache Software Foundation
Apache Kylin權(quán)威指南(九):Top_N 度量?jī)?yōu)化-InfoQ
問(wèn)題:
- 對(duì)于Kylin3贱案,是哪個(gè)維度會(huì)移入到 Measure中(不定義為維度實(shí)現(xiàn),我感覺(jué)怎么是個(gè)鎖)止吐,多個(gè)維度移入到 Measure又是如何實(shí)現(xiàn)
- TopNCounter中的方法調(diào)用流程