上萬字詳解Spark Core(建議收藏)

??先來一個(gè)問題纺弊,也是面試中常問的:

Spark為什么會(huì)流行?

原因1:優(yōu)秀的數(shù)據(jù)模型和豐富計(jì)算抽象

Spark 產(chǎn)生之前逸绎,已經(jīng)有MapReduce這類非常成熟的計(jì)算系統(tǒng)存在了惹恃,并提供了高層次的API(map/reduce),把計(jì)算運(yùn)行在集群中并提供容錯(cuò)能力棺牧,從而實(shí)現(xiàn)分布式計(jì)算巫糙。

雖然MapReduce提供了對(duì)數(shù)據(jù)訪問和計(jì)算的抽象,但是對(duì)于數(shù)據(jù)的復(fù)用就是簡單的將中間數(shù)據(jù)寫到一個(gè)穩(wěn)定的文件系統(tǒng)中(例如HDFS)颊乘,所以會(huì)產(chǎn)生數(shù)據(jù)的復(fù)制備份参淹,磁盤的I/O以及數(shù)據(jù)的序列化,所以在遇到需要在多個(gè)計(jì)算之間復(fù)用中間結(jié)果的操作時(shí)效率就會(huì)非常的低乏悄。而這類操作是非常常見的浙值,例如迭代式計(jì)算,交互式數(shù)據(jù)挖掘檩小,圖計(jì)算等开呐。

認(rèn)識(shí)到這個(gè)問題后,學(xué)術(shù)界的 AMPLab 提出了一個(gè)新的模型规求,叫做 RDD筐付。RDD 是一個(gè)可以容錯(cuò)且并行的數(shù)據(jù)結(jié)構(gòu)(其實(shí)可以理解成分布式的集合,操作起來和操作本地集合一樣簡單)阻肿,它可以讓用戶顯式的將中間結(jié)果數(shù)據(jù)集保存在內(nèi)存中瓦戚,并且通過控制數(shù)據(jù)集的分區(qū)來達(dá)到數(shù)據(jù)存放處理最優(yōu)化.同時(shí) RDD也提供了豐富的 API (map、reduce冕茅、filter伤极、foreach、redeceByKey...)來操作數(shù)據(jù)集姨伤。后來 RDD被 AMPLab 在一個(gè)叫做 Spark 的框架中提供并開源哨坪。

簡而言之,Spark 借鑒了 MapReduce 思想發(fā)展而來乍楚,保留了其分布式并行計(jì)算的優(yōu)點(diǎn)并改進(jìn)了其明顯的缺陷当编。讓中間數(shù)據(jù)存儲(chǔ)在內(nèi)存中提高了運(yùn)行速度、并提供豐富的操作數(shù)據(jù)的API提高了開發(fā)速度徒溪。

原因2:完善的生態(tài)圈-fullstack

目前忿偷,Spark已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合,其中包含SparkSQL臊泌、Spark Streaming鲤桥、GraphX、MLlib等子項(xiàng)目。

Spark Core:實(shí)現(xiàn)了 Spark 的基本功能,包含RDD、任務(wù)調(diào)度杠园、內(nèi)存管理贮喧、錯(cuò)誤恢復(fù)筒狠、與存儲(chǔ)系統(tǒng)交互等模塊。

Spark SQL:Spark 用來操作結(jié)構(gòu)化數(shù)據(jù)的程序包箱沦。通過 Spark SQL辩恼,我們可以使用 SQL操作數(shù)據(jù)。

Spark Streaming:Spark 提供的對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行流式計(jì)算的組件谓形。提供了用來操作數(shù)據(jù)流的 API灶伊。

Spark MLlib:提供常見的機(jī)器學(xué)習(xí)(ML)功能的程序庫。包括分類寒跳、回歸谁帕、聚類、協(xié)同過濾等冯袍,還提供了模型評(píng)估、數(shù)據(jù)導(dǎo)入等額外的支持功能碾牌。

GraphX(圖計(jì)算):Spark中用于圖計(jì)算的API康愤,性能良好,擁有豐富的功能和運(yùn)算符舶吗,能在海量數(shù)據(jù)上自如地運(yùn)行復(fù)雜的圖算法征冷。

集群管理器:Spark 設(shè)計(jì)為可以高效地在一個(gè)計(jì)算節(jié)點(diǎn)到數(shù)千個(gè)計(jì)算節(jié)點(diǎn)之間伸縮計(jì)算。

StructuredStreaming:處理結(jié)構(gòu)化流,統(tǒng)一了離線和實(shí)時(shí)的API誓琼。

Spark VS Hadoop

Hadoop Spark
類型 基礎(chǔ)平臺(tái), 包含計(jì)算, 存儲(chǔ), 調(diào)度 分布式計(jì)算工具
場景 大規(guī)模數(shù)據(jù)集上的批處理 迭代計(jì)算, 交互式計(jì)算, 流計(jì)算
價(jià)格 對(duì)機(jī)器要求低, 便宜 對(duì)內(nèi)存有要求, 相對(duì)較貴
編程范式 Map+Reduce, API 較為底層, 算法適應(yīng)性差 RDD組成DAG有向無環(huán)圖, API 較為頂層, 方便使用
數(shù)據(jù)存儲(chǔ)結(jié)構(gòu) MapReduce中間計(jì)算結(jié)果存在HDFS磁盤上, 延遲大 RDD中間運(yùn)算結(jié)果存在內(nèi)存中 , 延遲小
運(yùn)行方式 Task以進(jìn)程方式維護(hù), 任務(wù)啟動(dòng)慢 Task以線程方式維護(hù), 任務(wù)啟動(dòng)快

??注意:
盡管Spark相對(duì)于Hadoop而言具有較大優(yōu)勢检激,但Spark并不能完全替代Hadoop,Spark主要用于替代Hadoop中的MapReduce計(jì)算模型腹侣。存儲(chǔ)依然可以使用HDFS叔收,但是中間結(jié)果可以存放在內(nèi)存中;調(diào)度可以使用Spark內(nèi)置的傲隶,也可以使用更成熟的調(diào)度系統(tǒng)YARN等饺律。
實(shí)際上,Spark已經(jīng)很好地融入了Hadoop生態(tài)圈跺株,并成為其中的重要一員复濒,它可以借助于YARN實(shí)現(xiàn)資源調(diào)度管理,借助于HDFS實(shí)現(xiàn)分布式存儲(chǔ)乒省。
此外巧颈,Hadoop可以使用廉價(jià)的、異構(gòu)的機(jī)器來做分布式存儲(chǔ)與計(jì)算袖扛,但是砸泛,Spark對(duì)硬件的要求稍高一些,對(duì)內(nèi)存與CPU有一定的要求。

Spark Core

一晾嘶、RDD詳解

1. 為什么要有RDD?

在許多迭代式算法(比如機(jī)器學(xué)習(xí)妓雾、圖算法等)和交互式數(shù)據(jù)挖掘中,不同計(jì)算階段之間會(huì)重用中間結(jié)果垒迂,即一個(gè)階段的輸出結(jié)果會(huì)作為下一個(gè)階段的輸入械姻。但是,之前的MapReduce框架采用非循環(huán)式的數(shù)據(jù)流模型机断,把中間結(jié)果寫入到HDFS中楷拳,帶來了大量的數(shù)據(jù)復(fù)制、磁盤IO和序列化開銷吏奸。且這些框架只能支持一些特定的計(jì)算模式(map/reduce)欢揖,并沒有提供一種通用的數(shù)據(jù)抽象。

AMP實(shí)驗(yàn)室發(fā)表的一篇關(guān)于RDD的論文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是為了解決這些問題的奋蔚。

RDD提供了一個(gè)抽象的數(shù)據(jù)模型她混,讓我們不必?fù)?dān)心底層數(shù)據(jù)的分布式特性,只需將具體的應(yīng)用邏輯表達(dá)為一系列轉(zhuǎn)換操作(函數(shù))泊碑,不同RDD之間的轉(zhuǎn)換操作之間還可以形成依賴關(guān)系坤按,進(jìn)而實(shí)現(xiàn)管道化,從而避免了中間結(jié)果的存儲(chǔ)馒过,大大降低了數(shù)據(jù)復(fù)制臭脓、磁盤IO和序列化開銷,并且還提供了更多的API(map/reduec/filter/groupBy...)腹忽。

2. RDD是什么?

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集来累,是Spark中最基本的數(shù)據(jù)抽象,代表一個(gè)不可變窘奏、可分區(qū)嘹锁、里面的元素可并行計(jì)算的集合。
單詞拆解:

  • Resilient :它是彈性的着裹,RDD里面的中的數(shù)據(jù)可以保存在內(nèi)存中或者磁盤里面
  • Distributed :它里面的元素是分布式存儲(chǔ)的兼耀,可以用于分布式計(jì)算
  • Dataset: 它是一個(gè)集合,可以存放很多元素

3. RDD主要屬性

進(jìn)入RDD的源碼中看下:

RDD源碼

在源碼中可以看到有對(duì)RDD介紹的注釋求冷,我們來翻譯下:

  1. A list of partitions
    一組分片(Partition)/一個(gè)分區(qū)(Partition)列表瘤运,即數(shù)據(jù)集的基本組成單位。
    對(duì)于RDD來說匠题,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理拯坟,分片數(shù)決定并行度。
    用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù)韭山,如果沒有指定郁季,那么就會(huì)采用默認(rèn)值冷溃。

  2. A function for computing each split
    一個(gè)函數(shù)會(huì)被作用在每一個(gè)分區(qū)。
    Spark中RDD的計(jì)算是以分片為單位的梦裂,compute函數(shù)會(huì)被作用到每個(gè)分區(qū)上似枕。

  3. A list of dependencies on other RDDs
    一個(gè)RDD會(huì)依賴于其他多個(gè)RDD。
    RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD年柠,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系凿歼。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)冗恨,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算答憔。(Spark的容錯(cuò)機(jī)制)

  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    可選項(xiàng),對(duì)于KV類型的RDD會(huì)有一個(gè)Partitioner掀抹,即RDD的分區(qū)函數(shù)虐拓,默認(rèn)為HashPartitioner。

  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    可選項(xiàng),一個(gè)列表傲武,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)蓉驹。
    對(duì)于一個(gè)HDFS文件來說,這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置揪利。按照"移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算"的理念戒幔,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能選擇那些存有數(shù)據(jù)的worker節(jié)點(diǎn)來進(jìn)行任務(wù)計(jì)算土童。

總結(jié)

RDD 是一個(gè)數(shù)據(jù)集的表示,不僅表示了數(shù)據(jù)集工坊,還表示了這個(gè)數(shù)據(jù)集從哪來献汗,如何計(jì)算,主要屬性包括:

  1. 分區(qū)列表
  2. 計(jì)算函數(shù)
  3. 依賴關(guān)系
  4. 分區(qū)函數(shù)(默認(rèn)是hash)
  5. 最佳位置

分區(qū)列表王污、分區(qū)函數(shù)罢吃、最佳位置,這三個(gè)屬性其實(shí)說的就是數(shù)據(jù)集在哪昭齐,在哪計(jì)算更合適尿招,如何分區(qū);
計(jì)算函數(shù)阱驾、依賴關(guān)系就谜,這兩個(gè)屬性其實(shí)說的是數(shù)據(jù)集怎么來的。

二里覆、RDD-API

1. RDD的創(chuàng)建方式

  1. 由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建丧荐,包括本地的文件系統(tǒng),還有所有Hadoop支持的數(shù)據(jù)集喧枷,比如HDFS虹统、Cassandra弓坞、HBase等:
    val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

  2. 通過已有的RDD經(jīng)過算子轉(zhuǎn)換生成新的RDD:
    val rdd2=rdd1.flatMap(_.split(" "))

  3. 由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建:
    val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    或者
    val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD方法底層調(diào)用了parallelize方法:

RDD源碼

2. RDD的算子分類

RDD的算子分為兩類:

  1. Transformation轉(zhuǎn)換操作:返回一個(gè)新的RDD
  2. Action動(dòng)作操作:返回值不是RDD(無返回值或返回其他的)

??注意:
1、RDD不實(shí)際存儲(chǔ)真正要計(jì)算的數(shù)據(jù)车荔,而是記錄了數(shù)據(jù)的位置在哪里渡冻,數(shù)據(jù)的轉(zhuǎn)換關(guān)系(調(diào)用了什么方法,傳入什么函數(shù))忧便。
2族吻、RDD中的所有轉(zhuǎn)換都是惰性求值/延遲執(zhí)行的,也就是說并不會(huì)直接計(jì)算茬腿。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的Action動(dòng)作時(shí)呼奢,這些轉(zhuǎn)換才會(huì)真正運(yùn)行。
3切平、之所以使用惰性求值/延遲執(zhí)行握础,是因?yàn)檫@樣可以在Action時(shí)對(duì)RDD操作形成DAG有向無環(huán)圖進(jìn)行Stage的劃分和并行優(yōu)化悴品,這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行禀综。

3. Transformation轉(zhuǎn)換算子

轉(zhuǎn)換算子 含義
map(func) 返回一個(gè)新的RDD,該RDD由每一個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) 返回一個(gè)新的RDD退子,該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成
flatMap(func) 類似于map寂祥,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列,而不是單一元素)
mapPartitions(func) 類似于map铛碑,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行刹缝,因此在類型為T的RDD上運(yùn)行時(shí)噪奄,func的函數(shù)類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似于mapPartitions,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運(yùn)行時(shí),func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣嚣镜,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換徽职,seed用于指定隨機(jī)數(shù)生成器種子
union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
distinct([numTasks])) 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD
groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K,V)的RDD栈顷,使用指定的reduce函數(shù)惑朦,將相同key的值聚合到一起兽泄,與groupByKey類似,reduce任務(wù)的個(gè)數(shù)可以通過第二個(gè)可選的參數(shù)來設(shè)置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 對(duì)PairRDD中相同的Key值進(jìn)行聚合操作漾月,在聚合過程中同樣使用了一個(gè)中立的初始值病梢。和aggregate函數(shù)類似,aggregateByKey返回值的類型不需要和RDD中value的類型一致
sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用梁肿,K必須實(shí)現(xiàn)Ordered接口蜓陌,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似觅彰,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用护奈,返回一個(gè)(K,(Iterable<v style="margin: 0px; padding: 0px;">,Iterable<w style="margin: 0px; padding: 0px;">))類型的RDD</w></v>
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars]) 對(duì)rdd進(jìn)行管道操作
coalesce(numPartitions) 減少 RDD 的分區(qū)數(shù)到指定值缔莲。在過濾大量數(shù)據(jù)之后,可以執(zhí)行此操作
repartition(numPartitions) 重新給 RDD 分區(qū)

4. Action動(dòng)作算子

動(dòng)作算子 含義
reduce(func) 通過func函數(shù)聚集RDD中的所有元素霉旗,這個(gè)功能必須是可交換且可并聯(lián)的
collect() 在驅(qū)動(dòng)程序中痴奏,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
count() 返回RDD的元素個(gè)數(shù)
first() 返回RDD的第一個(gè)元素(類似于take(1))
take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成厌秒,可以選擇是否用隨機(jī)數(shù)替換不足的部分读拆,seed用于指定隨機(jī)數(shù)生成器種子
takeOrdered(n, [ordering]) 返回自然順序或者自定義順序的前 n 個(gè)元素
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對(duì)于每個(gè)元素鸵闪,Spark將會(huì)調(diào)用toString方法檐晕,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)
saveAsObjectFile(path) 將數(shù)據(jù)集的元素蚌讼,以 Java 序列化的方式保存到指定的目錄下
countByKey() 針對(duì)(K,V)類型的RDD辟灰,返回一個(gè)(K,Int)的map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)
foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上篡石,運(yùn)行函數(shù)func進(jìn)行更新
foreachPartition(func) 在數(shù)據(jù)集的每一個(gè)分區(qū)上芥喇,運(yùn)行函數(shù)func

統(tǒng)計(jì)操作:

算子 含義
count 個(gè)數(shù)
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 從采樣中計(jì)算方差
stdev 標(biāo)準(zhǔn)差:衡量數(shù)據(jù)的離散程度
sampleStdev 采樣的標(biāo)準(zhǔn)差
stats 查看統(tǒng)計(jì)結(jié)果

三、RDD的持久化/緩存

在實(shí)際開發(fā)中某些RDD的計(jì)算或轉(zhuǎn)換可能會(huì)比較耗費(fèi)時(shí)間凰萨,如果這些RDD后續(xù)還會(huì)頻繁的被使用到继控,那么可以將這些RDD進(jìn)行持久化/緩存,這樣下次再使用到的時(shí)候就不用再重新計(jì)算了胖眷,提高了程序運(yùn)行的效率武通。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀取HDFS的文件,rdd2會(huì)真正執(zhí)行持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀緩存中的數(shù)據(jù),執(zhí)行速度會(huì)比之前快,因?yàn)閞dd2已經(jīng)持久化到內(nèi)存中了

持久化/緩存API詳解

  • ersist方法和cache方法

RDD通過persist或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存珊搀,而是觸發(fā)后面的action時(shí)冶忱,該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用境析。
通過查看RDD的源碼發(fā)現(xiàn)cache最終也是調(diào)用了persist無參方法(默認(rèn)存儲(chǔ)只存在內(nèi)存中):

RDD源碼
  • 存儲(chǔ)級(jí)別

默認(rèn)的存儲(chǔ)級(jí)別都是僅在內(nèi)存存儲(chǔ)一份囚枪,Spark的存儲(chǔ)級(jí)別還有好多種,存儲(chǔ)級(jí)別在object StorageLevel中定義的簿晓。

持久化級(jí)別 說明
MORY_ONLY(默認(rèn)) 將RDD以非序列化的Java對(duì)象存儲(chǔ)在JVM中眶拉。 如果沒有足夠的內(nèi)存存儲(chǔ)RDD千埃,則某些分區(qū)將不會(huì)被緩存憔儿,每次需要時(shí)都會(huì)重新計(jì)算。 這是默認(rèn)級(jí)別
MORY_AND_DISK(開發(fā)中可以使用這個(gè)) 將RDD以非序列化的Java對(duì)象存儲(chǔ)在JVM中放可。如果數(shù)據(jù)在內(nèi)存中放不下谒臼,則溢寫到磁盤上.需要時(shí)則會(huì)從磁盤上讀取
MEMORY_ONLY_SER (Java and Scala) 將RDD以序列化的Java對(duì)象(每個(gè)分區(qū)一個(gè)字節(jié)數(shù)組)的方式存儲(chǔ).這通常比非序列化對(duì)象(deserialized objects)更具空間效率朝刊,特別是在使用快速序列化的情況下,但是這種方式讀取數(shù)據(jù)會(huì)消耗更多的CPU
MEMORY_AND_DISK_SER (Java and Scala) 與MEMORY_ONLY_SER類似蜈缤,但如果數(shù)據(jù)在內(nèi)存中放不下拾氓,則溢寫到磁盤上,而不是每次需要重新計(jì)算它們
DISK_ONLY 將RDD分區(qū)存儲(chǔ)在磁盤上
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 與上面的儲(chǔ)存級(jí)別相同底哥,只不過將持久化數(shù)據(jù)存為兩份咙鞍,備份每個(gè)分區(qū)存儲(chǔ)在兩個(gè)集群節(jié)點(diǎn)上
OFF_HEAP(實(shí)驗(yàn)中) 與MEMORY_ONLY_SER類似,但將數(shù)據(jù)存儲(chǔ)在堆外內(nèi)存中趾徽。 (即不是直接存儲(chǔ)在JVM內(nèi)存中)

總結(jié):

  1. RDD持久化/緩存的目的是為了提高后續(xù)操作的速度
  2. 緩存的級(jí)別有很多续滋,默認(rèn)只存在內(nèi)存中,開發(fā)中使用memory_and_disk
  3. 只有執(zhí)行action操作的時(shí)候才會(huì)真正將RDD數(shù)據(jù)進(jìn)行持久化/緩存
  4. 實(shí)際開發(fā)中如果某一個(gè)RDD后續(xù)會(huì)被頻繁的使用,可以將該RDD進(jìn)行持久化/緩存

四孵奶、RDD容錯(cuò)機(jī)制Checkpoint

  • 持久化的局限:

持久化/緩存可以把數(shù)據(jù)放在內(nèi)存中疲酌,雖然是快速的,但是也是最不可靠的了袁;也可以把數(shù)據(jù)放在磁盤上朗恳,也不是完全可靠的!例如磁盤會(huì)損壞等载绿。

  • 問題解決:

Checkpoint的產(chǎn)生就是為了更加可靠的數(shù)據(jù)持久化粥诫,在Checkpoint的時(shí)候一般把數(shù)據(jù)放在在HDFS上,這就天然的借助了HDFS天生的高容錯(cuò)卢鹦、高可靠來實(shí)現(xiàn)數(shù)據(jù)最大程度上的安全臀脏,實(shí)現(xiàn)了RDD的容錯(cuò)和高可用。

用法

SparkContext.setCheckpointDir("目錄") //HDFS的目錄

RDD.checkpoint

  • 總結(jié):

  • 開發(fā)中如何保證數(shù)據(jù)的安全性性及讀取效率:
    可以對(duì)頻繁使用且重要的數(shù)據(jù)冀自,先做緩存/持久化揉稚,再做checkpint操作。

  • 持久化和Checkpoint的區(qū)別:

  1. 位置:
    Persist 和 Cache 只能保存在本地的磁盤和內(nèi)存中(或者堆外內(nèi)存--實(shí)驗(yàn)中)
    Checkpoint 可以保存數(shù)據(jù)到 HDFS 這類可靠的存儲(chǔ)上熬粗。

  2. 生命周期:
    Cache和Persist的RDD會(huì)在程序結(jié)束后會(huì)被清除或者手動(dòng)調(diào)用unpersist方法
    Checkpoint的RDD在程序結(jié)束后依然存在搀玖,不會(huì)被刪除。

五驻呐、RDD依賴關(guān)系

1. 寬窄依賴

  • 兩種依賴關(guān)系類型
    RDD和它依賴的父RDD的關(guān)系有兩種不同的類型灌诅,即
    寬依賴(wide dependency/shuffle dependency)
    窄依賴(narrow dependency)
image
  • 圖解:
寬窄依賴
  • 如何區(qū)分寬窄依賴:

窄依賴:父RDD的一個(gè)分區(qū)只會(huì)被子RDD的一個(gè)分區(qū)依賴;
寬依賴:父RDD的一個(gè)分區(qū)會(huì)被子RDD的多個(gè)分區(qū)依賴(涉及到shuffle)含末。

2. 為什么要設(shè)計(jì)寬窄依賴

  1. 對(duì)于窄依賴:

窄依賴的多個(gè)分區(qū)可以并行計(jì)算猜拾;
窄依賴的一個(gè)分區(qū)的數(shù)據(jù)如果丟失只需要重新計(jì)算對(duì)應(yīng)的分區(qū)的數(shù)據(jù)就可以了。

  1. 對(duì)于寬依賴:

劃分Stage(階段)的依據(jù):對(duì)于寬依賴,必須等到上一階段計(jì)算完成才能計(jì)算下一階段佣盒。

六挎袜、DAG的生成和劃分Stage

1. DAG介紹

  • DAG是什么:

DAG(Directed Acyclic Graph有向無環(huán)圖)指的是數(shù)據(jù)轉(zhuǎn)換執(zhí)行的過程,有方向,無閉環(huán)(其實(shí)就是RDD執(zhí)行的流程)盯仪;
原始的RDD通過一系列的轉(zhuǎn)換操作就形成了DAG有向無環(huán)圖紊搪,任務(wù)執(zhí)行時(shí),可以按照DAG的描述全景,執(zhí)行真正的計(jì)算(數(shù)據(jù)被操作的一個(gè)過程)耀石。

  • DAG的邊界

開始:通過SparkContext創(chuàng)建的RDD;
結(jié)束:觸發(fā)Action爸黄,一旦觸發(fā)Action就形成了一個(gè)完整的DAG滞伟。

2.DAG劃分Stage

DAG劃分Stage

一個(gè)Spark程序可以有多個(gè)DAG(有幾個(gè)Action,就有幾個(gè)DAG炕贵,上圖最后只有一個(gè)Action(圖中未表現(xiàn)),那么就是一個(gè)DAG)诗良。

一個(gè)DAG可以有多個(gè)Stage(根據(jù)寬依賴/shuffle進(jìn)行劃分)。

同一個(gè)Stage可以有多個(gè)Task并行執(zhí)行(task數(shù)=分區(qū)數(shù)鲁驶,如上圖鉴裹,Stage1 中有三個(gè)分區(qū)P1、P2钥弯、P3径荔,對(duì)應(yīng)的也有三個(gè) Task)。

可以看到這個(gè)DAG中只reduceByKey操作是一個(gè)寬依賴脆霎,Spark內(nèi)核會(huì)以此為邊界將其前后劃分成不同的Stage总处。

同時(shí)我們可以注意到,在圖中Stage1中睛蛛,從textFile到flatMap到map都是窄依賴鹦马,這幾步操作可以形成一個(gè)流水線操作,通過flatMap操作生成的partition可以不用等待整個(gè)RDD計(jì)算結(jié)束忆肾,而是繼續(xù)進(jìn)行map操作荸频,這樣大大提高了計(jì)算的效率

  • 為什么要?jiǎng)澐諷tage? --并行計(jì)算

一個(gè)復(fù)雜的業(yè)務(wù)邏輯如果有shuffle客冈,那么就意味著前面階段產(chǎn)生結(jié)果后旭从,才能執(zhí)行下一個(gè)階段,即下一個(gè)階段的計(jì)算要依賴上一個(gè)階段的數(shù)據(jù)场仲。那么我們按照shuffle進(jìn)行劃分(也就是按照寬依賴就行劃分)和悦,就可以將一個(gè)DAG劃分成多個(gè)Stage/階段,在同一個(gè)Stage中渠缕,會(huì)有多個(gè)算子操作鸽素,可以形成一個(gè)pipeline流水線,流水線內(nèi)的多個(gè)平行的分區(qū)可以并行執(zhí)行亦鳞。

  • 如何劃分DAG的stage馍忽?

對(duì)于窄依賴澜汤,partition的轉(zhuǎn)換處理在stage中完成計(jì)算,不劃分(將窄依賴盡量放在在同一個(gè)stage中舵匾,可以實(shí)現(xiàn)流水線計(jì)算)。

對(duì)于寬依賴谁不,由于有shuffle的存在坐梯,只能在父RDD處理完成后,才能開始接下來的計(jì)算刹帕,也就是說需要要?jiǎng)澐謘tage吵血。

總結(jié):

Spark會(huì)根據(jù)shuffle/寬依賴使用回溯算法來對(duì)DAG進(jìn)行Stage劃分,從后往前偷溺,遇到寬依賴就斷開蹋辅,遇到窄依賴就把當(dāng)前的RDD加入到當(dāng)前的stage/階段中

作者:五分鐘學(xué)大數(shù)據(jù)
原文鏈接:https://www.cnblogs.com/itlz/p/14548375.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市挫掏,隨后出現(xiàn)的幾起案子侦另,更是在濱河造成了極大的恐慌,老刑警劉巖尉共,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件褒傅,死亡現(xiàn)場離奇詭異,居然都是意外死亡袄友,警方通過查閱死者的電腦和手機(jī)殿托,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來剧蚣,“玉大人支竹,你說我怎么就攤上這事○矗” “怎么了礼搁?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長目尖。 經(jīng)常有香客問我叹坦,道長,這世上最難降的妖魔是什么卑雁? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任募书,我火速辦了婚禮,結(jié)果婚禮上测蹲,老公的妹妹穿的比我還像新娘莹捡。我一直安慰自己,他們只是感情好扣甲,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布篮赢。 她就那樣靜靜地躺著齿椅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪启泣。 梳的紋絲不亂的頭發(fā)上涣脚,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音寥茫,去河邊找鬼遣蚀。 笑死,一個(gè)胖子當(dāng)著我的面吹牛纱耻,可吹牛的內(nèi)容都是我干的芭梯。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼弄喘,長吁一口氣:“原來是場噩夢啊……” “哼玖喘!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蘑志,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤累奈,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后急但,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體费尽,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年羊始,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了旱幼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡突委,死狀恐怖柏卤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情匀油,我是刑警寧澤缘缚,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站敌蚜,受9級(jí)特大地震影響桥滨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜弛车,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一齐媒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧纷跛,春花似錦喻括、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽望蜡。三九已至,卻和暖如春拷恨,著一層夾襖步出監(jiān)牢的瞬間脖律,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來泰國打工腕侄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留小泉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓兜挨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親眯分。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拌汇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容