spark+hive開窗函數(shù)練習:求用戶每次會話的行為軌跡

數(shù)據(jù):

1001 2020-09-10 10:21:21 home.html

1001 2020-09-10 10:28:10 good_list.html

1001 2020-09-10 10:35:05 good_detail.html

1001 2020-09-10 10:42:55 cart.html

1001 2020-09-10 11:35:21 home.html

1001 2020-09-10 11:36:10 cart.html

1001 2020-09-10 11:38:12 trade.html

1001 2020-09-10 11:38:55 payment.html

1002 2020-09-10 09:40:00 home.html

1002 2020-09-10 09:41:00 mine.html

1002 2020-09-10 09:42:00 favor.html

1003 2020-09-10 13:10:00 home.html

1003 2020-09-10 13:15:00 search.html

需求:?

求得用戶每次會話的行為軌跡(同一用戶赁豆,上一條與下一條在半小時內(nèi)為一次會話)

要求結(jié)果如下:

A 1001 2020-09-10 10:21:21 home.html 1

A 1001 2020-09-10 10:28:10 good_list.html 2

A 1001 2020-09-10 10:35:05 good_detail.html 3

A 1001 2020-09-10 10:42:55 cart.html 4

B 1001 2020-09-10 11:35:21 home.html 1

B 1001 2020-09-10 11:36:10 cart.html 2

B 1001 2020-09-10 11:38:12 trade.html 3

B 1001 2020-09-10 11:38:55 payment.html 4

C 1002 2020-09-10 09:40:00 home.html 1

C 1002 2020-09-10 09:41:00 mine.html 2

C 1002 2020-09-10 09:42:00 favor.html 3

D 1003 2020-09-10 13:10:00 home.html 1

D 1003 2020-09-10 13:15:00 search.html 2

一仅醇、sql語句:

1、建表:

create table page_session(

????????user_id string,

????????page_time string,

????????page string)

row format delimited

fields terminated by '\t';

2魔种、按時間排序-升序,按user_id分組,使用開窗函數(shù)lag()獲得上一條的時間

select

????????user_id,

????????page_time,

????????page,

lag(page_time) over(partition by user_id order by page_time asc) before_time

from page_session;

結(jié)果如下:

user_id page_time page before_time

1001 2020-09-10 10:21:21 home.html? ? ? NULL

1001 2020-09-10 10:28:10 good_list.html? 2020-09-10 10:21:21

1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10

1001 2020-09-10 10:42:55 cart.html? ? ? 2020-09-10 10:35:05

1001 2020-09-10 11:35:21 home.html? ? ? 2020-09-10 10:42:55

1001 2020-09-10 11:36:10 cart.html? ? ? 2020-09-10 11:35:21

1001 2020-09-10 11:38:12 trade.html? ? ? 2020-09-10 11:36:10

1001 2020-09-10 11:38:55 payment.html? ? 2020-09-10 11:38:12

1002 2020-09-10 09:40:00 home.html? ? ? NULL

1002 2020-09-10 09:41:00 mine.html? ? ? 2020-09-10 09:40:00

1002 2020-09-10 09:42:00 favor.html? ? ? 2020-09-10 09:41:00

1003 2020-09-10 13:10:00 home.html? ? ? NULL

1003 2020-09-10 13:15:00 search.html? ? 2020-09-10 13:10:00

3析二、給每個會話添加會話id

兩個條件判斷是否為新會話:

1、before_time是否為null?

2节预、判斷上一次時間與下一次時間是否大于半小時(時間需要轉(zhuǎn)成時間戳),如果大于30分說明是新的會話

如果是新會話叶摄,就使用concat()拼接--將user_id和時間戳拼接起來得到唯一的session_id

select

????????user_id,

????????page_time,

????????page,

????????before_time,

if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,

concat(user_id,'-',unix_timestamp(page_time)),null ) session_id

from(

? ? ? ? select

????????????????user_id,

????????????????page_time,

????????????????page,

? ? ? ? lag(page_time) over(partition by user_id order by page_time asc) before_time

????????from page_session) tmp1;

結(jié)果如下:

user_id page_time page before_time session_id

1001 2020-09-10 10:21:21 home.html? ? ? NULL? ? 1001-1599733281

1001 2020-09-10 10:28:10 good_list.html? 2020-09-10 10:21:21 NULL

1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10 NULL

1001 2020-09-10 10:42:55 cart.html? ? ? 2020-09-10 10:35:05 NULL

1001 2020-09-10 11:35:21 home.html? ? ? 2020-09-10 10:42:55 1001-1599737721

1001 2020-09-10 11:36:10 cart.html? ? ? 2020-09-10 11:35:21 NULL

1001 2020-09-10 11:38:12 trade.html? ? ? 2020-09-10 11:36:10 NULL

1001 2020-09-10 11:38:55 payment.html? ? 2020-09-10 11:38:12 NULL

1002 2020-09-10 09:40:00 home.html? ? ? NULL? ? 1002-1599730800

1002 2020-09-10 09:41:00 mine.html? ? ? 2020-09-10 09:40:00 NULL

1002 2020-09-10 09:42:00 favor.html? ? ? 2020-09-10 09:41:00 NULL

1003 2020-09-10 13:10:00 home.html? ? ? NULL? ? 1003-1599743400

1003 2020-09-10 13:15:00 search.html? ? 2020-09-10 13:10:00 NULL

4、給每個會話的其他數(shù)據(jù)都添加上對應的session_id

last_value(xx,true) --返回一組值中的最后一個值(該組通常是有序集合)安拟。

將第二個參數(shù)設(shè)置為true是忽略null值蛤吓,如果這組值中的最后一個值是null值,返回該集合中的最后一個非空值糠赦。

默認為false会傲,如果最后一個值為null,函數(shù)將返回 null拙泽。

如果所有值均為空值淌山,則返回 null。

select

????????user_id,

????????page_time,

????????page,

last_value(session_id,true) over(partition by user_id order by page_time asc) session_id

from(

????????select

????????????????user_id,

????????????????page_time,

????????????????page,

????????????????before_time,

????????if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,

????????concat(user_id,'-',unix_timestamp(page_time)),null ) session_id

????????from(

????????????????select

????????????????????????user_id,

????????????????????????page_time,

????????????????????????page,

????????????????lag(page_time) over(partition by user_id order by page_time asc) before_time

????????????????from page_session) tmp1) tmp2;

結(jié)果如下:

user_id page_time page session_id

1001 2020-09-10 10:21:21? ? home.html? ? ? 1001-1599733281

1001 2020-09-10 10:28:10? ? good_list.html? 1001-1599733281

1001 2020-09-10 10:35:05? ? good_detail.html 1001-1599733281

1001 2020-09-10 10:42:55? ? cart.html? ? ? 1001-1599733281

1001 2020-09-10 11:35:21? ? home.html? ? ? 1001-1599737721

1001 2020-09-10 11:36:10? ? cart.html? ? ? 1001-1599737721

1001 2020-09-10 11:38:12? ? trade.html? ? ? 1001-1599737721

1001 2020-09-10 11:38:55? ? payment.html? ? 1001-1599737721

1002 2020-09-10 09:40:00? ? home.html? ? ? 1002-1599730800

1002 2020-09-10 09:41:00? ? mine.html? ? ? 1002-1599730800

1002 2020-09-10 09:42:00? ? favor.html? ? ? 1002-1599730800

1003 2020-09-10 13:10:00? ? home.html? ? ? 1003-1599743400

1003 2020-09-10 13:15:00? ? search.html? ? 1003-1599743400

5顾瞻、求行為軌跡--

ROW_NUMBER()排序:會根據(jù)順序計算

select

????????user_id,

????????page_time,

????????page,

????????session_id,

row_number()over(partition by session_id order by page_time asc) step

from(

????????????????select

????????????????user_id,

????????????????page_time,

????????????????page,

????????last_value(session_id,true) over(partition by user_id order by page_time asc) session_id

????????from(

????????????????select

????????????????????????user_id,

????????????????????????page_time,

????????????????????????page,

????????????????????????before_time,

????????????????if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,

????????????????concat(user_id,'-',unix_timestamp(page_time)),null ) session_id

????????????????from(

?????????????????????????select

????????????????????????????????user_id,

????????????????????????????????page_time,

????????????????????????????????page,

????????????????????????lag(page_time) over(partition by user_id order by page_time asc) before_time

????????????????????????from page_session) tmp1) tmp2) tmp3;

結(jié)果如下:

user_id page_time page session_id step

1001 2020-09-10 10:21:21 home.html? ? ? 1001-1599733281 1

1001 2020-09-10 10:28:10 good_list.html? 1001-1599733281 2

1001 2020-09-10 10:35:05 good_detail.html 1001-1599733281 3

1001 2020-09-10 10:42:55 cart.html? ? ? 1001-1599733281 4

1001 2020-09-10 11:35:21 home.html? ? ? 1001-1599737721 1

1001 2020-09-10 11:36:10 cart.html? ? ? 1001-1599737721 2

1001 2020-09-10 11:38:12 trade.html? ? ? 1001-1599737721 3

1001 2020-09-10 11:38:55 payment.html? ? 1001-1599737721 4

1002 2020-09-10 09:40:00 home.html? ? ? 1002-1599730800 1

1002 2020-09-10 09:41:00 mine.html? ? ? 1002-1599730800 2

1002 2020-09-10 09:42:00 favor.html? ? ? 1002-1599730800 3

1003 2020-09-10 13:10:00 home.html? ? ? 1003-1599743400 1

1003 2020-09-10 13:15:00 search.html? ? 1003-1599743400 2

二艾岂、代碼實現(xiàn):

import java.text.SimpleDateFormat

import java.util.UUID

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Dataset

object SessionTest1 {

? def main(args: Array[String]): Unit = {

? ? //需求:求得每個用戶每次會話的行為軌跡

? ? import org.apache.spark.sql.SparkSession

? ? val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

? ? import spark.implicits._

? ? //1、讀取數(shù)據(jù)(行類型轉(zhuǎn)為元組)

? ? val ds: Dataset[(String, String, String)] = spark.read.option("sep","\t").csv("datas/session.txt")

? ? ? .toDF("user_id","page_time","page").as[(String,String,String)]

? ? //轉(zhuǎn)為rdd操作

? ? val rdd: RDD[(String, String, String)] = ds.rdd

? ? //此時的結(jié)果

? ? //RDD(

? ? //? (1001,2020-09-10 10:21:21,home.html),

? ? //? (1001,2020-09-10 10:28:10,good_list.html),

? ? //? (1001,2020-09-10 10:35:05,good_detail.html),

? ? //? ...

? ? //? (1003,2020-09-10 13:15:00,search.html)

? ? // )

? ? //2朋其、轉(zhuǎn)換數(shù)據(jù)類型--轉(zhuǎn)為樣例類

? ? val rdd2: RDD[UserAnalysis] = rdd.map{

? ? ? case(userid,timestr,page)=>

? ? ? ? //格式化時間

? ? ? ? val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

? ? ? ? //獲得時間戳-毫秒

? ? ? ? val time = formatter.parse(timestr).getTime

? ? ? ? UserAnalysis(userid,time,timestr,page)

? ? }

? ? //此時的結(jié)果:

? ? //RDD(

? ? //? UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)

? ? //? UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)

? ? //? UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)

? ? //? ...

? ? //? UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1))

? ? //3王浴、按照用戶id分組

? ? val rdd3: RDD[(String, Iterable[UserAnalysis])] = rdd2.groupBy(x => x.userid)

? ? //此時的結(jié)果:

? ? //RDD(

? ? //? 1001->Iterable(

? ? //? ? ? ? ? UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)

? ? //? ? ? ? ? UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)

? ? //? ? ? ? ? UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)

? ? //? ? ? ? ? ...)

? ? //? ...

? ? //? 1003->Iterable(

? ? //? ? ? ? ? ...

? ? //? ? ? ? ? UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1)))

? ? val rdd4: RDD[UserAnalysis] = rdd3.flatMap(x=>{

? ? ? //4、對每個用戶所有數(shù)據(jù)排序

? ? ? val sortedList: List[UserAnalysis] = x._2.toList.sortBy(y=>y.time)

? ? ? //結(jié)果為:

? ? ? //List(

? ? ? //? ? UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)

? ? ? //? ? UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)

? ? ? //? ? UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)

? ? ? //? ? ...)

? ? ? //5梅猿、針對每個用戶所有數(shù)據(jù)滑窗--每2個為一個窗口

? ? ? val slidingList: Iterator[List[UserAnalysis]] = sortedList.sliding(2)

? ? ? //結(jié)果為:

? ? ? //Iterator(

? ? ? //? ? List( UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1),

? ? ? //? ? ? ? ? UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1))

? ? ? //? ? List( UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,05899882-50bb-4c21-8cbf-32cae82c27f1,1),

? ? ? //? ? ? ? ? UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,4dd73745-b572-4304-a9ff-3b64d7b95b67,1) )

? ? ? //? ? ....

? ? ? // )

? ? ? //6氓辣、判斷每個窗口中兩個數(shù)據(jù)是否屬于同一個session,如果屬于同一個會話,修改session袱蚓,step

? ? ? slidingList.foreach(list=>{

? ? ? ? //取出窗口中的第一個數(shù)據(jù)

? ? ? ? val first = list.head

? ? ? ? //取出窗口中的第二個數(shù)據(jù)

? ? ? ? val next = list.last

? ? ? ? //判斷第一個數(shù)據(jù)和下一個數(shù)據(jù)的時間是否在30分鐘內(nèi)

? ? ? ? if (next.time-first.time <= 30*60*1000){

? ? ? ? ? //屬于同一個會話钞啸,那么下一條數(shù)據(jù)的session和上一條一樣

? ? ? ? ? next.session=first.session

? ? ? ? ? //屬于同一個會話,那么下一條數(shù)據(jù)的軌跡step+1

? ? ? ? ? next.step=first.step+1

? ? ? ? }

? ? ? })

? ? ? //返回樣例類對象(樣例類對象對應的session和step已經(jīng)修改了)

? ? ? x._2

? ? })

? ? //7喇潘、結(jié)果展示

? ? rdd4.foreach(println)

? ? //結(jié)果:

? ? //UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,1)

? ? //UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,2)

? ? //UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,3)

? ? //UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,4)

? ? //UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7de3b33f-a476-407d-9dd6-a763130130d2,1)

? ? //UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,7de3b33f-a476-407d-9dd6-a763130130d2,2)

? ? //UserAnalysis(1001,1599709092000,2020-09-10 11:38:12,trade.html,7de3b33f-a476-407d-9dd6-a763130130d2,3)

? ? //UserAnalysis(1001,1599709135000,2020-09-10 11:38:55,payment.html,7de3b33f-a476-407d-9dd6-a763130130d2,4)

? ? //UserAnalysis(1002,1599702000000,2020-09-10 09:40:00,home.html,e84903df-73d5-4fee-83e5-a11c186a8e27,1)

? ? //UserAnalysis(1002,1599702060000,2020-09-10 09:41:00,mine.html,e84903df-73d5-4fee-83e5-a11c186a8e27,2)

? ? //UserAnalysis(1002,1599702120000,2020-09-10 09:42:00,favor.html,e84903df-73d5-4fee-83e5-a11c186a8e27,3)

? ? //UserAnalysis(1003,1599714600000,2020-09-10 13:10:00,home.html,292ff465-60e8-483f-908a-3d03eebca0f9,1)

? ? //UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,292ff465-60e8-483f-908a-3d03eebca0f9,2)

? }

}

//UUID(Universally Unique IDentifier)全局唯一標識符,是指在一臺機器上生成的數(shù)字体斩,它保證對在同一時空中的所有機器都是唯一的

//UUID.randomUUID().toString()是javaJDK(1.5以上的版本)提供的一個自動生成主鍵的方法,它生成的是以為32位的數(shù)字和字母組合的字符颖低,中間還參雜著4個 - 符號絮吵。

case class UserAnalysis(userid:String,time:Long,timestr:String,page:String,var session:String = UUID.randomUUID().toString,var step:Int=1)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市忱屑,隨后出現(xiàn)的幾起案子蹬敲,更是在濱河造成了極大的恐慌暇昂,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件伴嗡,死亡現(xiàn)場離奇詭異急波,居然都是意外死亡,警方通過查閱死者的電腦和手機瘪校,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門彤钟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來猩系,“玉大人,你說我怎么就攤上這事±铮” “怎么了幕侠?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵油挥,是天一觀的道長顾患。 經(jīng)常有香客問我,道長用踩,這世上最難降的妖魔是什么渠退? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮脐彩,結(jié)果婚禮上碎乃,老公的妹妹穿的比我還像新娘。我一直安慰自己惠奸,他們只是感情好梅誓,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著佛南,像睡著了一般梗掰。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嗅回,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天及穗,我揣著相機與錄音,去河邊找鬼绵载。 笑死埂陆,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的娃豹。 我是一名探鬼主播焚虱,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼懂版!你這毒婦竟也來了鹃栽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤定续,失蹤者是張志新(化名)和其女友劉穎谍咆,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體私股,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡摹察,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了倡鲸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片供嚎。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖峭状,靈堂內(nèi)的尸體忽然破棺而出克滴,到底是詐尸還是另有隱情,我是刑警寧澤优床,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布劝赔,位于F島的核電站,受9級特大地震影響胆敞,放射性物質(zhì)發(fā)生泄漏着帽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一移层、第九天 我趴在偏房一處隱蔽的房頂上張望仍翰。 院中可真熱鬧,春花似錦观话、人聲如沸予借。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽灵迫。三九已至,卻和暖如春晦溪,著一層夾襖步出監(jiān)牢的瞬間龟再,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工尼变, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留利凑,地道東北人。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓嫌术,卻偏偏與公主長得像哀澈,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子度气,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容