[Hive] 兩個‘不常用’的函數(shù)posexplode和lag

昨天看到了大神 文哥的學(xué)習(xí)日記 的最新一章博客防楷,分享的是hive相關(guān)的知識。正好最近自己也在復(fù)盤hive则涯,所以特地學(xué)習(xí)并實踐了一下博客的內(nèi)容域帐。
文哥分享的是關(guān)于hive sql的四道面試題赘被,其實主要是圍繞hive sql的兩個函數(shù):posexplodelag/lead
說實話肖揣,我之前并不知道這兩個函數(shù)民假,更別說在實際工作中應(yīng)用了,相關(guān)的也就是用過explode函數(shù)龙优;不過在看了這兩個函數(shù)的使用場景后羊异,發(fā)現(xiàn)還是很值得一學(xué)的。

1.posexplode

在說明posexplode之前彤断,先了解下 Lateral View的用法:
Lateral View與用戶自定義生成函數(shù)即UDTF(如explode()或者split()等)結(jié)合使用野舶。
(UDTF:為每一個輸入行生成0個或者多個輸出行)
Lateral View將UDTF應(yīng)用于基礎(chǔ)表的每一行,然后將輸出行連接到輸入行宰衙,以形成具有所提供的表別名的虛擬表平道。
基本用法:
lateral view:LATERAL VIEW udtf(expression) tableAlias AS columnAlias

案例數(shù)據(jù)準(zhǔn)備:

 val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()

    val hiveContext = new HiveContext(spark.sparkContext)

    //指定schema
    val schema = types.StructType(Seq(
      StructField("id", StringType, true),
      StructField("time", StringType, true)
    ))
    //創(chuàng)建案例數(shù)據(jù)
    val dataRdd = spark.sparkContext.parallelize(Array("a,b,c,d;2:00,3:00,4:00,5:00", "f,b,c,d;1:10,2:20,3:30,4:40")).map(line => line.split(";"))
    val rowRdd = dataRdd.map(p => Row(p(0).trim, p(1).trim))

    //創(chuàng)建DataFrame
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")
    data.show(false)

結(jié)果顯示:
+-------+-------------------+
|id |time |
+-------+-------------------+
|a,b,c,d|2:00,3:00,4:00,5:00|
|f,b,c,d|1:10,2:20,3:30,4:40|
+-------+-------------------+

最后要展示的數(shù)據(jù)樣式為:


image.png

1.1 演示下explode用法

   //不使用 Lateral View
   val sql1 =
    s"""
       |SELECT
       |  id,
       |  time,
       |  explode(split(time,',')) as single_time
       |FROM tempTable
       """.stripMargin

    hiveContext.sql(sql1).show(false)
  
  //2.使用explode和lateral view,效果與1一樣
    val sql2 =
      s"""
         |SELECT
         |  id,
         |  time,
         |  single_time
         |FROM tempTable
         |lateral view explode(split(time,',')) as single_time
       """.stripMargin

    hiveContext.sql(sql2).show(false)

結(jié)果顯示:
+-------+-------------------+-----------+
|id |time |single_time|
+-------+-------------------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|4:40 |
+-------+-------------------+-----------+

將id列也explode之后,結(jié)果顯示為:

+-------+-------------------+---------+-----------+
|id |time |single_id|single_time|
+-------+-------------------+---------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |4:40 |
+-------+-------------------+---------+-----------+

1.2 使用posexplode函數(shù)
posexplode相比在explode之上供炼,將一列數(shù)據(jù)轉(zhuǎn)為多行之后一屋,還會輸出數(shù)據(jù)的下標(biāo)。
示例:

    val sql4 =
      s"""
         |SELECT
         |  id,
         |  time,
         |  single_id,
         |  single_id_index
         |FROM tempTable
         |lateral view posexplode(split(id,',')) t as single_id_index,single_id
     """.stripMargin

    hiveContext.sql(sql4).show(false)

會發(fā)現(xiàn)多了1列 single_id_index
結(jié)果顯示:
+-------+-------------------+---------+---------------+
|id |time |single_id|single_id_index|
+-------+-------------------+---------+---------------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |0 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |1 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |0 |
|f,b,c,d|1:10,2:20,3:30,4:40|b |1 |
|f,b,c,d|1:10,2:20,3:30,4:40|c |2 |
|f,b,c,d|1:10,2:20,3:30,4:40|d |3 |
+-------+-------------------+---------+---------------+

1.3 在此基礎(chǔ)上袋哼,可以實現(xiàn)最終效果冀墨,只要選取id的下標(biāo)與time的下標(biāo)一致的記錄

   val sql5 =
      s"""
         |SELECT
         |  single_id,
         |  single_time
         |FROM tempTable
         | lateral view posexplode(split(id,',')) as single_id_index,single_id
         | lateral view posexplode(split(time,',')) as single_time_index,single_time
         |WHERE
         |  single_id_index=single_time_index
       """.stripMargin

    hiveContext.sql(sql5).show(false)

結(jié)果顯示:
+---------+-----------+
|single_id|single_time|
+---------+-----------+
|a |2:00 |
|b |3:00 |
|c |4:00 |
|d |5:00 |
|f |1:10 |
|b |2:20 |
|c |3:30 |
|d |4:40 |
+---------+-----------+

2.posexplode的應(yīng)用2

應(yīng)用場景:
對于記錄1:00001,輸出1對應(yīng)的下標(biāo)5
對于記錄2:0101涛贯,輸出1對應(yīng)的下標(biāo)2诽嘉,4
完整代碼顯示:

object PosexplodeDemo2 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()

    val hiveContext = new HiveContext(spark.sparkContext)

    //指定schema
    val schema = types.StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("value", StringType, true)
    ))
    //創(chuàng)建案例數(shù)據(jù)
    val dataRdd = spark.sparkContext.parallelize(Array("1,1011", "2,0101","3,1111","4,00001")).map(line => line.split(","))
    val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).trim))

    //創(chuàng)建DataFrame
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")
    data.show(false)

    /**
      * +---+-----+------------+------------------+
      * |id |value|single_value|single_value_index|
      * +---+-----+------------+------------------+
      * |1  |1011 |1           |0                 |
      * |1  |1011 |1           |2                 |
      * |1  |1011 |1           |3                 |
      * |2  |0101 |1           |1                 |
      * |2  |0101 |1           |3                 |
      * |3  |1111 |1           |0                 |
      * |3  |1111 |1           |1                 |
      * |3  |1111 |1           |2                 |
      * |3  |1111 |1           |3                 |
      * |4  |00001|1           |4                 |
      * +---+-----+------------+------------------+
      */
    val sql=
      s"""
         |SELECT
         |  id,
         |  value,
         |  single_value,
         |  single_value_index
         |FROM tempTable
         | lateral view posexplode(split(value,'')) as single_value_index,single_value
         |WHERE single_value='1'
       """.stripMargin
    hiveContext.sql(sql).show(false)


    /**
      * +---+-----+-------+
      * |id |value|indices|
      * +---+-----+-------+
      * |4  |00001|5      |
      * |2  |0101 |2,4    |
      * |1  |1011 |1,3,4  |
      * |3  |1111 |1,2,3,4|
      * +---+-----+-------+
      */
    val sql1=
      s"""
         |SELECT
         |  id,
         |  value,
         |  concat_ws(',',collect_list(single_value_index)) as indices
         |FROM
         |(
         |  SELECT
         |    id,
         |    value,
         |    single_value,
         |    cast(single_value_index+1 as string) as single_value_index
         |  FROM tempTable
         |   lateral view posexplode(split(value,'')) as single_value_index,single_value
         |  WHERE single_value='1'
         |)
         |GROUP BY id,value
       """.stripMargin

    hiveContext.sql(sql1).show(false)

  }
}

3.lag和lead函數(shù)

lag和lead是在實現(xiàn)分組排序的基礎(chǔ)上,能夠獲取到排序在當(dāng)前記錄前幾位或后幾位的記錄的某個字段值弟翘。
基礎(chǔ)語法:
lag(字段名,N) over(partition by 分組字段 order by 排序字段 排序方式)
lead(字段名,N) over(partition by 分組字段 order by 排序字段 排序方式)

lag括號里的參數(shù):字段名和數(shù)量N 含義是獲取分組排序后比該條記錄序號小N的對應(yīng)記錄的指定字段的值
如果字段名為ts虫腋,N為1,就是取分組排序之后上一條記錄的ts值

lead括號里的參數(shù):字段名和數(shù)量N 含義是獲取分組排序后比該條記錄序號大N的對應(yīng)記錄的指定字段的值
如果字段名為ts稀余,N為1悦冀,就是取分組排序之后下一條記錄的ts值

如果沒有前一行或者后一行,對應(yīng)的字段值為null

應(yīng)用場景:統(tǒng)計截至目前季度的平均值(按照time排序滚躯,并計算平均值)
* +----+----+-----+
* |id |time|score|
* +----+----+-----+
* |2014|A |3 |
* |2014|C |1 |
* |2014|B |2 |
* |2015|A |4 |
* |2015|C |3 |
* +----+----+-----+

完整代碼顯示:

 /**
      * +----+----+-----+---------+
      * |id  |time|score|pre_score|
      * +----+----+-----+---------+
      * |2014|A   |3    |null     |
      * |2014|B   |2    |3        |
      * |2014|C   |1    |2        |
      * |2015|A   |4    |null     |
      * |2015|C   |3    |4        |
      * +----+----+-----+---------+
      */
    val sql=
      s"""
         |SELECT
         |  id,
         |  time,
         |  score,
         |  lag(score,1) over(partition by id order by time asc) as pre_score
         |FROM tempTable
       """.stripMargin
    hiveContext.sql(sql).show(false)


    /**
      * +----+----+---------+
      * |id  |time|avg_score|
      * +----+----+---------+
      * |2014|A   |3.0      |
      * |2014|B   |2.5      |
      * |2014|C   |1.5      |
      * |2015|A   |4.0      |
      * |2015|C   |3.5      |
      * +----+----+---------+
      */
    val sql1=
      s"""
         |SELECT
         |  id,
         |  time,
         |  CASE WHEN pre_score IS NULL THEN score
         |  ELSE (score+pre_score)/2
         |  END AS avg_score
         |FROM
         |(
         |   SELECT
         |     id,
         |     time,
         |     score,
         |     lag(score,1) over(partition by id order by time asc) as pre_score
         |   FROM tempTable
         |) tmp
       """.stripMargin

    hiveContext.sql(sql1).show(false)

4.posexplode和lag函數(shù)的結(jié)合(實現(xiàn)分塊排序)

image.png

完整代碼顯示:

object Posexplode_LagDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()

    val hiveContext = new HiveContext(spark.sparkContext)

    //指定schema
    val schema = types.StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("value", IntegerType, true)
    ))
    //創(chuàng)建案例數(shù)據(jù)
    val dataRdd = spark.sparkContext.parallelize(Array("2014,1", "2015,1","2017,0","2018,0","2019,1","2020,1","2021,1","2022,0","2023,0")).map(line => line.split(","))
    val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).toInt))

    //創(chuàng)建DataFrame
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")

    /**
      * +----+-----+
      * |id  |value|
      * +----+-----+
      * |2014|1    |
      * |2015|1    |
      * |2017|0    |
      * |2018|0    |
      * |2019|1    |
      * |2020|1    |
      * |2021|1    |
      * |2022|0    |
      * |2023|0    |
      * +----+-----+
      */
    data.show(false)


    /**
      * +----+-----+---------+
      * |id  |value|pre_value|
      * +----+-----+---------+
      * |2014|1    |null     |
      * |2015|1    |1        |
      * |2017|0    |1        |
      * |2018|0    |0        |
      * |2019|1    |0        |
      * |2020|1    |1        |
      * |2021|1    |1        |
      * |2022|0    |1        |
      * |2023|0    |0        |
      * +----+-----+---------+
      * ------>
      * +------------+-----+
      * |min_block_id|value|
      * +------------+-----+
      * |2014        |1    |
      * |2017        |0    |
      * |2019        |1    |
      * |2022        |0    |
      * +------------+-----+
      * ------>
      * 基礎(chǔ)表與上面的表join
      * +----+-----+------------+----+
      * |id  |value|min_block_id|rank|
      * +----+-----+------------+----+
      * |2014|1    |2014        |1   |
      * |2015|1    |2014        |1   |
      * |2017|0    |2017        |1   |
      * |2018|0    |2017        |1   |
      * |2019|1    |2019        |1   |
      * |2019|1    |2014        |2   |
      * |2020|1    |2019        |1   |
      * |2020|1    |2014        |2   |
      * |2021|1    |2019        |1   |
      * |2021|1    |2014        |2   |
      * |2022|0    |2022        |1   |
      * |2022|0    |2017        |2   |
      * |2023|0    |2022        |1   |
      * |2023|0    |2017        |2   |
      * +----+-----+------------+----+
      *
      * ------>
      * 限制rank=1
      * +----+-----+------------+----+
      * |id  |value|min_block_id|rank|
      * +----+-----+------------+----+
      * |2014|1    |2014        |1   |
      * |2015|1    |2014        |1   |
      * |2017|0    |2017        |1   |
      * |2018|0    |2017        |1   |
      * |2019|1    |2019        |1   |
      * |2020|1    |2019        |1   |
      * |2021|1    |2019        |1   |
      * |2022|0    |2022        |1   |
      * |2023|0    |2022        |1   |
      * +----+-----+------------+----+
      *
      * ------>
      * 最終結(jié)果
      * +----+-----+--------+
      * |id  |value|new_rank|
      * +----+-----+--------+
      * |2014|1    |1       |
      * |2015|1    |2       |
      * |2017|0    |1       |
      * |2018|0    |2       |
      * |2019|1    |1       |
      * |2020|1    |2       |
      * |2021|1    |3       |
      * |2022|0    |1       |
      * |2023|0    |2       |
      * +----+-----+--------+
      *
      */
    val sql=
      s"""
         |SELECT id,
         |  value,
         |  row_number() over(partition by min_block_id order by id asc) as new_rank
         |FROM
         |(
         |  SELECT
         |    t.id,
         |    t.value,
         |    m.min_block_id,
         |    row_number() over(partition by t.id order by min_block_id desc) as rank
         |  FROM tempTable t
         |  INNER JOIN
         |  (
         |     SELECT id as min_block_id,
         |       value
         |     FROM
         |      (
         |       SELECT
         |         id,
         |         value,
         |         lag(value,1) over(partition by 1 order by id asc) as pre_value
         |       FROM tempTable
         |       )
         |     WHERE pre_value is null
         |      or value!=pre_value
         |  ) m
         |  ON t.value=m.value
         |  AND t.id>=m.min_block_id
         |)
         |WHERE rank=1
         |ORDER BY id
       """.stripMargin

    hiveContext.sql(sql).show(false)
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末雏门,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子掸掏,更是在濱河造成了極大的恐慌茁影,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丧凤,死亡現(xiàn)場離奇詭異募闲,居然都是意外死亡,警方通過查閱死者的電腦和手機愿待,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進店門浩螺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來靴患,“玉大人,你說我怎么就攤上這事要出≡Ь” “怎么了?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵患蹂,是天一觀的道長或颊。 經(jīng)常有香客問我,道長传于,這世上最難降的妖魔是什么囱挑? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮沼溜,結(jié)果婚禮上平挑,老公的妹妹穿的比我還像新娘。我一直安慰自己系草,他們只是感情好通熄,可當(dāng)我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著悄但,像睡著了一般棠隐。 火紅的嫁衣襯著肌膚如雪石抡。 梳的紋絲不亂的頭發(fā)上檐嚣,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天,我揣著相機與錄音啰扛,去河邊找鬼嚎京。 笑死,一個胖子當(dāng)著我的面吹牛隐解,可吹牛的內(nèi)容都是我干的鞍帝。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼煞茫,長吁一口氣:“原來是場噩夢啊……” “哼帕涌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起续徽,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤蚓曼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后钦扭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體纫版,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年客情,在試婚紗的時候發(fā)現(xiàn)自己被綠了其弊。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片癞己。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖梭伐,靈堂內(nèi)的尸體忽然破棺而出痹雅,到底是詐尸還是另有隱情,我是刑警寧澤糊识,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布练慕,位于F島的核電站,受9級特大地震影響技掏,放射性物質(zhì)發(fā)生泄漏铃将。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一哑梳、第九天 我趴在偏房一處隱蔽的房頂上張望劲阎。 院中可真熱鬧,春花似錦、人聲如沸体啰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽锡垄。三九已至,卻和暖如春祭隔,著一層夾襖步出監(jiān)牢的瞬間货岭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工疾渴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留千贯,地道東北人。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓搞坝,卻偏偏與公主長得像搔谴,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子桩撮,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容