Spark 1.4.x版本以后燃乍,為Spark SQL和DataFrame引入了開窗函數(shù)唆樊,比如最經(jīng)典,最常用的刻蟹,row_number()逗旁,可以讓我們實現(xiàn)分組取topn的邏輯。
案例:統(tǒng)計每個種類的銷售額排名前3的產(chǎn)品
輸入文件sales.txt舆瘪,內(nèi)容如下:
Thin?Cell Phone?6000
Normal?Tablet?1500
Mini?Tablet?5500
UltraThin?Cell Phone?5000
VeryThin?Cell Phone?6000
Big?Tablet?2500
Bedable?Cell Phone?3000
Foldable?Cell Phone?3500
Pro?Tablet?4500
Pro2?Tablet?6500
開始編寫我們的統(tǒng)計邏輯痢艺,使用row_number()開窗函數(shù),先說明一下介陶,row_number()開窗函數(shù)的作用:
其實,就是給每個分組的數(shù)據(jù)色建,按照其排序順序哺呜,打上一個分組內(nèi)的行號,比如說箕戳,有一個分組date=20201001某残,里面有3條數(shù)據(jù),1122陵吸,1121玻墅,1124,那么對這個分組的每一行使用row_number()開窗函數(shù)以后,三行壮虫,依次會獲得一個組內(nèi)的行號澳厢,行號從1開始遞增,比如1122 1囚似,1121 2剩拢,1124 3
代碼實現(xiàn):
package sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
public class RowNumberWindowFunction {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("RowNumberWindowFunction");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
// 創(chuàng)建銷售額表,sales表
hiveContext.sql("DROP TABLE IF EXISTS sales");
hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
+ "product STRING,"
+ "category STRING,"
+ "revenue BIGINT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
+ "INTO TABLE sales");
DataFrame top3SalesDF = hiveContext.sql(
"SELECT product,category,revenue from("
+ "SELECT "
+ "product,"
+ "category,"
+ "revenue,"
+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
+ "FROM sales "
+ ") tmp_sales "
+ "WHERE rank<=3");
// 將每組排名前3的數(shù)據(jù)饶唤,保存到一個表中
hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
top3SalesDF.saveAsTable("top3_sales");
sc.close();
}
}
在第29行的row_number()開窗函數(shù)的語法說明:
首先可以徐伐,在SELECT查詢時,使用row_number()函數(shù)募狂,其次办素,row_number()函數(shù)后面先跟上OVER關(guān)鍵字,然后括號中祸穷,是PARTITION BY性穿,也就是說根據(jù)哪個字段進(jìn)行分組,其次是可以用ORDER BY進(jìn)行組內(nèi)排序粱哼,然后row_number()就可以給每個組內(nèi)的行季二,一個組內(nèi)行號
代碼編寫完成,開始打jar包,發(fā)布到集群提交spark程序胯舷,編寫如下腳本:
提交腳本刻蚯,運行spark程序,運行完成桑嘶,進(jìn)入hive查看:
我們發(fā)現(xiàn)炊汹,原數(shù)據(jù)和統(tǒng)計后的數(shù)據(jù)都正常寫入hive,至此逃顶,銷售額排名前3的產(chǎn)品統(tǒng)計完成讨便。