本文主要翻譯至鏈接且不局限于該文內(nèi)容,也加入了筆者實(shí)踐內(nèi)容践付,翻譯水平有限秦士,歡迎指正,轉(zhuǎn)載請(qǐng)注明出處永高。由于篇幅較長(zhǎng)隧土,四、編程指引-Scala篇拆成了上下兩篇命爬,上一篇請(qǐng)參考《Spark指南》四曹傀、編程指引-Scala篇(上)。
<接上文>
使用key-value鍵值對(duì)
雖然Spark在RDDs上的大部門操作支持任意類型的對(duì)象饲宛,但是一些操作只能在鍵值對(duì)上使用皆愉。最常見的是分布式“shuffle”操作,例如根據(jù)key對(duì)元素進(jìn)行分組或聚集操作。在Scala中幕庐,這些操作可以在包含Tuple2對(duì)象(語(yǔ)言中的內(nèi)置tuple久锥,例如(a,b))的RDDs上使用。參考PairRDDFunctions异剥,你可以看到元組RDD上支持的操作瑟由。下面是一個(gè)使用元組RDD的例子:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
這個(gè)例子在map中傳遞的函數(shù)生成了一個(gè)元組RDD,每個(gè)元組的鍵是該行的文本冤寿,值初始化為1歹苦,然后使用了reduceByKey操作對(duì)元組進(jìn)行規(guī)約操作,把相同key的值進(jìn)行相加督怜,最終得到的是每個(gè)文本鍵的計(jì)數(shù)殴瘦。
我們還可以使用counts.sortByKey()操作對(duì)得到的鍵值對(duì)進(jìn)行字母序排列,然后調(diào)用counts.collect()將結(jié)果以對(duì)象數(shù)組形式都收集到驅(qū)動(dòng)程序中号杠。
注意:當(dāng)使用普通的對(duì)象作為key時(shí)痴施,你應(yīng)該保證該對(duì)象類的equals()方法有一個(gè)對(duì)應(yīng)的hashCode()方法。更多的信息請(qǐng)參考Object.hashCode() 文檔
Transformations操作
下表列舉了Spark支持的一些常用的transformations操作究流,更詳細(xì)的操作請(qǐng)參考RDD的API文檔(Scala, Java动遭,Python芬探,R)和元組RDD的API文檔(Scala,Java)厘惦。
Transformation | 含義 |
---|---|
map(func) | 將數(shù)據(jù)源的每個(gè)元素都傳給一個(gè)用戶定義的函數(shù)func偷仿,然后返回一個(gè)新的數(shù)據(jù)集 |
filter(func) | 將數(shù)據(jù)源的每個(gè)元素都傳給一個(gè)用戶定義的過(guò)濾函數(shù)func,如果func返回true宵蕉,則收集這些元素酝静,最終以一個(gè)新的數(shù)據(jù)集返回 |
flatMap(func) | 和map類似,但是每一個(gè)元素經(jīng)過(guò)func處理后可以返回0個(gè)或多個(gè)輸出元素羡玛,即func必須返回一個(gè)序列别智,而不是單個(gè)元素 |
mapPartitions(func) | 和map類似,但是會(huì)在單獨(dú)的RDD分區(qū)(塊)上運(yùn)行稼稿,因此當(dāng)在類型T的RDD上運(yùn)行時(shí)薄榛,func必須是Iterator <T> => Iterator <U>類型 |
mapPartitionsWithIndex(func) | 和mapPartitions類型,區(qū)別是會(huì)給func函數(shù)提供一個(gè)整數(shù)值以表示數(shù)據(jù)所在的分區(qū)让歼,因此當(dāng)在類型T的RDD上運(yùn)行時(shí)敞恋,func必須是 (Int, Iterator<T>) => Iterator<U>類型 |
sample(withReplacement, fraction, seed) | 使用給定的隨機(jī)數(shù)發(fā)生器種子對(duì)一部分?jǐn)?shù)據(jù)(fraction)進(jìn)行抽樣,可以進(jìn)行替換(withReplacement為boolean值) |
union(otherDataset) | 將源數(shù)據(jù)集與參數(shù)數(shù)據(jù)集進(jìn)行合并谋右,得到一個(gè)新的數(shù)據(jù)集 |
intersection(otherDataset) | 返回源數(shù)據(jù)集與參數(shù)數(shù)據(jù)集的交集數(shù)據(jù)集 |
distinct([numTasks])) | 過(guò)濾掉源數(shù)據(jù)集中的重復(fù)元素硬猫,返回一個(gè)set集合 |
groupByKey([numTasks]) | 當(dāng)在(K,V)鍵值對(duì)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí),返回一個(gè)新的數(shù)據(jù)集,值為(K,Iterable<V>)鍵值對(duì)啸蜜,即對(duì)相同的key進(jìn)行分組坑雅。注意,1)如果你分組的目的是為了在每個(gè)key上執(zhí)行聚集操作盔性,例如求和或求均值霞丧,那么使用reduceByKey或者aggregateByKey會(huì)有更好的性能;2)默認(rèn)情況下冕香,輸出結(jié)果中的并行度取決于父RDD的分區(qū)數(shù)蛹尝。 你可以傳遞可選的numTasks參數(shù),增加運(yùn)行時(shí)的并行度 |
reduceByKey(func, [numTasks]) | 當(dāng)在(K,V)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí)悉尾,返回一個(gè)新的(K,V)數(shù)據(jù)集突那,其中,每個(gè)key對(duì)應(yīng)的所有值都會(huì)傳遞給給定的func函數(shù)func构眯,操作類型為(V,V) => V愕难。可以在第二個(gè)參數(shù)中傳遞numTasks來(lái)設(shè)置并行度惫霸。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 當(dāng)在(K,V)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí)猫缭,返回一個(gè)新的(K,U)數(shù)據(jù)集,其中壹店,seqOp函數(shù)用于聚集一個(gè)分區(qū)內(nèi)的結(jié)果(類型為(U,T) => U)猜丹,combOp函數(shù)用于聚集多個(gè)分區(qū)的結(jié)果(類型為(U,U) => U),zeroValue作為seqOp和combOp操作的初始值硅卢,典型情況下是一個(gè)自然元素射窒,例如,如果是列表拼接操作将塑,初始值可以指定為Nil脉顿,如果是求和操作,初始值可以指定為0点寥。 聚合結(jié)果的值類型允許與輸入值類型不同艾疟。 reduce任務(wù)的數(shù)量可以通過(guò)可選的第三個(gè)參數(shù)來(lái)配置。 |
sortByKey([ascending], [numTasks]) | 當(dāng)在(K,V)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí)开财,返回一個(gè)以K值排序的(K,V)數(shù)據(jù)集汉柒,其中,K必須實(shí)現(xiàn)Ordered接口责鳍,升序或降序可以在第一個(gè)參數(shù)指定(true為升序)碾褂,第二個(gè)參數(shù)指定并行度。 |
join(otherDataset, [numTasks]) | 將源數(shù)據(jù)集(K,V)與目標(biāo)數(shù)據(jù)集(K,W)進(jìn)行連接操作历葛,返回一個(gè)新的數(shù)據(jù)集(K,(V,W))正塌,其中嘀略,K為所有的鍵,(V,W)為所有可能的組合乓诽。如果需要計(jì)算外部連接帜羊,可以使用leftOuterJoin、rightOuterJoin或fullOuterJoin方法鸠天,其含義與數(shù)據(jù)庫(kù)表的連接類似讼育。 |
cogroup(otherDataset, [numTasks]) | 將源數(shù)據(jù)集(K,V)與目標(biāo)數(shù)據(jù)集(K,W)進(jìn)行此操作后,返回一個(gè)新的(K,(Iterable<V>, Iterable<W>))元組稠集,該操作的另一個(gè)名稱是groupWith |
cartesian(otherDataset) | 當(dāng)在類型T的源數(shù)據(jù)集和類型U的參數(shù)數(shù)據(jù)集上進(jìn)行此操作后(笛卡爾積)奶段,返回一個(gè)元組數(shù)據(jù)集,元組是所有可能的(T,U)對(duì) |
pipe(command, [envVars]) | 構(gòu)建一個(gè)管道剥纷,對(duì)RDD的每個(gè)分區(qū)執(zhí)行一個(gè)shell腳本(例如Perl痹籍、bash),RDD的每個(gè)元素作為該腳本進(jìn)程的輸入晦鞋,該腳本的輸出則生成一個(gè)新的RDD蹲缠,RDD的一個(gè)元素對(duì)應(yīng)輸出結(jié)果的一行字符串 |
coalesce(numPartitions) | 減少RDD的分區(qū)數(shù)到指定的numPartitions,在過(guò)濾完一個(gè)大規(guī)模數(shù)據(jù)集后執(zhí)行操作時(shí)悠垛,這個(gè)方法可以更高效 |
repartition(numPartitions) | 隨機(jī)打亂RDD里的數(shù)據(jù)集线定,然后重新創(chuàng)建指定數(shù)量的分區(qū),使分區(qū)內(nèi)的元素盡量平衡确买。該操作通常會(huì)在整個(gè)網(wǎng)絡(luò)內(nèi)重組數(shù)據(jù) |
repartitionAndSortWithinPartitions(partitioner) | 隨機(jī)打亂RDD里的數(shù)據(jù)集渔肩,然后重新創(chuàng)建指定數(shù)量的分區(qū),對(duì)分區(qū)結(jié)果中的數(shù)據(jù)拇惋,根據(jù)他們的key值進(jìn)行沖排序。該操作比先調(diào)用repartition再調(diào)用排序操作更高效抹剩,因?yàn)榕判虿僮魍瑫r(shí)會(huì)在每個(gè)shuffle機(jī)器上執(zhí)行 |
Actions操作
下表列舉了Spark支持的一些常用的actions操作撑帖,更詳細(xì)的操作請(qǐng)參考RDD的API文檔(Scala, Java, Python, R),和元組RDD文檔 (Scala, Java)澳眷。
Action | 含義 |
---|---|
reduce(func) | 使用函數(shù)func聚集數(shù)據(jù)集中的所有元素胡嘿,函數(shù)類型為((U,U) => U),該函數(shù)避暑是可交換和關(guān)聯(lián)的钳踊,才能正確的進(jìn)行并行計(jì)算衷敌。 |
collect() | 將數(shù)據(jù)集的所有元素以數(shù)組的形式返回給驅(qū)動(dòng)程序。該操作在一些過(guò)濾操作返回操作數(shù)據(jù)集的子集時(shí)非常有用拓瞪。 |
count() | 統(tǒng)計(jì)數(shù)據(jù)集的元素個(gè)數(shù) |
first() | 返回?cái)?shù)據(jù)集的第一個(gè)元素缴罗,等同于take(1) |
take(n) | 返回?cái)?shù)據(jù)集的前n個(gè)元素 |
takeSample(withReplacement, num, [seed]) | 隨機(jī)抽樣num個(gè)元素,withReplacement表示有無(wú)放回抽樣祭埂,第三個(gè)參數(shù)可選面氓,可以提供一個(gè)隨機(jī)數(shù)種子發(fā)生器 |
takeOrdered(n, [ordering]) | 返回?cái)?shù)據(jù)集中排過(guò)序的前n個(gè)元素 |
saveAsTextFile(path) | 將數(shù)據(jù)集中的元素以文本文件形式寫到文件系統(tǒng)(本地兵钮、HDFS或其他Hadoop支持的文件系統(tǒng))的path目錄下,Spark會(huì)調(diào)用每個(gè)元素的toString方法來(lái)將每個(gè)元素轉(zhuǎn)成一行字符串 |
saveAsSequenceFile(path) (Java and Scala) | 將數(shù)據(jù)集中的元素以Hadoop序列化文件形式寫到文件系統(tǒng)(本地舌界、HDFS或其他Hadoop支持的文件系統(tǒng))的path目錄下掘譬,這個(gè)操作可以在實(shí)現(xiàn)Hadoop Writable接口的鍵值對(duì)RDD上使用。在Scala中呻拌,這個(gè)操作也支持在可以隱式轉(zhuǎn)換為Writable對(duì)象的類型上使用葱轩,例如Int、Double藐握、String等基礎(chǔ)類型就支持隱式轉(zhuǎn)換靴拱。 |
saveAsObjectFile(path) (Java and Scala) | 將數(shù)據(jù)集中的元素以Java序列化形式寫到文件系統(tǒng)(本地、HDFS或其他Hadoop支持的文件系統(tǒng))的path目錄下趾娃,這個(gè)文件隨后可以通過(guò)調(diào)用SparkContext.objectFile()方法讀取到內(nèi)存中缭嫡。 |
countByKey() | 該操作只能在元組RDD上使用,它返回一個(gè)(K,Int)形式的hashmap抬闷,統(tǒng)計(jì)了每個(gè)Key對(duì)應(yīng)的值的計(jì)數(shù) |
foreach(func) | 對(duì)數(shù)據(jù)集中的每個(gè)元素執(zhí)行func操作妇蛀。注意,更改foreach作用域之外的非Accumulators變量笤成,可能會(huì)有預(yù)期之外的行為评架,詳情請(qǐng)參考《Spark指南》四、編程指引-Scala篇(上) 一文中的“理解閉包” |
Spark RDD API也提供了一些actions的異步版本炕泳,例如foreach對(duì)應(yīng)的異步版本為foreachAsync纵诞,該操作直接返回一個(gè)FutureAction對(duì)象給調(diào)用者,而不是阻塞在調(diào)用過(guò)程中培遵。
Shuffle操作
Spark中執(zhí)行的部分操作可能會(huì)觸發(fā)shuffle事件浙芙,shuffle是Spark中一種重新分發(fā)數(shù)據(jù)的機(jī)制,不同分區(qū)內(nèi)的數(shù)據(jù)因此會(huì)被重組籽腕。由于數(shù)據(jù)會(huì)在不同的分區(qū)和機(jī)器之間拷貝嗡呼,shuffle是一個(gè)很復(fù)雜和代價(jià)很高的操作。
背景
以reduceByKey(func)操作為例皇耗,該操作的作用是掃描數(shù)據(jù)集中的所有(K,V)元組南窗,對(duì)同一個(gè)Key下的所有values執(zhí)行func操作,最終返回一個(gè)新的RDD郎楼,值為(K,func(V))万伤。實(shí)際運(yùn)行時(shí),源數(shù)據(jù)集被拆分在不同機(jī)器的不同分區(qū)上呜袁,因此同一個(gè)key的所有values無(wú)法保證都在同一個(gè)分區(qū)或同一臺(tái)機(jī)器敌买,于是必須進(jìn)行一些重組操作才能計(jì)算最終的結(jié)果。
在Spark中阶界,數(shù)據(jù)通常都不會(huì)自動(dòng)跨分區(qū)分布到特定操作所需要的位置上放妈,在計(jì)算過(guò)程中北救,一個(gè)task只會(huì)操作一個(gè)單獨(dú)的分區(qū),因此為了重組所有的數(shù)據(jù)以保證reduceByKey task能夠執(zhí)行芜抒,Spark需要進(jìn)行一個(gè)all-to-all操作珍策,即,它必須在所有分區(qū)上對(duì)所有的keys進(jìn)行重新分組宅倒,把一個(gè)key對(duì)應(yīng)的所有values都跨分區(qū)讀取到一起攘宙,以計(jì)算該key最終的結(jié)果——這個(gè)過(guò)程就是shuffle。
如果希望在shuffle操作之后拐迁,讓數(shù)據(jù)有序蹭劈,可以執(zhí)行如下操作:
- 使用mapPartitions對(duì)分區(qū)進(jìn)行排序
- 使用repartitionAndSortWithinPartitions在重新分區(qū)的同時(shí)進(jìn)行排序
- 使用sortBy得到一個(gè)全局有序的RDD
會(huì)觸發(fā)shuffle的操作包括repartition系列操作,例如repartition线召、coalesce铺韧,ByKey系列操作,例如groupByKey缓淹、reduceByKey哈打,以及join系列操作,例如cogroup讯壶、join料仗。
性能
Shuffle操作是一個(gè)代價(jià)很大的操作,因?yàn)樗鼤?huì)涉及磁盤I/O伏蚊、數(shù)據(jù)系列化甚至是網(wǎng)絡(luò)I/O立轧。Spark會(huì)生成一些列的tasks,包括:map tasks用于組織數(shù)據(jù)躏吊,reduce tasks用于聚集數(shù)據(jù)(這其中的map和reduce并不等價(jià)于Spark的map和reduce操作)氛改。
在內(nèi)部,map task產(chǎn)生的數(shù)據(jù)會(huì)一直保存在內(nèi)存中(直到內(nèi)存滿了)比伏,然后他們基于目標(biāo)分區(qū)進(jìn)行重排序并寫入到一個(gè)單獨(dú)的文件中平窘。在reduce一端,tasks直接讀取相關(guān)的已排序過(guò)的數(shù)據(jù)塊凳怨。
某些shuffle操作會(huì)消耗大量的堆內(nèi)存,因?yàn)樵趥鬏敂?shù)據(jù)前后是鬼,他們需要使用內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)來(lái)表示肤舞。例如,reduceByKey和aggregateByKey操作會(huì)在map端創(chuàng)建這些數(shù)據(jù)結(jié)構(gòu)均蜜,其他'ByKey操作會(huì)在reduce端生成這些數(shù)據(jù)結(jié)構(gòu)李剖。當(dāng)內(nèi)存中存放不下時(shí),Spark會(huì)將這些數(shù)據(jù)暫存到磁盤囤耳,因此會(huì)增加額外的磁盤I/O開銷篙顺,以及增加JVM的內(nèi)存回收偶芍。
Shuffle操作也會(huì)在磁盤上生成大量的臨時(shí)文件。從Spark 1.3開始德玫,這些文件會(huì)被一直保留在磁盤上匪蟀,直到相應(yīng)的RDDs不再被使用,然后被當(dāng)成垃圾回收宰僧。這樣做的原因是如果進(jìn)行相同的操作材彪,中間數(shù)據(jù)可以被重用,而不必重新創(chuàng)建琴儿。如果應(yīng)用程序一直保持著對(duì)RDDs的引用段化,或者GC操作不頻繁,那么垃圾回收可能會(huì)在很長(zhǎng)的一段時(shí)間以后才會(huì)進(jìn)行造成,于是显熏,一些長(zhǎng)時(shí)間運(yùn)行的Spark作業(yè)可能會(huì)占用大量的磁盤空間。臨時(shí)的存儲(chǔ)目錄可以在SPark context中配置spark.local.dir屬性晒屎。Shuffle操作支持很多配置參數(shù)來(lái)調(diào)整功能喘蟆,你可以參考Spark Configuration Guide一文中的“Shuffle Behavior”小節(jié)。
RDD持久化
Spark中最重要的功能之一是在operations操作執(zhí)行中在內(nèi)存內(nèi)持久化(或緩存)數(shù)據(jù)集夷磕。 當(dāng)持久化RDD時(shí)履肃,每個(gè)節(jié)點(diǎn)都存儲(chǔ)它在自己內(nèi)存中計(jì)算的數(shù)據(jù)分區(qū),并在該數(shù)據(jù)集(或從其派生的數(shù)據(jù)集)上的其他操作中重新使用它們坐桩。 這通常能為接下來(lái)的操作能夠提升效率(通常超過(guò)10倍)尺棋。 緩存則是迭代算法和快速交互使用的關(guān)鍵工具。
您可以使用persist()或cache()方法對(duì)RDD進(jìn)行持久化绵跷。 第一次在action中計(jì)算它后膘螟,它將被保存在節(jié)點(diǎn)上的內(nèi)存中。 Spark的緩存是容錯(cuò)的 - 如果RDD的任何分區(qū)丟失碾局,它將根據(jù)最初創(chuàng)建時(shí)的變換自動(dòng)重新計(jì)算荆残。
此外,每一個(gè)持久化的RDD可以指定不同的存儲(chǔ)級(jí)別净当,例如内斯,你可以將數(shù)據(jù)存儲(chǔ)在磁盤中,將它以java序列化對(duì)象的形式存儲(chǔ)在內(nèi)存中像啼,或者在不同的節(jié)點(diǎn)之間進(jìn)行復(fù)制拷貝俘闯。存儲(chǔ)級(jí)別可以通過(guò)給persist()方法傳遞一個(gè)StorageLevel對(duì)象(Scala,Java, Python) 來(lái)實(shí)現(xiàn)。cache()方法等同于使用StorageLevel.MEMORY_ONLY級(jí)別的持久化(即忽冻,在內(nèi)存中存儲(chǔ)反序列化的對(duì)象)真朗,完整的存儲(chǔ)級(jí)別包括如下:
存儲(chǔ)級(jí)別 | 含義 |
---|---|
MEMORY_ONLY | 將RDD在JVM中存儲(chǔ)為反序列化的java對(duì)象。如果存儲(chǔ)超出了限制僧诚,某些分區(qū)將不會(huì)被緩存遮婶,而是在他們需要被使用時(shí)重新計(jì)算蝗碎。該級(jí)別是默認(rèn)的存儲(chǔ)級(jí)別 |
MEMORY_AND_DISK | 將RDD在JVM中存儲(chǔ)為反序列化的java對(duì)象。如果存儲(chǔ)超出了限制旗扑,某些分區(qū)將被存儲(chǔ)到磁盤上蹦骑,需要使用的時(shí)候再?gòu)拇疟P中讀取 |
MEMORY_ONLY_SER (Java and Scala) | 將RDD在JVM中存儲(chǔ)為序列化的java對(duì)象(每個(gè)分區(qū)對(duì)應(yīng)一個(gè)字節(jié)數(shù)組),這種方式比反序列化方式具有更高的空間利用率肩豁。如果內(nèi)存中放不下脊串,數(shù)據(jù)不會(huì)被緩存,而是在使用時(shí)重新計(jì)算清钥。 |
MEMORY_AND_DISK_SER (Java and Scala) | 與MEMORY_ONLY_SER類似琼锋,不同的是內(nèi)存中放不下時(shí),數(shù)據(jù)會(huì)被存儲(chǔ)到磁盤上祟昭。 |
DISK_ONLY | 只在磁盤中存儲(chǔ)RDD分區(qū) |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等. | 與前綴的級(jí)別類似缕坎,但是數(shù)據(jù)會(huì)在兩個(gè)集群節(jié)點(diǎn)之間進(jìn)行拷貝 |
OFF_HEAP (實(shí)驗(yàn)階段) | 與MEMORY_ONLY_SER相似,但是數(shù)據(jù)被存儲(chǔ)在堆外內(nèi)存中篡悟,該特性需要事先開啟堆外內(nèi)存功能谜叹,參考off-heap memory |
注意,在python中搬葬,存儲(chǔ)的數(shù)據(jù)都是以Pickle進(jìn)行序列化荷腊,因此一些序列化級(jí)別對(duì)python版無(wú)效,可用的存儲(chǔ)級(jí)別包括:MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2。
如果選擇存儲(chǔ)級(jí)別
Spark的存儲(chǔ)級(jí)別意味著需要在內(nèi)存使用率和CPU效率之間進(jìn)行不同的權(quán)衡。我們建議按照如下過(guò)程進(jìn)行選擇:
- 如果你的RDDs足夠存儲(chǔ)在內(nèi)存中(默認(rèn)使用MEMORY_ONLY)智听,那么盡量使用這種級(jí)別,它提供了最高效的運(yùn)行方式疾忍。
- 如果不行,優(yōu)先使用MEMORY_ONLY_SER然后選擇一個(gè)高效的序列化庫(kù)床三,這樣能更好的利用內(nèi)存空間一罩,也保證足夠的訪問速率。
- 盡量不要把數(shù)據(jù)存儲(chǔ)到磁盤上撇簿,除非重新計(jì)算數(shù)據(jù)代價(jià)很大聂渊,或者需要過(guò)濾一個(gè)非常大規(guī)模的數(shù)據(jù)。否則四瘫,重新計(jì)算可能都會(huì)比從內(nèi)存中讀取來(lái)得快汉嗽。
- 如果你希望提供快速的錯(cuò)誤恢復(fù)能力,可以使用可拷貝的存儲(chǔ)級(jí)別莲组,例如,為一個(gè)web應(yīng)用程序提供服務(wù)時(shí)可能會(huì)用到暖夭。所有的存儲(chǔ)級(jí)別在分區(qū)數(shù)據(jù)丟失時(shí)锹杈,都會(huì)通過(guò)重新計(jì)算來(lái)容錯(cuò)撵孤,但是數(shù)據(jù)具有拷貝的話,Spark就能夠繼續(xù)運(yùn)行tasks而不是等待他們完成重新計(jì)算竭望。
移除數(shù)據(jù)
Spark會(huì)自動(dòng)監(jiān)控緩存的使用邪码,然后使用LRU(最近最少使用)算法丟棄老的數(shù)據(jù)分區(qū)。如果你想手動(dòng)從緩存中移除數(shù)據(jù)咬清,調(diào)用RDD.unpersist()方法闭专。
共享變量
通常,當(dāng)傳遞給Spark操作(例如map操作旧烧、reduce操作等)的函數(shù)在遠(yuǎn)程節(jié)點(diǎn)上執(zhí)行時(shí)影钉,該函數(shù)所使用的變量都是獨(dú)立拷貝的。因此掘剪,這些變量的后續(xù)修改都不會(huì)反饋到驅(qū)動(dòng)程序上平委,跨任務(wù)共享讀寫變量將是很低效的。Spark提供了兩種共享變量類型來(lái)支持這種需求:廣播變量(broadcast variables)和累加器(Accumulators)夺谁。
廣播變量(broadcast variables)
廣播變量允許開發(fā)者在每臺(tái)機(jī)器上緩存一個(gè)只讀的變量廉赔,而不是重復(fù)的在tasks之間進(jìn)行實(shí)時(shí)拷貝。例如匾鸥,它允許以一種更高效方式給每個(gè)節(jié)點(diǎn)拷貝一個(gè)數(shù)據(jù)集的副本蜡塌。Spark本身會(huì)嘗試使用高效的廣播算法來(lái)分發(fā)廣播變量,以降低通信成本勿负。
Spark的actions操作通常由一系列由分布式的“shuffle”操作分割開的步驟組成馏艾,Spark會(huì)自動(dòng)廣播每一個(gè)步驟所需要使用的數(shù)據(jù)。以這種方式廣播的數(shù)據(jù)會(huì)以序列化的形式緩存笆环,然后再每個(gè)task執(zhí)行之前反序列化攒至。因此,顯示地創(chuàng)建廣播變量只有在如下情形才有用:1)跨越多個(gè)步驟的tasks需要使用相同的數(shù)據(jù)躁劣;2)以反序列化形式緩存數(shù)據(jù)很重要迫吐。
調(diào)用SparkContext.broadcast(v)可以創(chuàng)建一個(gè)以v的內(nèi)容進(jìn)行包裝的廣播變量,它的值可以通過(guò)value方法進(jìn)行引用账忘,代碼例如:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
創(chuàng)建完一個(gè)廣播變量之后志膀,你應(yīng)該盡量使用該廣播變量而不是直接使用變量v,以防在多個(gè)節(jié)點(diǎn)之間多次拷貝鳖擒。此外溉浙,對(duì)象v不應(yīng)該在被廣播之后進(jìn)行修改,確保所有節(jié)點(diǎn)獲得的廣播變量的值都是相同的(例如蒋荚,如果變量以后被傳輸?shù)叫鹿?jié)點(diǎn)戳稽,應(yīng)該還能保證是相同的值)。
累加器(Accumulators)
累加器是這樣一類變量,它們?cè)陉P(guān)聯(lián)和交換操作中僅支持“累加”操作惊奇,因此支持有效的并行操作互躬。 它們可以用于實(shí)現(xiàn)計(jì)數(shù)器(如在MapReduce中)或求和。 Spark本身支持?jǐn)?shù)值類型的累加器颂郎,但是你可以添加對(duì)新類型的支持吼渡。
下圖例舉了一個(gè)名為counter的累加器,累加器會(huì)被展示在web UI上乓序,并且每一個(gè)有修改操作的步驟都會(huì)展示出來(lái)寺酪,(Tasks表)。
跟蹤UI中的累加器對(duì)于理解每個(gè)運(yùn)行步驟的進(jìn)度又很大幫助替劈。
在Scala中寄雀,可以使用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()來(lái)創(chuàng)建一個(gè)long型或double型的累加器,Tasks在執(zhí)行時(shí)抬纸,可以調(diào)用累加器的add方法來(lái)增加累加器的數(shù)值咙俩。然而,Tasks無(wú)法讀取累加器的值湿故,只有驅(qū)動(dòng)程序才能使用value方法獲取它們的取值阿趁。
例如,下面的例子展示了用一個(gè)累加器來(lái)累加一個(gè)數(shù)組的和:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
這段代碼使用了內(nèi)置支持的Long類型累加器坛猪,開發(fā)者也可以通過(guò)創(chuàng)建AccumulatorV2的子類來(lái)實(shí)現(xiàn)自定義類型的累加器脖阵。AccumulatorV2抽象類有若干個(gè)方法,其中必須實(shí)現(xiàn)的包括:1)reset方法墅茉,用于將累加器清零命黔;2)add方法,用于累加新的值到累加器上就斤;3)merge方法悍募,用于合并相同類型的累加器。其他必須重寫的方法請(qǐng)參考API documentation洋机。例如下面的例子中坠宴,我們假設(shè)MyVector類用于表示數(shù)學(xué)上的向量:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
可以注意到,當(dāng)開發(fā)者自定義累加器時(shí)绷旗,累加的結(jié)果類型允許和累加的參數(shù)不一致喜鼓。
對(duì)于僅在actions內(nèi)部進(jìn)行的累加器更新,Spark保證每個(gè)task的更新到累加器上的操作只會(huì)進(jìn)行一次衔肢,也就是說(shuō)庄岖,如果重啟tasks,累加器的值不會(huì)重復(fù)更新角骤。但是在transformations操作中隅忿,開發(fā)者需要注意到每個(gè)task的更新操作可能會(huì)被執(zhí)行不止一次,如果任務(wù)被重新執(zhí)行的話。
注意背桐,累加器并不會(huì)改變Spark的lazy評(píng)估模型刘陶。即,如果它們是在一個(gè)RDD操作內(nèi)更新數(shù)值牢撼,則只有當(dāng)該RDD開始計(jì)算時(shí)才會(huì)更新累加器的數(shù)值。因此疑苫,在對(duì)RDD應(yīng)用一些lazy transformation時(shí)(例如map操作)熏版,累加器操作并不保證實(shí)時(shí)更新。例如捍掺,下面的代碼運(yùn)行結(jié)果仍然是0:
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
部署運(yùn)行
提交應(yīng)用程序
在《Spark指南》三撼短、 提交應(yīng)用程序這章中,我們描述了如何提交應(yīng)用程序到集群中運(yùn)行挺勿。簡(jiǎn)潔的說(shuō)曲横,我們需要把應(yīng)用程序打包(對(duì)于Java/Scala,打成JAR包不瓶,對(duì)于Python禾嫉,對(duì)應(yīng)的是一些.py或.zip文件),然后調(diào)用bin/spark-submit來(lái)提交應(yīng)用程序到集群中蚊丐。
在Java/Scala中啟動(dòng)
org.apache.spark.launcher這個(gè)包提供了一些建議的Java API熙参,可以以子進(jìn)程的形式啟動(dòng)Spark作業(yè)。
單元測(cè)試
Spark可以很友好的使用各種單元測(cè)試框架進(jìn)行單元測(cè)試麦备,你需要做的只要?jiǎng)?chuàng)建一個(gè)SparkContext孽椰,然后將master URL設(shè)置為local就可以執(zhí)行測(cè)試你的程序,當(dāng)代碼邏輯執(zhí)行完畢凛篙,你需要調(diào)用SparkContext.stop()來(lái)關(guān)閉進(jìn)程黍匾。盡量在創(chuàng)建新的context之前,使用finally塊或測(cè)試框架的tearDown方法來(lái)關(guān)閉上下文呛梆,因?yàn)镾park不支持在單進(jìn)程中同時(shí)存在兩個(gè)上下文锐涯。
下一步
學(xué)完本章,你可以到Spark官網(wǎng)上看看一些Spark示例代碼削彬,或者全庸,Spark的安裝包中也有一個(gè)examples目錄(Scala,Java, Python, R),你可以使用bin/run-example腳本執(zhí)行這些示例程序融痛,例如:
./bin/run-example SparkPi
對(duì)于壶笼,Python或R語(yǔ)言的范例,你需要使用spark-submit來(lái)提交程序:
./bin/spark-submit examples/src/main/python/pi.py
./bin/spark-submit examples/src/main/r/dataframe.R
如果需要調(diào)優(yōu)雁刷,你可以參考configuration 和 tuning 這兩篇文檔覆劈。他們提出了一些最佳實(shí)踐的建議,用以保證你的數(shù)據(jù)在內(nèi)存中以一種高效的形式進(jìn)行存儲(chǔ)。
如果需要部署方面的幫助责语,你可以查閱cluster mode overview這篇文章炮障,它描述了分布式操作中相關(guān)的一些組件,以及Spark所支持的集群管理器坤候。
最后胁赢,你可以查閱Scala, Java, Python and R這些文檔來(lái)了解完整的API。