spark練習(xí):求得用戶每次會話的行為軌跡--解決數(shù)據(jù)傾斜

數(shù)據(jù):

1001,2020-09-10 10:21:21,home.html

1001,2020-09-10 10:28:10,good_list.html

1002,2020-09-10 09:40:00,home.html

1001,2020-09-10 10:35:05,good_detail.html

1002,2020-09-10 09:42:00,favor.html

1001,2020-09-10 10:42:55,cart.html

1001,2020-09-10 10:43:55,11.html

1001,2020-09-10 10:44:55,22.html

1001,2020-09-10 10:45:55,33.html

1001,2020-09-10 10:46:55,44.html

1001,2020-09-10 10:47:55,55.html

1001,2020-09-10 10:48:55,66.html

1001,2020-09-10 10:49:55,77.html

1002,2020-09-10 09:41:00,mine.html

1001,2020-09-10 11:35:21,home.html

1001,2020-09-10 11:36:10,cart.html

1003,2020-09-10 13:10:00,home.html

1001,2020-09-10 11:38:12,trade.html

1001,2020-09-10 11:39:12,aa.html

1001,2020-09-10 11:40:12,bb.html

1001,2020-09-10 11:41:12,cc.html

1001,2020-09-10 11:42:12,dd.html

1001,2020-09-10 11:43:12,ee.html

1001,2020-09-10 11:44:12,ff.html

1001,2020-09-10 11:45:12,gg.html

1001,2020-09-10 11:46:12,hh.html

1001,2020-09-10 11:47:12,ll.html

1001,2020-09-10 11:38:55,payment.html

1003,2020-09-10 13:15:00,search.html

需求:求得用戶每次會話的行為軌跡--解決數(shù)據(jù)傾斜

import java.text.SimpleDateFormat

import java.util

import java.util.UUID

import org.apache.spark.{Partition, RangePartitioner}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Dataset

object SessionTest2 {

? def main(args: Array[String]): Unit = {

? ? //需求:求得用戶每次會話的行為軌跡--解決數(shù)據(jù)傾斜

? ? import org.apache.spark.sql.SparkSession

? ? val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

? ? import spark.implicits._

? ? //1蹬竖、讀取數(shù)據(jù)

? ? val ds = spark.read.csv("datas/session2.txt").toDF("user_id", "page_time", "page").as[(String, String, String)]

? ? //獲取一個集合累加器

? ? val acc = spark.sparkContext.collectionAccumulator[(String, UserAnalysis)]("acc")

? ? //2、轉(zhuǎn)換數(shù)據(jù)類型--樣例類(轉(zhuǎn)成樣例類方便修改值)

? ? val ds2: Dataset[(String, UserAnalysis)] = ds.map {

? ? ? case (userid, timestr, page) =>

? ? ? ? //獲取時間戳

? ? ? ? val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

? ? ? ? val time = formatter.parse(timestr).getTime

? ? ? ? //獲得user對象

? ? ? ? val user = UserAnalysis(userid, time, timestr, page)

? ? ? ? //返回kv鍵值對--k為user_id和time或timestr拼接起來的字符串,方便后續(xù)分組排序

? ? ? ? (s"${userid}_${time}", user)

? ? }

? ? //3、轉(zhuǎn)成rdd

? ? val rdd = ds2.rdd

? ? //4、用RangePartitioner進(jìn)行重分區(qū)--得到的結(jié)果在每個分區(qū)內(nèi)是有序的魏割,在分區(qū)間也是有序的

? ? val rdd2: RDD[(String, UserAnalysis)] = rdd.repartitionAndSortWithinPartitions(new RangePartitioner(5, rdd))

? ? //repartitionAndSortWithinPartitions():根據(jù)給定的分區(qū)器對RDD進(jìn)行重新分區(qū),并在每個結(jié)果分區(qū)中根據(jù)key進(jìn)行排序。

? ? //? ? ? ? ? def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] ={...}

? ? //class RangePartitioner[K : Ordering : ClassTag, V](partitions: Int,rdd: RDD[_ <: Product2[K, V]],...)extends Partitioner {...}

? ? //RangePartitioner分區(qū)規(guī)則:首先對rdd采樣出 分區(qū)數(shù)-1 個key眨八,通過這些key確定 分區(qū)數(shù)個 邊界,

? ? //? ? ? ? 這些邊界就是每個分區(qū)的邊界左电,后續(xù)key會與每個分區(qū)的邊界對比廉侧,如果在范圍內(nèi)页响,則數(shù)據(jù)放入該分區(qū)

? ? //5、對每個分區(qū)中每個用戶的兩兩數(shù)據(jù)進(jìn)行判斷段誊,看是否屬于同一個會話

? ? val rdd3 = rdd2.mapPartitionsWithIndex((index, it) => {

? ? ? val list = it.toList

? ? ? //當(dāng)前l(fā)ist中的數(shù)據(jù):

? ? ? //List(

? ? ? //(1001_1599704481000,UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,2837d947-9a35-44b6-a90d-69f1c03f25d3,1)),

? ? ? //(1001_1599704890000,UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,249c5312-4f34-48db-9ed8-0de6c2348e69,1)),

? ? ? //(1001_1599705305000,UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,618b4ff1-63f7-499c-9757-d7091ad8aa47,1)),

? ? ? //(1001_1599705775000,UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,0571dccb-0904-49b9-ab31-2e0418b44c34,1)),

? ? ? //(1001_1599705835000,UserAnalysis(1001,1599705835000,2020-09-10 10:43:55,11.html,7a7e5d8f-a04e-44ec-8180-8f521cb4b3bd,1)),

? ? ? //(1001_1599705895000,UserAnalysis(1001,1599705895000,2020-09-10 10:44:55,22.html,e21f5d1f-5105-4ab4-a2b0-8068192d45d4,1)))

? ? ? //...

? ? ? // 滑窗

? ? ? val slidingList: Iterator[List[(String, UserAnalysis)]] = list.sliding(2)

? ? ? //slidingList中的數(shù)據(jù):

? ? ? //Iterator(

? ? ? //? ? List( (1001_1599704481000,UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,2837d947-9a35-44b6-a90d-69f1c03f25d3,1)) ,(1001_1599704890000,UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,249c5312-4f34-48db-9ed8-0de6c2348e69,1)) )

? ? ? //? ...

? ? ? // )

? ? ? slidingList.foreach(x => {

? ? ? ? //取出窗口中的第一個數(shù)據(jù)中的對象

? ? ? ? val before = x.head._2

? ? ? ? //取出窗口中的第二個數(shù)據(jù)中的對象

? ? ? ? val next = x.last._2

? ? ? ? //判斷--如果是同一個用戶,并且時間在半小時內(nèi)就屬于同一個會話

? ? ? ? if (next.userid == before.userid && next.time - before.time <= 30 * 60 * 1000) {

? ? ? ? ? //修改session和step

? ? ? ? ? next.session = before.session

? ? ? ? ? next.step = before.step + 1

? ? ? ? }

? ? ? })

? ? ? //此時分區(qū)內(nèi)有軌跡順序正確闰蚕,但是分區(qū)間的軌跡還有問題--通過累加器解決

? ? ? // 使用集合累加器記錄每個分區(qū)的第一條數(shù)據(jù)和最后一條數(shù)據(jù)

? ? ? // 目的是用第一條數(shù)據(jù)和上一個分區(qū)的最后一條數(shù)據(jù)比較,判斷是否為同一會話连舍,如果是就修改成相同的session没陡,并且這個分區(qū)的第一條數(shù)據(jù)的step+1

? ? ? val head = list.head //第一條數(shù)據(jù)

? ? ? val last = list.last //最后一條數(shù)據(jù)

? ? ? //將本分區(qū)第一條與最后一條數(shù)據(jù)放入累加器--通過action算子觸發(fā)

? ? ? acc.add((s"${index}#head", head._2))

? ? ? acc.add((s"${index}#last", last._2))

? ? ? list.iterator

? ? })

? ? //后續(xù)rdd3可能會在多個job中使用,所以緩存一下

? ? rdd3.cache()

? ? rdd3.collect()

? ? //獲取累加器結(jié)果--獲取的結(jié)果是java的List索赏,不能toMap盼玄,所以需要導(dǎo)入以下內(nèi)容,將java的集合轉(zhuǎn)成scala的集合潜腻,也可將scala的集合轉(zhuǎn)成java的集合

? ? import scala.collection.JavaConverters._

? ? val userMap = acc.value.asScala.toMap

? ? //根據(jù)分區(qū)號遍歷--0號分區(qū)不用處理埃儿,因?yàn)?號分區(qū)里的都是同一個會話

? ? for (i <- 1 until (userMap.size / 2)) {

? ? ? //獲取前一個分區(qū)的最后一條數(shù)據(jù)

? ? ? val beforePartitonLast = userMap.get(s"${i - 1}#last").get

? ? ? //獲取當(dāng)前分區(qū)的第一條數(shù)據(jù)

? ? ? val currentPartitionHead = userMap.get(s"${i}#head").get

? ? ? //獲取當(dāng)前分區(qū)的最后一條數(shù)據(jù)

? ? ? val currentPartitionLast = userMap.get(s"${i}#last").get

? ? ? //判斷當(dāng)前分區(qū)第一條與前一個分區(qū)的最后一條數(shù)據(jù)是否是同一個session,如果是則同步修改session和step

? ? ? if (currentPartitionHead.userid == beforePartitonLast.userid && currentPartitionHead.time - beforePartitonLast.time <= 30 * 60 * 1000) {

? ? ? ? //注意同一個會話可能跨分區(qū)融涣,所以要先判斷當(dāng)前分區(qū)的最后一條數(shù)據(jù)和當(dāng)前分區(qū)的第一條數(shù)據(jù)是否為同一session,如果是則同步修改session和step

? ? ? ? if (currentPartitionLast.session == currentPartitionHead.session) {

? ? ? ? ? currentPartitionLast.session = currentPartitionHead.session

? ? ? ? ? currentPartitionLast.step = beforePartitonLast.step + currentPartitionLast.step

? ? ? ? }

? ? ? ? currentPartitionHead.session = beforePartitonLast.session

? ? ? ? currentPartitionHead.step = beforePartitonLast.step + 1

? ? ? ? //0號分區(qū)

? ? ? ? //...

? ? ? ? //(1001_1599705895000,UserAnalysis(1001,1599705895000,2020-09-10 10:44:55,22.html,2826f01e-1294-429b-90e4-877439fc345a,6))

? ? ? ? //1號分區(qū)

? ? ? ? //(1001_1599705955000,UserAnalysis(1001,1599705955000,2020-09-10 10:45:55,33.html,5865170d-bb4c-439a-b0aa-f0a10d307bc7,1))

? ? ? ? //此處的session應(yīng)該等于0號分區(qū)的最后一條數(shù)據(jù)的session蝌箍,step=0號分區(qū)Last.step+1=7

? ? ? ? //...

? ? ? ? //(1001_1599708921000,UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7c60b41c-0b06-4bfe-ad80-29fc75ce1d24,1))

? ? ? ? //2號分區(qū)

? ? ? ? //(1001_1599708970000,UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,9076e207-4ad4-4aa2-ab3a-ef966d23f09a,1))

? ? ? ? //此處的session應(yīng)該等于1號分區(qū)的最后一條數(shù)據(jù)的session,step=1號分區(qū)Last.step+1=2

? ? ? ? //...

? ? ? ? //(1001_1599709272000,UserAnalysis(1001,1599709272000,2020-09-10 11:41:12,cc.html,9076e207-4ad4-4aa2-ab3a-ef966d23f09a,6))

? ? ? ? //該分區(qū)的最后一條數(shù)據(jù)和第一條數(shù)據(jù)是同一個會話暴心,所以此處的session也應(yīng)該等于1號分區(qū)的最后一條數(shù)據(jù)的session妓盲,step=1號分區(qū)Last.step+當(dāng)前分區(qū)的Last.step=7

? ? ? ? //3號分區(qū)

? ? ? ? //(1001_1599709332000,UserAnalysis(1001,1599709332000,2020-09-10 11:42:12,dd.html,8c793999-5d8c-469c-b6c7-689d592e9713,1))

? ? ? ? //此處的session應(yīng)該等于2號分區(qū)的最后一條數(shù)據(jù)的session,step=1號分區(qū)Last.step+1=8

? ? ? ? //...

? ? ? ? //(1001_1599709632000,UserAnalysis(1001,1599709632000,2020-09-10 11:47:12,ll.html,8c793999-5d8c-469c-b6c7-689d592e9713,6))

? ? ? ? //該分區(qū)的最后一條數(shù)據(jù)和第一條數(shù)據(jù)是同一個會話专普,所以此處的session也應(yīng)該等于2號分區(qū)的最后一條數(shù)據(jù)的session悯衬,step=2號分區(qū)Last.step+當(dāng)前分區(qū)的Last.step=13

? ? ? }

? ? }

? ? //userMap的結(jié)果:

? ? //1號分區(qū): (UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,e48d4e68-dbd0-4f56-af9e-8ca55b1bfacc,1))

? ? //1號分區(qū): (UserAnalysis(1001,1599705955000,2020-09-10 10:45:55,33.html,81e0175d-7422-41e1-a20b-54e447bbb809,7))

? ? //...

? ? //2號分區(qū):(UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,e48d4e68-dbd0-4f56-af9e-8ca55b1bfacc,2))

? ? //2號分區(qū): (UserAnalysis(1001,1599709272000,2020-09-10 11:41:12,cc.html,978710f3-0a61-4a40-8e7c-61e7b0b1a2cb,7))

? ? //...

? ? //3號分區(qū): (UserAnalysis(1001,1599709332000,2020-09-10 11:42:12,dd.html,978710f3-0a61-4a40-8e7c-61e7b0b1a2cb,8))

? ? //3號分區(qū):(UserAnalysis(1001,1599709632000,2020-09-10 11:47:12,ll.html,225b0961-1603-4e31-a84a-454d0025e79c,13))

? ? //此時得到的結(jié)果中,每個分區(qū)內(nèi)的第一條和最后一條的session和step已經(jīng)修正檀夹,但分區(qū)內(nèi)的其他數(shù)據(jù)的還未修正

? ? //根據(jù)修復(fù)過的每個分區(qū)的第一條和最后一條數(shù)據(jù)修復(fù)分區(qū)內(nèi)所有數(shù)據(jù)--將userMap廣播出去

? ? //廣播變量

? ? val bc = spark.sparkContext.broadcast(userMap)

? ? val rdd4 = rdd3.mapPartitionsWithIndex((index, it) => {

? ? ? val list = it.toList

? ? ? //獲取當(dāng)前分區(qū)第一條數(shù)據(jù)

? ? ? val currentPartitionUser = list.head._2

? ? ? //從廣播變量中取出修復(fù)過的當(dāng)前分區(qū)的第一條數(shù)據(jù)

? ? ? val repairCurrentPartitionUser = bc.value.get(s"${index}#head").get

? ? ? //取出當(dāng)前分區(qū)原來數(shù)據(jù)的session

? ? ? val oldSession = currentPartitionUser.session

? ? ? //廣播變量中的數(shù)據(jù)是被修復(fù)過的筋粗,所以如果修復(fù)過的數(shù)據(jù)不等于當(dāng)前分區(qū)原來的數(shù)據(jù),就說明確實(shí)被修復(fù)過了

? ? ? if (repairCurrentPartitionUser.session != currentPartitionUser.session) {

? ? ? ? //過濾出還沒有被修復(fù)的數(shù)據(jù)

? ? ? ? list.filter(x => x._2.session == oldSession).foreach(x => {

? ? ? ? ? x._2.session = repairCurrentPartitionUser.session

? ? ? ? ? x._2.step = repairCurrentPartitionUser.step + x._2.step - 1

? ? ? ? })

? ? ? }

? ? ? list.foreach(x => println(s"index = ${index} ${x._2}"))

? ? ? list.iterator

? ? })

? ? rdd4.collect()

? }

}

//定義樣例類

case class UserAnalysis(userid: String, time: Long, timestr: String, page: String, var session: String = UUID.randomUUID().toString, var step: Int = 1)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末炸渡,一起剝皮案震驚了整個濱河市娜亿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蚌堵,老刑警劉巖买决,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異吼畏,居然都是意外死亡督赤,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門泻蚊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來躲舌,“玉大人,你說我怎么就攤上這事性雄∶恍叮” “怎么了羹奉?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長约计。 經(jīng)常有香客問我诀拭,道長,這世上最難降的妖魔是什么病蛉? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任炫加,我火速辦了婚禮瑰煎,結(jié)果婚禮上铺然,老公的妹妹穿的比我還像新娘。我一直安慰自己酒甸,他們只是感情好魄健,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著插勤,像睡著了一般沽瘦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上农尖,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天析恋,我揣著相機(jī)與錄音,去河邊找鬼盛卡。 笑死助隧,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的滑沧。 我是一名探鬼主播并村,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼滓技!你這毒婦竟也來了哩牍?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤令漂,失蹤者是張志新(化名)和其女友劉穎膝昆,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體叠必,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡外潜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了挠唆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片处窥。...
    茶點(diǎn)故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖玄组,靈堂內(nèi)的尸體忽然破棺而出滔驾,到底是詐尸還是另有隱情谒麦,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布哆致,位于F島的核電站绕德,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏摊阀。R本人自食惡果不足惜耻蛇,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望胞此。 院中可真熱鬧臣咖,春花似錦、人聲如沸漱牵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽酣胀。三九已至刁赦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間闻镶,已是汗流浹背甚脉。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留铆农,地道東北人牺氨。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像顿涣,于是被迫代替她去往敵國和親波闹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容