《Spark: The Definitive Guide 》第7章:Aggregations 聚合 中文學(xué)習(xí)筆記

注意:
如需執(zhí)行如下代碼骏掀,請(qǐng)從官方github下載數(shù)據(jù)包 , 安裝所需spark環(huán)境
執(zhí)行如下創(chuàng)建Dataframe數(shù)據(jù)集代碼創(chuàng)建好所需的Dataframe才能用接下來(lái)的代碼對(duì)數(shù)據(jù)進(jìn)行操作窑邦。
為了美觀下面的例如.option() .load()為換行展示砾淌,真正輸入代碼時(shí)要在一行輸入。

所需創(chuàng)建的DataFrame數(shù)據(jù)集(第四行數(shù)據(jù)集路徑根據(jù)自己下載到本地的數(shù)據(jù)集地址進(jìn)行修改):

// Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

7.3 Window Functions 窗口函數(shù)

在Spark里面每一行數(shù)據(jù)就是一個(gè)row,窗口函數(shù)是指定所需行(row)的數(shù)據(jù)組成一個(gè)數(shù)據(jù)集進(jìn)行計(jì)算,窗口可以進(jìn)行排名,分析,聚合操作司顿。


窗口示例

下面對(duì)數(shù)據(jù)集進(jìn)行處理,組成新的dfWithDate, 每行添加日期以便更好的直觀的展現(xiàn)操作兄纺。

// Scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
原始DF和添加日期DF比較

第一步

就是利用窗口函數(shù)創(chuàng)建一個(gè)窗口,partitionBy是根據(jù)客戶的id和購(gòu)買日期進(jìn)行分組,rowsBetween代表了這個(gè)窗口里面包含了哪幾行(實(shí)例中是從"前面所有行"->"當(dāng)前行")

// Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

第二步

下面根據(jù)這個(gè)窗口計(jì)算并返回所需列

  • 求出每個(gè)用戶購(gòu)買最多的股票Stock數(shù)目
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
  • 求出該顧客購(gòu)買不同股票Stock數(shù)量的排名大溜,dense rank表明按照排名往下排序不和數(shù)量綁定排名例如有1個(gè)第一名和2個(gè)第二名那么接下來(lái)的第三名則是從正常的第三名開始計(jì)數(shù),而rank 則是從第四名開始計(jì)數(shù)囤热。
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

第三步

這時(shí)候maxPurchaseQuantity, purchaseDenseRank, purchaseRank將會(huì)返回三個(gè)列猎提,下面我們將這三個(gè)列和指定的dataframe的列一同返回得出所需結(jié)論:

// Scala
import org.apache.spark.sql.functions.col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

結(jié)果如下:


image.png

7.4 Grouping Sets 分組集合

group-by可以使我們對(duì)組內(nèi)進(jìn)行聚合操作,但是有些時(shí)候我們需要跨組進(jìn)行操作旁蔼,這個(gè)時(shí)候我們就需要用到分組集合。

首先我們對(duì)上面已經(jīng)處理好的dfWithDate 這個(gè)Dataframe進(jìn)行去空.drop()操作:

// in Scala
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

注意: Grouping Sets對(duì)空值有效疙教,所以需要去除空值保證結(jié)果不受影響棺聊。

下面通過(guò)SQL來(lái)得出所有的股票代碼和持有該股票的用戶以及每個(gè)用戶持有該股票的總數(shù)目:

-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

如果我們想求整個(gè)股票的數(shù)量而不是根據(jù)客戶和股票分組則通過(guò)下面這個(gè)SQL語(yǔ)句:

-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC

然而分組集合僅僅在SQL里面提供,spark則使用 rullup 和 cube.

Rollups

目前討論的都是顯式數(shù)據(jù)集贞谓,及提供確定的數(shù)據(jù)集給Spark進(jìn)行操作限佩,下面將根據(jù)日期+不同國(guó)家股票創(chuàng)建rollups求出每個(gè)日期下面不同國(guó)家股票的購(gòu)買總數(shù):
Rollup可以返回的結(jié)果:

  • 單個(gè)日期不同國(guó)家股票購(gòu)買總數(shù)
  • 單個(gè)日期所有國(guó)家股票購(gòu)買總數(shù)
  • 所有日期所有國(guó)家股票購(gòu)買總數(shù)
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
  .orderBy("Date")
rolledUpDF.show()

結(jié)果如下:


image.png

可以看到上圖結(jié)果兩個(gè)都為空的那個(gè)row為股票總數(shù),可通過(guò)如下語(yǔ)句取出:

rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()

Cube

Cube相比Rollups更為深層次的返回結(jié)果:

  • 可以返回所有不論國(guó)家和日期的股票總額
  • 可以返回單個(gè)日期的所有國(guó)家的股票總額
  • 可以返回單個(gè)國(guó)家的每個(gè)日期的股票總額
  • 可以返回單個(gè)國(guó)家的不同日期的股票總額

和Rollup調(diào)用的方法差不多:

// Scala
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

結(jié)果如圖:

image.png

image.png

Grouping Metadata

這個(gè)的意思就是希望顯示的分組級(jí)別及從最細(xì)的到最粗的分組裸弦,而不是像cube那樣一股腦的全部顯示:

  • 級(jí)別0 可以返回所有股票總額
  • 級(jí)別1 可以返回單個(gè)客戶所有購(gòu)買的股票總額
  • 級(jí)別2 可以返回單個(gè)股票所有用戶購(gòu)買的總額
  • 級(jí)別3 可以返回單個(gè)客戶單個(gè)股票編碼的股票總額(最高級(jí)別)

由grouping_id來(lái)控制

// Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}

dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()

結(jié)果和上面的cube的結(jié)果差不多祟同,只不過(guò)根據(jù)分組級(jí)別進(jìn)行了分組.

Pivot

可以將行轉(zhuǎn)換為列進(jìn)行操作,如下示例我們可以將給定的國(guó)家按照日期來(lái)計(jì)算股票的數(shù)額:

// Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

由于數(shù)目龐大僅展示美國(guó)的日期大于2011-12-05的數(shù)額.

// Scala
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
image.png

7.5 User-Defined Aggregation Functions 用戶自定義聚合函數(shù)(UDAF)

注意:
目前UDAF只能在Spark2.3以上版本及Java和Scala上實(shí)現(xiàn).

可以根據(jù)業(yè)務(wù)邏輯自己制定分組聚合理疙,然而必須繼承基類:

// Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

現(xiàn)在我們只需注冊(cè)進(jìn)udf中便可使用.

// Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
  .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
  .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
  .select(ba(col("t")), expr("booland(f)"))
  .show()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末晕城,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子窖贤,更是在濱河造成了極大的恐慌砖顷,老刑警劉巖贰锁,帶你破解...
    沈念sama閱讀 216,744評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異滤蝠,居然都是意外死亡豌熄,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門物咳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)锣险,“玉大人,你說(shuō)我怎么就攤上這事览闰⌒痉簦” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵焕济,是天一觀的道長(zhǎng)纷妆。 經(jīng)常有香客問(wèn)我,道長(zhǎng)晴弃,這世上最難降的妖魔是什么掩幢? 我笑而不...
    開封第一講書人閱讀 58,242評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮上鞠,結(jié)果婚禮上际邻,老公的妹妹穿的比我還像新娘。我一直安慰自己芍阎,他們只是感情好世曾,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評(píng)論 6 389
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谴咸,像睡著了一般轮听。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上岭佳,一...
    開封第一講書人閱讀 51,215評(píng)論 1 299
  • 那天血巍,我揣著相機(jī)與錄音,去河邊找鬼珊随。 笑死述寡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的叶洞。 我是一名探鬼主播鲫凶,決...
    沈念sama閱讀 40,096評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼衩辟!你這毒婦竟也來(lái)了螟炫?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,939評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤惭婿,失蹤者是張志新(化名)和其女友劉穎不恭,沒(méi)想到半個(gè)月后叶雹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡换吧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評(píng)論 2 333
  • 正文 我和宋清朗相戀三年折晦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沾瓦。...
    茶點(diǎn)故事閱讀 39,745評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡满着,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贯莺,到底是詐尸還是另有隱情风喇,我是刑警寧澤,帶...
    沈念sama閱讀 35,448評(píng)論 5 344
  • 正文 年R本政府宣布缕探,位于F島的核電站魂莫,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏爹耗。R本人自食惡果不足惜耙考,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望潭兽。 院中可真熱鬧倦始,春花似錦、人聲如沸山卦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)账蓉。三九已至枚碗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铸本,已是汗流浹背视译。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留归敬,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,776評(píng)論 2 369
  • 正文 我出身青樓鄙早,卻偏偏與公主長(zhǎng)得像汪茧,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子限番,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評(píng)論 2 354