前方高能減速慢行演熟!
在上一篇RDD結(jié)構(gòu)已經(jīng)介紹完了囱桨。雖然RDD結(jié)構(gòu)是spark設(shè)計(jì)思想最重要的組成万哪,但是沒有輔助的功能只有結(jié)構(gòu)又不能獨(dú)立使用。真正使RDD完成計(jì)算優(yōu)化的脐恩,就是今天我們要講到的spark RDD的另一個(gè)重要組成部分RDD算子镐侯。
一、RDD算子的定義
我給RDD算子的定義是:用來(lái)生成或處理RDD的方法叫做RDD算子驶冒。RDD算子就是一些方法苟翻,在Spark框架中起到運(yùn)算符的作用。算子用來(lái)構(gòu)建RDD及數(shù)據(jù)之間的關(guān)系骗污。數(shù)據(jù)可以由算子轉(zhuǎn)換成RDD崇猫,也可以由RDD產(chǎn)生新RDD,或者將RDD持久化到磁盤或內(nèi)存需忿。
從技術(shù)角度講RDD算子可能比較枯燥诅炉,我們舉個(gè)里生活學(xué)習(xí)中的例子來(lái)類比RDD算子的作用。
完成計(jì)算需要什么呢屋厘?
需要數(shù)據(jù)載體和運(yùn)算方式涕烧。數(shù)據(jù)載體可以是數(shù)字,數(shù)組汗洒,集合澈魄,分區(qū),矩陣等仲翎。一個(gè)普通的計(jì)算器痹扇,它的運(yùn)算單位是數(shù)字,而運(yùn)算符號(hào)是加減乘除溯香,這樣就可以得到結(jié)果并輸出了鲫构。一個(gè)矩陣通過加減乘除也可以得到結(jié)果,但是結(jié)果跟計(jì)算器的加減乘除一樣嗎玫坛?非也结笨!
所以說(shuō)加減乘除在不同的計(jì)算框架作用是不同的,而加減乘除這樣的符號(hào)就是運(yùn)算方式。在spark計(jì)算框架有自己的運(yùn)算單位(RDD)和自己的運(yùn)算符(RDD算子)炕吸。
是不是很抽象伐憾?下面來(lái)點(diǎn)具體的。
二赫模、RDD算子的使用
Spark算子非常豐富树肃,有幾十個(gè),開發(fā)者把算子組合使用瀑罗,從一個(gè)基礎(chǔ)的RDD計(jì)算出想要的結(jié)果胸嘴。并且算子是優(yōu)化Spark計(jì)算框架的主要依據(jù)。
我們以top算子舉例斩祭,rdd.top(n)獲取RDD的前n個(gè)排序后的結(jié)果劣像。
例如計(jì)算:文件a的2倍與文件b的TOP 3結(jié)果。
- 窄依賴優(yōu)化:如圖中的RDD1,2,3在Stage3中被優(yōu)化為RDD1到RDD3直接計(jì)算摧玫。是否可以直接計(jì)算是由算子的寬窄依賴決定耳奕,推薦使用數(shù)據(jù)流向區(qū)分寬窄依賴: partiton流向子RDD的多個(gè)partiton屬于寬依賴,父RDD的partiton流向子RDD一個(gè)partiton或多個(gè)partiton流向一個(gè)子RDD的partiton屬于窄依賴诬像。上圖中的RDD3和RDD4做top(3)操作屋群,top是先排序后取出前3個(gè)值,排序過程屬于寬依賴颅停,spark計(jì)算過程是逆向的DAG(DAG和拓?fù)渑判蛳乱黄榻B)谓晌,RDD5不能直接計(jì)算掠拳,必須等待依賴的RDD完成計(jì)算癞揉,我把這種算子叫做不可優(yōu)化算子(計(jì)算流程不可優(yōu)化,必須等待父RDD的完成)溺欧,Action算子(后文講解)都是不可優(yōu)化算子喊熟,Transformation算子也有很多不可優(yōu)化的算子(寬依賴算子),如:groupbykey姐刁,reducebykey芥牌,cogroup,join等聂使。
- 數(shù)據(jù)量?jī)?yōu)化:上圖中的a文件數(shù)據(jù)乘2壁拉,為什么前面有一個(gè)filter,假設(shè)filter過濾后的數(shù)據(jù)減少到三分之一柏靶,那么對(duì)后續(xù)RDD和shuffle的操作優(yōu)化可想而知弃理。而這只是提供一個(gè)思路,并不是說(shuō)有的過濾都是高效的屎蜓。
- 利用存儲(chǔ)算子優(yōu)化Lineage:RDD算子中除了save(輸出結(jié)果)算子之外痘昌,還有幾個(gè)比較特別的算子,用來(lái)保存中間結(jié)果的,如:persist辆苔,cache 和 checkpoint 算灸,當(dāng)RDD的數(shù)據(jù)保持不變并被復(fù)用多次的時(shí)候可以用它們臨時(shí)保存計(jì)算結(jié)果。
1). cache和persist
修改當(dāng)前RDD的存儲(chǔ)方案StorageLevel驻啤,默認(rèn)狀態(tài)下與persist級(jí)別是一樣的MEMORY_ONLY級(jí)別菲驴,保存到內(nèi)存,內(nèi)存不足選擇磁盤街佑。
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
這2個(gè)方法都不會(huì)觸發(fā)任務(wù)谢翎,只是修改了RDD的存儲(chǔ)方案,當(dāng)RDD被執(zhí)行的時(shí)候按照方案存儲(chǔ)到相應(yīng)位置沐旨。而checkpoint會(huì)單獨(dú)執(zhí)行一個(gè)job森逮,并把數(shù)據(jù)寫入磁盤。
注:不要把RDD cache和Dataframe cache混淆磁携。Dataframe cache將在spark sql中介紹褒侧。
2).checkpoint
檢查RDD是否被物化或計(jì)算,一般在程序運(yùn)行比較長(zhǎng)或者計(jì)算量大的情況下谊迄,需要進(jìn)行Checkpoint闷供。這樣可以避免在運(yùn)行中出現(xiàn)異常導(dǎo)致RDD回溯代價(jià)過大的問題。Checkpoint會(huì)把數(shù)據(jù)寫在本地磁盤上统诺。Checkpoint的數(shù)據(jù)可以被同一session的多個(gè)job共用歪脏。
三、RDD算子之間的關(guān)系
算子從否觸發(fā)job的角度劃分粮呢,可以分為Transformation算子和Action算子婿失,Transformation算子不會(huì)產(chǎn)生job,是惰性算子啄寡,只記錄該算子產(chǎn)生的RDD及父RDD的partiton之間的關(guān)系豪硅,而Action算子將觸發(fā)job,完成依賴關(guān)系的所有計(jì)算操作挺物。
那么如果一個(gè)程序里有多個(gè)action算子怎么辦懒浮?
順序完成action操作,每個(gè)action算子產(chǎn)生一個(gè)job识藤,上一job的結(jié)果轉(zhuǎn)換成RDD砚著,繼續(xù)給后續(xù)的action使用。多數(shù)action返回結(jié)果都不是RDD痴昧,而transformation算子的返回結(jié)果都是RDD,但可能是多個(gè)RDD(如:randomSplit稽穆,將一個(gè)RDD切分成多個(gè)RDD)。
一張圖了解所有RDD算子之間的關(guān)系
上圖劃分為4個(gè)大塊剪个,從上到下我們順序講起:
- 圖中的RDD dependency正是RDD結(jié)構(gòu)中的private var deps: Seq[Dependency[_]]秧骑,dependency類被兩個(gè)類繼承版确,NarrowDependency(窄依賴)和ShuffleDependency(寬依賴)。窄依賴又分onetoonedependency和rangedependency乎折,這是窄依賴提供的2種抽樣方式1對(duì)1數(shù)據(jù)抽樣和平衡數(shù)據(jù)抽樣绒疗,返回值都是一個(gè)partitonid的list集合。
- 第二層骂澄,是提供RDD底層計(jì)算的基本算法吓蘑,繼承了RDD,并實(shí)現(xiàn)了dependency的一種或多種依賴關(guān)系的計(jì)算邏輯坟冲,并互相調(diào)用實(shí)現(xiàn)更復(fù)雜的功能磨镶。
- 最下層是Spark API,利用RDD基本的計(jì)算實(shí)現(xiàn)RDD所有的算子健提,并調(diào)用多個(gè)底層RDD算子實(shí)現(xiàn)復(fù)雜的功能琳猫。
- 右邊的泛型,是scala的一種類型私痹,可以理解為類的泛型脐嫂,泛指編譯時(shí)被抽象的類型。Spark利用scala的這一特性把依賴關(guān)系抽象成一種泛型結(jié)構(gòu)紊遵,并不需要真實(shí)的數(shù)據(jù)類型參與編譯過程账千。編譯的結(jié)構(gòu)類由序列化和反序列化到集群的計(jì)算節(jié)點(diǎn)取數(shù)并計(jì)算。
Transformation:轉(zhuǎn)換算子暗膜,這類轉(zhuǎn)換并不觸發(fā)提交作業(yè)匀奏,完成作業(yè)中間過程處理。Transformation按照數(shù)據(jù)類型又分為兩種学搜,value數(shù)據(jù)類型算子和key-value數(shù)據(jù)類型算子娃善。
1) Value數(shù)據(jù)類型的Transformation算子
Map,flatMap恒水,mapPartitions会放,glom饲齐,union钉凌,cartesian,groupBy捂人,filter御雕,distinct,subtract滥搭,sample酸纲,takeSample
2)Key-Value數(shù)據(jù)類型的Transfromation算子
mapValues,combineByKey瑟匆,reduceByKey闽坡,partitionBy,cogroup,join疾嗅,leftOuterJoin和rightOuterJoin
Action: 行動(dòng)算子外厂,這類算子會(huì)觸發(fā)SparkContext提交Job作業(yè)。Action算子是用來(lái)整合和輸出數(shù)據(jù)的代承,主要包括以下幾種:
Foreach汁蝶,HDFS,saveAsTextFile论悴,saveAsObjectFile掖棉, collect,collectAsMap膀估,reduceByKeyLocally幔亥,lookup,count察纯,top紫谷,reduce,fold捐寥,aggregate
注:上述舉例算子可能不全笤昨,隨著spark的更新也會(huì)不斷有新的算子加入其中。