07 GraphX Programming Guide

轉(zhuǎn)載請注明出處搞监,謝謝合作~

GraphX 編程指南

GraphX

概述

GraphX 是 Spark 中一個(gè)新的組件,用來應(yīng)對并行圖計(jì)算的場景。從一個(gè)比較高的層次來看愧哟,GraphX 通過擴(kuò)展 RDD 的概念提出了一個(gè)新的 Graph 抽象:一個(gè)節(jié)點(diǎn)和邊都包含屬性的有向多重圖劣挫。為了能夠支持圖計(jì)算,GraphX 給出了一組基礎(chǔ)的算子(例如傲绣,subgraph掠哥, joinVerticesaggregateMessages)以及優(yōu)化后的 Pregel API。另外秃诵,GraphX 還在持續(xù)更新圖算法(algorithms)和圖構(gòu)建器(builders)的集合來簡化圖分析任務(wù)续搀。

準(zhǔn)備工作

需要首先引入 Spark 和 GraphX 的依賴到項(xiàng)目中,如下所示:

import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

如果不是通過 Spark shell 練習(xí)的話菠净,就需要先初始化一個(gè) SparkContext禁舷。關(guān)于快速上手 Spark 應(yīng)用的更多信息參見 Spark Quick Start Guide

屬性圖

屬性圖(property graph)是一個(gè)有向的多重圖毅往,每個(gè)節(jié)點(diǎn)和每條邊都可以自定義屬性牵咙。有向多重圖是指有向圖中某些相同的節(jié)點(diǎn)之間可能存在多條邊,這種能力為相同的節(jié)點(diǎn)間在存在多種關(guān)系(例如煞抬,同時(shí)是同事和朋友)的場景的建模提供了便利霜大。每個(gè)節(jié)點(diǎn)都以一個(gè) 64 位的長整型作為 ID(VertexId),GraphX 并不需要節(jié)點(diǎn) ID 是有序的革答。每條邊也會(huì)包含相應(yīng)的源節(jié)點(diǎn) ID 和目的節(jié)點(diǎn) ID战坤。

屬性圖由節(jié)點(diǎn)類型(VD)和邊類型(ED)標(biāo)定,分別代表跟節(jié)點(diǎn)和邊綁定的自定義對象類型残拐。

當(dāng)節(jié)點(diǎn)和邊的類型為基本類型(例如途茫,int,double 等等)時(shí)溪食,GraphX 優(yōu)化了它們的表示方式囊卜,通過特殊的數(shù)組來存儲以減少內(nèi)存的消耗。

在一些場景下,需要同一張圖中的節(jié)點(diǎn)能夠擁有不同的屬性栅组,這樣的需求可以通過繼承來實(shí)現(xiàn)雀瓢。例如,如果對具有用戶和產(chǎn)品屬性的二分圖進(jìn)行建模玉掸,可以通過下面的方式:

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

與 RDD 類似刃麸,屬性圖也是不可變的,分布式的和容錯(cuò)的司浪。節(jié)點(diǎn)值或者圖拓?fù)浣Y(jié)構(gòu)的變更會(huì)根據(jù)相應(yīng)的變更生成一張新圖泊业。需要注意的是,原圖中沒有發(fā)生變化的部分(即啊易,未受影響的圖結(jié)構(gòu)吁伺,屬性和索引)在構(gòu)建新圖的時(shí)候可以被復(fù)用,來減少函數(shù)式數(shù)據(jù)結(jié)構(gòu)的開銷租谈。整張圖根據(jù)節(jié)點(diǎn)被劃分為多個(gè)分區(qū)篮奄,分布在各個(gè) Executor 上。和 RDD 一樣割去,圖的每個(gè)分區(qū)都可以在故障時(shí)在另一個(gè)節(jié)點(diǎn)上被重新創(chuàng)建宦搬。

在邏輯上,屬性圖由一些強(qiáng)類型的數(shù)據(jù)集合(RDD)構(gòu)成劫拗,數(shù)據(jù)集合中存儲著每個(gè)節(jié)點(diǎn)和邊的屬性信息。于是矾克,從 Graph 類中可以訪問節(jié)點(diǎn)和邊:

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

VertexRDD[VD]EdgeRDD[ED] 分別代表優(yōu)化后的 RDD[(VertexId, VD)]RDD[Edge[ED]]页慷,它們都有一些契合圖計(jì)算的額外功能,為圖計(jì)算提供優(yōu)化措施胁附。關(guān)于 VertexRDDVertexRDDEdgeRDDEdgeRDD 的 API 會(huì)在 vertex and edge RDDs 章節(jié)中詳細(xì)討論酒繁,暫時(shí)可以將它們簡單的當(dāng)做 RDD[(VertexId, VD)]RDD[Edge[ED]]

屬性圖示例

假設(shè)想要構(gòu)建一張屬性圖控妻,包含了 GraphX 項(xiàng)目的合作人員州袒,節(jié)點(diǎn)屬性可能會(huì)包含名字和職業(yè),可以用一個(gè)字符串來描述合作人員之間的關(guān)系弓候,并用邊來表示:

The Property Graph

這張圖的類型簽名如下:

val userGraph: Graph[(String, String), String]

可以通過幾種方式構(gòu)建一張屬性圖郎哭,比如從文件、RDD 甚至自定義生成器菇存,這些將會(huì)在 graph builders 章節(jié)中詳細(xì)討論夸研。最常用的方式可能是通過 Graph object,例如下面的代碼從一些 RDD 中構(gòu)建了一張圖依鸥。

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

在上面的示例中使用了樣本類 Edge亥至,其中包含一個(gè) srcId 和一個(gè) dstId,分別代表源節(jié)點(diǎn)和目的節(jié)點(diǎn)的 ID,此外還有一個(gè) attr 成員變量存儲了邊的屬性姐扮。

可以通過成員變量 graph.verticesgraph.edges 來分別獲取屬性圖中的節(jié)點(diǎn)視圖和邊的視圖絮供。

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

注意 graph.vertices 返回一個(gè) VertexRDD[(String, String)],繼承自 RDD[(VertexId, (String, String))]茶敏,所以可以使用 scala 的 case 表達(dá)式將其結(jié)構(gòu)為 tuple壤靶。同時(shí),graph.edges 返回一個(gè) EdgeRDD睡榆,其中包含了 Edge[String] 對象萍肆,也能夠以下面的方式使用構(gòu)造器:

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

除了屬性圖的節(jié)點(diǎn)和邊的視圖,GraphX 還提供了一種三元組視圖胀屿。三元組視圖在邏輯上關(guān)聯(lián)了節(jié)點(diǎn)和邊的屬性塘揣,生成一個(gè) RDD[EdgeTriplet[VD, ED]],其中包含 EdgeTriplet 類的實(shí)例宿崭。關(guān)聯(lián)行為可以用下面的 SQL 語句來表示:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

或者以圖的形式:

Edge Triplet

EdgeTriplet 類繼承了 Edge亲铡,并添加了成員變量 srcAttrdstAttr,分別表示源節(jié)點(diǎn)和目的節(jié)點(diǎn)的屬性值葡兑〗甭可以使用三元組視圖來渲染字符串,生成用戶間關(guān)系的集合讹堤。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

圖算子

RDD 有一些基礎(chǔ)算子吆鹤,例如 mapfilterreduceByKey洲守,屬性圖也有一些基礎(chǔ)算子疑务,可以接收用戶自定義的函數(shù),根據(jù)轉(zhuǎn)換后的屬性值和拓?fù)浣Y(jié)構(gòu)生成新的圖梗醇。優(yōu)化后的核心算子定義在類 Graph 中知允,還有一些比較方便的算子作為二話不說算子的補(bǔ)充被定義在類 GraphOps 中。但是叙谨,得益于 Scala 的隱式轉(zhuǎn)換特性温鸽,定義在 GraphOps 中的算子可以被自動(dòng)轉(zhuǎn)化為的 Graph 算子。例如手负,可以通過以下方式計(jì)算每個(gè)節(jié)點(diǎn)的入度(定義在 GraphOps 中)涤垫。

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

核心圖 API 和 GraphOps 的區(qū)別是為了在未來更好的支持圖的不同呈現(xiàn)方式,每一種方式必須提供核心算子的實(shí)現(xiàn)虫溜,并且可以復(fù)用許多 GraphOps 中定義的算子雹姊。

算子列表

下面是一個(gè)關(guān)于 GraphGraphOps 提供的算法功能的快速總結(jié),為了方便起見衡楞,都展示為 Graph 的成員變量吱雏。注意有一些函數(shù)的簽名被簡化了(例如敦姻,去除了默認(rèn)參數(shù)和類型約束),還有一些高級功能沒有介紹在內(nèi)歧杏,所以請通過官方的 API 文檔獲取完整的算子列表镰惦。

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

屬性算子

與 RDD 的 map 算子類似,屬性圖包含以下算子:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

每個(gè)算子都會(huì)生成一張新的圖犬绒,其中的節(jié)點(diǎn)或者邊的屬性值被用戶自定義的 map 函數(shù)進(jìn)行了轉(zhuǎn)換旺入。

注意,圖的拓?fù)浣Y(jié)構(gòu)不會(huì)發(fā)生改變凯力,這是這些算子的共同特性茵瘾,可以讓新圖復(fù)用原圖的索引。下面的代碼片段在邏輯上是等同的咐鹤,但是第一種方式不會(huì)保留圖的結(jié)構(gòu)索引拗秘,沒有用到 GraphX 所做的優(yōu)化:

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

相反,使用 mapVertices` 來復(fù)用索引:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

這些算子通常用來根據(jù)特定計(jì)算初始化圖的結(jié)構(gòu)或者剔除不需要的屬性信息祈惶。例如雕旨,在執(zhí)行 PageRank 算法前,將節(jié)點(diǎn)出度作為屬性值進(jìn)行初始化:

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

結(jié)構(gòu)算子

目前 GraphX 只支持一小部分常用的結(jié)構(gòu)算子捧请,在未來會(huì)豐富此類算子凡涩。下面列舉了一些基礎(chǔ)結(jié)構(gòu)算子。

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse 算子返回一張新圖疹蛉,將原圖中所有的邊做反向處理活箕,該算子適用于形如計(jì)算反向 PageRank 的場景。由于反向算子并不會(huì)修改節(jié)點(diǎn)和邊的屬性值可款,可以在實(shí)現(xiàn)中避免避免數(shù)據(jù)遷移和去重讹蘑,以提高性能。

subgraph算子接收節(jié)點(diǎn)和邊的斷言函數(shù)筑舅,返回的新圖中只包含了滿足斷言函數(shù)(返回值為 true)的節(jié)點(diǎn)和邊,以及滿足斷言函數(shù)的連接點(diǎn)(如果節(jié)點(diǎn)斷言為真陨舱,刪除相應(yīng)邊不會(huì)刪除節(jié)點(diǎn)翠拣;如果節(jié)點(diǎn)斷言為假,無論邊斷言是否為真都會(huì)被刪除)游盲。subgraph` 算子可以被用來限制圖的節(jié)點(diǎn)和邊误墓,例如上述示例中刪除破裂的關(guān)系,如下面的代碼:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

注意上面的示例中只提供了節(jié)點(diǎn)斷言函數(shù)益缎。如果節(jié)點(diǎn)或者邊的斷言函數(shù)沒有提供谜慌, subgraph 算子會(huì)石筍默認(rèn)值 true

mask 算子返回一張子圖莺奔,其中包含輸入圖中也存在的節(jié)點(diǎn)和邊欣范,該算子可以和 subgraph 算子配合使用,從而通過另一張圖的關(guān)聯(lián)關(guān)系約束本圖的拓?fù)浣Y(jié)構(gòu)。例如恼琼,可以再全圖節(jié)點(diǎn)上計(jì)算連通分量妨蛹,之后再通過有效子圖優(yōu)化計(jì)算結(jié)果。

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

groupEdges` 算子可以合并多重圖中并行的邊(即晴竞,同一節(jié)點(diǎn)對之間的多條邊)蛙卤。在很多數(shù)值應(yīng)用中,并行邊可以被求和(聚合權(quán)重)噩死,從而合并成一條單獨(dú)的邊颤难,減小圖的尺寸。

連接算子

在許多場景下已维,需要將圖數(shù)據(jù)和其他的數(shù)據(jù)集(RDD)關(guān)聯(lián)起來行嗤。例如,可能需要將額外的用戶屬性需要融合到現(xiàn)有的圖中衣摩,或者需要將一張圖中的節(jié)點(diǎn)屬性拉取到另一張圖中昂验。這些需求可以通過連接算子來實(shí)現(xiàn)。下面列舉了主要的連接算子:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

joinVertices算子將節(jié)點(diǎn)和輸入 RDD 進(jìn)行關(guān)聯(lián)艾扮,返回一張新的圖既琴,其中節(jié)點(diǎn)的屬性通過應(yīng)用用戶自定義的map` 函數(shù)計(jì)算得來。沒有匹配到的節(jié)點(diǎn)會(huì)保留原來的屬性值泡嘴。

注意甫恩,如果 RDD 中匹配某節(jié)點(diǎn)的元素多于一個(gè),只有有一個(gè)生效酌予。所以建議通過下面的方式來對 RDD 去重磺箕,同時(shí)預(yù)建索引,可以加速后續(xù)的連接計(jì)算抛虫。

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

更通用的 outerJoinVertices算子和joinVertices的行為相似松靡,但是該算子會(huì)應(yīng)用于所有的節(jié)點(diǎn),還能夠改變節(jié)點(diǎn)的的屬性值類型建椰。由于并不是每個(gè)節(jié)點(diǎn)在 RDD 中都有對應(yīng)的值雕欺,map函數(shù)接收一個(gè)Option` 類型的參數(shù)。例如棉姐,可以使用該算子將一張圖的節(jié)點(diǎn)屬性初始化為它們的出度屠列,供 PageRank 后續(xù)計(jì)算。

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

你可注意到了上面的示例中使用的柯立化形式(例如伞矩,f(a)(b))的列表參數(shù)笛洛,盡管 f(a)(b)f(a,b) 的寫法在功能上沒有區(qū)別,但是后者意味著參數(shù) b 的類型推斷不依賴于參數(shù) a乃坤。所以苛让,用戶需要為自定義函數(shù)提供類型聲明:

val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)

鄰接聚合

圖分析任務(wù)中的一個(gè)關(guān)鍵步驟就是聚合每個(gè)節(jié)點(diǎn)的鄰居的信息沟蔑。例如,如果想要知道一個(gè)用戶的關(guān)注者數(shù)量或者關(guān)注者的平均年齡蝌诡,許多圖迭代算法(例如溉贿,網(wǎng)頁排名,最短路徑和連通分量)也會(huì)重復(fù)聚合鄰居節(jié)點(diǎn)的屬性值(例如浦旱,當(dāng)前的排名值宇色,源的最短路徑和可達(dá)節(jié)點(diǎn)的最小 ID)。

為了提升性能颁湖,基礎(chǔ)聚合算子從 graph.mapReduceTriplets 升級為新的 graph.AggregateMessages宣蠕。盡管 API 的變動(dòng)很小,下面還是提供了遷移指南甥捺。

聚合消息 (aggregateMessages)

GraphX 中最核心的聚合算子就是 aggregateMessages抢蚀。該算子將一個(gè)用戶自定義的sendMsg函數(shù)應(yīng)用于圖中的每個(gè) *edge triplet*,之后使用mergeMsg` 函數(shù)在目標(biāo)節(jié)點(diǎn)處聚合這些消息镰禾。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

用戶自定義的函數(shù)接收一個(gè) EdgeContext 對象皿曲,該對象中可以訪問源節(jié)點(diǎn)和目的節(jié)點(diǎn)的屬性信息,以及邊的屬性信息吴侦,還能夠使用函數(shù)(sendToSrcsendToDst)將消息發(fā)送給源節(jié)點(diǎn)或者目的節(jié)點(diǎn)屋休。可以將 sendMsg 理解為 MapReduce 中的 map 函數(shù)备韧,而 mergeMsg 函數(shù)將發(fā)送給同一節(jié)點(diǎn)的多個(gè)消息聚合成為一個(gè)劫樟,可以將其理解為 MapReduce 中的 reduce 函數(shù)。aggregateMessages算子返回一個(gè)VertexRDD[Msg]织堂,其中包含了發(fā)送給每個(gè)節(jié)點(diǎn)的聚合后的消息(類型為Msg`)叠艳。沒有收到消息的節(jié)點(diǎn)不會(huì)被包含在內(nèi)。

此外易阳,算子還可以接收一個(gè)可選的 tripletsFields 參數(shù)附较,表示可以在 EdgeContext 中訪問的數(shù)據(jù)。該參數(shù)的可選項(xiàng)定義在 TripletFields 中潦俺,默認(rèn)值為 TripletFields.All翅睛,表示用戶自定義的 sendMsg 函數(shù)可能會(huì)訪問 EdgeContext 對象中的任意成員。該參數(shù)可以用來告知 GraphX 只有部分的 EdgeContext 成員在計(jì)算過程中是需要的黑竞,從而讓 GraphX 能夠選取一個(gè)優(yōu)化后的連接策略。例如如果需要計(jì)算每個(gè)用戶的關(guān)注者的平局年齡疏旨,只需要訪問源節(jié)點(diǎn)的屬性很魂,便可以將該參數(shù)配置為 TripletFields.Src 來表示只需要訪問源節(jié)點(diǎn)的屬性。

在之前的 GraphX 版本中使用字節(jié)編碼來推斷 TripletFields 的值檐涝,但是發(fā)現(xiàn)字節(jié)編碼的檢查并不是很靠譜遏匆,于是替換為顯式的控制方式法挨。(我理解是把位運(yùn)算替換為了顯式的布爾類型)

在下面的示例中,使用 aggregateMessages` 算子來計(jì)算每個(gè)用戶的關(guān)注者中年齡比該用戶大的用戶的平均年齡幅聘。

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

完整的示例代碼位于 Spark 安裝包的 「examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala」凡纳。

當(dāng)信息(以及聚合后的消息)的占用空間固定時(shí)(例如,浮點(diǎn)數(shù)和加法運(yùn)算帝蒿,而不是列表和拼接運(yùn)算)荐糜,aggregateMessages 算子可以達(dá)到最好的性能。

Map Reduce Triplets 遷移指南(遺留 API)

在 GraphX 之前的版本中葛超,鄰接聚合是通過 mapReduceTriplets 算子來完成的:

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

mapReduceTriplets 接收一個(gè)用戶自定義的 map 函數(shù)暴氏,該函數(shù)會(huì)被應(yīng)用于每個(gè)三元組對象,并生成消息绣张,生成的消息通過用戶自定義的 reduce 函數(shù)進(jìn)行聚合答渔。但是發(fā)現(xiàn)返回的迭代器的使用方開銷很大,而且限制了額外的優(yōu)化措施(例如侥涵,本地節(jié)點(diǎn)的重編碼)沼撕。在 aggregateMessages` 算子中暴露了三元組的字段,以及顯式發(fā)送信息到源節(jié)點(diǎn)和目的節(jié)點(diǎn)的函數(shù)芜飘,另外還移除了字節(jié)編碼檢查务豺,讓用戶顯式指定三元組中的哪些字段是需要的。

下面的代碼塊使用了 mapReduceTriplets 算子:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

可以被重寫為使用 aggregateMessages 算子:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

計(jì)算連接度

計(jì)算每個(gè)節(jié)點(diǎn)的連接度是一個(gè)常用的聚合任務(wù):計(jì)算每個(gè)節(jié)點(diǎn)的鄰居節(jié)點(diǎn)的數(shù)量燃箭。對于有向圖冲呢,通常需要知道每個(gè)節(jié)點(diǎn)的入度,出度和總的連接度招狸。GraphOps 類包含了一些計(jì)算節(jié)點(diǎn)連接度的算子敬拓。例如下面的示例中計(jì)算了圖中最大的出度,入度和總連接度:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

收集鄰接節(jié)點(diǎn)

在某些場景下為了方便計(jì)算需要收集每個(gè)節(jié)點(diǎn)的鄰居節(jié)點(diǎn)及其屬性信息裙戏,可以通過 collectNeighborIdscollectNeighbors 算子來實(shí)現(xiàn)乘凸。

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

這些算子的開銷可能會(huì)很大,因?yàn)橛行┬畔?huì)被復(fù)制累榜,還需要大量的網(wǎng)絡(luò)通信营勤。可能的話壹罚,建議使用 aggregateMessages` 算子來表達(dá)相同的計(jì)算葛作。

緩存及清理

Spark 中的 RDD 默認(rèn)是不會(huì)緩存到內(nèi)存中的。為了避免重復(fù)計(jì)算猖凛,對同一 RDD 多次使用時(shí)需要顯示指定緩存(參見 Spark Programming Guide)赂蠢。GraphX 中的圖也是類似的,當(dāng)需要多次使用同一張圖的數(shù)據(jù)時(shí)辨泳,請首先調(diào)用 Graph.cache() 來顯示指定緩存虱岂。

在迭代計(jì)算中玖院,為了達(dá)到最佳性能也有時(shí)也需要清除緩存。默認(rèn)情況下第岖,被緩存的 RDD 和圖數(shù)據(jù)會(huì)一直保留在內(nèi)存中难菌,直到由于內(nèi)存壓力觸發(fā) LAU 策略才會(huì)清除。對于迭代計(jì)算蔑滓,產(chǎn)生的中間計(jì)算結(jié)果會(huì)填滿緩存空間郊酒。即使它們最終被清理了,內(nèi)存中那些沒必要的存儲會(huì)降低垃圾回收的效率烫饼,所以有必要在中間結(jié)果不再需要的時(shí)候及時(shí)清理它們 猎塞。如此就需要在每個(gè)迭代中緩存圖中的 RDD,同時(shí)清理其他不需要的 RDD杠纵,將這些操作正確執(zhí)行可能比較有難度荠耽。所以對于迭代計(jì)算推薦使用 Pregel API,能夠正確的清理中間計(jì)算結(jié)果比藻。

Pregel API

圖是一種天然就具有遞歸性質(zhì)的數(shù)據(jù)結(jié)構(gòu)铝量,因?yàn)楣?jié)點(diǎn)的屬性值依賴于鄰居節(jié)點(diǎn)的屬性值,鄰居節(jié)點(diǎn)的屬性值又依賴于它們鄰居節(jié)點(diǎn)的屬性值银亲。所以慢叨,許多重要的圖算法都會(huì)迭代計(jì)算每個(gè)節(jié)點(diǎn)的屬性值直到達(dá)成某一收斂條件,根據(jù)迭代算法的特點(diǎn)業(yè)界也提出了一些并行圖計(jì)算的抽象务蝠。GraphX 采用了 Pregel 作為底層抽象拍谐。

從比較高的層面來看,受限于圖的拓?fù)浣Y(jié)構(gòu)馏段,Pregel 算子符合 BSP(Bulk-Synchronous Parallel)消息傳遞模型轩拨。Pregel 算子會(huì)執(zhí)行一系列的 super step,在每個(gè) super step 中每個(gè)節(jié)點(diǎn)接收上一個(gè) super step 中發(fā)送過來的消息院喜,并對同一節(jié)點(diǎn)接收的消息進(jìn)行聚合亡蓉,根據(jù)聚合后的消息計(jì)算出該節(jié)點(diǎn)新的屬性值,并向鄰居節(jié)點(diǎn)發(fā)送在下一個(gè) super step 中將會(huì)用到的消息喷舀。跟經(jīng)典 Pregel 模型不同的地方是砍濒,消息的計(jì)算是通過作用于三元組的一個(gè)函數(shù)并行執(zhí)行的,計(jì)算可以同時(shí)訪問源節(jié)點(diǎn)和目的節(jié)點(diǎn)硫麻。在某一 super step 中沒有收到消息的節(jié)點(diǎn)是不會(huì)參與計(jì)算的爸邢。當(dāng)不再有新的消息存在時(shí),Pregel 算子會(huì)終止計(jì)算并返回最終的圖狀態(tài)結(jié)果拿愧。

注意杠河,跟許多標(biāo)準(zhǔn)的 Pregel 實(shí)現(xiàn)不同,GraphX 中的節(jié)點(diǎn)只能發(fā)送消息到鄰居節(jié)點(diǎn),而且消息的構(gòu)建是通過一個(gè)用戶自定義函數(shù)并行進(jìn)行的感猛。這些特點(diǎn)限制了 GraphX 后續(xù)可做的一些優(yōu)化。

下面是 Pregel 算子([Pregel operator](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/GraphOps.html#pregelA((VertexId,VD,A)?VD,(EdgeTriplet[VD,ED])?Iterator[(VertexId,A)],(A,A)?A))的類型簽名及其實(shí)現(xiàn)的骨架(注意:為了避免由于過長的調(diào)用鏈路造成的 stackOverflowError奢赂,Pregel 支持定期將圖狀態(tài)數(shù)據(jù)和消息持久化到檢查點(diǎn)陪白,由參數(shù)「spark.graphx.pregel.checkpointInterval」控制,可以將其設(shè)置為一個(gè)正整數(shù)膳灶,比如 10 咱士,還需要調(diào)用 SparkContext.setCheckpointDir(directory: String) 設(shè)置檢查點(diǎn)路徑):

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

注意 Pregel 算子接收兩個(gè)參數(shù)列表(即 graph.pregel(list1)(list2)),第一個(gè)參數(shù)列表包含初始消息值轧钓,最大迭代次數(shù)序厉,和消息發(fā)送的邊的方向(默認(rèn)為出邊),第二個(gè)參數(shù)列表包含接處理消息(vprog)毕箍,發(fā)送消息(sendMsg)弛房,聚合消息(mergeMsg)的用戶自定義函數(shù)。

下面的示例展示了如何用 Pregel 算子實(shí)現(xiàn)單源最短路徑算法而柑。

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo」。

圖構(gòu)建器

GraphX 提供了幾種方式從 RDD 或者磁盤上的數(shù)據(jù)中構(gòu)建出一張圖。默認(rèn)情況下這些構(gòu)建器都不會(huì)對圖中的邊進(jìn)行重分區(qū)葫松,相反使碾,所有的邊都會(huì)分布在它們默認(rèn)的分區(qū)中(比如原始 HDFS 的塊中)。Graph.groupEdges需要圖中的邊被重分區(qū)涩澡,因?yàn)檫@個(gè)算子假設(shè)相同的邊都分布在同一個(gè)分區(qū)中顽耳,所以在使用groupEdges算子之前需要調(diào)用 [Graph.partitionBy`](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/Graph.html#partitionBy(PartitionStrategy):Graph[VD,ED])。

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

GraphLoader.edgeListFile 提供了一種保存了邊信息的磁盤中文件中構(gòu)建出一張圖的方式妙同。該方法以下面的格式來解析射富,其中每一行包含一對 ID (源節(jié)點(diǎn) ID 和目的節(jié)點(diǎn) ID),會(huì)跳過以 # 開頭的行:

# This is a comment
2 1
4 1
1 2

該方法從指定的邊當(dāng)中創(chuàng)建出一張圖渐溶,會(huì)自動(dòng)添加邊中涉及到的節(jié)點(diǎn)辉浦。所有的節(jié)點(diǎn)和邊的屬性值默認(rèn)為 1。參數(shù) canonicalOrientation 允許將邊的方向正則化(srcId < dstId)茎辐,聯(lián)通分量(connected components)算法需要這樣的性質(zhì)宪郊。參數(shù) minEdgePartitions 生成的邊的最小分區(qū)數(shù);最終的分區(qū)數(shù)可能會(huì)大于該參數(shù)的值拖陆,例如弛槐,HDFS 有更多的塊。

object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}

[Graph.apply](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/Graph$.html#applyVD,ED 方法可以從節(jié)點(diǎn)和邊的 RDD 中構(gòu)建出一張圖依啰。重復(fù)的節(jié)點(diǎn)會(huì)從中任意挑選一個(gè)乎串,在邊的 RDD 中存在但是不在節(jié)點(diǎn)的 RDD 中存在的節(jié)點(diǎn)會(huì)被賦予默認(rèn)屬性值。

Graph.fromEdges 方法可以從單一的邊 RDD 中構(gòu)建出一張圖速警,自動(dòng)創(chuàng)建在邊的 RDD 中出現(xiàn)的節(jié)點(diǎn)叹誉,并賦予默認(rèn)的屬性值鸯两。

[Graph.fromEdgeTuples](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/Graph.html#fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy]) 方法可以從一個(gè)以 Tuple 格式描述邊的 RDD 中構(gòu)建出一張圖,并為邊賦予默認(rèn)屬性值 1长豁,自動(dòng)創(chuàng)建在邊的 RDD 中出現(xiàn)的節(jié)點(diǎn)钧唐,并賦予默認(rèn)的屬性值。該方法支持對邊進(jìn)行去重匠襟,請將參數(shù) `uniqueEdges` 設(shè)置成為一個(gè) [`PartitionStrategy`](http://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/PartitionStrategy.html) 類型的 Some(例如钝侠,uniqueEdges = Some(PartitionStrategy.RandomVertexCut)) 。分區(qū)策略需要能夠歸并相同的邊來達(dá)到去重的效果酸舍。

節(jié)點(diǎn)和邊 RDD

GraphX 對外暴露了圖中節(jié)點(diǎn)和邊的 RDD 視圖帅韧。然而,因?yàn)?GraphX 以優(yōu)化后的數(shù)據(jù)結(jié)構(gòu)保存著節(jié)點(diǎn)和邊啃勉,這些數(shù)據(jù)結(jié)構(gòu)提供了附加功能忽舟,所以節(jié)點(diǎn)和邊分別以 VertexRDDVertexRDDEdgeRDDEdgeRDD)的方式呈現(xiàn)。本章節(jié)介紹那些額外的有用的功能璧亮,注意下面只是一部分萧诫,完整的算子列表參見 API 文檔。

節(jié)點(diǎn) RDD

VertexRDD[A] 繼承了 RDD[(VertexId, A)]枝嘶,并額外限制了每個(gè) VertexId 只能出現(xiàn)一次帘饶。VertexRDD[A] 表示一個(gè)節(jié)點(diǎn)的集合,其中每個(gè)節(jié)點(diǎn)都有一個(gè)類型為 A 的屬性值群扶。在內(nèi)部及刻,這些數(shù)據(jù)被保存為一個(gè)可復(fù)用的哈希表。進(jìn)而竞阐,如果兩個(gè) VertexRDD 來源于同一個(gè) VertexRDD(例如缴饭,通過 filter 或者 mapValues 算子),那么這兩個(gè) VertexRDD 可以在常數(shù)時(shí)間內(nèi)做連接操作而不需要哈希重分區(qū)骆莹】怕В基于這種帶索引的數(shù)據(jù)結(jié)構(gòu), VertexRDD 給出了下面的接口:

class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Show only vertices unique to this set based on their VertexId's
  def minus(other: RDD[(VertexId, VD)])
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

請注意 filter 算子是如何返回一個(gè) VertexRDD 的幕垦,過濾函數(shù)是通過一個(gè) BitSet 實(shí)現(xiàn)的丢氢,所以能夠復(fù)用索引結(jié)構(gòu),從而可以跟其他的 VertexRDD 快速進(jìn)行連接操作先改。同樣的疚察,mapValues 算子并不允許 map 函數(shù)改變節(jié)點(diǎn)的 VertexId,從而能夠復(fù)用之前的 HashMap 索引結(jié)構(gòu)仇奶。leftJoininnerJoin 算子在連接兩個(gè)來源于擁有相同 HashMap 索引結(jié)構(gòu)的 VertexRDD 時(shí)貌嫡,能夠識別相同 ID 的節(jié)點(diǎn),能夠在連接時(shí)線性掃描而不需要單點(diǎn)查找。

aggregateUsingIndex 算子可以高效的從一個(gè) RDD[(VertexId, A)] 中創(chuàng)建一個(gè)新的 VertexRDD岛抄。從概念上來講别惦,如果已經(jīng)從已有的節(jié)點(diǎn)數(shù)據(jù)集上構(gòu)建了一個(gè) VertexRDD[B],它是另一個(gè)節(jié)點(diǎn)集 RDD[(VertexId, A)] 的超集夫椭,那么就可以復(fù)用 VertexRDD[B] 的索引結(jié)構(gòu)來輔助聚合操作以及創(chuàng)建 RDD[(VertexId, A)] 的索引步咪。例如:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

邊 RDD

EdgeRDD[ED] 繼承了 RDD[Edge[ED]],通過定義在 PartitionStrategy 中的一些分區(qū)策略來對存儲了邊的數(shù)據(jù)塊進(jìn)行分區(qū)益楼。在每個(gè)分區(qū)中,邊的屬性值和相鄰的拓?fù)浣Y(jié)構(gòu)是分開存儲的点晴,來最大化的復(fù)用這些數(shù)據(jù)感凤,即使邊的屬性值發(fā)生了變化。

下面是三個(gè)典型的 EdgeRDD 的功能方法:

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多數(shù)應(yīng)用中粒督,對于 EdgeRDD 的操作是通過圖的算子或者的 RDD 基礎(chǔ)算子來完成的陪竿。

優(yōu)化表示方式

盡管關(guān)于分布式環(huán)境下圖的表示方式的優(yōu)化超出了本文的討論范疇,還是可以從一個(gè)比較高的層面上對優(yōu)化手段作一些了解屠橄,有助于彈性算法的設(shè)計(jì)實(shí)現(xiàn)和 API 的使用族跛。GraphX 采用節(jié)點(diǎn)分割的方式來對分布式圖進(jìn)行分區(qū):

Edge Cut vs. Vertex Cut

相比于根據(jù)邊來切割圖數(shù)據(jù),GraphX 根據(jù)節(jié)點(diǎn)切割來進(jìn)行分區(qū)锐墙,能夠同時(shí)減少通信和存儲的開銷礁哄。從邏輯層面講,每條邊會(huì)被分配到不同的節(jié)點(diǎn)上溪北,而同一個(gè)節(jié)點(diǎn)可能會(huì)被分配到不同的節(jié)點(diǎn)上桐绒。具體的分配方式取決于 PartitionStrategy,不同的方式之間都有一些取舍之拨,用戶可以通過 Graph.partitionBy 來對圖數(shù)據(jù)進(jìn)行重分區(qū)茉继。默認(rèn)的分區(qū)策略是構(gòu)建 Graph 對象時(shí)邊數(shù)據(jù)的初始分區(qū),不過用戶可以輕松的切換到 2D 分區(qū)策略或者其他 GrphX 提供的策略蚀乔。

RDD Graph Representation

一旦邊數(shù)據(jù)劃分好了分區(qū)烁竭,并行圖計(jì)算主要的挑戰(zhàn)就變成了如何高效的對節(jié)點(diǎn)屬性和邊進(jìn)行關(guān)聯(lián)操作。由于現(xiàn)實(shí)世界中的圖數(shù)據(jù)大部分情況下邊的數(shù)量都遠(yuǎn)超過點(diǎn)的數(shù)量吉挣,GraphX 將節(jié)點(diǎn)屬性和邊放在一起派撕。由于并不是所有的分區(qū)中的邊和所有的節(jié)點(diǎn)都相鄰,GraphX 內(nèi)部維護(hù)了一張路由表听想,用來在進(jìn)行 tripletsaggregateMessages 的連接操作時(shí)判定節(jié)點(diǎn)應(yīng)該被廣播到那個(gè)節(jié)點(diǎn)腥刹。

圖算法

GraphX 提供了一些圖算法來簡化數(shù)據(jù)分析,這些算法位于 org.apache.spark.graphx.lib 包汉买,可以直接通過 Graph 的方法訪問 GraphOps 中的算法衔峰。本章節(jié)對其中一些算法做簡要的介紹。

網(wǎng)站排名

PageRank 算法衡量圖中每個(gè)節(jié)點(diǎn)的重要性,一條從節(jié)點(diǎn) u 到節(jié)點(diǎn) v 的邊代表 u 節(jié)點(diǎn)對 v 節(jié)點(diǎn)重要性的貢獻(xiàn)垫卤。例如威彰,如果一個(gè) Twitter 用戶用很多關(guān)注者,該用戶的排名就會(huì)較高穴肘。

GraphX 在單例對象 PageRank 中提供了 PageRank 算法靜態(tài)和動(dòng)態(tài)的實(shí)現(xiàn)歇盼。靜態(tài)的 PageRank 算法執(zhí)行固定的迭代次數(shù),而動(dòng)態(tài)的 PageRank 算法會(huì)一直運(yùn)行直到結(jié)果收斂(即评抚,誤差在一個(gè)指定的范圍內(nèi))豹缀。可以通過 Graph 來直接調(diào)用 GraphOps 中的這些算法慨代。

GraphX 還提供了一個(gè)可以運(yùn)行 PageRank 算法的社交網(wǎng)絡(luò)數(shù)據(jù)集邢笙。用戶的信息保存在文件 data/graphx/users.txt 中,用戶之間的關(guān)系的數(shù)據(jù)保存在文件 data/graphx/followers.txt 中侍匙。計(jì)算的代碼如下:

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala」氮惯。

連通分量

連通分量算法將圖中的每個(gè)節(jié)點(diǎn)的屬性值標(biāo)記為所在連通分量中的節(jié)點(diǎn)的最小 ID。例如想暗,在一個(gè)社交網(wǎng)絡(luò)中妇汗,連通分量可以做簡單的聚類分析。GraphX 在單例對象 ConnectedComponents 中包含了算法的實(shí)現(xiàn)说莫,下面還是根據(jù) PageRank 章節(jié)的數(shù)據(jù)進(jìn)行連通分量的計(jì)算:

import org.apache.spark.graphx.GraphLoader

// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala」杨箭。

三角計(jì)數(shù)

當(dāng)一個(gè)節(jié)點(diǎn)的兩個(gè)鄰居節(jié)點(diǎn)之間有一條邊時(shí),該節(jié)點(diǎn)就是一個(gè)三角形的一部分储狭。GraphX 在單例對象 TriangleCount 中實(shí)現(xiàn)了三角計(jì)數(shù)的算法告唆,計(jì)算每個(gè)節(jié)點(diǎn)所在的三角形的數(shù)量,作為一種聚類的手段晶密。這里使用 PageRank 章節(jié)的社交網(wǎng)絡(luò)數(shù)據(jù)集來進(jìn)行三角計(jì)數(shù)算法擒悬。注意 TriangleCount 算法需要圖中的邊的方向是正則化(srcId < dstId)后的,并且需要使用 Graph.partitionBy 進(jìn)行分區(qū)稻艰。

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala」懂牧。

示例代碼

假設(shè)需要從一些文本文件中構(gòu)建出一張圖,將圖限制在重要的關(guān)系和用戶之間尊勿,在子圖上執(zhí)行 PageRank 算法僧凤,最后返回頭部用戶的屬性值,可以這樣做:

import org.apache.spark.graphx.GraphLoader

// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  // Some users may not have attributes so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}

// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala」元扔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末躯保,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子澎语,更是在濱河造成了極大的恐慌途事,老刑警劉巖验懊,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異尸变,居然都是意外死亡义图,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門召烂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來碱工,“玉大人,你說我怎么就攤上這事奏夫∨屡瘢” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵酗昼,是天一觀的道長匙头。 經(jīng)常有香客問我,道長仔雷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任舔示,我火速辦了婚禮碟婆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘惕稻。我一直安慰自己竖共,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布俺祠。 她就那樣靜靜地躺著公给,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蜘渣。 梳的紋絲不亂的頭發(fā)上淌铐,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機(jī)與錄音蔫缸,去河邊找鬼腿准。 笑死,一個(gè)胖子當(dāng)著我的面吹牛拾碌,可吹牛的內(nèi)容都是我干的吐葱。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼校翔,長吁一口氣:“原來是場噩夢啊……” “哼弟跑!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起防症,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤孟辑,失蹤者是張志新(化名)和其女友劉穎哎甲,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扑浸,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡烧给,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了喝噪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片础嫡。...
    茶點(diǎn)故事閱讀 40,561評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖酝惧,靈堂內(nèi)的尸體忽然破棺而出榴鼎,到底是詐尸還是另有隱情,我是刑警寧澤晚唇,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布巫财,位于F島的核電站,受9級特大地震影響哩陕,放射性物質(zhì)發(fā)生泄漏平项。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一悍及、第九天 我趴在偏房一處隱蔽的房頂上張望闽瓢。 院中可真熱鬧,春花似錦心赶、人聲如沸扣讼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽椭符。三九已至,卻和暖如春耻姥,著一層夾襖步出監(jiān)牢的瞬間销钝,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工琐簇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留曙搬,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓鸽嫂,卻偏偏與公主長得像纵装,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子据某,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評論 2 359