Spark GraphX

Spark GraphX概述

GraphX是Spark的一個組件,專門用來表示圖以及進(jìn)行圖的并行計算孵户。GraphX通過重新定義了圖的抽象概念來拓展了RDD:定向多圖萧朝,其屬性附加到每個頂點和邊。為了支持圖計算夏哭,GraphX公開了一系列基本運(yùn)算符(比如:mapVertices检柬、mapEdges、subgraph)已經(jīng)優(yōu)化后的Pregel API變種竖配。此外何址,還包含越來越多的圖計算和構(gòu)建器,以簡化圖形分析任務(wù)进胯。GraphX在圖頂點信息和邊信息存儲上做了優(yōu)化头朱,使得圖計算框架性能相對于原生RDD是想較大的提升,接近或達(dá)到GraphLab等專業(yè)圖計算平臺性能龄减。GraphX最大的貢獻(xiàn)是,在Spark之上提供一站式數(shù)據(jù)解決方案班眯,可以方便且高效地完成圖計算的一整套流水作業(yè)希停。


Spark GraphX概述.png

圖的相關(guān)術(shù)語

圖是一種較線性表和樹更為復(fù)雜的數(shù)據(jù)結(jié)構(gòu),圖表達(dá)的是多對多的關(guān)系署隘。

如下圖所示宠能,G1是一個簡單的圖,其中V1磁餐、V2违崇、V3阿弃、V4被稱為頂點(Vertex),任意兩個頂點之間的通路被稱為邊(Edge)羞延,它可以由(V1渣淳,V2)有序?qū)肀硎荆@時稱G1位有向圖伴箩,意味著邊是有方向的入愧,若以無序?qū)肀硎疽粭l表,則該圖為無向圖嗤谚,如G2

圖的相關(guān)術(shù)語.png

在G1中棺蛛,與頂點相關(guān)聯(lián)的邊的數(shù)量,被稱為頂點的度(Degree)巩步。其中旁赊,以頂點為起點的邊的數(shù)量被稱為該訂單的出度(OutDegree),以頂點為終點的邊的數(shù)量被稱為該頂點的入度(InDegree)椅野。

以G1的V1舉例终畅,V1的度是3,啟動出度為2鳄橘,入度為1声离。在無向圖G2中,如過任意兩個頂點之間是聯(lián)通的瘫怜,則稱G2為連通圖(Connected Graph)术徊。在有向圖G1中,如果任意兩個訂單Vm鲸湃、Vn且m != n,從Vm到Vn以及Vn到Vm之間都存在通路赠涮,則稱G1為強(qiáng)連通圖(Strongly Conneted Graph)。如果任意兩個頂點之間若存在通路暗挑,則稱為路徑(Path)笋除,用一個頂點序列表示,若第一個頂點和最后一個頂點相同,則稱為回路或者環(huán)(Cycle)

圖數(shù)據(jù)庫與圖計算

Neo4j 是一個比較老牌的開源圖數(shù)據(jù)庫,目前在業(yè)界的使用也較為廣泛瓤狐,它提供了一種簡單易學(xué)的查詢語言 Cypher实胸。Neo4j 支持交互式查詢,查詢效率很高。能夠迅速從整網(wǎng)中找出符合特定模式的子網(wǎng),供隨后分析之用,適用于OLTP 場景酱吝。

Neo4j 是圖數(shù)據(jù)庫,偏向于存儲和查詢土思。能存儲關(guān)聯(lián)關(guān)系比較復(fù)雜务热,實體之間的連接豐富忆嗜。比如社交網(wǎng)絡(luò)、知識圖譜崎岂、金融風(fēng)控等領(lǐng)域的數(shù)據(jù)捆毫。擅長從某個點或某些點出發(fā),根據(jù)特定條件在復(fù)雜的關(guān)聯(lián)關(guān)系中找到目標(biāo)點或邊该镣。如在社交網(wǎng)絡(luò)中找到某個點三步以內(nèi)能認(rèn)識的人冻璃,這些人可以認(rèn)為是潛在朋友。數(shù)據(jù)量限定在一定范圍內(nèi)损合,能短時完成的查詢就是所謂的OLTP操作省艳。

Neo4j 查詢與插入速度較快,沒有分布式版本嫁审,容量有限跋炕,而且一旦圖變得非常大, 如數(shù)十億頂點律适,數(shù)百億邊辐烂,查詢速度將變得緩慢。Neo4j 分為社區(qū)版和企業(yè)版捂贿,企業(yè)版有一些高級功能纠修,需要授權(quán),價格昂貴厂僧。

比較復(fù)雜的分析和算法扣草,如基于圖的聚類,PageRank 算法等颜屠,這類計算任務(wù)對于圖數(shù)據(jù)庫來說就很難勝任了辰妙,主要由一些圖挖掘技術(shù)來負(fù)責(zé)。

Pregel 是 Google 于 2010 年在 SIGMOD 會議上發(fā)表的《Pregel: A System for Large-Scale Graph Processing》論文中提到的海量并行圖挖掘的抽象框架甫窟,Pregel 與 Dremel 一樣密浑,是 Google 新三駕馬車之一,它基于 BSP 模型(Bulk Synchronous Parallel粗井,整體同步并行計算模型)尔破,將計算分為若干個超步(super step),在超步內(nèi)浇衬,通過消息來傳播頂點之間的狀態(tài)懒构。Pregel 可以看成是同步計 算,即等所有頂點完成處理后再進(jìn)行下一輪的超步径玖,Spark 基于 Pregel 論文實現(xiàn)的 海量并行圖挖掘框架 GraphX。

圖計算模式

目前基于圖的并行計算框架已經(jīng)有很多颤介,比如來自Google的Pregel梳星、來自Apache開源的圖計算框架Giraph / HAMA以及最為著名的GraphLab赞赖,其中Pregel、HAMA和 Giraph都是非常類似的冤灾,都是基于BSP模式前域。

BSP即整體同步并行,它將計算分成一系列超步的迭代韵吨。從縱向上看匿垄,它是一個串行模式,而從橫向上看归粉,它是一個并行的模式椿疗,每兩個超步之間設(shè)置一個柵欄 (barrier),即整體同步點糠悼,確定所有并行的計算都完成后再啟動下一輪超步届榄。

圖計算模式.png

每一個超步包含三部分內(nèi)容:

  • 計算 cumpute:每一個Processor利用上一個超步傳過來的消息和本地的數(shù)據(jù)進(jìn)行本地計算
  • 消息傳遞:每一個Processor計算完畢后,將消息傳遞給與之關(guān)聯(lián)的其他Processors
  • 整體同步點:用整體同步倔喂,確定所有的計算和消息傳遞都進(jìn)行完畢后铝条,進(jìn)入下一個超步

Spark GraphX 基礎(chǔ)

架構(gòu)
存儲模式
核心數(shù)據(jù)結(jié)構(gòu)

GraphX 與 Spark 其他組件相比相對獨立,擁有自己的核心數(shù)據(jù)結(jié)構(gòu)與算子席噩。

GraphX架構(gòu)

GraphX架構(gòu).png

GraphX的整體架構(gòu)可以分為三個部分:

  • 算法層:基于Pregel接口實現(xiàn)了常用的圖算法班缰,包括PageRank、SVDPlusPlus悼枢、TriangeleCount埠忘、ConnectedComponents、StonglyConnectedConponents等算法
  • 接口層:在底層RDD基礎(chǔ)之上實現(xiàn)了Pregel模型BSP模式的計算接口
  • 底層:圖計算的核心類萧芙,包含:VertexRDD给梅、EdgeRDD、RDD[EdgeTriplet]

存儲模式

巨型圖的存儲總體上有邊分割和點分割兩種存儲方式双揪。2013年动羽,GraphLab2.0將其 存儲方式由邊分割變?yōu)辄c分割,在性能上取得重大提升渔期,目前基本上被業(yè)界廣泛接受 并使用运吓。

  • 邊分割(Edge-Cut):每個頂點存儲一次,但有的邊會被打斷分到兩臺機(jī)器上疯趟。這樣做的好處就是節(jié)省存儲空間拘哨;壞處是對圖進(jìn)行基于邊計算時,對于一條兩個頂點被分到不同的機(jī)器上的邊來說信峻,需要跨機(jī)器傳輸數(shù)據(jù)倦青,內(nèi)網(wǎng)通信流量大
  • 點分割(Vertex-Cut):每條邊只存儲一次,都會出現(xiàn)一臺機(jī)器上盹舞,鄰居多的點會被復(fù)制到多臺機(jī)器上产镐,增加了存儲開銷隘庄,同時會引發(fā)數(shù)據(jù)同步問題。好處是可以大幅減少內(nèi)網(wǎng)通信量癣亚。

存儲模式.png

雖然兩種方法互有利弊丑掺,但現(xiàn)在是點分割占上風(fēng),各種分布式圖計算框架都將自己底層的存儲形式變成了點分割述雾。主要原因有以下兩個:

  • 磁盤價格下降街州,存儲空間不再是問題,而內(nèi)網(wǎng)的通信資源沒有突破性進(jìn)展玻孟,集群計算時內(nèi)網(wǎng)帶寬是寶貴的唆缴,時間比磁盤更珍貴。這點就類似于常見的空間換時間的策略;
  • 在當(dāng)前的應(yīng)用場景中取募,絕大多數(shù)網(wǎng)絡(luò)都是“無尺度網(wǎng)絡(luò)”琐谤,遵循冪律分布,不同點的鄰居數(shù)量相差非常懸殊玩敏。而邊分割會使那些多鄰居的點所相連的邊大多數(shù)被分到不同的機(jī)器上斗忌,這樣的數(shù)據(jù)分布會使得內(nèi)網(wǎng)帶寬更加捉襟見肘,于是邊分割存儲方式被漸漸拋棄了;

核心數(shù)據(jù)結(jié)構(gòu)

核心數(shù)據(jù)結(jié)構(gòu)包括:graph旺聚、vertices织阳、edges、triplets

GraphX API 的開發(fā)語言目前僅支持 Scala砰粹。GraphX 的核心數(shù)據(jù)結(jié)構(gòu) Graph 由 RDD 封裝而成唧躲。

Graph

GraphX用屬性圖的方法表示圖,頂點有屬性碱璃,邊有屬性弄痹。存儲結(jié)構(gòu)采用邊集數(shù)據(jù)的形式,即一個頂點表嵌器,一個邊表肛真,如下圖所示:

Graph.png

頂點ID是非常重要的字段,他不光是頂點的唯一標(biāo)識符爽航,也是描述邊的唯一手段蚓让,訂單表與邊表實際上就是RDD,他們分別是VertexRDD與EdgeRDD讥珍。在Spark的源碼中历极,Graph類如下:

Graph2.png
  • vertices為頂點表,VD為訂單屬性類型
  • edges為邊表衷佃,ED為邊屬性類型
  • 可以通過Graph的vertices與edges成員直接得到頂點RDD與邊RDD
  • 頂點RDD類型是VerticeRDD趟卸,繼承自RDD[(VertexId,VD)]
  • 邊RDD類型為EdgeRDD,繼承自RDD[Edge[ED]]
vertices

vertices對應(yīng)著名為 VertexRDD 的RDD。這個RDD由頂點id和頂點屬性兩個成員變量锄列。

vertices.png

VertexRDD繼承自 RDD[(VertexId, VD)]新蟆,這里VertexId表示頂點id,VD表示頂點所 帶的屬性的類別右蕊。

VertexId實際上是一個Long類型

vertices2.png
edges

edges對應(yīng)著EdgeRDD。這個RDD擁有三個成員變量吮螺,分別是源頂點id饶囚、目標(biāo)頂點id以及邊屬性。

edges.png

Edge代表邊鸠补,由 源頂點id萝风、目標(biāo)頂點id、以及邊的屬性構(gòu)成紫岩。

edges2.png
triplets

triplets 表示邊點三元組规惰,如下圖所示(其中圓柱形分別代表頂點屬性與邊屬性):

triplets.png

通過 triplets 成員,用戶可以直接獲取到起點頂點泉蝌、起點頂點屬性歇万、終點頂點、終點頂點屬性勋陪、邊與邊屬性信息贪磺。triplets 的生成可以由邊表與頂點表通過 ScrId 與 DstId 連接而成。

triplets對應(yīng)著EdgeTriplet诅愚。它是一個三元組視圖寒锚,這個視圖邏輯上將頂點和邊的屬 性保存為一個RDD[EdgeTriplet[VD, ED]]。

triplets2.png

Spark GraphX 計算

圖的定義
屬性操作
轉(zhuǎn)換操作
結(jié)構(gòu)操作
關(guān)聯(lián)操作
聚合操作
Pregel API

引入依賴:

<!-- graphx -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-graphx_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

案例一:圖的基本操作

Spark GraphX 計算.png

找到 出度=入度 的人員\找出5到各頂點的最短距離 即鏈接操作與聚合操作需要重新聽課

package com.hhb.spark.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.reflect.ClassTag

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-25 14:25
 **/
object GraphXExample1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    sc.setLogLevel("warn")
    //定義頂點的數(shù)據(jù)(訂單违孝,信息)
    val vertexArr: Array[(VertexId, (String, Int))] = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50)))


    //定義邊的數(shù)據(jù)(起點刹前,終點,邊屬性)
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 6),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )

    //根據(jù)頂點數(shù)據(jù)生成RDD
    val vertexRDD = sc.makeRDD(vertexArr)
    //根據(jù)邊數(shù)據(jù)生成RDD
    val edgeRDD = sc.makeRDD(edgeArray)

    //生成圖信息
    val graph: Graph[(String, Int), Int] = Graph.apply(vertexRDD, edgeRDD)
    println("=======================找出圖中所有年年齡大于30的頂點============================")
    //屬性操作示例雌桑,
    //找出圖中所有年年齡大于30的頂點
    //所有頂點的屬性信息
    // val vertices: VertexRDD[(String, Int)] = graph.vertices
    // filter(pred: Tuple2[VertexId, VD] => Boolean)
    graph.vertices
      .filter { case (_, (_, point)) => point > 30 }
      .foreach(println)
    println("=======================找出圖中屬性大于5的邊============================")
    //找出圖中屬性大于5的邊
    //    val edges: EdgeRDD[Int] = graph.edges
    graph.edges
      .filter(_.attr > 5)
      .foreach(println)
    println("=======================列出邊屬性>5的tripltes============================")
    //列出邊屬性>5的tripltes
    //    val triplets: RDD[EdgeTriplet[(String, Int), Int]] = graph.triplets
    graph.triplets
      .filter(_.attr > 5)
      .foreach(println)
    //屬性操作喇喉,degress操作
    //找出圖中最大的出度、入度筹燕、度數(shù)
    println("=======================找出圖中最大的出度============================")
    //所有的
    //    val degrees: VertexRDD[Int] = graph.outDegrees
    val outMaxDegrees: (VertexId, Int) = graph.outDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(outMaxDegrees)
    println("=======================找出圖中最大的入度============================")
    val inMaxDegrees = graph.inDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(inMaxDegrees)
    println("=======================找出圖中最大的度數(shù)============================")
    val maxDegrees: (VertexId, Int) = graph.degrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(maxDegrees)
    // 轉(zhuǎn)換操作
    println("=======================頂點的轉(zhuǎn)換操作轧飞。所有人的年齡加 10============================")
    // 頂點的轉(zhuǎn)換操作。所有人的年齡加 10
    graph.vertices
      .map { case (id, (name, age)) => (id, (name, age + 10)) }
      .foreach(println)
    println("=======================頂點的轉(zhuǎn)換操作撒踪。所有人的年齡加 10============================")
    graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.foreach(println)

    // 邊的轉(zhuǎn)換操作过咬。邊的屬性*2
    println("=======================邊的轉(zhuǎn)換操作。邊的屬性*2============================")
    graph.edges
      .map(x => x.attr * 2)
      .foreach(println(_))
    // 結(jié)構(gòu)操作
    // 頂點年齡 > 30 的子圖
    //    def subgraph(
    //                  epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    //                  vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    //    : Graph[VD, ED]
    println("=======================頂點年齡 > 30 的子圖============================")
    graph.subgraph(vpred = (_, VD) => VD._2 > 30).triplets.foreach(println(_))


    //找出出度 == 入度的人員制妄,鏈接操作掸绞,思路:圖 + 頂點的出度+頂點的入度
    println("=======================找出出度 == 入度的人員============================")
    //創(chuàng)建一個新圖,頂點VD的數(shù)據(jù)類型為User,并從graph做類型轉(zhuǎn)換
    val g: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
    //(2,User(Bob,27,0,0))
    //(4,User(David,42,0,0))
    //(6,User(Fran,50,0,0))
    //(3,User(Charlie,65,0,0))
    //(5,User(Ed,55,0,0))
    //(1,User(Alice,28,0,0))
    //    value1.vertices.foreach(println)


    // def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    //  (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null) : Graph[VD2, ED]
    //    val degrees: VertexRDD[Int] = graph.inDegrees
    //使用新圖關(guān)聯(lián)入度的數(shù)據(jù)
    val userGraph = g.outerJoinVertices(graph.inDegrees) {
      //
      case (id, info, inDe) => User(info.name, info.age, inDe.getOrElse(0), info.outDegrees)
    }.outerJoinVertices(graph.outDegrees) {
      case (id, info, outDe) => User(info.name, info.age, info.inDegrees, outDe.getOrElse(0))
    }
    val value: VertexRDD[User] = userGraph.vertices.filter { case (id, u) => u.inDegrees == u.outDegrees }
    value.foreach(println(_))


    //找出5到各頂點的最短距離


    sc.stop()
  }
  case class User(name: String, age: Int, inDegrees: Int, outDegrees: Int)
}
Pregel API

圖本身是遞歸數(shù)據(jù)結(jié)構(gòu)衔掸,頂點的屬性依賴于他們鄰居的屬性烫幕,這些鄰居屬性有依賴于自己的鄰居屬性。許多重要的圖算法都是迭代的重新計算每個頂點的屬性敞映,直到滿足某個確定的條件较曼。一系列的圖并發(fā)抽象被提出來用來表達(dá)這些迭代算法。

Graphx公開了一個類似Pregel的操作振愿。

Pregel API.png
  • vprog:用戶定義的頂點運(yùn)行程序捷犹。它作用于每一個頂點,負(fù)責(zé)接收進(jìn)來的信息冕末,并計算新的頂點值

  • sendMsg:發(fā)送消息

  • mergeMsg:合并消息

案例二:連通圖算法

連通圖算法.png
package com.hhb.spark.graphx

import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @date: 2020-11-25 18:07
 **/
object GraphXExample2 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/graph.dat")
    //(VertexId,info)
    //(11,1)
    //(4,1)
    //(45067,1)
    //(431250,1)
    //(1,1)
    //(6,1)
    //(3,1)
    //(10,1)
    //(7,1)
    //(2,1)
    //(9,1)
    //(9111,1)
    //(5,1)
    //把文件里面所有的數(shù)據(jù)都默認(rèn)設(shè)置為頂點萍歉,屬性給個默認(rèn)值1(無用)
    graph.vertices.foreach(println(_))
    println("===" * 20)
    //起點,終點档桃,邊屬性枪孩,邊屬性為默認(rèn)值(無用)
    //Edge(4,45067,1)
    //Edge(1,431250,1)
    //Edge(6,45067,1)
    //Edge(7,45067,1)
    //Edge(2,431250,1)
    //Edge(9,9111,1)
    //Edge(3,431250,1)
    //Edge(10,9111,1)
    //Edge(4,431250,1)
    //Edge(11,9111,1)
    //Edge(5,45067,1)
    //Edge(5,431250,1)
    graph.edges.foreach(println(_))
    println("===" * 20)
    //生成連通圖
    //(頂點信息,出始點)
    (11, 9)
    (45067, 1)
    (1, 1)
    (3, 1)
    (7, 1)
    (9, 9)
    (9111, 9)
    (4, 1)
    (5, 1)
    (431250, 1)
    (6, 1)
    (10, 9)
    (2, 1)
    graph.connectedComponents()
      .vertices
      //按照元組的第二個元素排序藻肄,第二個元素就是出始點
      .sortBy(_._2)
      .foreach(println(_))
    sc.stop()
  }
}

案例三:尋找相同的用戶合并信息

假設(shè):

  • 假設(shè)有五個不同信息可以作為用戶標(biāo)識码撰,分別為:1X或舞、2X签餐、3X菱鸥、4X、5X;

  • 每次可以選擇使用若干為字段作為標(biāo)識

  • 部分標(biāo)識可能發(fā)生變化抚垄,如:12 => 13 或 24 => 25

根據(jù)以上規(guī)則蜕窿,判斷以下標(biāo)識是否代表同一用戶:

  • 11-21-32、12-22-33 (X)
  • 11-21-32呆馁、11-21-52 (OK)
  • 21-32桐经、11-21-33 (OK)
  • 11-21-32、32-48 (OK)

問題:在以下數(shù)據(jù)中浙滤,找到同一用戶阴挣,合并相同用戶的數(shù)據(jù)

  • 對于用戶標(biāo)識(id):合并后去重
  • 對于用戶的信息:key相同,合并權(quán)重
統(tǒng)一用戶識別.png
package com.hhb.spark.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-26 16:02
 **/
object GraphXExample3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val list = List(
      (List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龍觀" -> 1.0)),
      (List(41L), List("kw$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List(12L, 22L, 33L), List("kw$大數(shù)據(jù)" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
      (List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
      (List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
    )

    val dataRDD = sc.makeRDD(list)

    //將標(biāo)示信息中的每一個元素抽取出來纺腊,作為ID
    //這里使用了flatMap將數(shù)據(jù)壓平
    //丟掉了標(biāo)簽信息畔咧,因為這個RDD是用來構(gòu)造頂點,邊揖膜,tags信息用不到
    //頂點誓沸、邊的數(shù)據(jù)要求是long,這個程序修改后我們才能使用
    val dotRDD: RDD[(Long, Long)] = dataRDD.flatMap { case (buffer, _) =>
      buffer.map(id => (id, buffer.mkString.toLong))
    }
    //    dotRDD.foreach(println(_))
    //(41,41)
    //(12,122233)
    //(22,223444)
    //(34,223444)
    //(33,3353)
    //(44,223444)
    //(21,213241)
    //(32,213241)
    //(41,213241)
    //(22,122233)
    //(33,122233)
    //(11,112131)
    //(21,112131)
    //(31,112131)
    //(53,3353)

    //定義訂單的RDD壹粟,每個id都是一個訂單
    val vertexesRDD = dotRDD.map(x => (x._1, 0))
    //定義邊信息
    val edgeRDD = dotRDD.map(x => Edge(x._1, x._2, 0))

    //定義圖信息
    val graph = Graph(vertexesRDD, edgeRDD)
    //輸出點所有的信息
    // graph.vertices.foreach(println(_))
    //(31,0)
    //(3353,0)
    //(22,0)
    //(32,0)
    //(21,0)
    //(12,0)
    //(34,0)
    //(11,0)
    //(112131,0)
    //(44,0)
    //(53,0)
    //(41,0)
    //(223444,0)
    //(213241,0)
    //(122233,0)
    //(33,0)
    //使用強(qiáng)連通圖拜隧,生成的數(shù)據(jù)可以看見看見,是被分為了兩類
    //    graph.connectedComponents().vertices.foreach(println(_))
    // (22,12)
    //(32,11)
    //(34,12)
    //(21,11)
    //(12,12)
    //(31,11)
    //(11,11)
    //(112131,11)
    //(3353,12)
    //(53,12)
    //(41,11)
    //(213241,11)
    //(122233,12)
    //(33,12)
    //(44,12)
    //(223444,12)
    //計算出連通點后,其實就是為了數(shù)據(jù)(112131,11)洪添、(213241,11)垦页、(41,11)、(122233,12)(223444,12)
    //這樣就可以上最上面的rdd(dataRDD)將第一個集合的數(shù)據(jù)壓縮成上面的key干奢,就可以確認(rèn)出來哪些數(shù)據(jù)為一條數(shù)據(jù)
    val vertices = graph.connectedComponents().vertices

    //定義中心點的數(shù)據(jù)痊焊,即(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)) =》
    // (112131,(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)))
    //使用這樣的數(shù)據(jù),與vertices 進(jìn)行join忿峻,如下
    val allInfoRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] = dataRDD.map { case (ids, infos) =>
      (ids.mkString.toLong, (ids, infos))
    }
    //(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
    //那么就可以通過value的第一個元素進(jìn)行合并數(shù)據(jù)
    val joinRDD: RDD[(VertexId, (VertexId, (List[VertexId], List[(String, Double)])))] = vertices.join(allInfoRDD)
    //  value.foreach(println(_)),可以通過12宋光、12進(jìn)行reduceBy
    //(223444,(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0)))))
    //(3353,(12,(List(33, 53),List((kw$hive,1.0), (kw$spark,1.0), (area$西二旗,1.0)))))
    //(112131,(11,(List(11, 21, 31),List((kw$北京,1.0), (kw$上海,1.0), (area$中關(guān)村,1.0)))))
    //(41,(11,(List(41),List((kw$天津,1.0), (area$中關(guān)村,1.0)))))
    //(213241,(11,(List(21, 32, 41),List((kw$上海,1.0), (kw$天津,1.0), (area$回龍觀,1.0)))))
    //(122233,(12,(List(12, 22, 33),List((kw$大數(shù)據(jù),1.0), (kw$spark,1.0), (area$西二旗,1.0)))))


    val mergeInfo = joinRDD.map { case (_, infos) => (infos._1, infos._2) }
    //(12,(List(22, 34, 44),List((kw$spark,1.0), (area$五道口,1.0))))


    //下面兩個步驟可以合并到一起,shuffle階段直接對數(shù)據(jù)去重炭菌、合并
    val resultRDD = mergeInfo.reduceByKey { case ((ids, infos), (id, info)) =>
      val newIds = ids ++ id
      val newInfos = infos ++ info
      (newIds, newInfos)
    }
    val result = resultRDD.mapValues { case (newIds, newInfos) =>
      val ids = newIds.distinct
      val infos: Map[String, Double] = newInfos.groupBy(_._1).mapValues(list => list.map(_._2).sum)
      (ids, infos)
    }

    result.foreach(println(_))
    //(11,(List(41, 21, 32, 11, 31),Map(area$中關(guān)村 -> 2.0, kw$北京 -> 1.0, kw$天津 -> 2.0, kw$上海 -> 2.0, area$回龍觀 -> 1.0)))
    //(12,(List(33, 53, 12, 22, 34, 44),Map(kw$大數(shù)據(jù) -> 1.0, kw$spark -> 3.0, area$五道口 -> 1.0, area$西二旗 -> 2.0, kw$hive -> 1.0)))

    sc.stop()

  }

}
spark下任務(wù)三錯題1.png
spark下任務(wù)三錯題2.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市逛漫,隨后出現(xiàn)的幾起案子黑低,更是在濱河造成了極大的恐慌,老刑警劉巖酌毡,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件克握,死亡現(xiàn)場離奇詭異,居然都是意外死亡枷踏,警方通過查閱死者的電腦和手機(jī)菩暗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來旭蠕,“玉大人停团,你說我怎么就攤上這事√桶荆” “怎么了佑稠?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長旗芬。 經(jīng)常有香客問我舌胶,道長,這世上最難降的妖魔是什么疮丛? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任幔嫂,我火速辦了婚禮,結(jié)果婚禮上誊薄,老公的妹妹穿的比我還像新娘履恩。我一直安慰自己,他們只是感情好呢蔫,可當(dāng)我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布似袁。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪昙衅。 梳的紋絲不亂的頭發(fā)上扬霜,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天,我揣著相機(jī)與錄音而涉,去河邊找鬼著瓶。 笑死,一個胖子當(dāng)著我的面吹牛啼县,可吹牛的內(nèi)容都是我干的材原。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼季眷,長吁一口氣:“原來是場噩夢啊……” “哼余蟹!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起子刮,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤威酒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后挺峡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體葵孤,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年橱赠,在試婚紗的時候發(fā)現(xiàn)自己被綠了尤仍。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡狭姨,死狀恐怖宰啦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情饼拍,我是刑警寧澤绑莺,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站惕耕,受9級特大地震影響纺裁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜司澎,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一欺缘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧挤安,春花似錦谚殊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丛肢。三九已至,卻和暖如春剿干,著一層夾襖步出監(jiān)牢的瞬間蜂怎,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工置尔, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留杠步,地道東北人。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓榜轿,卻偏偏與公主長得像幽歼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子谬盐,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 寫在前面 態(tài)度決定高度甸私!讓優(yōu)秀成為一種習(xí)慣! 世界上沒有什么事兒是加一次班解決不了的飞傀,如果有皇型,就加兩次!(- - ...
    夜盡天明時閱讀 36,284評論 10 33
  • 【轉(zhuǎn)載】原文地址:原文地址 概述 ??GraphX是Spark中用于圖和圖計算的組件助析,GraphX通過擴(kuò)展Spar...
    木亦汐閱讀 4,451評論 0 1
  • 網(wǎng)上graphx實現(xiàn)最短路徑的代碼比較多,但是都是scala版本椅您,java版本的實現(xiàn)很少外冀。1.創(chuàng)建圖數(shù)據(jù)使用的方法...
    寒江老翁閱讀 776評論 0 1
  • 引子:筆者有一段時間學(xué)習(xí)使用 spark 圖算法實現(xiàn) One ID 的工作,看到一篇文章打算翻譯掀泳,今天得空可以還債...
    _糖sir_閱讀 4,112評論 0 2
  • Spark GraphX GraphX簡介 主要特點 演化過程 應(yīng)用場景 分布式圖計算處理技術(shù)介紹 下面分別從圖數(shù)...
    raincoffee閱讀 1,342評論 0 0