Graphx的ConnectComponent求解圖中的連通體,在圖中任意兩個(gè)頂點(diǎn)之間存在路徑可達(dá)铃剔,則該圖是連通圖懂拾,對(duì)應(yīng)的極大連通子圖即該算法要求的連通體。
5.1 簡(jiǎn)介
Graphx用圖中頂點(diǎn)的id來標(biāo)識(shí)節(jié)點(diǎn)所屬的連通體液样,同一個(gè)連通體的編號(hào)是采用該聯(lián)通體中最小的節(jié)點(diǎn)id來標(biāo)識(shí)的振亮。
5.2 算法場(chǎng)景
(一)社交網(wǎng)絡(luò)的社區(qū)發(fā)現(xiàn)
(二)測(cè)試機(jī)器的連通性或進(jìn)行網(wǎng)絡(luò)連接的判斷
5.3 算法流程
核心思想: 用圖中節(jié)點(diǎn)的id來表示連通分量巧还,將自身id傳遞給鄰居節(jié)點(diǎn),能夠發(fā)送消息的必然是在同一個(gè)連通分量中坊秸。
這里進(jìn)行消息傳送是從將id節(jié)點(diǎn)的id發(fā)送給有著更大id的節(jié)點(diǎn)麸祷。這樣最后一個(gè)聯(lián)通分支中的所有節(jié)點(diǎn)的分支id將會(huì)是該分支中最小的節(jié)點(diǎn)id。(消息發(fā)送不分方向褒搔,既可以沿著出邊發(fā)送也可以沿著入邊發(fā)送)
計(jì)算步驟:
1. 首先初始化圖阶牍,將圖中頂點(diǎn)id作為頂點(diǎn)的屬性,開始狀態(tài)是每個(gè)節(jié)點(diǎn)單獨(dú)作為一個(gè)連通分量星瘾,分量id是節(jié)點(diǎn)id走孽;
2. 對(duì)于每條邊,如果邊兩端節(jié)點(diǎn)屬性相同(說明兩個(gè)節(jié)點(diǎn)位于同一連通分量中)琳状,不需要發(fā)送消息磕瓷,否則將較小的屬性發(fā)送給較大屬性的節(jié)點(diǎn);
3. 同一個(gè)節(jié)點(diǎn)對(duì)于收到的多個(gè)消息念逞,只接收最小的消息困食;
4. 節(jié)點(diǎn)將自身屬性記錄的id與收到的消息中的id進(jìn)行比較,采用最小的id更新自己的屬性翎承。
不斷迭代上述2陷舅,3,4步审洞。
5.4 源碼分析
object ConnectedComponents {
/**
* 返回圖莱睁,圖中節(jié)點(diǎn)的屬性是當(dāng)前連通分量中最小的頂點(diǎn)id
* */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
maxIterations: Int): Graph[VertexId, ED] = {
require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
s" but got ${maxIterations}")
// 初始化圖:將圖中頂點(diǎn)的id作為頂點(diǎn)屬性
val ccGraph = graph.mapVertices { case (vid, _) => vid }
// 邊上兩個(gè)頂點(diǎn),將id較小的頂點(diǎn)的屬性發(fā)送給id較大的頂點(diǎn)(使得最終連通分支的id是分支上最小的節(jié)點(diǎn)id)
// 如果邊的兩個(gè)頂點(diǎn)屬性相同芒澜,則說明已經(jīng)在同一個(gè)連通分支仰剿,不需要發(fā)送消息
def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
Iterator((edge.srcId, edge.dstAttr))
} else {
Iterator.empty
}
}
// 初始化消息,因?yàn)楣?jié)點(diǎn)在處理消息時(shí)接收最小的id更新自己的屬性痴晦,所以初始時(shí)給每個(gè)節(jié)點(diǎn)發(fā)送一個(gè)超大的值
val initialMessage = Long.MaxValue
val pregelGraph = Pregel(ccGraph, initialMessage,
maxIterations, EdgeDirection.Either)(
vprog = (id, attr, msg) => math.min(attr, msg), // 取當(dāng)前屬性和收到消息的最小者更新屬性
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b)) // 接收多個(gè)消息中的最小者
ccGraph.unpersist()
pregelGraph
} // end of connectedComponents
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
run(graph, Int.MaxValue)
}
}