The following table summarizes terms you’ll see used to refer to cluster concepts:
Term: Meaning
Application: User program built on Spark. Consists of a driver program and executors on the cluster.
Application jar: A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries; however, these will be added at runtime.
Driver program: The process running the main() function of the application and creating the SparkContext.
Cluster manager: An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN).
Deploy mode: Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside the cluster. In "client" mode, the submitter launches the driver outside the cluster.
Worker node: Any node that can run application code in the cluster.
Executor: A process launched for an application on a worker node, which runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Task: A unit of work that will be sent to one executor.
Job: A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.
Stage: Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.
spark://HOST:PORT Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
In general, configuration values explicitly set on a SparkConf take the highest precedence, then flags passed to spark-submit, then values in the defaults file.
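For instance, here is a minimal Java sketch of the highest-precedence option (the class name and property values are illustrative): anything set programmatically on a SparkConf wins over the corresponding spark-submit flag and over spark-defaults.conf.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ConfPrecedenceExample {                    // hypothetical class name
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("conf-precedence-example")          // illustrative value
        .setMaster("spark://host:7077")                 // standalone master URL, illustrative
        .set("spark.executor.memory", "2g");            // overrides --executor-memory and spark-defaults.conf
    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... application logic ...
    sc.stop();
  }
}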
Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. This can use up a significant amount of space over time and will need to be cleaned up. With YARN, cleanup is handled automatically, and with Spark standalone, automatic cleanup can be configured with the spark.worker.cleanup.appDataTtl property.
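A hedged example of that standalone cleanup configuration (set on each worker; the TTL value is illustrative and is given in seconds):

# conf/spark-env.sh on each standalone worker
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=604800"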
spark.driver.memory (default: 1g): Amount of memory to use for the driver process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
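A sketch of the two supported ways to size the driver in client mode (the application class and jar names are placeholders):

# on the command line
spark-submit --driver-memory 2g --class com.example.MyApp my-app.jar

# or in conf/spark-defaults.conf
spark.driver.memory 2g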
Related driver settings: spark.driver.extraClassPath, spark.driver.extraJavaOptions, spark.driver.extraLibraryPath.
The basic concepts and workflow of a Container are as follows:
(1) A Container is YARN's abstraction of resources; it encapsulates a certain amount of resources (CPU and memory) on a particular node. It has nothing to do with Linux Containers; it is purely a concept introduced by YARN (in implementation terms, it can be viewed as a serializable/deserializable Java class).
(2) Containers are requested from the ResourceManager by the ApplicationMaster and are allocated to the ApplicationMaster asynchronously by the resource scheduler inside the ResourceManager.
(3) A Container is started by the ApplicationMaster contacting the NodeManager that hosts the resources. To run a Container you must supply the command of the task to execute inside it (this can be any command, e.g. one that starts a Java, Python, or C++ process), along with the environment variables and external resources (dictionary files, executables, jar packages, etc.) that the command needs.
In addition, the Containers needed by an application fall into two categories:
(1) The Container that runs the ApplicationMaster: this one is requested and started by the ResourceManager (through its internal resource scheduler); the user can specify the resources required by this single ApplicationMaster when submitting the application.
(2) Containers that run the application's tasks: these are requested from the ResourceManager by the ApplicationMaster, which then communicates with the NodeManagers to start them.
Both kinds of Container can land on any node; their placement is generally random, which means the ApplicationMaster may well run on the same node as the tasks it manages.
Container is one of the most important concepts in YARN, and understanding it is essential to understanding YARN's resource model; hopefully this article helps with learning it.
在學(xué)習(xí)Container之前,大家應(yīng)先了解YARN的基本架構(gòu)律杠、工作流程潭流。比如,大家應(yīng)該了解一個應(yīng)用程序的運(yùn)行過程如下:
步驟1:用戶將應(yīng)用程序提交到ResourceManager上柜去;
步驟2:ResourceManager為應(yīng)用程序ApplicationMaster申請資源灰嫉,并與某個NodeManager通信,以啟動ApplicationMaster嗓奢;
步驟3:ApplicationMaster與ResourceManager通信讼撒,為內(nèi)部要執(zhí)行的任務(wù)申請資源,一旦得到資源后股耽,將于NodeManager通信根盒,以啟動對應(yīng)的任務(wù)。
步驟4:所有任務(wù)運(yùn)行完成后物蝙,ApplicationMaster向ResourceManager注銷炎滞,整個應(yīng)用程序運(yùn)行結(jié)束。
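The client side of steps 1 and 2 is usually written against the YarnClient API; the Java sketch below uses illustrative names (SubmitAppSketch, demo-app, com.example.MyAppMaster) and omits error handling, local resources, and tokens:

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;

public class SubmitAppSketch {                                        // hypothetical class name
  public static void main(String[] args) throws Exception {
    // Step 1: the client submits the application to the ResourceManager.
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration());
    yarnClient.start();

    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("demo-app");                        // illustrative name

    // Resources and launch command for the ApplicationMaster's own container.
    appContext.setResource(Resource.newInstance(1024, 1));            // 1 GB, 1 vcore for the AM
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        Collections.emptyMap(),                                       // local resources
        Collections.emptyMap(),                                       // environment variables
        Collections.singletonList("java com.example.MyAppMaster"),    // hypothetical AM start command
        null, null, null);                                            // service data, tokens, ACLs
    appContext.setAMContainerSpec(amContainer);

    // Step 2 is then performed by the ResourceManager, which picks a node
    // and asks its NodeManager to start the ApplicationMaster.
    yarnClient.submitApplication(appContext);
  }
}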
Of these steps, steps 2 and 3 involve requesting and using resources, and that is exactly where Container comes in.
In YARN, the ResourceManager contains a pluggable component, the resource scheduler, which is responsible for resource management and scheduling and is one of YARN's most central components.
When requesting resources from the resource scheduler, you send it a list of ResourceRequests, each describing the detailed requirements of one unit of resources; in return the scheduler hands back the allocated resources described as Containers. Each ResourceRequest can be viewed as a serializable Java object whose fields (given here directly as the Protocol Buffers definition) are:
message ResourceRequestProto {
optional PriorityProto priority = 1; // priority of the request
optional string resource_name = 2; // resource name (the host or rack name where the resource is expected to be, etc.)
optional ResourceProto capability = 3; // amount of resources (only CPU and memory are supported)
optional int32 num_containers = 4; // number of resource units satisfying the above conditions
optional bool relax_locality = 5 [default = true]; // whether locality may be relaxed (added after 2.1.0-beta; see my earlier article on YARN-392)
}
As the definition above shows, an application can request any amount of resources (CPU and memory), and by default locality is relaxed. For example, if you request 5 units of resources at priority 10, with resource name "node11" and capability <2 GB, 1 CPU>, and node11 has no resources matching the request, the scheduler first looks for matching resources on other nodes in the same rack as node11; if it still cannot find any, it looks on other racks. If you absolutely require resources on node11, set relax_locality to false.
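In ApplicationMaster code this request is usually built with the AMRMClient library rather than raw protobuf. The sketch below encodes the node11 example; the class and method names are illustrative, and the client is assumed to be already initialized and registered:

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class RequestContainersSketch {                    // hypothetical class name
  // Ask for 5 containers of <2 GB, 1 vcore> at priority 10, preferring node11
  // but allowing the scheduler to fall back to the same rack or other racks.
  static void requestContainers(AMRMClient<ContainerRequest> amRMClient) {
    Resource capability = Resource.newInstance(2048, 1);  // 2 GB memory, 1 vcore
    Priority priority = Priority.newInstance(10);
    for (int i = 0; i < 5; i++) {
      amRMClient.addContainerRequest(new ContainerRequest(
          capability,
          new String[] {"node11"},                        // preferred node(s)
          null,                                           // preferred rack(s): none given explicitly
          priority,
          true));                                         // relaxLocality: false would insist on node11
    }
  }
}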
發(fā)出資源請求后,資源調(diào)度器并不會立馬為它返回滿足要求的資源窥摄,而需要應(yīng)用程序的ApplicationMaster不斷與ResourceManager通信镶奉,探測分配到的資源,并拉去過來使用崭放。一旦分配到資源后哨苛,ApplicatioMaster可從資源調(diào)度器那獲取以Container表示的資源,Container可看做一個可序列化Java對象币砂,包含的字段信息(直接給出了Protocol Buffers定義)如下:
message ContainerProto {
optional ContainerIdProto id = 1; //container id
optional NodeIdProto nodeId = 2; // node where the container (i.e. the resources) is located
optional string node_http_address = 3;
optional ResourceProto resource = 4; // amount of resources in the container
optional PriorityProto priority = 5; // priority of the container
optional hadoop.common.TokenProto container_token = 6; // container token, used for security authentication
}
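On the ApplicationMaster side, that polling loop looks roughly like the following Java sketch (the class and method names, progress value, and sleep interval are illustrative):

import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class AllocateLoopSketch {                                  // hypothetical class name
  // Keep heart-beating the ResourceManager until enough containers have arrived.
  static void waitForContainers(AMRMClient<ContainerRequest> amRMClient, int needed)
      throws Exception {
    int allocated = 0;
    while (allocated < needed) {
      // allocate() is both the heartbeat and the way to pick up newly assigned containers.
      AllocateResponse response = amRMClient.allocate(0.1f);       // 0.1f = reported progress, illustrative
      List<Container> containers = response.getAllocatedContainers();
      allocated += containers.size();
      // ... hand each Container to a pending task and launch it via the NodeManager ...
      Thread.sleep(1000);                                          // simple pacing between heartbeats
    }
  }
}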
Generally, each Container is used to run one task. After the ApplicationMaster receives one or more Containers, it assigns each Container to one of its internal tasks; once the task is determined, the ApplicationMaster packages the task's runtime environment (start command, environment variables, external file dependencies, and so on) together with the resource information from the Container into a ContainerLaunchContext object, and then communicates with the corresponding NodeManager to start the task. The fields of ContainerLaunchContext (given here directly as the Protocol Buffers definition) are:
message ContainerLaunchContextProto {
repeated StringLocalResourceMapProto localResources = 1; // external resources the container launch depends on
optional bytes tokens = 2;
repeated StringBytesMapProto service_data = 3;
repeated StringStringMapProto environment = 4; // environment variables needed to launch the container
repeated string command = 5; // start command of the task run inside the container; for MapReduce, the Map/Reduce Task start command goes in this field
repeated ApplicationACLMapProto application_ACLs = 6;
}
Each ContainerLaunchContext, together with the corresponding Container information (packaged into a ContainerToken), is then wrapped into a StartContainerRequest. In other words, what the ApplicationMaster ultimately sends to the NodeManager is a StartContainerRequest, and each StartContainerRequest corresponds to one Container and one task.
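With the NMClient library the ApplicationMaster does not build the StartContainerRequest by hand; here is a sketch of launching one task in an allocated Container (the class name, method name, task command, and environment variable are illustrative):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;

public class LaunchTaskSketch {                                      // hypothetical class name
  // Wrap the task's command and environment into a ContainerLaunchContext and
  // ask the container's NodeManager to start it.
  static void launchTask(NMClient nmClient, Container container) throws Exception {
    List<String> commands = Collections.singletonList("java com.example.MyTask");  // hypothetical task command
    Map<String, String> env = Collections.singletonMap("TASK_ROLE", "worker");     // illustrative env variable
    ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        Collections.emptyMap(),   // local resources (dictionaries, jars, executables, ...)
        env,                      // environment variables
        commands,                 // command run inside the container
        null, null, null);        // service data, tokens, ACLs
    // NMClient packages ctx plus the container's token into a StartContainerRequest.
    nmClient.startContainer(container, ctx);
  }
}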
The delay scheduling mechanism was introduced to improve data locality. The basic idea is: when a node has idle resources, the scheduling policy says they should go to job1, but job1 has no task whose locality would be satisfied on that node; for performance reasons the scheduler temporarily skips that job and gives the idle resources to another job that does have a local task there. Whenever the cluster has idle resources later, job1 keeps being skipped, until it either has a task that satisfies locality or the maximum skip time configured by the administrator is reached, at which point the resources have to be given to job1 (it cannot be kept waiting forever).
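A self-contained Java sketch of that decision rule (the Job class and its fields are simplified stand-ins, not the real scheduler data structures; maxSkips plays the role of the administrator-configured limit):

import java.util.HashSet;
import java.util.Set;

// Illustrative delay scheduling for one job.
class Job {
  Set<String> nodesWithLocalData = new HashSet<>();  // nodes holding this job's input data
  int skips = 0;                                     // how many times the job has been passed over
}

class DelaySchedulingSketch {
  // Decide whether the idle resource on `node` should be given to `job`.
  static boolean shouldAssign(Job job, String node, int maxSkips) {
    if (job.nodesWithLocalData.contains(node)) {     // a local task exists: assign immediately
      job.skips = 0;
      return true;
    }
    if (job.skips >= maxSkips) {                     // waited past the configured limit: assign anyway
      job.skips = 0;
      return true;
    }
    job.skips++;                                     // otherwise skip this job for now and try the next one
    return false;
  }
}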
YARN調(diào)度器是一個resource-centric調(diào)度器洒放,調(diào)度時間復(fù)雜度是O(number of nodes),而JobTracker調(diào)度器是一個task-centric調(diào)度器伐弹,調(diào)度時間復(fù)雜度是O(number of tasks)拉馋,在設(shè)計YARN調(diào)度策略時,一定要牢記這一點(diǎn)惨好,這是保證YARN高擴(kuò)展性的前提煌茴,切莫混淆了這兩種調(diào)度。
對于FIFO,在hadoop中日川,就是有節(jié)點(diǎn)匯報心跳蔓腐,然后遍歷所有任務(wù)找出優(yōu)先級最高的滿足本地性的任務(wù),調(diào)度任務(wù)執(zhí)行龄句;在yarn中根據(jù)各個隊列資源請求回论,然后遍歷節(jié)點(diǎn)散罕,找到合適資源,將容器列表分派給隊列傀蓉。
In YARN: the ApplicationMaster requesting resources from the ResourceManager is a pull model; the ApplicationMaster asking the NodeManager to start a container is a push model; the NodeManager reporting heartbeats to the ResourceManager is a pull model (the ResourceManager hands commands back in heartbeat responses rather than contacting the NodeManager directly).