Spark was originally expected to implement aggregate pushdown in version 2.3. For whatever reason it did not make it into 2.3, but my work required optimizing Spark SQL starting from aggregate functions, so I worked out an implementation myself.
A useful reference article
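Someone online wanted to implement aggregate pushdown in Spark 2.2 and submit the code upstream, but his pull request was rejected. The Spark developers said that 2.3 would bring a new DataSource API, DataSource API v2, and told him not to insist on getting this code merged, which left me rather speechless... Here is his blog post: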
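He implemented the pushdown on the physical plan, which is quite limiting. Building on his idea, I made optimizations on both the logical plan and the physical plan; this post covers only the logical-plan pushdown. Pushdown ultimately has to reach the data source layer, and Spark provides no DataSource interface for aggregating data sources, so for that part see the AggregatedFilteredScan interface implemented in the blog post above; my implementation follows the same approach.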
Why pushdown matters
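Whether it is classic predicate pushdown or aggregate pushdown, the point is to move some operations down to the data source layer so that far less data comes back from the source. This cuts disk I/O and network overhead and improves performance.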
How to handle the hard part
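The hardest part of aggregate pushdown, in my view, is the join: what do you do when the two columns in the join's ON condition are neither group-by columns nor aggregated columns? At first I thought this case could not be pushed down, because it forces a join column that was never part of the GROUP BY into the grouping columns, which seemed to change the aggregation result and adds an extra aggregation. After a hint from a more experienced colleague, however, it turns out it can be pushed down, for two reasons: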
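- Even with the extra aggregate node, the SQL result is still correct; judged by the final output, the extra aggregation does not change the aggregated result. This relies on the aggregate being decomposable into a partial and a final step: SUM, MIN and MAX can simply be applied again, COUNT becomes a SUM of partial counts, and AVG has to be carried as a sum plus a count, which is why the pushed-down emp data in the results section comes back as (deptNo, sum, count) rows.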
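- In general the join key is not one-to-one between the two tables (many employees share one deptNo, for example). If it were one-to-one, normal database design would merge the two tables into one. So the extra aggregate node collapses many rows per join key into one and still reduces the data transferred from the data source.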
Both points are demonstrated in the example below.
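The first point can be checked with a few lines of Python (a sketch for illustration, not the author's code). It takes the emp (deptNo, salary) pairs that appear in the result logs, pre-aggregates them per join key into [sum, count, min, max] partial states, and merges those states; the helper names `pre_aggregate` and `merge` are made up here. The last lines show the caveat: averaging the per-key averages is not the same as AVG once an output group contains several join keys.

```python
# (deptNo, salary) rows of emp, copied from the logs in the result section
EMP = [(20, 800.0), (20, 3000.0), (20, 2975.0),
       (30, 1600.0), (30, 1250.0), (30, 2950.0), (10, 5000.0)]


def pre_aggregate(rows):
    """Partial state per join key: [sum, count, min, max]."""
    state = {}
    for key, value in rows:
        s = state.setdefault(key, [0.0, 0, value, value])
        s[0] += value
        s[1] += 1
        s[2] = min(s[2], value)
        s[3] = max(s[3], value)
    return state


def merge(states):
    """Combine partial states into the final sum/count/min/max/avg."""
    total = sum(s[0] for s in states)
    n = sum(s[1] for s in states)
    return {"sum": total, "count": n,
            "min": min(s[2] for s in states), "max": max(s[3] for s in states),
            "avg": total / n}


values = [v for _, v in EMP]
direct = {"sum": sum(values), "count": len(values), "min": min(values),
          "max": max(values), "avg": sum(values) / len(values)}
states = pre_aggregate(EMP)
print(len(EMP), "rows ->", len(states), "partial rows")  # 7 rows -> 3 partial rows
print(merge(states.values()) == direct)                 # True

# Averaging per-key averages is NOT AVG once one group spans several keys
avg_of_avgs = sum(s[0] / s[1] for s in states.values()) / len(states)
print(round(avg_of_avgs, 2), round(direct["avg"], 2))   # 3063.89 2510.71
```

Merging partial states is exact because sum, count, min and max are associative; avg is recovered from the merged sum and count rather than averaged twice.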
A simple approach to pushing an aggregate through a join, and the result
Preparing the data source
I use the Scott data source from the post "Quick Notes on Writing a Spark DataSource".
Initial SQL and logical plan
SQL:
```sql
SELECT AVG(salary), deptName
FROM emp
JOIN dept
ON emp.deptNo = dept.deptNo
GROUP BY deptName;
```
LogicalPlan:
```
Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#19, deptName#8]
+- Project [salary#6, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#6]
      :  +- Filter isnotnull(deptNo#5)
      :     +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
Pushing down step by step
- Step 1: with the pointer at the top, push the Aggregate node below its child Project and, at the same time, replace salary#6 in the Project with avg(salary)#19:
```
Project [avg(salary)#19, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#19, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#6]
      :  +- Filter isnotnull(deptNo#5)
      :     +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
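Step 1 is a plain swap of two operators. Below is a minimal Python sketch of that rule on a toy plan model; the classes `Attr`, `AggCall`, `Opaque`, `Project`, `Aggregate` and the function `push_agg_below_project` are hypothetical illustrations, not Catalyst's API. It assumes the Project only passes attributes through (no computed expressions) and that each aggregated column feeds exactly one aggregate call.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Attr:
    """An attribute reference such as salary#6 (name plus exprId)."""
    name: str
    expr_id: int

    def __str__(self) -> str:
        return f"{self.name}#{self.expr_id}"


@dataclass
class AggCall:
    func: str
    input: Attr
    output: Attr  # e.g. avg(salary)#19


@dataclass
class Opaque:
    """Any subtree the rule does not look into (here: the Join)."""
    label: str


@dataclass
class Project:
    columns: List[Attr]
    child: object


@dataclass
class Aggregate:
    group_by: List[Attr]
    aggs: List[AggCall]
    child: object


def push_agg_below_project(plan):
    """Aggregate(Project(c)) => Project(Aggregate(c)).

    Valid only when the Project passes attributes through unchanged, so the
    Aggregate can read the very same attributes from `c`.
    """
    if not (isinstance(plan, Aggregate) and isinstance(plan.child, Project)):
        return plan
    project = plan.child
    # each aggregated input column is replaced by the aggregate's output column
    renamed = {a.input: a.output for a in plan.aggs}
    columns = [renamed.get(c, c) for c in project.columns
               if c in renamed or c in plan.group_by]
    return Project(columns, Aggregate(plan.group_by, plan.aggs, project.child))


salary, dept_name = Attr("salary", 6), Attr("deptName", 8)
join = Opaque("Join Inner, (deptNo#5 = deptNo#7)")
plan = Aggregate([dept_name], [AggCall("avg", salary, Attr("avg(salary)", 19))],
                 Project([salary, dept_name], join))
new_plan = push_agg_below_project(plan)
print([str(c) for c in new_plan.columns])  # ['avg(salary)#19', 'deptName#8']
```

The same rule covers Step 3: applied to the partial Aggregate over Project [deptNo#5, salary#6], it keeps deptNo#5 because it is a grouping column and swaps salary#6 for the aggregate output salary#20.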
- Step 2: with the pointer at the second level, i.e. the Aggregate that was just pushed down one level, search the left and right child Projects under the Join for the one that contains salary#6, and add above that child a new aggregate node that groups by the join column; assume the newly generated exprId is 20:
```
Project [avg(salary)#19, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#20 as double)) AS avg(salary)#19, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#20, deptNo#5]
      :  +- Project [deptNo#5, salary#6]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
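Step 2 is where the extra aggregate appears. A minimal Python sketch, again on a hypothetical toy model (`Attr`, `AggCall`, `Leaf`, `Aggregate`, `Join`, `SPLIT` and `insert_partial_agg` are made-up names, and each join side is collapsed into a `Leaf` that only exposes its output columns). It deliberately differs from the plan above: instead of averaging averages, it splits avg into a partial sum and count and merges them as sum/sum, which stays exact for any grouping and matches the (deptNo, sum, count) rows the data source returns in the results section.

```python
from dataclasses import dataclass
from itertools import count
from typing import List, Tuple

_ids = count(20)  # hypothetical exprId source; first fresh id is 20, as in the plan above

@dataclass(frozen=True)
class Attr:
    name: str
    expr_id: int

    def __str__(self) -> str:
        return f"{self.name}#{self.expr_id}"

def fresh(name: str) -> Attr:
    return Attr(name, next(_ids))

@dataclass
class AggCall:
    func: str                 # sum/count/min/max/avg, or the merge form "sum/sum"
    inputs: Tuple[Attr, ...]
    output: Attr

@dataclass
class Leaf:
    """A collapsed Project/Filter/Relation subtree; only its output matters."""
    label: str
    output: List[Attr]

@dataclass
class Aggregate:
    group_by: List[Attr]
    aggs: List[AggCall]
    child: object

@dataclass
class Join:
    left_key: Attr
    right_key: Attr
    left: object
    right: object

# partial function(s) computed below the join -> how the top aggregate merges them
SPLIT = {
    "sum": (("sum",), "sum"),
    "min": (("min",), "min"),
    "max": (("max",), "max"),
    "count": (("count",), "sum"),          # partial counts are added up
    "avg": (("sum", "count"), "sum/sum"),  # avg = sum of sums / sum of counts
}

def insert_partial_agg(top: Aggregate) -> Aggregate:
    """Put an Aggregate grouped by the join key on top of the join side that
    produces every aggregated column; otherwise return `top` unchanged."""
    join = top.child
    if not isinstance(join, Join):
        return top
    needed = {a.inputs[0] for a in top.aggs}
    for side, key in (("left", join.left_key), ("right", join.right_key)):
        child = getattr(join, side)
        if needed <= set(child.output):
            break
    else:
        return top  # aggregated columns come from both sides: no pushdown
    # group by the join key plus any outer grouping column living on this side
    keys = [key] + [g for g in top.group_by if g in child.output and g != key]
    partial_aggs, merged = [], []
    for a in top.aggs:
        partial_fns, merge_fn = SPLIT[a.func]  # KeyError: not decomposable here
        parts = [AggCall(f, a.inputs, fresh(f"{f}({a.inputs[0].name})"))
                 for f in partial_fns]
        partial_aggs += parts
        merged.append(AggCall(merge_fn, tuple(p.output for p in parts), a.output))
    partial = Aggregate(keys, partial_aggs, child)
    left, right = (partial, join.right) if side == "left" else (join.left, partial)
    return Aggregate(top.group_by, merged, Join(join.left_key, join.right_key, left, right))

e_dept, salary = Attr("deptNo", 5), Attr("salary", 6)
d_dept, dept_name = Attr("deptNo", 7), Attr("deptName", 8)
top = Aggregate([dept_name], [AggCall("avg", (salary,), Attr("avg(salary)", 19))],
                Join(e_dept, d_dept,
                     Leaf("emp side", [e_dept, salary]),
                     Leaf("dept side", [d_dept, dept_name])))
new_top = insert_partial_agg(top)
pushed = new_top.child.left
print([str(g) for g in pushed.group_by])     # ['deptNo#5']
print([str(a.output) for a in pushed.aggs])  # ['sum(salary)#20', 'count(salary)#21']
```

Because the join only repeats rows, every partial sum and count is repeated exactly as often as the original rows would have been, so sum/sum gives the same answer as the unrewritten AVG for an inner join.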
- Step 3: with the pointer at the Aggregate on the fourth level, do as in Step 1: push the Aggregate below the Project and replace salary#6 in the Project with salary#20:
```
Project [avg(salary)#19, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#20 as double)) AS avg(salary)#19, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#20]
      :  +- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#20, deptNo#5]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
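The same rewrite can also be written with an extra step that first gives the top-level aggregate output a fresh exprId. Starting again from the plan after Step 1:
- Step 2: the Aggregate's child is now the Join; generate a new id for the exprId (19) of avg(salary)#19, say avg(salary)#20: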
```
Project [avg(salary)#20, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#20, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#6]
      :  +- Filter isnotnull(deptNo#5)
      :     +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
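Renewing an exprId is only safe if every reference to the old id is rewritten in the same pass, because the plan refers to columns by exprId; here that means the Project above must switch to avg(salary)#20 together with the Aggregate. A minimal Python sketch on a hypothetical toy model; `ExprIdGen` and `renew_agg_outputs` are illustrative names, not Spark classes.

```python
from dataclasses import dataclass
from itertools import count
from typing import List


@dataclass(frozen=True)
class Attr:
    name: str
    expr_id: int

    def __str__(self) -> str:
        return f"{self.name}#{self.expr_id}"


class ExprIdGen:
    """Hypothetical stand-in for the optimizer's global exprId counter."""

    def __init__(self, start: int) -> None:
        self._ids = count(start)

    def renew(self, attr: Attr) -> Attr:
        # same name, brand-new id
        return Attr(attr.name, next(self._ids))


@dataclass
class AggCall:
    func: str
    input: Attr
    output: Attr


@dataclass
class Aggregate:
    group_by: List[Attr]
    aggs: List[AggCall]
    child: object


@dataclass
class Project:
    columns: List[Attr]
    child: Aggregate


def renew_agg_outputs(project: Project, gen: ExprIdGen) -> Project:
    """Re-id every aggregate output and rewrite the parent Project in the same
    pass, so nothing above still refers to the old exprId."""
    agg = project.child
    mapping = {a.output: gen.renew(a.output) for a in agg.aggs}
    aggs = [AggCall(a.func, a.input, mapping[a.output]) for a in agg.aggs]
    columns = [mapping.get(c, c) for c in project.columns]
    return Project(columns, Aggregate(agg.group_by, aggs, agg.child))


salary, dept_name = Attr("salary", 6), Attr("deptName", 8)
avg19 = Attr("avg(salary)", 19)
plan = Project([avg19, dept_name],
               Aggregate([dept_name], [AggCall("avg", salary, avg19)], "<join subtree>"))
new_plan = renew_agg_outputs(plan, ExprIdGen(20))
print([str(c) for c in new_plan.columns])  # ['avg(salary)#20', 'deptName#8']
```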
- Step 3: search the left and right child Projects under the Join for the one that contains salary#6, and add above that child an aggregate node that groups by the join column; assume the newly generated exprId is 21:
```
Project [avg(salary)#20, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#21 as double)) AS avg(salary)#20, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#21, deptNo#5]
      :  +- Project [deptNo#5, salary#6]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
- Step 4: as in Step 1, push the Aggregate below the Project and replace salary#6 in the Project with salary#21:
```
Project [avg(salary)#20, deptName#8]
+- Aggregate [deptName#8], [avg(cast(salary#21 as double)) AS avg(salary)#20, deptName#8]
   +- Join Inner, (deptNo#5 = deptNo#7)
      :- Project [deptNo#5, salary#21]
      :  +- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#21, deptNo#5]
      :     +- Filter isnotnull(deptNo#5)
      :        +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#7, deptName#8]
         +- Filter isnotnull(deptNo#7)
            +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
At this point the Aggregate-Filter-Relation combination is handed to the AggregatedFilteredScan interface mentioned above, which calls the data source's buildScan() method.
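What the data source has to do for the pushed-down Aggregate-Filter-Relation combination can be sketched in a few lines of Python. `build_scan` below is a hypothetical function, not the AggregatedFilteredScan signature (which this post does not show); it applies the pushed filter, groups by the join key and returns one [key, sum, count] row per group, which is enough for the engine to finish AVG. The input rows are the emp (deptNo, salary) pairs from the log in the results section.

```python
from collections import defaultdict

# emp as (deptNo, salary) rows, copied from the "data without aggregate function" log
EMP = [(20, 800.0), (20, 3000.0), (20, 2975.0),
       (30, 1600.0), (30, 1250.0), (30, 2950.0), (10, 5000.0)]


def build_scan(rows, group_col, value_col, filters=()):
    """Hypothetical aggregate-aware scan: apply the pushed filters, group by
    `group_col`, and return one [key, sum, count] row per group."""
    acc = defaultdict(lambda: [0.0, 0])
    for row in rows:
        if all(f(row) for f in filters):
            state = acc[row[group_col]]
            state[0] += row[value_col]
            state[1] += 1
    return [[key, total, n] for key, (total, n) in acc.items()]


# Filter isnotnull(deptNo) from the plan, expressed as a Python predicate
scan = build_scan(EMP, group_col=0, value_col=1, filters=[lambda r: r[0] is not None])
print(scan)  # [[20, 6775.0, 3], [30, 5800.0, 3], [10, 5000.0, 1]]
```

Up to row order, which depends on the hash map used for grouping, these are the rows in the pushed-down log in the results section.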
Checking the results
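This pushdown is effectively equivalent to the following SQL and execution plan. Averaging the per-department averages gives the right answer here only because each deptName corresponds to exactly one deptNo; if several join keys fell into one output group, averaging averages would no longer match a plain AVG.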
SQL:
```sql
SELECT AVG(avgsalary), deptName
FROM
  (SELECT AVG(salary) AS avgsalary, deptNo FROM emp GROUP BY deptNo) a
JOIN dept
ON a.deptNo = dept.deptNo
GROUP BY deptName
```
LogicalPlan:
```
Project [avg(avgsalary)#21, deptName#9]
+- Aggregate [deptName#9], [avg(avgsalary#2) AS avg(avgsalary)#21, deptName#9]
   +- Join Inner, (deptNo#6 = deptNo#8)
      :- Project [deptNo#6, avgsalary#2]
      :  +- Aggregate [deptNo#6], [avg(cast(salary#7 as double)) AS avgsalary#2, deptNo#6]
      :     +- Filter isnotnull(deptNo#6)
      :        +- Relation[empNo#3,empName#4,mgr#5,deptNo#6,salary#7] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true)))
      +- Project [deptNo#8, deptName#9]
         +- Filter isnotnull(deptNo#8)
            +- Relation[deptNo#8,deptName#9,loc#10] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
```
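The equivalence can be checked outside Spark with Python's standard `sqlite3` module. This is a sketch under stated assumptions: SQLite stands in for Spark, and only the deptNo, salary and deptName columns are loaded, with the values from the logs below. It runs the original query, the avg-of-avg rewrite above, and a sum/count rewrite; after renaming dept 10 to research so that one deptName covers two deptNos, only the sum/count form still matches the original.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE emp (deptNo INTEGER, salary REAL);
    CREATE TABLE dept (deptNo INTEGER, deptName TEXT);
""")
# Only the columns the queries need, with the values from the logs
conn.executemany("INSERT INTO emp VALUES (?, ?)",
                 [(20, 800), (20, 3000), (20, 2975), (30, 1600),
                  (30, 1250), (30, 2950), (10, 5000)])
conn.executemany("INSERT INTO dept VALUES (?, ?)",
                 [(10, "accounting"), (20, "research"), (30, "sales"), (40, "operations")])

ORIGINAL = """SELECT deptName, AVG(salary) FROM emp
              JOIN dept ON emp.deptNo = dept.deptNo GROUP BY deptName"""
AVG_OF_AVG = """SELECT deptName, AVG(avgsalary)
                FROM (SELECT AVG(salary) AS avgsalary, deptNo FROM emp GROUP BY deptNo) a
                JOIN dept ON a.deptNo = dept.deptNo GROUP BY deptName"""
SUM_COUNT = """SELECT deptName, SUM(s) / SUM(c)
               FROM (SELECT SUM(salary) AS s, COUNT(salary) AS c, deptNo
                     FROM emp GROUP BY deptNo) a
               JOIN dept ON a.deptNo = dept.deptNo GROUP BY deptName"""


def run(sql):
    """deptName -> average, rounded to hide last-digit float noise."""
    return {name: round(v, 6) for name, v in conn.execute(sql)}


print(run(ORIGINAL) == run(AVG_OF_AVG) == run(SUM_COUNT))  # True

# Let two deptNos share one deptName: avg-of-avg now diverges, sum/count does not
conn.execute("UPDATE dept SET deptName = 'research' WHERE deptNo = 10")
print(run(ORIGINAL)["research"], run(AVG_OF_AVG)["research"], run(SUM_COUNT)["research"])
# 2943.75 3629.166667 2943.75
```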
Without aggregate pushdown, the data returned from the data source for the two sides of the join is:
```
data without aggregate function:
List(ListBuffer(20, 800.0), ListBuffer(20, 3000.0), ListBuffer(20, 2975.0), ListBuffer(30, 1600.0), ListBuffer(30, 1250.0), ListBuffer(30, 2950.0), ListBuffer(10, 5000.0))
data without aggregate function:
List(ListBuffer(10, accounting), ListBuffer(20, research), ListBuffer(30, sales), ListBuffer(40, operations))
```
With pushdown, the data returned for the two sides of the join is:
```
data with aggregate function:
ListBuffer(List(30, 5800.0, 3), List(20, 6775.0, 3), List(10, 5000.0, 1))
data without aggregate function:
List(ListBuffer(10, accounting), ListBuffer(20, research), ListBuffer(30, sales), ListBuffer(40, operations))
```
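So with pushdown, the data fetched from one side of the join drops noticeably: the emp side goes from 7 rows to 3 (deptNo, sum, count) rows, while the dept side is unchanged. That is with fewer than 20 rows in total; with large data volumes, aggregate pushdown should pay off even more.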
Both queries return:
```
+------------------+----------+
|avg(salary)       |deptName  |
+------------------+----------+
|1933.3333333333333|sales     |
|5000.0            |accounting|
|2258.3333333333335|research  |
+------------------+----------+
```
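The upper half of the pushed-down plan, the Join plus the final aggregate, can be replayed by hand from the two logs above. A minimal Python sketch, assuming deptNo is unique in dept (so the inner join reduces to a dict lookup) and that the final AVG is computed as the sum of the partial sums divided by the sum of the partial counts:

```python
from collections import defaultdict

# (deptNo, sum, count) rows from the pushed-down emp scan, and the dept rows, as logged
partials = [(30, 5800.0, 3), (20, 6775.0, 3), (10, 5000.0, 1)]
dept = [(10, "accounting"), (20, "research"), (30, "sales"), (40, "operations")]

# Inner join on deptNo; assumes deptNo is unique in dept
name_of = dict(dept)
totals = defaultdict(lambda: [0.0, 0])
for dept_no, total, n in partials:
    if dept_no in name_of:
        acc = totals[name_of[dept_no]]
        acc[0] += total  # final AVG = sum of the partial sums ...
        acc[1] += n      # ... divided by the sum of the partial counts

result = {name: total / n for name, (total, n) in totals.items()}
for name, avg in result.items():
    print(f"{avg!r:<20}{name}")  # values match the avg(salary) column above
```

Department 40 (operations) has no partial row, so the inner join drops it, just as in the table.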