先只做一個靜態(tài)的分析座韵,即上圖最左面部份公壤,動態(tài)調度執(zhí)行稍后分析
概念理解
做為新手,RDD看的我頭痛商模,Resilient Distributed Dataset, ?彈性分布式數(shù)據(jù)集奠旺,有哪些特點呢蜘澜?
1. 首先他是 Dataset, 俗稱數(shù)據(jù)集∠炀危可以類比 Redis 里的 ZSET, HSET, SET, 保存數(shù)據(jù)的一種組織結構而已鄙信。區(qū)別就在于 RDD 是分布式,粗粒度
2. 關于分布式忿晕,大家所熟知的就是將數(shù)據(jù)分片装诡。類比 MySQL 分庫分表,可以有按 ID 做 Range 分怎践盼,也可以按 Hash鸦采。RDD 也同樣,具體取決于 partitioner 如何實現(xiàn)咕幻。
3. 具有 fault tolerance 特性渔伯,做個對比,關系數(shù)據(jù)庫一般都是對分區(qū) partition 做多副本來做到容災和高可用谅河。但是RDD 完全另外一個思路咱旱,他有一個 lineage(血統(tǒng)?绷耍?) 的概念吐限,每一個 partition 都可以回溯來重建。
4. 由于 RDD 只讀褂始,每一個 RDD 都由父 RDD 和做用之上的操作生成诸典,父子 partition 涉及一一對應(窄依賴)和一對多(寬依賴),而這種依賴關系就是構成 fault tolerance 回溯的基礎崎苗。
上面只是對于 RDD 靜態(tài)的分析狐粱,至于動態(tài)的調度執(zhí)行和 stage 劃分下次再分享。
RDD 五要素
1. A list of partitions ? 數(shù)據(jù)分區(qū)
2. A function for computing each split 每個片的處理函數(shù)
3. A list of dependencies on other RDDs 依賴
4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 分區(qū)函數(shù)很重要胆数,對于join group 優(yōu)化
5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for?an HDFS file)
例子: 行數(shù)統(tǒng)計
來看一個官網(wǎng) quick start 例子肌蜻,打開 spark-shell
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at:27
textFile 是一個 MapPartitionsRDD, 他沒有 Dependency, 并且也不會讀取數(shù)據(jù),不做任何操作
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at:29
linesWithSpark 同樣也是一個 MapPartitionsRDD, 他有 Dependency, 就是上文的 textFile, 并且附加一個 filter 操作必尼,返回包含 “Spark” 的記錄
scala> linesWithSpark.count
res0: Long = 17
最后執(zhí)行 count 函數(shù)蒋搜,得到記錄 17?
transformation 和 action
RDD 只讀,所以每次 filter, map, flatMap 等操作都是生成一個新的 RDD, 多個 RDD 為鏈式關系判莉,由 Denpendency 和 compute 聯(lián)系在一起豆挽。下圖是主要的兩類函數(shù)。
transformation: 惰性的操作券盅,只生成新的 RDD, 描述執(zhí)行邏輯
action: 會提交 job, 交給 worker 去執(zhí)行帮哈,流式依次計算所有 transformation 操作,流式的精髓在于 Iterator
對應文中例子锰镀,textFile和filter 都是 transformation 操作娘侍,只負責生成 RDD, 只有最后一個 count 才發(fā)起 job 執(zhí)行咖刃。
filter 函數(shù)簡單明了,直接生成一個新的 MapPartitionsRDD, 重點在于兩個參數(shù) this 和 iter 操作的閉包
MapPartitionsRDD 要重寫幾個關鍵成員函數(shù)私蕾,partitioner 決定如何做分區(qū)僵缺,getPartitions 決定如何從父 RDD 中獲取自已使用的 partition,這塊是簡單的 one to one依賴踩叭,即窄依賴,后續(xù)會涉及到 shuffle 寬依賴再分析翠胰。compute 封裝傳弟的 f 閉包容贝,直接作用于 partition, 這里都是針對 Iterator, 不到最后觸發(fā)是不會執(zhí)行。
count不具有代表現(xiàn)之景,貼一個 reduce 源碼, 最常見的就是 reduce(_+_)斤富,如果大家了解單機的,肯定知道原理锻狗,對于分布式的RDD也一樣满力。 jobResult 是一個 Option[T] 結果,mergeResult 決定如何對各個分區(qū)的結果進行操作轻纪,就是調用 reduce(f: (T,T) => T): T 里面的閉包f油额,對于各分區(qū)也執(zhí)行 f。
RDD 的初探刻帚,DAGSchdule 后文分析潦嘶,如有理解有誤的請大家指證