注意:
如需執(zhí)行如下代碼骏掀,請(qǐng)從官方github下載數(shù)據(jù)包 , 安裝所需spark環(huán)境
執(zhí)行如下創(chuàng)建Dataframe數(shù)據(jù)集代碼創(chuàng)建好所需的Dataframe才能用接下來(lái)的代碼對(duì)數(shù)據(jù)進(jìn)行操作窑邦。
為了美觀下面的例如.option() .load()為換行展示砾淌,真正輸入代碼時(shí)要在一行輸入。
所需創(chuàng)建的DataFrame數(shù)據(jù)集(第四行數(shù)據(jù)集路徑根據(jù)自己下載到本地的數(shù)據(jù)集地址進(jìn)行修改):
// Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
7.3 Window Functions 窗口函數(shù)
在Spark里面每一行數(shù)據(jù)就是一個(gè)row,窗口函數(shù)是指定所需行(row)的數(shù)據(jù)組成一個(gè)數(shù)據(jù)集進(jìn)行計(jì)算,窗口可以進(jìn)行排名,分析,聚合操作司顿。
下面對(duì)數(shù)據(jù)集進(jìn)行處理,組成新的dfWithDate, 每行添加日期以便更好的直觀的展現(xiàn)操作兄纺。
// Scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
第一步
就是利用窗口函數(shù)創(chuàng)建一個(gè)窗口,partitionBy是根據(jù)客戶的id和購(gòu)買日期進(jìn)行分組,rowsBetween代表了這個(gè)窗口里面包含了哪幾行(實(shí)例中是從"前面所有行"->"當(dāng)前行")
// Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
第二步
下面根據(jù)這個(gè)窗口計(jì)算并返回所需列
- 求出每個(gè)用戶購(gòu)買最多的股票Stock數(shù)目
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
- 求出該顧客購(gòu)買不同股票Stock數(shù)量的排名大溜,dense rank表明按照排名往下排序不和數(shù)量綁定排名例如有1個(gè)第一名和2個(gè)第二名那么接下來(lái)的第三名則是從正常的第三名開始計(jì)數(shù),而rank 則是從第四名開始計(jì)數(shù)囤热。
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
第三步
這時(shí)候maxPurchaseQuantity, purchaseDenseRank, purchaseRank將會(huì)返回三個(gè)列猎提,下面我們將這三個(gè)列和指定的dataframe的列一同返回得出所需結(jié)論:
// Scala
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
結(jié)果如下:
7.4 Grouping Sets 分組集合
group-by可以使我們對(duì)組內(nèi)進(jìn)行聚合操作,但是有些時(shí)候我們需要跨組進(jìn)行操作旁蔼,這個(gè)時(shí)候我們就需要用到分組集合。
首先我們對(duì)上面已經(jīng)處理好的dfWithDate 這個(gè)Dataframe進(jìn)行去空.drop()操作:
// in Scala
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
注意: Grouping Sets對(duì)空值有效疙教,所以需要去除空值保證結(jié)果不受影響棺聊。
下面通過(guò)SQL來(lái)得出所有的股票代碼和持有該股票的用戶以及每個(gè)用戶持有該股票的總數(shù)目:
-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
如果我們想求整個(gè)股票的數(shù)量而不是根據(jù)客戶和股票分組則通過(guò)下面這個(gè)SQL語(yǔ)句:
-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
然而分組集合僅僅在SQL里面提供,spark則使用 rullup 和 cube.
Rollups
目前討論的都是顯式數(shù)據(jù)集贞谓,及提供確定的數(shù)據(jù)集給Spark進(jìn)行操作限佩,下面將根據(jù)日期+不同國(guó)家股票創(chuàng)建rollups求出每個(gè)日期下面不同國(guó)家股票的購(gòu)買總數(shù):
Rollup可以返回的結(jié)果:
- 單個(gè)日期不同國(guó)家股票購(gòu)買總數(shù)
- 單個(gè)日期所有國(guó)家股票購(gòu)買總數(shù)
- 所有日期所有國(guó)家股票購(gòu)買總數(shù)
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
rolledUpDF.show()
結(jié)果如下:
可以看到上圖結(jié)果兩個(gè)都為空的那個(gè)row為股票總數(shù),可通過(guò)如下語(yǔ)句取出:
rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()
Cube
Cube相比Rollups更為深層次的返回結(jié)果:
- 可以返回所有不論國(guó)家和日期的股票總額
- 可以返回單個(gè)日期的所有國(guó)家的股票總額
- 可以返回單個(gè)國(guó)家的每個(gè)日期的股票總額
- 可以返回單個(gè)國(guó)家的不同日期的股票總額
和Rollup調(diào)用的方法差不多:
// Scala
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
結(jié)果如圖:
Grouping Metadata
這個(gè)的意思就是希望顯示的分組級(jí)別及從最細(xì)的到最粗的分組裸弦,而不是像cube那樣一股腦的全部顯示:
- 級(jí)別0 可以返回所有股票總額
- 級(jí)別1 可以返回單個(gè)客戶所有購(gòu)買的股票總額
- 級(jí)別2 可以返回單個(gè)股票所有用戶購(gòu)買的總額
- 級(jí)別3 可以返回單個(gè)客戶單個(gè)股票編碼的股票總額(最高級(jí)別)
由grouping_id來(lái)控制
// Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()
結(jié)果和上面的cube的結(jié)果差不多祟同,只不過(guò)根據(jù)分組級(jí)別進(jìn)行了分組.
Pivot
可以將行轉(zhuǎn)換為列進(jìn)行操作,如下示例我們可以將給定的國(guó)家按照日期來(lái)計(jì)算股票的數(shù)額:
// Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
由于數(shù)目龐大僅展示美國(guó)的日期大于2011-12-05的數(shù)額.
// Scala
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
7.5 User-Defined Aggregation Functions 用戶自定義聚合函數(shù)(UDAF)
注意:
目前UDAF只能在Spark2.3以上版本及Java和Scala上實(shí)現(xiàn).
可以根據(jù)業(yè)務(wù)邏輯自己制定分組聚合理疙,然而必須繼承基類:
// Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", BooleanType) :: Nil)
def bufferSchema: StructType = StructType(
StructField("result", BooleanType) :: Nil
)
def dataType: DataType = BooleanType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = true
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
}
def evaluate(buffer: Row): Any = {
buffer(0)
}
}
現(xiàn)在我們只需注冊(cè)進(jìn)udf中便可使用.
// Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
.selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
.selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
.select(ba(col("t")), expr("booland(f)"))
.show()