圖中環路檢測
一般是三色標記，使用 DFS 算法；節點有三種狀態：
- 未訪問：visited = 0
- 已訪問，但子節點未訪問完：visited = 1
- 全部訪問完：visited = 2
如果在訪問過程中，DFS 到達的下一個節點為 visited = 1，則有環。
使用 GraphX 實現
GraphX 一般只能實現基於消息機制的 BFS 算法。
在我們的實現中，為每個節點記錄路徑，然後看下一個節點是否包含在路徑中，如果包含則有環。
每個節點都需要保存路徑，內存開銷很大，目前還沒有找到好的算法實現。
參考：Scala 版本
https://codeleading.com/article/35132638895/
java 版本:
/**
 * Demo entry point: builds a small directed graph in a local Spark context, runs a Pregel
 * path-propagation over it, and prints the path stored at every vertex.
 *
 * <p>Each input string is a {@code "src dst"} pair. Every vertex attribute starts as an empty
 * list and is overwritten by {@link VProj} with the path carried by the message it receives;
 * {@link SendMsg} extends paths along edges and {@link ReduceMsg} merges competing messages.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Graphx Learning");
    conf.setMaster("local");

    // Upper bound on the number of Pregel supersteps.
    final int maxSupersteps = 10;

    ClassTag<Long> longTag = scala.reflect.ClassTag$.MODULE$.apply(Long.class);
    ClassTag<List<Long>> listLongTag = scala.reflect.ClassTag$.MODULE$.apply(List.class);

    List<String> edges = Lists.newArrayList(
            "1 2",
            "2 3",
            "3 4",
            "4 5",
            "5 1",
            "5 3",
            "6 7",
            "7 6",
            "7 8",
            "8 7",
            "1 9",
            "9 1"
    );

    // try-with-resources stops the Spark context even if the job throws.
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
        JavaRDD<Edge<Long>> rddEdges = sc.parallelize(edges).map(line -> {
            // Split once and reuse both endpoints.
            String[] endpoints = line.split(" ");
            return new Edge<Long>(Long.parseLong(endpoints[0]), Long.parseLong(endpoints[1]), 1L);
        });

        Graph<List<Long>, Long> graph = Graph.fromEdges(rddEdges.rdd(), new ArrayList<>(),
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), listLongTag, longTag);

        // mapVertices needs Scala's "VD =:= VD2" evidence; the target type drives inference.
        Predef.$eq$colon$eq<List<Long>, List<Long>> tpEqus = reflexivity();
        Graph<List<Long>, Long> ready = graph.mapVertices(new SimpleInitGraph(), listLongTag, tpEqus);

        Graph<List<Long>, Long> paths = ready.ops().pregel(new ArrayList<>(),
                maxSupersteps, EdgeDirection.Either(),
                new VProj(), new SendMsg(), new ReduceMsg(), listLongTag);

        // Prints every vertex with its final path. To list only closed walks, filter for
        // non-empty paths whose first and last elements are equal before printing.
        paths.vertices().toJavaRDD()
                .foreach(x -> {
                    System.out.println(x._1 + "\t" + x._2);
                });
    }
}
/**
 * Returns the Scala type-equality evidence {@code T =:= T} obtained from
 * {@code Predef.=:=.tpEquals()}, for Scala APIs that require it as an argument.
 *
 * @param <T> the type on both sides of the equality
 * @return the evidence instance supplied by the Scala runtime
 */
public static <T> Predef.$eq$colon$eq<T, T> reflexivity() {
    final Predef.$eq$colon$eq<T, T> evidence = Predef.$eq$colon$eq$.MODULE$.tpEquals();
    return evidence;
}
/**
 * Vertex mapper that ignores both the vertex id and the current attribute and gives every
 * vertex a fresh, empty list.
 */
static class SimpleInitGraph extends AbstractFunction2<Object, List<Long>, List<Long>>
        implements Serializable {

    /**
     * @param vertexId    vertex id (unused)
     * @param currentAttr existing vertex attribute (unused)
     * @return a new empty, mutable list
     */
    @Override
    public List<Long> apply(Object vertexId, List<Long> currentAttr) {
        final List<Long> emptyPath = new ArrayList<>();
        return emptyPath;
    }
}
/**
 * Vertex program: replaces the vertex attribute with a copy of the incoming message,
 * discarding the previous attribute.
 */
static class VProj extends AbstractFunction3<Object, List<Long>, List<Long>, List<Long>>
        implements Serializable {

    /**
     * @param vertexId vertex id (unused)
     * @param previous current vertex attribute (unused)
     * @param incoming merged message delivered to this vertex
     * @return a new list holding the elements of {@code incoming}
     */
    @Override
    public List<Long> apply(Object vertexId, List<Long> previous, List<Long> incoming) {
        // Copy so the stored attribute does not alias the message object.
        return new ArrayList<>(incoming);
    }
}
/**
 * Message generator: extends the path stored at an edge's source by the edge's destination and
 * sends the extended path to that destination.
 *
 * <p>If the destination already appears in the source path at any position other than the
 * first, the revisit is logged and no message is sent. A match against the first element is
 * deliberately not treated as a revisit, so a path may close back on its starting vertex.
 */
static class SendMsg
        extends AbstractFunction1<EdgeTriplet<List<Long>, Long>, Iterator<Tuple2<Object, List<Long>>>>
        implements Serializable {

    /**
     * @param v1 the edge triplet being processed
     * @return an iterator with at most one {@code (dstId, path)} message
     */
    @Override
    public Iterator<Tuple2<Object, List<Long>>> apply(EdgeTriplet<List<Long>, Long> v1) {
        final List<Tuple2<Object, List<Long>>> outbox = new ArrayList<>();
        final List<Long> sourcePath = v1.srcAttr();
        final Long target = v1.dstId();

        // Position 0 (the path's origin) is skipped on purpose; an empty path never matches.
        boolean revisits = sourcePath.stream().skip(1).anyMatch(node -> node.equals(target));
        if (revisits) {
            System.out.println("find a circle " + sourcePath + "\t" + target );
            return JavaConverters.asScalaIteratorConverter(outbox.iterator()).asScala();
        }

        // An empty source path means the walk starts here: seed it with the source id first.
        final List<Long> extended = new ArrayList<>(sourcePath);
        if (extended.isEmpty()) {
            extended.add(v1.srcId());
        }
        extended.add(target);

        outbox.add(new Tuple2<Object, List<Long>>(target, extended));
        return JavaConverters.asScalaIteratorConverter(outbox.iterator()).asScala();
    }
}
/**
 * Message combiner: keeps exactly one of two competing paths sent to the same vertex.
 *
 * <p>GraphX requires the Pregel merge function to be commutative and associative. Always
 * returning the first argument is not commutative, so the surviving path would depend on the
 * order in which messages happen to be combined. This implementation picks the smaller path
 * under a fixed total order (fewer elements first, then element-wise numeric comparison), which
 * makes the result independent of argument order.
 */
static class ReduceMsg extends AbstractFunction2<List<Long>, List<Long>, List<Long>>
        implements Serializable {

    /**
     * @param v1 one candidate path
     * @param v2 the other candidate path
     * @return whichever of {@code v1} / {@code v2} orders first; {@code v1} when they are equal
     */
    @Override
    public List<Long> apply(List<Long> v1, List<Long> v2) {
        return comparePaths(v1, v2) <= 0 ? v1 : v2;
    }

    /** Orders paths by length, then by element-wise comparison of their vertex ids. */
    private static int comparePaths(List<Long> left, List<Long> right) {
        int bySize = Integer.compare(left.size(), right.size());
        if (bySize != 0) {
            return bySize;
        }
        for (int i = 0; i < left.size(); i++) {
            int byElement = Long.compare(left.get(i), right.get(i));
            if (byElement != 0) {
                return byElement;
            }
        }
        return 0;
    }
}