“像頂點一樣思考”
?
???? ?? Spark GraphX是一個分布式圖處理框架腥寇,Spark GraphX基于Spark平臺提供對圖計算和圖挖掘簡潔易用的而豐富多彩的接口扒吁,極大的方便了大家對分布式圖處理的需求咙咽。Spark GraphX由于底層是基于Spark來處理的涧卵,所以天然就是一個分布式的圖處理系統(tǒng)极景。圖的分布式或者并行處理其實是把這張圖拆分成很多的子圖处坪,然后我們分別對這些子圖進行計算,計算的時候可以分別迭代進行分階段的計算费坊,即對圖進行并行計算倒槐。
?
?????? 設(shè)計GraphX時,點分割和GAS都已成熟附井,在設(shè)計和編碼中針對它們進行了優(yōu)化讨越,并在功能和性能之間尋找最佳的平衡點。如同Spark本身永毅,每個子模塊都有一個核心抽象把跨。GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖沼死。它擴展了Spark RDD的抽象着逐,有Table和Graph兩種視圖,而只需要一份物理存儲漫雕。兩種視圖都有自己獨有的操作符滨嘱,從而獲得了靈活操作和執(zhí)行效率。
相關(guān)知識
- Scala
- 對象-函數(shù)式編程語言
- 類型推斷
- 類的申明
- map和reduce
- 一切都是函數(shù)
- 與java的互操作性
- Spark
- 分布式內(nèi)存數(shù)據(jù):RDD
- 集群和術(shù)語解釋
- 圖術(shù)語
- 基礎(chǔ)圖: 有向圖和無向圖浸间、有環(huán)圖和無環(huán)圖太雨、有標簽的圖和無標簽的圖、二分圖
- RDF圖和屬性圖
- 鄰接矩陣
GraphX基礎(chǔ)
類成員
?????? 在GraphX中魁蒜,圖的基礎(chǔ)類為Garph囊扳,它包含兩個RDD:一個為邊RDD吩翻,另一個為頂點RDD∽断蹋可以用給定的邊RDD和頂點RDD構(gòu)建一個圖狭瞎。一旦構(gòu)建好圖,就可以用edges()和vertices()來訪問邊和頂點的集合搏予。VD和ED代表了用戶自定義的頂點和邊類熊锭,對應的圖是參數(shù)化類型的泛類型Graph[VD,ED]。GraphX中圖必須要有頂點和邊屬性雪侥。GraphX中Vertice和Edge持有VerticeId值碗殷,而不是頂點的引用。圖在集群中是分布式存儲的速缨,不屬于單個JVM锌妻,因此一條邊的頂點可能在不同的集群節(jié)點上。
-
頂點: Vertice(VertexId, VD) ??????
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]
- 抽象值成員
innerJoin
leftJoin
mapValues
··· - 具體值成員
collect
count
distinct
filter
foreach
groupBy
isEmpty
persist
map
reduce
sortBy
toString
···
- 抽象值成員
-
邊: Edge(VertexId, VertexId, ED)???
class Edge[ED](srcId:VertexId, dstId:VertexId, attire:E
abstract class EdgeRDD[ED] extends RDD[Edge[ED]]
- 抽象值成員
innerJoin
mapValues
reverse
- 具體值成員
++
aggregate
cache
collect
count
distinct
filter
foreach
groupBy
isEmpty
map
persist
reduce
sortBy
toString
···
class EdgeTriplet[VD, ED] extends Edge[ED]
- 值成員
Attr
srcId
srcAttr
dstId
dstAttr
- 抽象值成員
-
圖: Graph(VD, ED) ??????
abstract class Graph[VD,ED] extend Serializable
- 抽象值成員
cache
edges
mapEdges
mapTriplets
mapVertices
mask
outerJoinVertices
persist
reverse
subgraph
triplets
vertices
··· - 具體值成員
aggregateMessages
mapEdges
mapTriplets
···
class GraphOps[VD,ED] extends Serializable
- 值成員
collectEdges
collectNeiborIds
collectNeibors
degrees
filter
inDegrees
joinVertices
numEdges
numVertices
outDegrees
pageRank
personalizedPageRank
pickRandomVertex
pregel
triangleCount
···
- 抽象值成員
GraphX實例
-
引用
import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD
-
構(gòu)圖
???? ??有很多方式從一個原始文件旬牲、RDD構(gòu)造一個屬性圖仿粹。最一般的方法是利用Graph object。 下面的代碼從RDD集合生成屬性圖原茅。
// 假設(shè)SparkContext已經(jīng)被構(gòu)造 val sc: SparkContext // 創(chuàng)建點RDD val users: RDD[(VertexId, (String, String))] =sc.parallelize( Array((3L, ("rxin", "student")), (7L, ("jgonzal","postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // 創(chuàng)建邊RDD val relationships: RDD[Edge[String]] = sc.parallelize( Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi") Edge(5L, 0L, "colleague"))) // 定義一個默認用戶吭历,避免有不存在用戶的關(guān)系 val defaultUser = ("John Doe", "Missing") // 構(gòu)造Graph val graph = Graph(users, relationships, defaultUser)
-
緩存
//緩存。默認情況下,緩存在內(nèi)存的圖會在內(nèi)存緊張的時候被強制清理员咽,采用的是LRU算法 graph.cache() graph.persist(StorageLevel.MEMORY_ONLY) graph.unpersistVertices(true)
-
點毒涧、邊和三元組
??????下面的代碼用到了Edge樣本類贮预。邊有一個srcId和dstId分別對應于源和目標頂點的標示符贝室。另外,Edge類有一個attr成員用來存儲邊屬性仿吞』担可以分別用graph.vertices和graph.edges成員將一個圖解構(gòu)為相應的頂點和邊。graph.vertices返回一個VertexRDD[(String, String)]唤冈,它繼承于 RDD[(VertexID, (String, String))]峡迷。所以我們可以用scala的case表達式解構(gòu)這個元組。另一方面你虹,graph.edges返回一個包含Edge[String]對象的EdgeRDD绘搞,我們也可以用到case類的類型構(gòu)造器。
???? ??除了屬性圖的頂點和邊視圖傅物,GraphX也包含了一個三元組視圖夯辖,三元視圖邏輯上將頂點和邊的屬性保存為一個RDD[EdgeTriplet[VD, ED]],它包含EdgeTriplet類的實例董饰。EdgeTriplet類繼承于Edge類蒿褂,并且加入了srcAttr和dstAttr成員圆米,這兩個成員分別包含源和目的的屬性。我們可以用一個三元組視圖渲染字符串集合用來描述用戶之間的關(guān)系啄栓。
// 找出職業(yè)為postdoc的人 graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.collect // 計算源頂點ID大于目標頂點ID的邊的數(shù)量 graph.edges.filter(e => e.srcId > e.dstId).count graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count // 使用三元組視圖描述關(guān)系事實 val facts: RDD[String] = graph.triplets.map(triplet =>triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) facts.collect.foreach(println(_))
-
度娄帖、入度、出度
???? ??正如RDDs有基本的操作map, filter和reduceByKey一樣昙楚,屬性圖也有基本的集合操作近速,這些操作采用用戶自定義的函數(shù)并產(chǎn)生包含轉(zhuǎn)換特征和結(jié)構(gòu)的新圖。定義在Graph中的 核心操作是經(jīng)過優(yōu)化的實現(xiàn)堪旧。表示為核心操作的組合的便捷操作定義在GraphOps中数焊。然而, 因為有Scala的隱式轉(zhuǎn)換崎场,定義在GraphOps中的操作可以作為Graph的成員自動使用佩耳。例如,我們可以通過下面的方式計算每個頂點(定義在GraphOps中)的入度谭跨。區(qū)分核心圖操作和GraphOps的原因是為了在將來支持不同的圖表示干厚。每個圖表示都必須提供核心操作的實現(xiàn)并重用很多定義在GraphOps中的有用操作。
val degrees: VertexRDD[Int] = graph.degrees; degrees.collect().foreach(println) val inDegrees: VertexRDD[Int] = graph.inDegrees inDegrees.collect().foreach(println) val outDegrees: VertexRDD[Int] = graph.outDegrees outDegrees.collect().foreach(println)
-
屬性操作:修改頂點和邊的屬性
???? ??屬性操作每個操作都產(chǎn)生一個新的圖螃宙,這個新的圖包含通過用戶自定義的map操作修改后的頂點或邊的屬性蛮瞄。Map操作根據(jù)原圖的一些特性得到新圖,原圖結(jié)構(gòu)是不變的谆扎。這些操作的一個重要特征是它允許所得圖形重用原有圖形的結(jié)構(gòu)索引(indices)挂捅。下面的兩行代碼在邏輯上是等價的,但是第一個不是圖操作堂湖,它不保存結(jié)構(gòu)索引闲先,所以不會從GraphX系統(tǒng)優(yōu)化中受益。Map操作根據(jù)原圖的一些特性得到新圖无蜂,原圖結(jié)構(gòu)是不變的伺糠。這些操作經(jīng)常用來初始化的圖形,用作特定計算或者用來處理項目不需要的屬性斥季。例如夜矗,給定一個圖驮履,這個圖的頂點特征包含出度惩系,我們?yōu)镻ageRank初始化它讹挎。
- map 操作
//頂點轉(zhuǎn)換,頂點age+1 //RDD操作躁锡,再構(gòu)造新圖午绳,不保存結(jié)構(gòu)索引,不會被系統(tǒng)優(yōu)化 val newVertices = graph.vertices.map { case (id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")) } val newGraph1 = Graph(newVertices, graph.edges) //圖Map操作稚铣,被系統(tǒng)優(yōu)化 val newGraph2 = graph.mapVertices((id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")))
- Join 操作
//構(gòu)造一個新圖箱叁,頂點屬性是出度 val inputGraph: Graph[Int, String] = graph.outerJoinVertices( graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) //根據(jù)頂點屬性為出度的圖構(gòu)造一個新圖墅垮,依據(jù)PageRank算法初始化邊與點 val outputGraph: Graph[Double, Double] =inputGraph.mapTriplets( triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
- 自定義類型
//創(chuàng)建一個新圖,頂點 VD 的數(shù)據(jù)類型為 User耕漱,并從 graph 做類型轉(zhuǎn)換 case class User(name: String, pos: String, inDeg: Int, outDeg: Int) val initialUserGraph: Graph[User, String] = graph.mapVertices { case (id, (name, age)) => User(name, pos, 0, 0)} //initialUserGraph 與 inDegrees算色、outDegrees(RDD)進行連接,并修改 initialUserGraph中 inDeg 值螟够、outDeg 值 val userGraph = initialUserGraph.outerJoinVertices( initialUserGraph.inDegrees) { case (id, u, inDegOpt) => User(u.name, u.pos, inDegOpt.getOrElse(0), u.outDeg) }.outerJoinVertices( initialUserGraph.outDegrees) { case (id, u, outDegOpt) => User(u.name, u.pos, u.inDeg, outDegOpt.getOrElse(0)) } userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}")) //出度和入讀相同的人員 userGraph.vertices.filter { case (id, u) => u.inDeg == u.outDeg }.collect.foreach { case (id, property) => println(property.name) }
-
結(jié)構(gòu)操作
- 子圖
//由已定義的頂點構(gòu)成的子圖 val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing" ) subGraph.vertices.collect().foreach(println(_)) subGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1).collect().foreach(println(_))
- 圖反向
//圖的反向操作灾梦,新的圖形的所有邊的方向相反,不修改頂點或邊性屬性妓笙、不改變的邊的數(shù)目若河,它可以有效地實現(xiàn)不必要的數(shù)據(jù)移動或復制 var rGraph = graph.reverse
- Mask
//Mask操作也是根據(jù)輸入圖構(gòu)造一個新圖,達到一個限制制約的效果 val ccGraph = graph.connectedComponents() val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") val validCCGraph = ccGraph.mask(validGraph)
-
聚合操作
//計算年齡大于自己的關(guān)注者的總?cè)藬?shù)和總年齡 val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)]( //Map函數(shù) triplet => { if (triplet.srcAttr > triplet.dstAttr) { Iterator((triplet.dstId, (1, triplet.srcAttr))) } else { Iterator.empty } }, //Reduce函數(shù) (a, b) => (a._1 + b._1, a._2 + b._2) ) //計算年齡大于自己的關(guān)注者的平均年齡 val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues((id, value) => value match {case (count, totalAge) => totalAge / count }) avgAgeOfOlderFollowers.collect.foreach(println(_)) //定義一個Reduce函數(shù)來計算圖中較大度的點 def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b } val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) println(s"maxInDegree: $maxInDegree") val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) println(s"maxOutDegree: $maxOutDegree") val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) println(s"maxDegrees: $maxDegrees")
-
相鄰聚合
//計算鄰居相關(guān)函數(shù)寞宫,這些操作是相當昂貴的萧福,需要大量的重復信息作為他們的通信,因此相同的計算還是推薦用mapReduceTriplets val neighboorIds:VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Out) val neighboors:VertexRDD[Array[(VertexId, Double)]]= graph.collectNeighbors(EdgeDirection.Out);
-
Pregel API
//Pregel API辈赋。計算單源最短路徑 //通過GraphGenerators構(gòu)建一個隨機圖 val numVertices = 100 val numEParts = 2 val mu = 4.0 val sigma = 1.3 val graph1 = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapEdges(e=> e.attr.toDouble) //定義一個源值 點 val sourceId: VertexId = 42 //初始化圖的所有點鲫忍,除了與指定的源值點相同值的點為0.0以外,其他點為無窮大 val initialGraph = graph1.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) //Pregel有兩個參數(shù)列表钥屈,第一個參數(shù)列表包括的是:初始化消息悟民、迭代較大數(shù)、邊的方向(Out)篷就。第二個參數(shù)列表包括的是:用戶定義的接受消息射亏、計算消息、聯(lián)合合并消息的函數(shù)竭业。 val sssp = initialGraph.pregel(Double.PositiveInfinity)( //點程序 (id, dist, newDist) => math.min(dist, newDist), //發(fā)送消息 triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, //合并消息 (a, b) => math.min(a, b) ) println(sssp.vertices.collect.mkString("\n"))
-
主要圖算法
- PageRank
val pageRankGraph = graph1.pageRank(0.001) pageRankGraph.vertices.sortBy(_._2智润,false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.pr") pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
- Connected Components
val connectedComponentsGraph = graph1.connectedComponents() connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.cc") connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
- TriangleCount
//TriangleCount主要用途之一是用于社區(qū)發(fā)現(xiàn) 保持sourceId小于destId val graph2 = GraphLoader.edgeListFile(sc, path, true) val triangleCountGraph = graph2.triangleCount() triangleCountGraph.vertices.sortBy(_._2,false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.tc") triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
-
其他操作
- GraphLoader構(gòu)建Graph
var path = "/user/Hadoop/data/temp/graph/graph.txt" var minEdgePartitions = 1 var canonicalOrientation = false // if sourceId < destId this value is true val graph1 = GraphLoader.edgeListFile(sc, path, canonicalOrientation, minEdgePartitions,StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
- 隨機圖
//通過GraphGenerators構(gòu)建一個隨機圖 val numVertices = 100 val numEParts = 2 val mu = 4.0 val sigma = 1.3 val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapVertices((id, _) => id.toDouble) graph.triplets.collect.foreach(triplet => println(triplet.srcId + "-" + triplet.srcAttr + "-" + triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr))
- aggregateUsingIndex操作
val setA: VertexRDD[Int] = VertexRDD( sc.parallelize(0L until 100L).map(id => (id, 1))) val rddB: RDD[(VertexId, Double)] = sc.parallelize( 0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _) val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
參考資料
- 《Spark GraphX實戰(zhàn)》
- 《Spark入門實戰(zhàn)系列--9.Spark圖計算GraphX介紹及實例》
- 《Spark中文手冊8:spark GraphX編程指南》
- 《Spark_GraphX大規(guī)模圖計算和圖挖掘V3.0》