Gelly簡介
Gelly是Flink的圖API庫撑蒜,它包含了一組旨在簡化Flink中圖形分析應(yīng)用程序開發(fā)的方法和實用程序。在Gelly中葡公,可以使用類似于批處理API提供的高級函數(shù)來轉(zhuǎn)換和修改圖另患。Gelly提供了創(chuàng)建盏袄、轉(zhuǎn)換和修改圖的方法凭语,以及圖算法庫葱她。
使用Gelly
在項目中為了能方便地使用Gelly,可以在pom.xml
中引入以下依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.11</artifactId>
<version>1.7.0</version>
</dependency>
在運行Gelly程序之前似扔,Gelly庫jar在opt目錄下的Flink發(fā)行版中提供(對于超過Flink 1.2的版本吨些,可以從Maven Central手動下載)搓谆。要運行Gelly示例,必須將Flink - Gelly(用于Java)或Flink - gely - Scala(用于Scala) jar復(fù)制到Flink的lib目錄
cp opt/flink-gelly_*.jar lib/
cp opt/flink-gelly-scala_*.jar lib/
圖API
Graph Representation
在Gelly中豪墅,一個圖(Graph
)由頂點的數(shù)據(jù)集(DataSet
)和邊的數(shù)據(jù)集(DataSet
)組成泉手。圖中的頂點由Vertex
類型來表示,一個Vertex
由唯一的ID和一個值來表示偶器。其中Vertex
的ID必須是全局唯一的值斩萌,且實現(xiàn)了Comparable
接口。如果節(jié)點不需要由任何值屏轰,則該值類型可以聲明成NullValue
類型颊郎。
// create a new vertex with a Long ID and a String value
Vertex<Long, String> v = new Vertex<Long, String>(1L, "foo");
// create a new vertex with a Long ID and no value
Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(1L, NullValue.getInstance());
圖中的邊由Edge
類型來表示,一個Edge
通常由源頂點的ID亭枷,目標(biāo)頂點的ID以及一個可選的值來表示袭艺。其中源頂點和目標(biāo)頂點的類型必須與Vertex
的ID類型相同搀崭。同樣的叨粘,如果邊不需要由任何值,則該值類型可以聲明成NullValue
類型瘤睹。
Edge<Long, Double> e = new Edge<Long, Double>(1L, 2L, 0.5);
// reverse the source and target of this edge
Edge<Long, Double> reversed = e.reverse();
Double weight = e.getValue(); // weight = 0.5
在Gelly中升敲,一個Edge
總是從源頂點指向目標(biāo)頂點。如果圖中每條邊都能匹配一個從目標(biāo)頂點到源頂點的Edge
轰传,那么這個圖可能是個無向圖驴党。同樣地,無向圖可以用這個方式來表示获茬。
Graph Creation
我們可以通過以下幾種方式創(chuàng)建一個Graph
:
- 從一個
Edge
數(shù)據(jù)集合和一個Vertex
數(shù)據(jù)集合中創(chuàng)建圖港庄。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Vertex<String, Long>> vertices = ...
DataSet<Edge<String, Double>> edges = ...
Graph<String, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
從一個表示邊的
Tuple2
數(shù)據(jù)集合中創(chuàng)建圖。Gelly會將每個Tuple2
轉(zhuǎn)換成一個'Edge'恕曲,其中第一個元素表示源頂點的ID鹏氧,第二個元素表示目標(biāo)頂點的ID,圖中的頂點和邊的value值均被設(shè)置為NullValue
佩谣。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> edges = ...
Graph<String, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
-
從一個
Tuple3
數(shù)據(jù)集和一個可選的Tuple2
數(shù)據(jù)集中生成圖把还。在這種情況下,Gelly會將每個Tuple3
轉(zhuǎn)換成Edge
茸俭,其中第一個元素域是源頂點ID吊履,第二個域是目標(biāo)頂點ID,第三個域是邊的值调鬓。同樣的艇炎,每個Tuple2
會轉(zhuǎn)換成一個頂點Vertex
,其中第一個域是頂點的ID腾窝,第二個域是頂點的value缀踪。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> vertexTuples = env.readCsvFile("path/to/vertex/input").types(String.class, Long.class);
DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/edge/input").types(String.class, String.class, Double.class);
Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
-
從一個表示邊數(shù)據(jù)的CSV文件和一個可選的表示節(jié)點的CSV文件中生成圖腺晾。在這種情況下,Gelly會將表示邊的CSV文件中的每一行轉(zhuǎn)換成一個
Edge
辜贵,其中第一個域表示源頂點ID悯蝉,第二個域表示目標(biāo)頂點ID,第三個域表示邊的值托慨。同樣的鼻由,表示節(jié)點的CSV中的每一行都被轉(zhuǎn)換成一個Vertex
,其中第一個域表示頂點的ID厚棵,第二個域表示頂點的值蕉世。為了通過GraphCsvReader
生成圖,需要指定每個域的類型婆硬,可以使用下列之一的方法: - types(Class<K> vertexKey, Class<VV> vertexValue,Class<EV> edgeValue): both vertex and edge values are present.
- edgeTypes(Class<K> vertexKey, Class<EV> edgeValue): the Graph has edge values, but no vertex values.
- vertexTypes(Class<K> vertexKey, Class<VV> vertexValue): the Graph has vertex values, but no edge values.
- keyType(Class<K> vertexKey): the Graph has no vertex values and no edge values.
// create a Graph with String Vertex IDs, Long Vertex values and Double Edge values
Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env)
.types(String.class, Long.class, Double.class);
// create a Graph with neither Vertex nor Edge values
Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
-
從一個邊的集合和一個可選的頂點的集合中生成圖狠轻。如果在圖創(chuàng)建的時候頂點的集合沒有傳入,Gelly會依據(jù)數(shù)據(jù)的邊數(shù)據(jù)集合自動地生成一個
Vertex
集合彬犯。這種情況下向楼,創(chuàng)建的節(jié)點是沒有值的⌒城或者湖蜕,我們也可以像下面一樣,在創(chuàng)建圖的時候提供一個MapFunction
方法來初始化節(jié)點的值宋列。
List<Vertex<Long, Long>> vertexList = new ArrayList...
List<Edge<Long, String>> edgeList = new ArrayList...
Graph<Long, Long, String> graph = Graph.fromCollection(vertexList, edgeList, env);
// initialize the vertex value to be equal to the vertex ID
Graph<Long, Long, String> graph = Graph.fromCollection(edgeList,
new MapFunction<Long, Long>() {
public Long map(Long value) {
return value;
}
}, env);
Graph Properties
Gelly提供了下列方法來查詢圖的屬性和指標(biāo):
// get the Vertex DataSet
DataSet<Vertex<K, VV>> getVertices()
// get the Edge DataSet
DataSet<Edge<K, EV>> getEdges()
// get the IDs of the vertices as a DataSet
DataSet<K> getVertexIds()
// get the source-target pairs of the edge IDs as a DataSet
DataSet<Tuple2<K, K>> getEdgeIds()
// get a DataSet of <vertex ID, in-degree> pairs for all vertices
DataSet<Tuple2<K, LongValue>> inDegrees()
// get a DataSet of <vertex ID, out-degree> pairs for all vertices
DataSet<Tuple2<K, LongValue>> outDegrees()
// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
DataSet<Tuple2<K, LongValue>> getDegrees()
// get the number of vertices
long numberOfVertices()
// get the number of edges
long numberOfEdges()
// get a DataSet of Triplets<srcVertex, trgVertex, edge>
DataSet<Triplet<K, VV, EV>> getTriplets()
Graph Transformations
-
Map:Gelly提供了專門的用于轉(zhuǎn)換頂點值和邊值的方法昭抒。
mapVertices
和mapEdges
會返回一個新圖,圖中的每個頂點和邊的ID不會改變炼杖,但是頂點和邊的值會根據(jù)用戶自定義的映射方法進(jìn)行修改灭返。這些映射方法同時也可以修改頂點和邊的值的類型。示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
// increment each vertex value by one
Graph<Long, Long, Long> updatedGraph = graph.mapVertices(
new MapFunction<Vertex<Long, Long>, Long>() {
public Long map(Vertex<Long, Long> value) {
return value.getValue() + 1;
}
});
-
Translate:Gelly還提供了專門用于根據(jù)用戶定義的函數(shù)轉(zhuǎn)換頂點和邊的ID和值的值及類型的方法(
translateGraphIDs
/translateVertexValues
/translateEdgeValues
)坤邪,是Map功能的升級版熙含,因為Map操作不支持修訂頂點和邊的ID。示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
// translate each vertex and edge ID to a String
Graph<String, Long, Long> updatedGraph = graph.translateGraphIds(
new MapFunction<Long, String>() {
public String map(Long id) {
return id.toString();
}
});
// translate vertex IDs, edge IDs, vertex values, and edge values to LongValue
Graph<LongValue, LongValue, LongValue> updatedGraph = graph
.translateGraphIds(new LongToLongValue())
.translateVertexValues(new LongToLongValue())
.translateEdgeValues(new LongToLongValue())
-
Filter:Gelly支持在圖中的頂點上或邊上執(zhí)行一個用戶指定的filter轉(zhuǎn)換罩扇。
filterOnEdges
會根據(jù)提供的在邊上的斷言在原圖的基礎(chǔ)上生成一個新的子圖婆芦,注意,頂點的數(shù)據(jù)不會被修改喂饥。同樣的filterOnVertices
在原圖的頂點上進(jìn)行filter轉(zhuǎn)換消约,不滿足斷言條件的源節(jié)點或目標(biāo)節(jié)點會在新的子圖中移除。該子圖方法支持同時對頂點和邊應(yīng)用filter函數(shù)员帮。示例如下:
Graph<Long, Long, Long> graph = ...
graph.subgraph(
new FilterFunction<Vertex<Long, Long>>() {
public boolean filter(Vertex<Long, Long> vertex) {
// keep only vertices with positive values
return (vertex.getValue() > 0);
}
},
new FilterFunction<Edge<Long, Long>>() {
public boolean filter(Edge<Long, Long> edge) {
// keep only edges with negative values
return (edge.getValue() < 0);
}
})
-
Join:Gelly提供了專門的方法用于將節(jié)點和邊的數(shù)據(jù)集合與其他的輸入數(shù)據(jù)集進(jìn)行連接或粮。
joinWithVertices
用于連接節(jié)點和一個輸入的Tuple2
數(shù)據(jù)集,連接操作通過使用節(jié)點的ID和輸入的Tuple2
數(shù)據(jù)集的第一個域作為連接的Key值捞高。該方法會根據(jù)用戶定義的轉(zhuǎn)換方法返回一個新圖氯材。類似的渣锦,一個數(shù)據(jù)集合也可以通過邊進(jìn)行連接,通過邊進(jìn)行連接有三種方式:joinWithEdges
的輸入是一個Tuple3
數(shù)據(jù)集氢哮,并將邊的源頂點ID和目標(biāo)頂點ID作為一個聯(lián)合的Key用于連接袋毙。joinWithEdgesOnSource
和joinWithEdgesOnTarget
均用于連接一個Tuple2
數(shù)據(jù)集,其中joinWithEdgesOnSource
針對Tuple2
的第一個域進(jìn)行連接冗尤,而joinWithEdgesOnTarget
針對Tuple2
的第二個域進(jìn)行連接听盖。值得注意的是,如果數(shù)據(jù)集中同一個Key出現(xiàn)多次裂七,Gelly中所有的Join方法僅針對第一個相同Key值得數(shù)據(jù)進(jìn)行連接操作皆看。示例如下:
Graph<Long, Double, Double> network = ...
DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();
// assign the transition probabilities as the edge weights
Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
new VertexJoinFunction<Double, LongValue>() {
public Double vertexJoin(Double vertexValue, LongValue inputValue) {
return vertexValue / inputValue.getValue();
}
});
Reverse:Gelly中得
reverse()
方法用于在原圖的基礎(chǔ)上,生成一個所有邊方向與原圖相反的新圖背零。Undirected:在前面的內(nèi)容中腰吟,我們提到過,Gelly中的圖通常都是有向的徙瓶,而無向圖可以通過對所有邊添加反向的邊來實現(xiàn)毛雇,出于這個目的,Gelly提供了
getUndirected()
方法倍啥,用于獲取原圖的無向圖禾乘。-
Union:Gelly的
union()
操作用于聯(lián)合當(dāng)前圖和指定的輸入圖,并生成一個新圖虽缕,在輸出的新圖中,相同的節(jié)點只保留一份蒲稳,但是重復(fù)的邊會保留氮趋。如下圖所示:
Union Difference:Gelly提供了
difference()
方法用于發(fā)現(xiàn)當(dāng)前圖與指定的輸入圖之間的差異。Intersect:Gelly提供了
intersect()
方法用于發(fā)現(xiàn)兩個圖中共同存在的邊江耀,并將相同的邊以新圖的方式返回剩胁。相同的邊指的是具有相同的源頂點,相同的目標(biāo)頂點和相同的邊值祥国。返回的新圖中昵观,所有的節(jié)點沒有任何值,如果需要節(jié)點值舌稀,可以使用joinWithVertices()
方法去任何一個輸入圖中檢索啊犬。示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create first graph from edges {(1, 3, 12) (1, 3, 13), (1, 3, 13)}
List<Edge<Long, Long>> edges1 = ...
Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
// create second graph from edges {(1, 3, 13)}
List<Edge<Long, Long>> edges2 = ...
Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
// Using distinct = true results in {(1,3,13)}
Graph<Long, NullValue, Long> intersect1 = graph1.intersect(graph2, true);
// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
Graph<Long, NullValue, Long> intersect2 = graph1.intersect(graph2, false);
Graph Mutations
Gelly內(nèi)置下列方法以支持對一個圖進(jìn)行節(jié)點和邊的增加/移除操作:
// adds a Vertex to the Graph.
//If the Vertex already exists, it will not be added again.
Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex)
// adds a list of vertices to the Graph.
//If the vertices already exist in the graph, they will not be added once more.
Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd)
// adds an Edge to the Graph.
//If the source and target vertices do not exist in the graph, they will also be added.
Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue)
// adds a list of edges to the Graph.
// When adding an edge for a non-existing set of vertices,
//the edge is considered invalid and ignored.
Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges)
// removes the given Vertex and its edges from the Graph.
Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex)
// removes the given list of vertices and their edges from the Graph
Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved)
// removes *all* edges that match the given Edge from the Graph.
Graph<K, VV, EV> removeEdge(Edge<K, EV> edge)
// removes *all* edges that match the edges in the given list
Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved)
Neighborhood Methods
鄰接方法允許每個頂點針對其所有的鄰接頂點或邊執(zhí)行某個集合操作。reduceOnEdges()
可以用于計算頂點所有鄰接邊的值的集合壁查。reduceOnNeighbors()
可以用于計算鄰接頂點的值的集合觉至。這些方法采用聯(lián)合和交換集合,并在內(nèi)部利用組合器睡腿,顯著提高了性能语御。鄰接的范圍由EdgeDirection
來確定峻贮,它有三個枚舉值,分別是:IN / OUT / ALL
应闯,其中IN只考慮所有入的鄰接邊和頂點纤控,OUT只考慮所有出的鄰接邊和頂點,而ALL考慮所有的鄰接邊和頂點碉纺。舉個例子嚼黔,如下圖所示,假設(shè)我們想要知道圖中出度最小的邊權(quán)重惜辑。
下列代碼會為每個節(jié)點找到出的邊集合唬涧,然后在集合的基礎(chǔ)上執(zhí)行一個用戶定義的方法
SelectMinWeight()
。
Graph<Long, Long, Double> graph = ...
DataSet<Tuple2<Long, Double>> minWeights = graph.reduceOnEdges(new SelectMinWeight(),
EdgeDirection.OUT);
// user-defined function to select the minimum weight
static final class SelectMinWeight implements ReduceEdgesFunction<Double> {
@Override
public Double reduceEdges(Double firstEdgeValue, Double secondEdgeValue) {
return Math.min(firstEdgeValue, secondEdgeValue);
}
}
結(jié)果入下圖所示:
同樣的盛撑,假設(shè)我們需要知道每個頂點的所有鄰接邊上的權(quán)重的值之和碎节,不考慮方向〉治溃可以用下面的代碼來實現(xiàn):
Graph<Long, Long, Double> graph = ...
DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(new SumValues(),
EdgeDirection.IN);
// user-defined function to sum the neighbor values
static final class SumValues implements ReduceNeighborsFunction<Long> {
@Override
public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) {
return firstNeighbor + secondNeighbor;
}
}
結(jié)果如下圖所示
Graph Validation
Gelly提供了一個簡單的工具用于對輸入的圖進(jìn)行校驗操作狮荔。由于應(yīng)用程序上下文的不同,根據(jù)某些標(biāo)準(zhǔn)介粘,有些圖可能有效殖氏,也可能無效。例如用戶需要校驗圖中是否包含重復(fù)的邊姻采。為了校驗一個圖雅采,可以定義一個定制的GraphValidator并實現(xiàn)它的validate()方法。InvalidVertexIdsValidator
是Gelly預(yù)定義的一個校驗器慨亲,用來校驗邊上所有的頂點ID是否有效婚瓜,即邊上的頂點ID在頂點集合中存在。示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a list of vertices with IDs = {1, 2, 3, 4, 5}
List<Vertex<Long, Long>> vertices = ...
// create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
List<Edge<Long, Long>> edges = ...
Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
// will return false: 6 is an invalid ID
graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());