關(guān)注小編的公眾號(hào)飘诗,后臺(tái)回復(fù)“進(jìn)群”教藻,一起來(lái)交流學(xué)習(xí)吧墓捻!
前兩篇中咱們分別介紹了使用Excel致板、Python和Hive SQL計(jì)算統(tǒng)計(jì)值交煞,這次咱們使用Spark SQL來(lái)計(jì)算統(tǒng)計(jì)值。
先來(lái)回顧一下數(shù)據(jù)和對(duì)應(yīng)的統(tǒng)計(jì)結(jié)果:
本文使用的是iris分類(lèi)數(shù)據(jù)集可岂,數(shù)據(jù)下載地址為:
http://archive.ics.uci.edu/ml/datasets/Iris
下載后轉(zhuǎn)換為xlsx格式的文件错敢,數(shù)據(jù)如下:
對(duì)應(yīng)的統(tǒng)計(jì)結(jié)果如下:
在介紹之前,我還是想先說(shuō)明一點(diǎn)缕粹,這一篇只是想先帶大家體驗(yàn)一把Spark SQL稚茅,相關(guān)更多關(guān)于原理相關(guān)的知識(shí),咱們會(huì)在后面的文章中詳細(xì)介紹平斩。
1亚享、數(shù)據(jù)導(dǎo)入
這里咱們通過(guò)讀取Excel的方式讀取出相應(yīng)的數(shù)據(jù),并得到一個(gè)DataFrame:
def createDFByCSV(spark:SparkSession) = {
val df = spark.sqlContext.read.format("com.databricks.spark.csv")
.option("header","true") //這里如果在csv第一行有屬性的話绘面,沒(méi)有就是"false"
.option("inferSchema",true.toString)//這是自動(dòng)推斷屬性列的數(shù)據(jù)類(lèi)型欺税。
.load("resources/iris.csv")
df.show()
}
結(jié)果如下:
2、使用Spark SQL計(jì)算統(tǒng)計(jì)值
2.1 最大值揭璃、最小值
使用Spark SQL統(tǒng)計(jì)最大值或者最小值晚凿,首先使用agg函數(shù)對(duì)數(shù)據(jù)進(jìn)行聚合,這個(gè)函數(shù)一般配合group by使用瘦馍,不使用group by的話就相當(dāng)于對(duì)所有的數(shù)據(jù)進(jìn)行聚合歼秽。
隨后,直接使用max和min函數(shù)就可以情组,想要輸出多個(gè)結(jié)果的話燥筷,中間用逗號(hào)分開(kāi),而使用as給聚合后的結(jié)果賦予一個(gè)列名院崇,相當(dāng)于sql中的as:
import spark.implicits._
df.agg(max($"feature1") as "max_feature1",
min($"feature2") as "min_feature2")
.show()
結(jié)果輸出如下:
上面的$代表一列的意思肆氓,相當(dāng)于col函數(shù):
df.agg(max(col("feature1")) as "max_feature1",
min(col("feature2")) as "min_feature2")
.show()
1.2 平均值
平均值的計(jì)算使用mean函數(shù):
df.agg(mean($"feature1") as "mean_feature1",
mean($"feature2") as "mean_feature2").show()
輸出為:
1.3 樣本標(biāo)準(zhǔn)差&總體標(biāo)準(zhǔn)差
樣本標(biāo)準(zhǔn)差的計(jì)算有兩個(gè)函數(shù)可以使用,分別是stddev函數(shù)和stddev_samp函數(shù)底瓣,而總體標(biāo)準(zhǔn)差使用stddev_pop方法谢揪。需要注意的一點(diǎn)是,這里和hive sql是有區(qū)別的,在hive sql中键耕,stddev函數(shù)代表的是總體標(biāo)準(zhǔn)差寺滚,而在spark sql中柑营,stddev函數(shù)代表的是樣本標(biāo)準(zhǔn)差屈雄,可以查看一下源代碼:
通過(guò)代碼驗(yàn)證一下:
df.agg(stddev($"feature1") as "stddev_feature1",
stddev_pop($"feature1") as "stddev_pop_feature1",
stddev_samp($"feature1") as "stddev_samp_feature1").show()
輸出結(jié)果為:
1.4 中位數(shù)
SparkSQL中也沒(méi)有直接計(jì)算中位數(shù)的方法,所以我們還是借鑒上一篇中的思路官套,再來(lái)回顧一下:
計(jì)算中位數(shù)也好酒奶,計(jì)算四分位數(shù)也好,無(wú)非就是要取得兩個(gè)位置嘛奶赔,假設(shè)我們的數(shù)據(jù)從小到大排惋嚎,按照1、2站刑、3另伍、.. 、n進(jìn)行編號(hào)绞旅,當(dāng)數(shù)量n為奇數(shù)時(shí)摆尝,取編號(hào)(n + 1)/2位置的數(shù)即可,當(dāng)n為偶數(shù)時(shí)因悲,取(int)(n + 1)/2位置和(int)(n + 1)/2 + 1位置的數(shù)取平均即可堕汞。但二者其實(shí)可以統(tǒng)一到一個(gè)公式中:
1)假設(shè)n = 149 ,(n+1)/2 = 75 晃琳,小數(shù)部分為0讯检,那么中位數(shù)=75位置的數(shù) * (1 - 0)+ 76位置的數(shù) * (0 - 0)
2)假設(shè)n = 150,(n+1)/2 = 75卫旱,小數(shù)部分為0.5,那么中位數(shù)=75位置的數(shù) * (1 - 0.5)+ 76位置的數(shù) * (0.5 - 0)
所以人灼,可以把這個(gè)過(guò)程分解為三個(gè)步驟,第一步是給數(shù)字進(jìn)行一個(gè)編號(hào)顾翼,spark中同樣使用row_number()函數(shù)(該函數(shù)的具體用法后續(xù)再展開(kāi)投放,這里只提供一個(gè)簡(jiǎn)單的例子),第二步是計(jì)算(n+1)/2的整數(shù)部分和小數(shù)部分暴构,第三步就是根據(jù)公式計(jì)算中位數(shù)跪呈。
首先使用row_number()給數(shù)據(jù)進(jìn)行編號(hào):
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("rank",row_number().over(windowFun)).show(false)
輸出如下:
接下來(lái)是確定中位數(shù)的位置,這里我們分別拿到(n + 1)/2的整數(shù)部分和小數(shù)部分:
val median_index = df.agg(
((count($"feature3") + 1) / 2).cast("int") as "rank",
((count($"feature3") + 1) / 2 % 1) as "float_part"
)
median_index.show()
輸出如下:
這里小數(shù)部分不為0取逾,意味著我們不僅要拿到rank=75的數(shù)耗绿,還要拿到rank=76的數(shù),我們最好把其放到一行上砾隅,這里使用同樣lead函數(shù)误阻,lead函數(shù)的作用就是拿到分組排序后,下一個(gè)位置或下n個(gè)位置的數(shù),咱們?cè)诤竺娴牟┛椭羞€會(huì)細(xì)講究反,這里也只是拋磚引玉:
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("next_feature3",lead(col("feature3"),1).over(windowFun)).show(false)
輸出如下:
接下來(lái)寻定,join兩個(gè)表,按公式計(jì)算中位數(shù)就可以啦精耐,完整的代碼如下:
val median_index = df.agg(
((count($"feature3") + 1) / 2).cast("int") as "rank",
((count($"feature3") + 1) / 2 % 1) as "float_part"
)
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("rank",row_number().over(windowFun))
.withColumn("next_feature3",lead(col("feature3"),1).over(windowFun))
.join(median_index,Seq("rank"),"inner")
.withColumn("median" ,($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
.show()
輸出如下:
1.5 四分位數(shù)
先來(lái)復(fù)習(xí)下四分位數(shù)的兩種解法狼速,n+1方法和n-1方法:
對(duì)于n+1方法,如果數(shù)據(jù)量為n卦停,則四分位數(shù)的位置為:
Q1的位置= (n+1) × 0.25
Q2的位置= (n+1) × 0.5
Q3的位置= (n+1) × 0.75
對(duì)于n-1方法向胡,如果數(shù)據(jù)量為n,則四分位數(shù)的位置為:
Q1的位置=1+(n-1)x 0.25
Q2的位置=1+(n-1)x 0.5
Q3的位置=1+(n-1)x 0.75
這里的思路和求解中位數(shù)是一樣的惊完,我們分別實(shí)現(xiàn)一下兩種方法僵芹,首先是n+1方法:
val q1_index = df.agg(
((count($"feature3") + 1) * 0.25).cast("int") as "rank",
((count($"feature3") + 1) * 0.25 % 1) as "float_part"
)
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("rank",row_number().over(windowFun))
.withColumn("next_feature3",lead(col("feature3"),1).over(windowFun))
.join(q1_index,Seq("rank"),"inner")
.withColumn("q1" ,($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
.show()
輸出為:
接下來(lái)是n-1方法:
val q1_index = df.agg(
((count($"feature3") - 1) * 0.25).cast("int") + 1 as "rank",
((count($"feature3") - 1) * 0.25 % 1) as "float_part"
)
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("rank",row_number().over(windowFun))
.withColumn("next_feature3",lead(col("feature3"),1).over(windowFun))
.join(q1_index,Seq("rank"),"inner")
.withColumn("q1" ,($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
.show()
輸出為:
3、踩坑總結(jié)
在計(jì)算中位數(shù)或者四分位數(shù)時(shí)小槐,我一開(kāi)始的寫(xiě)法如下:
很奇怪的一點(diǎn)是拇派,$"float_part" - 0沒(méi)有報(bào)錯(cuò),1 - $"float_part"卻報(bào)錯(cuò)了凿跳,報(bào)的錯(cuò)誤是:
看這里大家應(yīng)該明白了件豌,$"float_part" - 0中,減號(hào)左右兩邊的數(shù)據(jù)都應(yīng)該是列名拄显,與$"float_part" 類(lèi)型相同苟径,但是1 - $"float_part"兩邊都應(yīng)該是個(gè)數(shù)字,與1的類(lèi)型相同躬审,所以后面一個(gè)報(bào)錯(cuò)了棘街。
因此修改的方法是:
使用lit方法創(chuàng)建了一個(gè)全為0或者全為1的列,使得減號(hào)左右兩邊類(lèi)型匹配承边。