一.概述
Spark數(shù)據(jù)本地化即計(jì)算向數(shù)據(jù)移動(dòng)舒萎,但數(shù)據(jù)塊所在的Executor不一定有足夠的的計(jì)算資源提供登澜,為了讓task能盡可能的以最優(yōu)本地化級(jí)別(Locality Levels)來(lái)啟動(dòng)亦渗,Spark的延遲調(diào)度應(yīng)運(yùn)而生窟感,資源不夠可在該Locality Levels對(duì)應(yīng)的限制時(shí)間內(nèi)重試,超過(guò)限制時(shí)間后還無(wú)法啟動(dòng)則降低Locality Levels再嘗試啟動(dòng)薄翅。
二.本地化級(jí)別(Locality Levels)
Spark目前支持以下幾種本地化級(jí)別:
- 1.PROCESS_LOCAL:進(jìn)程本地化,表示 task 要計(jì)算的數(shù)據(jù)在同一個(gè) Executor 中。
- 2.NODE_LOCAL: 節(jié)點(diǎn)本地化皇忿,速度稍慢,因?yàn)閿?shù)據(jù)需要在不同的進(jìn)程之間傳遞或從文件中讀取坦仍。分為兩種情況鳍烁,第一種:task 要計(jì)算的數(shù)據(jù)是在同一個(gè) worker 的不同 Executor 進(jìn)程中。第二種:task 要計(jì)算的數(shù)據(jù)是在同一個(gè) worker 的磁盤上繁扎,或在 HDFS 上恰好有 block 在同一個(gè)節(jié)點(diǎn)上幔荒。如果 Spark 要計(jì)算的數(shù)據(jù)來(lái)源于 HDFS 上,那么最好的本地化級(jí)別就是 NODE_LOCAL梳玫。
- 3.NO_PREF: 沒(méi)有最佳位置爹梁,數(shù)據(jù)從哪訪問(wèn)都一樣快,不需要位置優(yōu)先提澎。比如 Spark SQL 從 Mysql 中讀取數(shù)據(jù)姚垃。
- 4.RACK_LOCAL:機(jī)架本地化,數(shù)據(jù)在同一機(jī)架的不同節(jié)點(diǎn)上虱朵。需要通過(guò)網(wǎng)絡(luò)傳輸數(shù)據(jù)以及文件 IO莉炉,比 NODE_LOCAL 慢钓账。
- 5.ANY:跨機(jī)架,數(shù)據(jù)在非同一機(jī)架的網(wǎng)絡(luò)上絮宁,速度最慢梆暮。
三.Spark 的數(shù)據(jù)本地化由誰(shuí)來(lái)負(fù)責(zé)
DAGScheduler 切割Job,劃分Stage, 通過(guò)調(diào)用 submitStage 來(lái)提交一個(gè)Stage 對(duì)應(yīng)的 Tasks绍昂,submitStage 會(huì)調(diào)用 submitMissingTasks, submitMissingTasks 確定每個(gè)需要計(jì)算的 task 的preferredLocations啦粹,通過(guò)調(diào)用 getPreferrdeLocations方法得到 partition 的優(yōu)先位置,就是這個(gè) partition 對(duì)應(yīng)的 task 的優(yōu)先位置窘游,對(duì)于要提交到 TaskScheduler 的 TaskSet 中的每一個(gè)Task 唠椭,該 Task 優(yōu)先位置與其對(duì)應(yīng)的 partition 對(duì)應(yīng)的優(yōu)先位置一致。
TaskScheduler 接收到了 TaskSet 后忍饰,TaskScheduler會(huì)為每個(gè)TaskSet創(chuàng)建一個(gè)TaskSetMagager來(lái)對(duì)其Task進(jìn)行管理贪嫂,TaskSetMagager中包含TaskSet 所有 Task,并管理這些 Task 的執(zhí)行艾蓝,在初始化TaskSetMagager的時(shí)候就會(huì)通過(guò)computeValidLocalityLevels計(jì)算該TaskSet包含的Locality Levels力崇,以便在調(diào)度和延遲調(diào)度 tasks 時(shí)發(fā)揮作用。
總的來(lái)說(shuō)赢织,Spark 中的數(shù)據(jù)本地化是由 DAGScheduler 和 TaskScheduler 共同負(fù)責(zé)的亮靴。
四.Spark是如何進(jìn)行調(diào)度
Locality Levels表示了計(jì)算節(jié)點(diǎn)與輸入數(shù)據(jù)位置的關(guān)系,下面以一個(gè)圖來(lái)展開 Spark 是如何進(jìn)行調(diào)度的于置。這一個(gè)過(guò)程會(huì)涉及 RDD, DAGScheduler , TaskScheduler茧吊。
1.PROCESS_LOCAL
TaskScheduler 根據(jù)數(shù)據(jù)的位置向數(shù)據(jù)節(jié)點(diǎn)發(fā)送 Task 任務(wù)。如果這個(gè)任務(wù)在 worker1 的 Executor 中等待了 3 秒八毯。(默認(rèn)的搓侄,可以通過(guò)spark.locality.wait 來(lái)設(shè)置),可以通過(guò) SparkConf() 來(lái)修改宪彩,重試了 5 次之后休讳,還是無(wú)法執(zhí)行,TaskScheduler 就會(huì)降低數(shù)據(jù)本地化的級(jí)別尿孔,從 PROCESS_LOCAL 降到 NODE_LOCAL俊柔。
2.NODE_LOCAL
TaskScheduler 重新發(fā)送 task 到 worker1 中的 Executor2 中執(zhí)行,如果 Task 在worker1 的 Executor2 中等待了 3 秒活合,重試了 5 次雏婶,還是無(wú)法執(zhí)行,TaskScheduler 就會(huì)降低數(shù)據(jù)本地化的級(jí)別白指,從 NODE_LOCAL 降到 RACK_LOCAL留晚。
3.RACK_LOCAL
TaskScheduler重新發(fā)送 Task 到 worker2 中的 Executor1 中執(zhí)行。
4.獲取數(shù)據(jù)執(zhí)行
當(dāng) Task 分配完成之后告嘲,Task 會(huì)通過(guò)所在的 worker 的 Executor 中的 BlockManager 來(lái)獲取數(shù)據(jù)错维。如果 BlockManager 發(fā)現(xiàn)自己沒(méi)有數(shù)據(jù)奖地,那么它會(huì)調(diào)用 getRemoteValues 方法,通過(guò) BlockManagerSlaveEndpoint與Driver所在節(jié)點(diǎn)的BlockManagerMaster中的BlockManagerMasterEndpoint先建立連接赋焕,獲取數(shù)據(jù)所在的BlockManager的地址参歹,然后通過(guò)BlockTransferService(網(wǎng)絡(luò)傳輸組件)獲取數(shù)據(jù),通過(guò)網(wǎng)絡(luò)傳輸回Task所在節(jié)點(diǎn)(這時(shí)候性能大幅下降隆判,大量的網(wǎng)絡(luò)IO占用資源)犬庇,之后就開始計(jì)算流程。
四.優(yōu)化建議
TaskScheduler在發(fā)送 Task 的時(shí)候侨嘀,會(huì)根據(jù)數(shù)據(jù)所在的節(jié)點(diǎn)發(fā)送 Task 臭挽,這時(shí)候的數(shù)據(jù)本地化的級(jí)別是最高的,如果這個(gè) Task 在這個(gè)Executor中等待了3秒咬腕,重試發(fā)射了5次還是依然無(wú)法執(zhí)行欢峰,那么TaskScheduler就會(huì)認(rèn)為這個(gè)Executor的計(jì)算資源滿了,TaskScheduler會(huì)降低 1 級(jí)數(shù)據(jù)本地化的級(jí)別郎汪,重新發(fā)送 Task 到其他的Executor中執(zhí)行赤赊,如果還是依然無(wú)法執(zhí)行,那么繼續(xù)降低數(shù)據(jù)本地化的級(jí)別...
如果想讓每一個(gè) Task 都能拿到最好的數(shù)據(jù)本地化級(jí)別煞赢,那么調(diào)優(yōu)點(diǎn)就是等待時(shí)間加長(zhǎng)。注意哄孤!如果過(guò)度調(diào)大等待時(shí)間照筑,雖然為每一個(gè) Task 都拿到了最好的數(shù)據(jù)本地化級(jí)別,但是我們 Job 執(zhí)行的時(shí)間也會(huì)隨之延長(zhǎng)瘦陈。
屬性名稱 | 默認(rèn)值 | 含義 |
---|---|---|
spark.locality.wait | 3000 | 以下幾個(gè)參數(shù)是關(guān)于Spark數(shù)據(jù)本地性的凝危。本參數(shù)是以毫秒為單位啟動(dòng)本地?cái)?shù)據(jù)task的等待時(shí)間,如果超出就啟動(dòng)下一本地優(yōu)先級(jí)別的task晨逝。該設(shè)置同樣可以應(yīng)用到各優(yōu)先級(jí)別的本地性之間(本地進(jìn)程 -> 本地節(jié)點(diǎn) -> 本地機(jī)架 -> 任意節(jié)點(diǎn) )蛾默,當(dāng)然,也可以通過(guò)spark.locality.wait.node等參數(shù)設(shè)置不同優(yōu)先級(jí)別的本地性 |
spark.locality.wait.process | spark.locality.wait | 本地進(jìn)程級(jí)別的本地等待時(shí)間 |
spark.locality.wait.node | spark.locality.wait | 本地節(jié)點(diǎn)級(jí)別的本地等待時(shí)間 |
spark.locality.wait.rack | spark.locality.wait | 本地機(jī)架級(jí)別的本地等待時(shí)間 |
可以在代碼里面這樣設(shè)置:
new SparkConf.set("spark.locality.wait","1000")
五.參考資料
1.Spark 的 數(shù)據(jù)本地化捉貌,提供最佳的計(jì)算節(jié)點(diǎn)支鸡,終于入門了
2.Spark性能調(diào)優(yōu)篇六之調(diào)節(jié)數(shù)據(jù)本地化等待時(shí)長(zhǎng)