Resilient Distribute DataSet
RDD是Spark最核心的理念, 要掌握Spark, 首先要把RDD這個概念搞明白. 下面我將嘗試去解釋一下RDD的概念.
如果你使用過Scala的集合類庫, 那么你會發(fā)現(xiàn)RDD和它的API非常一致. 在Scala中我們經(jīng)常使用map, foreach, flatMap等這些函數(shù), 而你和RDD打交道, 無非就是這幾個函數(shù). 招式是一樣的. 從這個層面看, 你可以把RDD看成是Scala中的一個非變集合(immutable collection).
RDD不同于Scala集合的地方正在于它名字的前兩個字母distribute和resilient. 說句玩笑話, 雖然Scala集合和RDD招式上一致(都源于Monad), 但是RDD的內(nèi)功比Scala集合深厚了不知多少倍. 好了, 我們接著來看剛才提到的RDD最重要的兩個特性. Distribute, 這個特性說明RDD可以分布到多臺機(jī)器上執(zhí)行; Resilient有可復(fù)原, 可恢復(fù)的意思, 這表示RDD是可以重新構(gòu)建的, 具備容錯性的.
總結(jié)一下, RDD就是一種具有容錯性和可并行執(zhí)行的數(shù)據(jù)結(jié)構(gòu). 接下來看一下RDD是如何做到這兩點(diǎn)的.
Distribute
首先, 我們來寫一個簡單的統(tǒng)計單詞數(shù)量的示例:
val textFile = sc.textFile(inputFile, 3) //讀取文件, 指定分區(qū)數(shù)量
textFile
.map(_.split(" ")) //把每行按找空格分割, 得到單詞的數(shù)組
.map(_.length) //求出每行的單詞個數(shù)
val wordCount = .reduce(_ + _) //匯總
textFile函數(shù)是可以攜帶兩個參數(shù)的, 第一個是我們要輸入的文件, 第二個是partitions的個數(shù). partition這個參數(shù)就是和distribute緊密關(guān)聯(lián)的. RDD在執(zhí)行的時候是以分區(qū)為基本單位的, 每個分區(qū)持有一定數(shù)量的數(shù)據(jù), 各個分區(qū)在執(zhí)行的時候是相互獨(dú)立, 并行執(zhí)行的. RDD可分區(qū)特性為它并行執(zhí)行提供了前提.
Resilient
說Resilient之前先稍微鋪墊一下. RDD提供了兩種API: transformation(轉(zhuǎn)換) & action(執(zhí)行). 像map, flatMap, filter等這些API都是轉(zhuǎn)換操作. 還有RDD是延遲執(zhí)行(lazy evaluate)的, 轉(zhuǎn)換操作并不會觸發(fā)真正的計算, 它只是向RDD提交執(zhí)行計劃. RDD真正的執(zhí)行是由action函數(shù)觸發(fā)的, action函數(shù)有reduce, take, count等. 轉(zhuǎn)換和執(zhí)行函數(shù)還有很多API詳細(xì)參照文檔.
還有一點(diǎn), 每個RDD都是只讀的. 這是什么意思呢? 就是說, RDD是不可變的, 一經(jīng)創(chuàng)建不論什么時候讀, 在什么地方讀, 結(jié)果都是一樣的. 那問題來了, 既然RDD是只讀的, 那它做的那些map, filter到底在干什么呢? 上文也提到了一點(diǎn)RDD上的transformation是在構(gòu)建執(zhí)行計劃; 另外一點(diǎn)是, 建立RDD之間的依賴關(guān)系. (這兩點(diǎn)是緊密聯(lián)系的, 這個會在后續(xù)分析Spark內(nèi)部執(zhí)行流程的文章會提到, 現(xiàn)在大概知道這么回事即可.) 我們可以使用RDD提供的toDebugString
查看RDD之間的依賴關(guān)系. 下邊給出了WordCount
示例所產(chǎn)生的RDD依賴關(guān)系:
(1) MapPartitionsRDD[5] at map at WordCount.scala:22 []
| MapPartitionsRDD[4] at map at WordCount.scala:21 []
| MapPartitionsRDD[1] at textFile at WordCount.scala:16 []
| /home/focusj/workspace/scala/SparkTour/src/main/scala/lfda/core/WordCount.scala HadoopRDD[0] at textFile at WordCount.scala:16 []
那RDD是如何做到容錯的呢? 假設(shè)一個任務(wù)在執(zhí)行的過程中, 集群中一個節(jié)點(diǎn)宕機(jī), 在該機(jī)器上運(yùn)行的任務(wù)和數(shù)據(jù)全部丟失. 這時Spark會立即通知其它節(jié)點(diǎn)重新執(zhí)行該任務(wù). 因為RDD內(nèi)部記錄了足夠的信息去恢復(fù)這個任務(wù). 這些信息包括RDD之間的依賴關(guān)系和執(zhí)行計劃. 所以, RDD是容錯的.
到此, 把RDD的基本概念說完了, 下篇會著重解釋Spark內(nèi)部是如何工作的. 從一個RDD Action API的調(diào)用開始, 到最終結(jié)果輸出, Spark都會作哪些工作.