數(shù)據(jù):
1001,2020-09-10 10:21:21,home.html
1001,2020-09-10 10:28:10,good_list.html
1002,2020-09-10 09:40:00,home.html
1001,2020-09-10 10:35:05,good_detail.html
1002,2020-09-10 09:42:00,favor.html
1001,2020-09-10 10:42:55,cart.html
1001,2020-09-10 10:43:55,11.html
1001,2020-09-10 10:44:55,22.html
1001,2020-09-10 10:45:55,33.html
1001,2020-09-10 10:46:55,44.html
1001,2020-09-10 10:47:55,55.html
1001,2020-09-10 10:48:55,66.html
1001,2020-09-10 10:49:55,77.html
1002,2020-09-10 09:41:00,mine.html
1001,2020-09-10 11:35:21,home.html
1001,2020-09-10 11:36:10,cart.html
1003,2020-09-10 13:10:00,home.html
1001,2020-09-10 11:38:12,trade.html
1001,2020-09-10 11:39:12,aa.html
1001,2020-09-10 11:40:12,bb.html
1001,2020-09-10 11:41:12,cc.html
1001,2020-09-10 11:42:12,dd.html
1001,2020-09-10 11:43:12,ee.html
1001,2020-09-10 11:44:12,ff.html
1001,2020-09-10 11:45:12,gg.html
1001,2020-09-10 11:46:12,hh.html
1001,2020-09-10 11:47:12,ll.html
1001,2020-09-10 11:38:55,payment.html
1003,2020-09-10 13:15:00,search.html
需求:求得用戶每次會話的行為軌跡--解決數(shù)據(jù)傾斜
import java.text.SimpleDateFormat
import java.util
import java.util.UUID
import org.apache.spark.{Partition, RangePartitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
object SessionTest2 {
? def main(args: Array[String]): Unit = {
? ? //需求:求得用戶每次會話的行為軌跡--解決數(shù)據(jù)傾斜
? ? import org.apache.spark.sql.SparkSession
? ? val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
? ? import spark.implicits._
? ? //1蹬竖、讀取數(shù)據(jù)
? ? val ds = spark.read.csv("datas/session2.txt").toDF("user_id", "page_time", "page").as[(String, String, String)]
? ? //獲取一個集合累加器
? ? val acc = spark.sparkContext.collectionAccumulator[(String, UserAnalysis)]("acc")
? ? //2、轉(zhuǎn)換數(shù)據(jù)類型--樣例類(轉(zhuǎn)成樣例類方便修改值)
? ? val ds2: Dataset[(String, UserAnalysis)] = ds.map {
? ? ? case (userid, timestr, page) =>
? ? ? ? //獲取時間戳
? ? ? ? val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
? ? ? ? val time = formatter.parse(timestr).getTime
? ? ? ? //獲得user對象
? ? ? ? val user = UserAnalysis(userid, time, timestr, page)
? ? ? ? //返回kv鍵值對--k為user_id和time或timestr拼接起來的字符串,方便后續(xù)分組排序
? ? ? ? (s"${userid}_${time}", user)
? ? }
? ? //3、轉(zhuǎn)成rdd
? ? val rdd = ds2.rdd
? ? //4、用RangePartitioner進(jìn)行重分區(qū)--得到的結(jié)果在每個分區(qū)內(nèi)是有序的魏割,在分區(qū)間也是有序的
? ? val rdd2: RDD[(String, UserAnalysis)] = rdd.repartitionAndSortWithinPartitions(new RangePartitioner(5, rdd))
? ? //repartitionAndSortWithinPartitions():根據(jù)給定的分區(qū)器對RDD進(jìn)行重新分區(qū),并在每個結(jié)果分區(qū)中根據(jù)key進(jìn)行排序。
? ? //? ? ? ? ? def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] ={...}
? ? //class RangePartitioner[K : Ordering : ClassTag, V](partitions: Int,rdd: RDD[_ <: Product2[K, V]],...)extends Partitioner {...}
? ? //RangePartitioner分區(qū)規(guī)則:首先對rdd采樣出 分區(qū)數(shù)-1 個key眨八,通過這些key確定 分區(qū)數(shù)個 邊界,
? ? //? ? ? ? 這些邊界就是每個分區(qū)的邊界左电,后續(xù)key會與每個分區(qū)的邊界對比廉侧,如果在范圍內(nèi)页响,則數(shù)據(jù)放入該分區(qū)
? ? //5、對每個分區(qū)中每個用戶的兩兩數(shù)據(jù)進(jìn)行判斷段誊,看是否屬于同一個會話
? ? val rdd3 = rdd2.mapPartitionsWithIndex((index, it) => {
? ? ? val list = it.toList
? ? ? //當(dāng)前l(fā)ist中的數(shù)據(jù):
? ? ? //List(
? ? ? //(1001_1599704481000,UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,2837d947-9a35-44b6-a90d-69f1c03f25d3,1)),
? ? ? //(1001_1599704890000,UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,249c5312-4f34-48db-9ed8-0de6c2348e69,1)),
? ? ? //(1001_1599705305000,UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,618b4ff1-63f7-499c-9757-d7091ad8aa47,1)),
? ? ? //(1001_1599705775000,UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,0571dccb-0904-49b9-ab31-2e0418b44c34,1)),
? ? ? //(1001_1599705835000,UserAnalysis(1001,1599705835000,2020-09-10 10:43:55,11.html,7a7e5d8f-a04e-44ec-8180-8f521cb4b3bd,1)),
? ? ? //(1001_1599705895000,UserAnalysis(1001,1599705895000,2020-09-10 10:44:55,22.html,e21f5d1f-5105-4ab4-a2b0-8068192d45d4,1)))
? ? ? //...
? ? ? // 滑窗
? ? ? val slidingList: Iterator[List[(String, UserAnalysis)]] = list.sliding(2)
? ? ? //slidingList中的數(shù)據(jù):
? ? ? //Iterator(
? ? ? //? ? List( (1001_1599704481000,UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,2837d947-9a35-44b6-a90d-69f1c03f25d3,1)) ,(1001_1599704890000,UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,249c5312-4f34-48db-9ed8-0de6c2348e69,1)) )
? ? ? //? ...
? ? ? // )
? ? ? slidingList.foreach(x => {
? ? ? ? //取出窗口中的第一個數(shù)據(jù)中的對象
? ? ? ? val before = x.head._2
? ? ? ? //取出窗口中的第二個數(shù)據(jù)中的對象
? ? ? ? val next = x.last._2
? ? ? ? //判斷--如果是同一個用戶,并且時間在半小時內(nèi)就屬于同一個會話
? ? ? ? if (next.userid == before.userid && next.time - before.time <= 30 * 60 * 1000) {
? ? ? ? ? //修改session和step
? ? ? ? ? next.session = before.session
? ? ? ? ? next.step = before.step + 1
? ? ? ? }
? ? ? })
? ? ? //此時分區(qū)內(nèi)有軌跡順序正確闰蚕,但是分區(qū)間的軌跡還有問題--通過累加器解決
? ? ? // 使用集合累加器記錄每個分區(qū)的第一條數(shù)據(jù)和最后一條數(shù)據(jù)
? ? ? // 目的是用第一條數(shù)據(jù)和上一個分區(qū)的最后一條數(shù)據(jù)比較,判斷是否為同一會話连舍,如果是就修改成相同的session没陡,并且這個分區(qū)的第一條數(shù)據(jù)的step+1
? ? ? val head = list.head //第一條數(shù)據(jù)
? ? ? val last = list.last //最后一條數(shù)據(jù)
? ? ? //將本分區(qū)第一條與最后一條數(shù)據(jù)放入累加器--通過action算子觸發(fā)
? ? ? acc.add((s"${index}#head", head._2))
? ? ? acc.add((s"${index}#last", last._2))
? ? ? list.iterator
? ? })
? ? //后續(xù)rdd3可能會在多個job中使用,所以緩存一下
? ? rdd3.cache()
? ? rdd3.collect()
? ? //獲取累加器結(jié)果--獲取的結(jié)果是java的List索赏,不能toMap盼玄,所以需要導(dǎo)入以下內(nèi)容,將java的集合轉(zhuǎn)成scala的集合潜腻,也可將scala的集合轉(zhuǎn)成java的集合
? ? import scala.collection.JavaConverters._
? ? val userMap = acc.value.asScala.toMap
? ? //根據(jù)分區(qū)號遍歷--0號分區(qū)不用處理埃儿,因?yàn)?號分區(qū)里的都是同一個會話
? ? for (i <- 1 until (userMap.size / 2)) {
? ? ? //獲取前一個分區(qū)的最后一條數(shù)據(jù)
? ? ? val beforePartitonLast = userMap.get(s"${i - 1}#last").get
? ? ? //獲取當(dāng)前分區(qū)的第一條數(shù)據(jù)
? ? ? val currentPartitionHead = userMap.get(s"${i}#head").get
? ? ? //獲取當(dāng)前分區(qū)的最后一條數(shù)據(jù)
? ? ? val currentPartitionLast = userMap.get(s"${i}#last").get
? ? ? //判斷當(dāng)前分區(qū)第一條與前一個分區(qū)的最后一條數(shù)據(jù)是否是同一個session,如果是則同步修改session和step
? ? ? if (currentPartitionHead.userid == beforePartitonLast.userid && currentPartitionHead.time - beforePartitonLast.time <= 30 * 60 * 1000) {
? ? ? ? //注意同一個會話可能跨分區(qū)融涣,所以要先判斷當(dāng)前分區(qū)的最后一條數(shù)據(jù)和當(dāng)前分區(qū)的第一條數(shù)據(jù)是否為同一session,如果是則同步修改session和step
? ? ? ? if (currentPartitionLast.session == currentPartitionHead.session) {
? ? ? ? ? currentPartitionLast.session = currentPartitionHead.session
? ? ? ? ? currentPartitionLast.step = beforePartitonLast.step + currentPartitionLast.step
? ? ? ? }
? ? ? ? currentPartitionHead.session = beforePartitonLast.session
? ? ? ? currentPartitionHead.step = beforePartitonLast.step + 1
? ? ? ? //0號分區(qū)
? ? ? ? //...
? ? ? ? //(1001_1599705895000,UserAnalysis(1001,1599705895000,2020-09-10 10:44:55,22.html,2826f01e-1294-429b-90e4-877439fc345a,6))
? ? ? ? //1號分區(qū)
? ? ? ? //(1001_1599705955000,UserAnalysis(1001,1599705955000,2020-09-10 10:45:55,33.html,5865170d-bb4c-439a-b0aa-f0a10d307bc7,1))
? ? ? ? //此處的session應(yīng)該等于0號分區(qū)的最后一條數(shù)據(jù)的session蝌箍,step=0號分區(qū)Last.step+1=7
? ? ? ? //...
? ? ? ? //(1001_1599708921000,UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7c60b41c-0b06-4bfe-ad80-29fc75ce1d24,1))
? ? ? ? //2號分區(qū)
? ? ? ? //(1001_1599708970000,UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,9076e207-4ad4-4aa2-ab3a-ef966d23f09a,1))
? ? ? ? //此處的session應(yīng)該等于1號分區(qū)的最后一條數(shù)據(jù)的session,step=1號分區(qū)Last.step+1=2
? ? ? ? //...
? ? ? ? //(1001_1599709272000,UserAnalysis(1001,1599709272000,2020-09-10 11:41:12,cc.html,9076e207-4ad4-4aa2-ab3a-ef966d23f09a,6))
? ? ? ? //該分區(qū)的最后一條數(shù)據(jù)和第一條數(shù)據(jù)是同一個會話暴心,所以此處的session也應(yīng)該等于1號分區(qū)的最后一條數(shù)據(jù)的session妓盲,step=1號分區(qū)Last.step+當(dāng)前分區(qū)的Last.step=7
? ? ? ? //3號分區(qū)
? ? ? ? //(1001_1599709332000,UserAnalysis(1001,1599709332000,2020-09-10 11:42:12,dd.html,8c793999-5d8c-469c-b6c7-689d592e9713,1))
? ? ? ? //此處的session應(yīng)該等于2號分區(qū)的最后一條數(shù)據(jù)的session,step=1號分區(qū)Last.step+1=8
? ? ? ? //...
? ? ? ? //(1001_1599709632000,UserAnalysis(1001,1599709632000,2020-09-10 11:47:12,ll.html,8c793999-5d8c-469c-b6c7-689d592e9713,6))
? ? ? ? //該分區(qū)的最后一條數(shù)據(jù)和第一條數(shù)據(jù)是同一個會話专普,所以此處的session也應(yīng)該等于2號分區(qū)的最后一條數(shù)據(jù)的session悯衬,step=2號分區(qū)Last.step+當(dāng)前分區(qū)的Last.step=13
? ? ? }
? ? }
? ? //userMap的結(jié)果:
? ? //1號分區(qū): (UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,e48d4e68-dbd0-4f56-af9e-8ca55b1bfacc,1))
? ? //1號分區(qū): (UserAnalysis(1001,1599705955000,2020-09-10 10:45:55,33.html,81e0175d-7422-41e1-a20b-54e447bbb809,7))
? ? //...
? ? //2號分區(qū):(UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,e48d4e68-dbd0-4f56-af9e-8ca55b1bfacc,2))
? ? //2號分區(qū): (UserAnalysis(1001,1599709272000,2020-09-10 11:41:12,cc.html,978710f3-0a61-4a40-8e7c-61e7b0b1a2cb,7))
? ? //...
? ? //3號分區(qū): (UserAnalysis(1001,1599709332000,2020-09-10 11:42:12,dd.html,978710f3-0a61-4a40-8e7c-61e7b0b1a2cb,8))
? ? //3號分區(qū):(UserAnalysis(1001,1599709632000,2020-09-10 11:47:12,ll.html,225b0961-1603-4e31-a84a-454d0025e79c,13))
? ? //此時得到的結(jié)果中,每個分區(qū)內(nèi)的第一條和最后一條的session和step已經(jīng)修正檀夹,但分區(qū)內(nèi)的其他數(shù)據(jù)的還未修正
? ? //根據(jù)修復(fù)過的每個分區(qū)的第一條和最后一條數(shù)據(jù)修復(fù)分區(qū)內(nèi)所有數(shù)據(jù)--將userMap廣播出去
? ? //廣播變量
? ? val bc = spark.sparkContext.broadcast(userMap)
? ? val rdd4 = rdd3.mapPartitionsWithIndex((index, it) => {
? ? ? val list = it.toList
? ? ? //獲取當(dāng)前分區(qū)第一條數(shù)據(jù)
? ? ? val currentPartitionUser = list.head._2
? ? ? //從廣播變量中取出修復(fù)過的當(dāng)前分區(qū)的第一條數(shù)據(jù)
? ? ? val repairCurrentPartitionUser = bc.value.get(s"${index}#head").get
? ? ? //取出當(dāng)前分區(qū)原來數(shù)據(jù)的session
? ? ? val oldSession = currentPartitionUser.session
? ? ? //廣播變量中的數(shù)據(jù)是被修復(fù)過的筋粗,所以如果修復(fù)過的數(shù)據(jù)不等于當(dāng)前分區(qū)原來的數(shù)據(jù),就說明確實(shí)被修復(fù)過了
? ? ? if (repairCurrentPartitionUser.session != currentPartitionUser.session) {
? ? ? ? //過濾出還沒有被修復(fù)的數(shù)據(jù)
? ? ? ? list.filter(x => x._2.session == oldSession).foreach(x => {
? ? ? ? ? x._2.session = repairCurrentPartitionUser.session
? ? ? ? ? x._2.step = repairCurrentPartitionUser.step + x._2.step - 1
? ? ? ? })
? ? ? }
? ? ? list.foreach(x => println(s"index = ${index} ${x._2}"))
? ? ? list.iterator
? ? })
? ? rdd4.collect()
? }
}
//定義樣例類
case class UserAnalysis(userid: String, time: Long, timestr: String, page: String, var session: String = UUID.randomUUID().toString, var step: Int = 1)