1. 算法簡(jiǎn)介
PageRank 是執(zhí)行多次連接的一個(gè)迭代算法墩崩,因此它是RDD 分區(qū)操作的一個(gè)很好的用例兼搏。算法會(huì)維護(hù)兩個(gè)數(shù)據(jù)集:
- 一個(gè)由(pageID, linkList) 的元素組成,包含每個(gè)頁面的相鄰頁面的列表揩局;
- 另一個(gè)由(pageID, rank) 元素組成,包含每個(gè)頁面的當(dāng)前排序值。
它按如下步驟進(jìn)行計(jì)算:
- 將每個(gè)頁面的排序值初始化為1.0残腌。
- 在每次迭代中,對(duì)頁面p贫导,向其每個(gè)相鄰頁面(有直接鏈接的頁面)發(fā)送一個(gè)值為rank(p)/numNeighbors(p) 的貢獻(xiàn)值抛猫。
- 將每個(gè)頁面的排序值設(shè)為0.15 + 0.85 * contributionsReceived。
最后兩步會(huì)重復(fù)幾個(gè)循環(huán)孩灯,在此過程中闺金,算法會(huì)逐漸收斂于每個(gè)頁面的實(shí)際PageRank 值。在實(shí)際操作中峰档,收斂通常需要大約10 輪迭代败匹。
2. 數(shù)據(jù)模擬
假設(shè)一個(gè)由4個(gè)頁面組成的小團(tuán)體:A寨昙,B,C掀亩,D舔哪。相鄰頁面如下所示:
A:B C
B:A C
C:A B D
D:C
4. 測(cè)試代碼
// Scala版PageRank
import org.apache.spark.HashPartitioner
// 假設(shè)相鄰頁面列表以Spark objectFile的形式存儲(chǔ)
val links = sc.parallelize(List(
("A",List("B","C")),
("B",List("A","C")),
("C",List("A","B","D")),
("D",List("C"))
)).partitionBy(new HashPartitioner(100))
.persist()
// 將每個(gè)頁面的排序值初始化為1.0;由于使用mapValues槽棍,生成的RDD
// 的分區(qū)方式會(huì)和"links"的一樣
var ranks = links.mapValues(v => 1.0)
// 運(yùn)行10輪PageRank迭代
for(i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// 寫出最終排名
ranks.sortByKey().collect().foreach(println)
5. 運(yùn)行過程分析
初始的linksRDD和ranksRDD如下所示:
//linksRDD:
Array[(String, List[String])] = Array(
(A,List(B, C)),
(B,List(A, C)),
(C,List(A, B, D)),
(D,List(C)) )
//ranksRDD:
Array[(String, Double)] = Array((A,1.0), (B,1.0), (C,1.0), (D,1.0))
首次迭代后的contributionsRDD和ranksRDD如下所示:
//contributionsRDD:
Array[(String, Double)] = Array(
(A,0.5), (A,0.3333333333333333),
(B,0.5), (B,0.3333333333333333),
(C,0.5), (C,0.5), (C,1.0), (D,0.3333333333333333))
//ranksRDD:
Array[(String, Double)] = Array(
(A,0.8583333333333333),
(B,0.8583333333333333),
(C,1.8499999999999999),
(D,0.43333333333333335) )
驗(yàn)證數(shù)據(jù):
第1次迭代:
PR(A)=0.15 + 0.85 * (1/2 + 1/3) = 0.858333
PR(B)=0.15 + 0.85 * (1/2 + 1/3) = 0.858333
PR(C)=0.15 + 0.85 * (1/2 + 1/2 + 1/1) = 1.85
PR(D)=0.15 + 0.85 * (1/3) = 0.433333
第2次迭代:
PR(A)=0.15 + 0.85 * (0.858333/2 + 1.85/3) = 1.038958191100
PR(B)=0.15 + 0.85 * (0.858333/2 + 1.85/3) = 1.038958191100
PR(C)=0.15 + 0.85 * (0.858333/2 + 0.858333/2 + 0.433333/1) = 1.247916100000
PR(D)=0.15 + 0.85 * (1.85/3) = 0.67416667
第3次迭代:
PR(A)=0.15 + 0.85 * (1.038958191100/2 + 1.247916100000/3) = 0.945133459550833333
PR(B)=0.15 + 0.85 * (1.038958191100/2 + 1.247916100000/3) = 0.945133459550833333
PR(C)=0.15 + 0.85 * (1.038958191100/2 + 1.038958191100/2 + 0.67416667/1) = 1.606156131935
PR(D)=0.15 + 0.85 * (1.247916100000/3) = 0.503576228333333333
首先對(duì)當(dāng)前的ranksRDD和靜態(tài)的linksRDD 進(jìn)行一次join() 操作捉蚤,來獲取每個(gè)頁面ID對(duì)應(yīng)的相鄰頁面列表和當(dāng)前的排序值,然后使用flatMap創(chuàng)建出“contributions”來記錄每個(gè)頁面對(duì)各相鄰頁面的貢獻(xiàn)炼七。然后再把這些貢獻(xiàn)值按照頁面ID(根據(jù)獲得共享的頁面)分別累加起來缆巧,把該頁面的排序值設(shè)為0.15 + 0.85 * contributionsReceived。
雖然代碼本身很簡(jiǎn)單豌拙,這個(gè)示例程序還是做了不少事情來確保RDD 以比較高效的方式進(jìn)行分區(qū)陕悬,以最小化通信開銷:
- 請(qǐng)注意,linksRDD 在每次迭代中都會(huì)和ranks 發(fā)生連接操作按傅。由于links 是一個(gè)靜態(tài)數(shù)據(jù)集墩莫,所以我們?cè)诔绦蛞婚_始的時(shí)候就對(duì)它進(jìn)行了分區(qū)操作,這樣就不需要把它通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)混洗了逞敷。實(shí)際上狂秦,linksRDD 的字節(jié)數(shù)一般來說也會(huì)比ranks 大很多,畢竟它包含每個(gè)頁面的相鄰頁面列表(由頁面ID 組成)推捐,而不僅僅是一個(gè)Double 值裂问,因此這一優(yōu)化相比PageRank 的原始實(shí)現(xiàn)(例如普通的MapReduce)節(jié)約了相當(dāng)可觀的網(wǎng)絡(luò)通信開銷。
- 出于同樣的原因牛柒,我們調(diào)用links 的persist() 方法堪簿,將它保留在內(nèi)存中以供每次迭代使用。
- 當(dāng)我們第一次創(chuàng)建ranks 時(shí)皮壁,我們使用mapValues() 而不是map() 來保留父RDD(links)的分區(qū)方式椭更,這樣對(duì)它進(jìn)行的第一次連接操作就會(huì)開銷很小。
- 在循環(huán)體中蛾魄,我們?cè)趓educeByKey() 后使用mapValues()虑瀑;因?yàn)閞educeByKey() 的結(jié)果已經(jīng)是哈希分區(qū)的了,這樣一來滴须,下一次循環(huán)中將映射操作的結(jié)果再次與links 進(jìn)行連接操作時(shí)就會(huì)更加高效舌狗。
注意:為了最大化分區(qū)相關(guān)優(yōu)化的潛在作用,你應(yīng)該在無需改變?cè)氐逆I時(shí)盡量使用mapValues() 或flatMapValues()扔水。