本文發(fā)表于2010 IEEE 2nd International Conference on Cloud Computing Technology and Science (CloudCom 2010)斋射,是Spark系列論文的開(kāi)篇之作。文中主要介紹了當(dāng)時(shí)解決大規(guī)模數(shù)據(jù)的分布式框架存在的局限性宫莱,并針對(duì)這些問(wèn)題提出了Spark的解決方案号阿。下面是針對(duì)該篇論文所做的閱讀筆記婆瓜。
1. 摘要和引言
現(xiàn)有的大規(guī)模數(shù)據(jù)解決方案(主要指MapReduce)針對(duì)如下兩類(lèi)問(wèn)題時(shí)榜苫,存在著局限性:
1)迭代式作業(yè):雖然每次迭代可以都可以表示一個(gè)MR任務(wù)近迁,但是每一次迭代必須從磁盤(pán)加載數(shù)據(jù)代嗤;
2)交互式數(shù)據(jù)分析:SQL雖然也可以轉(zhuǎn)換成MR任務(wù)窄陡,但是每一次MR任務(wù)都要從磁盤(pán)加載數(shù)據(jù)炕淮。
這里都提到MR在解決問(wèn)題的時(shí)候,都是從磁盤(pán)不停的加載數(shù)據(jù)跳夭,而磁盤(pán)IO的代價(jià)是非常高的涂圆,因此MR在解決上述問(wèn)題的時(shí)候效率其實(shí)并不高。
針對(duì)上述問(wèn)題币叹,本文提出了一種新的大規(guī)模數(shù)據(jù)計(jì)算方案Spark润歉,彈性分布式數(shù)據(jù)集(RDD)可以用來(lái)解決迭代式作業(yè)的問(wèn)題;而Spark是基于Scala進(jìn)行構(gòu)建的颈抚,而Scala可以提供交互式的操作踩衩,可以很好的解決交互式的數(shù)據(jù)分析。
2. 編程模型
為了使用Spark贩汉,開(kāi)發(fā)人員需要編寫(xiě)驅(qū)動(dòng)程序(Driver)驱富,它的作用是控制應(yīng)用程序的執(zhí)行流程并在并行的環(huán)境中執(zhí)行一系列的并行操作。Spark主要提供了兩類(lèi)抽象:RDD 和并行算子(parallel operation)匹舞。此外褐鸥,Spark還提供了兩種受限的共享變量。
2.1. 彈性分布式數(shù)據(jù)集(RDD)
RDD的特點(diǎn):
1)跨計(jì)算機(jī)間的可分區(qū)的只讀對(duì)象集合赐稽;
2)分區(qū)丟失之后可以重建(因?yàn)镽DD不需要物化在物理存儲(chǔ)上叫榕,相反可以通過(guò)物理存儲(chǔ)上的數(shù)據(jù)來(lái)構(gòu)建RDD);
3)可以持久化RDD又憨,供后續(xù)計(jì)算來(lái)使用翠霍。
如何創(chuàng)建RDD?
1)從HDFS這樣的分布式文件系統(tǒng)創(chuàng)建蠢莺;
2)通過(guò)并行的讀取Scala集合來(lái)創(chuàng)建寒匙;
3)從另一個(gè)RDD轉(zhuǎn)化而來(lái);
4)改變現(xiàn)有RDD的持久性躏将。
RDD默認(rèn)是惰性并且臨時(shí)的锄弱,但是可以通過(guò)特定的操作來(lái)改變其持久性,如何改變祸憋?
1)Cache action:將數(shù)據(jù)保存在內(nèi)存中会宪,以便后期重用時(shí),可以快速的使用蚯窥。
2)Save action:將數(shù)據(jù)持久化到像HDFS這樣的分布式文件存統(tǒng)上掸鹅,這個(gè)被保存的版本也可以在后期的操作中重用塞帐。
2.2. 并行操作
可以在RDD上執(zhí)行一系列的并行操作,如reduce, collect, foreach等等巍沙。
2.3. 共享變量
Spark提供了兩種共享變量:
1)廣播變量:這種變量只會(huì)被廣播到每一個(gè)Worker一次葵姥;
2)累加器:可以在Worker節(jié)點(diǎn)間共享該變量,可以用來(lái)作為計(jì)數(shù)器句携。
3. 示例
下面會(huì)列舉三個(gè)示例來(lái)顯示如何使用上述特性榔幸。
3.1. 文本搜索
val file = spark.textFile("hdfs://...")
val errs = file.filter(_.contains("ERROR"))
val ones = errs.map(_ => 1)
val count = ones.reduce(_+_)
假設(shè)需要對(duì)存儲(chǔ)在HDFS中的大型日志文件中包含的錯(cuò)誤行進(jìn)行統(tǒng)計(jì)。上面的代碼示例使用Spark的方式實(shí)現(xiàn)了MapReduce操作矮嫉。與MapReduce的操作不同的是削咆,Spark可以保存中間數(shù)據(jù)。如果我們想保存errs數(shù)據(jù)蠢笋,就可以使用如下方式創(chuàng)建一個(gè)緩存的RDD:
val cachedErrs = errs.cache()
這樣如果后續(xù)我們需要讀errs數(shù)據(jù)進(jìn)行更多的操作拨齐,就會(huì)大大的提高執(zhí)行效率了。
3.2. 邏輯回歸
// 從文本文件中讀取點(diǎn)數(shù)據(jù)昨寞,并緩存在內(nèi)存中
val points = spark.textFile(...).map(parsePoint).cache()
// Initialize w to random D-dimensional vector
var w = Vector.random(D)
// Run multiple iterations to update w
for (i <- 1 to ITERATIONS) {
//將梯度設(shè)置成累計(jì)奏黑,可以在所有的worker之間累加該數(shù)據(jù)
val grad = spark.accumulator(new Vector(D))
//scala的for是語(yǔ)法糖,因此如下的代碼會(huì)被轉(zhuǎn)換成points.foreach來(lái)執(zhí)行编矾,是一個(gè)并行操作
for (p <- points) {
// Runs in parallel
val s = (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
grad += s*p.x
}
w -= grad.value
}
LR是一種迭代算法熟史,因此可將迭代數(shù)據(jù)緩存在內(nèi)存中從而提高執(zhí)行效率(迭代ITERATIONS次,每次points都是從cache到內(nèi)存的數(shù)據(jù)來(lái)讀)窄俏。將梯度設(shè)置成累加器變量蹂匹,這樣其就可以在并行的環(huán)境下進(jìn)行累加了。
3.3. 最小二乘法
//每次計(jì)算的時(shí)候R都是被當(dāng)作參數(shù)傳進(jìn)去凹蜈,所以這里將數(shù)據(jù)集R設(shè)置成廣播變量
val Rb = spark.broadcast(R)
for (i <- 1 to ITERATIONS) {
U = spark.parallelize(0 until u)
.map(j => updateUser(j, Rb, M)).collect()
M = spark.parallelize(0 until m)
.map(j => updateUser(j, Rb, U)).collect()
}
計(jì)算U和M時(shí)限寞,都是通過(guò)并行化的方式進(jìn)行計(jì)算的,而計(jì)算的過(guò)程中每一次循環(huán)仰坦,都需要數(shù)據(jù)集R履植,因此我們可以把數(shù)據(jù)集R設(shè)置成廣播變量,在程序啟動(dòng)之后悄晃,數(shù)據(jù)集R只會(huì)被driver向所有參與計(jì)算的worker節(jié)點(diǎn)發(fā)送一次玫霎。