Spark GraphX概述
GraphX是Spark的一個組件,專門用來表示圖以及進(jìn)行圖的并行計算孵户。GraphX通過重新定義了圖的抽象概念來拓展了RDD:定向多圖萧朝,其屬性附加到每個頂點和邊。為了支持圖計算夏哭,GraphX公開了一系列基本運(yùn)算符(比如:mapVertices检柬、mapEdges、subgraph)已經(jīng)優(yōu)化后的Pregel API變種竖配。此外何址,還包含越來越多的圖計算和構(gòu)建器,以簡化圖形分析任務(wù)进胯。GraphX在圖頂點信息和邊信息存儲上做了優(yōu)化头朱,使得圖計算框架性能相對于原生RDD是想較大的提升,接近或達(dá)到GraphLab等專業(yè)圖計算平臺性能龄减。GraphX最大的貢獻(xiàn)是,在Spark之上提供一站式數(shù)據(jù)解決方案班眯,可以方便且高效地完成圖計算的一整套流水作業(yè)希停。
圖的相關(guān)術(shù)語
圖是一種較線性表和樹更為復(fù)雜的數(shù)據(jù)結(jié)構(gòu),圖表達(dá)的是多對多的關(guān)系署隘。
如下圖所示宠能,G1是一個簡單的圖,其中V1磁餐、V2违崇、V3阿弃、V4被稱為頂點(Vertex),任意兩個頂點之間的通路被稱為邊(Edge)羞延,它可以由(V1渣淳,V2)有序?qū)肀硎荆@時稱G1位有向圖伴箩,意味著邊是有方向的入愧,若以無序?qū)肀硎疽粭l表,則該圖為無向圖嗤谚,如G2
在G1中棺蛛,與頂點相關(guān)聯(lián)的邊的數(shù)量,被稱為頂點的度(Degree)巩步。其中旁赊,以頂點為起點的邊的數(shù)量被稱為該訂單的出度(OutDegree),以頂點為終點的邊的數(shù)量被稱為該頂點的入度(InDegree)椅野。
以G1的V1舉例终畅,V1的度是3,啟動出度為2鳄橘,入度為1声离。在無向圖G2中,如過任意兩個頂點之間是聯(lián)通的瘫怜,則稱G2為連通圖(Connected Graph)术徊。在有向圖G1中,如果任意兩個訂單Vm鲸湃、Vn且m != n,從Vm到Vn以及Vn到Vm之間都存在通路赠涮,則稱G1為強(qiáng)連通圖(Strongly Conneted Graph)。如果任意兩個頂點之間若存在通路暗挑,則稱為路徑(Path)笋除,用一個頂點序列表示,若第一個頂點和最后一個頂點相同,則稱為回路或者環(huán)(Cycle)
圖數(shù)據(jù)庫與圖計算
Neo4j 是一個比較老牌的開源圖數(shù)據(jù)庫,目前在業(yè)界的使用也較為廣泛瓤狐,它提供了一種簡單易學(xué)的查詢語言 Cypher实胸。Neo4j 支持交互式查詢,查詢效率很高。能夠迅速從整網(wǎng)中找出符合特定模式的子網(wǎng),供隨后分析之用,適用于OLTP 場景酱吝。
Neo4j 是圖數(shù)據(jù)庫,偏向于存儲和查詢土思。能存儲關(guān)聯(lián)關(guān)系比較復(fù)雜务热,實體之間的連接豐富忆嗜。比如社交網(wǎng)絡(luò)、知識圖譜崎岂、金融風(fēng)控等領(lǐng)域的數(shù)據(jù)捆毫。擅長從某個點或某些點出發(fā),根據(jù)特定條件在復(fù)雜的關(guān)聯(lián)關(guān)系中找到目標(biāo)點或邊该镣。如在社交網(wǎng)絡(luò)中找到某個點三步以內(nèi)能認(rèn)識的人冻璃,這些人可以認(rèn)為是潛在朋友。數(shù)據(jù)量限定在一定范圍內(nèi)损合,能短時完成的查詢就是所謂的OLTP操作省艳。
Neo4j 查詢與插入速度較快,沒有分布式版本嫁审,容量有限跋炕,而且一旦圖變得非常大, 如數(shù)十億頂點律适,數(shù)百億邊辐烂,查詢速度將變得緩慢。Neo4j 分為社區(qū)版和企業(yè)版捂贿,企業(yè)版有一些高級功能纠修,需要授權(quán),價格昂貴厂僧。
比較復(fù)雜的分析和算法扣草,如基于圖的聚類,PageRank 算法等颜屠,這類計算任務(wù)對于圖數(shù)據(jù)庫來說就很難勝任了辰妙,主要由一些圖挖掘技術(shù)來負(fù)責(zé)。
Pregel 是 Google 于 2010 年在 SIGMOD 會議上發(fā)表的《Pregel: A System for Large-Scale Graph Processing》論文中提到的海量并行圖挖掘的抽象框架甫窟,Pregel 與 Dremel 一樣密浑,是 Google 新三駕馬車之一,它基于 BSP 模型(Bulk Synchronous Parallel粗井,整體同步并行計算模型)尔破,將計算分為若干個超步(super step),在超步內(nèi)浇衬,通過消息來傳播頂點之間的狀態(tài)懒构。Pregel 可以看成是同步計 算,即等所有頂點完成處理后再進(jìn)行下一輪的超步径玖,Spark 基于 Pregel 論文實現(xiàn)的 海量并行圖挖掘框架 GraphX。
圖計算模式
目前基于圖的并行計算框架已經(jīng)有很多颤介,比如來自Google的Pregel梳星、來自Apache開源的圖計算框架Giraph / HAMA以及最為著名的GraphLab赞赖,其中Pregel、HAMA和 Giraph都是非常類似的冤灾,都是基于BSP模式前域。
BSP即整體同步并行,它將計算分成一系列超步的迭代韵吨。從縱向上看匿垄,它是一個串行模式,而從橫向上看归粉,它是一個并行的模式椿疗,每兩個超步之間設(shè)置一個柵欄 (barrier),即整體同步點糠悼,確定所有并行的計算都完成后再啟動下一輪超步届榄。
每一個超步包含三部分內(nèi)容:
- 計算 cumpute:每一個Processor利用上一個超步傳過來的消息和本地的數(shù)據(jù)進(jìn)行本地計算
- 消息傳遞:每一個Processor計算完畢后,將消息傳遞給與之關(guān)聯(lián)的其他Processors
- 整體同步點:用整體同步倔喂,確定所有的計算和消息傳遞都進(jìn)行完畢后铝条,進(jìn)入下一個超步
Spark GraphX 基礎(chǔ)
架構(gòu)
存儲模式
核心數(shù)據(jù)結(jié)構(gòu)
GraphX 與 Spark 其他組件相比相對獨立,擁有自己的核心數(shù)據(jù)結(jié)構(gòu)與算子席噩。
GraphX架構(gòu)
GraphX的整體架構(gòu)可以分為三個部分:
- 算法層:基于Pregel接口實現(xiàn)了常用的圖算法班缰,包括PageRank、SVDPlusPlus悼枢、TriangeleCount埠忘、ConnectedComponents、StonglyConnectedConponents等算法
- 接口層:在底層RDD基礎(chǔ)之上實現(xiàn)了Pregel模型BSP模式的計算接口
- 底層:圖計算的核心類萧芙,包含:VertexRDD给梅、EdgeRDD、RDD[EdgeTriplet]
存儲模式
巨型圖的存儲總體上有邊分割和點分割兩種存儲方式双揪。2013年动羽,GraphLab2.0將其 存儲方式由邊分割變?yōu)辄c分割,在性能上取得重大提升渔期,目前基本上被業(yè)界廣泛接受 并使用运吓。
- 邊分割(Edge-Cut):每個頂點存儲一次,但有的邊會被打斷分到兩臺機(jī)器上疯趟。這樣做的好處就是節(jié)省存儲空間拘哨;壞處是對圖進(jìn)行基于邊計算時,對于一條兩個頂點被分到不同的機(jī)器上的邊來說信峻,需要跨機(jī)器傳輸數(shù)據(jù)倦青,內(nèi)網(wǎng)通信流量大
- 點分割(Vertex-Cut):每條邊只存儲一次,都會出現(xiàn)一臺機(jī)器上盹舞,鄰居多的點會被復(fù)制到多臺機(jī)器上产镐,增加了存儲開銷隘庄,同時會引發(fā)數(shù)據(jù)同步問題。好處是可以大幅減少內(nèi)網(wǎng)通信量癣亚。
雖然兩種方法互有利弊丑掺,但現(xiàn)在是點分割占上風(fēng),各種分布式圖計算框架都將自己底層的存儲形式變成了點分割述雾。主要原因有以下兩個:
- 磁盤價格下降街州,存儲空間不再是問題,而內(nèi)網(wǎng)的通信資源沒有突破性進(jìn)展玻孟,集群計算時內(nèi)網(wǎng)帶寬是寶貴的唆缴,時間比磁盤更珍貴。這點就類似于常見的空間換時間的策略;
- 在當(dāng)前的應(yīng)用場景中取募,絕大多數(shù)網(wǎng)絡(luò)都是“無尺度網(wǎng)絡(luò)”琐谤,遵循冪律分布,不同點的鄰居數(shù)量相差非常懸殊玩敏。而邊分割會使那些多鄰居的點所相連的邊大多數(shù)被分到不同的機(jī)器上斗忌,這樣的數(shù)據(jù)分布會使得內(nèi)網(wǎng)帶寬更加捉襟見肘,于是邊分割存儲方式被漸漸拋棄了;
核心數(shù)據(jù)結(jié)構(gòu)
核心數(shù)據(jù)結(jié)構(gòu)包括:graph旺聚、vertices织阳、edges、triplets
GraphX API 的開發(fā)語言目前僅支持 Scala砰粹。GraphX 的核心數(shù)據(jù)結(jié)構(gòu) Graph 由 RDD 封裝而成唧躲。
Graph
GraphX用屬性圖的方法表示圖,頂點有屬性碱璃,邊有屬性弄痹。存儲結(jié)構(gòu)采用邊集數(shù)據(jù)的形式,即一個頂點表嵌器,一個邊表肛真,如下圖所示:
頂點ID是非常重要的字段,他不光是頂點的唯一標(biāo)識符爽航,也是描述邊的唯一手段蚓让,訂單表與邊表實際上就是RDD,他們分別是VertexRDD與EdgeRDD讥珍。在Spark的源碼中历极,Graph類如下:
- vertices為頂點表,VD為訂單屬性類型
- edges為邊表衷佃,ED為邊屬性類型
- 可以通過Graph的vertices與edges成員直接得到頂點RDD與邊RDD
- 頂點RDD類型是VerticeRDD趟卸,繼承自RDD[(VertexId,VD)]
- 邊RDD類型為EdgeRDD,繼承自RDD[Edge[ED]]
vertices
vertices對應(yīng)著名為 VertexRDD 的RDD。這個RDD由頂點id和頂點屬性兩個成員變量锄列。
VertexRDD繼承自 RDD[(VertexId, VD)]新蟆,這里VertexId表示頂點id,VD表示頂點所 帶的屬性的類別右蕊。
VertexId實際上是一個Long類型
edges
edges對應(yīng)著EdgeRDD。這個RDD擁有三個成員變量吮螺,分別是源頂點id饶囚、目標(biāo)頂點id以及邊屬性。
Edge代表邊鸠补,由 源頂點id萝风、目標(biāo)頂點id、以及邊的屬性構(gòu)成紫岩。
triplets
triplets 表示邊點三元組规惰,如下圖所示(其中圓柱形分別代表頂點屬性與邊屬性):
通過 triplets 成員,用戶可以直接獲取到起點頂點泉蝌、起點頂點屬性歇万、終點頂點、終點頂點屬性勋陪、邊與邊屬性信息贪磺。triplets 的生成可以由邊表與頂點表通過 ScrId 與 DstId 連接而成。
triplets對應(yīng)著EdgeTriplet诅愚。它是一個三元組視圖寒锚,這個視圖邏輯上將頂點和邊的屬 性保存為一個RDD[EdgeTriplet[VD, ED]]。
Spark GraphX 計算
圖的定義
屬性操作
轉(zhuǎn)換操作
結(jié)構(gòu)操作
關(guān)聯(lián)操作
聚合操作
Pregel API
引入依賴:
<!-- graphx -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
案例一:圖的基本操作
找到 出度=入度 的人員\找出5到各頂點的最短距離 即鏈接操作與聚合操作需要重新聽課
package com.hhb.spark.graphx
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.reflect.ClassTag
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-25 14:25
**/
object GraphXExample1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
//定義頂點的數(shù)據(jù)(訂單违孝,信息)
val vertexArr: Array[(VertexId, (String, Int))] = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50)))
//定義邊的數(shù)據(jù)(起點刹前,終點,邊屬性)
val edgeArray: Array[Edge[Int]] = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 6),
Edge(4L, 1L, 1),
Edge(5L, 2L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)
//根據(jù)頂點數(shù)據(jù)生成RDD
val vertexRDD = sc.makeRDD(vertexArr)
//根據(jù)邊數(shù)據(jù)生成RDD
val edgeRDD = sc.makeRDD(edgeArray)
//生成圖信息
val graph: Graph[(String, Int), Int] = Graph.apply(vertexRDD, edgeRDD)
println("=======================找出圖中所有年年齡大于30的頂點============================")
//屬性操作示例雌桑,
//找出圖中所有年年齡大于30的頂點
//所有頂點的屬性信息
// val vertices: VertexRDD[(String, Int)] = graph.vertices
// filter(pred: Tuple2[VertexId, VD] => Boolean)
graph.vertices
.filter { case (_, (_, point)) => point > 30 }
.foreach(println)
println("=======================找出圖中屬性大于5的邊============================")
//找出圖中屬性大于5的邊
// val edges: EdgeRDD[Int] = graph.edges
graph.edges
.filter(_.attr > 5)
.foreach(println)
println("=======================列出邊屬性>5的tripltes============================")
//列出邊屬性>5的tripltes
// val triplets: RDD[EdgeTriplet[(String, Int), Int]] = graph.triplets
graph.triplets
.filter(_.attr > 5)
.foreach(println)
//屬性操作喇喉,degress操作
//找出圖中最大的出度、入度筹燕、度數(shù)
println("=======================找出圖中最大的出度============================")
//所有的
// val degrees: VertexRDD[Int] = graph.outDegrees
val outMaxDegrees: (VertexId, Int) = graph.outDegrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(outMaxDegrees)
println("=======================找出圖中最大的入度============================")
val inMaxDegrees = graph.inDegrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(inMaxDegrees)
println("=======================找出圖中最大的度數(shù)============================")
val maxDegrees: (VertexId, Int) = graph.degrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(maxDegrees)
// 轉(zhuǎn)換操作
println("=======================頂點的轉(zhuǎn)換操作轧飞。所有人的年齡加 10============================")
// 頂點的轉(zhuǎn)換操作。所有人的年齡加 10
graph.vertices
.map { case (id, (name, age)) => (id, (name, age + 10)) }
.foreach(println)
println("=======================頂點的轉(zhuǎn)換操作撒踪。所有人的年齡加 10============================")
graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.foreach(println)
// 邊的轉(zhuǎn)換操作过咬。邊的屬性*2
println("=======================邊的轉(zhuǎn)換操作。邊的屬性*2============================")
graph.edges
.map(x => x.attr * 2)
.foreach(println(_))
// 結(jié)構(gòu)操作
// 頂點年齡 > 30 的子圖
// def subgraph(
// epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
// vpred: (VertexId, VD) => Boolean = ((v, d) => true))
// : Graph[VD, ED]
println("=======================頂點年齡 > 30 的子圖============================")
graph.subgraph(vpred = (_, VD) => VD._2 > 30).triplets.foreach(println(_))
//找出出度 == 入度的人員制妄,鏈接操作掸绞,思路:圖 + 頂點的出度+頂點的入度
println("=======================找出出度 == 入度的人員============================")
//創(chuàng)建一個新圖,頂點VD的數(shù)據(jù)類型為User,并從graph做類型轉(zhuǎn)換
val g: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
//(2,User(Bob,27,0,0))
//(4,User(David,42,0,0))
//(6,User(Fran,50,0,0))
//(3,User(Charlie,65,0,0))
//(5,User(Ed,55,0,0))
//(1,User(Alice,28,0,0))
// value1.vertices.foreach(println)
// def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
// (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null) : Graph[VD2, ED]
// val degrees: VertexRDD[Int] = graph.inDegrees
//使用新圖關(guān)聯(lián)入度的數(shù)據(jù)
val userGraph = g.outerJoinVertices(graph.inDegrees) {
//
case (id, info, inDe) => User(info.name, info.age, inDe.getOrElse(0), info.outDegrees)
}.outerJoinVertices(graph.outDegrees) {
case (id, info, outDe) => User(info.name, info.age, info.inDegrees, outDe.getOrElse(0))
}
val value: VertexRDD[User] = userGraph.vertices.filter { case (id, u) => u.inDegrees == u.outDegrees }
value.foreach(println(_))
//找出5到各頂點的最短距離
sc.stop()
}
case class User(name: String, age: Int, inDegrees: Int, outDegrees: Int)
}
Pregel API
圖本身是遞歸數(shù)據(jù)結(jié)構(gòu)衔掸,頂點的屬性依賴于他們鄰居的屬性烫幕,這些鄰居屬性有依賴于自己的鄰居屬性。許多重要的圖算法都是迭代的重新計算每個頂點的屬性敞映,直到滿足某個確定的條件较曼。一系列的圖并發(fā)抽象被提出來用來表達(dá)這些迭代算法。
Graphx公開了一個類似Pregel的操作振愿。
vprog:用戶定義的頂點運(yùn)行程序捷犹。它作用于每一個頂點,負(fù)責(zé)接收進(jìn)來的信息冕末,并計算新的頂點值
sendMsg:發(fā)送消息
mergeMsg:合并消息
案例二:連通圖算法
package com.hhb.spark.graphx
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @date: 2020-11-25 18:07
**/
object GraphXExample2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init)
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/graph.dat")
//(VertexId,info)
//(11,1)
//(4,1)
//(45067,1)
//(431250,1)
//(1,1)
//(6,1)
//(3,1)
//(10,1)
//(7,1)
//(2,1)
//(9,1)
//(9111,1)
//(5,1)
//把文件里面所有的數(shù)據(jù)都默認(rèn)設(shè)置為頂點萍歉,屬性給個默認(rèn)值1(無用)
graph.vertices.foreach(println(_))
println("===" * 20)
//起點,終點档桃,邊屬性枪孩,邊屬性為默認(rèn)值(無用)
//Edge(4,45067,1)
//Edge(1,431250,1)
//Edge(6,45067,1)
//Edge(7,45067,1)
//Edge(2,431250,1)
//Edge(9,9111,1)
//Edge(3,431250,1)
//Edge(10,9111,1)
//Edge(4,431250,1)
//Edge(11,9111,1)
//Edge(5,45067,1)
//Edge(5,431250,1)
graph.edges.foreach(println(_))
println("===" * 20)
//生成連通圖
//(頂點信息,出始點)
(11, 9)
(45067, 1)
(1, 1)
(3, 1)
(7, 1)
(9, 9)
(9111, 9)
(4, 1)
(5, 1)
(431250, 1)
(6, 1)
(10, 9)
(2, 1)
graph.connectedComponents()
.vertices
//按照元組的第二個元素排序藻肄,第二個元素就是出始點
.sortBy(_._2)
.foreach(println(_))
sc.stop()
}
}
案例三:尋找相同的用戶合并信息
假設(shè):
假設(shè)有五個不同信息可以作為用戶標(biāo)識码撰,分別為:1X或舞、2X签餐、3X菱鸥、4X、5X;
每次可以選擇使用若干為字段作為標(biāo)識
部分標(biāo)識可能發(fā)生變化抚垄,如:12 => 13 或 24 => 25
根據(jù)以上規(guī)則蜕窿,判斷以下標(biāo)識是否代表同一用戶:
- 11-21-32、12-22-33 (X)
- 11-21-32呆馁、11-21-52 (OK)
- 21-32桐经、11-21-33 (OK)
- 11-21-32、32-48 (OK)
問題:在以下數(shù)據(jù)中浙滤,找到同一用戶阴挣,合并相同用戶的數(shù)據(jù)
- 對于用戶標(biāo)識(id):合并后去重
- 對于用戶的信息:key相同,合并權(quán)重
package com.hhb.spark.graphx
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-26 16:02
**/
object GraphXExample3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
val list = List(
(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龍觀" -> 1.0)),
(List(41L), List("kw$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List(12L, 22L, 33L), List("kw$大數(shù)據(jù)" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
(List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
(List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
)
val dataRDD = sc.makeRDD(list)
//將標(biāo)示信息中的每一個元素抽取出來纺腊,作為ID
//這里使用了flatMap將數(shù)據(jù)壓平
//丟掉了標(biāo)簽信息畔咧,因為這個RDD是用來構(gòu)造頂點,邊揖膜,tags信息用不到
//頂點誓沸、邊的數(shù)據(jù)要求是long,這個程序修改后我們才能使用
val dotRDD: RDD[(Long, Long)] = dataRDD.flatMap { case (buffer, _) =>
buffer.map(id => (id, buffer.mkString.toLong))
}
// dotRDD.foreach(println(_))
//(41,41)
//(12,122233)
//(22,223444)
//(34,223444)
//(33,3353)
//(44,223444)
//(21,213241)
//(32,213241)
//(41,213241)
//(22,122233)
//(33,122233)
//(11,112131)
//(21,112131)
//(31,112131)
//(53,3353)
//定義訂單的RDD壹粟,每個id都是一個訂單
val vertexesRDD = dotRDD.map(x => (x._1, 0))
//定義邊信息
val edgeRDD = dotRDD.map(x => Edge(x._1, x._2, 0))
//定義圖信息
val graph = Graph(vertexesRDD, edgeRDD)
//輸出點所有的信息
// graph.vertices.foreach(println(_))
//(31,0)
//(3353,0)
//(22,0)
//(32,0)
//(21,0)
//(12,0)
//(34,0)
//(11,0)
//(112131,0)
//(44,0)
//(53,0)
//(41,0)
//(223444,0)
//(213241,0)
//(122233,0)
//(33,0)
//使用強(qiáng)連通圖拜隧,生成的數(shù)據(jù)可以看見看見,是被分為了兩類
// graph.connectedComponents().vertices.foreach(println(_))
// (22,12)
//(32,11)
//(34,12)
//(21,11)
//(12,12)
//(31,11)
//(11,11)
//(112131,11)
//(3353,12)
//(53,12)
//(41,11)
//(213241,11)
//(122233,12)
//(33,12)
//(44,12)
//(223444,12)
//計算出連通點后,其實就是為了數(shù)據(jù)(112131,11)洪添、(213241,11)垦页、(41,11)、(122233,12)(223444,12)
//這樣就可以上最上面的rdd(dataRDD)將第一個集合的數(shù)據(jù)壓縮成上面的key干奢,就可以確認(rèn)出來哪些數(shù)據(jù)為一條數(shù)據(jù)
val vertices = graph.connectedComponents().vertices
//定義中心點的數(shù)據(jù)痊焊,即(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)) =》
// (112131,(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)))
//使用這樣的數(shù)據(jù),與vertices 進(jìn)行join忿峻,如下
val allInfoRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] = dataRDD.map { case (ids, infos) =>
(ids.mkString.toLong, (ids, infos))
}
//(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
//那么就可以通過value的第一個元素進(jìn)行合并數(shù)據(jù)
val joinRDD: RDD[(VertexId, (VertexId, (List[VertexId], List[(String, Double)])))] = vertices.join(allInfoRDD)
// value.foreach(println(_)),可以通過12宋光、12進(jìn)行reduceBy
//(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
//(3353,(12,(List(33, 53),List((kw$hive,1.0), (kw$spark,1.0), (area$西二旗,1.0)))))
//(112131,(11,(List(11, 21, 31),List((kw$北京,1.0), (kw$上海,1.0), (area$中關(guān)村,1.0)))))
//(41,(11,(List(41),List((kw$天津,1.0), (area$中關(guān)村,1.0)))))
//(213241,(11,(List(21, 32, 41),List((kw$上海,1.0), (kw$天津,1.0), (area$回龍觀,1.0)))))
//(122233,(12,(List(12, 22, 33),List((kw$大數(shù)據(jù),1.0), (kw$spark,1.0), (area$西二旗,1.0)))))
val mergeInfo = joinRDD.map { case (_, infos) => (infos._1, infos._2) }
//(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0))))
//下面兩個步驟可以合并到一起,shuffle階段直接對數(shù)據(jù)去重炭菌、合并
val resultRDD = mergeInfo.reduceByKey { case ((ids, infos), (id, info)) =>
val newIds = ids ++ id
val newInfos = infos ++ info
(newIds, newInfos)
}
val result = resultRDD.mapValues { case (newIds, newInfos) =>
val ids = newIds.distinct
val infos: Map[String, Double] = newInfos.groupBy(_._1).mapValues(list => list.map(_._2).sum)
(ids, infos)
}
result.foreach(println(_))
//(11,(List(41, 21, 32, 11, 31),Map(area$中關(guān)村 -> 2.0, kw$北京 -> 1.0, kw$天津 -> 2.0, kw$上海 -> 2.0, area$回龍觀 -> 1.0)))
//(12,(List(33, 53, 12, 22, 34, 44),Map(kw$大數(shù)據(jù) -> 1.0, kw$spark -> 3.0, area$五道口 -> 1.0, area$西二旗 -> 2.0, kw$hive -> 1.0)))
sc.stop()
}
}