為什么需要持久化
所謂的持久化令杈,就是將數(shù)據(jù)進(jìn)行保存掌敬,避免數(shù)據(jù)丟失。RDD持久化并非將數(shù)據(jù)落盤保存蠢熄,而是用作緩存跪解。
了解RDD持久化前需要先了解什么是RDD?
- RDD就像是一個(gè)水管签孔,數(shù)據(jù)就像是水叉讥,水只會經(jīng)過水管,并不是存儲水饥追。所以RDD是不會存儲數(shù)據(jù)的图仓。
- RDD(
Resilient Distributed Dataset
)叫做彈性分布式數(shù)據(jù)集,的其中一個(gè)特性就是彈性
- 存儲的彈性: spark計(jì)算過程中中間結(jié)果會保存在內(nèi)存中如果內(nèi)存不足會自動(dòng)存儲在磁盤
- 容錯(cuò)的彈性: spark中計(jì)算過程中如果出錯(cuò)會自動(dòng)重試
- 計(jì)算的彈性:如果計(jì)算過程中數(shù)據(jù)丟失,會根據(jù)RDD的依賴關(guān)系重新計(jì)算得到數(shù)據(jù)
- 分區(qū)的彈性:spark RDD會根據(jù)文件大小動(dòng)態(tài)分區(qū)
扯到彈性但绕,一是該篇文章會講到救崔,最主要的還是我需要復(fù)習(xí)一下
這里主要說到計(jì)算的彈性
,如果其中一個(gè)RDD出現(xiàn)問題壁熄,可以根據(jù)依賴關(guān)系重寫計(jì)算帚豪,獲得結(jié)果。這樣的好處就是保證了數(shù)據(jù)的完整性
草丧。那么有沒有缺點(diǎn)呢狸臣?答案是有的,就是需要重復(fù)計(jì)算昌执,得從頭開始烛亦。
如圖(1001):若
rdd5
出現(xiàn)問題了诈泼,若要重新獲得數(shù)據(jù)需要從rdd1
開始運(yùn)行,但是rdd5實(shí)際上只依賴于rdd4
產(chǎn)生的數(shù)據(jù)結(jié)果煤禽。從頭開始效率大大降低呀铐达,我們是否可以把rdd4
的解決結(jié)果緩存起來,rdd5
直接從緩存中獲让使瓮孙?若緩存中沒有在從頭開始呢?這樣可以大大減少運(yùn)行時(shí)間选脊。
我們再通過一個(gè)案例深入理解緩存的作用杭抠。
@Test
def check2Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rdd2.map(x=>x+10)
val rdd4=rdd2.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 sum值
println(rdd4.collect.toList)
}
有這么一個(gè)程序(如上),rdd2的結(jié)果會作為rdd3和rdd4的數(shù)據(jù)來源恳啥,也就是說rdd3和rdd4會對rdd2的結(jié)果做運(yùn)算偏灿。
若此時(shí)運(yùn)行程序;rdd2中的println("*"*10)
會被運(yùn)行多少次?
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
答案會被運(yùn)行6
次钝的,因?yàn)闀?jīng)過兩次collect
翁垂,
根據(jù)依賴關(guān)系,調(diào)用類似collect
的Action運(yùn)行算子硝桩,程序會從上往下運(yùn)行(三條數(shù)據(jù)*
兩次collect
=6次)沿猜。
**********
**********
**********
**********
**********
**********
你可以發(fā)現(xiàn)到了問題所在,這樣每次都要從頭開始碗脊,這種依賴關(guān)系比較少只有三級邢疙,若在實(shí)際開發(fā)中,十級以上都是屬于正常的依賴關(guān)系
望薄,若在rdd2
中存有數(shù)據(jù),就無需從頭開始了呼畸,直接取來用即可痕支。
RDD緩存
RDD不存儲數(shù)據(jù),所以默認(rèn)情況下每次執(zhí)行的時(shí)候都會stage開頭執(zhí)行
緩存:
- 數(shù)據(jù)保存位置: 保存在task所在主機(jī)的內(nèi)存/本地磁盤上
- 應(yīng)用場景: 某個(gè)RDD在多個(gè)job中重復(fù)使用的時(shí)候
如何緩存:
- cache
- persist
緩存的好處:
如果一個(gè)RDD有設(shè)置cache\persist,此時(shí)rdd所屬第一個(gè)Job執(zhí)行完成之后,數(shù)據(jù)會持久化到本地的磁盤/內(nèi)存中蛮原。后續(xù)RDD所屬的其他job在執(zhí)行的時(shí)候會直接將緩存數(shù)據(jù)拿過來使用而不用重新計(jì)算
RDD Cache緩存
RDD通過Cache或者Persist方法將前面的計(jì)算結(jié)果緩存卧须,默認(rèn)情況下會把數(shù)據(jù)以序列化的形式緩存在JVM的堆內(nèi)存中。但是
并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存
儒陨,而是觸發(fā)后面的action時(shí)花嘶,該RDD將會被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用蹦漠。
如何使用椭员?
語法: RDD.cache()
@Test
def check2Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//添加 rdd2的緩存
val rddx=rdd2.cache()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 sum值
println(rdd4.collect.toList)
}
運(yùn)行結(jié)果,是否還會打印6次println("*"*10)
**********
**********
**********
數(shù)據(jù)直接從緩存中取笛园,所以只會打印三次隘击。
RDD Persist緩存
如何使用侍芝?
語法: RDD.persist()
@Test
def check2Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//添加 rdd2的緩存
val rddx=rdd2.persist()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 sum值
println(rdd4.collect.toList)
}
結(jié)果還是3次
**********
**********
**********
Cache和Persist有什么區(qū)別?
cache
底層就是persist
埋同,調(diào)用是一個(gè)無參函數(shù)州叠。
def cache(): this.type = persist()
無參的persist()
其實(shí)調(diào)用的就是有參的,指定一個(gè)緩存級別凶赁。
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
- cache是將數(shù)據(jù)保存在本地內(nèi)存中
- persist可以指定將數(shù)據(jù)保存在內(nèi)存/磁盤中
Persist的緩存級別
StorageLevel 中就定義persist
的用到緩存級別咧栗。
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
存儲級別:
- NONE: 不存儲
- DISK_ONLY : 只保存在磁盤中
- DISK_ONLY_2 : 只保存在磁盤中,數(shù)據(jù)保存兩份
- MEMORY_ONLY : 只保存在內(nèi)存中
- MEMORY_ONLY_2 : 只保存在內(nèi)存中,數(shù)據(jù)保存兩份
- MEMORY_ONLY_SER :只保存在內(nèi)存中,以序列化形式存儲
- MEMORY_ONLY_SER_2 : 只保存在內(nèi)存中,以序列化形式存儲,數(shù)據(jù)保存兩份
- MEMORY_AND_DISK : 數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整
- MEMORY_AND_DISK_2 : 數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整虱肄,數(shù)據(jù)保存兩份
- MEMORY_AND_DISK_SER :數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整,以序列化形式存儲
- MEMORY_AND_DISK_SER_2 : 數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整,以序列化形式存儲致板,數(shù)據(jù)保存兩份
- OFF_HEAP :數(shù)據(jù)保存在堆外內(nèi)存中
太多了對不對?其中工作中常用的存儲級別:
- MEMORY_ONLY<只適用于小數(shù)據(jù)量場景>
- MEMORY_AND_DISK<適用于大數(shù)據(jù)量場景>
RDD CheckPoint檢查點(diǎn)
雖然我們配置了緩存但是只能
保存在task所在主機(jī)的內(nèi)存/本地磁盤上
浩峡,若該服務(wù)器出現(xiàn)問題可岂,依然會造成數(shù)據(jù)丟失,從頭開始
計(jì)算翰灾,效率也有所降低缕粹。為了安全起見,我們可以設(shè)置CheckPoint
纸淮,將數(shù)據(jù)保存到比較可靠的地方平斩,如:將數(shù)據(jù)保存到HDFS
中。
1)檢查點(diǎn):是通過將RDD中間結(jié)果寫入磁盤咽块。
2)為什么要做檢查點(diǎn)绘面?
由于血緣依賴過長會造成容錯(cuò)成本過高,這樣就不如在中間階段做檢查點(diǎn)容錯(cuò)侈沪,如果檢查點(diǎn)之后有節(jié)點(diǎn)出現(xiàn)問題揭璃,可以從檢查點(diǎn)開始重做血緣,減少了開銷亭罪。
3)檢查點(diǎn)存儲路徑:Checkpoint的數(shù)據(jù)通常是存儲在HDFS等容錯(cuò)瘦馍、高可用的文件系統(tǒng)
4)檢查點(diǎn)數(shù)據(jù)存儲格式為:二進(jìn)制的文件
5)檢查點(diǎn)切斷血緣:在Checkpoint的過程中,該RDD的所有依賴于父RDD中的信息將全部被移除应役。
6)檢查點(diǎn)觸發(fā)時(shí)間:對RDD進(jìn)行Checkpoint操作并不會馬上被執(zhí)行情组,必須執(zhí)行Action操作才能觸發(fā)。但是檢查點(diǎn)為了數(shù)據(jù)安全箩祥,會從血緣關(guān)系的最開始執(zhí)行一遍院崇。
案例演示:
@Test
def check4Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//設(shè)置存儲路徑
sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//添加 rdd2的緩存
rdd2.checkpoint()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rdd2.map(x=>x+10)
val rdd4=rdd2.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 sum值
println(rdd4.collect.toList)
}
/output/a
輸出文件
若需要這種錯(cuò)誤
Permission denied: user=123456, access=WRITE, inode="/output/a":hadoop:supergroup:drwxr-xr-x
這種屬于權(quán)限問題,解決方式設(shè)置-DHADOOP_USER_NAME=hadoop
hadoop=你鏈接服務(wù)器的賬號
奇怪的是控制臺卻打印了6次println("*"*10)
**********
**********
**********
**********
**********
**********
這是怎么回事了袍祖?
是我執(zhí)行了 rdd3和rdd4的原因底瓣?
@Test
def check4Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//設(shè)置存儲路徑
sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//添加 rdd2的緩存
rdd2.checkpoint()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rdd2.map(x=>x+10)
val rdd4=rdd2.map(x=>x+10)
val rdd5=rdd2.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 rdd4 的結(jié)果
println(rdd4.collect.toList)
// 輸出 rdd5 的結(jié)果
println(rdd5.collect.toList)
}
在加一個(gè) rdd5
是否會執(zhí)行9次?
啟動(dòng)運(yùn)行依舊是6次
**********
**********
**********
**********
**********
**********
懊し骸濒持?這么奇怪键耕?那么把rdd4
和rdd5
刪了呢?
@Test
def check4Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//設(shè)置存儲路徑
sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//添加 rdd2的緩存
rdd2.checkpoint()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rdd2.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
}
啟動(dòng)運(yùn)行依舊是6次
**********
**********
**********
**********
**********
**********
這是什么原因呢柑营?這就得從checkpoint()
源碼中尋址答案了屈雄。
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// 判斷路徑是否為空
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
// 看看 ReliableRDDCheckpointData
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
ReliableRDDCheckpointData
是一個(gè)伴生對象,我們看看doCheckpoint
方法
protected override def doCheckpoint(): CheckpointRDD[T] = {
// 將RDD寫入檢查點(diǎn)目錄
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
// Optionally clean our checkpoint files if the reference is out of scope
if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
newRDD
}
進(jìn)入到 writeRDDToCheckpointDirectory
中
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
val sc = originalRDD.sparkContext
// Create the output path for the checkpoint
val checkpointDirPath = new Path(checkpointDir)
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
}
// Save to file, and reload it as an RDD
//保存到文件官套,并將其作為RDD重新加載
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// 上面都是一些初始化工作酒奶,判斷,創(chuàng)建目錄啥奶赔,主要是這一句惋嚎,會在運(yùn)行job
sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
// 下面就不是很重要了
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
"Checkpoint RDD has a different number of partitions from original RDD. Original " +
s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
s"${newRDD.partitions.length}].")
}
newRDD
}
程序運(yùn)行到這里,他會再執(zhí)行一個(gè)job任務(wù)站刑,并將數(shù)據(jù)保存到緩存中另伍。
sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
checkpoint
一開始就執(zhí)行(如下),雖然checkpoint
在rdd3
前面被調(diào)用绞旅,但是它需要等到rdd3
執(zhí)行完collect
之后再運(yùn)行一個(gè)job摆尝。
//添加 rdd2的緩存
rdd2.checkpoint()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rdd2.map(x=>x+10)
總結(jié):checkpoint會觸發(fā)一次job操作,該job操作是在checkpoint所屬RDD第一個(gè)job執(zhí)行完成之后才會觸發(fā)
;
這樣就不爽了因悲,明明只需要跑一次的任務(wù)就可以緩存的堕汞,現(xiàn)在需要多跑一次,雖然對后面的RDD
效率有所提高晃琳,但是就是不爽讯检。
其實(shí)check
可以和checkpoint
配合使用
@Test
def check4Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//設(shè)置存儲路徑
sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//添加 rdd2的緩存
val rddx=rdd2.cache()
rddx.checkpoint()
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
val rdd5=rddx.map(x=>x+10)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 rdd4 的結(jié)果
println(rdd4.collect.toList)
// 輸出 rdd5 的結(jié)果
println(rdd5.collect.toList)
//釋放
rddx.unpersist(true)
//關(guān)閉鏈接
sc.stop()
}
這樣配置配置之后,就只會運(yùn)行一次了卫旱。
//添加 rdd2的緩存
val rddx=rdd2.cache()
rddx.checkpoint()
程序結(jié)果
**********
**********
**********
注意:checkpoint
還是會運(yùn)行一個(gè)job人灼,但是程序不用從頭開始了,而是直接從rddx
中取顾翼。
總結(jié):為了避免checkpoint觸發(fā)的job重復(fù)執(zhí)行之前的數(shù)據(jù)處理邏輯,可以在checkpoint之間將rdd通過cache緩存數(shù)據(jù),后續(xù)checkpoint觸發(fā)的job就可以直接使用緩存的數(shù)據(jù)
最后還得注意的是挡毅,緩存是可以備份到內(nèi)存或磁盤中的。
使用cache時(shí)暴构,job結(jié)束之后,緩存會被自動(dòng)釋放段磨。
使用checkpoint時(shí)取逾,需要手動(dòng)進(jìn)行釋放,需要設(shè)置unpersist
為true默認(rèn)為false苹支。
//釋放
rddx.unpersist(true)
//關(guān)閉鏈接
sc.stop()
cache與checkpoint的區(qū)別:
數(shù)據(jù)持久化的位置不一樣:
- cache是將數(shù)據(jù)保存在本地內(nèi)存/磁盤中
- checkpoint是將數(shù)據(jù)保存在HDFS中
依賴關(guān)系是否保留不一樣
- cache是將數(shù)據(jù)保存在本地內(nèi)存/磁盤中,如果服務(wù)器宕機(jī),此時(shí)數(shù)據(jù)丟失,丟失數(shù)據(jù)之后只能根據(jù)rdd的依賴關(guān)系重新計(jì)算得到數(shù)據(jù),所以rdd的依賴關(guān)系會保留
- checkpoint是將數(shù)據(jù)保存在HDFS,數(shù)據(jù)不會丟失,此時(shí)RDD的依賴關(guān)系會切除
工作常用還是cache
1)Cache緩存只是將數(shù)據(jù)保存起來砾隅,不切斷血緣依賴。Checkpoint檢查點(diǎn)切斷血緣依賴债蜜。
2)Cache緩存的數(shù)據(jù)通常存儲在磁盤晴埂、內(nèi)存等地方究反,可靠性低。Checkpoint的數(shù)據(jù)通常存儲在HDFS等容錯(cuò)儒洛、高可用的文件系統(tǒng)精耐,可靠性高。
3)建議對checkpoint()的RDD使用Cache緩存琅锻,這樣checkpoint的job只需從Cache緩存中讀取數(shù)據(jù)即可卦停,否則需要再從頭計(jì)算一次RDD。
4)如果使用完了緩存恼蓬,可以通過unpersist()方法釋放緩存
Shuffle會自動(dòng)進(jìn)行緩存
Job程序運(yùn)行中惊完,若進(jìn)行了
shuffle
操作,會自動(dòng)完成一次緩存操作处硬。
案例演示:
@Test
def check4Test(): Unit ={
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//進(jìn)行一個(gè)map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
// 進(jìn)行一次分組小槐,主要想讓程序完成一次shuffle操作。
val rddx=rdd2.groupBy(x=>x)
//進(jìn)行一個(gè)reduce算子操作
val rdd3=rddx.map(x=>x)
val rdd4=rddx.map(x=>x)
val rdd5=rddx.map(x=>x)
//打印 rdd2 的結(jié)果
println(rdd3.collect.toList)
// 輸出 rdd4 的結(jié)果
println(rdd4.collect.toList)
// 輸出 rdd5 的結(jié)果
println(rdd5.collect.toList)
//關(guān)閉鏈接
sc.stop()
}
將代碼做了一些變動(dòng)荷辕;
- 刪除所有的緩存
- 使用
groupBy
凿跳,讓程序完成一次shuffle
操作 - 最終看程序,是否只打印 3次
println("*"*10)
?
運(yùn)行結(jié)果:只打印了3次桐腌。
**********
**********
**********
這個(gè)一定要記住拄显,面試可能會被問到。
最后
文章內(nèi)容案站,我一邊看一邊做案例躬审,可能文檔會比較亂,沒有其他博客那么清晰明了蟆盐。