Spark源碼分析(1) RDD是什么

RDD是Spark的基礎(chǔ)修赞,是對(duì)大數(shù)據(jù)的抽象婶恼,所以先破解Spark,首先從RDD開(kāi)始柏副。

  • RDD 是什么勾邦?有什么特點(diǎn)?
  • RDD 包含什么割择?
  • RDD 能做什么眷篇?

RDD 的注釋

org.apache.spark.rdd.RDD 類源代碼中有詳細(xì)的注釋:

  • A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    翻譯:彈性的 分布式 數(shù)據(jù)集是 Spark 基礎(chǔ)的抽象。
    解釋:彈性的(可復(fù)原的)锨推,說(shuō)明數(shù)據(jù)集具有容錯(cuò)性铅歼、可修復(fù)性。
    分布式换可,說(shuō)明數(shù)據(jù)集可以分布在不同的機(jī)器上

  • Represents an immutable, partitioned collection of elements that can be operated on in parallel.
    翻譯:RDD 是不可變的 分區(qū)的 可并行處理的 元素集合
    解釋:不可變的椎椰,這和 Scala 的設(shè)計(jì)理念相同,數(shù)據(jù)集一旦構(gòu)建完成沾鳄,就不能再修改慨飘,這樣能輕松解決多個(gè)線程讀數(shù)據(jù)的一致性問(wèn)題。
    分區(qū)的=可并行處理的=分布式

  • This class contains the basic operations available on all RDDs, such as map, filter, and persist.
    翻譯:這個(gè)抽象類包含了所有 RDD 都應(yīng)該有的基本操作,比如 map 瓤的、filter 休弃、persist
    解釋:這三個(gè)操作分別是:批量轉(zhuǎn)換、篩選圈膏、持久化

  • In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
    pairs, such as groupByKey and join;
    翻譯:另外 PairRDDFunctions 對(duì)象中包含了 鍵值對(duì)型(KV型) RDD 的操作塔猾,例如 groupByKeyjoin
    解釋:KV 型可以支持按 Key 分組稽坤、關(guān)聯(lián)等操作

  • [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
    Doubles;
    翻譯:DoubleRDDFunctions提供可 double 數(shù)據(jù)集的操作丈甸;
    解釋:數(shù)值型數(shù)據(jù)集有求和、平均尿褪、分布圖等統(tǒng)計(jì)性操作

  • and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
    can be saved as SequenceFiles.
    翻譯:SequenceFileRDDFunctions 提供了順序存儲(chǔ)操作

  • All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.
    翻譯:所有的的類通過(guò)隱式轉(zhuǎn)換自動(dòng)地用于RDD實(shí)例中
    解釋:RDD 伴生對(duì)象里包含了隱式轉(zhuǎn)換函數(shù)睦擂,用implicit 修飾。隱式轉(zhuǎn)換是 Scala 的語(yǔ)法特性杖玲。

  • Internally, each RDD is characterized by five main properties:

    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
      翻譯:在 RDD 中顿仇,包含這樣的5個(gè)屬性(也就說(shuō)要實(shí)現(xiàn)抽象方法或給空對(duì)象賦值):
    • 一個(gè)分區(qū)的列表(getPartitions)
    • 一個(gè)用于計(jì)算分區(qū)中數(shù)據(jù)的函數(shù)(compute)
    • 一個(gè)對(duì)其他 RDD 的依賴列表(getDependencies)
    • 可選:KV 型 RDD 應(yīng)該有一個(gè)分區(qū)器,例如 hash-分區(qū)器(partitioner)
    • 可選:分區(qū)數(shù)據(jù)計(jì)算完后優(yōu)先存儲(chǔ)的位置摆马,例如 HDFS 的某個(gè)塊(getPreferredLocations)
  • All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.
    翻譯: Spark 中所有的任務(wù)調(diào)度臼闻、任務(wù)執(zhí)行都依賴于這些方法。RDD 可以覆蓋這些方法今膊,實(shí)現(xiàn)有自己的計(jì)算方法些阅。例如從一個(gè)新的存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)。

  • Please refer to the http://101.96.8.165/people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf for more details on RDD internals.
    翻譯:更多細(xì)節(jié)斑唬,可以 Spark 的論文

一段示例代碼

是的市埋,我們從HelloWorld開(kāi)始,官方推薦的第一個(gè)程序是計(jì)算π的近似值:

import scala.math.random
import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}

??什么恕刘,RDD 在哪里缤谎?

  1. 獲得 Spark 會(huì)話 和 Spark 上下文
    val spark = SparkSession.builder.appName("xx").getOrCreate()
    這4行,只能算是1行代碼褐着。調(diào)用內(nèi)部的構(gòu)造器產(chǎn)生了一個(gè)Spark 會(huì)話對(duì)象坷澡,賦值給 spark。后面 spark.sparkContext 是獲得 Spark 的上下文對(duì)象含蓉。至于會(huì)話對(duì)象和上下文對(duì)象频敛,以后再分析。

  2. 創(chuàng)建一個(gè) RDD
    spark.sparkContext.parallelize(1 until n, slices)
    SparkContext 對(duì)象中有一個(gè)parallelize 函數(shù)馅扣,創(chuàng)建了一個(gè)RDD對(duì)象斟赚。
    RDD是抽象類。進(jìn)入源碼我們可以看到 創(chuàng)建的 RDD 是 ParallelCollectionRDD(字面翻譯為并行容器 RDD)差油。這個(gè)RDD是最簡(jiǎn)單的RDD了拗军。如果是我任洞,我會(huì)將它命名為SimpleRDD。
    這句話創(chuàng)建了一個(gè)包含slices個(gè)分區(qū)的 RDD发侵,RDD 的內(nèi)容是1到 n交掏,這 n+1 個(gè)數(shù)。數(shù)據(jù)存在內(nèi)存中刃鳄,從內(nèi)存讀分區(qū)的數(shù)據(jù)盅弛。

  3. 看看這個(gè) RDD 中的細(xì)節(jié)
    還記得前一節(jié)翻譯的文字嗎?RDD 應(yīng)該實(shí)現(xiàn)5個(gè)方法铲汪。這個(gè)并行容器 RDD 是怎么實(shí)現(xiàn)的呢熊尉?

    • 一個(gè)分區(qū)的列表
      將數(shù)據(jù)分成slices份,放在slices個(gè)容器中掌腰。每個(gè)容器就是一個(gè)分區(qū),所有容器構(gòu)成了分區(qū)列表
    • 一個(gè)用于計(jì)算分區(qū)中數(shù)據(jù)的函數(shù)
      什么都沒(méi)做张吉,返回分區(qū)的迭代器
    • 一個(gè)依賴列表
      依賴列表為Nil空列表齿梁。即,這個(gè) RDD 不依賴別的 RDD
    • 一個(gè)分區(qū)器
      不是 KV 型的肮蛹,不需要
    • 一個(gè)運(yùn)算存儲(chǔ)優(yōu)先位置
      SparkContext傳入了一個(gè) Map勺择,Map 有slices個(gè)key,對(duì)應(yīng)slices個(gè)容器伦忠∈『耍可見(jiàn),SparkContext希望結(jié)果存在內(nèi)存中昆码。
  4. map
    map是將分組中每一個(gè)元素映射成另一個(gè)元素的操作气忠。我們說(shuō)過(guò),RDD是不可變的赋咽,map這個(gè)操作產(chǎn)生新MapPartitionsRDD對(duì)象旧噪。
    那MapPartitionsRDD的5個(gè)方法呢?

    • 一個(gè)依賴列表:只依賴于上游的 RDD脓匿,本例中依賴于上游的ParallelCollectionRDD淘钟。
    • 一個(gè)分區(qū)列表:就是上游分區(qū)列表,直接讀取上游數(shù)據(jù)
    • 一個(gè)計(jì)算:計(jì)算過(guò)程就是“映射關(guān)系”陪毡,由外部傳入一個(gè)函數(shù)對(duì)象表達(dá)映射關(guān)系
    • 一個(gè)分區(qū)器:上游 RDD 的分區(qū)器米母,直接讀上游的分區(qū)
    • 一個(gè)優(yōu)先存儲(chǔ)位置:上游 RDD 的優(yōu)先位置,本例中直接寫(xiě)到SparkContext傳入的 Map
  5. reduce
    reduce 也是一個(gè)操作毡琉,是多對(duì)一的聚合操作铁瞒,聚合前后類型必須一致。本例中是求和操作绊起。
    過(guò)程可以簡(jiǎn)述成精拟,先計(jì)算每個(gè)分區(qū)的聚合結(jié)果,再將多個(gè)分區(qū)的結(jié)果再聚合。過(guò)程比較復(fù)雜蜂绎,以后再深入栅表。

  6. 如何計(jì)算π?
    random 取隨機(jī)數(shù)师枣,范圍是 [0, 1)怪瓶,那么x 和 y 是 [-1, 1)范圍內(nèi)的隨機(jī)數(shù)。
    計(jì)算xx+yy践美,這是點(diǎn)(x, y)到(0, 0) 的距離洗贰,當(dāng)距離不大1(點(diǎn)落在r=1的圓內(nèi))時(shí),取1陨倡,否則取0敛滋。那么隨機(jī)取 N 個(gè)點(diǎn),點(diǎn)落圓內(nèi)的幾率等于圓的面積/邊長(zhǎng)為2的正方形的面積兴革。所以:
    圓的面積 ≌ 正方形面積 * 落在圓內(nèi)的點(diǎn)數(shù) / 所有的點(diǎn)數(shù)
    圓的面積=π绎晃,正方形面積=4
    根據(jù)大數(shù)定理和中心極限定理,取的點(diǎn)越多杂曲,π的估值越近似于正態(tài)分布庶艾;取得的點(diǎn)越多,正態(tài)分布的標(biāo)準(zhǔn)差越星婵薄咱揍;取得的點(diǎn)越多,正態(tài)分布的均值越接近π的真值棚饵。所以煤裙,隨著取點(diǎn)的增加,π估值約精確蟹地。


Scala 語(yǔ)法

1 until n 用到了三個(gè) Scala 語(yǔ)法:

  1. 一切皆對(duì)象
    在 Java 中积暖,1會(huì)被認(rèn)為是一個(gè)基本類型int,可以裝包成對(duì)象怪与,在 Scala 中夺刑,1 就是一個(gè)對(duì)象。
  2. 隱式轉(zhuǎn)換
    util是調(diào)用 RichInt.util 方法分别。Int 轉(zhuǎn)換成 RichInt 是隱式的遍愿,定義在 scala.Predef對(duì)象中的intWrapper方法。scala.Predef類似于宏耘斩。
    參考: scala source implicit conversion from Int to RichInt - Stack Overflow
  3. 函數(shù)調(diào)用的寫(xiě)法
    1 until n等價(jià)于1.until(n)沼填,也就是說(shuō),如果對(duì)象方法若只有一個(gè)參數(shù)括授,可以省略掉點(diǎn)和括號(hào)坞笙,這樣代碼更接近自然語(yǔ)言岩饼。

OK,那么1 until n 這句話寫(xiě)全了應(yīng)該是什么樣的呢薛夜?
答:scala.this.Predef.intWrapper(1).until(n);


疑問(wèn)列表

我將閱讀過(guò)程中的未解內(nèi)容記錄下來(lái)籍茧,留待以后閱讀代碼時(shí)解答。疑問(wèn)一個(gè)一個(gè)劃掉梯澜,就是成長(zhǎng)的過(guò)程寞冯。

  1. reduce 等 RDD 操作是如何執(zhí)行的?

總結(jié)

  1. RDD 是數(shù)據(jù)集
  2. RDD 的特點(diǎn)是有彈性晚伙、分布式吮龄、不可變。
  3. RDD應(yīng)該包含5個(gè)部分:一個(gè)分區(qū)集咆疗、一個(gè)依賴集漓帚、一個(gè)運(yùn)算、[一個(gè)分區(qū)器民傻、一個(gè)優(yōu)先結(jié)果存儲(chǔ)位置]胰默。
  4. RDD 有一系列的操作,包括映射漓踢、過(guò)濾、聚合漏隐、存儲(chǔ)等喧半。

本文源碼

RDD spark/core/RDD/RDD.scala at master · apache/spark · GitHub
map spark/core/RDD/MapPartitionsRDD at master · apache/spark · GitHub
計(jì)算π spark/examples/SparkPi.scala at master · apache/spark · GitHub


@ Kangying Village, Beijing, China

Spark源碼/RDD

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市青责,隨后出現(xiàn)的幾起案子挺据,更是在濱河造成了極大的恐慌,老刑警劉巖脖隶,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扁耐,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡产阱,警方通過(guò)查閱死者的電腦和手機(jī)婉称,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)构蹬,“玉大人王暗,你說(shuō)我怎么就攤上這事∽玻” “怎么了俗壹?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)藻烤。 經(jīng)常有香客問(wèn)我绷雏,道長(zhǎng)头滔,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任涎显,我火速辦了婚禮坤检,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘棺禾。我一直安慰自己缀蹄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布膘婶。 她就那樣靜靜地躺著缺前,像睡著了一般。 火紅的嫁衣襯著肌膚如雪悬襟。 梳的紋絲不亂的頭發(fā)上衅码,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音脊岳,去河邊找鬼逝段。 笑死,一個(gè)胖子當(dāng)著我的面吹牛割捅,可吹牛的內(nèi)容都是我干的奶躯。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼亿驾,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼嘹黔!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起莫瞬,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤儡蔓,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后疼邀,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體喂江,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年旁振,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了获询。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡规求,死狀恐怖筐付,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情阻肿,我是刑警寧澤瓦戚,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站丛塌,受9級(jí)特大地震影響较解,放射性物質(zhì)發(fā)生泄漏畜疾。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一印衔、第九天 我趴在偏房一處隱蔽的房頂上張望啡捶。 院中可真熱鬧,春花似錦奸焙、人聲如沸瞎暑。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)了赌。三九已至,卻和暖如春玄糟,著一層夾襖步出監(jiān)牢的瞬間勿她,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工阵翎, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留逢并,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓郭卫,卻偏偏與公主長(zhǎng)得像砍聊,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子贰军,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容