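Classification of aggregate functions
Declarative aggregate functions: aggregate functions that can be built directly from Catalyst expressions. This is the simplest kind; the most common aggregates, such as count, sum and avg, are all declarative.
Imperative aggregate functions: aggregate functions that must explicitly implement several methods to manipulate the data in the aggregation buffer (AggBuffer). They are less common; examples include cardinality estimation (HyperLogLogPlusPlus) and the pivot transformation (PivotFirst).
Typed imperative aggregate functions: the most flexible kind, which allows an arbitrary user-defined object to serve as the aggregation buffer. Aggregates whose buffer is a user-defined type belong here, for example collect_list, collect_set and percentile.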
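The difference between the three kinds can be sketched in plain Python. This is only an illustration of the idea, not Spark's API: the names `declarative_sum`, `run_declarative`, `ImperativeMax` and `TypedCollectSet`, and their method names, are hypothetical and only loosely modelled on the Catalyst classes.

```python
# Declarative style: the aggregate is described only by expressions
# (plain lambdas here) over buffer values; an engine interprets them.
declarative_sum = {
    "initial": 0,
    "update": lambda buf, x: buf + x,
    "merge": lambda left, right: left + right,
    "evaluate": lambda buf: buf,
}


def run_declarative(agg, values):
    """Drive a declarative aggregate over a list of input values."""
    buf = agg["initial"]
    for v in values:
        buf = agg["update"](buf, v)
    return agg["evaluate"](buf)


class ImperativeMax:
    """Imperative style: methods mutate one slot of a shared buffer row."""

    def __init__(self, offset):
        self.offset = offset  # position of this aggregate's slot in the buffer

    def initialize(self, buffer):
        buffer[self.offset] = None

    def update(self, buffer, x):
        cur = buffer[self.offset]
        buffer[self.offset] = x if cur is None else max(cur, x)

    def eval(self, buffer):
        return buffer[self.offset]


class TypedCollectSet:
    """Typed imperative style: the buffer is an arbitrary object (a set)."""

    def create_buffer(self):
        return set()

    def update(self, buf, x):
        buf.add(x)
        return buf

    def merge(self, buf, other):
        buf |= other
        return buf

    def eval(self, buf):
        return sorted(buf)


total = run_declarative(declarative_sum, [1, 2, 3])

row_buffer = [None, None]      # a shared buffer with two slots
mx = ImperativeMax(offset=1)   # this aggregate owns slot 1
mx.initialize(row_buffer)
for v in [4, 9, 2]:
    mx.update(row_buffer, v)

cs = TypedCollectSet()
left = cs.update(cs.update(cs.create_buffer(), 1), 2)
right = cs.update(cs.create_buffer(), 2)
collected = cs.eval(cs.merge(left, right))
```

The declarative version carries no state of its own, which is what lets an optimizer inspect and rewrite it; the other two hide their logic inside methods.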
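Aggregation buffers and aggregation modes
Partial mode: aggregates local data first, for example computing the sum within each partition.
PartialMerge mode: appears when several kinds of aggregate function are evaluated together, for example sum together with countDistinct. It merges aggregation buffers, and the merged result is still an intermediate result.
Final mode: aggregates the results held in the aggregation buffers once more, for example computing the sum of the per-partition sums.
Complete mode: has no intermediate aggregation step; all values of a group take part in a single aggregation pass. (Example to be added.)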
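The four modes can be illustrated with a small pure-Python sketch of avg, whose buffer is a (sum, count) pair. The function names and the sample partitions are assumptions made for illustration; Spark itself works on rows and buffers, not Python lists.

```python
def partial(rows):
    """Partial: raw input rows -> intermediate buffer (sum, count)."""
    return (sum(rows), len(rows))


def partial_merge(buffers):
    """PartialMerge: buffers -> one merged buffer, still intermediate."""
    return (sum(s for s, _ in buffers), sum(c for _, c in buffers))


def final(buffers):
    """Final: buffers -> merged buffer -> final value."""
    s, c = partial_merge(buffers)
    return s / c


def complete(rows):
    """Complete: raw input rows -> final value, no intermediate buffer."""
    return sum(rows) / len(rows)


partitions = [[1, 2], [3], [4, 5, 6]]              # hypothetical input partitions

buffers = [partial(p) for p in partitions]         # one buffer per partition
merged = [partial_merge(buffers[:2]), buffers[2]]  # still buffers, fewer of them
avg_staged = final(merged)
avg_complete = complete([x for p in partitions for x in p])
```

Both paths give the same average; the staged path only ever moves small buffers between stages, while Complete needs every value of the group in one place.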
In the current implementation this division into modes is not particularly clean; see the comment in AggregationIterator:
/**
* The following combinations of AggregationMode are supported:
* - Partial
* - PartialMerge (for single distinct)
* - Partial and PartialMerge (for single distinct)
* - Final
* - Complete (for SortAggregate with functions that does not support Partial)
* - Final and Complete (currently not used)
*
* TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
* could have a flag to tell it's final or not.
*/
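One way to read the TODO above is that each of the four modes is a combination of two independent choices: whether the operator consumes raw rows (update) or buffers (merge), and whether it emits the final value. The sketch below encodes that reading for avg. The `Mode` enum, the `DECOMPOSITION` table and `run_avg` are hypothetical names for illustration, not Spark code.

```python
from enum import Enum


class Mode(Enum):
    PARTIAL = "Partial"
    PARTIAL_MERGE = "PartialMerge"
    FINAL = "Final"
    COMPLETE = "Complete"


# mode -> (what it consumes, whether it emits the final value)
# "update" consumes raw input values, "merge" consumes (sum, count) buffers.
DECOMPOSITION = {
    Mode.PARTIAL: ("update", False),
    Mode.PARTIAL_MERGE: ("merge", False),
    Mode.FINAL: ("merge", True),
    Mode.COMPLETE: ("update", True),
}


def run_avg(mode, inputs):
    """Evaluate avg under the given mode using only the two flags."""
    kind, is_final = DECOMPOSITION[mode]
    if kind == "update":
        buf = (sum(inputs), len(inputs))
    else:
        buf = (sum(s for s, _ in inputs), sum(c for _, c in inputs))
    return buf[0] / buf[1] if is_final else buf


p = run_avg(Mode.PARTIAL, [1, 2, 3])
pm = run_avg(Mode.PARTIAL_MERGE, [p, (4, 1)])
f = run_avg(Mode.FINAL, [p, (4, 1)])
c = run_avg(Mode.COMPLETE, [1, 2, 3, 4])
```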
planAggregateWithoutDistinct: planning an aggregation without distinct
- Step 1 [Partial mode]: compute the Partial result of the aggregation
  groupingExpressions: the group-by column (a)
  aggregateExpressions: the aggregate (partial_sum(cast(b#11 as bigint)))
- Step 2 [Final mode]: compute the Final result of the aggregation
  groupingExpressions: the group-by column (a)
  aggregateExpressions: the aggregate (sum(b))
create temporary view data as select * from values
(1, 1),
(1, 2),
(2, 1),
(2, 2),
(3, 1),
(3, 2)
as data(a, b);
explain select sum(b) from data group by a;
== Physical Plan ==
*HashAggregate(keys=[a#10], functions=[sum(cast(b#11 as bigint))])
+- Exchange hashpartitioning(a#10, 200)
   +- *HashAggregate(keys=[a#10], functions=[partial_sum(cast(b#11 as bigint))])
      +- LocalTableScan [a#10, b#11]
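The plan above can be read bottom-up as partial aggregation, shuffle by key, then final aggregation. Here is a minimal Python simulation of that flow over the same rows. The split into two input partitions, the use of 4 shuffle partitions instead of 200, and all function names are assumptions made for the sketch.

```python
from collections import defaultdict

rows = [(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)]

# Assumption: the scan produces two input partitions split like this.
input_partitions = [rows[:3], rows[3:]]
NUM_SHUFFLE_PARTITIONS = 4  # stands in for the 200 shown in the plan


def partial_sum(partition):
    """Lower HashAggregate: partial_sum(b) per key a within one partition."""
    buf = defaultdict(int)
    for a, b in partition:
        buf[a] += b
    return dict(buf)


def exchange(partials, n):
    """Exchange hashpartitioning(a, n): route each key to one partition."""
    shuffled = [[] for _ in range(n)]
    for partial in partials:
        for key, value in partial.items():
            shuffled[hash(key) % n].append((key, value))
    return shuffled


def final_sum(partition):
    """Upper HashAggregate: merge the partial sums of each key."""
    out = defaultdict(int)
    for key, value in partition:
        out[key] += value
    return dict(out)


partials = [partial_sum(p) for p in input_partitions]

result = {}
for part in exchange(partials, NUM_SHUFFLE_PARTITIONS):
    result.update(final_sum(part))
```

Key 2 is split across both input partitions, so its two partial sums (1 and 2) meet again only after the exchange.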
planAggregateWithOneDistinct: planning an aggregation that contains a distinct aggregate
create temporary view data as select * from values
(1, 1),
(1, 2),
(2, 1),
(2, 2),
(3, 1),
(3, 2)
as data(a, b);
explain select sum(b), sum(distinct b) from data group by a;
== Physical Plan ==
*HashAggregate(keys=[a#10], functions=[sum(cast(b#11 as bigint)), sum(distinct cast(b#11 as bigint)#94L)]) // step4
+- Exchange hashpartitioning(a#10, 200)
   +- *HashAggregate(keys=[a#10], functions=[merge_sum(cast(b#11 as bigint)), partial_sum(distinct cast(b#11 as bigint)#94L)]) // step3
      +- *HashAggregate(keys=[a#10, cast(b#11 as bigint)#94L], functions=[merge_sum(cast(b#11 as bigint))]) // step2
         +- Exchange hashpartitioning(a#10, cast(b#11 as bigint)#94L, 200)
            +- *HashAggregate(keys=[a#10, cast(b#11 as bigint) AS cast(b#11 as bigint)#94L], functions=[partial_sum(cast(b#11 as bigint))]) // step1
               +- LocalTableScan [a#10, b#11]
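- Step 1 [Partial mode]: compute the Partial result of the non-distinct aggregate
  groupingExpressions: the group-by column (a) + the column used by distinct (b)
  aggregateExpressions: the non-distinct aggregate (sum(b))
  resultExpressions = the group-by column (a) + the column used by distinct (b) + the Partial result of the non-distinct aggregate (sum(b))
- Step 2 [PartialMerge mode]: compute the PartialMerge result of the non-distinct aggregate
  groupingExpressions: the group-by column (a) + the column used by distinct (b)
  aggregateExpressions: the non-distinct aggregate (sum(b))
  resultExpressions = the group-by column (a) + the column used by distinct (b) + the merged intermediate result of the non-distinct aggregate (sum(b))
- Step 3 [PartialMerge mode for the non-distinct aggregate, Partial mode for the distinct aggregate]: merge the non-distinct aggregate again and start aggregating the distinct one
  groupingExpressions: the group-by column (a)
  aggregateExpressions: the non-distinct aggregate (merge_sum(cast(b#11 as bigint))) + the distinct aggregate (partial_sum(distinct cast(b#11 as bigint)#94L))
  resultExpressions = the group-by column (a) + the merged result of the non-distinct aggregate + the partial result of the distinct aggregate
- Step 4 [Final mode]: compute the Final result of both the non-distinct and the distinct aggregate
  groupingExpressions: the group-by column (a)
  aggregateExpressions: the non-distinct aggregate (Final mode) + the distinct aggregate (Final mode)
  resultExpressions = the final results of the query: sum(b) and sum(distinct b)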
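The four steps can be traced with a small Python simulation. It is a sketch under stated assumptions: one duplicate row (1, 1) is added to the view's data so that sum(b) and sum(distinct b) differ, the input is split into two partitions, and `(a + b) % 2` stands in for hash partitioning on (a, b). All function names are hypothetical.

```python
from collections import defaultdict

# The view's rows plus one duplicated (1, 1), added here as an assumption.
rows = [(1, 1), (1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)]
input_partitions = [rows[:4], rows[4:]]


def step1(partition):
    """Partial: group by (a, b), buffer holds partial_sum(b)."""
    buf = defaultdict(int)
    for a, b in partition:
        buf[(a, b)] += b
    return dict(buf)


def shuffle_on_a_b(partials, n=2):
    """Stand-in for Exchange hashpartitioning(a, b): equal keys co-locate."""
    parts = [[] for _ in range(n)]
    for partial in partials:
        for (a, b), s in partial.items():
            parts[(a + b) % n].append(((a, b), s))
    return parts


def step2(pairs):
    """PartialMerge: group by (a, b); afterwards each (a, b) occurs once."""
    buf = defaultdict(int)
    for key, s in pairs:
        buf[key] += s
    return dict(buf)


def step3(merged):
    """Group by a: merge sum(b), and add each distinct b exactly once."""
    buf = defaultdict(lambda: [0, 0])
    for (a, b), s in merged.items():
        buf[a][0] += s  # merge_sum(b)
        buf[a][1] += b  # partial_sum(distinct b)
    return {a: tuple(v) for a, v in buf.items()}


def step4(buffers):
    """Final, after the shuffle on a: merge both buffers per group."""
    out = defaultdict(lambda: [0, 0])
    for buf in buffers:
        for a, (s, d) in buf.items():
            out[a][0] += s
            out[a][1] += d
    return {a: tuple(v) for a, v in out.items()}


partials = [step1(p) for p in input_partitions]
after_step3 = [step3(step2(p)) for p in shuffle_on_a_b(partials)]
result = step4(after_step3)  # a -> (sum(b), sum(distinct b))
```

Grouping by (a, b) in steps 1 and 2 is what deduplicates b: once every (a, b) pair is unique, the distinct aggregate in step 3 can treat b like an ordinary input column.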
References:
- 《SparkSQL內核剖析》 (Aggregation chapter): https://blog.csdn.net/renq_654321/article/details/94925717