The code implementation is as follows:
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode

// Read the id_map and id_num columns for one partition of the buyer detail table
val df = spark.sql("select id_map, id_num from dwd.dwd_ip_buyer_di where ds = '20220112'")
// Extract each id from the id_map column, keeping only non-blank values
// that are not the placeholder "0000"
val idData: RDD[(Array[(String, String)], Int)] = df.rdd.map(line => {
  val idMap: collection.Map[String, String] = line.getMap(0)
  val userId = idMap.getOrElse("user_id", "")
  val distinctId = idMap.getOrElse("distinct_id", "")
  val deviceId = idMap.getOrElse("device_id", "")
  val scDeviceId = idMap.getOrElse("sc_device_id", "")
  val anonymousId = idMap.getOrElse("anonymous_id", "")
  val idNum = line.getString(1).toInt
  (Array(("userId", userId), ("distinctId", distinctId), ("deviceId", deviceId),
    ("scDeviceId", scDeviceId), ("anonymousId", anonymousId))
    .filter(e => StringUtils.isNotBlank(e._2) && !StringUtils.equals(e._2, "0000")), idNum)
})
// Cache: idData is reused to build both the vertex and edge RDDs
idData.cache()
// Build the vertex RDD: one vertex per (id type, id value) pair
val vertices: RDD[(Long, (String, String))] = idData.flatMap(e => {
  val arr = e._1
  val idNum = e._2
  // Identical strings always produce the same hashCode, so prefix the value with its
  // id type to keep equal values of different id types from colliding
  for (id <- arr) yield ("%s%s".format(id._1, id._2).hashCode.toLong, (id._2, id._1))
})
// vertices.foreach(ele => println(ele._1 + " : " + ele._2))
vertices.cache()
// Build the edge RDD: connect every pair of ids that appear in the same row.
// Both hashed vertex ids are sorted into a single key so that (a, b) and (b, a)
// are aggregated together; id_num is summed per pair and kept as the edge attribute.
val edges: RDD[Edge[String]] = idData.flatMap(e => {
  val arr = e._1
  val idNum = e._2
  for (i <- 0 to arr.length - 2; j <- i + 1 until arr.length)
    yield (Array("%s%s".format(arr(i)._1, arr(i)._2).hashCode.toLong,
      "%s%s".format(arr(j)._1, arr(j)._2).hashCode.toLong).sorted.mkString(": "), idNum)
})
  .reduceByKey(_ + _)
  .filter(tp => tp._2 > 0)
  .map(e => {
    val arr = e._1.split(": ")
    val srcId = arr(0).toLong
    val dstId = arr(1).toLong
    Edge(srcId, dstId, e._2.toString)
  })
// Construct the graph from the vertex and edge sets; ("00", "0000") is the default
// attribute for any vertex that only appears in the edge RDD
val graph = Graph(vertices, edges, ("00", "0000"))
// Run the connected components algorithm: the resulting VertexRDD[VertexId] pairs each
// vertex id with the smallest vertex id in its component, which acts as the group id
val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
// Join the component id back onto each vertex: (guid, "idType: idValue")
val oneIds = vertices.join(res).map {
  case (id, (idProperties, cc)) => (cc, "%s: %s".format(idProperties._2, idProperties._1))
}
/*var guidArray = oneIds
.aggregateByKey(scala.collection.mutable.Set[String]())((agg: scala.collection.mutable.Set[String], e: String) => {
agg.add(e)
agg
}, (e1: scala.collection.mutable.Set[String], e2: scala.collection.mutable.Set[String]) => {
e1 ++ e2
}).collect()
*/
// Merge two comma-separated id strings, ignoring blank inputs
def combineAllIds(s1: String, s2: String): String = {
  if (StringUtils.isBlank(s1) && StringUtils.isBlank(s2)) {
    return ""
  }
  if (StringUtils.isBlank(s1)) {
    return s2
  }
  if (StringUtils.isBlank(s2)) {
    return s1
  }
  s"${s1}, ${s2}"
}
// Collapse each group's ids into one string, then de-duplicate them into a Set
val guidArray = oneIds.reduceByKey(combineAllIds)
  .map(e => {
    val deDuplicate = e._2.split(", ").toSet
    (e._1, deDuplicate, "20220112")
  })
// print(guidArray.collect().mkString("\n"))
// guidArray.toDF()
// Write the guid -> id mapping as a parquet table partitioned by ds
val guidIdMappingDF = spark.createDataFrame(guidArray)
guidIdMappingDF
  .toDF("guid", "id_mapping", "ds")
  .write
  .partitionBy("ds")
  .mode(SaveMode.Overwrite)
  .format("parquet")
  // .option("compression", "zlib")
  .saveAsTable("dwd.dwd_ip_buyer_id_mapping_df")
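
To spot-check the result, the mapping table can be read back and the largest groups inspected. This is only a minimal sketch: it assumes the table and column names written above and that the id_mapping column is stored as an array.
// Read the guid -> id_mapping table back and show the groups with the most ids
val mapping = spark.table("dwd.dwd_ip_buyer_id_mapping_df").where("ds = '20220112'")
mapping
  .selectExpr("guid", "size(id_mapping) as id_count")
  .orderBy(org.apache.spark.sql.functions.desc("id_count"))
  .show(10, false)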