1.Spark數(shù)據(jù)的本地化:移動(dòng)計(jì)算,而不是移動(dòng)數(shù)據(jù)
2.Spark中的數(shù)據(jù)本地化級(jí)別:
TaskSetManager 的 Locality Levels 分為以下五個(gè)級(jí)別:
1.PROCESS_LOCAL
2.NODE_LOCAL
3.NO_PREF
4.RACK_LOCAL
5.ANY
1.PROCESS_LOCAL進(jìn)程本地化:task要計(jì)算的數(shù)據(jù)在同一個(gè)Executor中
2.NODE_LOCAL節(jié)點(diǎn)本地化:速度比 PROCESS_LOCAL 稍慢吩坝,因?yàn)閿?shù)據(jù)需要在不同進(jìn)程之間傳遞或從文件中讀韧龊濉;
#情況一:task要計(jì)算的數(shù)據(jù)是在同一個(gè)Worker的不同Executor進(jìn)程中;
#情況二:task要計(jì)算的數(shù)據(jù)是在同一個(gè)Worker的磁盤上,或在 HDFS 上,恰好有 block 在同一個(gè)節(jié)點(diǎn)上饱亮。
Spark計(jì)算數(shù)據(jù)來源于HDFS,那么最好的數(shù)據(jù)本地化級(jí)別就是NODE_LOCAL
3.NODE_PREF沒有最佳位置這一說舍沙,數(shù)據(jù)從哪里訪問都一樣快近上,不需要位置優(yōu)先。比如說SparkSQL讀取MySql中的數(shù)據(jù)场勤;
4.RACK_LOCAL機(jī)架本地化戈锻,數(shù)據(jù)在同一機(jī)架的不同節(jié)點(diǎn)上。需要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)及文件 IO和媳,比 NODE_LOCAL 慢格遭;
情況一:task計(jì)算的數(shù)據(jù)在Worker2的Executor中
情況二:task計(jì)算的數(shù)據(jù)在Worker2的磁盤上
5.ANY跨機(jī)架,數(shù)據(jù)在非同一機(jī)架的網(wǎng)絡(luò)上留瞳,速度最慢
3.Spark中的數(shù)據(jù)本地化由誰負(fù)責(zé)拒迅?
DAGScheduler,TaskScheduler?
val rdd1 = rdd1.cache
rdd1.map.filter.count()
Driver(TaskScheduler)在發(fā)送task之前她倘,首先應(yīng)該拿到RDD1緩存在哪一些節(jié)點(diǎn)上(node1,node2)-->這一步就是由DAGScheduler通過cacheManager對(duì)象調(diào)用getPreferredLocations()來拿到RDD1緩存在哪些節(jié)點(diǎn)上璧微,TaskScheduler根據(jù)這些節(jié)點(diǎn)來發(fā)送task。
? ? ? ? ? ? val rdd1 = sc.textFile("hdfs://...") ? ?//rdd1中封裝了是這個(gè)文件所對(duì)應(yīng)的block的位置硬梁,getPreferredLocation()-->TaskScheduler調(diào)用拿到partition所對(duì)應(yīng)的數(shù)據(jù)的位置
? ? ? ? ? ? rdd1.map.filter.count()
Driver(TaskScheduler)在發(fā)送task之前前硫,首先應(yīng)該拿到rdd1數(shù)據(jù)所在的位置(node1,node2)-->RDD1封裝了這個(gè)文件所對(duì)應(yīng)的block的位置,TaskScheduler通過調(diào)用getPreferredLocations()拿到partition所對(duì)應(yīng)的數(shù)據(jù)的位置荧止,TaskScheduler根據(jù)這些位置來發(fā)送相應(yīng)的task?
#總的來說:
Spark中的數(shù)據(jù)本地化由DAGScheduler和TaskScheduler共同負(fù)責(zé)屹电。
DAGScheduler切割Job阶剑,劃分Stage, 通過調(diào)用submitStage來提交一個(gè)Stage對(duì)應(yīng)的tasks,submitStage會(huì)調(diào)用submitMissingTasks,submitMissingTasks 確定每個(gè)需要計(jì)算的 task 的preferredLocations危号,通過調(diào)用getPreferrdeLocations()得到partition 的優(yōu)先位置牧愁,就是這個(gè) partition 對(duì)應(yīng)的 task 的優(yōu)先位置,對(duì)于要提交到TaskScheduler的TaskSet中的每一個(gè)task外莲,該task優(yōu)先位置與其對(duì)應(yīng)的partition對(duì)應(yīng)的優(yōu)先位置一致猪半。
TaskScheduler接收到了TaskSet后,TaskSchedulerImpl 會(huì)為每個(gè) TaskSet 創(chuàng)建一個(gè) TaskSetManager 對(duì)象偷线,該對(duì)象包含taskSet 所有 tasks磨确,并管理這些 tasks 的執(zhí)行,其中就包括計(jì)算 TaskSetManager 中的 tasks 都有哪些locality levels声邦,以便在調(diào)度和延遲調(diào)度 tasks 時(shí)發(fā)揮作用俐填。
4.Spark中的數(shù)據(jù)本地化流程圖
即某個(gè) task 計(jì)算節(jié)點(diǎn)與其輸入數(shù)據(jù)的位置關(guān)系,下面將要挖掘Spark 的調(diào)度系統(tǒng)如何產(chǎn)生這個(gè)結(jié)果翔忽,這一過程涉及 RDD、DAGScheduler盏檐、TaskScheduler歇式,搞懂了這一過程也就基本搞懂了 Spark 的 PreferredLocations(位置優(yōu)先策略)
第一步:PROCESS_LOCAL-->TaskScheduler首先根據(jù)數(shù)據(jù)所在的節(jié)點(diǎn)發(fā)送task,
如果task在Worker1的Executor1中等待了3s(這個(gè)3s是spark的默認(rèn)等待時(shí)間,通過spark.locality.wait來設(shè)置胡野,可以在SparkConf()中修改)材失,重試了5次,還是無法執(zhí)行硫豆;
TaskScheduler會(huì)降低數(shù)據(jù)本地化的級(jí)別龙巨,從PROCESS_LOCAL降到NODE_LOCAL
第二步:NODE_LOCAL-->TaskScheduler重新發(fā)送task到Worker1中的Executor2中執(zhí)行,
如果task在Worker1的Executor2中等待了3s熊响,重試了5次旨别,還是無法執(zhí)行
TaskScheduler會(huì)降低數(shù)據(jù)本地化的級(jí)別,從NODE_LOCAL降到RACK_LOCAL
第三步:RACK_LOCAL-->TaskScheduler重新發(fā)送task到Worker2中的Executor1中執(zhí)行汗茄。
第四步:當(dāng)task分配完成之后秸弛,task會(huì)通過所在Worker的Executor中的BlockManager來獲取數(shù)據(jù),如果BlockManager發(fā)現(xiàn)自己沒有數(shù)據(jù)洪碳,那么它會(huì)調(diào)用getRemote()方法递览,通過ConnectionManager與原task所在節(jié)點(diǎn)的BlockManager中的ConnectionManager先建立連接,然后通過TransferService(網(wǎng)絡(luò)傳輸組件)獲取數(shù)據(jù)瞳腌,通過網(wǎng)絡(luò)傳輸回task所在節(jié)點(diǎn)(這時(shí)候性能大幅下降绞铃,大量的網(wǎng)絡(luò)IO占用資源),計(jì)算后的結(jié)果返回給Driver嫂侍。
#總結(jié):
TaskScheduler在發(fā)送task的時(shí)候儿捧,會(huì)根據(jù)數(shù)據(jù)所在的節(jié)點(diǎn)發(fā)送task,這時(shí)候的數(shù)據(jù)本地化的級(jí)別是最高的荚坞,如果這個(gè)task在這個(gè)Executor中等待了三秒,重試發(fā)射了5次還是依然無法執(zhí)行纯命,那么TaskScheduler就會(huì)認(rèn)為這個(gè)Executor的計(jì)算資源滿了西剥,TaskScheduler會(huì)降低一級(jí)數(shù)據(jù)本地化的級(jí)別,重新發(fā)送task到其他的Executor中執(zhí)行亿汞,如果還是依然無法執(zhí)行瞭空,那么繼續(xù)降低數(shù)據(jù)本地化的級(jí)別...
現(xiàn)在想讓每一個(gè)task都能拿到最好的數(shù)據(jù)本地化級(jí)別,那么調(diào)優(yōu)點(diǎn)就是等待時(shí)間加長(zhǎng)疗我。注意咆畏!如果過度調(diào)大等待時(shí)間,雖然為每一個(gè)task都拿到了最好的數(shù)據(jù)本地化級(jí)別吴裤,但是我們job執(zhí)行的時(shí)間也會(huì)隨之延長(zhǎng)
spark.locality.wait3s//相當(dāng)于是全局的旧找,下面默認(rèn)以3s為準(zhǔn),手動(dòng)設(shè)置了麦牺,以手動(dòng)的為準(zhǔn)
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
newSparkConf.set("spark.locality.wait","100")
#