上篇介紹了spark的突出特點以及基本框架钥勋,下面給大家介紹下spark的基本數(shù)據(jù)結(jié)構(gòu)、spark任務(wù)調(diào)度的詳細(xì)流程以及spark中stage的劃分售淡。
5. spark的基本數(shù)據(jù)類型
RDD理张、DataFrame和DataSet可以說是spark獨有的三種基本的數(shù)據(jù)類型。Spark的核心概念是RDD (resilientdistributed dataset)召廷,指的是一個只讀的凳厢,可分區(qū)的分布式數(shù)據(jù)集,這個數(shù)據(jù)集的全部或部分可以緩存在內(nèi)存中竞慢,在多次計算間重用先紫。DataFrame是一個以RDD為基礎(chǔ)的,但卻是一種類似二維數(shù)據(jù)表的一種分布式數(shù)據(jù)集筹煮。與RDD不同的是遮精,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這樣本冲,spark就可以使用sql操作dataframe准脂,像操作數(shù)據(jù)庫中的表一樣。目前檬洞,spark sql支持大多數(shù)的sql數(shù)據(jù)庫的操作狸膏。Dataset可以認(rèn)為是DataFrame的一個特例,主要區(qū)別是Dataset每一個record存儲的是一個強類型值而不是一個Row添怔。后面版本DataFrame會繼承DataSet湾戳,DataFrame和DataSet可以相互轉(zhuǎn)化,df.as[ElementType]這樣可以把DataFrame轉(zhuǎn)化為DataSet澎灸,ds.toDF()這樣可以把DataSet轉(zhuǎn)化為DataFrame院塞。創(chuàng)建Dataframe的代碼如下所示:
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
創(chuàng)建Dataset的代碼如下所示:
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
6. spark scheduler(spark任務(wù)調(diào)度)
(1) 在使用spark-summit提交spark程序后,根據(jù)提交時指定(deploy-mode)的位置性昭,創(chuàng)建driver進程拦止,driver進程根據(jù)sparkconf中的配置,初始化sparkcontext糜颠。Sparkcontext的啟動后汹族,創(chuàng)建DAG Scheduler(將DAG圖分解成stage)和Task Scheduler(提交和監(jiān)控task)兩個調(diào)度模塊。
(2) driver進程根據(jù)配置參數(shù)向resource manager(資源管理器)申請資源(主要是用來執(zhí)行的executor)其兴,resource manager接到到了Application的注冊請求之后顶瞒,會使用自己的資源調(diào)度算法,在spark集群的worker上元旬,通知worker為application啟動多個Executor榴徐。
(3) executor創(chuàng)建后,會向resource manager進行資源及狀態(tài)反饋匀归,以便resource manager對executor進行狀態(tài)監(jiān)控坑资,如監(jiān)控到有失敗的executor,則會立即重新創(chuàng)建穆端。
(4) Executor會向taskScheduler反向注冊袱贮,以便獲取taskScheduler分配的task。
(5) Driver完成SparkContext初始化体啰,繼續(xù)執(zhí)行application程序攒巍,當(dāng)執(zhí)行到Action時,就會創(chuàng)建Job荒勇。并且由DAGScheduler將Job劃分多個Stage,每個Stage 由TaskSet 組成柒莉,并將TaskSet提交給taskScheduler,taskScheduler把TaskSet中的task依次提交給Executor, Executor在接收到task之后,會使用taskRunner(封裝task的線程池)來封裝task,然后沽翔,從Executor的線程池中取出一個線程來執(zhí)行task常柄。
就這樣Spark的每個Stage被作為TaskSet提交給Executor執(zhí)行,每個Task對應(yīng)一個RDD的partition,執(zhí)行我們的定義的算子和函數(shù)。直到所有操作執(zhí)行完為止西潘。如下圖所示:
7. Spark作業(yè)調(diào)度中stage劃分
Spark在接收到提交的作業(yè)后卷玉,DAGScheduler會根據(jù)RDD之間的依賴關(guān)系將作業(yè)劃分成多個stage,DAGSchedule在將劃分的stage提交給TASKSchedule喷市,TASKSchedule將每個stage分成多個task相种,交給executor執(zhí)行。task的個數(shù)等于stage末端的RDD的分區(qū)個數(shù)品姓。因此對了解stage的劃分尤為重要寝并。
在spark中,RDD之間的依賴關(guān)系有兩種:一種是窄依賴腹备,一種是寬依賴衬潦。窄依賴的描述是:父RDD的分區(qū)最多只會被子RDD的一個分區(qū)使用。寬依賴是:父RDD的一個分區(qū)會被子RDD的多個分區(qū)使用植酥。如下圖所示:
上圖中镀岛,以一豎線作為分界,左邊是窄依賴友驮,右邊是寬依賴漂羊。
Stage的劃分不僅根據(jù)RDD的依賴關(guān)系,還有一個原則是將依賴鏈斷開卸留,每個stage內(nèi)部可以并行運行走越,整個作業(yè)按照stage順序依次執(zhí)行,最終完成整個Job耻瑟。
實際劃分時旨指,DAGScheduler就是根據(jù)DAG圖,從圖的末端逆向遍歷整個依賴鏈喳整,一般是以一次shuffle為邊界來劃分的谆构。一般劃分stage是從程序執(zhí)行流程的最后往前劃分,遇到寬依賴就斷開算柳,遇到窄依賴就將將其加入當(dāng)前stage中。一個典型的RDD Graph如下圖所示:其中實線框是RDD姓言,RDD內(nèi)的實心矩形是各個分區(qū)瞬项,實線箭頭表示父子分區(qū)間依賴關(guān)系,虛線框表示stage何荚。針對下圖流程首先根據(jù)最后一步j(luò)oin(寬依賴)操作來作為劃分stage的邊界囱淋,再往左走,A和B之間有個group by也為寬依賴餐塘,也可作為stage劃分的邊界妥衣,所以我們將下圖劃分為三個stage。