RDD是Spark的基礎(chǔ)修赞,是對(duì)大數(shù)據(jù)的抽象婶恼,所以先破解Spark,首先從RDD開(kāi)始柏副。
- RDD 是什么勾邦?有什么特點(diǎn)?
- RDD 包含什么割择?
- RDD 能做什么眷篇?
RDD 的注釋
org.apache.spark.rdd.RDD
類源代碼中有詳細(xì)的注釋:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
翻譯:彈性的 分布式 數(shù)據(jù)集是 Spark 基礎(chǔ)的抽象。
解釋:彈性的(可復(fù)原的)锨推,說(shuō)明數(shù)據(jù)集具有容錯(cuò)性铅歼、可修復(fù)性。
分布式换可,說(shuō)明數(shù)據(jù)集可以分布在不同的機(jī)器上Represents an immutable, partitioned collection of elements that can be operated on in parallel.
翻譯:RDD 是不可變的 分區(qū)的 可并行處理的 元素集合
解釋:不可變的椎椰,這和 Scala 的設(shè)計(jì)理念相同,數(shù)據(jù)集一旦構(gòu)建完成沾鳄,就不能再修改慨飘,這樣能輕松解決多個(gè)線程讀數(shù)據(jù)的一致性問(wèn)題。
分區(qū)的=可并行處理的=分布式This class contains the basic operations available on all RDDs, such as
map
,filter
, andpersist
.
翻譯:這個(gè)抽象類包含了所有 RDD 都應(yīng)該有的基本操作,比如map
瓤的、filter
休弃、persist
等
解釋:這三個(gè)操作分別是:批量轉(zhuǎn)換、篩選圈膏、持久化In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
pairs, such asgroupByKey
andjoin
;
翻譯:另外PairRDDFunctions
對(duì)象中包含了 鍵值對(duì)型(KV型) RDD 的操作塔猾,例如groupByKey
和join
;
解釋:KV 型可以支持按 Key 分組稽坤、關(guān)聯(lián)等操作[[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
Doubles;
翻譯:DoubleRDDFunctions
提供可 double 數(shù)據(jù)集的操作丈甸;
解釋:數(shù)值型數(shù)據(jù)集有求和、平均尿褪、分布圖等統(tǒng)計(jì)性操作and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
can be saved as SequenceFiles.
翻譯:SequenceFileRDDFunctions
提供了順序存儲(chǔ)操作All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.
翻譯:所有的的類通過(guò)隱式轉(zhuǎn)換自動(dòng)地用于RDD實(shí)例中
解釋:RDD 伴生對(duì)象里包含了隱式轉(zhuǎn)換函數(shù)睦擂,用implicit
修飾。隱式轉(zhuǎn)換是 Scala 的語(yǔ)法特性杖玲。-
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
翻譯:在 RDD 中顿仇,包含這樣的5個(gè)屬性(也就說(shuō)要實(shí)現(xiàn)抽象方法或給空對(duì)象賦值):
- 一個(gè)分區(qū)的列表(getPartitions)
- 一個(gè)用于計(jì)算分區(qū)中數(shù)據(jù)的函數(shù)(compute)
- 一個(gè)對(duì)其他 RDD 的依賴列表(getDependencies)
- 可選:KV 型 RDD 應(yīng)該有一個(gè)分區(qū)器,例如 hash-分區(qū)器(partitioner)
- 可選:分區(qū)數(shù)據(jù)計(jì)算完后優(yōu)先存儲(chǔ)的位置摆马,例如 HDFS 的某個(gè)塊(getPreferredLocations)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.
翻譯: Spark 中所有的任務(wù)調(diào)度臼闻、任務(wù)執(zhí)行都依賴于這些方法。RDD 可以覆蓋這些方法今膊,實(shí)現(xiàn)有自己的計(jì)算方法些阅。例如從一個(gè)新的存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)。Please refer to the http://101.96.8.165/people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf for more details on RDD internals.
翻譯:更多細(xì)節(jié)斑唬,可以 Spark 的論文
一段示例代碼
是的市埋,我們從HelloWorld開(kāi)始,官方推薦的第一個(gè)程序是計(jì)算π的近似值:
import scala.math.random
import org.apache.spark.sql.SparkSession
object SparkPi {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("Spark Pi")
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / (n - 1))
spark.stop()
}
}
??什么恕刘,RDD 在哪里缤谎?
獲得 Spark 會(huì)話 和 Spark 上下文
val spark = SparkSession.builder.appName("xx").getOrCreate()
這4行,只能算是1行代碼褐着。調(diào)用內(nèi)部的構(gòu)造器產(chǎn)生了一個(gè)Spark會(huì)話對(duì)象
坷澡,賦值給spark
。后面spark.sparkContext
是獲得 Spark 的上下文對(duì)象
含蓉。至于會(huì)話對(duì)象和上下文對(duì)象频敛,以后再分析。創(chuàng)建一個(gè) RDD
spark.sparkContext.parallelize(1 until n, slices)
SparkContext 對(duì)象中有一個(gè)parallelize
函數(shù)馅扣,創(chuàng)建了一個(gè)RDD對(duì)象斟赚。
RDD是抽象類。進(jìn)入源碼我們可以看到 創(chuàng)建的 RDD 是ParallelCollectionRDD
(字面翻譯為并行容器 RDD
)差油。這個(gè)RDD是最簡(jiǎn)單的RDD了拗军。如果是我任洞,我會(huì)將它命名為SimpleRDD。
這句話創(chuàng)建了一個(gè)包含slices個(gè)分區(qū)的 RDD发侵,RDD 的內(nèi)容是1到 n交掏,這 n+1 個(gè)數(shù)。數(shù)據(jù)存在內(nèi)存中刃鳄,從內(nèi)存讀分區(qū)的數(shù)據(jù)盅弛。-
看看這個(gè) RDD 中的細(xì)節(jié)
還記得前一節(jié)翻譯的文字嗎?RDD 應(yīng)該實(shí)現(xiàn)5個(gè)方法铲汪。這個(gè)并行容器 RDD 是怎么實(shí)現(xiàn)的呢熊尉?- 一個(gè)分區(qū)的列表
將數(shù)據(jù)分成slices份,放在slices個(gè)容器中掌腰。每個(gè)容器就是一個(gè)分區(qū),所有容器構(gòu)成了分區(qū)列表 - 一個(gè)用于計(jì)算分區(qū)中數(shù)據(jù)的函數(shù)
什么都沒(méi)做张吉,返回分區(qū)的迭代器 - 一個(gè)依賴列表
依賴列表為Nil
空列表齿梁。即,這個(gè) RDD 不依賴別的 RDD - 一個(gè)分區(qū)器
不是 KV 型的肮蛹,不需要 - 一個(gè)運(yùn)算存儲(chǔ)優(yōu)先位置
SparkContext傳入了一個(gè) Map勺择,Map 有slices個(gè)key,對(duì)應(yīng)slices個(gè)容器伦忠∈『耍可見(jiàn),SparkContext希望結(jié)果存在內(nèi)存中昆码。
- 一個(gè)分區(qū)的列表
-
map
map是將分組中每一個(gè)元素映射成另一個(gè)元素的操作气忠。我們說(shuō)過(guò),RDD是不可變的赋咽,map這個(gè)操作產(chǎn)生新MapPartitionsRDD對(duì)象旧噪。
那MapPartitionsRDD的5個(gè)方法呢?- 一個(gè)依賴列表:只依賴于上游的 RDD脓匿,本例中依賴于上游的ParallelCollectionRDD淘钟。
- 一個(gè)分區(qū)列表:就是上游分區(qū)列表,直接讀取上游數(shù)據(jù)
- 一個(gè)計(jì)算:計(jì)算過(guò)程就是“映射關(guān)系”陪毡,由外部傳入一個(gè)函數(shù)對(duì)象表達(dá)映射關(guān)系
- 一個(gè)分區(qū)器:上游 RDD 的分區(qū)器米母,直接讀上游的分區(qū)
- 一個(gè)優(yōu)先存儲(chǔ)位置:上游 RDD 的優(yōu)先位置,本例中直接寫(xiě)到SparkContext傳入的 Map
reduce
reduce 也是一個(gè)操作毡琉,是多對(duì)一的聚合操作铁瞒,聚合前后類型必須一致。本例中是求和操作绊起。
過(guò)程可以簡(jiǎn)述成精拟,先計(jì)算每個(gè)分區(qū)的聚合結(jié)果,再將多個(gè)分區(qū)的結(jié)果再聚合。過(guò)程比較復(fù)雜蜂绎,以后再深入栅表。如何計(jì)算π?
random 取隨機(jī)數(shù)师枣,范圍是 [0, 1)怪瓶,那么x 和 y 是 [-1, 1)范圍內(nèi)的隨機(jī)數(shù)。
計(jì)算xx+yy践美,這是點(diǎn)(x, y)到(0, 0) 的距離洗贰,當(dāng)距離不大1(點(diǎn)落在r=1的圓內(nèi))時(shí),取1陨倡,否則取0敛滋。那么隨機(jī)取 N 個(gè)點(diǎn),點(diǎn)落圓內(nèi)的幾率等于圓的面積/邊長(zhǎng)為2的正方形的面積兴革。所以:
圓的面積 ≌ 正方形面積 * 落在圓內(nèi)的點(diǎn)數(shù) / 所有的點(diǎn)數(shù)
圓的面積=π绎晃,正方形面積=4
根據(jù)大數(shù)定理和中心極限定理,取的點(diǎn)越多杂曲,π的估值越近似于正態(tài)分布庶艾;取得的點(diǎn)越多,正態(tài)分布的標(biāo)準(zhǔn)差越星婵薄咱揍;取得的點(diǎn)越多,正態(tài)分布的均值越接近π的真值棚饵。所以煤裙,隨著取點(diǎn)的增加,π估值約精確蟹地。
Scala 語(yǔ)法
1 until n
用到了三個(gè) Scala 語(yǔ)法:
- 一切皆對(duì)象
在 Java 中积暖,1
會(huì)被認(rèn)為是一個(gè)基本類型int,可以裝包成對(duì)象怪与,在 Scala 中夺刑,1
就是一個(gè)對(duì)象。 - 隱式轉(zhuǎn)換
util
是調(diào)用RichInt.util
方法分别。Int 轉(zhuǎn)換成 RichInt 是隱式的遍愿,定義在scala.Predef
對(duì)象中的intWrapper
方法。scala.Predef
類似于宏耘斩。
參考: scala source implicit conversion from Int to RichInt - Stack Overflow - 函數(shù)調(diào)用的寫(xiě)法
1 until n
等價(jià)于1.until(n)
沼填,也就是說(shuō),如果對(duì)象方法若只有一個(gè)參數(shù)括授,可以省略掉點(diǎn)和括號(hào)坞笙,這樣代碼更接近自然語(yǔ)言岩饼。
OK,那么1 until n
這句話寫(xiě)全了應(yīng)該是什么樣的呢薛夜?
答:scala.this.Predef.intWrapper(1).until(n);
疑問(wèn)列表
我將閱讀過(guò)程中的未解內(nèi)容記錄下來(lái)籍茧,留待以后閱讀代碼時(shí)解答。疑問(wèn)一個(gè)一個(gè)劃掉梯澜,就是成長(zhǎng)的過(guò)程寞冯。
- reduce 等 RDD 操作是如何執(zhí)行的?
總結(jié)
- RDD 是數(shù)據(jù)集
- RDD 的特點(diǎn)是有彈性晚伙、分布式吮龄、不可變。
- RDD應(yīng)該包含5個(gè)部分:一個(gè)分區(qū)集咆疗、一個(gè)依賴集漓帚、一個(gè)運(yùn)算、[一個(gè)分區(qū)器民傻、一個(gè)優(yōu)先結(jié)果存儲(chǔ)位置]胰默。
- RDD 有一系列的操作,包括映射漓踢、過(guò)濾、聚合漏隐、存儲(chǔ)等喧半。
本文源碼
RDD spark/core/RDD/RDD.scala at master · apache/spark · GitHub
map spark/core/RDD/MapPartitionsRDD at master · apache/spark · GitHub
計(jì)算π spark/examples/SparkPi.scala at master · apache/spark · GitHub