spark 面試題(1)

1.spark中的RDD是什么冀续,有哪些特性?

答:RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是spark中最基本的數(shù)據(jù)抽象鳍怨,它代表一個(gè)不可變串结,可分區(qū)哑子,里面的元素可以并行計(jì)算的集合

Dataset:就是一個(gè)集合,用于存放數(shù)據(jù)的

Destributed:分布式肌割,可以并行在集群計(jì)算

Resilient:表示彈性的卧蜓,彈性表示

1.RDD中的數(shù)據(jù)可以存儲(chǔ)在內(nèi)存或者磁盤中;

2.RDD中的分區(qū)是可以改變的把敞;

五大特性:

1.A list of partitions:一個(gè)分區(qū)列表弥奸,RDD中的數(shù)據(jù)都存儲(chǔ)在一個(gè)分區(qū)列表中

2.A function for computing each split:作用在每一個(gè)分區(qū)中的函數(shù)

3.A list of dependencies on other RDDs:一個(gè)RDD依賴于其他多個(gè)RDD,這個(gè)點(diǎn)很重要奋早,RDD的容錯(cuò)機(jī)制就是依據(jù)這個(gè)特性而來(lái)的

4.Optionally,a Partitioner for key-value RDDs(eg:to say that the RDD is hash-partitioned):可選的盛霎,針對(duì)于kv類型的RDD才有這個(gè)特性,作用是決定了數(shù)據(jù)的來(lái)源以及數(shù)據(jù)處理后的去向

5.可選項(xiàng)耽装,數(shù)據(jù)本地性愤炸,數(shù)據(jù)位置最優(yōu)

2.概述一下spark中的常用算子區(qū)別(map,mapPartitions,foreach掉奄,foreachPatition)

答:map:用于遍歷RDD规个,將函數(shù)應(yīng)用于每一個(gè)元素,返回新的RDD(transformation算子)

foreach:用于遍歷RDD姓建,將函數(shù)應(yīng)用于每一個(gè)元素诞仓,無(wú)返回值(action算子)

mapPatitions:用于遍歷操作RDD中的每一個(gè)分區(qū),返回生成一個(gè)新的RDD(transformation算子)

foreachPatition:用于遍歷操作RDD中的每一個(gè)分區(qū)速兔,無(wú)返回值(action算子)

總結(jié):一般使用mapPatitions和foreachPatition算子比map和foreach更加高效狂芋,推薦使用

3.談?wù)剆park中的寬窄依賴:


答:RDD和它的父RDD的關(guān)系有兩種類型:窄依賴和寬依賴

寬依賴:指的是多個(gè)子RDD的Partition會(huì)依賴同一個(gè)父RDD的Partition,關(guān)系是一對(duì)多憨栽,父RDD的一個(gè)分區(qū)的數(shù)據(jù)去到子RDD的不同分區(qū)里面帜矾,會(huì)有shuffle的產(chǎn)生

窄依賴:指的是每一個(gè)父RDD的Partition最多被子RDD的一個(gè)partition使用翼虫,是一對(duì)一的,也就是父RDD的一個(gè)分區(qū)去到了子RDD的一個(gè)分區(qū)中屡萤,這個(gè)過(guò)程沒有shuffle產(chǎn)生

區(qū)分的標(biāo)準(zhǔn)就是看父RDD的一個(gè)分區(qū)的數(shù)據(jù)的流向珍剑,要是流向一個(gè)partition的話就是窄依賴,否則就是寬依賴死陆,如圖所示:

4.spark中如何劃分stage:

答:概念:Spark任務(wù)會(huì)根據(jù)RDD之間的依賴關(guān)系招拙,形成一個(gè)DAG有向無(wú)環(huán)圖,DAG會(huì)提交給DAGScheduler措译,DAGScheduler會(huì)把DAG劃分相互依賴的多個(gè)stage别凤,劃分依據(jù)就是寬窄依賴,遇到寬依賴就劃分stage领虹,每個(gè)stage包含一個(gè)或多個(gè)task规哪,然后將這些task以taskSet的形式提交給TaskScheduler運(yùn)行,stage是由一組并行的task組成

1.spark程序中可以因?yàn)椴煌腶ction觸發(fā)眾多的job塌衰,一個(gè)程序中可以有很多的job诉稍,每一個(gè)job是由一個(gè)或者多個(gè)stage構(gòu)成的,后面的stage依賴于前面的stage最疆,也就是說(shuō)只有前面依賴的stage計(jì)算完畢后杯巨,后面的stage才會(huì)運(yùn)行;

2.stage 的劃分標(biāo)準(zhǔn)就是寬依賴:何時(shí)產(chǎn)生寬依賴就會(huì)產(chǎn)生一個(gè)新的stage努酸,例如reduceByKey,groupByKey服爷,join的算子,會(huì)導(dǎo)致寬依賴的產(chǎn)生获诈;

3.切割規(guī)則:從后往前仍源,遇到寬依賴就切割stage;

4.圖解:

5.計(jì)算格式:pipeline管道計(jì)算模式烙荷,piepeline只是一種計(jì)算思想镜会,一種模式

6.spark的pipeline管道計(jì)算模式相當(dāng)于執(zhí)行了一個(gè)高階函數(shù)檬寂,也就是說(shuō)來(lái)一條數(shù)據(jù)然后計(jì)算一條數(shù)據(jù)终抽,會(huì)把所有的邏輯走完,然后落地桶至,而MapReduce是1+1=2昼伴,2+1=3這樣的計(jì)算模式,也就是計(jì)算完落地镣屹,然后再計(jì)算圃郊,然后再落地到磁盤或者內(nèi)存,最后數(shù)據(jù)是落在計(jì)算節(jié)點(diǎn)上女蜈,按reduce的hash分區(qū)落地持舆。管道計(jì)算模式完全基于內(nèi)存計(jì)算色瘩,所以比MapReduce快的原因。

7.管道中的RDD何時(shí)落地:shuffle write的時(shí)候逸寓,對(duì)RDD進(jìn)行持久化的時(shí)候居兆。

8.stage的task的并行度是由stage的最后一個(gè)RDD的分區(qū)數(shù)來(lái)決定的,一般來(lái)說(shuō)竹伸,一個(gè)partition對(duì)應(yīng)一個(gè)task泥栖,但最后reduce的時(shí)候可以手動(dòng)改變r(jià)educe的個(gè)數(shù),也就是改變最后一個(gè)RDD的分區(qū)數(shù)勋篓,也就改變了并行度吧享。例如:reduceByKey(_+_,3)

9.優(yōu)化:提高stage的并行度:reduceByKey(_+_,patition的個(gè)數(shù)) ,join(_+_,patition的個(gè)數(shù))

4.DAGScheduler分析:

答:概述:是一個(gè)面向stage 的調(diào)度器譬嚣;

主要入?yún)⒂校篸agScheduler.runJob(rdd,?cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)

rdd: final RDD钢颂;

cleanedFunc: 計(jì)算每個(gè)分區(qū)的函數(shù);

resultHander: 結(jié)果偵聽器孤荣;

主要功能:1.接受用戶提交的job甸陌;

2.將job根據(jù)類型劃分為不同的stage,記錄那些RDD盐股,stage被物化钱豁,并在每一個(gè)stage內(nèi)產(chǎn)生一系列的task,并封裝成taskset疯汁;

3.決定每個(gè)task的最佳位置牲尺,任務(wù)在數(shù)據(jù)所在節(jié)點(diǎn)上運(yùn)行,并結(jié)合當(dāng)前的緩存情況幌蚊,將taskSet提交給TaskScheduler谤碳;

4.重新提交shuffle輸出丟失的stage給taskScheduler;

注:一個(gè)stage內(nèi)部的錯(cuò)誤不是由shuffle輸出丟失造成的溢豆,DAGScheduler是不管的蜒简,由TaskScheduler負(fù)責(zé)嘗試重新提交task執(zhí)行。

5.Job的生成:

答:一旦driver程序中出現(xiàn)action漩仙,就會(huì)生成一個(gè)job搓茬,比如count等,向DAGScheduler提交job队他,如果driver程序后面還有action卷仑,那么其他action也會(huì)對(duì)應(yīng)生成相應(yīng)的job,所以麸折,driver端有多少action就會(huì)提交多少job锡凝,這可能就是為什么spark將driver程序稱為application而不是job 的原因。每一個(gè)job可能會(huì)包含一個(gè)或者多個(gè)stage垢啼,最后一個(gè)stage生成result窜锯,在提交job 的過(guò)程中张肾,DAGScheduler會(huì)首先從后往前劃分stage,劃分的標(biāo)準(zhǔn)就是寬依賴锚扎,一旦遇到寬依賴就劃分捌浩,然后先提交沒有父階段的stage們,并在提交過(guò)程中工秩,計(jì)算該stage的task數(shù)目以及類型尸饺,并提交具體的task,在這些無(wú)父階段的stage提交完之后助币,依賴該stage 的stage才會(huì)提交浪听。

6.有向無(wú)環(huán)圖:

答:DAG,有向無(wú)環(huán)圖眉菱,簡(jiǎn)單的來(lái)說(shuō)迹栓,就是一個(gè)由頂點(diǎn)和有方向性的邊構(gòu)成的圖中,從任意一個(gè)頂點(diǎn)出發(fā)俭缓,沒有任意一條路徑會(huì)將其帶回到出發(fā)點(diǎn)的頂點(diǎn)位置克伊,為每個(gè)spark job計(jì)算具有依賴關(guān)系的多個(gè)stage任務(wù)階段,通常根據(jù)shuffle來(lái)劃分stage华坦,如reduceByKey,groupByKey等涉及到shuffle的transformation就會(huì)產(chǎn)生新的stage 愿吹,然后將每個(gè)stage劃分為具體的一組任務(wù),以TaskSets的形式提交給底層的任務(wù)調(diào)度模塊來(lái)執(zhí)行惜姐,其中不同stage之前的RDD為寬依賴關(guān)系犁跪,TaskScheduler任務(wù)調(diào)度模塊負(fù)責(zé)具體啟動(dòng)任務(wù),監(jiān)控和匯報(bào)任務(wù)運(yùn)行情況歹袁。

7.RDD是什么以及它的分類:

8.RDD的操作

9.RDD緩存:

Spark可以使用 persist 和 cache 方法將任意 RDD 緩存到內(nèi)存坷衍、磁盤文件系統(tǒng)中。緩存是容錯(cuò)的条舔,如果一個(gè) RDD 分片丟失枫耳,可以通過(guò)構(gòu)建它的 transformation自動(dòng)重構(gòu)。被緩存的 RDD 被使用的時(shí)孟抗,存取速度會(huì)被大大加速迁杨。一般的executor內(nèi)存60%做 cache, 剩下的40%做task夸浅。

Spark中仑最,RDD類可以使用cache() 和 persist() 方法來(lái)緩存扔役。cache()是persist()的特例帆喇,將該RDD緩存到內(nèi)存中。而persist可以指定一個(gè)StorageLevel亿胸。StorageLevel的列表可以在StorageLevel 伴生單例對(duì)象中找到坯钦。

Spark的不同StorageLevel 预皇,目的滿足內(nèi)存使用和CPU效率權(quán)衡上的不同需求。我們建議通過(guò)以下的步驟來(lái)進(jìn)行選擇:

·如果你的RDDs可以很好的與默認(rèn)的存儲(chǔ)級(jí)別(MEMORY_ONLY)契合婉刀,就不需要做任何修改了吟温。這已經(jīng)是CPU使用效率最高的選項(xiàng),它使得RDDs的操作盡可能的快突颊。

·如果不行鲁豪,試著使用MEMORY_ONLY_SER并且選擇一個(gè)快速序列化的庫(kù)使得對(duì)象在有比較高的空間使用率的情況下,依然可以較快被訪問律秃。

·盡可能不要存儲(chǔ)到硬盤上爬橡,除非計(jì)算數(shù)據(jù)集的函數(shù),計(jì)算量特別大棒动,或者它們過(guò)濾了大量的數(shù)據(jù)糙申。否則,重新計(jì)算一個(gè)分區(qū)的速度船惨,和與從硬盤中讀取基本差不多快柜裸。

·如果你想有快速故障恢復(fù)能力,使用復(fù)制存儲(chǔ)級(jí)別(例如:用Spark來(lái)響應(yīng)web應(yīng)用的請(qǐng)求)粱锐。所有的存儲(chǔ)級(jí)別都有通過(guò)重新計(jì)算丟失數(shù)據(jù)恢復(fù)錯(cuò)誤的容錯(cuò)機(jī)制疙挺,但是復(fù)制存儲(chǔ)級(jí)別可以讓你在RDD上持續(xù)的運(yùn)行任務(wù),而不需要等待丟失的分區(qū)被重新計(jì)算怜浅。

·如果你想要定義你自己的存儲(chǔ)級(jí)別(比如復(fù)制因子為3而不是2)衔统,可以使用StorageLevel 單例對(duì)象的apply()方法。

在不會(huì)使用cached RDD的時(shí)候海雪,及時(shí)使用unpersist方法來(lái)釋放它锦爵。

10.RDD共享變量:

在應(yīng)用開發(fā)中,一個(gè)函數(shù)被傳遞給Spark操作(例如map和reduce)奥裸,在一個(gè)遠(yuǎn)程集群上運(yùn)行险掀,它實(shí)際上操作的是這個(gè)函數(shù)用到的所有變量的獨(dú)立拷貝。這些變量會(huì)被拷貝到每一臺(tái)機(jī)器湾宙。通痴燎猓看來(lái),在任務(wù)之間中侠鳄,讀寫共享變量顯然不夠高效埠啃。然而,Spark還是為兩種常見的使用模式伟恶,提供了兩種有限的共享變量:廣播變量和累加器碴开。

(1). 廣播變量(Broadcast Variables)

– 廣播變量緩存到各個(gè)節(jié)點(diǎn)的內(nèi)存中,而不是每個(gè) Task

– 廣播變量被創(chuàng)建后,能在集群中運(yùn)行的任何函數(shù)調(diào)用

– 廣播變量是只讀的潦牛,不能在被廣播后修改

– 對(duì)于大數(shù)據(jù)集的廣播眶掌, Spark 嘗試使用高效的廣播算法來(lái)降低通信成本

val broadcastVar = sc.broadcast(Array(1, 2, 3))方法參數(shù)中是要廣播的變量

(2). 累加器

累加器只支持加法操作,可以高效地并行巴碗,用于實(shí)現(xiàn)計(jì)數(shù)器和變量求和朴爬。Spark 原生支持?jǐn)?shù)值類型和標(biāo)準(zhǔn)可變集合的計(jì)數(shù)器,但用戶可以添加新的類型橡淆。只有驅(qū)動(dòng)程序才能獲取累加器的值

11.spark-submit的時(shí)候如何引入外部jar包:

在通過(guò)spark-submit提交任務(wù)時(shí)召噩,可以通過(guò)添加配置參數(shù)來(lái)指定?

–driver-class-path 外部jar包

–jars 外部jar包

12.spark如何防止內(nèi)存溢出:

driver端的內(nèi)存溢出?

可以增大driver的內(nèi)存參數(shù):spark.driver.memory (default 1g)

這個(gè)參數(shù)用來(lái)設(shè)置Driver的內(nèi)存。在Spark程序中逸爵,SparkContext蚣常,DAGScheduler都是運(yùn)行在Driver端的。對(duì)應(yīng)rdd的Stage切分也是在Driver端運(yùn)行痊银,如果用戶自己寫的程序有過(guò)多的步驟抵蚊,切分出過(guò)多的Stage,這部分信息消耗的是Driver的內(nèi)存溯革,這個(gè)時(shí)候就需要調(diào)大Driver的內(nèi)存贞绳。

map過(guò)程產(chǎn)生大量對(duì)象導(dǎo)致內(nèi)存溢出?

這種溢出的原因是在單個(gè)map中產(chǎn)生了大量的對(duì)象導(dǎo)致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString)致稀,這個(gè)操作在rdd中冈闭,每個(gè)對(duì)象都產(chǎn)生了10000個(gè)對(duì)象,這肯定很容易產(chǎn)生內(nèi)存溢出的問題抖单。針對(duì)這種問題萎攒,在不增加內(nèi)存的情況下,可以通過(guò)減少每個(gè)Task的大小矛绘,以便達(dá)到每個(gè)Task即使產(chǎn)生大量的對(duì)象Executor的內(nèi)存也能夠裝得下耍休。具體做法可以在會(huì)產(chǎn)生大量對(duì)象的map操作之前調(diào)用repartition方法,分區(qū)成更小的塊傳入map货矮。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)羊精。?

面對(duì)這種問題注意,不能使用rdd.coalesce方法囚玫,這個(gè)方法只能減少分區(qū)喧锦,不能增加分區(qū), 不會(huì)有shuffle的過(guò)程抓督。

數(shù)據(jù)不平衡導(dǎo)致內(nèi)存溢出?

數(shù)據(jù)不平衡除了有可能導(dǎo)致內(nèi)存溢出外燃少,也有可能導(dǎo)致性能的問題,解決方法和上面說(shuō)的類似铃在,就是調(diào)用repartition重新分區(qū)阵具。這里就不再累贅了。

shuffle后內(nèi)存溢出?

shuffle內(nèi)存溢出的情況可以說(shuō)都是shuffle后,單個(gè)文件過(guò)大導(dǎo)致的怔昨。在Spark中,join宿稀,reduceByKey這一類型的過(guò)程趁舀,都會(huì)有shuffle的過(guò)程,在shuffle的使用祝沸,需要傳入一個(gè)partitioner矮烹,大部分Spark中的shuffle操作,默認(rèn)的partitioner都是HashPatitioner罩锐,默認(rèn)值是父RDD中最大的分區(qū)數(shù),這個(gè)參數(shù)通過(guò)spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) 奉狈, spark.default.parallelism參數(shù)只對(duì)HashPartitioner有效,所以如果是別的Partitioner或者自己實(shí)現(xiàn)的Partitioner就不能使用spark.default.parallelism這個(gè)參數(shù)來(lái)控制shuffle的并發(fā)量了涩惑。如果是別的partitioner導(dǎo)致的shuffle內(nèi)存溢出仁期,就需要從partitioner的代碼增加partitions的數(shù)量。

standalone模式下資源分配不均勻?qū)е聝?nèi)存溢出

在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個(gè)參數(shù)竭恬,但是沒有配置–executor-cores這個(gè)參數(shù)的話跛蛋,就有可能導(dǎo)致,每個(gè)Executor的memory是一樣的痊硕,但是cores的數(shù)量不同赊级,那么在cores數(shù)量多的Executor中,由于能夠同時(shí)執(zhí)行多個(gè)Task岔绸,就容易導(dǎo)致內(nèi)存溢出的情況理逊。這種情況的解決方法就是同時(shí)配置–executor-cores或者spark.executor.cores參數(shù),確保Executor資源分配均勻盒揉。

使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價(jià)的晋被,在內(nèi)存不足的時(shí)候rdd.cache()的數(shù)據(jù)會(huì)丟失,再次使用的時(shí)候會(huì)重算刚盈,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內(nèi)存不足的時(shí)候會(huì)存儲(chǔ)在磁盤墨微,避免重算,只是消耗點(diǎn)IO時(shí)間扁掸。

13.spark中cache和persist的區(qū)別:

cache:緩存數(shù)據(jù)翘县,默認(rèn)是緩存在內(nèi)存中,其本質(zhì)還是調(diào)用persist

persist:緩存數(shù)據(jù)谴分,有豐富的數(shù)據(jù)緩存策略锈麸。數(shù)據(jù)可以保存在內(nèi)存也可以保存在磁盤中,使用的時(shí)候指定對(duì)應(yīng)的緩存級(jí)別就可以了牺蹄。

14.spark分布式集群搭建的步驟:

地球人都知道

這里可以概述下如何搭建高可用的spark集群(HA)?

主要是引入了zookeeper

15.spark中的數(shù)據(jù)傾斜的現(xiàn)象忘伞,原因,后果:

(1)、數(shù)據(jù)傾斜的現(xiàn)象?

多數(shù)task執(zhí)行速度較快,少數(shù)task執(zhí)行時(shí)間非常長(zhǎng)氓奈,或者等待很長(zhǎng)時(shí)間后提示你內(nèi)存不足翘魄,執(zhí)行失敗。

(2)舀奶、數(shù)據(jù)傾斜的原因?

數(shù)據(jù)問題?

1暑竟、key本身分布不均衡(包括大量的key為空)

2、key的設(shè)置不合理

spark使用問題?

1育勺、shuffle時(shí)的并發(fā)度不夠

2但荤、計(jì)算方式有誤

(3)、數(shù)據(jù)傾斜的后果?

1涧至、spark中的stage的執(zhí)行時(shí)間受限于最后那個(gè)執(zhí)行完成的task,因此運(yùn)行緩慢的任務(wù)會(huì)拖垮整個(gè)程序的運(yùn)行速度(分布式程序運(yùn)行的速度是由最慢的那個(gè)task決定的)腹躁。

2、過(guò)多的數(shù)據(jù)在同一個(gè)task中運(yùn)行南蓬,將會(huì)把executor撐爆纺非。

16.spark數(shù)據(jù)傾斜的處理:

發(fā)現(xiàn)數(shù)據(jù)傾斜的時(shí)候,不要急于提高executor的資源赘方,修改參數(shù)或是修改程序铐炫,首先要檢查數(shù)據(jù)本身,是否存在異常數(shù)據(jù)蒜焊。

1倒信、數(shù)據(jù)問題造成的數(shù)據(jù)傾斜

找出異常的key?

如果任務(wù)長(zhǎng)時(shí)間卡在最后最后1個(gè)(幾個(gè))任務(wù),首先要對(duì)key進(jìn)行抽樣分析泳梆,判斷是哪些key造成的鳖悠。?選取key,對(duì)數(shù)據(jù)進(jìn)行抽樣优妙,統(tǒng)計(jì)出現(xiàn)的次數(shù)乘综,根據(jù)出現(xiàn)次數(shù)大小排序取出前幾個(gè)。

比如: df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)

如果發(fā)現(xiàn)多數(shù)數(shù)據(jù)分布都較為平均,而個(gè)別數(shù)據(jù)比其他數(shù)據(jù)大上若干個(gè)數(shù)量級(jí),則說(shuō)明發(fā)生了數(shù)據(jù)傾斜砸捏。


經(jīng)過(guò)分析歉闰,傾斜的數(shù)據(jù)主要有以下三種情況:?

1疑枯、null(空值)或是一些無(wú)意義的信息()之類的,大多是這個(gè)原因引起。

2、無(wú)效數(shù)據(jù),大量重復(fù)的測(cè)試數(shù)據(jù)或是對(duì)結(jié)果影響不大的有效數(shù)據(jù)萌朱。

3、有效數(shù)據(jù)策菜,業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布晶疼。

解決辦法?

第1酒贬,2種情況,直接對(duì)數(shù)據(jù)進(jìn)行過(guò)濾即可(因?yàn)樵摂?shù)據(jù)對(duì)當(dāng)前業(yè)務(wù)不會(huì)產(chǎn)生影響)翠霍。

第3種情況則需要進(jìn)行一些特殊操作锭吨,常見的有以下幾種做法?

(1) 隔離執(zhí)行,將異常的key過(guò)濾出來(lái)單獨(dú)處理寒匙,最后與正常數(shù)據(jù)的處理結(jié)果進(jìn)行union操作零如。

(2) 對(duì)key先添加隨機(jī)值,進(jìn)行操作后蒋情,去掉隨機(jī)值埠况,再進(jìn)行一次操作耸携。

(3) 使用reduceByKey 代替 groupByKey(reduceByKey用于對(duì)每個(gè)key對(duì)應(yīng)的多個(gè)value進(jìn)行merge操作棵癣,最重要的是它能夠在本地先進(jìn)行merge操作,并且merge操作可以通過(guò)函數(shù)自定義.)

(4) 使用map join夺衍。

案例?

如果使用reduceByKey因?yàn)閿?shù)據(jù)傾斜造成運(yùn)行失敗的問題狈谊。具體操作流程如下:?

(1) 將原始的 key 轉(zhuǎn)化為 key + 隨機(jī)值(例如Random.nextInt)

(2) 對(duì)數(shù)據(jù)進(jìn)行 reduceByKey(func)

(3) 將 key + 隨機(jī)值 轉(zhuǎn)成 key

(4) 再對(duì)數(shù)據(jù)進(jìn)行 reduceByKey(func)

案例操作流程分析:?

假設(shè)說(shuō)有傾斜的Key,我們給所有的Key加上一個(gè)隨機(jī)數(shù)沟沙,然后進(jìn)行reduceByKey操作河劝;此時(shí)同一個(gè)Key會(huì)有不同的隨機(jī)數(shù)前綴,在進(jìn)行reduceByKey操作的時(shí)候原來(lái)的一個(gè)非常大的傾斜的Key就分而治之變成若干個(gè)更小的Key矛紫,不過(guò)此時(shí)結(jié)果和原來(lái)不一樣赎瞎,怎么破?進(jìn)行map操作颊咬,目的是把隨機(jī)數(shù)前綴去掉务甥,然后再次進(jìn)行reduceByKey操作。(當(dāng)然喳篇,如果你很無(wú)聊敞临,可以再次做隨機(jī)數(shù)前綴),這樣我們就可以把原本傾斜的Key通過(guò)分而治之方案分散開來(lái)麸澜,最后又進(jìn)行了全局聚合

注意1: 如果此時(shí)依舊存在問題挺尿,建議篩選出傾斜的數(shù)據(jù)單獨(dú)處理。最后將這份數(shù)據(jù)與正常的數(shù)據(jù)進(jìn)行union即可炊邦。

注意2: 單獨(dú)處理異常數(shù)據(jù)時(shí)编矾,可以配合使用Map Join解決。

2馁害、spark使用不當(dāng)造成的數(shù)據(jù)傾斜

提高shuffle并行度

dataFrame和sparkSql可以設(shè)置spark.sql.shuffle.partitions參數(shù)控制shuffle的并發(fā)度洽沟,默認(rèn)為200。

rdd操作可以設(shè)置spark.default.parallelism控制并發(fā)度蜗细,默認(rèn)參數(shù)由不同的Cluster Manager控制裆操。

局限性: 只是讓每個(gè)task執(zhí)行更少的不同的key怒详。無(wú)法解決個(gè)別key特別大的情況造成的傾斜,如果某些key的大小非常大踪区,即使一個(gè)task單獨(dú)執(zhí)行它昆烁,也會(huì)受到數(shù)據(jù)傾斜的困擾。

使用map join 代替reduce join

在小表不是特別大(取決于你的executor大小)的情況下使用缎岗,可以使程序避免shuffle的過(guò)程静尼,自然也就沒有數(shù)據(jù)傾斜的困擾了.(詳細(xì)見http://blog.csdn.net/lsshlsw/article/details/50834858、http://blog.csdn.net/lsshlsw/article/details/48694893)

局限性: 因?yàn)槭窍葘⑿?shù)據(jù)發(fā)送到每個(gè)executor上传泊,所以數(shù)據(jù)量不能太大鼠渺。

17.spark中map-side-join關(guān)聯(lián)優(yōu)化:

將多份數(shù)據(jù)進(jìn)行關(guān)聯(lián)是數(shù)據(jù)處理過(guò)程中非常普遍的用法,不過(guò)在分布式計(jì)算系統(tǒng)中眷细,這個(gè)問題往往會(huì)變的非常麻煩拦盹,因?yàn)榭蚣芴峁┑?join 操作一般會(huì)將所有數(shù)據(jù)根據(jù) key 發(fā)送到所有的 reduce 分區(qū)中去,也就是 shuffle 的過(guò)程溪椎。造成大量的網(wǎng)絡(luò)以及磁盤IO消耗普舆,運(yùn)行效率極其低下,這個(gè)過(guò)程一般被稱為 reduce-side-join校读。

如果其中有張表較小的話沼侣,我們則可以自己實(shí)現(xiàn)在 map 端實(shí)現(xiàn)數(shù)據(jù)關(guān)聯(lián),跳過(guò)大量數(shù)據(jù)進(jìn)行 shuffle 的過(guò)程歉秫,運(yùn)行時(shí)間得到大量縮短蛾洛,根據(jù)不同數(shù)據(jù)可能會(huì)有幾倍到數(shù)十倍的性能提升。

何時(shí)使用:在海量數(shù)據(jù)中匹配少量特定數(shù)據(jù)

原理:reduce-side-join 的缺陷在于會(huì)將key相同的數(shù)據(jù)發(fā)送到同一個(gè)partition中進(jìn)行運(yùn)算雁芙,大數(shù)據(jù)集的傳輸需要長(zhǎng)時(shí)間的IO轧膘,同時(shí)任務(wù)并發(fā)度收到限制,還可能造成數(shù)據(jù)傾斜却特。

reduce-side-join 運(yùn)行圖如下

map-side-join 運(yùn)行圖如下:

將少量的數(shù)據(jù)轉(zhuǎn)化為Map進(jìn)行廣播扶供,廣播會(huì)將此 Map 發(fā)送到每個(gè)節(jié)點(diǎn)中,如果不進(jìn)行廣播裂明,每個(gè)task執(zhí)行時(shí)都會(huì)去獲取該Map數(shù)據(jù)椿浓,造成了性能浪費(fèi)。對(duì)大數(shù)據(jù)進(jìn)行遍歷闽晦,使用mapPartition而不是map扳碍,因?yàn)閙apPartition是在每個(gè)partition中進(jìn)行操作,因此可以減少遍歷時(shí)新建broadCastMap.value對(duì)象的空間消耗仙蛉,同時(shí)匹配不到的數(shù)據(jù)也不會(huì)返回笋敞。

18.kafka整合sparkStreaming問題:

(1)、如何實(shí)現(xiàn)sparkStreaming讀取kafka中的數(shù)據(jù)

可以這樣說(shuō):在kafka0.10版本之前有二種方式與sparkStreaming整合荠瘪,一種是基于receiver夯巷,一種是direct,然后分別闡述這2種方式分別是什么?

receiver:是采用了kafka高級(jí)api,利用receiver接收器來(lái)接受kafka topic中的數(shù)據(jù)赛惩,從kafka接收來(lái)的數(shù)據(jù)會(huì)存儲(chǔ)在spark的executor中,之后spark streaming提交的job會(huì)處理這些數(shù)據(jù)趁餐,kafka中topic的偏移量是保存在zk中的喷兼。?

基本使用:

還有幾個(gè)需要注意的點(diǎn):?

在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關(guān)的后雷,所以如果我們加大每個(gè)topic的partition數(shù)量季惯,僅僅是增加線程來(lái)處理由單一Receiver消費(fèi)的主題。但是這并沒有增加Spark在處理數(shù)據(jù)上的并行度.

對(duì)于不同的Group和topic我們可以使用多個(gè)Receiver創(chuàng)建不同的Dstream來(lái)并行接收數(shù)據(jù)臀突,之后可以利用union來(lái)統(tǒng)一成一個(gè)Dstream勉抓。

在默認(rèn)配置下,這種方式可能會(huì)因?yàn)榈讓拥氖《鴣G失數(shù)據(jù). 因?yàn)閞eceiver一直在接收數(shù)據(jù),在其已經(jīng)通知zookeeper數(shù)據(jù)接收完成但是還沒有處理的時(shí)候,executor突然掛掉(或是driver掛掉通知executor關(guān)閉),緩存在其中的數(shù)據(jù)就會(huì)丟失. 如果希望做到高可靠, 讓數(shù)據(jù)零丟失,如果我們啟用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)該機(jī)制會(huì)同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫日志中. 所以, 即使底層節(jié)點(diǎn)出現(xiàn)了失敗, 也可以使用預(yù)寫日志中的數(shù)據(jù)進(jìn)行恢復(fù). 復(fù)制到文件系統(tǒng)如HDFS候学,那么storage level需要設(shè)置成 StorageLevel.MEMORY_AND_DISK_SER藕筋,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

direct:在spark1.3之后,引入了Direct方式盒齿。不同于Receiver的方式念逞,Direct方式?jīng)]有receiver這一層困食,其會(huì)周期性的獲取Kafka中每個(gè)topic的每個(gè)partition中的最新offsets边翁,之后根據(jù)設(shè)定的maxRatePerPartition來(lái)處理每個(gè)batch。(設(shè)置spark.streaming.kafka.maxRatePerPartition=10000硕盹。限制每秒鐘從topic的每個(gè)partition最多消費(fèi)的消息條數(shù))符匾。

(2) 對(duì)比這2中方式的優(yōu)缺點(diǎn):

采用receiver方式:這種方式可以保證數(shù)據(jù)不丟失,但是無(wú)法保證數(shù)據(jù)只被處理一次瘩例,WAL實(shí)現(xiàn)的是At-least-once語(yǔ)義(至少被處理一次)啊胶,如果在寫入到外部存儲(chǔ)的數(shù)據(jù)還沒有將offset更新到zookeeper就掛掉,這些數(shù)據(jù)將會(huì)被反復(fù)消費(fèi). 同時(shí),降低了程序的吞吐量。

采用direct方式:相比Receiver模式而言能夠確保機(jī)制更加健壯. 區(qū)別于使用Receiver來(lái)被動(dòng)接收數(shù)據(jù), Direct模式會(huì)周期性地主動(dòng)查詢Kafka, 來(lái)獲得每個(gè)topic+partition的最新的offset, 從而定義每個(gè)batch的offset的范圍. 當(dāng)處理數(shù)據(jù)的job啟動(dòng)時(shí), 就會(huì)使用Kafka的簡(jiǎn)單consumer api來(lái)獲取Kafka指定offset范圍的數(shù)據(jù)垛贤。?

優(yōu)點(diǎn):?

1焰坪、簡(jiǎn)化并行讀取?

如果要讀取多個(gè)partition, 不需要?jiǎng)?chuàng)建多個(gè)輸入DStream然后對(duì)它們進(jìn)行union操作. Spark會(huì)創(chuàng)建跟Kafka partition一樣多的RDD partition, 并且會(huì)并行從Kafka中讀取數(shù)據(jù). 所以在Kafka partition和RDD partition之間, 有一個(gè)一對(duì)一的映射關(guān)系.

2、高性能?

如果要保證零數(shù)據(jù)丟失, 在基于receiver的方式中, 需要開啟WAL機(jī)制. 這種方式其實(shí)效率低下, 因?yàn)閿?shù)據(jù)實(shí)際上被復(fù)制了兩份, Kafka自己本身就有高可靠的機(jī)制, 會(huì)對(duì)數(shù)據(jù)復(fù)制一份, 而這里又會(huì)復(fù)制一份到WAL中. 而基于direct的方式, 不依賴Receiver, 不需要開啟WAL機(jī)制, 只要Kafka中作了數(shù)據(jù)的復(fù)制, 那么就可以通過(guò)Kafka的副本進(jìn)行恢復(fù).

3聘惦、一次且僅一次的事務(wù)機(jī)制?

基于receiver的方式, 是使用Kafka的高階API來(lái)在ZooKeeper中保存消費(fèi)過(guò)的offset的. 這是消費(fèi)Kafka數(shù)據(jù)的傳統(tǒng)方式. 這種方式配合著WAL機(jī)制可以保證數(shù)據(jù)零丟失的高可靠性, 但是卻無(wú)法保證數(shù)據(jù)被處理一次且僅一次, 可能會(huì)處理兩次. 因?yàn)镾park和ZooKeeper之間可能是不同步的. 基于direct的方式, 使用kafka的簡(jiǎn)單api, Spark Streaming自己就負(fù)責(zé)追蹤消費(fèi)的offset, 并保存在checkpoint中. Spark自己一定是同步的, 因此可以保證數(shù)據(jù)是消費(fèi)一次且僅消費(fèi)一次某饰。不過(guò)需要自己完成將offset寫入zk的過(guò)程,在官方文檔中都有相應(yīng)介紹.?

*簡(jiǎn)單代碼實(shí)例:?

* messages.foreachRDD(rdd=>{?

val message = rdd.map(_._2)//對(duì)數(shù)據(jù)進(jìn)行一些操作?

message.map(method)//更新zk上的offset (自己實(shí)現(xiàn))?

updateZKOffsets(rdd)?

})?

* sparkStreaming程序自己消費(fèi)完成后,自己主動(dòng)去更新zk上面的偏移量善绎。也可以將zk中的偏移量保存在mysql或者redis數(shù)據(jù)庫(kù)中黔漂,下次重啟的時(shí)候,直接讀取mysql或者redis中的偏移量禀酱,獲取到上次消費(fèi)的偏移量炬守,接著讀取數(shù)據(jù)。

19.利用scala語(yǔ)言進(jìn)行排序:

1.冒泡:

2.快讀排序:

20.spark master在使用zookeeper進(jìn)行HA時(shí)剂跟,有哪些元數(shù)據(jù)保存在zookeeper减途?

答:spark通過(guò)這個(gè)參數(shù)spark.deploy.zookeeper.dir指定master元數(shù)據(jù)在zookeeper中保存的位置酣藻,包括worker,master,application,executors.standby節(jié)點(diǎn)要從zk中獲得元數(shù)據(jù)信息,恢復(fù)集群運(yùn)行狀態(tài)鳍置,才能對(duì)外繼續(xù)提供服務(wù)臊恋,作業(yè)提交資源申請(qǐng)等,在恢復(fù)前是不能接受請(qǐng)求的墓捻,另外抖仅,master切換需要注意兩點(diǎn):

1.在master切換的過(guò)程中,所有的已經(jīng)在運(yùn)行的程序皆正常運(yùn)行砖第,因?yàn)閟park application在運(yùn)行前就已經(jīng)通過(guò)cluster manager獲得了計(jì)算資源撤卢,所以在運(yùn)行時(shí)job本身的調(diào)度和處理master是沒有任何關(guān)系的;

2.在master的切換過(guò)程中唯一的影響是不能提交新的job梧兼,一方面不能提交新的應(yīng)用程序給集群放吩,因?yàn)橹挥蠥ctive master才能接受新的程序的提交請(qǐng)求,另外一方面羽杰,已經(jīng)運(yùn)行的程序也不能action操作觸發(fā)新的job提交請(qǐng)求渡紫。

21.spark master HA主從切換過(guò)程不會(huì)影響集群已有的作業(yè)運(yùn)行,為什么考赛?

答:因?yàn)槌绦蛟谶\(yùn)行之前惕澎,已經(jīng)向集群申請(qǐng)過(guò)資源,這些資源已經(jīng)提交給driver了颜骤,也就是說(shuō)已經(jīng)分配好資源了唧喉,這是粗粒度分配,一次性分配好資源后不需要再關(guān)心資源分配忍抽,在運(yùn)行時(shí)讓driver和executor自動(dòng)交互八孝,弊端是如果資源分配太多,任務(wù)運(yùn)行完不會(huì)很快釋放鸠项,造成資源浪費(fèi)干跛,這里不適用細(xì)粒度分配的原因是因?yàn)槿蝿?wù)提交太慢。

22.什么是粗粒度祟绊,什么是細(xì)粒度楼入,各自的優(yōu)缺點(diǎn)是什么?

答:1.粗粒度:?jiǎn)?dòng)時(shí)就分配好資源久免,程序啟動(dòng)浅辙,后續(xù)具體使用就使用分配好的資源,不需要再分配資源阎姥。好處:作業(yè)特別多時(shí)记舆,資源復(fù)用率較高,使用粗粒度呼巴。缺點(diǎn):容易資源浪費(fèi)泽腮,如果一個(gè)job有1000個(gè)task御蒲,完成了999個(gè),還有一個(gè)沒完成诊赊,那么使用粗粒度厚满。如果有999個(gè)資源閑置在那里,會(huì)造成資源大量浪費(fèi)碧磅。

2.細(xì)粒度:用資源的時(shí)候分配碘箍,用完了就立即回收資源,啟動(dòng)會(huì)麻煩一點(diǎn)鲸郊,啟動(dòng)一次分配一次丰榴,會(huì)比較麻煩。

23.driver的功能是什么:

答:1.一個(gè)spark作業(yè)運(yùn)行時(shí)包括一個(gè)driver進(jìn)程秆撮,也就是作業(yè)的主進(jìn)程四濒,具有main函數(shù),并且有sparkContext的實(shí)例职辨,是程序的入口盗蟆;

2.功能:負(fù)責(zé)向集群申請(qǐng)資源,向master注冊(cè)信息舒裤,負(fù)責(zé)了作業(yè)的調(diào)度喳资,負(fù)責(zé)了作業(yè)的解析,生成stage并調(diào)度task到executor上惭每,包括DAGScheduler骨饿,TaskScheduler亏栈。

24.spark的有幾種部署模式台腥,每種模式特點(diǎn)?

1)本地模式

Spark不一定非要跑在hadoop集群绒北,可以在本地黎侈,起多個(gè)線程的方式來(lái)指定。將Spark應(yīng)用以多線程的方式直接運(yùn)行在本地闷游,一般都是為了方便調(diào)試峻汉,本地模式分三類

·??local:只啟動(dòng)一個(gè)executor

·??local[k]:啟動(dòng)k個(gè)executor

·??local:?jiǎn)?dòng)跟cpu數(shù)目相同的 executor

2)standalone模式

分布式部署集群, 自帶完整的服務(wù)脐往,資源管理和任務(wù)監(jiān)控是Spark自己監(jiān)控休吠,這個(gè)模式也是其他模式的基礎(chǔ),

3)Spark on yarn模式

分布式部署集群业簿,資源和任務(wù)監(jiān)控交給yarn管理瘤礁,但是目前僅支持粗粒度資源分配方式,包含cluster和client運(yùn)行模式梅尤,cluster適合生產(chǎn)柜思,driver運(yùn)行在集群子節(jié)點(diǎn)岩调,具有容錯(cuò)功能,client適合調(diào)試赡盘,dirver運(yùn)行在客戶端

4)Spark On Mesos模式号枕。官方推薦這種模式(當(dāng)然,原因之一是血緣關(guān)系)陨享。正是由于Spark開發(fā)之初就考慮到支持Mesos葱淳,因此,目前而言抛姑,Spark運(yùn)行在Mesos上會(huì)比運(yùn)行在YARN上更加靈活蛙紫,更加自然。用戶可選擇兩種調(diào)度模式之一運(yùn)行自己的應(yīng)用程序:

1)? ?粗粒度模式(Coarse-grained Mode):每個(gè)應(yīng)用程序的運(yùn)行環(huán)境由一個(gè)Dirver和若干個(gè)Executor組成途戒,其中坑傅,每個(gè)Executor占用若干資源,內(nèi)部可運(yùn)行多個(gè)Task(對(duì)應(yīng)多少個(gè)“slot”)喷斋。應(yīng)用程序的各個(gè)任務(wù)正式運(yùn)行之前唁毒,需要將運(yùn)行環(huán)境中的資源全部申請(qǐng)好,且運(yùn)行過(guò)程中要一直占用這些資源星爪,即使不用浆西,最后程序運(yùn)行結(jié)束后,回收這些資源顽腾。

2)? ?細(xì)粒度模式(Fine-grained Mode):鑒于粗粒度模式會(huì)造成大量資源浪費(fèi)近零,Spark On Mesos還提供了另外一種調(diào)度模式:細(xì)粒度模式,這種模式類似于現(xiàn)在的云計(jì)算抄肖,思想是按需分配久信。

25.Spark技術(shù)棧有哪些組件,每個(gè)組件都有什么功能漓摩,適合什么應(yīng)用場(chǎng)景裙士?

1)Spark core:是其它組件的基礎(chǔ),spark的內(nèi)核管毙,主要包含:有向循環(huán)圖腿椎、RDD、Lingage夭咬、Cache啃炸、broadcast等,并封裝了底層通訊框架卓舵,是Spark的基礎(chǔ)南用。

2)SparkStreaming是一個(gè)對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行高通量、容錯(cuò)處理的流式處理系統(tǒng),可以對(duì)多種數(shù)據(jù)源(如Kdfka训枢、Flume托修、Twitter、Zero和TCP 套接字)進(jìn)行類似Map恒界、Reduce和Join等復(fù)雜操作睦刃,將流式計(jì)算分解成一系列短小的批處理作業(yè)。

3)Spark sql:Shark是SparkSQL的前身十酣,Spark SQL的一個(gè)重要特點(diǎn)是其能夠統(tǒng)一處理關(guān)系表和RDD涩拙,使得開發(fā)人員可以輕松地使用SQL命令進(jìn)行外部查詢,同時(shí)進(jìn)行更復(fù)雜的數(shù)據(jù)分析

4)BlinkDB :是一個(gè)用于在海量數(shù)據(jù)上運(yùn)行交互式 SQL 查詢的大規(guī)模并行查詢引擎耸采,它允許用戶通過(guò)權(quán)衡數(shù)據(jù)精度來(lái)提升查詢響應(yīng)時(shí)間兴泥,其數(shù)據(jù)的精度被控制在允許的誤差范圍內(nèi)。

5)MLBase是Spark生態(tài)圈的一部分專注于機(jī)器學(xué)習(xí)虾宇,讓機(jī)器學(xué)習(xí)的門檻更低搓彻,讓一些可能并不了解機(jī)器學(xué)習(xí)的用戶也能方便地使用MLbase。MLBase分為四部分:MLlib嘱朽,MLI旭贬、ML Optimizer和MLRuntime。

6)GraphX是Spark中用于圖和圖并行計(jì)算

26.spark中worker 的主要工作是什么搪泳?

主要功能:管理當(dāng)前節(jié)點(diǎn)內(nèi)存稀轨,CPU的使用情況,接受master發(fā)送過(guò)來(lái)的資源指令岸军,通過(guò)executorRunner啟動(dòng)程序分配任務(wù)奋刽,worker就類似于包工頭,管理分配新進(jìn)程艰赞,做計(jì)算的服務(wù)佣谐,相當(dāng)于process服務(wù),需要注意的是:

1.worker會(huì)不會(huì)匯報(bào)當(dāng)前信息給master猖毫?worker心跳給master主要只有workid台谍,不會(huì)以心跳的方式發(fā)送資源信息給master,這樣master就知道worker是否存活吁断,只有故障的時(shí)候才會(huì)發(fā)送資源信息;

2.worker不會(huì)運(yùn)行代碼坞生,具體運(yùn)行的是executor仔役,可以運(yùn)行具體application斜的業(yè)務(wù)邏輯代碼,操作代碼的節(jié)點(diǎn)是己,不會(huì)去運(yùn)行代碼又兵。

27.簡(jiǎn)單說(shuō)一下hadoop和spark的shuffle相同和差異?

答:1)從 high-level 的角度來(lái)看,兩者并沒有大的差別沛厨。 都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進(jìn)行 partition宙地,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個(gè) stage 里的 ShuffleMapTask,也可能是 ResultTask)逆皮。Reducer 以內(nèi)存作緩沖區(qū)宅粥,邊 shuffle 邊 aggregate 數(shù)據(jù),等到數(shù)據(jù) aggregate 好以后進(jìn)行 reduce() (Spark 里可能是后續(xù)的一系列操作)电谣。

2)從 low-level 的角度來(lái)看秽梅,兩者差別不小。 Hadoop MapReduce 是 sort-based剿牺,進(jìn)入 combine() 和 reduce() 的 records 必須先 sort企垦。這樣的好處在于 combine/reduce() 可以處理大規(guī)模的數(shù)據(jù),因?yàn)槠漭斎霐?shù)據(jù)可以通過(guò)外排得到(mapper 對(duì)每段數(shù)據(jù)先做排序晒来,reducer 的 shuffle 對(duì)排好序的每段數(shù)據(jù)做歸并)钞诡。目前的 Spark 默認(rèn)選擇的是 hash-based,通常使用 HashMap 來(lái)對(duì) shuffle 來(lái)的數(shù)據(jù)進(jìn)行 aggregate湃崩,不會(huì)對(duì)數(shù)據(jù)進(jìn)行提前排序臭增。如果用戶需要經(jīng)過(guò)排序的數(shù)據(jù),那么需要自己調(diào)用類似 sortByKey() 的操作竹习;如果你是Spark 1.1的用戶誊抛,可以將spark.shuffle.manager設(shè)置為sort,則會(huì)對(duì)數(shù)據(jù)進(jìn)行排序整陌。在Spark 1.2中拗窃,sort將作為默認(rèn)的Shuffle實(shí)現(xiàn)。

3)從實(shí)現(xiàn)角度來(lái)看泌辫,兩者也有不少差別随夸。 Hadoop MapReduce 將處理流程劃分出明顯的幾個(gè)階段:map(), spill, merge, shuffle, sort, reduce() 等。每個(gè)階段各司其職震放,可以按照過(guò)程式的編程思想來(lái)逐一實(shí)現(xiàn)每個(gè)階段的功能宾毒。在 Spark 中,沒有這樣功能明確的階段殿遂,只有不同的 stage 和一系列的 transformation()诈铛,所以 spill, merge, aggregate 等操作需要蘊(yùn)含在 transformation() 中。

如果我們將 map 端劃分?jǐn)?shù)據(jù)墨礁、持久化數(shù)據(jù)的過(guò)程稱為 shuffle write幢竹,而將 reducer 讀入數(shù)據(jù)、aggregate 數(shù)據(jù)的過(guò)程稱為 shuffle read恩静。那么在 Spark 中焕毫,問題就變?yōu)樵趺丛?job 的邏輯或者物理執(zhí)行圖中加入 shuffle write 和 shuffle read 的處理邏輯蹲坷?以及兩個(gè)處理邏輯應(yīng)該怎么高效實(shí)現(xiàn)??

Shuffle write由于不要求數(shù)據(jù)有序邑飒,shuffle write 的任務(wù)很簡(jiǎn)單:將數(shù)據(jù) partition 好循签,并持久化。之所以要持久化疙咸,一方面是要減少內(nèi)存存儲(chǔ)空間壓力县匠,另一方面也是為了 fault-tolerance。

28.Mapreduce和Spark的都是并行計(jì)算罕扎,那么他們有什么相同和區(qū)別

答:兩者都是用mr模型來(lái)進(jìn)行并行計(jì)算:

1)hadoop的一個(gè)作業(yè)稱為job聚唐,job里面分為map task和reduce task,每個(gè)task都是在自己的進(jìn)程中運(yùn)行的腔召,當(dāng)task結(jié)束時(shí)杆查,進(jìn)程也會(huì)結(jié)束。?

2)spark用戶提交的任務(wù)成為application臀蛛,一個(gè)application對(duì)應(yīng)一個(gè)sparkcontext亲桦,app中存在多個(gè)job,每觸發(fā)一次action操作就會(huì)產(chǎn)生一個(gè)job浊仆。這些job可以并行或串行執(zhí)行客峭,每個(gè)job中有多個(gè)stage,stage是shuffle過(guò)程中DAGSchaduler通過(guò)RDD之間的依賴關(guān)系劃分job而來(lái)的抡柿,每個(gè)stage里面有多個(gè)task舔琅,組成taskset有TaskSchaduler分發(fā)到各個(gè)executor中執(zhí)行,executor的生命周期是和app一樣的洲劣,即使沒有job運(yùn)行也是存在的备蚓,所以task可以快速啟動(dòng)讀取內(nèi)存進(jìn)行計(jì)算。?

3)hadoop的job只有map和reduce操作囱稽,表達(dá)能力比較欠缺而且在mr過(guò)程中會(huì)重復(fù)的讀寫hdfs郊尝,造成大量的io操作,多個(gè)job需要自己管理關(guān)系战惊。?

spark的迭代計(jì)算都是在內(nèi)存中進(jìn)行的流昏,API中提供了大量的RDD操作如join,groupby等吞获,而且通過(guò)DAG圖可以實(shí)現(xiàn)良好的容錯(cuò)况凉。

29.RDD機(jī)制??

答:rdd分布式彈性數(shù)據(jù)集衫哥,簡(jiǎn)單的理解成一種數(shù)據(jù)結(jié)構(gòu)茎刚,是spark框架上的通用貨幣。?

所有算子都是基于rdd來(lái)執(zhí)行的撤逢,不同的場(chǎng)景會(huì)有不同的rdd實(shí)現(xiàn)類,但是都可以進(jìn)行互相轉(zhuǎn)換。?

rdd執(zhí)行過(guò)程中會(huì)形成dag圖蚊荣,然后形成lineage保證容錯(cuò)性等初狰。 從物理的角度來(lái)看rdd存儲(chǔ)的是block和node之間的映射。

30互例、spark有哪些組件奢入??

答:主要有如下組件:

1)master:管理集群和節(jié)點(diǎn),不參與計(jì)算媳叨。?

2)worker:計(jì)算節(jié)點(diǎn)腥光,進(jìn)程本身不參與計(jì)算,和master匯報(bào)糊秆。?

3)Driver:運(yùn)行程序的main方法武福,創(chuàng)建spark context對(duì)象。?

4)spark context:控制整個(gè)application的生命周期痘番,包括dagsheduler和task scheduler等組件捉片。?

5)client:用戶提交程序的入口。

31汞舱、spark工作機(jī)制伍纫??

答:用戶在client端提交作業(yè)后,會(huì)由Driver運(yùn)行main方法并創(chuàng)建spark context上下文昂芜。?

執(zhí)行add算子莹规,形成dag圖輸入dagscheduler,按照add之間的依賴關(guān)系劃分stage輸入task scheduler泌神。 task scheduler會(huì)將stage劃分為task set分發(fā)到各個(gè)節(jié)點(diǎn)的executor中執(zhí)行良漱。

32、spark的優(yōu)化怎么做腻扇??

答: spark調(diào)優(yōu)比較復(fù)雜债热,但是大體可以分為三個(gè)方面來(lái)進(jìn)行,

1)平臺(tái)層面的調(diào)優(yōu):防止不必要的jar包分發(fā)幼苛,提高數(shù)據(jù)的本地性窒篱,選擇高效的存儲(chǔ)格式如parquet,

2)應(yīng)用程序?qū)用娴恼{(diào)優(yōu):過(guò)濾操作符的優(yōu)化降低過(guò)多小任務(wù)舶沿,降低單條記錄的資源開銷墙杯,處理數(shù)據(jù)傾斜,復(fù)用RDD進(jìn)行緩存括荡,作業(yè)并行化執(zhí)行等等高镐,

3)JVM層面的調(diào)優(yōu):設(shè)置合適的資源量,設(shè)置合理的JVM畸冲,啟用高效的序列化方法如kyro嫉髓,增大off head內(nèi)存等等

序列化在分布式系統(tǒng)中扮演著重要的角色观腊,優(yōu)化Spark程序時(shí),首當(dāng)其沖的就是對(duì)序列化方式的優(yōu)化算行。Spark為使用者提供兩種序列化方式:


Java serialization: 默認(rèn)的序列化方式梧油。


Kryo serialization: 相較于 Java serialization 的方式,速度更快州邢,空間占用更小儡陨,但并不支持所有的序列化格式,同時(shí)使用的時(shí)候需要注冊(cè)class量淌。spark-sql中默認(rèn)使用的是kyro的序列化方式骗村。

可以在spark-default.conf設(shè)置全局參數(shù),也可以代碼中初始化時(shí)對(duì)SparkConf設(shè)置?conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")?呀枢,該參數(shù)會(huì)同時(shí)作用于機(jī)器之間數(shù)據(jù)的shuffle操作以及序列化rdd到磁盤胚股,內(nèi)存。

Spark不將Kyro設(shè)置成默認(rèn)的序列化方式是因?yàn)樗枰獙?duì)類進(jìn)行注冊(cè)硫狞,官方強(qiáng)烈建議在一些網(wǎng)絡(luò)數(shù)據(jù)傳輸很大的應(yīng)用中使用kyro序列化信轿。

如果你要序列化的對(duì)象比較大,可以增加參數(shù)spark.kryoserializer.buffer所設(shè)置的值残吩。

如果你沒有注冊(cè)需要序列化的class财忽,Kyro依然可以照常工作,但會(huì)存儲(chǔ)每個(gè)對(duì)象的全類名(full class name)泣侮,這樣的使用方式往往比默認(rèn)的 Java serialization 還要浪費(fèi)更多的空間即彪。

可以設(shè)置?spark.kryo.registrationRequired?參數(shù)為?true,使用kyro時(shí)如果在應(yīng)用中有類沒有進(jìn)行注冊(cè)則會(huì)報(bào)錯(cuò):

如上這個(gè)錯(cuò)誤需要添加

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末活尊,一起剝皮案震驚了整個(gè)濱河市隶校,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蛹锰,老刑警劉巖深胳,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異铜犬,居然都是意外死亡舞终,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門癣猾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)敛劝,“玉大人,你說(shuō)我怎么就攤上這事纷宇】涿耍” “怎么了?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵像捶,是天一觀的道長(zhǎng)上陕。 經(jīng)常有香客問我桩砰,道長(zhǎng),這世上最難降的妖魔是什么唆垃? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任五芝,我火速辦了婚禮痘儡,結(jié)果婚禮上辕万,老公的妹妹穿的比我還像新娘。我一直安慰自己沉删,他們只是感情好渐尿,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著矾瑰,像睡著了一般砖茸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上殴穴,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天凉夯,我揣著相機(jī)與錄音,去河邊找鬼采幌。 笑死劲够,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的休傍。 我是一名探鬼主播征绎,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼磨取!你這毒婦竟也來(lái)了人柿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤忙厌,失蹤者是張志新(化名)和其女友劉穎凫岖,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逢净,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡哥放,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了汹胃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片婶芭。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖着饥,靈堂內(nèi)的尸體忽然破棺而出犀农,到底是詐尸還是另有隱情,我是刑警寧澤宰掉,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布呵哨,位于F島的核電站赁濒,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏孟害。R本人自食惡果不足惜拒炎,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望挨务。 院中可真熱鬧击你,春花似錦、人聲如沸谎柄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)朝巫。三九已至鸿摇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間劈猿,已是汗流浹背拙吉。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留揪荣,地道東北人筷黔。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像变逃,于是被迫代替她去往敵國(guó)和親必逆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎揽乱。Spark是UC Berkeley AM...
    大佛愛讀書閱讀 2,828評(píng)論 0 20
  • spark-submit的時(shí)候如何引入外部jar包 在通過(guò)spark-submit提交任務(wù)時(shí)名眉,可以通過(guò)添加配置參數(shù)...
    博弈史密斯閱讀 2,741評(píng)論 1 14
  • 前言 繼基礎(chǔ)篇講解了每個(gè)Spark開發(fā)人員都必須熟知的開發(fā)調(diào)優(yōu)與資源調(diào)優(yōu)之后,本文作為《Spark性能優(yōu)化指南》的...
    Alukar閱讀 873評(píng)論 0 2
  • 1 數(shù)據(jù)傾斜調(diào)優(yōu) 1.1 調(diào)優(yōu)概述 有的時(shí)候凰棉,我們可能會(huì)遇到大數(shù)據(jù)計(jì)算中一個(gè)最棘手的問題——數(shù)據(jù)傾斜损拢,此時(shí)Spar...
    wisfern閱讀 2,935評(píng)論 0 23
  • 這個(gè)是我當(dāng)時(shí)通篇下來(lái)最有底氣說(shuō)我有或舞,我有的一點(diǎn)荆姆! 從最初的生命之舞、到后來(lái)的艾萌舞社映凳,跳舞是我一直有接...
    lily北媽閱讀 316評(píng)論 2 1