There are plenty of GraphX shortest-path examples online, but nearly all of them are in Scala; Java implementations are hard to find.
1. Creating the graph data
The graph is built with Graph.apply(). Here is the (decompiled) Scala definition of that method:
def apply[VD, ED](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = { /* compiled code */ },
    edgeStorageLevel: StorageLevel = { /* compiled code */ },
    vertexStorageLevel: StorageLevel = { /* compiled code */ })
  (implicit evidence$18: ClassTag[VD], evidence$19: ClassTag[ED]): Graph[VD, ED]
VD is the type of the vertex attribute and ED is the type of the edge attribute (either can be a list, a Tuple, and so on).
vertices is the vertex RDD; each element is a Tuple2<vertex id, VD>.
edges is the edge RDD; each element is an Edge<ED> holding a source vertex id, a destination vertex id, and an edge attribute of type ED.
defaultVertexAttr is the default vertex attribute. Suppose you create an edge 1->2 but only ever created vertex 1; a vertex with id 2 is then generated automatically, and its attribute is this default value.
edgeStorageLevel is the storage level of the edges.
vertexStorageLevel is the storage level of the vertices.
The two implicit parameters are the ClassTag of the vertex attribute type and the ClassTag of the edge attribute type; a ClassTag carries the actual class of a type at run time.
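Scala normally supplies ClassTags implicitly; from Java they have to be created by hand. A minimal sketch (the class name is made up for illustration):

import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class ClassTagSketch {
    public static void main(String[] args) {
        // A ClassTag carries the runtime class of a type parameter. From Java it is
        // built explicitly and passed wherever Scala expects an implicit ClassTag.
        ClassTag<Object> tag = ClassTag$.MODULE$.apply(Object.class);
        System.out.println(tag); // prints the runtime class name, e.g. java.lang.Object
    }
}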
The graph data to create is as follows (the structure can be read off the edge list in the code).
The code:
private static Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> createGraph(
JavaSparkContext javaSparkContext) {
// Must be Tuple2<Object, Tuple2<String, Integer>>, not Tuple2<Long, Tuple2<String, Integer>>
List<Tuple2<Object, Tuple2<String, Integer>>> vertexList = new ArrayList<>();
vertexList.add(new Tuple2<>(1L, new Tuple2<>("mar_1", 21)));
vertexList.add(new Tuple2<>(2L, new Tuple2<>("mar_2", 22)));
vertexList.add(new Tuple2<>(3L, new Tuple2<>("mar_3", 23)));
vertexList.add(new Tuple2<>(4L, new Tuple2<>("mar_4", 24)));
vertexList.add(new Tuple2<>(5L, new Tuple2<>("mar_5", 25)));
vertexList.add(new Tuple2<>(6L, new Tuple2<>("mar_6", 26)));
vertexList.add(new Tuple2<>(7L, new Tuple2<>("mar_7", 27)));
vertexList.add(new Tuple2<>(8L, new Tuple2<>("mar_8", 28)));
vertexList.add(new Tuple2<>(9L, new Tuple2<>("mar_9", 29)));
JavaRDD<Tuple2<Object, Tuple2<String, Integer>>> vertexRdd = javaSparkContext
.parallelize(vertexList);
List<Edge<Tuple2<Integer, Integer>>> edgeList = new ArrayList<>();
edgeList.add(new Edge<>(1, 2, new Tuple2<>(1, 1)));
edgeList.add(new Edge<>(2, 3, new Tuple2<>(2, 2)));
edgeList.add(new Edge<>(1, 4, new Tuple2<>(3, 3)));
edgeList.add(new Edge<>(1, 5, new Tuple2<>(4, 4)));
edgeList.add(new Edge<>(1, 6, new Tuple2<>(5, 5)));
edgeList.add(new Edge<>(4, 7, new Tuple2<>(6, 6)));
edgeList.add(new Edge<>(7, 8, new Tuple2<>(7, 7)));
edgeList.add(new Edge<>(5, 8, new Tuple2<>(8, 8)));
edgeList.add(new Edge<>(8, 9, new Tuple2<>(9, 9)));
edgeList.add(new Edge<>(6, 9, new Tuple2<>(10, 10)));
edgeList.add(new Edge<>(3, 9, new Tuple2<>(11, 11)));
JavaRDD<Edge<Tuple2<Integer, Integer>>> edgeRdd = javaSparkContext.parallelize(edgeList);
Tuple2<String, Integer> defaultVertex = new Tuple2<>("default", -1);
// Use ClassTag$.MODULE$.apply(Object.class) everywhere instead of ClassTag$.MODULE$.apply(Tuple2.class), otherwise an error is thrown
Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> graph = Graph
.apply(vertexRdd.rdd(), edgeRdd.rdd(), defaultVertex, StorageLevels.MEMORY_ONLY,
StorageLevels.MEMORY_ONLY, ClassTag$.MODULE$.apply(Object.class),
ClassTag$.MODULE$.apply(Object.class));
graph.vertices().toJavaRDD()
.foreach(x -> System.out.println("vertex id:: " + x._1 + " , attr:: " + x._2));
graph.edges().toJavaRDD().foreach(
x -> System.out.println(
"edge id:: " + x.attr._1 + " , src:: " + x.srcId() + " , dest:: " + x
.dstId() + " weigh:: " + x.attr._2));
return graph;
}
Note that the vertex id must be typed as Object or the code will not compile, and the ClassTags must likewise be created with Object.class or an error is thrown. A likely explanation: GraphX's VertexId is a Scala type alias for the primitive Long, which Java generics cannot express, so from Java these type parameters only line up as Object.
2. Shortest path
Shortest paths are computed here with Pregel, so a quick introduction first (look it up if you want the details): Pregel is a vertex-centric graph-parallel computation framework proposed by Google, in which an algorithm proceeds by repeated rounds of per-vertex computation and message exchange.
A Pregel run iterates as follows:
1. At the very start, every vertex in the graph receives a default message; that default is the method's first parameter.
2. Each vertex that received the message calls the vprog function to produce its new attribute.
3. The sendMsg function is called to send messages to the vertices of the next round; this function decides who gets a message.
4. Each vertex that received a message calls vprog (after mergeMsg, if it received several messages) to produce its new attribute. Remember that only in the first round do all vertices receive a message.
5. When no vertex receives a message, or the iteration count reaches maxIterations, the computation stops.
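To make this loop concrete, here is a minimal serial sketch in plain Java. It is not Spark code: the class name is made up, a HashMap "inbox" stands in for GraphX's distributed message passing, and the graph is a toy chain 1 -> 2 -> 3.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PregelLoopSketch {
    public static void main(String[] args) {
        // Tiny graph 1 -> 2 -> 3; attributes and messages are path lengths (Long).
        Map<Long, List<Long>> outEdges = new HashMap<>();
        outEdges.put(1L, List.of(2L));
        outEdges.put(2L, List.of(3L));
        long inf = Integer.MAX_VALUE; // "infinity" that still survives + 1
        Map<Long, Long> attr = new HashMap<>();
        attr.put(1L, 0L);
        attr.put(2L, inf);
        attr.put(3L, inf);

        int maxIterations = 10;
        // Step 1: every vertex receives the initial message.
        Map<Long, Long> inbox = new HashMap<>();
        for (long v : attr.keySet()) inbox.put(v, inf);

        for (int i = 0; i <= maxIterations && !inbox.isEmpty(); i++) {
            // Steps 2/4: vprog combines the current attribute with the received message.
            inbox.forEach((v, msg) -> attr.put(v, Math.min(attr.get(v), msg)));
            // Step 3: sendMsg; only vertices that just received a message may send,
            // and only along out-edges where the message would improve the target.
            Map<Long, Long> next = new HashMap<>();
            for (long src : inbox.keySet()) {
                for (long dst : outEdges.getOrDefault(src, List.of())) {
                    long candidate = attr.get(src) + 1;
                    if (candidate < attr.get(dst)) {
                        next.merge(dst, candidate, Math::min); // mergeMsg: keep the minimum
                    }
                }
            }
            inbox = next; // Step 5: an empty inbox terminates the loop
        }
        System.out.println(attr); // distances: 1 -> 0, 2 -> 1, 3 -> 2
    }
}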
Now the parameters of the method we call:
def pregel[A](
    initialMsg: A,
    maxIterations: Int = { /* compiled code */ },
    activeDirection: EdgeDirection = { /* compiled code */ })
  (vprog: Function3[VertexId, VD, A, VD],
   sendMsg: Function1[EdgeTriplet[VD, ED], Iterator[(VertexId, A)]],
   mergeMsg: Function2[A, A, A])
  (implicit evidence$6: ClassTag[A]): Graph[VD, ED]
initialMsg:
The message every vertex receives in the first iteration (A is the message type).
maxIterations:
The maximum number of iterations (an Int).
activeDirection:
The edge direction along which messages are sent during the iteration.
vprog : scala.Function3[org.apache.spark.graphx.VertexId, VD, A, VD]:
The function called in step 2: it receives a message and produces the vertex's new attribute.
As you can see it is a scala.Function3: three arguments, one return value.
The first argument is the vertex id, the second is the vertex's current attribute, the third is the received message; the return value is the vertex's new attribute.
sendMsg : scala.Function1[org.apache.spark.graphx.EdgeTriplet[VD, ED], scala.Iterator[scala.Tuple2[org.apache.spark.graphx.VertexId, A]]]:
The message-sending function: one argument, one return value.
The argument is an EdgeTriplet, a class holding one edge's information: the source vertex's attribute and id, the destination vertex's attribute and id, and the edge attribute. For example, if vertex a received a message, the argument is an edge with a as its source, say a->b. The return value is an iterator of (target vertex id, message) pairs, e.g. b's id together with the message to deliver.
mergeMsg : scala.Function2[A, A, A]:
The merge function. A vertex may receive several messages, which must first be merged; the merged message is then passed to vprog.
The idea for computing the shortest path from a given source vertex:
1. First transform the graph's vertices with mapVertices: the source vertex gets attribute 0, every other vertex gets Integer.MAX_VALUE (any value larger than the longest path in the graph would do). The attribute stands for the path length: 0 at the source, a very large value everywhere else.
2. Then iterate with Pregel. In the first round every vertex receives the initial message (Integer.MAX_VALUE in the code); each vertex takes the minimum of its attribute and the message as its new attribute, so the source keeps 0 and everything else keeps Integer.MAX_VALUE.
3. Messages are then sent to the vertices of the next round: sendMsg checks srcAttr + 1 < dstAttr and sends only when this holds, so at first only vertices directly connected to the source receive a message. The message sent is srcAttr + 1, which is exactly the path length.
4. A vertex that receives messages merges them by taking the minimum; that is what makes the path shortest. For example, if two paths reach a vertex, one of length 3 and one of length 2, the 2 becomes the vertex's new attribute.
5. Once the termination condition is met the computation stops. Each resulting vertex carries an id and a single attribute: the length of the path from the source to that vertex.
The complete code:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.EdgeDirection;
import org.apache.spark.graphx.EdgeTriplet;
import org.apache.spark.graphx.Graph;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.AbstractFunction3;
public class ShortestPath {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("shortest_path").setMaster("local[2]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> graph = createGraph(
javaSparkContext);
// mapVertices takes an implicit `VD =:= VD2` evidence parameter; from Java we
// obtain a reflexive instance and cast it (unchecked, but harmless at run time
// because of type erasure).
Object obj = Predef1.reflexivity();
scala.Predef.$eq$colon$eq<Tuple2<String, Integer>, Long> ev = (scala.Predef.$eq$colon$eq<Tuple2<String, Integer>, Long>) obj;
Graph<Long, Tuple2<Integer, Integer>> initGraph = graph
.mapVertices(new MapVerticesFunction(), ClassTag$.MODULE$.apply(Object.class), ev);
// initGraph.vertices().toJavaRDD().foreach(x -> System.out.println(x));
Graph<Long, Tuple2<Integer, Integer>> sssp = initGraph.ops()
.pregel((long)Integer.MAX_VALUE, 1, EdgeDirection.Out(), new VertexProgram(), new SendMsgFunction(),
new MergeMsgFunction(), ClassTag$.MODULE$.apply(Object.class));
sssp.vertices().toJavaRDD().foreach(x -> System.out.println(x));
}
static class VertexProgram extends AbstractFunction3<Object, Long, Long, Long> implements
Serializable {
@Override
public Long apply(Object id, Long vertexAttr, Long newVertexAttr) {
Long min = Math.min(vertexAttr, newVertexAttr);
System.out.println("id:: "+id+" :: "+vertexAttr+" :: "+newVertexAttr+" min "+min);
return min;
}
}
static class SendMsgFunction extends
AbstractFunction1<EdgeTriplet<Long, Tuple2<Integer, Integer>>, Iterator<Tuple2<Object, Long>>> implements
Serializable {
@Override
public Iterator<Tuple2<Object, Long>> apply(
EdgeTriplet<Long, Tuple2<Integer, Integer>> triplet) {
long srcAttr = triplet.srcAttr();
long dstAttr = triplet.dstAttr();
// System.out.println(srcAttr+" "+attr+" "+dstAttr);
System.out.println("srcid: "+triplet.srcId()+" destid: "+triplet.dstId()+" srca: "+srcAttr+" desa: "+dstAttr);
if (srcAttr + 1 < dstAttr) {
return JavaConversions.asScalaIterator(
Collections.singletonList(
new Tuple2<Object, Long>(triplet.dstId(), srcAttr + 1))
.iterator());
} else {
return JavaConversions.asScalaIterator(Collections.emptyIterator());
}
}
}
static class MergeMsgFunction extends AbstractFunction2<Long, Long, Long>
implements Serializable {
@Override
public Long apply(Long a, Long b) {
// System.out.println("marge ->" + Math.min((long) a, (long) b) + " -> " + a + " " + b);
return Math.min(a, b);
}
}
public static class Predef1 {
static public <T> scala.Predef.$eq$colon$eq<T, T> reflexivity() {
return scala.Predef.$eq$colon$eq$.MODULE$.tpEquals();
}
}
static class MapVerticesFunction extends
AbstractFunction2<Object, Tuple2<String, Integer>, Long> implements Serializable {
@Override
public Long apply(Object v1, Tuple2<String, Integer> v2) {
if ((long) v1 == 1) {
return 0L;
} else {
return (long) Integer.MAX_VALUE;
}
}
}
private static Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> createGraph(
JavaSparkContext javaSparkContext) {
// Must be Tuple2<Object, Tuple2<String, Integer>>, not Tuple2<Long, Tuple2<String, Integer>>
List<Tuple2<Object, Tuple2<String, Integer>>> vertexList = new ArrayList<>();
vertexList.add(new Tuple2<>(1L, new Tuple2<>("mar_1", 21)));
vertexList.add(new Tuple2<>(2L, new Tuple2<>("mar_2", 22)));
vertexList.add(new Tuple2<>(3L, new Tuple2<>("mar_3", 23)));
vertexList.add(new Tuple2<>(4L, new Tuple2<>("mar_4", 24)));
vertexList.add(new Tuple2<>(5L, new Tuple2<>("mar_5", 25)));
vertexList.add(new Tuple2<>(6L, new Tuple2<>("mar_6", 26)));
vertexList.add(new Tuple2<>(7L, new Tuple2<>("mar_7", 27)));
vertexList.add(new Tuple2<>(8L, new Tuple2<>("mar_8", 28)));
vertexList.add(new Tuple2<>(9L, new Tuple2<>("mar_9", 29)));
JavaRDD<Tuple2<Object, Tuple2<String, Integer>>> vertexRdd = javaSparkContext
.parallelize(vertexList);
List<Edge<Tuple2<Integer, Integer>>> edgeList = new ArrayList<>();
edgeList.add(new Edge<>(1, 2, new Tuple2<>(1, 1)));
edgeList.add(new Edge<>(2, 3, new Tuple2<>(2, 2)));
edgeList.add(new Edge<>(1, 4, new Tuple2<>(3, 3)));
edgeList.add(new Edge<>(1, 5, new Tuple2<>(4, 4)));
edgeList.add(new Edge<>(1, 6, new Tuple2<>(5, 5)));
edgeList.add(new Edge<>(4, 7, new Tuple2<>(6, 6)));
edgeList.add(new Edge<>(7, 8, new Tuple2<>(7, 7)));
edgeList.add(new Edge<>(5, 8, new Tuple2<>(8, 8)));
edgeList.add(new Edge<>(8, 9, new Tuple2<>(9, 9)));
edgeList.add(new Edge<>(6, 9, new Tuple2<>(10, 10)));
edgeList.add(new Edge<>(3, 9, new Tuple2<>(11, 11)));
JavaRDD<Edge<Tuple2<Integer, Integer>>> edgeRdd = javaSparkContext.parallelize(edgeList);
Tuple2<String, Integer> defaultVertex = new Tuple2<>("default", -1);
// Use ClassTag$.MODULE$.apply(Object.class) everywhere instead of ClassTag$.MODULE$.apply(Tuple2.class), otherwise an error is thrown
Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> graph = Graph
.apply(vertexRdd.rdd(), edgeRdd.rdd(), defaultVertex, StorageLevels.MEMORY_ONLY,
StorageLevels.MEMORY_ONLY, ClassTag$.MODULE$.apply(Object.class),
ClassTag$.MODULE$.apply(Object.class));
graph.vertices().toJavaRDD()
.foreach(x -> System.out.println("vertex id:: " + x._1 + " , attr:: " + x._2));
graph.edges().toJavaRDD().foreach(
x -> System.out.println(
"edge id:: " + x.attr._1 + " , src:: " + x.srcId() + " , dest:: " + x
.dstId() + " weigh:: " + x.attr._2));
return graph;
}
}
The output:
(1,0)
(3,2147483647)
(7,2147483647)
(4,1)
(9,2147483647)
(5,1)
(6,1)
(8,2147483647)
(2,1)
The iteration count above is set to just 1, i.e. only the first layer out from the source vertex (change it as you like). Comparing against the graph: vertices 2, 4, 5 and 6 are directly connected to vertex 1 and show length 1 in the result, while the remaining vertices stay at Integer.MAX_VALUE. If you only care about one fixed destination vertex, just add a filter, as sketched below.
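For example, a hypothetical helper (not part of the original listing) that keeps only one target vertex from the pregel result sssp computed in main above; the target id 9 is just an example:

// Prints the distance from source vertex 1 to one target vertex.
static void printDistanceTo(Graph<Long, Tuple2<Integer, Integer>> sssp, long targetId) {
    sssp.vertices().toJavaRDD()
        .filter(x -> (long) x._1 == targetId)
        .foreach(x -> System.out.println("distance 1 -> " + x._1 + " :: " + x._2));
}
// usage, at the end of main: printDistanceTo(sssp, 9L);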
Note:
Because of Java/Scala interop quirks, IDEA may flag this code in red; ignore it, it does not affect compilation or execution.
Maven dependencies:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.3.0</version>
</dependency>
Copyright notice
1. This article is my original work, first published on Jianshu; I take responsibility for its content.
2. Do not reproduce without the author's permission; please contact the author if you wish to reproduce it. Thank you.