Spark aggregate pushdown: approach and demo

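Spark originally planned to implement aggregate pushdown in version 2.3. I don't know why it did not make it into 2.3 in the end, but my work required it: I had to optimize Spark SQL starting from the aggregate functions, so I thought it through and implemented it.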

A useful reference article

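Someone online tried to implement aggregate pushdown on version 2.2 and submitted the code to Spark, but the pull request was rejected. The Spark maintainers said they would ship a new DataSource API in 2.3, namely DataSource API v2, and asked him not to insist on getting this code merged. I was rather speechless... Here is his blog post: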

How SparkSQL can implement aggregate pushdown (SparkSQL如何實現聚合下推)

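That author implemented the pushdown on the physical plan, which is fairly limiting. So I borrowed his idea and made optimizations on both the logical plan and the physical plan. This post covers only the logical-plan pushdown. A pushdown must ultimately reach the data source layer, and Spark does not provide a DataSource interface for aggregated scans. For that, see the AggregatedFilteredScan interface implemented in the blog post shared above; my implementation follows the same approach.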

Why pushdown matters

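Whether it is traditional predicate pushdown or aggregate pushdown, the point is to push some operations down to the data source layer so that far less data comes back from the data source. Disk I/O and network overhead both drop, and performance improves.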

Approach to the hard part

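The hardest part of aggregate pushdown, in my view, is the join: what should happen when the two columns in the join's ON condition are neither group-by columns nor aggregated columns? At first I thought this case could not be pushed down, because it forces a join column that was never a group-by column into the group-by list, which would affect the aggregation result and add one more aggregation. After a pointer from an expert, though, it turns out this case can be pushed down too, for two reasons: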

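  1. Even with one more aggregate node, the SQL still returns the correct result; seen from the final output, the extra node does not change the aggregation result.
  2. In general, the two columns in the join's ON condition will not have the same number of rows. If they did, database design conventions would say the two tables should have been merged (Union) into one. So the extra aggregate node also reduces the data transferred from the data source.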

Both points are demonstrated in the example that follows.
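The first reason deserves a small check. The sketch below is plain Python with made-up data (two hypothetical deptNo values that share one deptName), not the author's Spark code. It assumes the pushed-down aggregate returns a partial state of (sum, count) per join key, which is the shape of the aggregated rows in the demo output later on. Under that assumption the extra aggregate node is harmless, whereas averaging the per-key averages without weights can drift once one group spans several join keys.

```python
from collections import defaultdict

# Hypothetical data: two deptNo values share one deptName.
EMP = [(10, 100.0), (20, 200.0), (20, 300.0), (20, 400.0)]  # (deptNo, salary)
DEPT = {10: "research", 20: "research"}                     # deptNo -> deptName


def avg_after_join():
    """Reference plan: join first, then AVG(salary) GROUP BY deptName."""
    acc = defaultdict(lambda: [0.0, 0])
    for dept_no, salary in EMP:
        if dept_no in DEPT:
            acc[DEPT[dept_no]][0] += salary
            acc[DEPT[dept_no]][1] += 1
    return {name: s / c for name, (s, c) in acc.items()}


def pre_aggregate():
    """Extra aggregate below the join: one (sum, count) pair per join key."""
    acc = defaultdict(lambda: [0.0, 0])
    for dept_no, salary in EMP:
        acc[dept_no][0] += salary
        acc[dept_no][1] += 1
    return {key: (s, c) for key, (s, c) in acc.items()}


def avg_from_partials():
    """Join the pre-aggregated rows, then merge sums and counts per deptName."""
    acc = defaultdict(lambda: [0.0, 0])
    for dept_no, (s, c) in pre_aggregate().items():
        if dept_no in DEPT:
            acc[DEPT[dept_no]][0] += s
            acc[DEPT[dept_no]][1] += c
    return {name: s / c for name, (s, c) in acc.items()}


def avg_of_avgs():
    """Naive variant: average the per-key averages without weights."""
    acc = defaultdict(list)
    for dept_no, (s, c) in pre_aggregate().items():
        if dept_no in DEPT:
            acc[DEPT[dept_no]].append(s / c)
    return {name: sum(v) / len(v) for name, v in acc.items()}


print(avg_after_join())     # {'research': 250.0}
print(avg_from_partials())  # {'research': 250.0}
print(avg_of_avgs())        # {'research': 200.0}
```

In the Scott data used below, every deptName corresponds to exactly one deptNo, so the upper aggregate sees a single row per group and both variants agree there.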

A simple approach for pushing down through a join, and the results

Preparing the data source

The example uses the Scott data source from my earlier note on writing a Spark DataSource (寫一個Spark DataSource的隨手筆記).

Initial SQL and logical plan

SQL:

SELECT AVG(salary), deptName 
FROM emp 
JOIN dept 
ON emp.deptNo = dept.deptNo 
GROUP BY deptName;

LogicalPlan:

Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#19, deptName#8]
+- Project [salary#6, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#6]
      :  +- Filter isnotnull(deptNo#5)
      :     +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))

Pushing down step by step

  • Step 1: the cursor is at the top level. Push the aggregate node below its child project node, and replace salary#6 in the project with avg(salary)#19:
Project [avg(salary)#19, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#19, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#6]
      :  +- Filter isnotnull(deptNo#5)
      :     +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
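  • Step 2: the cursor is at the second level, which is now the aggregate node that was pushed down one level. Search the left and right child project nodes under the join node for the one that contains salary#6, and add an aggregate node grouped by the join column above that child. Assume the newly generated exprId is 20: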
Project [avg(salary)#19, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#20 as double)) AS avg(salary)#19, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#20, deptNo#5]
      :  +- Project [deptNo#5, salary#6]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
  • Step 3: the cursor is at the aggregate on the fourth level. As in Step 1, push the aggregate below the project and replace salary#6 in the project with salary#20:
Project [avg(salary)#19, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#20 as double)) AS avg(salary)#19, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#20]
      :  +- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#20, deptNo#5]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
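  • Step 2 (a variant of the walk-through that continues from the Step 1 plan and renews the exprId first): the aggregate's child is now the join. Generate a new id for the exprId (19) of avg(salary)#19; assume it becomes avg(salary)#20: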
Project [avg(salary)#20, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#20, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#6]
      :  +- Filter isnotnull(deptNo#5)
      :     +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
  • Step 3: search the left and right child project nodes under the join node for the one that contains salary#6, and add an aggregate node grouped by the join column above that child. Assume the newly generated exprId is 21:
Project [avg(salary)#20, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#21 as double)) AS avg(salary)#20, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#21, deptNo#5]
      :  +- Project [deptNo#5, salary#6]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
  • Step 4: as in Step 1, push the aggregate below the project and replace salary#6 in the project with salary#21:
Project [avg(salary)#20, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#21 as double)) AS avg(salary)#20, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#21]
      :  +- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#21, deptNo#5]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))

At this point the aggregate-filter-relation combination is routed to the AggregatedFilteredScan interface mentioned above, which calls the data source's buildScan() method.
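The rewrite can be mimicked on a toy plan tree. The sketch below is plain Python, not Catalyst: the node classes, the `push_down` function, and the restriction to a single averaged column are all assumptions made for illustration, and it presumes both join children are project nodes, as in the plan above. It collapses the steps into one pass: the top aggregate moves below the top project, and a new aggregate grouped by the join key ends up directly above the filter on the side that owns the aggregated column.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Filter:
    condition: str
    child: object


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: object


@dataclass(frozen=True)
class Join:
    left_key: str
    right_key: str
    left: object
    right: object


@dataclass(frozen=True)
class Aggregate:
    group_by: Tuple[str, ...]
    agg_column: str   # input column being averaged
    output: str       # name of the aggregated output column
    child: object


def push_down(plan: Aggregate, new_name: str) -> Project:
    """Rewrite Aggregate -> Project -> Join into the pushed-down shape."""
    join = plan.child.child  # assumes Aggregate -> Project -> Join

    def rewrite_side(side: Project, key: str) -> Project:
        # Only the side that exposes the aggregated column is changed.
        if plan.agg_column not in side.columns:
            return side
        columns = tuple(new_name if c == plan.agg_column else c
                        for c in side.columns)
        lower = Aggregate((key,), plan.agg_column, new_name, side.child)
        return Project(columns, lower)

    new_join = Join(join.left_key, join.right_key,
                    rewrite_side(join.left, join.left_key),
                    rewrite_side(join.right, join.right_key))
    # The upper aggregate now reads the pre-aggregated column.
    upper = Aggregate(plan.group_by, new_name, plan.output, new_join)
    return Project((plan.output,) + plan.group_by, upper)


plan = Aggregate(
    ("deptName#8",), "salary#6", "avg(salary)#19",
    Project(("salary#6", "deptName#8"),
            Join("deptNo#5", "deptNo#7",
                 Project(("deptNo#5", "salary#6"),
                         Filter("isnotnull(deptNo#5)", Relation("emp"))),
                 Project(("deptNo#7", "deptName#8"),
                         Filter("isnotnull(deptNo#7)", Relation("dept"))))))

result = push_down(plan, "salary#20")
print(result.child.child.left)
```

The left side of the join now has the project-aggregate-filter-relation shape from the last step, while the dept side is left untouched because it does not carry the aggregated column.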

Results

This pushdown is in fact equivalent to the following SQL and execution plan:

SQL:

SELECT AVG(avgsalary), deptName
FROM
(SELECT AVG(salary) AS avgsalary, deptNo FROM emp GROUP BY deptNo) a
JOIN dept 
ON a.deptNo = dept.deptNo 
GROUP BY deptName

LogicalPlan:

Project [avg(avgsalary)#21, deptName#9]
+- Aggregate [deptName#9], [avg(avgsalary#2) AS avg(avgsalary)#21, deptName#9]
   +- Join Inner, (deptNo#6 = deptNo#8)
      :- Project [deptNo#6, avgsalary#2]
      :  +- Aggregate [deptNo#6], [avg(cast(salary#7 as double)) AS avgsalary#2, deptNo#6]
      :     +- Filter isnotnull(deptNo#6)
      :        +- Relation[empNo#3,empName#4,mgr#5,deptNo#6,salary#7] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#8, deptName#9]
         +- Filter isnotnull(deptNo#8)
            +- Relation[deptNo#8,deptName#9,loc#10] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))

Without aggregate pushdown, the data returned by the data sources on the two sides of the join is:

data without aggregate function: 
  List(ListBuffer(20, 800.0), ListBuffer(20, 3000.0), ListBuffer(20, 2975.0), ListBuffer(30, 1600.0), ListBuffer(30, 1250.0), ListBuffer(30, 2950.0), ListBuffer(10, 5000.0))
data without aggregate function: 
  List(ListBuffer(10, accounting), ListBuffer(20, research), ListBuffer(30, sales), ListBuffer(40, operations))

With pushdown, the data returned on the two sides of the join is:

data with aggregate function: 
  ListBuffer(List(30, 5800.0, 3), List(20, 6775.0, 3), List(10, 5000.0, 1))
data without aggregate function: 
  List(ListBuffer(10, accounting), ListBuffer(20, research), ListBuffer(30, sales), ListBuffer(40, operations))

As you can see, with pushdown the emp side returns clearly fewer rows (3 instead of 7). And that is with fewer than 20 rows of data in total; with large data volumes, aggregate pushdown pays off even more.

Both queries return the same result:

+------------------+----------+
|avg(salary)       |deptName  |
+------------------+----------+
|1933.3333333333333|sales     |
|5000.0            |accounting|
|2258.3333333333335|research  |
+------------------+----------+
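The figures above can be re-derived by hand. The following sketch is plain Python rather than Spark, and its function names are hypothetical. It takes the emp and dept rows exactly as printed in the non-pushdown output, builds one (deptNo, sum, count) row per department the way the pushed-down scan appears to, and then joins and divides to obtain the final averages.

```python
from collections import defaultdict

# (deptNo, salary) rows as printed in the non-pushdown output.
EMP = [(20, 800.0), (20, 3000.0), (20, 2975.0), (30, 1600.0),
       (30, 1250.0), (30, 2950.0), (10, 5000.0)]
# (deptNo, deptName) rows as printed for the dept side.
DEPT = [(10, "accounting"), (20, "research"), (30, "sales"), (40, "operations")]


def aggregated_emp():
    """Pushed-down scan: one (deptNo, sum(salary), count) row per deptNo."""
    acc = defaultdict(lambda: [0.0, 0])
    for dept_no, salary in EMP:
        acc[dept_no][0] += salary
        acc[dept_no][1] += 1
    return [(dept_no, s, c) for dept_no, (s, c) in acc.items()]


def final_result(partials):
    """Inner join on deptNo, then finish AVG as sum / count per deptName."""
    names = dict(DEPT)
    return {names[k]: s / c for k, s, c in partials if k in names}


partials = aggregated_emp()
print(len(EMP), "rows without pushdown,", len(partials), "rows with pushdown")
print(sorted(partials))
print(final_result(partials))
```

The partial rows match the aggregated output shown earlier, and operations (deptNo 40) drops out of the result because the inner join finds no matching emp row.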