總目錄
Kylin系列(一)—— 入門
Kylin系列(二)—— Cube 構(gòu)造算法
[TOC]
Kylin cube 構(gòu)造算法
逐層算法(layer Cubing)
我們知道,一個(gè)N維的Cube,是有1個(gè)N維子立方體侮腹、N個(gè)(N-1)維子立方體嘲碧、N*(N-1)/2個(gè)(N-2)維子立方體、......父阻、N個(gè)1維子立方體和1個(gè)0維子立方體構(gòu)成愈涩,總共有2^N個(gè)子立方體望抽。在逐層算法中,按照維度數(shù)逐層減少來計(jì)算履婉,每個(gè)層級(jí)的計(jì)算(除了第一層煤篙,他是從原始數(shù)據(jù)聚合而來),是基于他上一層級(jí)的計(jì)算結(jié)果來計(jì)算的毁腿。
比如group by [A,B]的結(jié)果辑奈,可以基于group by [A,B,C]的結(jié)果,通過去掉C后聚合得來的已烤,這樣可以減少重復(fù)計(jì)算鸠窗;當(dāng)0維Cuboid計(jì)算出來的時(shí)候,整個(gè)Cube的計(jì)算也就完成了胯究。
如上圖所示稍计,展示了一個(gè)4維的Cube構(gòu)建過程。
此算法的Mapper和Reducer都比較簡單裕循。Mapper以上一層Cuboid的結(jié)果(key-value對(duì))作為輸入臣嚣。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度剥哑,去掉它的值成新的key茧球,并對(duì)value進(jìn)行操作,然后把新的key和value輸出星持,進(jìn)而Hadoop MapReduce對(duì)所有新的key進(jìn)行排序抢埋、洗牌(shuffle)、再送到Reducer處督暂;Reducer的輸入會(huì)是一組具有相同key的value集合揪垄,對(duì)這些value做聚合運(yùn)算,再結(jié)合key輸出就完成了一輪計(jì)算逻翁。
舉個(gè)例子:
假設(shè)一共四個(gè)維度A/B/C/D饥努,他們的成員分別是(A1、A2八回、A3)酷愧,(B1、B2)缠诅、(C1)溶浴、(D1),有一個(gè)measure(對(duì)于這列V管引,計(jì)算sum(V))士败,這里忽略dictionary編碼。原始表如下:
那么base cuboid最終的輸出如下
(A1、B1谅将、C1漾狼、D1、2)
(A1饥臂、B2逊躁、C1、D1隅熙, 3)
(A2稽煤、B1、C1猛们、D1念脯, 5)
(A3狞洋、B1弯淘、C1、D1吉懊, 6)
(A3庐橙、B2、C1借嗽、D1态鳖, 8)
那么它作為下面一個(gè)cuboid的輸入,對(duì)于第一行輸入
(A1恶导、B1浆竭、C1、D1惨寿,2)邦泄,mapper執(zhí)行完成之后會(huì)輸出
(A1、B1裂垦、C1顺囊, 2)、
(A1蕉拢、B1特碳、D1, 2)晕换、
(A1午乓、C1、D1闸准, 2)硅瞧、
(B1、C1恕汇、D1腕唧,2)這四項(xiàng)或辖,同樣對(duì)于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過reducer的聚合運(yùn)算枣接,得到如下的結(jié)果:
(A1颂暇、B1、C1但惶, 2)
(A1耳鸯、B1、D1膀曾, 2)
(A1县爬、C1、D1添谊, 2 + 3)
(B1财喳、C1、D1斩狱,2 + 5 +6)
這個(gè)例子其實(shí)在cube的構(gòu)建過程中可以看到耳高。
一定要注意,這里的每一輪計(jì)算都是MapReducer任務(wù)所踊,且串行執(zhí)行泌枪;一個(gè)N維的Cube,至少需要N次MapReduce Job秕岛。
算法的優(yōu)點(diǎn)
- 此算法充分利用了MR的能力碌燕,處理了中間復(fù)雜的排序和洗牌工作,故而算法代碼清晰簡單继薛,易于維護(hù)修壕。
- 受益于Hadoop的日趨成熟,此算法對(duì)集群要求低惋增,運(yùn)行穩(wěn)定叠殷。
算法的缺點(diǎn)
- 當(dāng)Cube有比較多維度的時(shí)候,所需要的MR任務(wù)也相應(yīng)增加诈皿;由于Hadoop的任務(wù)調(diào)度需要耗費(fèi)額外資源林束,特別是集群較龐大的時(shí)候,反復(fù)遞交任務(wù)造成的額外開銷會(huì)很可觀
- 由于Mapper不做預(yù)聚合稽亏,此算法會(huì)對(duì)Hadoop MR輸出較多數(shù)據(jù)壶冒;雖然已經(jīng)使用了Combiner來減少從Mapper端到Reducer端的數(shù)據(jù)傳輸,所有數(shù)據(jù)依然需要通過MR來排序和組合才能被聚合截歉,無形之中增加了集群的壓力胖腾。
- 對(duì)HDFS的讀寫操作較多:由于每一層計(jì)算的輸出會(huì)用作下一層計(jì)算的輸入,這些Key-value需要寫到HDFS上;當(dāng)所有計(jì)算都完成后咸作,Kylin還需要額外一輪任務(wù)將這些文件轉(zhuǎn)成Hbase的HFile格式锨阿,以導(dǎo)入到HBase中去。
- 總體而言记罚,該算法的效率較低墅诡,尤其當(dāng)Cube維度數(shù)較大的時(shí)候。
這里其實(shí)在困惑到底什么是0維桐智,后來想明白了末早。舉個(gè)例子,現(xiàn)在有一個(gè)度量叫成交量说庭。有幾個(gè)維度從大到腥涣住:業(yè)務(wù)類型、渠道刊驴、門店姿搜。3維的例子就是[業(yè)務(wù)類型、渠道缺脉、門店],二維的例子是[業(yè)務(wù)類型痪欲、渠道],一維[業(yè)務(wù)類型],0維其實(shí)就是沒有維度悦穿,也就是全部聚合攻礼,舉個(gè)例子就是
select sum(price) from table1
其實(shí)在我看來,逐層算法就是先算維度數(shù)最高的栗柒,一層算完后礁扮,再算維度數(shù)減少的一層,以此類推瞬沦。至于為什么從層級(jí)高向?qū)蛹?jí)低計(jì)算太伊,而不是反過來,在于如果是反過來逛钻,那你每次的計(jì)算量都是初始數(shù)據(jù)僚焦,數(shù)據(jù)量非常大,沒必要曙痘。
快速Cube算法(Fast Cubing)
快速Cube算法芳悲,它還被稱作“逐段”(By Segment)或“逐塊”(By Split)算法。
該算法的主要思想边坤,對(duì)Mapper所分配的數(shù)據(jù)塊名扛,將它計(jì)算成一個(gè)完整的小Cube段(包含所有Cuboid);每個(gè)Mapper將計(jì)算完的Cube段輸出給Reducer做合并茧痒,生成大Cube肮韧,也就是最終結(jié)果。
與舊算法相比,快速算法主要有兩點(diǎn)不同:
- Mapper會(huì)利用內(nèi)存做預(yù)聚合弄企,算出所有組合超燃;Mapper輸出的每個(gè)Key都是不同的,這樣會(huì)減少輸出到Hadoop MapReduce的數(shù)據(jù)量拘领,Combiner也不再需要淋纲;
- 一輪MapReduce便會(huì)完成所有層次的計(jì)算,減少Hadoop任務(wù)的調(diào)配院究。
來說個(gè)比較洽瞬。逐層算法的每一層的計(jì)算都有一個(gè)MapReduce任務(wù),因?yàn)槭菑母呔S到低維的MR任務(wù)业汰,任務(wù)之間傳遞的數(shù)據(jù)量是非常大的伙窃。比如上面的例子,生成4維的數(shù)據(jù)样漆,需要在mapper中對(duì)全數(shù)據(jù)進(jìn)行的整理为障,再傳遞給reducer聚合,如果數(shù)據(jù)量非常大放祟,那么網(wǎng)絡(luò)IO是很大的鳍怨。而快速算法,它會(huì)對(duì)某個(gè)分片數(shù)據(jù)進(jìn)行構(gòu)造完整的cube(所有cuboid)跪妥。再將mapper中的數(shù)據(jù)送入reducer進(jìn)行大聚合生成Cube鞋喇。這其實(shí)是在map階段就已經(jīng)完成了聚合,IO是很小的眉撵。
舉個(gè)例子
這里不理解沒關(guān)系侦香,看完后面的構(gòu)建過程再翻回來看例子就能懂
一個(gè)Cube有4個(gè)維度:A,B,C,D;每個(gè)Mapper都有100萬個(gè)源記錄要處理;Mapper中的列基數(shù)是Car(A),Car(B),Car(C)和Car(D)纽疟。(cardinal 基數(shù))
當(dāng)講源記錄聚集到base cuboid(1111)時(shí)罐韩,使用舊的“逐層”算法,每個(gè)Mapper將向Hadoop輸出1百萬條記錄污朽;使用快速立方算法散吵,在預(yù)聚合之后,它預(yù)聚合之后蟆肆,它只向Hadoop輸出[distinct A,B,C,D]記錄的數(shù)量矾睦,這樣肯定比源數(shù)據(jù)小颓芭;在正常情況下顷锰,他可以源記錄大小的1/10到1/100.
當(dāng)從父cuboid聚合到子cuboid時(shí),從base cuboid(1111) 到3維cuboid 0111,將會(huì)聚合維度A亡问;我們假設(shè)維度A與其他維度獨(dú)立的官紫,聚合后肛宋,cuboid 0111的維度base cuboid的1/Card(A);所以在這一步的輸出將減少到原來的1/Card(A);
總的來說,假設(shè)維度的平均基數(shù)是Card(N),從Mapper到Reducer的寫入記錄可以減少到原始維度的1/Card(N);Hadoop的輸出越少束世,I/O和計(jì)算越少酝陈,性能就越好。
這里要提一句毁涉,其實(shí)很多都是類似的沉帮,比如在hive中處理大表, 各種的調(diào)優(yōu)都和IO贫堰、計(jì)算有關(guān)系穆壕,因?yàn)樗麄兌际腔贛R任務(wù)。
子立方體生成樹(Cuboid spanning Tree)的遍歷次序
在舊算法中其屏,Kylin按照層級(jí)喇勋,也就是廣度優(yōu)先遍歷(Broad First Search)的次序計(jì)算出各個(gè)Cuboid;在快速Cube算法中,Mapper會(huì)按照深度優(yōu)先遍歷(Depth First Search)來計(jì)算各個(gè)Cuboid偎行。
深度優(yōu)先遍歷是一個(gè)遞歸方法川背,將父cuboid壓棧以計(jì)算子Cuboid,直到?jīng)]有子Cuboid需要計(jì)算才出棧并輸出給Hadoop;需要最多暫存N個(gè)Cuboid蛤袒,N是Cube維度數(shù)熄云。
- 采用DFS,是為了兼顧C(jī)PU和內(nèi)存妙真。
- 從父Cuboid計(jì)算子Cuboid缴允,避免重復(fù)計(jì)算。
- 只壓棧當(dāng)前計(jì)算的Cuboid的父Cuboid,減少內(nèi)存占用隐孽。
- 舉個(gè)例子從3維到2維的MR任務(wù)中計(jì)算CD癌椿,BFS會(huì)壓入ABC ABD ACD BCD健蕊,mapper進(jìn)行切分菱阵,reducer進(jìn)行聚合;而在DFS中缩功,只會(huì)壓入ABCD,BCD晴及,內(nèi)存大大減少。
上圖是一個(gè)四維Cube的完整生成樹:
按照DFS的次序嫡锌,在0維Cuboid輸出前的計(jì)算次序是ABCD-》BCD-》CD-》D-》0維虑稼,ABCD,BCD,CD和D需要被暫存;在被輸出后势木,D可被輸出蛛倦,內(nèi)存得到釋放;在C被計(jì)算并輸出后啦桌,CD就可以被輸出溯壶,ABCD最后被輸出及皂。
使用DFS訪問順序,Mapper的輸出已完全排序且改,因?yàn)镃uboid ID位于行鍵的開始位置验烧,而內(nèi)部的Cuboid的行已排序。
0000 0001[D0] 0001[D1] .... 0010[C0] 0010[C1] .... 0011[C0][D0] 0011[C0][D1] .... .... 1111[A0][B0][C0][D0] .... 這里的寫法可以看構(gòu)造過程又跛。
由于mapper的輸出已經(jīng)排序碍拆,Hadoop的排序效率會(huì)更高。
此外慨蓝,mapper的預(yù)聚合發(fā)生在內(nèi)存中感混,這樣可以避免不必要的磁盤和網(wǎng)絡(luò)IO,并減少了hadoop的開銷礼烈。
在開發(fā)階段浩习,我們?cè)趍apper中遇到了OOM錯(cuò)誤;這可能發(fā)生在:
- Mapper的JVM堆大小很小
- 使用 distinct count度量
- 使用樹太深(維度太多)
- 給Mapper的數(shù)據(jù)太大
我們意識(shí)到Kylin不能認(rèn)為mapper總是有足夠的內(nèi)存济丘;Cubing算法需要自適應(yīng)各種情況谱秽;
當(dāng)主動(dòng)檢測到OOM錯(cuò)誤,會(huì)優(yōu)化內(nèi)存使用并將數(shù)據(jù)spilling到磁盤上摹迷;結(jié)果是有希望的疟赊,OOM錯(cuò)誤現(xiàn)在很少發(fā)生。
優(yōu)點(diǎn)
- 它比舊的方法更快峡碉;從我們的比較測試中可以減少30%到50%的build總時(shí)間:快在排序近哟,快在IO。
- 他在Hadoop上產(chǎn)生較少的工作負(fù)載鲫寄,并在HDFS上留下較少的中間文件吉执。
- Cubing和Spark等其他立方體引起可以輕松地重復(fù)使用該立方體代碼。
缺點(diǎn)
該算法有點(diǎn)復(fù)雜地来,這增加了維護(hù)工作戳玫;
雖然該算法可以自動(dòng)將數(shù)據(jù)spill到磁盤,但他仍希望Mapper有足夠的內(nèi)存來獲得最佳性能未斑。
用戶需要更多知識(shí)來調(diào)整立方體咕宿。
By-layer Spark Cubing算法
我們知道,RDD(Resilient Distributed DataSet)是Spark中的一個(gè)基本概念蜡秽。N維立方體的組合可以很好地描述為RDD府阀,N維立方體將具有N+1個(gè)RDD。這些RDD具有parent/child關(guān)系芽突,因?yàn)檫@些parent RDD 可用于生成child RDD试浙。通過將父RDD緩存在內(nèi)存中,子RDD的生成可以比磁盤讀取更有效寞蚌。
改進(jìn)
- 每一層的cuboid視為一個(gè)RDD
- 父RDD被盡可能cache到內(nèi)存
- RDD 被導(dǎo)出為sequence file
- 通過將“map”替換為“flatMap”田巴,以及把“reduce”替換為“reduceByKey”力细,可以復(fù)用大部分代碼
Spark中Cubing的過程
下圖DAG(有向無環(huán)圖),它詳細(xì)說明了這個(gè)過程:
在Stage 5中,Kylin使用HiveContext讀取中間Hive表固额,然后執(zhí)行一個(gè)一對(duì)一映射的"map"操作將原始值編碼為KV字節(jié)眠蚂。完成后Kylin得到一個(gè)中間編碼的RDD。
在Stage 6中斗躏,中間RDD用一個(gè)“ReduceByKey”操作聚合以獲得RDD-1逝慧,這是base cuboid。接下來啄糙,在RDD-1做了一個(gè)flatMap(一對(duì)多map),因?yàn)閎ase cuboid有N個(gè)cuboid笛臣。以此類推,各級(jí)RDD得到計(jì)算隧饼。在完成時(shí)沈堡,這些RDD將完整地保存在分布式文件系統(tǒng),但可以緩存在內(nèi)存中用于下一級(jí)計(jì)算燕雁。當(dāng)生成子cuboid時(shí)诞丽,他將從緩存中刪除。
其實(shí)我們和舊的逐層算法去比較會(huì)發(fā)現(xiàn)拐格,他們之間的構(gòu)建沒有什么大的差別僧免,只不過Spark的是在內(nèi)存中進(jìn)行的,無需從磁盤讀取和網(wǎng)絡(luò)IO捏浊。并且后面的stage的第一步是reduce懂衩。
性能測試
在所有這三種情況下,Spark都比MR快金踪,總體而言它可以減少約一半的時(shí)間浊洞。
Kylin的構(gòu)建算法以及和spark的改進(jìn)
http://cxy7.com/articles/2018/06/09/1528549073259.html
https://www.cnblogs.com/zlslch/p/7404465.html