Spark RDD(Resilient Distributed Datasets)論文
2: Resilient Distributed Datasets(RDDs)
2.2.1 例子 – 監(jiān)控日志數(shù)據(jù)挖掘
6.5 用 spark 構(gòu)建的用戶應(yīng)用
為了能解決程序員能在大規(guī)模的集群中以一種容錯的方式進(jìn)行內(nèi)存計算這個問題, 我們提出了 RDDs 的概念. 當(dāng)前的很多框架對迭代式算法場景與交互性數(shù)據(jù)挖掘場景的處理性能非常差, 這個是 RDDs 的提出的動機. 如果能將數(shù)據(jù)保存在內(nèi)存中, 將會使的上面兩種場景的性能提高一個數(shù)量級. 為了能達(dá)到高效的容錯, RDDs 提供了一種受限制的共享內(nèi)存的方式, 這種方式是基于粗粒度的轉(zhuǎn)換共享狀態(tài)而非細(xì)粒度的更新共享狀態(tài). 然而, 我們分析表明 RDDs 可以表達(dá)出很多種類的計算, 包括目前專門從事迭代任務(wù)的編程計算模型, 比如 Pregel, 當(dāng)然也可以表達(dá)出目前模型表達(dá)不出的計算. 我們通過 Spark 系統(tǒng)來實現(xiàn)了 RDDs, 并且通過各種各樣的用戶應(yīng)用和測試來評估了這個系統(tǒng).
像 MapReduce 和 Dryad 等分布式計算框架已經(jīng)廣泛應(yīng)用于大數(shù)據(jù)集的分析. 這些系統(tǒng)可以讓用戶不用擔(dān)心分布式工作以及容錯, 而是使用一系列的高層次的操作 api 來達(dá)到并行計算的目的.
雖然當(dāng)前的框架提供了大量的對訪問利用計算資源的抽象, 但是它們?nèi)鄙倭藢梅植际絻?nèi)存的抽象.樣使的它們在處理需要在多個計算之間復(fù)用中間結(jié)果的應(yīng)用的時候會非常的不高效. 數(shù)據(jù)的復(fù)用在迭代機器學(xué)習(xí)和圖計算領(lǐng)域(比如 PageRank, K-means 以及線性回歸等算法)是很常見的. 在交互式數(shù)據(jù)挖中, 一個用戶會經(jīng)常對一個相同的數(shù)據(jù)子集進(jìn)行多次不同的特定查詢, 所以數(shù)據(jù)復(fù)用在交互式數(shù)據(jù)挖掘也是很常見的. 然而, 目前的大部分的框架對計算之間的數(shù)據(jù)復(fù)用的處理方式就是將中間數(shù)據(jù)寫到一個靠穩(wěn)定的系統(tǒng)中(比如分布式文件系統(tǒng)), 這樣會由于數(shù)據(jù)的復(fù)制備份, 磁盤的 I/O 以及數(shù)據(jù)的序列化而致應(yīng)用任務(wù)執(zhí)行很費時間.
認(rèn)識到這個問題后, 研究者們已經(jīng)為一些需要中間數(shù)據(jù)復(fù)用的應(yīng)用開發(fā)出了一些特殊的框架.比如Pregel 在做迭代式圖計算的時候會將中間結(jié)果放在內(nèi)存中. HaLoop 也提供了迭代式 MapReduce 接口.然而, 這些框架僅僅支持一些特殊的計算模式(比如循環(huán)一系列的 MapReduce 步驟), 并且它們是隱式的為些計算模式提供數(shù)據(jù)共享. 它們沒有提供更加普遍數(shù)據(jù)復(fù)用的抽象, 比如可以讓用戶加載幾個數(shù)據(jù)集到存中然后對這些內(nèi)存中的數(shù)據(jù)集進(jìn)行專門的查詢.
在這篇論文中, 我們提出了一個全新的抽象, 叫做 RDDs, 它可以高效的處理廣泛的應(yīng)用中涉及到的數(shù)據(jù)用的場景. RDDs 是一個可以容錯且并行的數(shù)據(jù)結(jié)構(gòu), 它可以讓用戶顯式的將中間結(jié)果數(shù)據(jù)集保存在內(nèi)中、控制數(shù)據(jù)集的分區(qū)來達(dá)到數(shù)據(jù)存放處理最優(yōu)以及可以使用豐富的操作 api 來操作數(shù)據(jù)集在設(shè)計 RDDs 的時候, 最大的挑戰(zhàn)是定義一個可以高效容錯的編程接口. 已經(jīng)存在的分布式內(nèi)存抽象系統(tǒng)比如 distributed shared memory毡咏、key-value stores驮宴、databases 以及 Poccolo, 都是提供了基于粒度的更新可變狀態(tài)(比如 table 中的 cells)的接口, 基于這種接口下, 保證容錯的方式無非是將數(shù)據(jù)復(fù)備份到多臺機器或者在多臺機器上記錄更新的日志, 這兩種方式在數(shù)據(jù)密集性的工作任務(wù)中都是非常的時的, 因為需要通過網(wǎng)絡(luò)傳輸在機器節(jié)點間復(fù)制大量的數(shù)據(jù), 寬帶傳輸數(shù)據(jù)的速度遠(yuǎn)遠(yuǎn)比 RAM 內(nèi)存慢, 而這兩種方式會占用大量的存儲空間.
與這些系統(tǒng)相反, RDDs 提供了基于粗粒度轉(zhuǎn)換(比如 map, filter 以及 join)的接口, 這些接口可以對多的數(shù)據(jù)條目應(yīng)用相同的操作.這樣就可以通過記錄來生成某個數(shù)據(jù)集的一系列轉(zhuǎn)換 (就是這個數(shù)據(jù)集 lineage)而不是記錄真實的數(shù)據(jù)來達(dá)到提供高效的容錯機制. 這個 RDD 就有足夠的信息知道它是從哪 RDDs 轉(zhuǎn)換計算來的, 如果一個 RDD 的分區(qū)數(shù)據(jù)丟失掉了, 那么重新計算這個 RDD 所依賴的那個 RDD 對應(yīng)的區(qū)就行了. 因此可以很快且不用通過復(fù)制備份方式來恢復(fù)丟失的數(shù)據(jù).
雖然基于粗粒度的轉(zhuǎn)換一開始看起來受限制, 但是 RDDs 非常適合很多并行計算的應(yīng)用, 因為這些應(yīng)用基都是在大量的數(shù)據(jù)元素上應(yīng)用相同的操作方法. 事實上, 我們分析表明 RDDs 不僅可以高效的表達(dá)出目前括 MapReduce, DryadLINQ, SQL, Pregel 以及 HaLoop 等系統(tǒng)提出的分布式編程模型, 而且還能表達(dá)它們表達(dá)不了的新的應(yīng)用的計算模型, 比如交互型數(shù)據(jù)挖掘. 我們相信, RDDs 解決那些新的框架提出來計算需求的能力將會成為是 RDD 抽象強大的最有力證據(jù).
我們在 Spark 系統(tǒng)中實現(xiàn)了 RDDs, 這個系統(tǒng)已經(jīng)在 UC Berkeley 以及好些個公司中應(yīng)用于研究和生產(chǎn)應(yīng)中.Spark 和 DryadLINQ 類似使用scala語言提供了很方便語言集成編程接口.另外, Spark可以利用 scala 的解釋器來對大數(shù)據(jù)集進(jìn)行交互式的查詢.我們相信 spark 是首個允許使用多種編程語言來進(jìn)行分布式內(nèi)存中交互式數(shù)據(jù)挖掘的系統(tǒng).
我們通過為基準(zhǔn)測試以及用戶應(yīng)用的測試兩個方面來評估了 RDDs 和 spark. 我們分析顯示, Spark 在迭代應(yīng)用中可以比 hadoop 快上 20 倍以上、使的現(xiàn)實中的數(shù)據(jù)分析報表的速度提升了 40 倍以及使的交互式的掃1TB數(shù)據(jù)集的延遲在 5-7 秒. 更重要的是, 為了彰顯 RDDs 的普遍性, 我們基于spark 用相對較小的程序(每個包只有 200 行代碼)實現(xiàn)了 Pregel 和 HaLoop 的編程模型, 包括它們使用的數(shù)據(jù)分布優(yōu)化. 本篇論文以 RDDs(第二節(jié))和 Spark(第三節(jié))的概述開始. 然后在第四節(jié)中討論 了RDD s內(nèi)部的表達(dá)呕缭、在第節(jié)中討論了我們的實現(xiàn)以及在第六節(jié)中討論了實驗結(jié)果. 最后, 我們討論了 RDDs 是怎么樣來表達(dá)現(xiàn)在已存在的幾個系統(tǒng)的編程模型(第七節(jié))堵泽、調(diào)查相關(guān)工作(第八節(jié))以及總結(jié).
2: Resilient Distributed Datasets(RDDs)
這節(jié)主要講述 RDDs 的概要, 首先定義 RDDs(2.1)以及介紹 RDDs 在 spark 中的編程接口(2.2), 然后對 RDDs 和細(xì)粒度共享內(nèi)存抽象進(jìn)行的對比(2.3).最后我們討論了 RDD 模型的限制性.
一個 RDD 是一個只讀, 被分區(qū)的數(shù)據(jù)集.我們可以通過兩種對穩(wěn)定的存儲系統(tǒng)和其他的 RDDs 進(jìn)行操作而創(chuàng)建一個新的 RDDs.為了區(qū)別開 RDDs 的其他操作, 我們稱這些操作為 transformations, 比如 map, filter 以及 join 等都是 transformations 操作.
RDDs 并不要始終被具體化, 一個 RDD 有足夠的信息知道自己是從哪個數(shù)據(jù)集計算而來的(就是所謂的依賴血統(tǒng)), 這是一個非常強大的屬性:其實, 一個程序你能引用一個不能從失敗中重新構(gòu)建的 RDD.
最后, 用戶可以控制 RDDs 的兩個方面:數(shù)據(jù)存儲和分區(qū).對于需要復(fù)用的 RDD, 用戶可以明確的選擇一個數(shù)據(jù)存儲策略(比如內(nèi)存緩存). 他們也可以基于一個元素的 key 來為 RDD 所有的元素在機器節(jié)點間進(jìn)行數(shù)據(jù)分區(qū), 這樣非常利于數(shù)據(jù)分布優(yōu)化, 比如給兩個數(shù)據(jù)集進(jìn)行相同的 hash 分區(qū), 然后進(jìn)行 join, 可以提高 join 的性能.
Spark 和 DryadLINQ 和 FlumeJava 一樣通過集成編程語言 api 來暴露 RDDs, 這樣的話, 每一個數(shù)據(jù)集就代表一個對象, 我們可以調(diào)用這個對象中的方法來操作這個對象.
編程者可以通過對穩(wěn)定存儲的數(shù)據(jù)進(jìn)行轉(zhuǎn)換操作(即 transformations, 比如 map 和 filter 等)來得到一個或者多個 RDDs. 然后可以對這些 RDDs 進(jìn)行 actions 操作, 這些操作可以是得到應(yīng)用的結(jié)果值, 也可以是將結(jié)果數(shù)據(jù)寫入到存儲系統(tǒng)中, actions 包括: count(表示返回這個數(shù)據(jù)集的元素的個數(shù))、collect(表示返回數(shù)據(jù)集的所有元素)以及 save(表示將輸出結(jié)果寫入到存儲系統(tǒng)中). 和 DryadLINQ 一樣, spark 在定義 RDDs 的時候并不會真正的計算, 而是要等到對這個 RDDs 觸發(fā)了 actions 操作才會真正的觸發(fā)計算, 這個稱之為 RDDs 的 lazy 特性, 所以我們可以先對 transformations 進(jìn)行組裝一系列的 pipelines, 然后再計算.
另外, 編程者可以通過調(diào)用 RDDs 的 persist 方法來緩存后續(xù)需要復(fù)用的 RDDs. Spark 默認(rèn)是將緩存數(shù)據(jù)放在內(nèi)存中, 但是如果內(nèi)存不足的話則會寫入到磁盤中. 用戶可以通過 persist 的參數(shù)來調(diào)整緩存策略, 比如只將數(shù)據(jù)存儲在磁盤中或者復(fù)制備份數(shù)據(jù)到多臺機器. 最后, 用戶可以為每一個 RDDs 的緩存設(shè)置優(yōu)先級, 以達(dá)到哪個在內(nèi)存中的 RDDs 應(yīng)該首先寫道磁盤中
2.2.1 例子 – 監(jiān)控日志數(shù)據(jù)挖掘
假設(shè)一個 web 服務(wù)正發(fā)生了大量的錯誤, 然后運維人員想從存儲在 hdfs 中的幾 TB 的日志中找出錯誤的原因. 運維人員可以通過 spark 將日志中的錯誤信息加載到分布式的內(nèi)存中, 然后對這些內(nèi)存中的數(shù)據(jù)進(jìn)行查詢. 她首先需要寫下面的 scala 代碼:
line = spark.textFile("hdfs://..")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
第一行表示從一個 HDFS 文件(許多行的文件數(shù)據(jù)集)上定義了一個 RDD, 第二行表示基于前面定義的 RDD 進(jìn)行過濾數(shù)據(jù).第三行將過濾后的 RDD 結(jié)果存儲在內(nèi)存中, 以達(dá)到多個對這個共享 RDD 的查詢. 需要注意的事, filter 的參數(shù)是 scala 語法中的閉包.
到目前為止, 集群上還沒有真正的觸發(fā)計算.然而, 用戶可以對RDD進(jìn)行action操作, 比如對錯誤信息的計數(shù):
errors.count()
用戶也可以繼續(xù)對 RDD 進(jìn)行 transformations 操作, 然后計算其結(jié)果, 比如:
//對錯誤中含有 ”MySQL” 單詞的數(shù)據(jù)進(jìn)行計數(shù)
errors.filters(_.contains("MySQL")).count()
//返回錯誤信息中含有 "HDFS" 字樣的信息中的時間字段的值(假設(shè)每行數(shù)據(jù)的字段是以 tab 來切分的, 時間字段是第 3 個字段)
errors.filter(_.contains("HDFS"))
.map(_.split("\t")(3))
.collect()
在對 errors 第一次做 action 操作的后, spark 會將 errors 的所有分區(qū)的數(shù)據(jù)存儲在內(nèi)存中, 這樣后面對 errors 的計算速度會有很大的提升.需要注意的是, 像 lines 這種基礎(chǔ)數(shù)據(jù)的 RDD 是不會存儲在內(nèi)存中的.因為包含錯誤信息的數(shù)據(jù)可能只是整個日志數(shù)據(jù)的一小部分, 所以將包含錯誤數(shù)據(jù)的日志放在內(nèi)存中是比較合理的.
最后, 為了說明我們的模型是如何達(dá)到容錯的, 我們在圖一種展示了第三個查詢的血緣關(guān)系圖(lineage graph).在這個查詢種, 我們以對 lines 進(jìn)行過濾后的 errors 開始, 然后在對 errors 進(jìn)行了 filter 和 map 操作, 最后做了 action 操作即 collect. Spark 會最后兩個 transformations 組成一個 pipeline, 然后將這個 pipeline 分解成一系列的 task, 最后將這些 task 調(diào)度到含有 errors 緩存數(shù)據(jù)的機器上進(jìn)行執(zhí)行. 此外, 如果 errors 的一個分區(qū)的數(shù)據(jù)丟失了, spark 會對 lines 的相對應(yīng)的分區(qū)應(yīng)用 filter 函數(shù)來重新創(chuàng)建 errors 這個分區(qū)的數(shù)據(jù)
圖一: 我們例子中第三個查詢的血緣關(guān)系圖, 其中方框表示 RDDs, 箭頭表示轉(zhuǎn)換
為了理解作為分布式內(nèi)存抽象的 RDDs 的好處, 我們在表一種用 RDDs 和分布式共享內(nèi)存系統(tǒng)(Distributed shared memory 即 DSM)進(jìn)行了對比. 在所有的 DSM 系統(tǒng)中, 應(yīng)用從一個全局的地址空間中的任意位置中讀寫數(shù)據(jù). 需要注意的是, 依據(jù)這個定義, 我們所說的 DSM 系統(tǒng)不僅包含了傳統(tǒng)的共享內(nèi)存系統(tǒng), 還包含了對共享狀態(tài)的細(xì)粒度寫操作的其他系統(tǒng)(比如 Piccolo), 以及分布式數(shù)據(jù)庫. DSM 是一個很普遍的抽象, 但是這個普遍性使得它在商用集群中實現(xiàn)高效且容錯的系統(tǒng)比較困難.
Aspect(概念)RDDsDistribute shared memory(分布式共享內(nèi)存)
Reads粗粒度或者細(xì)粒度細(xì)粒度
Writes粗粒度細(xì)粒度
數(shù)據(jù)一致性不重要的(因為RDD是不可變的)取決于app 或者 runtime
容錯利用lineage達(dá)到細(xì)粒度且低延遲的容錯需要應(yīng)用checkpoints(就是需要寫磁盤)
并且需要程序回滾
計算慢的任務(wù)可以利用備份的任務(wù)來解決很難做到
計算數(shù)據(jù)的位置自動的機遇數(shù)據(jù)本地性取決于app (runtime是以透明為目標(biāo)的)
內(nèi)存不足時的行為和已經(jīng)存在的數(shù)據(jù)流處理系統(tǒng)一樣, 寫磁盤非常糟糕的性能(需要內(nèi)存的交換恢总?)
表一: RDDs 和 Distributed shared memory 對比
RDDs 只能通過粗粒度的轉(zhuǎn)換被創(chuàng)建(或者被寫) , 然而 DSM 允許對每一個內(nèi)存位置進(jìn)行讀寫, 這個是 RDDs 和 DSM 最主要的區(qū)別. 這樣使都 RDDs在 應(yīng)用中大量寫數(shù)據(jù)受到了限制, 但是可以使的容錯變的更加高效. 特別是, RDDs 不需要發(fā)生非常耗時的 checkpoint 操作, 因為它可以根據(jù) lineage 進(jìn)行恢復(fù)數(shù)據(jù) . 而且, 只有丟掉了數(shù)據(jù)的分區(qū)才會需要重新計算, 并不需要回滾整個程序, 并且這些重新計算的任務(wù)是在多臺機器上并行運算的.
RDDs 的第二個好處是:它不變的特性使的它可以和 MapReduce 一樣來運行執(zhí)行很慢任務(wù)的備份任務(wù)來達(dá)到緩解計算很慢的節(jié)點的問題. 在 DSM 中, 備份任務(wù)是很難實現(xiàn)的, 因為原始任務(wù)和備份任務(wù)或同時更新訪問同一個內(nèi)存地址和接口.
最后, RDDs 比 DSM 多提供了兩個好處. 第一, 在對 RDDs 進(jìn)行大量寫操作的過程中, 我們可以根據(jù)數(shù)據(jù)的本地性來調(diào)度 task 以提高性能. 第二, 如果在 scan-base 的操作中, 且這個時候內(nèi)存不足以存儲這個 RDDs, 那么 RDDs 可以慢慢的從內(nèi)存中清理掉. 在內(nèi)存中存儲不下的分區(qū)數(shù)據(jù)會被寫到磁盤中, 且提供了和現(xiàn)有并行數(shù)據(jù)處理系統(tǒng)相同的性能保證.
經(jīng)過上面的討論介紹, 我們知道 RDDs 非常適合將相同操作應(yīng)用在整個數(shù)據(jù)集的所有的元素上的批處理應(yīng)用. 在這些場景下, RDDs 可以利用血緣關(guān)系圖來高效的記住每一個 transformations 的步驟, 并且不需要記錄大量的數(shù)據(jù)就可以恢復(fù)丟失的分區(qū)數(shù)據(jù). RDDs 不太適合用于需要異步且細(xì)粒度的更新共享狀態(tài)的應(yīng)用, 比如一個 web 應(yīng)用或者數(shù)據(jù)遞增的 web 爬蟲應(yīng)用的存儲系統(tǒng). 對于這些應(yīng)用, 使用傳統(tǒng)的紀(jì)錄更新日志以及對數(shù)據(jù)進(jìn)行 checkpoint 會更加高效. 比如使用數(shù)據(jù)庫迎罗、RAMCloud、Percolator 以及 Piccolo. 我們的目標(biāo)是給批量分析提供一個高效的編程模型, 對于這些異步的應(yīng)用需要其他的特殊系統(tǒng)來實現(xiàn).
Spark 使用 scala 語言實現(xiàn)了抽象的 RDD, scala 是建立在 java VM 上的靜態(tài)類型函數(shù)式編程語言. 我們選擇 scala 是因為它結(jié)合了簡潔(很方便進(jìn)行交互式使用)與高效(由于它的靜態(tài)類型). 然而, 并不是說 RDD 的抽象需要函數(shù)式語言來實現(xiàn).
開發(fā)員需要寫連接集群中的 workers 的 driver 程序來使用 spark, 就比如圖 2 展示的. Driver 端程序定義了一系列的 RDDs 并且調(diào)用了 RDD 的 action 操作. Driver 的程序同時也會跟蹤 RDDs 之間的的血緣關(guān)系. workers 是可以將 RDD 分區(qū)數(shù)據(jù)存儲在內(nèi)存中的長期存活的進(jìn)程.
圖二: 這個是 Spark 運行時的圖, 用戶寫的 driver 端程序啟動多個 workers, 這些 workers 可以從分布書的存儲系統(tǒng)中讀取數(shù)據(jù)塊并且可以將計算出來的 RDD 分區(qū)數(shù)據(jù)存放在內(nèi)存中.
在 2.2.1 小節(jié)中的日志挖掘例子中, 我們提到, 用戶提供給 RDD 操作比如 map 以參數(shù)作為這個操作的閉包(說白了就是函數(shù)). Scala 將這些函數(shù)看作一個 java 對象, 這些對象是可以序列化的, 并且可以通過網(wǎng)絡(luò)傳輸傳輸?shù)狡渌臋C器節(jié)點上的. Scala 將函數(shù)中的變量看作一個對象中的變量. 比如, 我們可以寫一段這樣的代碼: var x = 5; rdd.map(_ + 5)來達(dá)到給這個 RDD 每一個元素加上 5 的目的.
RDDs 是被一元素類型參數(shù)化的靜態(tài)類型對象, 比如, RDD[Int] 表示一個類型為整數(shù)的 RDD. 然而, 我們很多例子中的 RDD 都會省去這個類型, 這個是因為 scala 支持類型推斷.
雖然我們用 scala 實現(xiàn) RDD 的方法很簡單, 但是我們需要處理用反射實現(xiàn)的閉包對象相關(guān)的工作, 我們還需要做很多的工作使的 spark 可以用 scala 的解釋器, 這個我們在 5.2 小節(jié)中會討論到. 盡管如此, 我們是不需要修改 scala 的編譯器的.
表 2 中列舉了 Spark 中 RDD 常用的 transformations 和 actions 操作, 且描述了每一個方法的簽名以及類型.我們需要記住 transformations 是用來定義一個新的 RDD 的 lazy 操作, 而actions 是真正觸發(fā)一個能返回結(jié)果或者將結(jié)果寫到文件系統(tǒng)中的計算.
表二: Spark 中 RDD 常用的 transformations 和 actions 操作.Seq[T] 表示元素類型為 T 的一個列表.
需要注意的是, 一些操作比如 join 只適合用于 key-value 類型的 RDDs. 我們?nèi)〉暮瘮?shù)的名稱和 scala 或者其他函數(shù)式編程語言的函數(shù)名是一致的. 比如, map 是一個 one-to-one 的映射操作, 而 flatMap 的每一個輸入值會對應(yīng)一個或者更多的輸出值(有點像 MapReduce 中的 map)
除了這些操作, 用戶可以通過 persist 操作來請求緩存 RDD. 另外, 用戶可以拿到被 Partitioner 分區(qū)后的分區(qū)數(shù)以及根據(jù) Partitioner 對另一個 dataset 進(jìn)行分區(qū). 像 groupByKey片仿、reduceByKey 以及 sort 等操作都是經(jīng)過了hash 或者 rang 分區(qū)后的 RDD.
我們用兩個迭代式的應(yīng)用:線性回歸和 PageRank 來補充 2.2.1 提到的數(shù)據(jù)挖掘的例子. 稍后也會展示下如何控制 RDD 的分區(qū)以達(dá)到提升性能的目的.
很多的機器學(xué)習(xí)算法一般都是迭代式的計算, 因為它們需要跑迭代的優(yōu)化程序(比如梯度下降)來達(dá)到最大化功能. 他們將數(shù)據(jù)存放在內(nèi)存中以達(dá)到很快的速度.
作為一個例子, 下面的程序?qū)崿F(xiàn)了線性回歸, 一個能找到最佳區(qū)分兩種點集(垃圾郵件以及非垃圾郵件)的超平面 w 的常用的分類算法. 這個算法用了梯度下降的方法:一個隨機的值作為 w 的初始值, 每次迭代都會將含有 w 的方法應(yīng)用到每一個數(shù)據(jù)點然后累加得到梯度值, 然后將 w 往改善結(jié)果的方向移動.
一開始我們定義一個叫 points 的 RDD, 這個 RDD 從一個文本文件中經(jīng)過 map 將每一行轉(zhuǎn)換為 Point 對象得到. 然后我們重復(fù)對 points 進(jìn)行 map 和 reduce 操作計算出每一步的梯度值. 在迭代之間我們將 points 存放在內(nèi)存中可以使的性能提高 20 倍, 我們將會在 6.1 節(jié)中討論.
在 PageRank 中數(shù)據(jù)共享更加復(fù)雜. 如果一個文檔引用另一個文檔, 那被引用的文檔的排名值(rank)需要加上引用的文檔發(fā)送過來的貢獻(xiàn)值, 當(dāng)然這個過程是個迭代的過程. 在每一次迭代中, 每一個文檔都會發(fā)送 r/n 的貢獻(xiàn)值給它的鄰居, 其中 r 表示這個文檔的排名值, n 表示這個文檔的鄰居數(shù)量. 然后更新文檔的排名值為, 這個表達(dá)式值表示這個文檔收到的貢獻(xiàn)值, N 表示所有的文檔的數(shù)量, 我們可以用如下的 spark 代碼來表達(dá) PageRank:
其中 links 表示( URL , outlinks )鍵值對. 這個程序的 RDD 的血緣關(guān)系圖如圖三. 在每一次迭代中我們都是根據(jù)上一次迭代的 contribs 和 ranks 以及原始不變的 links 數(shù)據(jù)集來創(chuàng)建一個新的 ranks 數(shù)據(jù)集. 隨著迭代次數(shù)的變多這張圖會變的越長, 這個是這個圖比較有意思的特點. 如果這個 job 的迭代次數(shù)很多的話, 那么備份一些版本的 ranks 來達(dá)到減少從錯誤中恢復(fù)出來的時間是很有必要的, 用戶可以調(diào)用標(biāo)記為 RELIABLE 的 persist 函數(shù)來達(dá)到這個目的. 需要注意的是, links 是不需要備份的, 因為它的分區(qū)數(shù)據(jù)可以快速的從重新計算輸入文件中對應(yīng)的數(shù)據(jù)塊而得到, 這個數(shù)據(jù)集一般會比 ranks 數(shù)據(jù)集大上很多倍, 因為每一個文檔會有很多的連接但只會有一個排名值, 所以利用 RDD 的血緣關(guān)系來恢復(fù)數(shù)據(jù)肯定比 checkpoint 內(nèi)存中的數(shù)據(jù)快很多(因為數(shù)據(jù)量太大).
最后, 我們可以控制 RDDs 的分區(qū)方式來優(yōu)化 PageRank 中的節(jié)點通訊. 如果我們事先為 links 指定一個分區(qū)方式(比如, 根據(jù) link 的 url 來 hash 分區(qū), 就是將相同的 url 發(fā)送到同一個節(jié)點中), 然后我們對 ranks 進(jìn)行相同的分區(qū)方式, 這樣就可以保證 links 和 ranks 之間的 join 不需要機器節(jié)點之間的通訊(因為相同的 url 都在同一個機器節(jié)點了, 那么相對應(yīng)的 rank 和 link 肯定也是在同一個機器節(jié)點了). 我們也可以自定義分區(qū)器來實現(xiàn)將一組頁面 url 放到一起(比如按照 url 的 domain 進(jìn)行分區(qū)). 以上兩種優(yōu)化方式都可以通過在定義 links 的時候調(diào)用 partitionBy 來實現(xiàn):
在調(diào)用了 partitionBy 后, links 和 ranks 之間的 join 操作會自動的在 link 所在的機器進(jìn)行每一個 URL 的貢獻(xiàn)值的聚合計算, 然后在相同的機器計算新的排名值, 然后計算出來的新的 ranks 在相同的機器和 links 進(jìn)行 join. 這種在迭代之間進(jìn)行數(shù)據(jù)一致分區(qū)是像 Pregel 這種框架中的主要的優(yōu)化計算方式. RDDs 使的用戶可以直接自己來實現(xiàn)這種優(yōu)化機制.
在抽象 RDDs 的過程中, 怎么表達(dá)出 RDDs 能跟蹤很多的 transformations 操作之間血緣關(guān)系是一個比較大的挑戰(zhàn). 理想的情況下, 一個實現(xiàn) RDDs 系統(tǒng)應(yīng)該是盡可能多的提供 transformations 操作(比如表二中的操作), 并且可以讓用戶以任意的方式來組合這些 transformations 操作. 我們提出了基于圖的 RDDs 展現(xiàn)方式來達(dá)到以上的目的. 我們在 spark 中利用這種展現(xiàn)方式達(dá)到了在不需要給調(diào)度系統(tǒng)為每一個 transformation 操作增加任何的特殊邏輯就可以支持大量的 transformations 操作, 這樣極大的簡化了我們的系統(tǒng)設(shè)計.
概括的說, 以下五個信息可以表達(dá) RDDs: 一個分區(qū)列表, 每一個分區(qū)就是數(shù)據(jù)集的原子塊. 一個父親 RDDs 的依賴列表. 一個計算父親的數(shù)據(jù)集的函數(shù). 分區(qū)模式的元數(shù)據(jù)信息以及數(shù)據(jù)存儲信息. 比如, 基于一個 HDFS 文件創(chuàng)建出來的的 RDD 中文件的每一個數(shù)據(jù)塊就是一個分區(qū), 并且這個 RDD 知道每一個數(shù)據(jù)塊存儲在哪些機器上, 同時, 在這個 RDD 上進(jìn)行 map 操作后的結(jié)果有相同的分區(qū)數(shù), 當(dāng)計算元素的時候, 將 map 函數(shù)應(yīng)用到父親 RDD 數(shù)據(jù)中的. 我們在表三總結(jié)了這些接口:
操作接口含義
partitions()返回一個分區(qū)對象的列表
preferredLocations(p)分區(qū)p數(shù)據(jù)存儲在哪些機器節(jié)點中
dependencies()返回一個依賴列表
iterator(p, parentIters)根據(jù)父親分區(qū)的數(shù)據(jù)輸入計算分區(qū)p的所有數(shù)據(jù)
partitioner()返回這個RDD是hash還是range分區(qū)的元數(shù)據(jù)信息
表三: Spark 中表達(dá) RDDs 的接口
在設(shè)計如何表達(dá) RDDs 之間依賴的接口是一個非常有意思的問題. 我們發(fā)現(xiàn)將依賴定義成兩種類型就足夠了: 窄依賴, 表示父親 RDDs 的一個分區(qū)最多被子 RDDs 一個分區(qū)所依賴. 寬依賴, 表示父親 RDDs 的一個分區(qū)可以被子 RDDs 的多個子分區(qū)所依賴. 比如, map 操作是一個窄依賴, join 操作是一個寬依賴操作(除非父親 RDDs 已經(jīng)被 hash 分區(qū)過), 圖四顯示了其他的例子:
圖四:窄依賴和寬依賴的例子.每一個方框表示一個 RDD , 帶有顏色的矩形表示分區(qū)
以下兩個原因使的這種區(qū)別很有用, 第一, 窄依賴可以使得在集群中一個機器節(jié)點的執(zhí)行流計算所有父親的分區(qū)數(shù)據(jù), 比如, 我們可以將每一個元素應(yīng)用了 map 操作后緊接著應(yīng)用 filter 操作, 與此相反, 寬依賴需要父親 RDDs 的所有分區(qū)數(shù)據(jù)準(zhǔn)備好并且利用類似于 MapReduce 的操作將數(shù)據(jù)在不同的節(jié)點之間進(jìn)行重新洗牌和網(wǎng)絡(luò)傳輸. 第二, 窄依賴從一個失敗節(jié)點中恢復(fù)是非常高效的, 因為只需要重新計算相對應(yīng)的父親的分區(qū)數(shù)據(jù)就可以, 而且這個重新計算是在不同的節(jié)點進(jìn)行并行重計算的, 與此相反, 在一個含有寬依賴的血緣關(guān)系 RDDs 圖中, 一個節(jié)點的失敗可能導(dǎo)致一些分區(qū)數(shù)據(jù)的丟失, 但是我們需要重新計算父 RDD 的所有分區(qū)的數(shù)據(jù).
Spark 中的這些 RDDs 的通用接口使的實現(xiàn)很多 transformations 操作的時候只花了少于 20 行的代碼. 實際上, 新的 spark 用戶可以在不了解調(diào)度系統(tǒng)的細(xì)節(jié)之上來實現(xiàn)新的 transformations 操作(比如, 采樣和各種 join 操作). 下面簡要的概括了一些 RDD 的實現(xiàn):
HDFS files: 抽樣的輸入 RDDs 是 HDFS 中的文件.對于這些 RDDs, partitions 返回文件中每一個數(shù)據(jù)塊對應(yīng)的一個分區(qū)信息(數(shù)據(jù)塊的位置信息存儲在 Partition 對象中), preferredLocations 返回每一個數(shù)據(jù)塊所在的機器節(jié)點信息, 最后 iterator 負(fù)責(zé)數(shù)據(jù)塊的讀取操作.
map: 對任意的 RDDs 調(diào)用 map 操作將會返回一個 MappedRDD 對象.這個對象含有和其父親 RDDs 相同的分區(qū)信息和數(shù)據(jù)存儲節(jié)點信息, 但是在 iterator 中對父親的所有輸出數(shù)據(jù)記錄應(yīng)用傳給 map 的函數(shù).
union: 對兩個 RDDs 調(diào)用 union 操作將會返回一個新的 RDD , 這個 RDD 的分區(qū)數(shù)是他所有父親 RDDs 的所有分區(qū)數(shù)的總數(shù).每一個子分區(qū)通過相對應(yīng)的窄依賴的父親分區(qū)計算得到.
sample: sampling 和 mapping 類似, 除了 sample RDD 中為每一個分區(qū)存儲了一個隨機數(shù), 作為從父親分區(qū)數(shù)據(jù)中抽樣的種子.
join: 對兩個 RDDs 進(jìn)行 join 操作, 可能導(dǎo)致兩個窄依賴(如果兩個 RDDs 都是事先經(jīng)過相同的 hash/range 分區(qū)器進(jìn)行分區(qū)), 或者導(dǎo)致兩個寬依賴, 或者一個窄依賴一個寬依賴(一個父親 RDD 經(jīng)過分區(qū)而另一個沒有分區(qū)).在上面所有的惡場景中, join 之后的輸出 RDD 會有一個 partitioner (從父親 RDD 中繼承過來的或者是一個默認(rèn)的 hash partitioner).
我們用了 14000 行 scala 代碼實現(xiàn)了 spark. Spark 系統(tǒng)跑在集群管理者 mesos 上, 這樣可以使的它和其他的應(yīng)用比如 hadoop 纹安、 MPI 等共享資源, 每一個 spark 程序都是由它的 driver 和 workers 組成, 這些 driver 和 workers 都是以一個 mesos 應(yīng)用運行在 mesos 上的, mesos 可以管理這些應(yīng)用之間的資源共享問題.
Spark 可以利用已經(jīng)存在的 hadoop 的 api 組件讀取任何的 hadoop 的輸入數(shù)據(jù)源(比如: HDFS 和 Hbase 等), 這個程序 api 是運行在沒有更改的 scala 版本上.
我們會簡要的概括下幾個比較有意思的技術(shù)點:我們的 job 調(diào)度器( 5.1 節(jié)), 可以用于交互的 spark 解釋器( 5.2 節(jié)), 內(nèi)存管理( 5.3 節(jié))以及對 checkpointing 的支持( 5.4 節(jié)).
spark 的調(diào)度器依賴我們在第 4 章中討論的 RDDs 的表達(dá).
從總體上看, 我們的調(diào)度系統(tǒng)有點和 Dryad 相似, 但是它還考慮了被存儲的 RDDs 的哪些分區(qū)還在內(nèi)存中.當(dāng)一個用戶對某個 RDD 調(diào)用了 action 操作(比如 count 或者 save )的時候調(diào)度器會檢查這個 RDD 的血緣關(guān)系圖, 然后根據(jù)這個血緣關(guān)系圖構(gòu)建一個含有 stages 的有向無環(huán)圖( DAG ), 最后按照步驟執(zhí)行這個 DAG 中的 stages , 如圖 5 的說明.每一個 stage 包含了盡可能多的帶有窄依賴的 transformations 操作. 這個 stage 的劃分是根據(jù)需要 shuffle 操作的寬依賴或者任何可以切斷對父親 RDD 計算的某個操作(因為這些父親 RDD 的分區(qū)已經(jīng)計算過了). 然后調(diào)度器可以調(diào)度啟動 tasks 來執(zhí)行沒有父親 stage 的 stage (或者父親 stage 已經(jīng)計算好了的 stage ),一直到計算完我們的最后的目標(biāo) RDD .
圖五: 怎么計算 spark job stage 的例子.實現(xiàn)的方框表示 RDDs ,帶有顏色的方形表示分區(qū), 黑色的是表示這個分區(qū)的數(shù)據(jù)存儲在內(nèi)存中, 對 RDD G 調(diào)用 action 操作, 我們根據(jù)寬依賴生成很多 stages , 且將窄依賴的 transformations 操作放在 stage 中.在這個場景中, stage 1 的輸出結(jié)果已經(jīng)在內(nèi)存中, 所以我們開始運行 stage 2 , 然后是 stage 3.
我們調(diào)度器在分配 tasks 的時候是采用延遲調(diào)度來達(dá)到數(shù)據(jù)本地性的目的(說白了, 就是數(shù)據(jù)在哪里, 計算就在哪里). 如果某個分區(qū)的數(shù)據(jù)在某個節(jié)點上的內(nèi)存中, 那么將這個分區(qū)的計算發(fā)送到這個機器節(jié)點中. 如果某個 RDD 為它的某個分區(qū)提供了這個數(shù)據(jù)存儲的位置節(jié)點, 則將這個分區(qū)的計算發(fā)送到這個節(jié)點上.
對于寬依賴(比如 shuffle 依賴), 我們將中間數(shù)據(jù)寫入到節(jié)點的磁盤中以利于從錯誤中恢復(fù), 這個和 MapReduce 將 map 后的結(jié)果寫入到磁盤中是很相似的.
只要一個任務(wù)所在的 stage 的父親 stage 還是有效的話, 那么當(dāng)這個 task 失敗的時候, 我們就可以在其他的機器節(jié)點中重新跑這個任務(wù). 如果一些 stages 變的無效的話(比如因為一個 shuffle 過程中 map 端的一個輸出結(jié)果丟失了), 我們需要重新并行提交沒有父親 stage 的 stage (或者父親 stage 已經(jīng)計算好了的 stage )的計算任務(wù). 雖然備份 RDD 的血緣關(guān)系圖示比較容易的, 但是我們還不能容忍調(diào)度器調(diào)度失敗的場景.
雖然目前 spark 中所有的計算都是響應(yīng) driver 程序中調(diào)用的 action 操作, 但是我們也是需要嘗試在集群中調(diào)用 lookup 操作, 這種操作是根據(jù) key 來隨機訪問已經(jīng) hash 分區(qū)過的 RDD 所有元素以獲取相應(yīng)的 value. 在這種場景中, 如果一個分區(qū)沒有計算的話, 那么 task 需要將這個信息告訴調(diào)度器.
scala 和 Ruby 以及 Python 一樣包含了一個交互型的 shell 腳本工具. 考慮到利用內(nèi)存數(shù)據(jù)可以獲得低延遲的特性, 我們想讓用戶通過解釋器來交互性的運行 spark , 從而達(dá)到查詢大數(shù)據(jù)集的目的.
Scala 解釋器通常是將用戶輸入的每一行代碼編譯成一個類, 然后將這個類加載到 JVM 中, 然后調(diào)用這個類的方法. 這個類中包含了一個單例對象, 這個單例對象包含了用戶輸入一行代碼中的變量或者函數(shù), 還包含了一個運行用戶輸入那行代碼的初始化方法. 比如, 用戶輸入 var x = 5 , 然后再輸入 println(x), scala 解釋器定義個包含了 x 的叫做 Line 1 的類, 然后將第二行代碼編譯成 println(Line 1.getInstance(). x ).
我們對 spark 中的解釋器做了如下兩個改變:
Class shipping: 為了讓 worker 節(jié)點能拿到用戶輸入的每一行代碼編譯成的 class 的二進(jìn)制代碼, 我們使的解釋器為這些 classes 的二進(jìn)制代碼提供 HTTP 服務(wù).
修改了代碼生成:正常情況下, 我們通過訪問對應(yīng)的類的靜態(tài)方法來達(dá)到訪問將用戶輸入每一行代碼編譯成的單例對象.這個以為著, 當(dāng)我們將一個含有在前面行中定義的變量(比如上面例子中的 Line 1.x )的閉包序列化發(fā)送到 worker 節(jié)點的時候, java 是不會通過對象圖來跟蹤含有 x 的實力 Line 1 的, 這樣的話 worker 節(jié)點將收不到變量 x.我們修改了代碼生成邏輯來達(dá)到能直接引用每一行代碼生成的實例.
圖六顯示了經(jīng)過我們的改變后, 解釋器是如何將用戶輸入的一系列的代碼轉(zhuǎn)換成 java 對象.
圖六:顯示 spark 解釋器是如何將用戶輸入的代碼轉(zhuǎn)換成 java 對象的例子
我們發(fā)現(xiàn) spark 解釋器在處理我們研究中的大量已經(jīng)獲取到的痕跡數(shù)據(jù)以及探索存儲在 hdfs 中的數(shù)據(jù)集時是非常有用的.我們正在打算用這個來實現(xiàn)更高層面的交互查詢語言, 比如 SQL.
Spark 在持久化 RDDs 的時候提供了 3 種存儲選:存在內(nèi)存中的非序列化的 java 對象、存在內(nèi)存中的序列化的數(shù)據(jù)以及存儲在磁盤中. 第一種選擇的性能是最好的, 因為 java VM 可以很快的訪問 RDD 的每一個元素. 第二種選擇是在內(nèi)存有限的情況下, 使的用戶可以以很低的性能代價而選擇的比 java 對象圖更加高效的內(nèi)存存儲的方式. 如果內(nèi)存完全不夠存儲的下很大的 RDDs , 而且計算這個 RDD 又很費時的, 那么選擇第三種方式.
為了管理有限的內(nèi)存資源, 我們在 RDDs 的層面上采用 LRU (最近最少使用)回收策略. 當(dāng)一個新的 RDD 分區(qū)被計算但是沒有足夠的內(nèi)存空間來存儲這個分區(qū)的數(shù)據(jù)的時候, 我們回收掉最近很少使用的 RDD 的分區(qū)數(shù)據(jù)的占用內(nèi)存, 如果這個 RDD 和這個新的計算分區(qū)的 RDD 時同一個 RDD 的時候, 我們則不對這個分區(qū)數(shù)據(jù)占用的內(nèi)存做回收. 在這種情況下, 我們將相同的 RDD 的老分區(qū)的數(shù)據(jù)保存在內(nèi)存中是為了不讓老是重新計算這些分區(qū)的數(shù)據(jù), 這點事非常重要的, 因為很多操作都是對整個 RDD 的所有的 tasks 進(jìn)行計算的, 所以非常有必要將后續(xù)要用到的數(shù)據(jù)保存在內(nèi)存中.到目前為止, 我們發(fā)現(xiàn)這種默認(rèn)的機制在所有的應(yīng)用中工作的很好, 但是我們還是將持久每一個 RDD 數(shù)據(jù)的策略的控制權(quán)交給用戶.
最后, 在一個集群中的每一個 spark 實例的內(nèi)存空間都是分開的, 我們以后打算通過統(tǒng)一內(nèi)存管理達(dá)到在 spark 實例之間共享 RDDs.
雖然我們總是可以使用 RDDs 的血緣關(guān)系來恢復(fù)失敗的 RDDs 的計算, 但是如果這個血緣關(guān)系鏈很長的話, 則恢復(fù)是需要耗費不少時間的.因此, 將一些 RDDs 的數(shù)據(jù)持久化到穩(wěn)定存儲系統(tǒng)中是有必要的
一般來說, checkpointing 對具有很長的血緣關(guān)系鏈且包含了寬依賴的 RDDs 是非常有用的, 比如我們在 3.2.2 小節(jié)中提到的 PageRank 的例子. 在這些場景下, 集群中的某個節(jié)點的失敗會導(dǎo)致每一個父親 RDD 的一些數(shù)據(jù)的丟失, 進(jìn)而需要重新所有的計算. 與此相反的, 對于存儲在穩(wěn)定存儲系統(tǒng)中且是窄依賴的 RDDs (比如 3.2.1 小節(jié)中線性回歸例子中的 points 和 PageRank 中的 link 列表數(shù)據(jù)), checkpointing 可能一點用都沒有. 如果一個節(jié)點失敗了, 我們可以在其他的節(jié)點中并行的重新計算出丟失了數(shù)據(jù)的分區(qū), 這個成本只是備份整個 RDD 的成本的一點點而已.
spark 目前提供了一個 checkpointing 的 api ( persist 中的標(biāo)識為 REPLICATE , 還有 checkpoint ()), 但是需要將哪些數(shù)據(jù)需要 checkpointing 的決定權(quán)留給了用戶. 然而, 我們也在調(diào)查怎么樣自動的 checkpoing , 因為我們的調(diào)度系統(tǒng)知道數(shù)據(jù)集的大小以及第一次計算這個數(shù)據(jù)集花的時間, 所以有必要選擇一些最佳的 RDDs 來進(jìn)行 checkpointing , 來達(dá)到最小化恢復(fù)時間
最后, 需要知道的事 RDDs 天生的只讀的特性使的他們比一般的共享內(nèi)存系統(tǒng)做 checkpointing 更簡單了. 因為不用考慮數(shù)據(jù)的一致性, 我們可以不終止程序或者 take 快照, 然后在后臺將 RDDs 的數(shù)據(jù)寫入到存儲系統(tǒng)中.
我們通過在亞馬遜 EC 2 傷進(jìn)行一系列的實驗以及用用戶的應(yīng)用做基準(zhǔn)測試來評估 spark , 總的來說, 下面是我們的結(jié)論:
在迭代式機器學(xué)習(xí)和圖計算中, spark 以 20 倍的速度超過了 hadoop .提速的點主要是在避免了 I / O 操作以及將數(shù)據(jù)以 java 對象的形式存在內(nèi)存中從而降低了反序列化的成本.
用戶寫的應(yīng)用程序運行平穩(wěn)以及很好擴(kuò)展.特別的, 我們利用 spark 為一個分析報表提速了 40 倍, 相對于 hadoop 來說.
當(dāng)節(jié)點失敗的時候, spark 可以通過重新計算失去的 rdd 分區(qū)數(shù)據(jù)達(dá)到快速的恢復(fù).
spark 在查詢 1 TB 的數(shù)據(jù)的時候的延遲可以控制在 5 到 7 秒.
我們通過和 hadoop 對比, 展示迭代式機器學(xué)習(xí)( 6.1 節(jié))和 PageRank ( 6.2 節(jié))的基準(zhǔn)測試.然后我們評估了 spark 的錯誤恢復(fù)機制( 6.3 節(jié))以及當(dāng)內(nèi)存不足以存儲一個數(shù)據(jù)集的行為( 6.4 節(jié)), 最后我們討論了用戶應(yīng)用( 6.5 節(jié))和交互式數(shù)據(jù)挖掘( 6.6 節(jié))的結(jié)果 除非另外聲明, 我們都是用類型為 m 1.xlarge 的 EC 2 節(jié)點, 4 核以及 15 GB 內(nèi)存. 我們是有數(shù)據(jù)塊大小為 256 M 的 HDFS 存儲系統(tǒng). 在每一次測試之前, 我們都會清理 OS 的緩存, 以達(dá)到準(zhǔn)確的測量 IO 成本的目的
我們實現(xiàn)了兩種迭代式機器學(xué)習(xí)應(yīng)用, 線性回歸核 K - means , 來和下面的系統(tǒng)進(jìn)行性能的對比:
Hadoop:版本號為 0.20.0 的穩(wěn)定版.
HadoopBinMem:這個系統(tǒng)在迭代的一開始會將輸入數(shù)據(jù)轉(zhuǎn)換成底開銷的二進(jìn)制形式, 這樣可以為接下來的迭代消除解析文本數(shù)據(jù)的開銷, 并且將數(shù)據(jù)存儲在 hdfs 實例的內(nèi)存中.
Spark:我們的 RDDs 的實現(xiàn).
我們在 25-100 臺機器上存儲 100 G 數(shù)據(jù), 兩種算法都是對這 100 G 數(shù)據(jù)跑 10 次迭代. 兩個應(yīng)用之間的關(guān)鍵不同點是他們對相同數(shù)據(jù)的計算量不一樣. K-means 的迭代時間都是花在計算上, 然而線性回歸是一個計算量不大, 時間都是花在反序列化和 I/O 上. 由于典型的機器學(xué)習(xí)算法都是需要上千次的迭代來達(dá)到收斂, 所以我們將第一次迭代花的時間和接下來的迭代時間分開顯示. 我們發(fā)現(xiàn)通過 RDDs 的共享數(shù)據(jù)極大的提高了后續(xù)的迭代速度
圖七:在 100 臺機器的集群上分別用 hadoop 滋戳、 hadoopBinMem 以及 spark 對 100 GB 的數(shù)據(jù)進(jìn)行,線性回歸和 k - means 的首次迭代和隨后迭代花的時間
首次迭代:三個系統(tǒng)在首次迭代中都是讀取 HDFS 中的數(shù)據(jù), 從圖七的條形圖中我們可以看出, 在實驗中, spark 穩(wěn)定的比 hadoop 要快. 這個是由于 hadoop 主從節(jié)點之間的心跳信息的信號開銷導(dǎo)致的. HadoopBinMen 是最慢的, 這個是因為它啟動了一個額外的 MapReduce 任務(wù)來將數(shù)據(jù)轉(zhuǎn)換為二進(jìn)制, 它還需要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)以達(dá)到備份內(nèi)存中的數(shù)據(jù)的目的.隨后的迭代:圖七也顯示了隨后迭代花的平均時間. 圖八則是顯示了集群大小不斷擴(kuò)展時候的花的時間. 對于線性回歸, 在 100 臺機器上, spark 分別比 hadoop 和 hadoopBinMem 快上 25.3 倍和 20.7 倍. 對于計算型的 k - means 應(yīng)用, spark 仍然分別提高了 1.9 倍和 3.2 倍.
圖八: hadoop 钻蔑、 hadoopBinMem 以及 spark 在隨后的迭代花的時間, 都是處理 100 G 的數(shù)據(jù)
理解為什么提速了: 我們驚奇的發(fā)現(xiàn) spark 甚至比基于內(nèi)存存儲二進(jìn)制數(shù)據(jù)的 hadoopBinMem 還要快 20 倍. 在 hadoopBinMem 中, 我們使用的是 hadoop 標(biāo)準(zhǔn)的二進(jìn)制文件格式( sequenceFile )和 256 m 這么大的數(shù)據(jù)塊大小, 以及我們強制將 hadoop 的數(shù)據(jù)目錄放在一個內(nèi)存的文件系統(tǒng)中. 然而, Hadoop 仍然因為下面幾點而比 spark 慢:
Hadoop 軟件棧的最低開銷.
HDFS 提供數(shù)據(jù)服務(wù)的開銷.
將二進(jìn)制數(shù)據(jù)轉(zhuǎn)換成有效的內(nèi)存中的 java 對象的反序列化的成本開銷.
我們依次來調(diào)查上面的每一個因素.為了測量第一個因素, 我們跑了一些空的 hadoop 任務(wù), 我們發(fā)現(xiàn)單單完成 job 的設(shè)置、任務(wù)的啟動以及任務(wù)的清理等工作就花掉了至少 25 秒鐘. 對于第二個元素, 我們發(fā)現(xiàn) HDFS 需要執(zhí)行多份內(nèi)存數(shù)據(jù)的拷貝以及為每一個數(shù)據(jù)塊做 checksum 計算.
最后, 為了測試第 3 個因素, 我們在單機上做了一個微型的基準(zhǔn)測試, 就是針對不同文件類型的 256 M 數(shù)據(jù)來跑線性回歸計算. 我們特別的對比了分別從 HDFS 文件( HDFS 技術(shù)棧的耗時將會很明顯)和本地內(nèi)存文件(內(nèi)核可以很高效的將數(shù)據(jù)傳輸給應(yīng)用程序)中處理文本和二進(jìn)制類型數(shù)據(jù)所話的時間奸鸯、
圖九中是我們我們測試結(jié)果的展示. 從 In - memory HDFS (數(shù)據(jù)是在本地機器中的內(nèi)存中)中讀數(shù)據(jù)比從本地內(nèi)存文件中讀數(shù)據(jù)要多花費 2 秒中.解析文本文件要比解析二進(jìn)制文件多花費 7 秒鐘. 最后, 即使從本地內(nèi)存文件中讀數(shù)據(jù), 但是將預(yù)先解析了二進(jìn)制數(shù)據(jù)轉(zhuǎn)換成 java 對象也需要 3 秒鐘, 這個對于線性回歸來說也是一個非常耗時的操作. Spark 將 RDDs 所有元素以 java 對象的形式存儲在內(nèi)存中, 進(jìn)而避免了上述說的所有的耗時
我們分別用 spark 和 hadoop 對 54 GB 的維基百科的轉(zhuǎn)儲數(shù)據(jù)進(jìn)行了 PageRank 機器學(xué)習(xí), 并比對了它們的性能. 我們用 PageRank 的算法處理了大約 4 百萬相互連接的文章, 并進(jìn)行了 10 次迭代. 圖十展示了在 30 個節(jié)點上, 只用內(nèi)存存儲使的 spark 擁有了比 hadoop 2.4 倍的性能提升. 另外, 就和 3.2.2 小節(jié)討論的, 如果控制 RDD 的分區(qū)使的迭代之間數(shù)據(jù)的平衡更可以使的性能速度提升到 7.2 倍. 將節(jié)點數(shù)量擴(kuò)展到 60 個, spark 的性能速度提升也是上面的結(jié)果
圖十:分別基于 Hadoop 和 spark 的 PageRank 的性能對比
我們也評估了在第 7.1 節(jié)中提到的用我們基于 spark 而實現(xiàn)的 Pregel 重寫的 PageRank .迭代次數(shù)和圖十是一樣的, 但是慢了 4 秒鐘, 這個是因為每一次迭代 Pregel 都要跑額外的操作來讓頂點進(jìn)行投票決定是否需要結(jié)束任務(wù).
我們評估了當(dāng)在 k - means 應(yīng)用中一個節(jié)點失敗了而利用 RDD 的血緣關(guān)系鏈來重建 RDD 的分區(qū)需要的成本.圖十一對比的是在 75 個節(jié)點中運行 10 次迭代的 k - means 正常情況和在第 6 次迭代一個節(jié)點失敗的情況. 如果沒有任何失敗的話, 每一次迭代都是在 100 GB 的數(shù)據(jù)上跑 400 個 tasks.
圖十一:出現(xiàn)了失敗的 k - means 每次迭代時間.在第 6 次迭代中一臺機器被殺掉了, 導(dǎo)致需要利用血緣關(guān)系鏈重建 RDD 的部分分區(qū)
第五次迭代的時間是 58 秒. 在第 6 次迭代, 一臺機器被殺死了, 導(dǎo)致丟失了運行在這臺機器上的 tasks 以及存儲在這臺機器上的 RDD 分區(qū)數(shù)據(jù). Spark 在其他機器節(jié)點上重新讀取相應(yīng)的輸入數(shù)據(jù)以及通過血緣關(guān)系來重建 RDD , 然后并行的重跑丟失的 tasks , 使的這次迭代的時間增加到 80s. 一旦丟失的 RDD 分區(qū)數(shù)據(jù)重建好了, 迭代的時間又回到了 58s.
需要注意的是, 如果是基于 checkpoint 的容錯機制的話, 那么需要通過重跑好幾個迭代才能恢復(fù), 需要重跑幾個取決于 checkpoints 的頻率. 此外, 系統(tǒng)需要通過網(wǎng)絡(luò)傳輸來備份應(yīng)用需要的 100GB 數(shù)據(jù)(被轉(zhuǎn)化為二進(jìn)制的文本數(shù)據(jù)), 以及為了達(dá)到在內(nèi)存中備份數(shù)據(jù)而消耗掉 2 倍的內(nèi)存, 或者等待將 100GB 數(shù)據(jù)寫入到磁盤中. 與此相反的是, 在我們的例子中每一個 RDDs 的血緣關(guān)系圖的大小都是小于 10KB 的.
在目前為止, 我們都是假設(shè)集群中的每一臺機器都是有足夠的內(nèi)存來存儲迭代之間的 RDDs 的數(shù)據(jù)的. 當(dāng)沒有足夠的內(nèi)存來存儲任務(wù)的數(shù)據(jù)的時候 spark 是怎么運行的呢咪笑? 在這個實驗中, 我們給 spark 每一個節(jié)點配置很少的內(nèi)存, 這些內(nèi)存不足以存儲的下 RDDs. 我們在圖十二中, 我們展示了不同存儲空間下的運行線性回歸應(yīng)用需要的時間. 可以看出, 隨著空間的減少, 性能速度慢慢的下降:
圖十二: 每次都是使用不同的內(nèi)存, 然后在 25 臺機器中對 100 GB 的數(shù)據(jù)運行線性回歸的性能對比圖
6.5 用 spark 構(gòu)建的用戶應(yīng)用
內(nèi)存中分析: Conviva Inc 是一個視頻提供商, 他們用 spark 來加速之前在 hadoop 上運行的幾個數(shù)據(jù)報表分析. 比如, 其中一個報表是運行一系列的 Hive 查詢來計算一個用戶的各種統(tǒng)計信息. 這些查詢都是基于相同的數(shù)據(jù)子集(基于自定義的過濾器過濾出來的數(shù)據(jù))但是需要很多 MapReduce 任務(wù)來為分組字段進(jìn)行聚合運算(平均值、百分位數(shù)值以及 count distinct). 將這些數(shù)據(jù)子集創(chuàng)建成一個可以共享的 spark 中的 RDD 來實現(xiàn)這些查詢使的這個報表的速度提升了 40 倍. 對 200 GB 已經(jīng)壓縮的數(shù)據(jù)在 hadoop 集群上跑這個報表花了 20 個小時, 但是利用 2 臺機器的 spark 只用了 30 分鐘而已. 此外, spark 程序只花了 96 G 的內(nèi)存, 因為只需要將報表關(guān)心的列數(shù)據(jù)存儲在內(nèi)存中進(jìn)行共享就行, 而不是所有的解壓后的數(shù)據(jù).
交通模型:伯克利分校的 Mobile Millennium 項目組的研究員在收集到的零星的汽車的 GPS 信息上并行運行一個機器學(xué)習(xí)算法試圖推斷出道路交通是否擁擠. 在都市區(qū)域道路網(wǎng)絡(luò)中的 10000 條道路和 600000 個裝有 GPS 設(shè)備的汽車點對點的旅行時間(每一條路線的旅行時間可能包含了多條道路)樣本是數(shù)據(jù)源. 利用交通模型可以估算出通過每一條獨立的道路需要多長時間. 研究人員利用 EM 算法來訓(xùn)練模型, 這個算法在迭代的過程中重復(fù)執(zhí)行 map 和 reduceByKey 步驟. 這個應(yīng)用近似線性的將機器規(guī)模從 20 臺擴(kuò)展到 80 臺, 每臺機器 4 個 cores , 如圖 13 ( a )所示.
圖十三:兩個用 spark 實現(xiàn)的用戶應(yīng)用的每次迭代的時間, 誤差線表示標(biāo)準(zhǔn)誤差
推特垃圾郵件分類:伯克利分校的 Monarch 項目利用 spark 來標(biāo)記推特消息中的垃圾鏈接. 它們實現(xiàn)的線性回歸分類器和第 6.1 節(jié)中很相似, 但是他們利用了分布式的 reduceByKey 來并行的累加梯度向量值. 圖 13(b) 中顯示了對 50 GB 的數(shù)據(jù)子集進(jìn)行分類訓(xùn)練需要的時間(隨著機器擴(kuò)展), 這個數(shù)據(jù)子集中包含了 25000 URLs 以及每一個 url 對應(yīng)的頁面的網(wǎng)絡(luò)和內(nèi)容屬性相關(guān)的 10000000 個特征/緯度. 圖 13(b) 中的時間不是線性的下降是因為每一次迭代花費了很高的且固定的通訊成本.
為了演示 spark 在交互查詢大數(shù)據(jù)集的能力, 我們來分析 1 TB 的維基頁面訪問日志數(shù)據(jù)( 2 年的數(shù)據(jù)). 在這個實驗中, 我們使用 100 個 m 2.4 xlarge EC 2 實例, 每一個實例 8 個 cores 以及 68 G 內(nèi)存. 我們查詢出( 1 )所有頁面的瀏覽量, ( 2 )頁面標(biāo)題等于某個單詞的頁面的瀏覽量以及( 3 )頁面標(biāo)題部分的等于某個單詞的頁面的瀏覽量. 每一個查詢都是掃描整個輸入數(shù)據(jù)集.
圖十四展示的分別是查詢整個數(shù)據(jù)集娄涩、一半數(shù)據(jù)集一集十分之一的數(shù)據(jù)集的響應(yīng)時間. 即使是 1 TB 的數(shù)據(jù), 用 spark 來查詢僅僅花了 5-7 秒而已.這個比查詢磁盤數(shù)據(jù)的速度快了一個數(shù)量級, 比如, 查詢磁盤文件中的 1 TB 數(shù)據(jù)需要 170 秒.這個可以說明 RDDs 使的 spark 是一個非常強大的交互型數(shù)據(jù)挖掘的工具.
雖然由于 RDDs 的天然不可變性以及粗粒度的轉(zhuǎn)換導(dǎo)致它們似乎提供了有限制的編程接口, 但是我們發(fā)現(xiàn)它們適合很多類型的應(yīng)用. 特別的, RDDs 可以表達(dá)出現(xiàn)在各種各樣的框架提出的編程模型, 而且還可以將這些模型組合在同一個程序中(比如跑一個 MapReduce 任務(wù)來創(chuàng)建一個圖, 然后基于這個圖來運行 Pregel )以及可以在這些模型中共享數(shù)據(jù). 在這一章中, 我們在第 7.1 節(jié)中討論 RDDs 可以表達(dá)哪些模型以及為什么適合表達(dá)這些編程模型. 另外, 我們在第 7.2 節(jié)中討論我們推崇的 RDD 的血緣信息的好處, 利用這些信息可以幫助我們 debug 模型.
對于到目前為止很多獨立提出的編程模型, RDDs 都可以高效的表達(dá)出來. 這里所說的 “高效”, 不僅僅是指使用 RDDs 的輸出結(jié)果和獨立提出的編程模型狂簡的輸出結(jié)果是一致的, 而且 RDDs 在優(yōu)化性能方面比這些框架還要強大, 比如將特定的數(shù)據(jù)保存在內(nèi)存中窗怒、對數(shù)據(jù)分區(qū)以減少網(wǎng)絡(luò)傳輸以及高效的從錯誤中恢復(fù). 可以用 RDDs 表達(dá)的模型如下:
MapReduce: 可以利用 spark 中的 flatMap 和 groupByKey 操作來表達(dá)這個模型, 或者如果需要聚合的話可以使用 reduceByKey .
DryadLINQ: DryadLINQ 系統(tǒng)比 MapReduce 更多的操作, 但是這些操作都是直接和 RDD 的轉(zhuǎn)換操作( map , groupByKey , join 等)對應(yīng)的批量操作.
SQL: 和 DryadLINQ 一樣, SQL 查詢都是對一個數(shù)據(jù)集進(jìn)行并行的操作計算.
Pregel: Google 的 Pregel 是一個專門解決迭代圖計算應(yīng)用的模型, 它一開始看起來和面向數(shù)據(jù)集的編程模型的其他系統(tǒng)完全不同.在 Pregel 中, 一個程序運行一些列的相互協(xié)調(diào)的“ supersteps ”.在每一個 superstep 上, 對圖上的每一個頂點運行用戶自定義的函數(shù)來更新這個頂點的相關(guān)的狀態(tài)映跟、改變圖的拓?fù)浣Y(jié)構(gòu)以及向其他頂點發(fā)送下一個 superstep 需要的消息.這種模型可以表達(dá)非常多的圖計算算法, 包括最短路徑、二部圖匹配以及 PageRank.
Pregel 在每一次迭代中都是對所有頂點應(yīng)用相同的用戶定義的函數(shù), 這個是使的我們用 RDDs 來實現(xiàn)這個模型的關(guān)鍵點. 因此, 每次迭代后, 我們都可以將頂點的狀態(tài)保存在 RDD 中, 然后執(zhí)行一個批量轉(zhuǎn)換操作( apply )來應(yīng)用這個函數(shù)以及生成一個消息的 RDD . 然后我們可以用這個 RDD 通頂點的狀態(tài)進(jìn)行 join 來完成消息的交換. 和 Pregel 一樣, RDDs 允許將點的狀態(tài)保存在內(nèi)存中扬虚、控制它們的分區(qū)以減少網(wǎng)絡(luò)通訊以及指出從失敗中恢復(fù). 我們在 spark 上用了 200 行代碼的包實現(xiàn)了 Pregel , 讀者可以參考第 33 個文獻(xiàn)來了解更多的細(xì)節(jié)
迭代 MapReduce: 最近提出的幾個系統(tǒng), 包括 HaLoop 和 Twister , 它們提供了可以讓用戶循環(huán)跑一系列的 MapReduce 任務(wù)的迭代式 MapReduce 模型.這些系統(tǒng)在迭代之間保持?jǐn)?shù)據(jù)分區(qū)一致, Twister 也可以將數(shù)據(jù)保存在內(nèi)存中. RDDs 可以很簡單的表達(dá)以上兩個優(yōu)化, 而且我們基于 spark 花了 200 行代碼實現(xiàn)了 HaLoop.
批量流處理: 研究人員最近提出了一些增量處理系統(tǒng), 這些系統(tǒng)是為定期接受新數(shù)據(jù)然后根據(jù)數(shù)據(jù)更新結(jié)果的應(yīng)用服務(wù)的.比如, 一個應(yīng)用需要實時接收新數(shù)據(jù), 然后每 15 分鐘就將接收到的數(shù)據(jù)和前面 15 分鐘的時間窗口的數(shù)據(jù)進(jìn)行 join 聚合, 將聚合的結(jié)果更新到統(tǒng)計數(shù)據(jù)中.這些系統(tǒng)執(zhí)行和 Dryad 類似的批處理, 但是它們將應(yīng)用的狀態(tài)數(shù)據(jù)存儲在分布式系統(tǒng)中.將中間結(jié)果放在 RDDs 中可以提高處理速度.
闡釋 RDDs 的表達(dá)力為什么這么豐富:為什么 RDDs 可以表達(dá)多種多樣編程模型努隙?原因就是 RDDs 的限制性對很多并行計算的應(yīng)用的影響是很小的.特別指出的是, 雖然 RDDs 只能通過批量轉(zhuǎn)換而得到, 但是很多的并行計算的程序都是將相同的操作應(yīng)用到大量的數(shù)據(jù)條目中, 這樣使的 RDDs 的表達(dá)力變的豐富.類似的, RDDs 的不變性并不是障礙, 因為我們可以創(chuàng)建多個 RDDs 來表達(dá)不同版本的相同數(shù)據(jù)集.事實上, 現(xiàn)在很多的 MapReduce 的應(yīng)用都是運行在不能對文件修改數(shù)據(jù)的文件系統(tǒng)中, 比如 HDFS.
最后一個問題是為什么之前的框架沒有提供這中通用型的表達(dá)能力呢? 我們相信這個是因為這些框架解決的是 MapReduce 和 Dryad 不能解決的特殊性的問題, 比如迭代, 它們沒有洞察到這些問題的共同原因是因為缺少了數(shù)據(jù)共享的抽象.
當(dāng)我們一開始設(shè)計 RDDs 通過重新計算來達(dá)到容錯的時候, 這種特性同時也促使了 debugging 的產(chǎn)生. 特別的, 在一個任務(wù)中通過記錄 RDDs 的創(chuàng)建的血緣, 我們可以:
后面可以重新構(gòu)建這些 RDDs 以及可以讓用戶交互性的查詢它們.
通過重新計算其依賴的 RDD 分區(qū)來達(dá)到在一個進(jìn)程 debugger 中重跑任何的任務(wù).
和傳統(tǒng)的通用分布式系統(tǒng)的重跑 debugger 不一樣, 傳統(tǒng)的需要捕獲和引用多個節(jié)點之間的事件發(fā)生的順序, RDDs 這種 debugger 方式不需要依賴任何的數(shù)據(jù), 而只是需要記錄 RDD 的血緣關(guān)系圖.我們目前正在基于這些想法來開發(fā)一個 spark debbger.
集群編程模型:集群編程模型的相關(guān)工作分為以下幾類:
第一, 像 MapReduce , Dryad 以及 Ciel 一樣支持一系列處理數(shù)據(jù)的操作, 并且需要通過穩(wěn)定的存儲系統(tǒng)來共享數(shù)據(jù), RDDs 表達(dá)了一種比穩(wěn)定存儲系統(tǒng)更高效的數(shù)據(jù)共享抽象, 因為它避免了數(shù)據(jù)備份辜昵、 I / O 以及序列化的成本.
第二, 幾個數(shù)據(jù)流系統(tǒng)的高層面上的編程接口, 包括 DryadLINQ 和 FlumeJava ,它們提供了語言集成 api , 使的用戶可以通過像 map 和 join 等操作來操作并行的集合.然而, 在這些系統(tǒng)中, 并行的集合是指在磁盤上的文件或者一個查詢計劃表達(dá)的臨時數(shù)據(jù)集.雖然這些系統(tǒng)在相同的查詢中的操作之間組裝數(shù)據(jù)的 pipeline (比如, 一個 map 操作后跟另外一個 map ),但是它們不能在查詢之間進(jìn)行高效的數(shù)據(jù)共享.我們在并行集合模式上建立 spark api , 是由于它的便利性以及在集成語言接口上不要求新穎性, 但是我們基于在這些接口背后以 RDDs 作為存儲抽象, 就可以使的 spark 支持大量類型的應(yīng)用了.
第三種系統(tǒng)為許多專門的需要數(shù)據(jù)共享的應(yīng)用提供了高層的接口.比如, pregel 支持迭代式的圖計算應(yīng)用荸镊、 Twister 和 HaLoop 支持迭代式的 MapReduce .然而, 這些框架只是為他們支持的計算模型隱式的共享數(shù)據(jù), 并沒有提供可以讓用戶根據(jù)自己的需求顯式的共享數(shù)據(jù)的通用抽象.比如, 一個用戶不能用 Pregel 或者 Twister 將數(shù)據(jù)加載到內(nèi)存中然后決定在數(shù)據(jù)集上面跑什么樣的查詢. RDDs 提供了一個顯式的分布式存儲抽象, 因此可以支持那些特殊系統(tǒng)不能支持的應(yīng)用, 比如交互式數(shù)據(jù)挖掘.
最后, 一些系統(tǒng)暴露共享可變狀態(tài)以使的用戶可以執(zhí)行內(nèi)存計算. 比如, Piccolo 使的用戶通過分布式的函數(shù)來讀取和更新分布式 hash 表中的單元格數(shù)據(jù). DSM 和像 RAMCloud 這樣的 key - value 存儲系統(tǒng)提供了類似的模型. RDDs 和這些系統(tǒng)有兩個方面的不同, 第一, RDDs 基于像 map , sot 以及 join 等操作提供了高層的編程接口, 然而, 在 Piccolo 和 DSM 中的接口只是讀取和更新表的單元格數(shù)據(jù). 第二, Piccolo 和 DSM 通過 checkpoint 和回滾機制實現(xiàn)容錯, 在許多應(yīng)用中這種機制比機遇血緣機制的 RDDs 的容錯的成本更大. 最后, 如 2.3 小節(jié)討論的, 相對于 DSM , RDDs 還提供了其他的優(yōu)勢功能, 比如執(zhí)行慢的 task 的處理機制
緩存系統(tǒng): Nectar 可以通過標(biāo)識通用的程序分析的子表達(dá)式來達(dá)到在 DryadLINQ 任務(wù)之間對中間數(shù)據(jù)結(jié)果的復(fù)用. 這種能力肯定會加入到基于 RDD 的系統(tǒng)中. 然而, Nectar 即沒有提供基于內(nèi)存的緩存(他是將數(shù)據(jù)放到分布式文件系統(tǒng)中)也不能讓用戶可以顯式的對數(shù)據(jù)集進(jìn)行緩存控制和分區(qū)控制. Ciel 和 FlumeJava 同樣可以緩存任務(wù)結(jié)果, 但是也不提供基于內(nèi)存的緩存或者顯式的控制哪些數(shù)據(jù)可以緩存
Ananthanarayanan et al 已經(jīng)提出在分布式文件系統(tǒng)上加一層基于內(nèi)存的緩存來利用數(shù)據(jù)訪問的暫時性和本地性. 這種解決方案卻是加快了已經(jīng)存在于文件系統(tǒng)中的數(shù)據(jù)訪問速度, 但是它在共享同一個應(yīng)用中的中間結(jié)果方面并沒有 RDD 高效, 因為它在每一個 stage 之間仍然需要將數(shù)據(jù)寫入到文件系統(tǒng)中
血緣:在科學(xué)計算以及數(shù)據(jù)庫中, 捕獲數(shù)據(jù)的血緣或者來源信息是一個很長時間被研究的話題了, 對于像解釋結(jié)果的應(yīng)用, 需要讓他們可以從其他的應(yīng)用重新產(chǎn)生, 且當(dāng)在工作流中存在一個 bug 或者數(shù)據(jù)丟失的時候可以重新對數(shù)據(jù)進(jìn)行計算. 對于這邊面的容錯的工作, 我們推薦讀者看第 [5] 和 [9]的資料. RDDs 提供了一種并行編程模型, 記錄跟蹤細(xì)粒度的血緣的成本很低, 我們可以根據(jù)血緣來達(dá)到容錯的目的.
我們基于血緣的容錯機制和 MapReduce 以及 Dryad 一個任務(wù)中的容錯機制是類似的, 在 DAG 中跟蹤任務(wù)的依賴. 然而, 在這些系統(tǒng)中, 一個任務(wù)結(jié)束后血緣信息也就丟失了, 用戶需要依靠數(shù)據(jù)備份式的存儲系統(tǒng)來共享任務(wù)之間的數(shù)據(jù). 與此相反, RDDs 可以在任務(wù)之間通過將數(shù)據(jù)存儲在內(nèi)存中達(dá)到高效的數(shù)據(jù)共享, 并不需要數(shù)據(jù)的備份和磁盤的 I/O 關(guān)系型數(shù)據(jù)庫: RDDs 在概念上和數(shù)據(jù)庫中的視圖比較類似, 存儲的 RDDs 則像具體的視圖. 然而, 像 DSM 系統(tǒng), 數(shù)據(jù)庫一般允許細(xì)粒度的對所有的數(shù)據(jù)進(jìn)行讀寫訪問, 這種方式需要對操作和數(shù)據(jù)進(jìn)行記錄日志, 用于容錯, 以及需要額外的開銷來保持?jǐn)?shù)據(jù)的一致性, 對于粗粒度轉(zhuǎn)換模型的 RDDs 來說, 這些額外的開銷式不需要的.
我們已經(jīng)展示了在集群應(yīng)用中一個高效的, 通用的以及容錯的對共享數(shù)據(jù)的抽象的 RDDs . RDDs 可以表達(dá)大量的并行應(yīng)用, 包括特殊的編程模型提出的迭代式計算以及這些模型表達(dá)不了的新的應(yīng)用. 和已經(jīng)存在的對集群存儲抽象不同的是, RDDs 提供了基于粗粒度轉(zhuǎn)換的 api , 可以使的用戶通過血緣達(dá)到高效的容錯. 我們在 spark 系統(tǒng)中實現(xiàn)了 RDDs, 在迭代式的應(yīng)用中, 性能是 hadoop 的 20 倍, 并且可以用于交互式的查詢數(shù)百 GB 的數(shù)據(jù).
我們已經(jīng)在 spark-project.org 中開源了 Spark, 作為一個穩(wěn)定的數(shù)據(jù)分析和系統(tǒng)研究的手段.
We thank the first Spark users, including Tim Hunter, Lester Mackey, Dilip Joseph, and Jibin Zhan, for trying out our system in their real applications, providing many good suggestions, and identifying a few research challenges along the way. We also thank our shepherd, Ed Nightingale, and our reviewers for their feedback. This research was supported in part by Berkeley AMP Lab sponsors Google, SAP, Amazon Web Services, Cloudera, Huawei, IBM, Intel, Microsoft, NEC, NetApp and VMWare, by DARPA (contract #FA8650-11-C-7136), by a Google PhD Fellowship, and by the Natural Sciences and Engineering Research Council of Canada.
[1] ApacheHive.http://hadoop.apache.org/hive.
[2] Scala.http://www.scala-lang.org.
[3] G.Ananthanarayanan,A.Ghodsi,S.Shenker,andI.Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.
[4] P.Bhatotia,A.Wieder,R.Rodrigues,U.A.Acar,and R. Pasquin. Incoop: MapReduce for incremental computations. In ACM SOCC ’11, 2011.
[5] R.BoseandJ.Frew.Lineageretrievalforscientificdata processing: a survey. ACM Computing Surveys, 37:1–28, 2005.
[6] S.BrinandL.Page.Theanatomyofalarge-scalehypertextual web search engine. In WWW, 1998.
[7] Y.Bu,B.Howe,M.Balazinska,andM.D.Ernst.HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 3:285–296, September 2010.
[8] C.Chambers,A.Raniwala,F.Perry,S.Adams,R.R.Henry, R. Bradshaw, and N. Weizenbaum. FlumeJava: easy, efficient data-parallel pipelines. In PLDI ’10. ACM, 2010.
[9] J.Cheney,L.Chiticariu,andW.-C.Tan.Provenancein databases: Why, how, and where. Foundations and Trends in Databases, 1(4):379–474, 2009.
[10] J.DeanandS.Ghemawat.MapReduce:Simplifieddata processing on large clusters. In OSDI, 2004.
[11] J. Ekanayake, H. Li, B. Zhang, T. Gunarathne, S.-H. Bae, J. Qiu, and G. Fox. Twister: a runtime for iterative mapreduce. In HPDC ’10, 2010.
[12] P.K.Gunda,L.Ravindranath,C.A.Thekkath,Y.Yu,and L. Zhuang. Nectar: automatic management of data and computation in datacenters. In OSDI ’10, 2010.
[13] Z.Guo,X.Wang,J.Tang,X.Liu,Z.Xu,M.Wu,M.F. Kaashoek, and Z. Zhang. R2: an application-level kernel for record and replay. OSDI’08, 2008.
[14] T.Hastie,R.Tibshirani,andJ.Friedman.TheElementsof Statistical Learning: Data Mining, Inference, and Prediction. Springer Publishing Company, New York, NY, 2009.
[15] B.He,M.Yang,Z.Guo,R.Chen,B.Su,W.Lin,andL.Zhou. Comet: batched stream processing for data intensive distributed computing. In SoCC ’10.
[16] A.Heydon,R.Levin,andY.Yu.Cachingfunctioncallsusing precise dependencies. In ACM SIGPLAN Notices, pages 311–320, 2000.
[17] B.Hindman,A.Konwinski,M.Zaharia,A.Ghodsi,A.D. Joseph, R. H. Katz, S. Shenker, and I. Stoica. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI ’11.
[18] T.Hunter,T.Moldovan,M.Zaharia,S.Merzgui,J.Ma,M.J. Franklin, P. Abbeel, and A. M. Bayen. Scaling the Mobile Millennium system in the cloud. In SOCC ’11, 2011.
[19] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In EuroSys ’07, 2007. [20] S.Y.Ko,I.Hoque,B.Cho,andI.Gupta.Onavailabilityof intermediate data in cloud computations. In HotOS ’09, 2009.
[21] D. Logothetis, C. Olston, B. Reed, K. C. Webb, and K. Yocum. Stateful bulk processing for incremental analytics. SoCC ’10.
[22] G.Malewicz,M.H.Austern,A.J.Bik,J.C.Dehnert,I.Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In SIGMOD, 2010.
[23] D.G.Murray,M.Schwarzkopf,C.Smowton,S.Smith, A. Madhavapeddy, and S. Hand. Ciel: a universal execution engine for distributed data-flow computing. In NSDI, 2011.
[24] B.NitzbergandV.Lo.Distributedsharedmemory:asurveyof issues and algorithms. Computer, 24(8):52 –60, Aug 1991.
[25] J.Ousterhout,P.Agrawal,D.Erickson,C.Kozyrakis, J. Leverich, D. Mazie`res, S. Mitra, A. Narayanan, G. Parulkar, M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for RAMClouds: scalable high-performance storage entirely in DRAM. SIGOPS Op. Sys. Rev., 43:92–105, Jan 2010. [26] D.PengandF.Dabek.Large-scaleincrementalprocessingusing distributed transactions and notifications. In OSDI 2010.
[27] R.PowerandJ.Li.Piccolo:Buildingfast,distributedprograms with partitioned tables. In Proc. OSDI 2010, 2010.
[28] R.RamakrishnanandJ.Gehrke.DatabaseManagement Systems. McGraw-Hill, Inc., 3 edition, 2003.
[29] K.Thomas,C.Grier,J.Ma,V.Paxson,andD.Song.Designand evaluation of a real-time URL spam filtering service. In IEEE Symposium on Security and Privacy, 2011. [30] J.W.Young.Afirstorderapproximationtotheoptimum checkpoint interval. Commun. ACM, 17:530–531, Sept 1974.
[31] Y.Yu,M.Isard,D.Fetterly,M.Budiu,U ?.Erlingsson,P.K. Gunda, and J. Currey. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In OSDI ’08, 2008.
[32] M.Zaharia,D.Borthakur,J.SenSarma,K.Elmeleegy,S. Shenker, and I. Stoica. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys ’10, 2010.
[33] M.Zaharia,M.Chowdhury,T.Das,A.Dave,J.Ma,M. McCauley, M. Franklin, S. Shenker, and I. Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. Technical Report UCB/EECS-2011-82, EECS Department, UC Berkeley, 2011.
http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf
@老湯(老湯)
@wangyangting(那伊抹微笑)
原文地址: http://spark.apachecn.org/paper/zh/spark-rdd.html
網(wǎng)頁地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(覺得不錯麻煩給個 Star,謝謝堪置!~)