《Spark指南》四交播、編程指引-Scala篇(下)

本文主要翻譯至鏈接且不局限于該文內(nèi)容,也加入了筆者實(shí)踐內(nèi)容践付,翻譯水平有限秦士,歡迎指正,轉(zhuǎn)載請(qǐng)注明出處永高。由于篇幅較長(zhǎng)隧土,四、編程指引-Scala篇拆成了上下兩篇命爬,上一篇請(qǐng)參考《Spark指南》四曹傀、編程指引-Scala篇(上)

<接上文>

使用key-value鍵值對(duì)

雖然Spark在RDDs上的大部門操作支持任意類型的對(duì)象饲宛,但是一些操作只能在鍵值對(duì)上使用皆愉。最常見的是分布式“shuffle”操作,例如根據(jù)key對(duì)元素進(jìn)行分組或聚集操作。在Scala中幕庐,這些操作可以在包含Tuple2對(duì)象(語(yǔ)言中的內(nèi)置tuple久锥,例如(a,b))的RDDs上使用。參考PairRDDFunctions异剥,你可以看到元組RDD上支持的操作瑟由。下面是一個(gè)使用元組RDD的例子:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

這個(gè)例子在map中傳遞的函數(shù)生成了一個(gè)元組RDD,每個(gè)元組的鍵是該行的文本冤寿,值初始化為1歹苦,然后使用了reduceByKey操作對(duì)元組進(jìn)行規(guī)約操作,把相同key的值進(jìn)行相加督怜,最終得到的是每個(gè)文本鍵的計(jì)數(shù)殴瘦。

我們還可以使用counts.sortByKey()操作對(duì)得到的鍵值對(duì)進(jìn)行字母序排列,然后調(diào)用counts.collect()將結(jié)果以對(duì)象數(shù)組形式都收集到驅(qū)動(dòng)程序中号杠。

注意:當(dāng)使用普通的對(duì)象作為key時(shí)痴施,你應(yīng)該保證該對(duì)象類的equals()方法有一個(gè)對(duì)應(yīng)的hashCode()方法。更多的信息請(qǐng)參考Object.hashCode() 文檔

Transformations操作

下表列舉了Spark支持的一些常用的transformations操作究流,更詳細(xì)的操作請(qǐng)參考RDD的API文檔(ScalaJava动遭,Python芬探,R)和元組RDD的API文檔(ScalaJava)厘惦。

Transformation 含義
map(func) 將數(shù)據(jù)源的每個(gè)元素都傳給一個(gè)用戶定義的函數(shù)func偷仿,然后返回一個(gè)新的數(shù)據(jù)集
filter(func) 將數(shù)據(jù)源的每個(gè)元素都傳給一個(gè)用戶定義的過(guò)濾函數(shù)func,如果func返回true宵蕉,則收集這些元素酝静,最終以一個(gè)新的數(shù)據(jù)集返回
flatMap(func) 和map類似,但是每一個(gè)元素經(jīng)過(guò)func處理后可以返回0個(gè)或多個(gè)輸出元素羡玛,即func必須返回一個(gè)序列别智,而不是單個(gè)元素
mapPartitions(func) 和map類似,但是會(huì)在單獨(dú)的RDD分區(qū)(塊)上運(yùn)行稼稿,因此當(dāng)在類型T的RDD上運(yùn)行時(shí)薄榛,func必須是Iterator <T> => Iterator <U>類型
mapPartitionsWithIndex(func) 和mapPartitions類型,區(qū)別是會(huì)給func函數(shù)提供一個(gè)整數(shù)值以表示數(shù)據(jù)所在的分區(qū)让歼,因此當(dāng)在類型T的RDD上運(yùn)行時(shí)敞恋,func必須是 (Int, Iterator<T>) => Iterator<U>類型
sample(withReplacement, fraction, seed) 使用給定的隨機(jī)數(shù)發(fā)生器種子對(duì)一部分?jǐn)?shù)據(jù)(fraction)進(jìn)行抽樣,可以進(jìn)行替換(withReplacement為boolean值)
union(otherDataset) 將源數(shù)據(jù)集與參數(shù)數(shù)據(jù)集進(jìn)行合并谋右,得到一個(gè)新的數(shù)據(jù)集
intersection(otherDataset) 返回源數(shù)據(jù)集與參數(shù)數(shù)據(jù)集的交集數(shù)據(jù)集
distinct([numTasks])) 過(guò)濾掉源數(shù)據(jù)集中的重復(fù)元素硬猫,返回一個(gè)set集合
groupByKey([numTasks]) 當(dāng)在(K,V)鍵值對(duì)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí),返回一個(gè)新的數(shù)據(jù)集,值為(K,Iterable<V>)鍵值對(duì)啸蜜,即對(duì)相同的key進(jìn)行分組坑雅。注意,1)如果你分組的目的是為了在每個(gè)key上執(zhí)行聚集操作盔性,例如求和或求均值霞丧,那么使用reduceByKey或者aggregateByKey會(huì)有更好的性能;2)默認(rèn)情況下冕香,輸出結(jié)果中的并行度取決于父RDD的分區(qū)數(shù)蛹尝。 你可以傳遞可選的numTasks參數(shù),增加運(yùn)行時(shí)的并行度
reduceByKey(func, [numTasks]) 當(dāng)在(K,V)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí)悉尾,返回一個(gè)新的(K,V)數(shù)據(jù)集突那,其中,每個(gè)key對(duì)應(yīng)的所有值都會(huì)傳遞給給定的func函數(shù)func构眯,操作類型為(V,V) => V愕难。可以在第二個(gè)參數(shù)中傳遞numTasks來(lái)設(shè)置并行度惫霸。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 當(dāng)在(K,V)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí)猫缭,返回一個(gè)新的(K,U)數(shù)據(jù)集,其中壹店,seqOp函數(shù)用于聚集一個(gè)分區(qū)內(nèi)的結(jié)果(類型為(U,T) => U)猜丹,combOp函數(shù)用于聚集多個(gè)分區(qū)的結(jié)果(類型為(U,U) => U),zeroValue作為seqOp和combOp操作的初始值硅卢,典型情況下是一個(gè)自然元素射窒,例如,如果是列表拼接操作将塑,初始值可以指定為Nil脉顿,如果是求和操作,初始值可以指定為0点寥。 聚合結(jié)果的值類型允許與輸入值類型不同艾疟。 reduce任務(wù)的數(shù)量可以通過(guò)可選的第三個(gè)參數(shù)來(lái)配置。
sortByKey([ascending], [numTasks]) 當(dāng)在(K,V)數(shù)據(jù)集上調(diào)用本函數(shù)時(shí)开财,返回一個(gè)以K值排序的(K,V)數(shù)據(jù)集汉柒,其中,K必須實(shí)現(xiàn)Ordered接口责鳍,升序或降序可以在第一個(gè)參數(shù)指定(true為升序)碾褂,第二個(gè)參數(shù)指定并行度。
join(otherDataset, [numTasks]) 將源數(shù)據(jù)集(K,V)與目標(biāo)數(shù)據(jù)集(K,W)進(jìn)行連接操作历葛,返回一個(gè)新的數(shù)據(jù)集(K,(V,W))正塌,其中嘀略,K為所有的鍵,(V,W)為所有可能的組合乓诽。如果需要計(jì)算外部連接帜羊,可以使用leftOuterJoin、rightOuterJoin或fullOuterJoin方法鸠天,其含義與數(shù)據(jù)庫(kù)表的連接類似讼育。
cogroup(otherDataset, [numTasks]) 將源數(shù)據(jù)集(K,V)與目標(biāo)數(shù)據(jù)集(K,W)進(jìn)行此操作后,返回一個(gè)新的(K,(Iterable<V>, Iterable<W>))元組稠集,該操作的另一個(gè)名稱是groupWith
cartesian(otherDataset) 當(dāng)在類型T的源數(shù)據(jù)集和類型U的參數(shù)數(shù)據(jù)集上進(jìn)行此操作后(笛卡爾積)奶段,返回一個(gè)元組數(shù)據(jù)集,元組是所有可能的(T,U)對(duì)
pipe(command, [envVars]) 構(gòu)建一個(gè)管道剥纷,對(duì)RDD的每個(gè)分區(qū)執(zhí)行一個(gè)shell腳本(例如Perl痹籍、bash),RDD的每個(gè)元素作為該腳本進(jìn)程的輸入晦鞋,該腳本的輸出則生成一個(gè)新的RDD蹲缠,RDD的一個(gè)元素對(duì)應(yīng)輸出結(jié)果的一行字符串
coalesce(numPartitions) 減少RDD的分區(qū)數(shù)到指定的numPartitions,在過(guò)濾完一個(gè)大規(guī)模數(shù)據(jù)集后執(zhí)行操作時(shí)悠垛,這個(gè)方法可以更高效
repartition(numPartitions) 隨機(jī)打亂RDD里的數(shù)據(jù)集线定,然后重新創(chuàng)建指定數(shù)量的分區(qū),使分區(qū)內(nèi)的元素盡量平衡确买。該操作通常會(huì)在整個(gè)網(wǎng)絡(luò)內(nèi)重組數(shù)據(jù)
repartitionAndSortWithinPartitions(partitioner) 隨機(jī)打亂RDD里的數(shù)據(jù)集渔肩,然后重新創(chuàng)建指定數(shù)量的分區(qū),對(duì)分區(qū)結(jié)果中的數(shù)據(jù)拇惋,根據(jù)他們的key值進(jìn)行沖排序。該操作比先調(diào)用repartition再調(diào)用排序操作更高效抹剩,因?yàn)榕判虿僮魍瑫r(shí)會(huì)在每個(gè)shuffle機(jī)器上執(zhí)行

Actions操作

下表列舉了Spark支持的一些常用的actions操作撑帖,更詳細(xì)的操作請(qǐng)參考RDD的API文檔(Scala, Java, Python, R),和元組RDD文檔 (Scala, Java)澳眷。

Action 含義
reduce(func) 使用函數(shù)func聚集數(shù)據(jù)集中的所有元素胡嘿,函數(shù)類型為((U,U) => U),該函數(shù)避暑是可交換和關(guān)聯(lián)的钳踊,才能正確的進(jìn)行并行計(jì)算衷敌。
collect() 將數(shù)據(jù)集的所有元素以數(shù)組的形式返回給驅(qū)動(dòng)程序。該操作在一些過(guò)濾操作返回操作數(shù)據(jù)集的子集時(shí)非常有用拓瞪。
count() 統(tǒng)計(jì)數(shù)據(jù)集的元素個(gè)數(shù)
first() 返回?cái)?shù)據(jù)集的第一個(gè)元素缴罗,等同于take(1)
take(n) 返回?cái)?shù)據(jù)集的前n個(gè)元素
takeSample(withReplacement, num, [seed]) 隨機(jī)抽樣num個(gè)元素,withReplacement表示有無(wú)放回抽樣祭埂,第三個(gè)參數(shù)可選面氓,可以提供一個(gè)隨機(jī)數(shù)種子發(fā)生器
takeOrdered(n, [ordering]) 返回?cái)?shù)據(jù)集中排過(guò)序的前n個(gè)元素
saveAsTextFile(path) 將數(shù)據(jù)集中的元素以文本文件形式寫到文件系統(tǒng)(本地兵钮、HDFS或其他Hadoop支持的文件系統(tǒng))的path目錄下,Spark會(huì)調(diào)用每個(gè)元素的toString方法來(lái)將每個(gè)元素轉(zhuǎn)成一行字符串
saveAsSequenceFile(path) (Java and Scala) 將數(shù)據(jù)集中的元素以Hadoop序列化文件形式寫到文件系統(tǒng)(本地舌界、HDFS或其他Hadoop支持的文件系統(tǒng))的path目錄下掘譬,這個(gè)操作可以在實(shí)現(xiàn)Hadoop Writable接口的鍵值對(duì)RDD上使用。在Scala中呻拌,這個(gè)操作也支持在可以隱式轉(zhuǎn)換為Writable對(duì)象的類型上使用葱轩,例如Int、Double藐握、String等基礎(chǔ)類型就支持隱式轉(zhuǎn)換靴拱。
saveAsObjectFile(path) (Java and Scala) 將數(shù)據(jù)集中的元素以Java序列化形式寫到文件系統(tǒng)(本地、HDFS或其他Hadoop支持的文件系統(tǒng))的path目錄下趾娃,這個(gè)文件隨后可以通過(guò)調(diào)用SparkContext.objectFile()方法讀取到內(nèi)存中缭嫡。
countByKey() 該操作只能在元組RDD上使用,它返回一個(gè)(K,Int)形式的hashmap抬闷,統(tǒng)計(jì)了每個(gè)Key對(duì)應(yīng)的值的計(jì)數(shù)
foreach(func) 對(duì)數(shù)據(jù)集中的每個(gè)元素執(zhí)行func操作妇蛀。注意,更改foreach作用域之外的非Accumulators變量笤成,可能會(huì)有預(yù)期之外的行為评架,詳情請(qǐng)參考《Spark指南》四、編程指引-Scala篇(上) 一文中的“理解閉包”

Spark RDD API也提供了一些actions的異步版本炕泳,例如foreach對(duì)應(yīng)的異步版本為foreachAsync纵诞,該操作直接返回一個(gè)FutureAction對(duì)象給調(diào)用者,而不是阻塞在調(diào)用過(guò)程中培遵。

Shuffle操作

Spark中執(zhí)行的部分操作可能會(huì)觸發(fā)shuffle事件浙芙,shuffle是Spark中一種重新分發(fā)數(shù)據(jù)的機(jī)制,不同分區(qū)內(nèi)的數(shù)據(jù)因此會(huì)被重組籽腕。由于數(shù)據(jù)會(huì)在不同的分區(qū)和機(jī)器之間拷貝嗡呼,shuffle是一個(gè)很復(fù)雜和代價(jià)很高的操作。

背景

以reduceByKey(func)操作為例皇耗,該操作的作用是掃描數(shù)據(jù)集中的所有(K,V)元組南窗,對(duì)同一個(gè)Key下的所有values執(zhí)行func操作,最終返回一個(gè)新的RDD郎楼,值為(K,func(V))万伤。實(shí)際運(yùn)行時(shí),源數(shù)據(jù)集被拆分在不同機(jī)器的不同分區(qū)上呜袁,因此同一個(gè)key的所有values無(wú)法保證都在同一個(gè)分區(qū)或同一臺(tái)機(jī)器敌买,于是必須進(jìn)行一些重組操作才能計(jì)算最終的結(jié)果。

在Spark中阶界,數(shù)據(jù)通常都不會(huì)自動(dòng)跨分區(qū)分布到特定操作所需要的位置上放妈,在計(jì)算過(guò)程中北救,一個(gè)task只會(huì)操作一個(gè)單獨(dú)的分區(qū),因此為了重組所有的數(shù)據(jù)以保證reduceByKey task能夠執(zhí)行芜抒,Spark需要進(jìn)行一個(gè)all-to-all操作珍策,即,它必須在所有分區(qū)上對(duì)所有的keys進(jìn)行重新分組宅倒,把一個(gè)key對(duì)應(yīng)的所有values都跨分區(qū)讀取到一起攘宙,以計(jì)算該key最終的結(jié)果——這個(gè)過(guò)程就是shuffle。

如果希望在shuffle操作之后拐迁,讓數(shù)據(jù)有序蹭劈,可以執(zhí)行如下操作:

  • 使用mapPartitions對(duì)分區(qū)進(jìn)行排序
  • 使用repartitionAndSortWithinPartitions在重新分區(qū)的同時(shí)進(jìn)行排序
  • 使用sortBy得到一個(gè)全局有序的RDD

會(huì)觸發(fā)shuffle的操作包括repartition系列操作,例如repartition线召、coalesce铺韧,ByKey系列操作,例如groupByKey缓淹、reduceByKey哈打,以及join系列操作,例如cogroup讯壶、join料仗。

性能

Shuffle操作是一個(gè)代價(jià)很大的操作,因?yàn)樗鼤?huì)涉及磁盤I/O伏蚊、數(shù)據(jù)系列化甚至是網(wǎng)絡(luò)I/O立轧。Spark會(huì)生成一些列的tasks,包括:map tasks用于組織數(shù)據(jù)躏吊,reduce tasks用于聚集數(shù)據(jù)(這其中的map和reduce并不等價(jià)于Spark的map和reduce操作)氛改。

在內(nèi)部,map task產(chǎn)生的數(shù)據(jù)會(huì)一直保存在內(nèi)存中(直到內(nèi)存滿了)比伏,然后他們基于目標(biāo)分區(qū)進(jìn)行重排序并寫入到一個(gè)單獨(dú)的文件中平窘。在reduce一端,tasks直接讀取相關(guān)的已排序過(guò)的數(shù)據(jù)塊凳怨。

某些shuffle操作會(huì)消耗大量的堆內(nèi)存,因?yàn)樵趥鬏敂?shù)據(jù)前后是鬼,他們需要使用內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)來(lái)表示肤舞。例如,reduceByKey和aggregateByKey操作會(huì)在map端創(chuàng)建這些數(shù)據(jù)結(jié)構(gòu)均蜜,其他'ByKey操作會(huì)在reduce端生成這些數(shù)據(jù)結(jié)構(gòu)李剖。當(dāng)內(nèi)存中存放不下時(shí),Spark會(huì)將這些數(shù)據(jù)暫存到磁盤囤耳,因此會(huì)增加額外的磁盤I/O開銷篙顺,以及增加JVM的內(nèi)存回收偶芍。

Shuffle操作也會(huì)在磁盤上生成大量的臨時(shí)文件。從Spark 1.3開始德玫,這些文件會(huì)被一直保留在磁盤上匪蟀,直到相應(yīng)的RDDs不再被使用,然后被當(dāng)成垃圾回收宰僧。這樣做的原因是如果進(jìn)行相同的操作材彪,中間數(shù)據(jù)可以被重用,而不必重新創(chuàng)建琴儿。如果應(yīng)用程序一直保持著對(duì)RDDs的引用段化,或者GC操作不頻繁,那么垃圾回收可能會(huì)在很長(zhǎng)的一段時(shí)間以后才會(huì)進(jìn)行造成,于是显熏,一些長(zhǎng)時(shí)間運(yùn)行的Spark作業(yè)可能會(huì)占用大量的磁盤空間。臨時(shí)的存儲(chǔ)目錄可以在SPark context中配置spark.local.dir屬性晒屎。Shuffle操作支持很多配置參數(shù)來(lái)調(diào)整功能喘蟆,你可以參考Spark Configuration Guide一文中的“Shuffle Behavior”小節(jié)。

RDD持久化

Spark中最重要的功能之一是在operations操作執(zhí)行中在內(nèi)存內(nèi)持久化(或緩存)數(shù)據(jù)集夷磕。 當(dāng)持久化RDD時(shí)履肃,每個(gè)節(jié)點(diǎn)都存儲(chǔ)它在自己內(nèi)存中計(jì)算的數(shù)據(jù)分區(qū),并在該數(shù)據(jù)集(或從其派生的數(shù)據(jù)集)上的其他操作中重新使用它們坐桩。 這通常能為接下來(lái)的操作能夠提升效率(通常超過(guò)10倍)尺棋。 緩存則是迭代算法和快速交互使用的關(guān)鍵工具。

您可以使用persist()或cache()方法對(duì)RDD進(jìn)行持久化绵跷。 第一次在action中計(jì)算它后膘螟,它將被保存在節(jié)點(diǎn)上的內(nèi)存中。 Spark的緩存是容錯(cuò)的 - 如果RDD的任何分區(qū)丟失碾局,它將根據(jù)最初創(chuàng)建時(shí)的變換自動(dòng)重新計(jì)算荆残。

此外,每一個(gè)持久化的RDD可以指定不同的存儲(chǔ)級(jí)別净当,例如内斯,你可以將數(shù)據(jù)存儲(chǔ)在磁盤中,將它以java序列化對(duì)象的形式存儲(chǔ)在內(nèi)存中像啼,或者在不同的節(jié)點(diǎn)之間進(jìn)行復(fù)制拷貝俘闯。存儲(chǔ)級(jí)別可以通過(guò)給persist()方法傳遞一個(gè)StorageLevel對(duì)象(Scala,Java, Python) 來(lái)實(shí)現(xiàn)。cache()方法等同于使用StorageLevel.MEMORY_ONLY級(jí)別的持久化(即忽冻,在內(nèi)存中存儲(chǔ)反序列化的對(duì)象)真朗,完整的存儲(chǔ)級(jí)別包括如下:

存儲(chǔ)級(jí)別 含義
MEMORY_ONLY 將RDD在JVM中存儲(chǔ)為反序列化的java對(duì)象。如果存儲(chǔ)超出了限制僧诚,某些分區(qū)將不會(huì)被緩存遮婶,而是在他們需要被使用時(shí)重新計(jì)算蝗碎。該級(jí)別是默認(rèn)的存儲(chǔ)級(jí)別
MEMORY_AND_DISK 將RDD在JVM中存儲(chǔ)為反序列化的java對(duì)象。如果存儲(chǔ)超出了限制旗扑,某些分區(qū)將被存儲(chǔ)到磁盤上蹦骑,需要使用的時(shí)候再?gòu)拇疟P中讀取
MEMORY_ONLY_SER (Java and Scala) 將RDD在JVM中存儲(chǔ)為序列化的java對(duì)象(每個(gè)分區(qū)對(duì)應(yīng)一個(gè)字節(jié)數(shù)組),這種方式比反序列化方式具有更高的空間利用率肩豁。如果內(nèi)存中放不下脊串,數(shù)據(jù)不會(huì)被緩存,而是在使用時(shí)重新計(jì)算清钥。
MEMORY_AND_DISK_SER (Java and Scala) 與MEMORY_ONLY_SER類似琼锋,不同的是內(nèi)存中放不下時(shí),數(shù)據(jù)會(huì)被存儲(chǔ)到磁盤上祟昭。
DISK_ONLY 只在磁盤中存儲(chǔ)RDD分區(qū)
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等. 與前綴的級(jí)別類似缕坎,但是數(shù)據(jù)會(huì)在兩個(gè)集群節(jié)點(diǎn)之間進(jìn)行拷貝
OFF_HEAP (實(shí)驗(yàn)階段) 與MEMORY_ONLY_SER相似,但是數(shù)據(jù)被存儲(chǔ)在堆外內(nèi)存中篡悟,該特性需要事先開啟堆外內(nèi)存功能谜叹,參考off-heap memory

注意,在python中搬葬,存儲(chǔ)的數(shù)據(jù)都是以Pickle進(jìn)行序列化荷腊,因此一些序列化級(jí)別對(duì)python版無(wú)效,可用的存儲(chǔ)級(jí)別包括:MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2。

如果選擇存儲(chǔ)級(jí)別

Spark的存儲(chǔ)級(jí)別意味著需要在內(nèi)存使用率和CPU效率之間進(jìn)行不同的權(quán)衡。我們建議按照如下過(guò)程進(jìn)行選擇:

  • 如果你的RDDs足夠存儲(chǔ)在內(nèi)存中(默認(rèn)使用MEMORY_ONLY)智听,那么盡量使用這種級(jí)別,它提供了最高效的運(yùn)行方式疾忍。
  • 如果不行,優(yōu)先使用MEMORY_ONLY_SER然后選擇一個(gè)高效的序列化庫(kù)床三,這樣能更好的利用內(nèi)存空間一罩,也保證足夠的訪問速率。
  • 盡量不要把數(shù)據(jù)存儲(chǔ)到磁盤上撇簿,除非重新計(jì)算數(shù)據(jù)代價(jià)很大聂渊,或者需要過(guò)濾一個(gè)非常大規(guī)模的數(shù)據(jù)。否則四瘫,重新計(jì)算可能都會(huì)比從內(nèi)存中讀取來(lái)得快汉嗽。
  • 如果你希望提供快速的錯(cuò)誤恢復(fù)能力,可以使用可拷貝的存儲(chǔ)級(jí)別莲组,例如,為一個(gè)web應(yīng)用程序提供服務(wù)時(shí)可能會(huì)用到暖夭。所有的存儲(chǔ)級(jí)別在分區(qū)數(shù)據(jù)丟失時(shí)锹杈,都會(huì)通過(guò)重新計(jì)算來(lái)容錯(cuò)撵孤,但是數(shù)據(jù)具有拷貝的話,Spark就能夠繼續(xù)運(yùn)行tasks而不是等待他們完成重新計(jì)算竭望。

移除數(shù)據(jù)

Spark會(huì)自動(dòng)監(jiān)控緩存的使用邪码,然后使用LRU(最近最少使用)算法丟棄老的數(shù)據(jù)分區(qū)。如果你想手動(dòng)從緩存中移除數(shù)據(jù)咬清,調(diào)用RDD.unpersist()方法闭专。

共享變量

通常,當(dāng)傳遞給Spark操作(例如map操作旧烧、reduce操作等)的函數(shù)在遠(yuǎn)程節(jié)點(diǎn)上執(zhí)行時(shí)影钉,該函數(shù)所使用的變量都是獨(dú)立拷貝的。因此掘剪,這些變量的后續(xù)修改都不會(huì)反饋到驅(qū)動(dòng)程序上平委,跨任務(wù)共享讀寫變量將是很低效的。Spark提供了兩種共享變量類型來(lái)支持這種需求:廣播變量(broadcast variables)和累加器(Accumulators)夺谁。

廣播變量(broadcast variables)

廣播變量允許開發(fā)者在每臺(tái)機(jī)器上緩存一個(gè)只讀的變量廉赔,而不是重復(fù)的在tasks之間進(jìn)行實(shí)時(shí)拷貝。例如匾鸥,它允許以一種更高效方式給每個(gè)節(jié)點(diǎn)拷貝一個(gè)數(shù)據(jù)集的副本蜡塌。Spark本身會(huì)嘗試使用高效的廣播算法來(lái)分發(fā)廣播變量,以降低通信成本勿负。

Spark的actions操作通常由一系列由分布式的“shuffle”操作分割開的步驟組成馏艾,Spark會(huì)自動(dòng)廣播每一個(gè)步驟所需要使用的數(shù)據(jù)。以這種方式廣播的數(shù)據(jù)會(huì)以序列化的形式緩存笆环,然后再每個(gè)task執(zhí)行之前反序列化攒至。因此,顯示地創(chuàng)建廣播變量只有在如下情形才有用:1)跨越多個(gè)步驟的tasks需要使用相同的數(shù)據(jù)躁劣;2)以反序列化形式緩存數(shù)據(jù)很重要迫吐。

調(diào)用SparkContext.broadcast(v)可以創(chuàng)建一個(gè)以v的內(nèi)容進(jìn)行包裝的廣播變量,它的值可以通過(guò)value方法進(jìn)行引用账忘,代碼例如:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

創(chuàng)建完一個(gè)廣播變量之后志膀,你應(yīng)該盡量使用該廣播變量而不是直接使用變量v,以防在多個(gè)節(jié)點(diǎn)之間多次拷貝鳖擒。此外溉浙,對(duì)象v不應(yīng)該在被廣播之后進(jìn)行修改,確保所有節(jié)點(diǎn)獲得的廣播變量的值都是相同的(例如蒋荚,如果變量以后被傳輸?shù)叫鹿?jié)點(diǎn)戳稽,應(yīng)該還能保證是相同的值)。

累加器(Accumulators)

累加器是這樣一類變量,它們?cè)陉P(guān)聯(lián)和交換操作中僅支持“累加”操作惊奇,因此支持有效的并行操作互躬。 它們可以用于實(shí)現(xiàn)計(jì)數(shù)器(如在MapReduce中)或求和。 Spark本身支持?jǐn)?shù)值類型的累加器颂郎,但是你可以添加對(duì)新類型的支持吼渡。

下圖例舉了一個(gè)名為counter的累加器,累加器會(huì)被展示在web UI上乓序,并且每一個(gè)有修改操作的步驟都會(huì)展示出來(lái)寺酪,(Tasks表)。

累加器示例

跟蹤UI中的累加器對(duì)于理解每個(gè)運(yùn)行步驟的進(jìn)度又很大幫助替劈。

在Scala中寄雀,可以使用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()來(lái)創(chuàng)建一個(gè)long型或double型的累加器,Tasks在執(zhí)行時(shí)抬纸,可以調(diào)用累加器的add方法來(lái)增加累加器的數(shù)值咙俩。然而,Tasks無(wú)法讀取累加器的值湿故,只有驅(qū)動(dòng)程序才能使用value方法獲取它們的取值阿趁。

例如,下面的例子展示了用一個(gè)累加器來(lái)累加一個(gè)數(shù)組的和:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

這段代碼使用了內(nèi)置支持的Long類型累加器坛猪,開發(fā)者也可以通過(guò)創(chuàng)建AccumulatorV2的子類來(lái)實(shí)現(xiàn)自定義類型的累加器脖阵。AccumulatorV2抽象類有若干個(gè)方法,其中必須實(shí)現(xiàn)的包括:1)reset方法墅茉,用于將累加器清零命黔;2)add方法,用于累加新的值到累加器上就斤;3)merge方法悍募,用于合并相同類型的累加器。其他必須重寫的方法請(qǐng)參考API documentation洋机。例如下面的例子中坠宴,我們假設(shè)MyVector類用于表示數(shù)學(xué)上的向量:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

可以注意到,當(dāng)開發(fā)者自定義累加器時(shí)绷旗,累加的結(jié)果類型允許和累加的參數(shù)不一致喜鼓。

對(duì)于僅在actions內(nèi)部進(jìn)行的累加器更新,Spark保證每個(gè)task的更新到累加器上的操作只會(huì)進(jìn)行一次衔肢,也就是說(shuō)庄岖,如果重啟tasks,累加器的值不會(huì)重復(fù)更新角骤。但是在transformations操作中隅忿,開發(fā)者需要注意到每個(gè)task的更新操作可能會(huì)被執(zhí)行不止一次,如果任務(wù)被重新執(zhí)行的話。

注意背桐,累加器并不會(huì)改變Spark的lazy評(píng)估模型刘陶。即,如果它們是在一個(gè)RDD操作內(nèi)更新數(shù)值牢撼,則只有當(dāng)該RDD開始計(jì)算時(shí)才會(huì)更新累加器的數(shù)值。因此疑苫,在對(duì)RDD應(yīng)用一些lazy transformation時(shí)(例如map操作)熏版,累加器操作并不保證實(shí)時(shí)更新。例如捍掺,下面的代碼運(yùn)行結(jié)果仍然是0:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

部署運(yùn)行

提交應(yīng)用程序

《Spark指南》三撼短、 提交應(yīng)用程序這章中,我們描述了如何提交應(yīng)用程序到集群中運(yùn)行挺勿。簡(jiǎn)潔的說(shuō)曲横,我們需要把應(yīng)用程序打包(對(duì)于Java/Scala,打成JAR包不瓶,對(duì)于Python禾嫉,對(duì)應(yīng)的是一些.py或.zip文件),然后調(diào)用bin/spark-submit來(lái)提交應(yīng)用程序到集群中蚊丐。

在Java/Scala中啟動(dòng)

org.apache.spark.launcher這個(gè)包提供了一些建議的Java API熙参,可以以子進(jìn)程的形式啟動(dòng)Spark作業(yè)。

單元測(cè)試

Spark可以很友好的使用各種單元測(cè)試框架進(jìn)行單元測(cè)試麦备,你需要做的只要?jiǎng)?chuàng)建一個(gè)SparkContext孽椰,然后將master URL設(shè)置為local就可以執(zhí)行測(cè)試你的程序,當(dāng)代碼邏輯執(zhí)行完畢凛篙,你需要調(diào)用SparkContext.stop()來(lái)關(guān)閉進(jìn)程黍匾。盡量在創(chuàng)建新的context之前,使用finally塊或測(cè)試框架的tearDown方法來(lái)關(guān)閉上下文呛梆,因?yàn)镾park不支持在單進(jìn)程中同時(shí)存在兩個(gè)上下文锐涯。

下一步

學(xué)完本章,你可以到Spark官網(wǎng)上看看一些Spark示例代碼削彬,或者全庸,Spark的安裝包中也有一個(gè)examples目錄(Scala,Java, Python, R),你可以使用bin/run-example腳本執(zhí)行這些示例程序融痛,例如:

./bin/run-example SparkPi

對(duì)于壶笼,Python或R語(yǔ)言的范例,你需要使用spark-submit來(lái)提交程序:

./bin/spark-submit examples/src/main/python/pi.py
./bin/spark-submit examples/src/main/r/dataframe.R

如果需要調(diào)優(yōu)雁刷,你可以參考configurationtuning 這兩篇文檔覆劈。他們提出了一些最佳實(shí)踐的建議,用以保證你的數(shù)據(jù)在內(nèi)存中以一種高效的形式進(jìn)行存儲(chǔ)。

如果需要部署方面的幫助责语,你可以查閱cluster mode overview這篇文章炮障,它描述了分布式操作中相關(guān)的一些組件,以及Spark所支持的集群管理器坤候。

最后胁赢,你可以查閱Scala, Java, Python and R這些文檔來(lái)了解完整的API。

相關(guān)的文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市顽照,隨后出現(xiàn)的幾起案子由蘑,更是在濱河造成了極大的恐慌,老刑警劉巖代兵,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尼酿,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡植影,警方通過(guò)查閱死者的電腦和手機(jī)谓媒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)何乎,“玉大人句惯,你說(shuō)我怎么就攤上這事≈Ь龋” “怎么了抢野?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)各墨。 經(jīng)常有香客問我指孤,道長(zhǎng),這世上最難降的妖魔是什么贬堵? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任恃轩,我火速辦了婚禮,結(jié)果婚禮上黎做,老公的妹妹穿的比我還像新娘叉跛。我一直安慰自己,他們只是感情好蒸殿,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布筷厘。 她就那樣靜靜地躺著鸣峭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪酥艳。 梳的紋絲不亂的頭發(fā)上摊溶,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音充石,去河邊找鬼莫换。 笑死,一個(gè)胖子當(dāng)著我的面吹牛骤铃,可吹牛的內(nèi)容都是我干的浓镜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼劲厌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了听隐?” 一聲冷哼從身側(cè)響起补鼻,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎雅任,沒想到半個(gè)月后风范,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沪么,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年硼婿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片禽车。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡寇漫,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出殉摔,到底是詐尸還是另有隱情州胳,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布逸月,位于F島的核電站栓撞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏碗硬。R本人自食惡果不足惜瓤湘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望恩尾。 院中可真熱鬧弛说,春花似錦、人聲如沸翰意。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至虎囚,卻和暖如春角塑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背淘讥。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工圃伶, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蒲列。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓窒朋,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蝗岖。 傳聞我的和親對(duì)象是個(gè)殘疾皇子侥猩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容