Data:
1001 2020-09-10 10:21:21 home.html
1001 2020-09-10 10:28:10 good_list.html
1001 2020-09-10 10:35:05 good_detail.html
1001 2020-09-10 10:42:55 cart.html
1001 2020-09-10 11:35:21 home.html
1001 2020-09-10 11:36:10 cart.html
1001 2020-09-10 11:38:12 trade.html
1001 2020-09-10 11:38:55 payment.html
1002 2020-09-10 09:40:00 home.html
1002 2020-09-10 09:41:00 mine.html
1002 2020-09-10 09:42:00 favor.html
1003 2020-09-10 13:10:00 home.html
1003 2020-09-10 13:15:00 search.html
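Requirement:
Find the behavior path of each user session (for the same user, consecutive records that are no more than half an hour apart belong to the same session).
The expected result is as follows: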
A 1001 2020-09-10 10:21:21 home.html 1
A 1001 2020-09-10 10:28:10 good_list.html 2
A 1001 2020-09-10 10:35:05 good_detail.html 3
A 1001 2020-09-10 10:42:55 cart.html 4
B 1001 2020-09-10 11:35:21 home.html 1
B 1001 2020-09-10 11:36:10 cart.html 2
B 1001 2020-09-10 11:38:12 trade.html 3
B 1001 2020-09-10 11:38:55 payment.html 4
C 1002 2020-09-10 09:40:00 home.html 1
C 1002 2020-09-10 09:41:00 mine.html 2
C 1002 2020-09-10 09:42:00 favor.html 3
D 1003 2020-09-10 13:10:00 home.html 1
D 1003 2020-09-10 13:15:00 search.html 2
I. SQL statements:
1. Create the table:
create table page_session(
    user_id string,
    page_time string,
    page string)
row format delimited
fields terminated by '\t';
2. Partition by user_id, order by page_time ascending, and use the window function lag() to get the time of the previous record
select
    user_id,
    page_time,
    page,
    lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session;
Result:
user_id  page_time            page              before_time
1001     2020-09-10 10:21:21  home.html         NULL
1001     2020-09-10 10:28:10  good_list.html    2020-09-10 10:21:21
1001     2020-09-10 10:35:05  good_detail.html  2020-09-10 10:28:10
1001     2020-09-10 10:42:55  cart.html         2020-09-10 10:35:05
1001     2020-09-10 11:35:21  home.html         2020-09-10 10:42:55
1001     2020-09-10 11:36:10  cart.html         2020-09-10 11:35:21
1001     2020-09-10 11:38:12  trade.html        2020-09-10 11:36:10
1001     2020-09-10 11:38:55  payment.html      2020-09-10 11:38:12
1002     2020-09-10 09:40:00  home.html         NULL
1002     2020-09-10 09:41:00  mine.html         2020-09-10 09:40:00
1002     2020-09-10 09:42:00  favor.html        2020-09-10 09:41:00
1003     2020-09-10 13:10:00  home.html         NULL
1003     2020-09-10 13:15:00  search.html       2020-09-10 13:10:00
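To make the behavior of lag() concrete, here is a minimal sketch in plain Scala collections, with no Spark or Hive involved. The names LagSketch, PageView and withPrevious are hypothetical and exist only for this illustration. It assumes page_time strings in the yyyy-MM-dd HH:mm:ss format, so that sorting them as strings is the same as sorting them chronologically.

```scala
object LagSketch {
  // Hypothetical record type mirroring the page_session columns.
  final case class PageView(userId: String, pageTime: String, page: String)

  // For each user, sort by pageTime and pair every row with the previous
  // row's pageTime. The user's first row gets None, which plays the role
  // of the NULL returned by lag(page_time).
  def withPrevious(rows: Seq[PageView]): Seq[(PageView, Option[String])] =
    rows.groupBy(_.userId).toSeq.sortBy(_._1).flatMap { case (_, views) =>
      val sorted = views.sortBy(_.pageTime)
      // Shift the time column down by one position within the user's rows.
      val previous = None +: sorted.init.map(v => Option(v.pageTime))
      sorted.zip(previous)
    }
}
```

Calling LagSketch.withPrevious on the sample rows should give the same before_time column as the table above, with None where the table shows NULL.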
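3. Assign a session id to the first record of each session
Two conditions decide whether a record starts a new session:
1. before_time is null (the user's first record)
2. The gap between the previous time and the current time is more than half an hour (the times must be converted to timestamps first); a gap of more than 30 minutes means a new session
If the record starts a new session, use concat() to join user_id and the timestamp into a unique session_id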
select
    user_id,
    page_time,
    page,
    before_time,
    if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
        concat(user_id,'-',unix_timestamp(page_time)), null ) session_id
from(
    select
        user_id,
        page_time,
        page,
        lag(page_time) over(partition by user_id order by page_time asc) before_time
    from page_session) tmp1;
Result:
user_id  page_time            page              before_time          session_id
1001     2020-09-10 10:21:21  home.html         NULL                 1001-1599733281
1001     2020-09-10 10:28:10  good_list.html    2020-09-10 10:21:21  NULL
1001     2020-09-10 10:35:05  good_detail.html  2020-09-10 10:28:10  NULL
1001     2020-09-10 10:42:55  cart.html         2020-09-10 10:35:05  NULL
1001     2020-09-10 11:35:21  home.html         2020-09-10 10:42:55  1001-1599737721
1001     2020-09-10 11:36:10  cart.html         2020-09-10 11:35:21  NULL
1001     2020-09-10 11:38:12  trade.html        2020-09-10 11:36:10  NULL
1001     2020-09-10 11:38:55  payment.html      2020-09-10 11:38:12  NULL
1002     2020-09-10 09:40:00  home.html         NULL                 1002-1599730800
1002     2020-09-10 09:41:00  mine.html         2020-09-10 09:40:00  NULL
1002     2020-09-10 09:42:00  favor.html        2020-09-10 09:41:00  NULL
1003     2020-09-10 13:10:00  home.html         NULL                 1003-1599743400
1003     2020-09-10 13:15:00  search.html       2020-09-10 13:10:00  NULL
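The new-session test in this step can be sketched in plain Scala as follows. This is only an illustration of the two conditions, not the Hive implementation: SessionStartSketch, isNewSession and SessionGapSeconds are hypothetical names, and java.time is used here in place of unix_timestamp().

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object SessionStartSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Assumed session timeout: 30 minutes, expressed in seconds.
  val SessionGapSeconds: Long = 30L * 60

  // A record opens a new session when it has no previous record
  // (before_time is NULL) or when the gap to the previous record
  // is strictly greater than 30 minutes.
  def isNewSession(pageTime: String, beforeTime: Option[String]): Boolean =
    beforeTime match {
      case None => true
      case Some(prev) =>
        val gap = Duration.between(
          LocalDateTime.parse(prev, fmt),
          LocalDateTime.parse(pageTime, fmt))
        gap.getSeconds > SessionGapSeconds
    }
}
```

A gap of exactly 30 minutes stays in the same session, which matches the strict > comparison in the SQL above.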
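4. Add the corresponding session_id to the remaining records of each session
last_value(xx, true): returns the last value in a group of values (the group is usually an ordered set).
Setting the second argument to true ignores null values: if the last value in the group is null, the last non-null value in the group is returned instead.
The default is false: if the last value is null, the function returns null.
If all values are null, null is returned.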
select
    user_id,
    page_time,
    page,
    last_value(session_id,true) over(partition by user_id order by page_time asc) session_id
from(
    select
        user_id,
        page_time,
        page,
        before_time,
        if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
            concat(user_id,'-',unix_timestamp(page_time)), null ) session_id
    from(
        select
            user_id,
            page_time,
            page,
            lag(page_time) over(partition by user_id order by page_time asc) before_time
        from page_session) tmp1) tmp2;
Result:
user_id  page_time            page              session_id
1001     2020-09-10 10:21:21  home.html         1001-1599733281
1001     2020-09-10 10:28:10  good_list.html    1001-1599733281
1001     2020-09-10 10:35:05  good_detail.html  1001-1599733281
1001     2020-09-10 10:42:55  cart.html         1001-1599733281
1001     2020-09-10 11:35:21  home.html         1001-1599737721
1001     2020-09-10 11:36:10  cart.html         1001-1599737721
1001     2020-09-10 11:38:12  trade.html        1001-1599737721
1001     2020-09-10 11:38:55  payment.html      1001-1599737721
1002     2020-09-10 09:40:00  home.html         1002-1599730800
1002     2020-09-10 09:41:00  mine.html         1002-1599730800
1002     2020-09-10 09:42:00  favor.html        1002-1599730800
1003     2020-09-10 13:10:00  home.html         1003-1599743400
1003     2020-09-10 13:15:00  search.html       1003-1599743400
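With the default window frame (from the start of the partition up to the current row), last_value(session_id, true) amounts to a forward fill: every null is replaced by the most recent non-null value before it. A minimal sketch of that idea in plain Scala, assuming the values are already ordered by page_time within one user; ForwardFillSketch and forwardFill are hypothetical names used only here.

```scala
object ForwardFillSketch {
  // Replace each None with the most recent Some seen so far.
  // Leading None values stay None because nothing precedes them.
  def forwardFill[A](values: Seq[Option[A]]): Seq[Option[A]] =
    values
      // Carry the last non-empty value along; scanLeft also emits the seed...
      .scanLeft(Option.empty[A])((last, cur) => cur.orElse(last))
      // ...so drop it to keep the output aligned with the input.
      .tail
}
```

For user 1001, the session_id column from step 3 (one value, three nulls, another value, three nulls) is filled in as four copies of the first id followed by four copies of the second.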
5. Compute the behavior path
row_number() numbers the rows of each partition sequentially (1, 2, 3, ...) in the order given by order by
select
    user_id,
    page_time,
    page,
    session_id,
    row_number() over(partition by session_id order by page_time asc) step
from(
    select
        user_id,
        page_time,
        page,
        last_value(session_id,true) over(partition by user_id order by page_time asc) session_id
    from(
        select
            user_id,
            page_time,
            page,
            before_time,
            if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
                concat(user_id,'-',unix_timestamp(page_time)), null ) session_id
        from(
            select
                user_id,
                page_time,
                page,
                lag(page_time) over(partition by user_id order by page_time asc) before_time
            from page_session) tmp1) tmp2) tmp3;
Result:
user_id  page_time            page              session_id       step
1001     2020-09-10 10:21:21  home.html         1001-1599733281  1
1001     2020-09-10 10:28:10  good_list.html    1001-1599733281  2
1001     2020-09-10 10:35:05  good_detail.html  1001-1599733281  3
1001     2020-09-10 10:42:55  cart.html         1001-1599733281  4
1001     2020-09-10 11:35:21  home.html         1001-1599737721  1
1001     2020-09-10 11:36:10  cart.html         1001-1599737721  2
1001     2020-09-10 11:38:12  trade.html        1001-1599737721  3
1001     2020-09-10 11:38:55  payment.html      1001-1599737721  4
1002     2020-09-10 09:40:00  home.html         1002-1599730800  1
1002     2020-09-10 09:41:00  mine.html         1002-1599730800  2
1002     2020-09-10 09:42:00  favor.html        1002-1599730800  3
1003     2020-09-10 13:10:00  home.html         1003-1599743400  1
1003     2020-09-10 13:15:00  search.html       1003-1599743400  2
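The row_number() step can be pictured as "group by session, sort by time, count from 1". A minimal sketch in plain Scala, assuming rows are given as (session_id, page_time, page) tuples and that page_time strings sort chronologically; StepSketch and numberSteps are hypothetical names for this illustration.

```scala
object StepSketch {
  // Input rows are (sessionId, pageTime, page) in any order.
  // Output rows carry an extra 1-based step within each session,
  // like row_number() over(partition by session_id order by page_time).
  def numberSteps(
      rows: Seq[(String, String, String)]): Seq[(String, String, String, Int)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      group.sortBy(_._2).zipWithIndex.map {
        // zipWithIndex is 0-based, so add 1 to match row_number().
        case ((sessionId, pageTime, page), i) => (sessionId, pageTime, page, i + 1)
      }
    }
}
```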
II. Code implementation:
import java.text.SimpleDateFormat
import java.util.UUID
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
object SessionTest1 {
  def main(args: Array[String]): Unit = {
    // Requirement: find the behavior path of each session for each user
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._
    // 1. Read the data (convert each row to a tuple)
    val ds: Dataset[(String, String, String)] = spark.read.option("sep", "\t").csv("datas/session.txt")
      .toDF("user_id", "page_time", "page").as[(String, String, String)]
    // Convert to an RDD
    val rdd: RDD[(String, String, String)] = ds.rdd
    // Result at this point:
    // RDD(
    //   (1001,2020-09-10 10:21:21,home.html),
    //   (1001,2020-09-10 10:28:10,good_list.html),
    //   (1001,2020-09-10 10:35:05,good_detail.html),
    //   ...
    //   (1003,2020-09-10 13:15:00,search.html)
    // )
    // 2. Convert each tuple to the case class
    val rdd2: RDD[UserAnalysis] = rdd.map {
      case (userid, timestr, page) =>
        // Formatter for the time string
        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        // Timestamp in milliseconds
        val time = formatter.parse(timestr).getTime
        UserAnalysis(userid, time, timestr, page)
    }
    // Result at this point (every record starts with its own random session and step 1):
    // RDD(
    //   UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
    //   UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
    //   UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
    //   ...
    //   UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1))
    // 3. Group by user id
    val rdd3: RDD[(String, Iterable[UserAnalysis])] = rdd2.groupBy(x => x.userid)
    // Result at this point:
    // RDD(
    //   1001->Iterable(
    //           UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
    //           UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
    //           UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
    //           ...)
    //   ...
    //   1003->Iterable(
    //           ...
    //           UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1)))
    val rdd4: RDD[UserAnalysis] = rdd3.flatMap(x => {
      // 4. Sort each user's records by time
      val sortedList: List[UserAnalysis] = x._2.toList.sortBy(y => y.time)
      // Result:
      // List(
      //     UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
      //     UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
      //     UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
      //     ...)
      // 5. Slide a window of size 2 over each user's sorted records
      val slidingList: Iterator[List[UserAnalysis]] = sortedList.sliding(2)
      // Result:
      // Iterator(
      //     List( UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1),
      //           UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1))
      //     List( UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,05899882-50bb-4c21-8cbf-32cae82c27f1,1),
      //           UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,4dd73745-b572-4304-a9ff-3b64d7b95b67,1) )
      //     ....
      // )
      // 6. Check whether the two records in each window belong to the same session; if so, update session and step
      slidingList.foreach {
        // first and next are the two records in the window
        case first :: next :: Nil =>
          // Are the two records no more than 30 minutes apart?
          if (next.time - first.time <= 30 * 60 * 1000) {
            // Same session: the next record takes the session of the previous one
            next.session = first.session
            // Same session: the next record's step is the previous step + 1
            next.step = first.step + 1
          }
        // A user with a single record yields one window of size 1; leave it at step 1
        case _ =>
      }
      // Return the sorted records (their session and step were updated in place)
      sortedList
    })
    // 7. Show the result
    rdd4.foreach(println)
    // Result:
    //UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,1)
    //UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,2)
    //UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,3)
    //UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,4)
    //UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7de3b33f-a476-407d-9dd6-a763130130d2,1)
    //UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,7de3b33f-a476-407d-9dd6-a763130130d2,2)
    //UserAnalysis(1001,1599709092000,2020-09-10 11:38:12,trade.html,7de3b33f-a476-407d-9dd6-a763130130d2,3)
    //UserAnalysis(1001,1599709135000,2020-09-10 11:38:55,payment.html,7de3b33f-a476-407d-9dd6-a763130130d2,4)
    //UserAnalysis(1002,1599702000000,2020-09-10 09:40:00,home.html,e84903df-73d5-4fee-83e5-a11c186a8e27,1)
    //UserAnalysis(1002,1599702060000,2020-09-10 09:41:00,mine.html,e84903df-73d5-4fee-83e5-a11c186a8e27,2)
    //UserAnalysis(1002,1599702120000,2020-09-10 09:42:00,favor.html,e84903df-73d5-4fee-83e5-a11c186a8e27,3)
    //UserAnalysis(1003,1599714600000,2020-09-10 13:10:00,home.html,292ff465-60e8-483f-908a-3d03eebca0f9,1)
    //UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,292ff465-60e8-483f-908a-3d03eebca0f9,2)
  }
}
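// A UUID (Universally Unique IDentifier) is a 128-bit identifier that can be generated on a single machine; a collision with an identifier generated on any other machine is extremely unlikely.
// UUID.randomUUID().toString is provided by the JDK (1.5 and later) and is often used to generate keys. It returns a string of 32 hexadecimal characters (digits and letters) separated by 4 '-' characters.
// Every record gets its own random session and step 1 by default; records that continue a session are overwritten later.
case class UserAnalysis(userid: String, time: Long, timestr: String, page: String, var session: String = UUID.randomUUID().toString, var step: Int = 1)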
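The code above relies on mutable fields (var session, var step) that are updated in place inside the sliding window. As an alternative, the same rule can be written without mutation by folding over one user's sorted events. This is a sketch under stated assumptions, not the approach used above: FoldSessionSketch, Event, Tagged and sessionize are hypothetical names, and the id generator is passed in as a parameter only so that the result can be made deterministic.

```scala
import java.util.UUID

object FoldSessionSketch {
  // Hypothetical immutable input and output types for this sketch.
  final case class Event(userId: String, time: Long, page: String)
  final case class Tagged(event: Event, session: String, step: Int)

  // Assumed session timeout: 30 minutes, in milliseconds.
  val GapMillis: Long = 30L * 60 * 1000

  // Tags the events of ONE user with a session id and a step number.
  def sessionize(
      events: Seq[Event],
      newId: () => String = () => UUID.randomUUID().toString): Vector[Tagged] =
    events.sortBy(_.time).foldLeft(Vector.empty[Tagged]) { (acc, e) =>
      acc.lastOption match {
        // Within 30 minutes of the previous event: same session, next step.
        case Some(prev) if e.time - prev.event.time <= GapMillis =>
          acc :+ Tagged(e, prev.session, prev.step + 1)
        // First event, or a gap over 30 minutes: open a new session.
        case _ =>
          acc :+ Tagged(e, newId(), 1)
      }
    }
}
```

In the Spark job this function could be applied to each user's Iterable inside flatMap, for example after converting each UserAnalysis to an Event. A user with a single record needs no special case here, because the fold never compares a record with itself.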