轉(zhuǎn)載請注明出處搞监,謝謝合作~
GraphX 編程指南
- 概述(Overview)
- 快速上手(Getting Started)
- 屬性圖(The Property Graph)
- 屬性圖示例(Example Property Graph)
- 圖算子(Graph Operators)
- 算子列表(Summary List of Operators)
- 屬性算子(Property Operators)
- 結(jié)構(gòu)算子(Structural Operators)
- 鏈接算子(Join Operators)
- 鄰接聚合(Neighborhood Aggregation)
- 聚合消息(Aggregate Messages (aggregateMessages))
- MapReduceTriplets 遷移指南(Map Reduce Triplets Transition Guide (Legacy))
- 計(jì)算連接度(Computing Degree Information)
- 收集鄰接節(jié)點(diǎn)Collecting Neighbors
- 緩存及清理(Caching and Uncaching)
- Pregel API(Pregel API)
- 圖構(gòu)建器(Graph Builders)
- 節(jié)點(diǎn)和邊 RDD(Vertex and Edge RDDs)
- 節(jié)點(diǎn) RDD(VertexRDDs)
- 邊 RDD(EdgeRDDs)
- 優(yōu)化表示方式(Optimized Representation)
- 圖算法(Graph Algorithms)
- 網(wǎng)站排名(PageRank)
- 連通分量(Connected Components)
- 三角計(jì)數(shù)(Triangle Counting)
- 示例代碼(Examples)
概述
GraphX 是 Spark 中一個(gè)新的組件,用來應(yīng)對并行圖計(jì)算的場景。從一個(gè)比較高的層次來看愧哟,GraphX 通過擴(kuò)展 RDD 的概念提出了一個(gè)新的 Graph 抽象:一個(gè)節(jié)點(diǎn)和邊都包含屬性的有向多重圖劣挫。為了能夠支持圖計(jì)算,GraphX 給出了一組基礎(chǔ)的算子(例如傲绣,subgraph掠哥, joinVertices 和 aggregateMessages)以及優(yōu)化后的 Pregel API。另外秃诵,GraphX 還在持續(xù)更新圖算法(algorithms)和圖構(gòu)建器(builders)的集合來簡化圖分析任務(wù)续搀。
準(zhǔn)備工作
需要首先引入 Spark 和 GraphX 的依賴到項(xiàng)目中,如下所示:
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
如果不是通過 Spark shell 練習(xí)的話菠净,就需要先初始化一個(gè) SparkContext
禁舷。關(guān)于快速上手 Spark 應(yīng)用的更多信息參見 Spark Quick Start Guide。
屬性圖
屬性圖(property graph)是一個(gè)有向的多重圖毅往,每個(gè)節(jié)點(diǎn)和每條邊都可以自定義屬性牵咙。有向多重圖是指有向圖中某些相同的節(jié)點(diǎn)之間可能存在多條邊,這種能力為相同的節(jié)點(diǎn)間在存在多種關(guān)系(例如煞抬,同時(shí)是同事和朋友)的場景的建模提供了便利霜大。每個(gè)節(jié)點(diǎn)都以一個(gè) 64 位的長整型作為 ID(VertexId
),GraphX 并不需要節(jié)點(diǎn) ID 是有序的革答。每條邊也會(huì)包含相應(yīng)的源節(jié)點(diǎn) ID 和目的節(jié)點(diǎn) ID战坤。
屬性圖由節(jié)點(diǎn)類型(VD
)和邊類型(ED
)標(biāo)定,分別代表跟節(jié)點(diǎn)和邊綁定的自定義對象類型残拐。
當(dāng)節(jié)點(diǎn)和邊的類型為基本類型(例如途茫,int,double 等等)時(shí)溪食,GraphX 優(yōu)化了它們的表示方式囊卜,通過特殊的數(shù)組來存儲以減少內(nèi)存的消耗。
在一些場景下,需要同一張圖中的節(jié)點(diǎn)能夠擁有不同的屬性栅组,這樣的需求可以通過繼承來實(shí)現(xiàn)雀瓢。例如,如果對具有用戶和產(chǎn)品屬性的二分圖進(jìn)行建模玉掸,可以通過下面的方式:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
與 RDD 類似刃麸,屬性圖也是不可變的,分布式的和容錯(cuò)的司浪。節(jié)點(diǎn)值或者圖拓?fù)浣Y(jié)構(gòu)的變更會(huì)根據(jù)相應(yīng)的變更生成一張新圖泊业。需要注意的是,原圖中沒有發(fā)生變化的部分(即啊易,未受影響的圖結(jié)構(gòu)吁伺,屬性和索引)在構(gòu)建新圖的時(shí)候可以被復(fù)用,來減少函數(shù)式數(shù)據(jù)結(jié)構(gòu)的開銷租谈。整張圖根據(jù)節(jié)點(diǎn)被劃分為多個(gè)分區(qū)篮奄,分布在各個(gè) Executor 上。和 RDD 一樣割去,圖的每個(gè)分區(qū)都可以在故障時(shí)在另一個(gè)節(jié)點(diǎn)上被重新創(chuàng)建宦搬。
在邏輯上,屬性圖由一些強(qiáng)類型的數(shù)據(jù)集合(RDD)構(gòu)成劫拗,數(shù)據(jù)集合中存儲著每個(gè)節(jié)點(diǎn)和邊的屬性信息。于是矾克,從 Graph 類中可以訪問節(jié)點(diǎn)和邊:
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
VertexRDD[VD]
和 EdgeRDD[ED]
分別代表優(yōu)化后的 RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
页慷,它們都有一些契合圖計(jì)算的額外功能,為圖計(jì)算提供優(yōu)化措施胁附。關(guān)于 VertexRDD
VertexRDD 和 EdgeRDD
EdgeRDD 的 API 會(huì)在 vertex and edge RDDs 章節(jié)中詳細(xì)討論酒繁,暫時(shí)可以將它們簡單的當(dāng)做 RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
。
屬性圖示例
假設(shè)想要構(gòu)建一張屬性圖控妻,包含了 GraphX 項(xiàng)目的合作人員州袒,節(jié)點(diǎn)屬性可能會(huì)包含名字和職業(yè),可以用一個(gè)字符串來描述合作人員之間的關(guān)系弓候,并用邊來表示:
這張圖的類型簽名如下:
val userGraph: Graph[(String, String), String]
可以通過幾種方式構(gòu)建一張屬性圖郎哭,比如從文件、RDD 甚至自定義生成器菇存,這些將會(huì)在 graph builders 章節(jié)中詳細(xì)討論夸研。最常用的方式可能是通過 Graph object,例如下面的代碼從一些 RDD 中構(gòu)建了一張圖依鸥。
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
在上面的示例中使用了樣本類 Edge
亥至,其中包含一個(gè) srcId
和一個(gè) dstId
,分別代表源節(jié)點(diǎn)和目的節(jié)點(diǎn)的 ID,此外還有一個(gè) attr
成員變量存儲了邊的屬性姐扮。
可以通過成員變量 graph.vertices
和 graph.edges
來分別獲取屬性圖中的節(jié)點(diǎn)視圖和邊的視圖絮供。
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
注意
graph.vertices
返回一個(gè)VertexRDD[(String, String)]
,繼承自RDD[(VertexId, (String, String))]
茶敏,所以可以使用 scala 的case
表達(dá)式將其結(jié)構(gòu)為 tuple壤靶。同時(shí),graph.edges
返回一個(gè)EdgeRDD
睡榆,其中包含了Edge[String]
對象萍肆,也能夠以下面的方式使用構(gòu)造器:
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
除了屬性圖的節(jié)點(diǎn)和邊的視圖,GraphX 還提供了一種三元組視圖胀屿。三元組視圖在邏輯上關(guān)聯(lián)了節(jié)點(diǎn)和邊的屬性塘揣,生成一個(gè) RDD[EdgeTriplet[VD, ED]]
,其中包含 EdgeTriplet
類的實(shí)例宿崭。關(guān)聯(lián)行為可以用下面的 SQL 語句來表示:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
或者以圖的形式:
EdgeTriplet
類繼承了 Edge
亲铡,并添加了成員變量 srcAttr
和 dstAttr
,分別表示源節(jié)點(diǎn)和目的節(jié)點(diǎn)的屬性值葡兑〗甭可以使用三元組視圖來渲染字符串,生成用戶間關(guān)系的集合讹堤。
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
圖算子
RDD 有一些基礎(chǔ)算子吆鹤,例如 map
,filter
和 reduceByKey
洲守,屬性圖也有一些基礎(chǔ)算子疑务,可以接收用戶自定義的函數(shù),根據(jù)轉(zhuǎn)換后的屬性值和拓?fù)浣Y(jié)構(gòu)生成新的圖梗醇。優(yōu)化后的核心算子定義在類 Graph
中知允,還有一些比較方便的算子作為二話不說算子的補(bǔ)充被定義在類 GraphOps
中。但是叙谨,得益于 Scala 的隱式轉(zhuǎn)換特性温鸽,定義在 GraphOps
中的算子可以被自動(dòng)轉(zhuǎn)化為的 Graph
算子。例如手负,可以通過以下方式計(jì)算每個(gè)節(jié)點(diǎn)的入度(定義在 GraphOps
中)涤垫。
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
核心圖 API 和 GraphOps
的區(qū)別是為了在未來更好的支持圖的不同呈現(xiàn)方式,每一種方式必須提供核心算子的實(shí)現(xiàn)虫溜,并且可以復(fù)用許多 GraphOps
中定義的算子雹姊。
算子列表
下面是一個(gè)關(guān)于 Graph
和 GraphOps
提供的算法功能的快速總結(jié),為了方便起見衡楞,都展示為 Graph 的成員變量吱雏。注意有一些函數(shù)的簽名被簡化了(例如敦姻,去除了默認(rèn)參數(shù)和類型約束),還有一些高級功能沒有介紹在內(nèi)歧杏,所以請通過官方的 API 文檔獲取完整的算子列表镰惦。
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
屬性算子
與 RDD 的 map
算子類似,屬性圖包含以下算子:
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
每個(gè)算子都會(huì)生成一張新的圖犬绒,其中的節(jié)點(diǎn)或者邊的屬性值被用戶自定義的 map
函數(shù)進(jìn)行了轉(zhuǎn)換旺入。
注意,圖的拓?fù)浣Y(jié)構(gòu)不會(huì)發(fā)生改變凯力,這是這些算子的共同特性茵瘾,可以讓新圖復(fù)用原圖的索引。下面的代碼片段在邏輯上是等同的咐鹤,但是第一種方式不會(huì)保留圖的結(jié)構(gòu)索引拗秘,沒有用到 GraphX 所做的優(yōu)化:
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
相反,使用
mapVertices
` 來復(fù)用索引:
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
這些算子通常用來根據(jù)特定計(jì)算初始化圖的結(jié)構(gòu)或者剔除不需要的屬性信息祈惶。例如雕旨,在執(zhí)行 PageRank 算法前,將節(jié)點(diǎn)出度作為屬性值進(jìn)行初始化:
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
結(jié)構(gòu)算子
目前 GraphX 只支持一小部分常用的結(jié)構(gòu)算子捧请,在未來會(huì)豐富此類算子凡涩。下面列舉了一些基礎(chǔ)結(jié)構(gòu)算子。
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
reverse
算子返回一張新圖疹蛉,將原圖中所有的邊做反向處理活箕,該算子適用于形如計(jì)算反向 PageRank 的場景。由于反向算子并不會(huì)修改節(jié)點(diǎn)和邊的屬性值可款,可以在實(shí)現(xiàn)中避免避免數(shù)據(jù)遷移和去重讹蘑,以提高性能。
subgraph
算子接收節(jié)點(diǎn)和邊的斷言函數(shù)筑舅,返回的新圖中只包含了滿足斷言函數(shù)(返回值為 true)的節(jié)點(diǎn)和邊,以及滿足斷言函數(shù)的連接點(diǎn)(如果節(jié)點(diǎn)斷言為真陨舱,刪除相應(yīng)邊不會(huì)刪除節(jié)點(diǎn)翠拣;如果節(jié)點(diǎn)斷言為假,無論邊斷言是否為真都會(huì)被刪除)游盲。
subgraph` 算子可以被用來限制圖的節(jié)點(diǎn)和邊误墓,例如上述示例中刪除破裂的關(guān)系,如下面的代碼:
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
注意上面的示例中只提供了節(jié)點(diǎn)斷言函數(shù)益缎。如果節(jié)點(diǎn)或者邊的斷言函數(shù)沒有提供谜慌,
subgraph
算子會(huì)石筍默認(rèn)值true
。
mask
算子返回一張子圖莺奔,其中包含輸入圖中也存在的節(jié)點(diǎn)和邊欣范,該算子可以和 subgraph
算子配合使用,從而通過另一張圖的關(guān)聯(lián)關(guān)系約束本圖的拓?fù)浣Y(jié)構(gòu)。例如恼琼,可以再全圖節(jié)點(diǎn)上計(jì)算連通分量妨蛹,之后再通過有效子圖優(yōu)化計(jì)算結(jié)果。
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
groupEdges
` 算子可以合并多重圖中并行的邊(即晴竞,同一節(jié)點(diǎn)對之間的多條邊)蛙卤。在很多數(shù)值應(yīng)用中,并行邊可以被求和(聚合權(quán)重)噩死,從而合并成一條單獨(dú)的邊颤难,減小圖的尺寸。
連接算子
在許多場景下已维,需要將圖數(shù)據(jù)和其他的數(shù)據(jù)集(RDD)關(guān)聯(lián)起來行嗤。例如,可能需要將額外的用戶屬性需要融合到現(xiàn)有的圖中衣摩,或者需要將一張圖中的節(jié)點(diǎn)屬性拉取到另一張圖中昂验。這些需求可以通過連接算子來實(shí)現(xiàn)。下面列舉了主要的連接算子:
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
joinVertices
算子將節(jié)點(diǎn)和輸入 RDD 進(jìn)行關(guān)聯(lián)艾扮,返回一張新的圖既琴,其中節(jié)點(diǎn)的屬性通過應(yīng)用用戶自定義的
map` 函數(shù)計(jì)算得來。沒有匹配到的節(jié)點(diǎn)會(huì)保留原來的屬性值泡嘴。
注意甫恩,如果 RDD 中匹配某節(jié)點(diǎn)的元素多于一個(gè),只有有一個(gè)生效酌予。所以建議通過下面的方式來對 RDD 去重磺箕,同時(shí)預(yù)建索引,可以加速后續(xù)的連接計(jì)算抛虫。
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
更通用的 outerJoinVertices
算子和
joinVertices的行為相似松靡,但是該算子會(huì)應(yīng)用于所有的節(jié)點(diǎn),還能夠改變節(jié)點(diǎn)的的屬性值類型建椰。由于并不是每個(gè)節(jié)點(diǎn)在 RDD 中都有對應(yīng)的值雕欺,
map函數(shù)接收一個(gè)
Option` 類型的參數(shù)。例如棉姐,可以使用該算子將一張圖的節(jié)點(diǎn)屬性初始化為它們的出度屠列,供 PageRank 后續(xù)計(jì)算。
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
你可注意到了上面的示例中使用的柯立化形式(例如伞矩,
f(a)(b)
)的列表參數(shù)笛洛,盡管f(a)(b)
和f(a,b)
的寫法在功能上沒有區(qū)別,但是后者意味著參數(shù)b
的類型推斷不依賴于參數(shù)a
乃坤。所以苛让,用戶需要為自定義函數(shù)提供類型聲明:
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
鄰接聚合
圖分析任務(wù)中的一個(gè)關(guān)鍵步驟就是聚合每個(gè)節(jié)點(diǎn)的鄰居的信息沟蔑。例如,如果想要知道一個(gè)用戶的關(guān)注者數(shù)量或者關(guān)注者的平均年齡蝌诡,許多圖迭代算法(例如溉贿,網(wǎng)頁排名,最短路徑和連通分量)也會(huì)重復(fù)聚合鄰居節(jié)點(diǎn)的屬性值(例如浦旱,當(dāng)前的排名值宇色,源的最短路徑和可達(dá)節(jié)點(diǎn)的最小 ID)。
為了提升性能颁湖,基礎(chǔ)聚合算子從
graph.mapReduceTriplets
升級為新的graph.AggregateMessages
宣蠕。盡管 API 的變動(dòng)很小,下面還是提供了遷移指南甥捺。
聚合消息 (aggregateMessages)
GraphX 中最核心的聚合算子就是 aggregateMessages
抢蚀。該算子將一個(gè)用戶自定義的
sendMsg函數(shù)應(yīng)用于圖中的每個(gè) *edge triplet*,之后使用
mergeMsg` 函數(shù)在目標(biāo)節(jié)點(diǎn)處聚合這些消息镰禾。
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
用戶自定義的函數(shù)接收一個(gè) EdgeContext
對象皿曲,該對象中可以訪問源節(jié)點(diǎn)和目的節(jié)點(diǎn)的屬性信息,以及邊的屬性信息吴侦,還能夠使用函數(shù)(sendToSrc
和 sendToDst
)將消息發(fā)送給源節(jié)點(diǎn)或者目的節(jié)點(diǎn)屋休。可以將 sendMsg
理解為 MapReduce 中的 map 函數(shù)备韧,而 mergeMsg
函數(shù)將發(fā)送給同一節(jié)點(diǎn)的多個(gè)消息聚合成為一個(gè)劫樟,可以將其理解為 MapReduce 中的 reduce 函數(shù)。aggregateMessages
算子返回一個(gè)
VertexRDD[Msg]织堂,其中包含了發(fā)送給每個(gè)節(jié)點(diǎn)的聚合后的消息(類型為
Msg`)叠艳。沒有收到消息的節(jié)點(diǎn)不會(huì)被包含在內(nèi)。
此外易阳,算子還可以接收一個(gè)可選的 tripletsFields
參數(shù)附较,表示可以在 EdgeContext
中訪問的數(shù)據(jù)。該參數(shù)的可選項(xiàng)定義在 TripletFields
中潦俺,默認(rèn)值為 TripletFields.All
翅睛,表示用戶自定義的 sendMsg
函數(shù)可能會(huì)訪問 EdgeContext
對象中的任意成員。該參數(shù)可以用來告知 GraphX 只有部分的 EdgeContext
成員在計(jì)算過程中是需要的黑竞,從而讓 GraphX 能夠選取一個(gè)優(yōu)化后的連接策略。例如如果需要計(jì)算每個(gè)用戶的關(guān)注者的平局年齡疏旨,只需要訪問源節(jié)點(diǎn)的屬性很魂,便可以將該參數(shù)配置為 TripletFields.Src
來表示只需要訪問源節(jié)點(diǎn)的屬性。
在之前的 GraphX 版本中使用字節(jié)編碼來推斷
TripletFields
的值檐涝,但是發(fā)現(xiàn)字節(jié)編碼的檢查并不是很靠譜遏匆,于是替換為顯式的控制方式法挨。(我理解是把位運(yùn)算替換為了顯式的布爾類型)
在下面的示例中,使用 aggregateMessages
` 算子來計(jì)算每個(gè)用戶的關(guān)注者中年齡比該用戶大的用戶的平均年齡幅聘。
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
完整的示例代碼位于 Spark 安裝包的 「examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala」凡纳。
當(dāng)信息(以及聚合后的消息)的占用空間固定時(shí)(例如,浮點(diǎn)數(shù)和加法運(yùn)算帝蒿,而不是列表和拼接運(yùn)算)荐糜,
aggregateMessages
算子可以達(dá)到最好的性能。
Map Reduce Triplets 遷移指南(遺留 API)
在 GraphX 之前的版本中葛超,鄰接聚合是通過 mapReduceTriplets
算子來完成的:
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
mapReduceTriplets
接收一個(gè)用戶自定義的 map
函數(shù)暴氏,該函數(shù)會(huì)被應(yīng)用于每個(gè)三元組對象,并生成消息绣张,生成的消息通過用戶自定義的 reduce
函數(shù)進(jìn)行聚合答渔。但是發(fā)現(xiàn)返回的迭代器的使用方開銷很大,而且限制了額外的優(yōu)化措施(例如侥涵,本地節(jié)點(diǎn)的重編碼)沼撕。在 aggregateMessages
` 算子中暴露了三元組的字段,以及顯式發(fā)送信息到源節(jié)點(diǎn)和目的節(jié)點(diǎn)的函數(shù)芜飘,另外還移除了字節(jié)編碼檢查务豺,讓用戶顯式指定三元組中的哪些字段是需要的。
下面的代碼塊使用了 mapReduceTriplets
算子:
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
可以被重寫為使用 aggregateMessages
算子:
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
計(jì)算連接度
計(jì)算每個(gè)節(jié)點(diǎn)的連接度是一個(gè)常用的聚合任務(wù):計(jì)算每個(gè)節(jié)點(diǎn)的鄰居節(jié)點(diǎn)的數(shù)量燃箭。對于有向圖冲呢,通常需要知道每個(gè)節(jié)點(diǎn)的入度,出度和總的連接度招狸。GraphOps
類包含了一些計(jì)算節(jié)點(diǎn)連接度的算子敬拓。例如下面的示例中計(jì)算了圖中最大的出度,入度和總連接度:
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
收集鄰接節(jié)點(diǎn)
在某些場景下為了方便計(jì)算需要收集每個(gè)節(jié)點(diǎn)的鄰居節(jié)點(diǎn)及其屬性信息裙戏,可以通過 collectNeighborIds
和 collectNeighbors
算子來實(shí)現(xiàn)乘凸。
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
這些算子的開銷可能會(huì)很大,因?yàn)橛行┬畔?huì)被復(fù)制累榜,還需要大量的網(wǎng)絡(luò)通信营勤。可能的話壹罚,建議使用
aggregateMessages
` 算子來表達(dá)相同的計(jì)算葛作。
緩存及清理
Spark 中的 RDD 默認(rèn)是不會(huì)緩存到內(nèi)存中的。為了避免重復(fù)計(jì)算猖凛,對同一 RDD 多次使用時(shí)需要顯示指定緩存(參見 Spark Programming Guide)赂蠢。GraphX 中的圖也是類似的,當(dāng)需要多次使用同一張圖的數(shù)據(jù)時(shí)辨泳,請首先調(diào)用 Graph.cache()
來顯示指定緩存虱岂。
在迭代計(jì)算中玖院,為了達(dá)到最佳性能也有時(shí)也需要清除緩存。默認(rèn)情況下第岖,被緩存的 RDD 和圖數(shù)據(jù)會(huì)一直保留在內(nèi)存中难菌,直到由于內(nèi)存壓力觸發(fā) LAU 策略才會(huì)清除。對于迭代計(jì)算蔑滓,產(chǎn)生的中間計(jì)算結(jié)果會(huì)填滿緩存空間郊酒。即使它們最終被清理了,內(nèi)存中那些沒必要的存儲會(huì)降低垃圾回收的效率烫饼,所以有必要在中間結(jié)果不再需要的時(shí)候及時(shí)清理它們 猎塞。如此就需要在每個(gè)迭代中緩存圖中的 RDD,同時(shí)清理其他不需要的 RDD杠纵,將這些操作正確執(zhí)行可能比較有難度荠耽。所以對于迭代計(jì)算推薦使用 Pregel API,能夠正確的清理中間計(jì)算結(jié)果比藻。
Pregel API
圖是一種天然就具有遞歸性質(zhì)的數(shù)據(jù)結(jié)構(gòu)铝量,因?yàn)楣?jié)點(diǎn)的屬性值依賴于鄰居節(jié)點(diǎn)的屬性值,鄰居節(jié)點(diǎn)的屬性值又依賴于它們鄰居節(jié)點(diǎn)的屬性值银亲。所以慢叨,許多重要的圖算法都會(huì)迭代計(jì)算每個(gè)節(jié)點(diǎn)的屬性值直到達(dá)成某一收斂條件,根據(jù)迭代算法的特點(diǎn)業(yè)界也提出了一些并行圖計(jì)算的抽象务蝠。GraphX 采用了 Pregel 作為底層抽象拍谐。
從比較高的層面來看,受限于圖的拓?fù)浣Y(jié)構(gòu)馏段,Pregel 算子符合 BSP(Bulk-Synchronous Parallel)消息傳遞模型轩拨。Pregel 算子會(huì)執(zhí)行一系列的 super step,在每個(gè) super step 中每個(gè)節(jié)點(diǎn)接收上一個(gè) super step 中發(fā)送過來的消息院喜,并對同一節(jié)點(diǎn)接收的消息進(jìn)行聚合亡蓉,根據(jù)聚合后的消息計(jì)算出該節(jié)點(diǎn)新的屬性值,并向鄰居節(jié)點(diǎn)發(fā)送在下一個(gè) super step 中將會(huì)用到的消息喷舀。跟經(jīng)典 Pregel 模型不同的地方是砍濒,消息的計(jì)算是通過作用于三元組的一個(gè)函數(shù)并行執(zhí)行的,計(jì)算可以同時(shí)訪問源節(jié)點(diǎn)和目的節(jié)點(diǎn)硫麻。在某一 super step 中沒有收到消息的節(jié)點(diǎn)是不會(huì)參與計(jì)算的爸邢。當(dāng)不再有新的消息存在時(shí),Pregel 算子會(huì)終止計(jì)算并返回最終的圖狀態(tài)結(jié)果拿愧。
注意杠河,跟許多標(biāo)準(zhǔn)的 Pregel 實(shí)現(xiàn)不同,GraphX 中的節(jié)點(diǎn)只能發(fā)送消息到鄰居節(jié)點(diǎn),而且消息的構(gòu)建是通過一個(gè)用戶自定義函數(shù)并行進(jìn)行的感猛。這些特點(diǎn)限制了 GraphX 后續(xù)可做的一些優(yōu)化。
下面是 Pregel 算子([Pregel operator](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/GraphOps.html#pregelA((VertexId,VD,A)?VD,(EdgeTriplet[VD,ED])?Iterator[(VertexId,A)],(A,A)?A))的類型簽名及其實(shí)現(xiàn)的骨架(注意:為了避免由于過長的調(diào)用鏈路造成的 stackOverflowError奢赂,Pregel 支持定期將圖狀態(tài)數(shù)據(jù)和消息持久化到檢查點(diǎn)陪白,由參數(shù)「spark.graphx.pregel.checkpointInterval」控制,可以將其設(shè)置為一個(gè)正整數(shù)膳灶,比如 10 咱士,還需要調(diào)用 SparkContext.setCheckpointDir(directory: String)
設(shè)置檢查點(diǎn)路徑):
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
注意 Pregel 算子接收兩個(gè)參數(shù)列表(即 graph.pregel(list1)(list2)
),第一個(gè)參數(shù)列表包含初始消息值轧钓,最大迭代次數(shù)序厉,和消息發(fā)送的邊的方向(默認(rèn)為出邊),第二個(gè)參數(shù)列表包含接處理消息(vprog
)毕箍,發(fā)送消息(sendMsg
)弛房,聚合消息(mergeMsg
)的用戶自定義函數(shù)。
下面的示例展示了如何用 Pregel 算子實(shí)現(xiàn)單源最短路徑算法而柑。
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo」。
圖構(gòu)建器
GraphX 提供了幾種方式從 RDD 或者磁盤上的數(shù)據(jù)中構(gòu)建出一張圖。默認(rèn)情況下這些構(gòu)建器都不會(huì)對圖中的邊進(jìn)行重分區(qū)葫松,相反使碾,所有的邊都會(huì)分布在它們默認(rèn)的分區(qū)中(比如原始 HDFS 的塊中)。Graph.groupEdges
需要圖中的邊被重分區(qū)涩澡,因?yàn)檫@個(gè)算子假設(shè)相同的邊都分布在同一個(gè)分區(qū)中顽耳,所以在使用
groupEdges算子之前需要調(diào)用 [
Graph.partitionBy`](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/Graph.html#partitionBy(PartitionStrategy):Graph[VD,ED])。
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
GraphLoader.edgeListFile
提供了一種保存了邊信息的磁盤中文件中構(gòu)建出一張圖的方式妙同。該方法以下面的格式來解析射富,其中每一行包含一對 ID (源節(jié)點(diǎn) ID 和目的節(jié)點(diǎn) ID),會(huì)跳過以 #
開頭的行:
# This is a comment
2 1
4 1
1 2
該方法從指定的邊當(dāng)中創(chuàng)建出一張圖渐溶,會(huì)自動(dòng)添加邊中涉及到的節(jié)點(diǎn)辉浦。所有的節(jié)點(diǎn)和邊的屬性值默認(rèn)為 1。參數(shù) canonicalOrientation
允許將邊的方向正則化(srcId < dstId
)茎辐,聯(lián)通分量(connected components)算法需要這樣的性質(zhì)宪郊。參數(shù) minEdgePartitions
生成的邊的最小分區(qū)數(shù);最終的分區(qū)數(shù)可能會(huì)大于該參數(shù)的值拖陆,例如弛槐,HDFS 有更多的塊。
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
[Graph.apply
](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/Graph$.html#applyVD,ED 方法可以從節(jié)點(diǎn)和邊的 RDD 中構(gòu)建出一張圖依啰。重復(fù)的節(jié)點(diǎn)會(huì)從中任意挑選一個(gè)乎串,在邊的 RDD 中存在但是不在節(jié)點(diǎn)的 RDD 中存在的節(jié)點(diǎn)會(huì)被賦予默認(rèn)屬性值。
Graph.fromEdges
方法可以從單一的邊 RDD 中構(gòu)建出一張圖速警,自動(dòng)創(chuàng)建在邊的 RDD 中出現(xiàn)的節(jié)點(diǎn)叹誉,并賦予默認(rèn)的屬性值鸯两。
[Graph.fromEdgeTuples
](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/Graph.html) 類型的
Some
(例如钝侠,uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
) 。分區(qū)策略需要能夠歸并相同的邊來達(dá)到去重的效果酸舍。
節(jié)點(diǎn)和邊 RDD
GraphX 對外暴露了圖中節(jié)點(diǎn)和邊的 RDD 視圖帅韧。然而,因?yàn)?GraphX 以優(yōu)化后的數(shù)據(jù)結(jié)構(gòu)保存著節(jié)點(diǎn)和邊啃勉,這些數(shù)據(jù)結(jié)構(gòu)提供了附加功能忽舟,所以節(jié)點(diǎn)和邊分別以 VertexRDD
(VertexRDD)EdgeRDD
(EdgeRDD)的方式呈現(xiàn)。本章節(jié)介紹那些額外的有用的功能璧亮,注意下面只是一部分萧诫,完整的算子列表參見 API 文檔。
節(jié)點(diǎn) RDD
VertexRDD[A]
繼承了 RDD[(VertexId, A)]
枝嘶,并額外限制了每個(gè) VertexId
只能出現(xiàn)一次帘饶。VertexRDD[A]
表示一個(gè)節(jié)點(diǎn)的集合,其中每個(gè)節(jié)點(diǎn)都有一個(gè)類型為 A
的屬性值群扶。在內(nèi)部及刻,這些數(shù)據(jù)被保存為一個(gè)可復(fù)用的哈希表。進(jìn)而竞阐,如果兩個(gè) VertexRDD
來源于同一個(gè) VertexRDD
(例如缴饭,通過 filter
或者 mapValues
算子),那么這兩個(gè) VertexRDD
可以在常數(shù)時(shí)間內(nèi)做連接操作而不需要哈希重分區(qū)骆莹】怕В基于這種帶索引的數(shù)據(jù)結(jié)構(gòu), VertexRDD
給出了下面的接口:
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Show only vertices unique to this set based on their VertexId's
def minus(other: RDD[(VertexId, VD)])
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
請注意 filter
算子是如何返回一個(gè) VertexRDD
的幕垦,過濾函數(shù)是通過一個(gè) BitSet
實(shí)現(xiàn)的丢氢,所以能夠復(fù)用索引結(jié)構(gòu),從而可以跟其他的 VertexRDD
快速進(jìn)行連接操作先改。同樣的疚察,mapValues
算子并不允許 map
函數(shù)改變節(jié)點(diǎn)的 VertexId
,從而能夠復(fù)用之前的 HashMap
索引結(jié)構(gòu)仇奶。leftJoin
和 innerJoin
算子在連接兩個(gè)來源于擁有相同 HashMap
索引結(jié)構(gòu)的 VertexRDD
時(shí)貌嫡,能夠識別相同 ID 的節(jié)點(diǎn),能夠在連接時(shí)線性掃描而不需要單點(diǎn)查找。
aggregateUsingIndex
算子可以高效的從一個(gè) RDD[(VertexId, A)]
中創(chuàng)建一個(gè)新的 VertexRDD
岛抄。從概念上來講别惦,如果已經(jīng)從已有的節(jié)點(diǎn)數(shù)據(jù)集上構(gòu)建了一個(gè) VertexRDD[B]
,它是另一個(gè)節(jié)點(diǎn)集 RDD[(VertexId, A)]
的超集夫椭,那么就可以復(fù)用 VertexRDD[B]
的索引結(jié)構(gòu)來輔助聚合操作以及創(chuàng)建 RDD[(VertexId, A)]
的索引步咪。例如:
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
邊 RDD
EdgeRDD[ED]
繼承了 RDD[Edge[ED]]
,通過定義在 PartitionStrategy
中的一些分區(qū)策略來對存儲了邊的數(shù)據(jù)塊進(jìn)行分區(qū)益楼。在每個(gè)分區(qū)中,邊的屬性值和相鄰的拓?fù)浣Y(jié)構(gòu)是分開存儲的点晴,來最大化的復(fù)用這些數(shù)據(jù)感凤,即使邊的屬性值發(fā)生了變化。
下面是三個(gè)典型的 EdgeRDD
的功能方法:
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
在大多數(shù)應(yīng)用中粒督,對于 EdgeRDD
的操作是通過圖的算子或者的 RDD
基礎(chǔ)算子來完成的陪竿。
優(yōu)化表示方式
盡管關(guān)于分布式環(huán)境下圖的表示方式的優(yōu)化超出了本文的討論范疇,還是可以從一個(gè)比較高的層面上對優(yōu)化手段作一些了解屠橄,有助于彈性算法的設(shè)計(jì)實(shí)現(xiàn)和 API 的使用族跛。GraphX 采用節(jié)點(diǎn)分割的方式來對分布式圖進(jìn)行分區(qū):
相比于根據(jù)邊來切割圖數(shù)據(jù),GraphX 根據(jù)節(jié)點(diǎn)切割來進(jìn)行分區(qū)锐墙,能夠同時(shí)減少通信和存儲的開銷礁哄。從邏輯層面講,每條邊會(huì)被分配到不同的節(jié)點(diǎn)上溪北,而同一個(gè)節(jié)點(diǎn)可能會(huì)被分配到不同的節(jié)點(diǎn)上桐绒。具體的分配方式取決于 PartitionStrategy
,不同的方式之間都有一些取舍之拨,用戶可以通過 Graph.partitionBy
來對圖數(shù)據(jù)進(jìn)行重分區(qū)茉继。默認(rèn)的分區(qū)策略是構(gòu)建 Graph 對象時(shí)邊數(shù)據(jù)的初始分區(qū),不過用戶可以輕松的切換到 2D 分區(qū)策略或者其他 GrphX 提供的策略蚀乔。
一旦邊數(shù)據(jù)劃分好了分區(qū)烁竭,并行圖計(jì)算主要的挑戰(zhàn)就變成了如何高效的對節(jié)點(diǎn)屬性和邊進(jìn)行關(guān)聯(lián)操作。由于現(xiàn)實(shí)世界中的圖數(shù)據(jù)大部分情況下邊的數(shù)量都遠(yuǎn)超過點(diǎn)的數(shù)量吉挣,GraphX 將節(jié)點(diǎn)屬性和邊放在一起派撕。由于并不是所有的分區(qū)中的邊和所有的節(jié)點(diǎn)都相鄰,GraphX 內(nèi)部維護(hù)了一張路由表听想,用來在進(jìn)行 triplets
和 aggregateMessages
的連接操作時(shí)判定節(jié)點(diǎn)應(yīng)該被廣播到那個(gè)節(jié)點(diǎn)腥刹。
圖算法
GraphX 提供了一些圖算法來簡化數(shù)據(jù)分析,這些算法位于 org.apache.spark.graphx.lib
包汉买,可以直接通過 Graph
的方法訪問 GraphOps
中的算法衔峰。本章節(jié)對其中一些算法做簡要的介紹。
網(wǎng)站排名
PageRank 算法衡量圖中每個(gè)節(jié)點(diǎn)的重要性,一條從節(jié)點(diǎn) u 到節(jié)點(diǎn) v 的邊代表 u 節(jié)點(diǎn)對 v 節(jié)點(diǎn)重要性的貢獻(xiàn)垫卤。例如威彰,如果一個(gè) Twitter 用戶用很多關(guān)注者,該用戶的排名就會(huì)較高穴肘。
GraphX 在單例對象 PageRank
中提供了 PageRank 算法靜態(tài)和動(dòng)態(tài)的實(shí)現(xiàn)歇盼。靜態(tài)的 PageRank 算法執(zhí)行固定的迭代次數(shù),而動(dòng)態(tài)的 PageRank 算法會(huì)一直運(yùn)行直到結(jié)果收斂(即评抚,誤差在一個(gè)指定的范圍內(nèi))豹缀。可以通過 Graph
來直接調(diào)用 GraphOps
中的這些算法慨代。
GraphX 還提供了一個(gè)可以運(yùn)行 PageRank 算法的社交網(wǎng)絡(luò)數(shù)據(jù)集邢笙。用戶的信息保存在文件 data/graphx/users.txt
中,用戶之間的關(guān)系的數(shù)據(jù)保存在文件 data/graphx/followers.txt
中侍匙。計(jì)算的代碼如下:
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala」氮惯。
連通分量
連通分量算法將圖中的每個(gè)節(jié)點(diǎn)的屬性值標(biāo)記為所在連通分量中的節(jié)點(diǎn)的最小 ID。例如想暗,在一個(gè)社交網(wǎng)絡(luò)中妇汗,連通分量可以做簡單的聚類分析。GraphX 在單例對象 ConnectedComponents
中包含了算法的實(shí)現(xiàn)说莫,下面還是根據(jù) PageRank 章節(jié)的數(shù)據(jù)進(jìn)行連通分量的計(jì)算:
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala」杨箭。
三角計(jì)數(shù)
當(dāng)一個(gè)節(jié)點(diǎn)的兩個(gè)鄰居節(jié)點(diǎn)之間有一條邊時(shí),該節(jié)點(diǎn)就是一個(gè)三角形的一部分储狭。GraphX 在單例對象 TriangleCount
中實(shí)現(xiàn)了三角計(jì)數(shù)的算法告唆,計(jì)算每個(gè)節(jié)點(diǎn)所在的三角形的數(shù)量,作為一種聚類的手段晶密。這里使用 PageRank 章節(jié)的社交網(wǎng)絡(luò)數(shù)據(jù)集來進(jìn)行三角計(jì)數(shù)算法擒悬。注意 TriangleCount
算法需要圖中的邊的方向是正則化(srcId < dstId
)后的,并且需要使用 Graph.partitionBy
進(jìn)行分區(qū)稻艰。
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala」懂牧。
示例代碼
假設(shè)需要從一些文本文件中構(gòu)建出一張圖,將圖限制在重要的關(guān)系和用戶之間尊勿,在子圖上執(zhí)行 PageRank 算法僧凤,最后返回頭部用戶的屬性值,可以這樣做:
import org.apache.spark.graphx.GraphLoader
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala」元扔。