Yesterday I came across the latest post on 文哥的學(xué)習(xí)日記, which shared some Hive knowledge. Since I have been reviewing Hive myself lately, I took the chance to study the post and reproduce its content.
The post covers four Hive SQL interview questions, which really revolve around two Hive SQL features: posexplode and lag/lead.
Honestly, I did not know these two functions before, let alone use them at work; the closest I had come was the explode function. After seeing their use cases, though, I found them well worth learning.
1. posexplode
Before getting into posexplode, a quick look at how Lateral View works:
Lateral View is used together with user-defined table-generating functions, i.e. UDTFs, such as explode() (often fed an array produced by split()).
(A UDTF generates zero or more output rows for each input row.)
Lateral View applies the UDTF to every row of the base table and joins the generated rows back to the input row, forming a virtual table with the supplied table alias.
Basic usage:
lateral view: LATERAL VIEW udtf(expression) tableAlias AS columnAlias
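A minimal illustration of the syntax (the fruits table and its array column tags are hypothetical, purely to show the shape):
// Hypothetical table fruits(name string, tags array<string>).
// LATERAL VIEW runs explode() once per input row and joins each
// generated element back onto that row; t is the table alias for the
// generated rows and tag is the column alias.
val lateralViewSketch =
  s"""
     |SELECT name, tag
     |FROM fruits
     |lateral view explode(tags) t as tag
   """.stripMargin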
Preparing the sample data:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("hive context")
  .getOrCreate()
val hiveContext = new HiveContext(spark.sparkContext)
// specify the schema
val schema = types.StructType(Seq(
  StructField("id", StringType, true),
  StructField("time", StringType, true)
))
// create the sample data
val dataRdd = spark.sparkContext.parallelize(Array("a,b,c,d;2:00,3:00,4:00,5:00", "f,b,c,d;1:10,2:20,3:30,4:40")).map(line => line.split(";"))
val rowRdd = dataRdd.map(p => Row(p(0).trim, p(1).trim))
// create the DataFrame and register it as a temp table
// (HiveContext and registerTempTable are the older APIs used throughout
// this post; SparkSession + createOrReplaceTempView are the modern ones)
val data = hiveContext.createDataFrame(rowRdd, schema)
data.registerTempTable("tempTable")
data.show(false)
Output:
+-------+-------------------+
|id |time |
+-------+-------------------+
|a,b,c,d|2:00,3:00,4:00,5:00|
|f,b,c,d|1:10,2:20,3:30,4:40|
+-------+-------------------+
The target output we want at the end pairs each element of id with the element of time at the same position (the two-column table produced in section 1.3 below).
1.1 A demo of explode
// 1. without Lateral View
// (Spark SQL allows a generator like explode() next to other select
// expressions; plain Hive only allows a UDTF as the sole expression in
// the SELECT list, so sql1 works here but would fail in Hive itself)
val sql1 =
  s"""
     |SELECT
     |  id,
     |  time,
     |  explode(split(time,',')) as single_time
     |FROM tempTable
   """.stripMargin
hiveContext.sql(sql1).show(false)
// 2. with explode and lateral view; the output is identical to 1
// (Hive requires a table alias after LATERAL VIEW; `t` added here)
val sql2 =
  s"""
     |SELECT
     |  id,
     |  time,
     |  single_time
     |FROM tempTable
     |lateral view explode(split(time,',')) t as single_time
   """.stripMargin
hiveContext.sql(sql2).show(false)
Output:
+-------+-------------------+-----------+
|id |time |single_time|
+-------+-------------------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|4:40 |
+-------+-------------------+-----------+
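The original post omits the query that produces the next table; a minimal reconstruction (the name sql3 and the aliases t1/t2 are mine) would explode both columns independently:
// Sketch: two LATERAL VIEWs explode id and time separately, so the
// generated rows combine as a 4 x 4 cartesian product: 16 rows per
// input row, 32 in total.
val sql3 =
  s"""
     |SELECT
     |  id,
     |  time,
     |  single_id,
     |  single_time
     |FROM tempTable
     |lateral view explode(split(id,',')) t1 as single_id
     |lateral view explode(split(time,',')) t2 as single_time
   """.stripMargin
hiveContext.sql(sql3).show(100, false)
Note that show(false) prints at most 20 rows by default, which is likely why the table below stops after the first four combinations of the f row; show(100, false) would print all 32.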
Exploding the id column as well, the result is:
+-------+-------------------+---------+-----------+
|id |time |single_id|single_time|
+-------+-------------------+---------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |4:40 |
+-------+-------------------+---------+-----------+
1.2 Using posexplode
posexplode goes one step beyond explode: besides turning one column into multiple rows, it also outputs each element's index within the array.
Example:
val sql4 =
s"""
|SELECT
| id,
| time,
| single_id,
| single_id_index
|FROM tempTable
|lateral view posexplode(split(id,',')) t as single_id_index,single_id
""".stripMargin
hiveContext.sql(sql4).show(false)
An extra column single_id_index appears. Note the alias order: posexplode generates the position first and the element second, hence t as single_id_index,single_id.
Output:
+-------+-------------------+---------+---------------+
|id |time |single_id|single_id_index|
+-------+-------------------+---------+---------------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |0 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |1 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |0 |
|f,b,c,d|1:10,2:20,3:30,4:40|b |1 |
|f,b,c,d|1:10,2:20,3:30,4:40|c |2 |
|f,b,c,d|1:10,2:20,3:30,4:40|d |3 |
+-------+-------------------+---------+---------------+
1.3 On this basis, the target output falls out directly: posexplode both columns and keep only the records where the id index matches the time index. The WHERE filter prunes the 4 x 4 cartesian product down to the 4 aligned pairs per input row.
// (table aliases t1/t2 added; Hive requires them after LATERAL VIEW)
val sql5 =
  s"""
     |SELECT
     |  single_id,
     |  single_time
     |FROM tempTable
     |  lateral view posexplode(split(id,',')) t1 as single_id_index,single_id
     |  lateral view posexplode(split(time,',')) t2 as single_time_index,single_time
     |WHERE
     |  single_id_index=single_time_index
   """.stripMargin
hiveContext.sql(sql5).show(false)
Output:
+---------+-----------+
|single_id|single_time|
+---------+-----------+
|a |2:00 |
|b |3:00 |
|c |4:00 |
|d |5:00 |
|f |1:10 |
|b |2:20 |
|c |3:30 |
|d |4:40 |
+---------+-----------+
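A side note not in the original post: on Spark 2.4+ the same positional pairing can be written without the index filter, using arrays_zip (Spark-only; plain Hive has no such function):
// arrays_zip pairs the two arrays element by element into an array of
// structs, so one explode yields aligned pairs directly. For unnamed
// inputs the struct fields are named by ordinal position: "0", "1".
val sqlZip =
  s"""
     |SELECT
     |  zipped.`0` as single_id,
     |  zipped.`1` as single_time
     |FROM tempTable
     |lateral view explode(arrays_zip(split(id,','), split(time,','))) t as zipped
   """.stripMargin
hiveContext.sql(sqlZip).show(false)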
2. A second posexplode application
Scenario:
For record 1, 00001: output the 1-based positions of the 1s, i.e. 5.
For record 2, 0101: output 2,4.
Complete code:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

object PosexplodeDemo2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()
    val hiveContext = new HiveContext(spark.sparkContext)
    // specify the schema
    val schema = types.StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("value", StringType, true)
    ))
    // create the sample data
    val dataRdd = spark.sparkContext.parallelize(Array("1,1011", "2,0101", "3,1111", "4,00001")).map(line => line.split(","))
    val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).trim))
    // create the DataFrame and register it as a temp table
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")
    data.show(false)
/**
* +---+-----+------------+------------------+
* |id |value|single_value|single_value_index|
* +---+-----+------------+------------------+
* |1 |1011 |1 |0 |
* |1 |1011 |1 |2 |
* |1 |1011 |1 |3 |
* |2 |0101 |1 |1 |
* |2 |0101 |1 |3 |
* |3 |1111 |1 |0 |
* |3 |1111 |1 |1 |
* |3 |1111 |1 |2 |
* |3 |1111 |1 |3 |
* |4 |00001|1 |4 |
* +---+-----+------------+------------------+
*/
    // split(value,'') breaks the string into single characters;
    // posexplode also emits each character's 0-based index
    val sql =
      s"""
         |SELECT
         |  id,
         |  value,
         |  single_value,
         |  single_value_index
         |FROM tempTable
         |  lateral view posexplode(split(value,'')) t as single_value_index,single_value
         |WHERE single_value='1'
       """.stripMargin
    hiveContext.sql(sql).show(false)
/**
* +---+-----+-------+
* |id |value|indices|
* +---+-----+-------+
* |4 |00001|5 |
* |2 |0101 |2,4 |
* |1 |1011 |1,3,4 |
* |3 |1111 |1,2,3,4|
* +---+-----+-------+
*/
    // shift the 0-based index to a 1-based position, stringify it for
    // concat_ws, then collect all positions per (id, value)
    // (Hive requires the lateral view alias `t` and the subquery alias `tmp`)
    val sql1 =
      s"""
         |SELECT
         |  id,
         |  value,
         |  concat_ws(',',collect_list(single_value_index)) as indices
         |FROM
         |(
         |  SELECT
         |    id,
         |    value,
         |    single_value,
         |    cast(single_value_index+1 as string) as single_value_index
         |  FROM tempTable
         |    lateral view posexplode(split(value,'')) t as single_value_index,single_value
         |  WHERE single_value='1'
         |) tmp
         |GROUP BY id,value
       """.stripMargin
    hiveContext.sql(sql1).show(false)
}
}
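One caveat worth adding that the original does not mention: collect_list after a GROUP BY does not guarantee element order, so the neatly ascending indices in the output above cannot be relied on. A variant that sorts explicitly, re-using the tempTable from the demo above (the lexicographic sort_array is safe here only because every position is a single digit):
// Variant sketch: sort the collected position strings before joining.
// sort_array sorts string arrays lexicographically, which matches
// numeric order only while all positions are single digits.
val sqlSorted =
  s"""
     |SELECT
     |  id,
     |  value,
     |  concat_ws(',', sort_array(collect_list(cast(single_value_index+1 as string)))) as indices
     |FROM tempTable
     |  lateral view posexplode(split(value,'')) t as single_value_index,single_value
     |WHERE single_value='1'
     |GROUP BY id,value
   """.stripMargin
hiveContext.sql(sqlSorted).show(false)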
3. The lag and lead functions
On top of a partition-and-sort, lag and lead fetch a given column's value from the record some number of positions before or after the current one.
Basic syntax:
lag(column, N) over(partition by partition_column order by sort_column asc|desc)
lead(column, N) over(partition by partition_column order by sort_column asc|desc)
lag's arguments are a column name and an offset N: it returns that column's value from the record N positions earlier in the partition's sort order.
With column ts and N=1, that is the previous record's ts value.
lead's arguments are the same, but it looks N positions later in the sort order.
With column ts and N=1, that is the next record's ts value.
If no such earlier or later record exists, the result is NULL; both functions also accept an optional third argument as a default to use instead, as sketched below.
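A small side-by-side sketch of both functions, including the optional third (default) argument, run against the quarterly score table introduced just below:
// lag/lead over the id|time|score table from this section. The third
// argument replaces the NULL that lag/lead would otherwise return at
// the edge of each partition.
val lagLeadDemo =
  s"""
     |SELECT
     |  id,
     |  time,
     |  score,
     |  lag(score, 1, 0) over(partition by id order by time asc) as prev_score,
     |  lead(score, 1, 0) over(partition by id order by time asc) as next_score
     |FROM tempTable
   """.stripMargin
hiveContext.sql(lagLeadDemo).show(false)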
Use case: for each quarter (rows sorted by time within each year id), compute the average of the current quarter's score and the previous quarter's; the first quarter of each year keeps its own score. The input table:
+----+----+-----+
|id  |time|score|
+----+----+-----+
|2014|A   |3    |
|2014|C   |1    |
|2014|B   |2    |
|2015|A   |4    |
|2015|C   |3    |
+----+----+-----+
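The original omits the setup that builds this table; a minimal sketch consistent with the earlier examples (schema and rows read off the table above) would be:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("hive context")
  .getOrCreate()
val hiveContext = new HiveContext(spark.sparkContext)
val schema = types.StructType(Seq(
  StructField("id", IntegerType, true),
  StructField("time", StringType, true),
  StructField("score", IntegerType, true)
))
// rows taken from the table above
val rowRdd = spark.sparkContext
  .parallelize(Array("2014,A,3", "2014,C,1", "2014,B,2", "2015,A,4", "2015,C,3"))
  .map(_.split(","))
  .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
val data = hiveContext.createDataFrame(rowRdd, schema)
data.registerTempTable("tempTable")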
The queries:
/**
* +----+----+-----+---------+
* |id |time|score|pre_score|
* +----+----+-----+---------+
* |2014|A |3 |null |
* |2014|B |2 |3 |
* |2014|C |1 |2 |
* |2015|A |4 |null |
* |2015|C |3 |4 |
* +----+----+-----+---------+
*/
val sql=
s"""
|SELECT
| id,
| time,
| score,
| lag(score,1) over(partition by id order by time asc) as pre_score
|FROM tempTable
""".stripMargin
hiveContext.sql(sql).show(false)
/**
* +----+----+---------+
* |id |time|avg_score|
* +----+----+---------+
* |2014|A |3.0 |
* |2014|B |2.5 |
* |2014|C |1.5 |
* |2015|A |4.0 |
* |2015|C |3.5 |
* +----+----+---------+
*/
val sql1=
s"""
|SELECT
| id,
| time,
| CASE WHEN pre_score IS NULL THEN score
| ELSE (score+pre_score)/2
| END AS avg_score
|FROM
|(
| SELECT
| id,
| time,
| score,
| lag(score,1) over(partition by id order by time asc) as pre_score
| FROM tempTable
|) tmp
""".stripMargin
hiveContext.sql(sql1).show(false)
4. Combining lag and row_number (block-wise ranking: restart the rank whenever value changes)
Complete code:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

object Posexplode_LagDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()
    val hiveContext = new HiveContext(spark.sparkContext)
    // specify the schema
    val schema = types.StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("value", IntegerType, true)
    ))
    // create the sample data
    val dataRdd = spark.sparkContext.parallelize(Array("2014,1", "2015,1", "2017,0", "2018,0", "2019,1", "2020,1", "2021,1", "2022,0", "2023,0")).map(line => line.split(","))
    val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).toInt))
    // create the DataFrame and register it as a temp table
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")
/**
* +----+-----+
* |id |value|
* +----+-----+
* |2014|1 |
* |2015|1 |
* |2017|0 |
* |2018|0 |
* |2019|1 |
* |2020|1 |
* |2021|1 |
* |2022|0 |
* |2023|0 |
* +----+-----+
*/
data.show(false)
/**
* +----+-----+---------+
* |id |value|pre_value|
* +----+-----+---------+
* |2014|1 |null |
* |2015|1 |1 |
* |2017|0 |1 |
* |2018|0 |0 |
* |2019|1 |0 |
* |2020|1 |1 |
* |2021|1 |1 |
* |2022|0 |1 |
* |2023|0 |0 |
* +----+-----+---------+
* ------>
* +------------+-----+
* |min_block_id|value|
* +------------+-----+
* |2014 |1 |
* |2017 |0 |
* |2019 |1 |
* |2022 |0 |
* +------------+-----+
* ------>
* join the base table with the table above
* +----+-----+------------+----+
* |id |value|min_block_id|rank|
* +----+-----+------------+----+
* |2014|1 |2014 |1 |
* |2015|1 |2014 |1 |
* |2017|0 |2017 |1 |
* |2018|0 |2017 |1 |
* |2019|1 |2019 |1 |
* |2019|1 |2014 |2 |
* |2020|1 |2019 |1 |
* |2020|1 |2014 |2 |
* |2021|1 |2019 |1 |
* |2021|1 |2014 |2 |
* |2022|0 |2022 |1 |
* |2022|0 |2017 |2 |
* |2023|0 |2022 |1 |
* |2023|0 |2017 |2 |
* +----+-----+------------+----+
*
* ------>
* keep only rank=1
* +----+-----+------------+----+
* |id |value|min_block_id|rank|
* +----+-----+------------+----+
* |2014|1 |2014 |1 |
* |2015|1 |2014 |1 |
* |2017|0 |2017 |1 |
* |2018|0 |2017 |1 |
* |2019|1 |2019 |1 |
* |2020|1 |2019 |1 |
* |2021|1 |2019 |1 |
* |2022|0 |2022 |1 |
* |2023|0 |2022 |1 |
* +----+-----+------------+----+
*
* ------>
* final result
* +----+-----+--------+
* |id |value|new_rank|
* +----+-----+--------+
* |2014|1 |1 |
* |2015|1 |2 |
* |2017|0 |1 |
* |2018|0 |2 |
* |2019|1 |1 |
* |2020|1 |2 |
* |2021|1 |3 |
* |2022|0 |1 |
* |2023|0 |2 |
* +----+-----+--------+
*
*/
    // Subqueries in the FROM clause need aliases in Hive; `lagged` and
    // `ranked` are added here. `partition by 1` puts all rows into a
    // single partition so lag() runs over the whole table.
    val sql =
      s"""
         |SELECT id,
         |       value,
         |       row_number() over(partition by min_block_id order by id asc) as new_rank
         |FROM
         |(
         |  SELECT
         |    t.id,
         |    t.value,
         |    m.min_block_id,
         |    row_number() over(partition by t.id order by min_block_id desc) as rank
         |  FROM tempTable t
         |  INNER JOIN
         |  (
         |    SELECT id as min_block_id,
         |           value
         |    FROM
         |    (
         |      SELECT
         |        id,
         |        value,
         |        lag(value,1) over(partition by 1 order by id asc) as pre_value
         |      FROM tempTable
         |    ) lagged
         |    WHERE pre_value is null
         |    or value!=pre_value
         |  ) m
         |  ON t.value=m.value
         |  AND t.id>=m.min_block_id
         |) ranked
         |WHERE rank=1
         |ORDER BY id
       """.stripMargin
    hiveContext.sql(sql).show(false)
}
}
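As a closing aside not in the original post: the same block-wise ranking is often written with a running sum over a change flag (the classic gaps-and-islands formulation), which avoids the self-join. A sketch re-using the same tempTable:
// Sketch: flag the start of each block (value differs from the previous
// row), turn the flags into a block id with a running sum, then rank
// within each block. `flagged` and `blocks` are subquery aliases.
val altSql =
  s"""
     |SELECT id,
     |       value,
     |       row_number() over(partition by block_id order by id asc) as new_rank
     |FROM
     |(
     |  SELECT id,
     |         value,
     |         sum(change_flag) over(order by id asc) as block_id
     |  FROM
     |  (
     |    SELECT id,
     |           value,
     |           CASE WHEN lag(value,1) over(order by id asc) IS NULL
     |                  OR value != lag(value,1) over(order by id asc)
     |                THEN 1 ELSE 0 END as change_flag
     |    FROM tempTable
     |  ) flagged
     |) blocks
     |ORDER BY id
   """.stripMargin
hiveContext.sql(altSql).show(false)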