Spark-RDD算子調(diào)優(yōu)

1. RDD復(fù)用

在對(duì)RDD進(jìn)行算子時(shí)尊搬,要避免相同的算子和計(jì)算邏輯之下對(duì)RDD進(jìn)行重復(fù)的計(jì)算:

圖片

對(duì)上圖中的RDD計(jì)算架構(gòu)進(jìn)行修改远寸,得到如下圖所示的優(yōu)化結(jié)果:

圖片

2. 盡早filter

獲取到初始RDD后豪诲,應(yīng)該考慮盡早地過(guò)濾掉不需要的數(shù)據(jù),進(jìn)而減少對(duì)內(nèi)存的占用,從而提升Spark作業(yè)的運(yùn)行效率。

3. 讀取大量小文件-用wholeTextFiles

當(dāng)將一個(gè)文本文件讀取為 RDD 時(shí)喳篇,輸入的每一行都會(huì)成為RDD的一個(gè)元素。

也可以將多個(gè)完整的文本文件一次性讀取為一個(gè)pairRDD态辛,其中鍵是文件名,值是文件內(nèi)容挺尿。

val input:RDD[String] = sc.textFile("dir/*.log") 

如果傳遞目錄奏黑,則將目錄下的所有文件讀取作為RDD。文件路徑支持通配符编矾。

但是對(duì)于大量的小文件讀取效率并不高熟史,應(yīng)使用 wholeTextFiles
返回值為RDD[(String, String)],其中Key是文件的名稱(chēng)窄俏,Value是文件的內(nèi)容蹂匹。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles讀取小文件:

val filesRDD: RDD[(String, String)] =sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartition和foreachPartition

  • mapPartitions

map(_….) 表示每一個(gè)元素

mapPartitions(_….) 表示每個(gè)分區(qū)的數(shù)據(jù)組成的迭代器

普通的map算子對(duì)RDD中的每一個(gè)元素進(jìn)行操作,而mapPartitions算子對(duì)RDD中每一個(gè)分區(qū)進(jìn)行操作凹蜈。

如果是普通的map算子限寞,假設(shè)一個(gè)partition有1萬(wàn)條數(shù)據(jù),那么map算子中的function要執(zhí)行1萬(wàn)次仰坦,也就是對(duì)每個(gè)元素進(jìn)行操作履植。

圖片

如果是mapPartition算子,由于一個(gè)task處理一個(gè)RDD的partition悄晃,那么一個(gè)task只會(huì)執(zhí)行一次function玫霎,function一次接收所有的partition數(shù)據(jù),效率比較高妈橄。

圖片

當(dāng)要把RDD中的所有數(shù)據(jù)通過(guò)JDBC寫(xiě)入數(shù)據(jù)庶近,如果使用map算子,那么需要對(duì)RDD中的每一個(gè)元素都創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接眷蚓,這樣對(duì)資源的消耗很大鼻种,如果使用mapPartitions算子,那么針對(duì)一個(gè)分區(qū)的數(shù)據(jù)溪椎,只需要建立一個(gè)數(shù)據(jù)庫(kù)連接普舆。

mapPartitions算子也存在一些缺點(diǎn):對(duì)于普通的map操作,一次處理一條數(shù)據(jù)校读,如果在處理了2000條數(shù)據(jù)后內(nèi)存不足沼侣,那么可以將已經(jīng)處理完的2000條數(shù)據(jù)從內(nèi)存中垃圾回收掉;但是如果使用mapPartitions算子歉秫,但數(shù)據(jù)量非常大時(shí)蛾洛,function一次處理一個(gè)分區(qū)的數(shù)據(jù),如果一旦內(nèi)存不足,此時(shí)無(wú)法回收內(nèi)存轧膘,就可能會(huì)OOM钞螟,即內(nèi)存溢出。

因此谎碍,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時(shí)候鳞滨,此時(shí)使用mapPartitions算子對(duì)性能的提升效果還是不錯(cuò)的。(當(dāng)數(shù)據(jù)量很大的時(shí)候蟆淀,一旦使用mapPartitions算子拯啦,就會(huì)直接OOM)

在項(xiàng)目中,應(yīng)該首先估算一下RDD的數(shù)據(jù)量熔任、每個(gè)partition的數(shù)據(jù)量褒链,以及分配給每個(gè)Executor的內(nèi)存資源,如果資源允許疑苔,可以考慮使用mapPartitions算子代替map甫匹。

  • foreachPartition

rrd.foreache(_….) 表示每一個(gè)元素

rrd.forPartitions(_….) 表示每個(gè)分區(qū)的數(shù)據(jù)組成的迭代器

在生產(chǎn)環(huán)境中,通常使用foreachPartition算子來(lái)完成數(shù)據(jù)庫(kù)的寫(xiě)入惦费,通過(guò)foreachPartition算子的特性兵迅,可以?xún)?yōu)化寫(xiě)數(shù)據(jù)庫(kù)的性能。

如果使用foreach算子完成數(shù)據(jù)庫(kù)的操作趁餐,由于foreach算子是遍歷RDD的每條數(shù)據(jù)喷兼,因此,每條數(shù)據(jù)都會(huì)建立一個(gè)數(shù)據(jù)庫(kù)連接后雷,這是對(duì)資源的極大浪費(fèi)季惯,因此,對(duì)于寫(xiě)數(shù)據(jù)庫(kù)操作臀突,我們應(yīng)當(dāng)使用foreachPartition算子勉抓。

與mapPartitions算子非常相似,foreachPartition是將RDD的每個(gè)分區(qū)作為遍歷對(duì)象候学,一次處理一個(gè)分區(qū)的數(shù)據(jù)藕筋,也就是說(shuō),如果涉及數(shù)據(jù)庫(kù)的相關(guān)操作梳码,一個(gè)分區(qū)的數(shù)據(jù)只需要?jiǎng)?chuàng)建一次數(shù)據(jù)庫(kù)連接隐圾,如下圖所示:


圖片

使用了foreachPartition 算子后,可以獲得以下的性能提升:

  1. 對(duì)于我們寫(xiě)的function函數(shù)掰茶,一次處理一整個(gè)分區(qū)的數(shù)據(jù)暇藏;

  2. 對(duì)于一個(gè)分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫(kù)連接濒蒋;

  3. 只需要向數(shù)據(jù)庫(kù)發(fā)送一次SQL語(yǔ)句和多組參數(shù)盐碱;

在生產(chǎn)環(huán)境中把兔,全部都會(huì)使用foreachPartition算子完成數(shù)據(jù)庫(kù)操作。foreachPartition算子存在一個(gè)問(wèn)題瓮顽,與mapPartitions算子類(lèi)似县好,如果一個(gè)分區(qū)的數(shù)據(jù)量特別大,可能會(huì)造成OOM暖混,即內(nèi)存溢出缕贡。

5. filter+coalesce/repartition(減少分區(qū))

在Spark任務(wù)中我們經(jīng)常會(huì)使用filter算子完成RDD中數(shù)據(jù)的過(guò)濾,在任務(wù)初始階段拣播,從各個(gè)分區(qū)中加載到的數(shù)據(jù)量是相近的善绎,但是一旦進(jìn)過(guò)filter過(guò)濾后,每個(gè)分區(qū)的數(shù)據(jù)量有可能會(huì)存在較大差異诫尽,如下圖所示:

圖片

根據(jù)上圖我們可以發(fā)現(xiàn)兩個(gè)問(wèn)題:

  1. 每個(gè)partition的數(shù)據(jù)量變小了,如果還按照之前與partition相等的task個(gè)數(shù)去處理當(dāng)前數(shù)據(jù)炬守,有點(diǎn)浪費(fèi)task的計(jì)算資源牧嫉;

  2. 每個(gè)partition的數(shù)據(jù)量不一樣,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition數(shù)據(jù)的時(shí)候减途,每個(gè)task要處理的數(shù)據(jù)量不同酣藻,這很有可能導(dǎo)致數(shù)據(jù)傾斜問(wèn)題。

如上圖所示鳍置,第二個(gè)分區(qū)的數(shù)據(jù)過(guò)濾后只剩100條辽剧,而第三個(gè)分區(qū)的數(shù)據(jù)過(guò)濾后剩下800條,在相同的處理邏輯下税产,第二個(gè)分區(qū)對(duì)應(yīng)的task處理的數(shù)據(jù)量與第三個(gè)分區(qū)對(duì)應(yīng)的task處理的數(shù)據(jù)量差距達(dá)到了8倍怕轿,這也會(huì)導(dǎo)致運(yùn)行速度可能存在數(shù)倍的差距,這也就是數(shù)據(jù)傾斜問(wèn)題辟拷。

針對(duì)上述的兩個(gè)問(wèn)題撞羽,我們分別進(jìn)行分析:

  1. 針對(duì)第一個(gè)問(wèn)題,既然分區(qū)的數(shù)據(jù)量變小了衫冻,我們希望可以對(duì)分區(qū)數(shù)據(jù)進(jìn)行重新分配诀紊,比如將原來(lái)4個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)化到2個(gè)分區(qū)中,這樣只需要用后面的兩個(gè)task進(jìn)行處理即可隅俘,避免了資源的浪費(fèi)邻奠。

  2. 針對(duì)第二個(gè)問(wèn)題,解決方法和第一個(gè)問(wèn)題的解決方法非常相似为居,對(duì)分區(qū)數(shù)據(jù)重新分配碌宴,讓每個(gè)partition中的數(shù)據(jù)量差不多,這就避免了數(shù)據(jù)傾斜問(wèn)題颜骤。

那么具體應(yīng)該如何實(shí)現(xiàn)上面的解決思路唧喉?我們需要coalesce算子。

repartition與coalesce都可以用來(lái)進(jìn)行重分區(qū),其中repartition只是coalesce接口中shuffle為true的簡(jiǎn)易實(shí)現(xiàn)八孝,coalesce默認(rèn)情況下不進(jìn)行shuffle董朝,但是可以通過(guò)參數(shù)進(jìn)行設(shè)置。

假設(shè)我們希望將原本的分區(qū)個(gè)數(shù)A通過(guò)重新分區(qū)變?yōu)锽干跛,那么有以下幾種情況:

  1. A > B(多數(shù)分區(qū)合并為少數(shù)分區(qū))
  • A與B相差值不大

    此時(shí)使用coalesce即可子姜,無(wú)需shuffle過(guò)程。

  • A與B相差值很大

    此時(shí)可以使用coalesce并且不啟用shuffle過(guò)程楼入,但是會(huì)導(dǎo)致合并過(guò)程性能低下哥捕,所以推薦設(shè)置coalesce的第二個(gè)參數(shù)為true,即啟動(dòng)shuffle過(guò)程嘉熊。

  1. A < B(少數(shù)分區(qū)分解為多數(shù)分區(qū))

此時(shí)使用repartition即可遥赚,如果使用coalesce需要將shuffle設(shè)置為true,否則coalesce無(wú)效阐肤。

我們可以在filter操作之后凫佛,使用coalesce算子針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況,壓縮partition的數(shù)量孕惜,而且讓每個(gè)partition的數(shù)據(jù)量盡量均勻緊湊愧薛,以便于后面的task進(jìn)行計(jì)算操作,在某種程度上能夠在一定程度上提升性能衫画。

注意:local模式是進(jìn)程內(nèi)模擬集群運(yùn)行毫炉,已經(jīng)對(duì)并行度和分區(qū)數(shù)量有了一定的內(nèi)部?jī)?yōu)化,因此不用去設(shè)置并行度和分區(qū)數(shù)量削罩。

6. 并行度設(shè)置

Spark作業(yè)中的并行度指各個(gè)stage的task的數(shù)量瞄勾。

如果并行度設(shè)置不合理而導(dǎo)致并行度過(guò)低,會(huì)導(dǎo)致資源的極大浪費(fèi)弥激,例如丰榴,20個(gè)Executor,每個(gè)Executor分配3個(gè)CPU core秆撮,而Spark作業(yè)有40個(gè)task四濒,這樣每個(gè)Executor分配到的task個(gè)數(shù)是2個(gè),這就使得每個(gè)Executor有一個(gè)CPU core空閑职辨,導(dǎo)致資源的浪費(fèi)盗蟆。

理想的并行度設(shè)置,應(yīng)該是讓并行度與資源相匹配舒裤,簡(jiǎn)單來(lái)說(shuō)就是在資源允許的前提下喳资,并行度要設(shè)置的盡可能大,達(dá)到可以充分利用集群資源腾供。合理的設(shè)置并行度仆邓,可以提升整個(gè)Spark作業(yè)的性能和運(yùn)行速度鲜滩。

Spark官方推薦,task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2~3倍节值。之所以沒(méi)有推薦task數(shù)量與CPU core總數(shù)相等徙硅,是因?yàn)閠ask的執(zhí)行時(shí)間不同,有的task執(zhí)行速度快而有的task執(zhí)行速度慢搞疗,如果task數(shù)量與CPU core總數(shù)相等嗓蘑,那么執(zhí)行快的task執(zhí)行完成后,會(huì)出現(xiàn)CPU core空閑的情況匿乃。如果task數(shù)量設(shè)置為CPU core總數(shù)的2~3倍桩皿,那么一個(gè)task執(zhí)行完畢后,CPU core會(huì)立刻執(zhí)行下一個(gè)task幢炸,降低了資源的浪費(fèi)泄隔,同時(shí)提升了Spark作業(yè)運(yùn)行的效率。

Spark作業(yè)并行度的設(shè)置如下:

val conf = new SparkConf().set("spark.default.parallelism", "500")

原則:讓 cpu 的 core(cpu 核心數(shù)) 充分利用起來(lái)宛徊, 如有100個(gè) core梅尤,那么并行度可以設(shè)置為200~300

7. repartition/coalesce調(diào)節(jié)并行度

Spark 中雖然可以設(shè)置并行度的調(diào)節(jié)策略岩调,但是,并行度的設(shè)置對(duì)于Spark SQL是不生效的赡盘,用戶(hù)設(shè)置的并行度只對(duì)于Spark SQL以外的所有Spark的stage生效号枕。

Spark SQL的并行度不允許用戶(hù)自己指定,Spark SQL自己會(huì)默認(rèn)根據(jù)hive表對(duì)應(yīng)的HDFS文件的split個(gè)數(shù)自動(dòng)設(shè)置Spark SQL所在的那個(gè)stage的并行度陨享,用戶(hù)自己通 spark.default.parallelism 參數(shù)指定的并行度葱淳,只會(huì)在沒(méi)Spark SQL的stage中生效。

由于Spark SQL所在stage的并行度無(wú)法手動(dòng)設(shè)置抛姑,如果數(shù)據(jù)量較大赞厕,并且此stage中后續(xù)的transformation操作有著復(fù)雜的業(yè)務(wù)邏輯,而Spark SQL自動(dòng)設(shè)置的task數(shù)量很少定硝,這就意味著每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量皿桑,然后還要執(zhí)行非常復(fù)雜的處理邏輯,這就可能表現(xiàn)為第一個(gè)有Spark SQL的stage速度很慢蔬啡,而后續(xù)的沒(méi)有Spark SQL的stage運(yùn)行速度非郴逦辏快。

為了解決Spark SQL無(wú)法設(shè)置并行度和task數(shù)量的問(wèn)題箱蟆,我們可以使用repartition算子沟绪。

repartition 算子使用前后對(duì)比圖如下:

圖片

Spark SQL這一步的并行度和task數(shù)量肯定是沒(méi)有辦法去改變了,但是空猜,對(duì)于Spark SQL查詢(xún)出來(lái)的RDD绽慈,立即使用repartition算子恨旱,去重新進(jìn)行分區(qū),這樣可以重新分區(qū)為多個(gè)partition坝疼,從repartition之后的RDD操作搜贤,由于不再涉及Spark SQL,因此stage的并行度就會(huì)等于你手動(dòng)設(shè)置的值裙士,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯入客。使用repartition算子的前后對(duì)比如上圖所示

8. reduceByKey本地預(yù)聚合

reduceByKey相較于普通的shuffle操作一個(gè)顯著的特點(diǎn)就是會(huì)進(jìn)行map端的本地聚合腿椎,map端會(huì)先對(duì)本地的數(shù)據(jù)進(jìn)行combine操作桌硫,然后將數(shù)據(jù)寫(xiě)入給下個(gè)stage的每個(gè)task創(chuàng)建的文件中,也就是在map端啃炸,對(duì)每一個(gè)key對(duì)應(yīng)的value铆隘,執(zhí)行reduceByKey算子函數(shù)。

reduceByKey算子的執(zhí)行過(guò)程如下圖所示:

圖片

使用reduceByKey對(duì)性能的提升如下:

  1. 本地聚合后南用,在map端的數(shù)據(jù)量變少膀钠,減少了磁盤(pán)IO,也減少了對(duì)磁盤(pán)空間的占用裹虫;

  2. 本地聚合后肿嘲,下一個(gè)stage拉取的數(shù)據(jù)量變少,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量筑公;

  3. 本地聚合后雳窟,在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用減少;

  4. 本地聚合后匣屡,在reduce端進(jìn)行聚合的數(shù)據(jù)量減少封救。

基于reduceByKey的本地聚合特征,我們應(yīng)該考慮使用reduceByKey代替其他的shuffle算子捣作,例如groupByKey誉结。

groupByKey與reduceByKey的運(yùn)行原理如下圖所示:

圖片
圖片

根據(jù)上圖可知,groupByKey不會(huì)進(jìn)行map端的聚合券躁,而是將所有map端的數(shù)據(jù)shuffle到reduce端惩坑,然后在reduce端進(jìn)行數(shù)據(jù)的聚合操作。由于reduceByKey有map端聚合的特性也拜,使得網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量減小旭贬,因此效率要明顯高于groupByKey。

9. 使用持久化+checkpoint

Spark持久化在大部分情況下是沒(méi)有問(wèn)題的搪泳,但是有時(shí)數(shù)據(jù)可能會(huì)丟失稀轨,如果數(shù)據(jù)一旦丟失,就需要對(duì)丟失的數(shù)據(jù)重新進(jìn)行計(jì)算岸军,計(jì)算完后再緩存和使用奋刽,為了避免數(shù)據(jù)的丟失瓦侮,可以選擇對(duì)這個(gè)RDD進(jìn)行checkpoint,也就是將數(shù)據(jù)持久化一份到容錯(cuò)的文件系統(tǒng)上(比如HDFS)佣谐。

一個(gè)RDD緩存并checkpoint后肚吏,如果一旦發(fā)現(xiàn)緩存丟失,就會(huì)優(yōu)先查看checkpoint數(shù)據(jù)存不存在狭魂,如果有罚攀,就會(huì)使用checkpoint數(shù)據(jù),而不用重新計(jì)算雌澄。也即是說(shuō)斋泄,checkpoint可以視為cache的保障機(jī)制,如果cache失敗镐牺,就使用checkpoint的數(shù)據(jù)炫掐。

使用checkpoint的優(yōu)點(diǎn)在于提高了Spark作業(yè)的可靠性,一旦緩存出現(xiàn)問(wèn)題睬涧,不需要重新計(jì)算數(shù)據(jù)募胃,缺點(diǎn)在于,checkpoint時(shí)需要將數(shù)據(jù)寫(xiě)入HDFS等文件系統(tǒng)畦浓,對(duì)性能的消耗較大痹束。

持久化設(shè)置如下:

sc.setCheckpointDir(‘HDFS’)rdd.cache/persist(memory_and_disk)rdd.checkpoint

10. 使用廣播變量

默認(rèn)情況下,task中的算子中如果使用了外部的變量讶请,每個(gè)task都會(huì)獲取一份變量的復(fù)本祷嘶,這就造成了內(nèi)存的極大消耗。一方面秽梅,如果后續(xù)對(duì)RDD進(jìn)行持久化,可能就無(wú)法將RDD數(shù)據(jù)存入內(nèi)存剿牺,只能寫(xiě)入磁盤(pán)企垦,磁盤(pán)IO將會(huì)嚴(yán)重消耗性能;另一方面晒来,task在創(chuàng)建對(duì)象的時(shí)候钞诡,也許會(huì)發(fā)現(xiàn)堆內(nèi)存無(wú)法存放新創(chuàng)建的對(duì)象,這就會(huì)導(dǎo)致頻繁的GC湃崩,GC會(huì)導(dǎo)致工作線程停止荧降,進(jìn)而導(dǎo)致Spark暫停工作一段時(shí)間,嚴(yán)重影響Spark性能攒读。

假設(shè)當(dāng)前任務(wù)配置了20個(gè)Executor朵诫,指定500個(gè)task,有一個(gè)20M的變量被所有task共用薄扁,此時(shí)會(huì)在500個(gè)task中產(chǎn)生500個(gè)副本剪返,耗費(fèi)集群10G的內(nèi)存废累,如果使用了廣播變量, 那么每個(gè)Executor保存一個(gè)副本脱盲,一共消耗400M內(nèi)存邑滨,內(nèi)存消耗減少了5倍。

廣播變量在每個(gè)Executor保存一個(gè)副本钱反,此Executor的所有task共用此廣播變量掖看,這讓變量產(chǎn)生的副本數(shù)量大大減少

在初始階段面哥,廣播變量只在Driver中有一份副本哎壳。task在運(yùn)行的時(shí)候,想要使用廣播變量中的數(shù)據(jù)幢竹,此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中嘗試獲取變量耳峦,如果本地沒(méi)有,BlockManager就會(huì)從Driver或者其他節(jié)點(diǎn)的BlockManager上遠(yuǎn)程拉取變量的復(fù)本焕毫,并由本地的BlockManager進(jìn)行管理蹲坷;之后此Executor的所有task都會(huì)直接從本地的BlockManager中獲取變量。

對(duì)于多個(gè)Task可能會(huì)共用的數(shù)據(jù)可以廣播到每個(gè)Executor上:

val 廣播變量名= sc.broadcast(會(huì)被各個(gè)Task用到的變量,即需要廣播的變量)廣播變量名.value//獲取廣播變量

11. 使用Kryo序列化

默認(rèn)情況下邑飒,Spark使用Java的序列化機(jī)制循签。Java的序列化機(jī)制使用方便,不需要額外的配置疙咸,在算子中使用的變量實(shí)現(xiàn)Serializable接口即可县匠,但是,Java序列化機(jī)制的效率不高撒轮,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大乞旦。

Spark官方宣稱(chēng)Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右,Spark之所以沒(méi)有默認(rèn)使用Kryo作為序列化類(lèi)庫(kù)题山,是因?yàn)?strong>它不支持所有對(duì)象的序列化兰粉,同時(shí)Kryo需要用戶(hù)在使用前注冊(cè)需要序列化的類(lèi)型,不夠方便顶瞳,但從Spark 2.0.0版本開(kāi)始玖姑,簡(jiǎn)單類(lèi)型、簡(jiǎn)單類(lèi)型數(shù)組慨菱、字符串類(lèi)型的Shuffling RDDs 已經(jīng)默認(rèn)使用Kryo序列化方式了焰络。

Kryo序列化注冊(cè)方式的代碼如下:

public class MyKryoRegistrator implements KryoRegistrator{  
  @Override  public void registerClasses(Kryo kryo){
    kryo.register(StartupReportLogs.class);  
  }
}

配置Kryo序列化方式的代碼如下:

//創(chuàng)建SparkConf對(duì)象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫(kù)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化庫(kù)中注冊(cè)自定義的類(lèi)集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市符喝,隨后出現(xiàn)的幾起案子闪彼,更是在濱河造成了極大的恐慌,老刑警劉巖协饲,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件备蚓,死亡現(xiàn)場(chǎng)離奇詭異课蔬,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)郊尝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門(mén)二跋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人流昏,你說(shuō)我怎么就攤上這事扎即。” “怎么了况凉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵谚鄙,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我刁绒,道長(zhǎng)闷营,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任知市,我火速辦了婚禮傻盟,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嫂丙。我一直安慰自己娘赴,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著,像睡著了一般胰挑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上竿奏,一...
    開(kāi)封第一講書(shū)人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音腥放,去河邊找鬼泛啸。 笑死,一個(gè)胖子當(dāng)著我的面吹牛捉片,可吹牛的內(nèi)容都是我干的平痰。 我是一名探鬼主播汞舱,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼伍纫,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了昂芜?” 一聲冷哼從身側(cè)響起莹规,我...
    開(kāi)封第一講書(shū)人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎泌神,沒(méi)想到半個(gè)月后良漱,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體舞虱,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年母市,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了矾兜。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡患久,死狀恐怖椅寺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蒋失,我是刑警寧澤返帕,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站篙挽,受9級(jí)特大地震影響荆萤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜铣卡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一链韭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧算行,春花似錦梧油、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至量淌,卻和暖如春骗村,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背呀枢。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工胚股, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人裙秋。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓琅拌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親摘刑。 傳聞我的和親對(duì)象是個(gè)殘疾皇子进宝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容