sparkRddFunctionNote

zipWithIndex:首先基于分區(qū)索引? 然后基于分區(qū)內(nèi)元素索引 第一個(gè)元素是第一個(gè)分區(qū)的第一個(gè)元素 最后一個(gè)元素是最后一個(gè)分區(qū)的最后一個(gè)元素 Index的返回類型是Long類型而不是Int 如果RDD不止一個(gè)分區(qū)息堂,則觸發(fā)一個(gè)spark job痴晦,如果是根據(jù)groupBy()返回的RDD 不能保證一個(gè)分區(qū)內(nèi)的元素排序凌节,所以 如果需要確保每一個(gè)元素的索引序列,需要針對(duì)RDD使用sortByKey() 算子 進(jìn)行sort? 或者保存進(jìn)一個(gè)文件

val seqRdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))

seqRdd.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,Mary)

// (partitionIndex:00,Jim)

// (partitionIndex:01,Green)

// (partitionIndex:01,Jack)

// (partitionIndex:01,Tony)

seqRdd.zipWithIndex().collect().foreach(println)

// (Mary,0)

// (Jim,1)

// (Green,2)

// (Jack,3)

// (Tony,4)

seqRdd.zipWithIndex().mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(Mary,0))

// (partitionIndex:00,(Jim,1))

// (partitionIndex:01,(Green,2))

// (partitionIndex:01,(Jack,3))

// (partitionIndex:01,(Tony,4))

zip:

將當(dāng)前RDD與另外一個(gè)RDD進(jìn)行zip操作双泪,返回從兩個(gè)RDD返回的第一個(gè)元素對(duì)/第二個(gè)元素對(duì) 作為key-value對(duì)排截,假定兩個(gè)RDD 具有兩個(gè)相同數(shù)據(jù)量的分區(qū)且每個(gè)分區(qū)的元素?cái)?shù)量相同椭迎,如果存在某一個(gè)分區(qū)對(duì)晴裹,其元素?cái)?shù)量不相同,則拋出SparkException:Can Only zip RDDs with the same number of elements in each partition

CollectAsMap:將當(dāng)前RDD的key-value 對(duì) 以Map形式返回至master節(jié)點(diǎn)祝辣,如果一個(gè)key有多個(gè)value贴妻。只能返回一個(gè)value

seqRdd.zip(seqRdd).collectAsMap().foreach(println);seqRdd.zip(seqRdd).foreach(println);

// (Jim,Jim) (Jack,Jack) (Green,Green) (Mary,Mary) (Tony,Tony)

// (Mary,Mary)(Jim,Jim)(Green,Green)(Jack,Jack)(Tony,Tony)

zipWithUniqueId:

使用唯一的Long id 與當(dāng)前RDD 進(jìn)行Zips操作,在第K個(gè)分區(qū)內(nèi)的元素(其分區(qū)內(nèi)索引分別為0蝙斜,1名惩,2,3孕荠,4)的id分別為:k,k+n,k+2*n,k+3*n... n為分區(qū)總數(shù)绢片,因此在這里存在一定的跳躍,但是與zipWithIndex不同岛琼,該方法不會(huì)觸發(fā)spark job,其他與其一致巢株。

seqRdd.zipWithUniqueId().collect.foreach(println)

// (Mary,0)

// (Jim,2)

// (Green,1)

// (Jack,3)

// (Tony,5)

// 諸如 combineByKey等算子槐瑞,在指定參數(shù)名及其具體值時(shí),需要注意其參數(shù)名應(yīng)與聲明時(shí)參數(shù)名一致阁苞。

val seq1Rdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))

val seq2Rdd = sc.parallelize(List(1,2,1,2,1))

val zipRdd = seq2Rdd.zip(seq1Rdd)

combineRdd:方法向后兼容困檩,使用一系列常用的聚合函數(shù)對(duì)RDD的每個(gè)Key 聯(lián)合組成其value值

val combineRdd = zipRdd.combineByKey(createCombiner = (x:String) => List(x),mergeValue = (x:List[String],y:String) => x.:+(y),mergeCombiners = (x:List[String],y:List[String]) => x.:::(y))

combineRdd.foreach(println)

// (2,(List(Jack,Jim)))

// (1,(List(Green,Tony,Mary)))

解析:

combineByKey屬于Key-value算子祠挫,做的是聚合操作,這種變換不會(huì)觸發(fā)作業(yè)的提交悼沿,主要有三個(gè)參數(shù):

createCombiner function:一個(gè)組合函數(shù) 用于將RDD[K,V] 中的V轉(zhuǎn)換成一個(gè)新的值C1

mergeValue function: 合并值函數(shù),將一個(gè)C1類型值和一個(gè)V類型值合并成一個(gè)C2類型等舔,輸入?yún)?shù)為(C1,V) 輸出為新的C2

mergeCombiners function:合并組合器函數(shù) 用于將兩個(gè)C2類型值合并成一個(gè)C3類型 輸入?yún)?shù)為(C2,C2) 輸出為C2

val createCombine = (x:String) => List(x)

val mergeValue = (x:List[String],y:String) => y :: x

// Adda an element at the beginning of this list.{{{ 1 :: List(2,3) = List(2,3).::(1) = List(1,2,3)}}}

val mergerCombiners = (x:List[String],y:List[String]) => x ::: y // Adds the elements of a given list in front of this list. {{{ List(1,2) ::: List(3,4) = List(3,4).:::List(1,2) = List(1,2,3,4)}}}

val combineRdd2 = zipRdd.combineByKey(createCombine,mergeValue,mergerCombiners)

combineRdd2.foreach(println)

// (2,(List(Jim,Jack)))

// (1,(List(Mary糟趾,Tony慌植,Green)))

aggregateByKey:

使用combine組合函數(shù)和一個(gè)中性netural“zero value”對(duì)每一個(gè)key的多個(gè)value進(jìn)行聚合操作,可以返回不同于RDD中固有數(shù)據(jù)類型V的結(jié)果類型U义郑,一個(gè)merge操作將V 轉(zhuǎn)換為U蝶柿,另外一個(gè)merge操作將兩個(gè)U聚合,(集合可反復(fù)遍歷) 之前的數(shù)據(jù)類型轉(zhuǎn)換操作是在一個(gè)分區(qū)內(nèi)部進(jìn)行的? 之后的將U進(jìn)行聚合操作是在分區(qū)之間進(jìn)行的非驮。

為避免內(nèi)存泄漏交汤,這些函數(shù)均可以被修改并且返回他們的第一個(gè)參數(shù)而不是創(chuàng)建一個(gè)新的U實(shí)例。 計(jì)算的時(shí)候與分區(qū)的關(guān)系很大劫笙,注意分區(qū)的作用芙扎。

val seqRdd = sc.parallelize(List(("cat",2),("dog",12),("cat",12),("cat",5),("mouse",4),("mouse",2)))

seqRdd.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(cat,2))

// (partitionIndex:00,(dog,12))

// (partitionIndex:00,(cat,12))

// (partitionIndex:01,(cat,5))

// (partitionIndex:01,(mouse,4))

// (partitionIndex:01,(mouse,2))

val aggregateFuncRdd = seqRdd.aggregateByKey[Int](zeroValue = 0)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2)

aggregateFuncRdd.collect.foreach(println)

// (dog,12) (cat,17) (mouse,4)

seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)

// (dog,12) (cat,22) (mouse,10)? 首先和當(dāng)前分區(qū)內(nèi)的同一key的多個(gè)value與zeroValue結(jié)合進(jìn)行seqOp運(yùn)算,分區(qū)0內(nèi)cat 對(duì)應(yīng)的value最大為12填大,dog為12戒洼,分區(qū)1內(nèi)cat對(duì)應(yīng)的value最大為5 mouse為4? 再將其與ZeroValue進(jìn)行計(jì)算 0-cat-Max 12 0-dog-Max 12 1-cat-Max 10 1-mouse-Max 10? 然后再將兩個(gè)分區(qū)的同一key的多個(gè)value進(jìn)行相加? 得到 (dog,12) (cat,22) (mouse,10)

seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)

// (dog,100) (cat,200) (mouse,100)

countByKey:

計(jì)算每一個(gè)Key對(duì)應(yīng)的元素?cái)?shù)量,并講結(jié)果collect至本地Map格式栋盹,結(jié)果Map必須相對(duì)較小 才能夠全部加載進(jìn)Driver's 內(nèi)存 處理大數(shù)據(jù)量的話? 可以返回一個(gè)RDD[K,Long] 代替一個(gè)Map

seqRdd.countByKey() // self.mapValue(_ => 1L).reduceByKey(_+_).collect().toMap

seqRdd.countByValue()// map(value => (value,null)).countByKey()? 當(dāng)前value不是key-value對(duì)的value? 而是指代整個(gè)key-value 以(value施逾,count)對(duì)的本地Map形式返回當(dāng)前RDD的不同value的個(gè)數(shù)(此處應(yīng)該是一個(gè)Bug)

seqRdd.map(_._2).countByValue()

sortByKey():

進(jìn)行了shuffle操作,shuffle之后被重新分區(qū)例获,排序靠前的元素在低序號(hào)分區(qū)汉额,排序靠后的元素在高序號(hào)分區(qū),每一個(gè)分區(qū)內(nèi)包含一段已經(jīng)排序好的元素榨汤,針對(duì)結(jié)果如果調(diào)用collect 或者save算子蠕搜,將會(huì)返回或者輸出一段已經(jīng)排序好的記錄,調(diào)用save算子時(shí)將會(huì)在文件系統(tǒng)內(nèi)生成多個(gè)依照key進(jìn)行排序的“part-x”的文件

val ze = sc.parallelize(List("dog","tiger","tac","cat","gnu","panther"),2)

val zf = ze.map(x => (x.length,x))

zf.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(3,dog))

// (partitionIndex:00,(5,tiger))

// (partitionIndex:00,(3,tac))

// (partitionIndex:01,(3,cat))

// (partitionIndex:01,(3,gnu))

// (partitionIndex:01,(7,panther))

zf.sortByKey().foreachPartition(x => println("sortByKey....",x.toList.mkString(",")))

// (sortByKey....,(5,tiger),(7,panther))

// (sortByKey....,(3,dog),(3,tac),(3,cat),(3,gnu))

zf.sortByKey().mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(3,dog))

// (partitionIndex:00,(3,tac))

// (partitionIndex:00,(3,cat))

// (partitionIndex:00,(3,gnu))

// (partitionIndex:01,(5,tiger))

// (partitionIndex:01,(7,panther))

foldByKey():使用一個(gè)組合函數(shù) 和中性值 neutral “zero value” 數(shù)據(jù)類型與Value類型相同收壕,執(zhí)行時(shí)首先針對(duì)某一分區(qū)內(nèi)相關(guān)元素進(jìn)行組合函數(shù)計(jì)算 然后 結(jié)合 zero value 進(jìn)行計(jì)算妓灌。。 然后根據(jù)Key將不同分區(qū)的組合計(jì)算結(jié)果進(jìn)行組合函數(shù)運(yùn)算

zf.foldByKey(zeroValue="#")((x:String,y:String) => x + y)..mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("foldByKey:"+Index,x)).toIterator}.collect().foreach(println)

// (foldByKey:1,(3,#dogtac#catgnu))

// (foldByKey:1,(7,#panther))

// (foldByKey:1,(5,#tiger))

Join:

fullOuterJoin:全連接 (key,(Some(v1),Some(v2)))

join:連接 (key,(v1,v2))

leftOuterJoin:左連接 (key,(v1,Some(v2)))

rightOuterJoin:右連接 (key,(Some(v1),v2))

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蜜宪,一起剝皮案震驚了整個(gè)濱河市虫埂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌圃验,老刑警劉巖掉伏,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡斧散,警方通過(guò)查閱死者的電腦和手機(jī)供常,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)鸡捐,“玉大人栈暇,你說(shuō)我怎么就攤上這事」烤担” “怎么了源祈?”我有些...
    開(kāi)封第一講書人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)鹿寨。 經(jīng)常有香客問(wèn)我新博,道長(zhǎng),這世上最難降的妖魔是什么脚草? 我笑而不...
    開(kāi)封第一講書人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任赫悄,我火速辦了婚禮,結(jié)果婚禮上馏慨,老公的妹妹穿的比我還像新娘埂淮。我一直安慰自己,他們只是感情好写隶,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布倔撞。 她就那樣靜靜地躺著,像睡著了一般慕趴。 火紅的嫁衣襯著肌膚如雪痪蝇。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 51,554評(píng)論 1 305
  • 那天冕房,我揣著相機(jī)與錄音躏啰,去河邊找鬼。 笑死耙册,一個(gè)胖子當(dāng)著我的面吹牛给僵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播详拙,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼帝际,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了饶辙?” 一聲冷哼從身側(cè)響起蹲诀,我...
    開(kāi)封第一講書人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎弃揽,沒(méi)想到半個(gè)月后侧甫,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體珊佣,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年披粟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冷冗。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡守屉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蒿辙,到底是詐尸還是另有隱情拇泛,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布思灌,位于F島的核電站俺叭,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏泰偿。R本人自食惡果不足惜熄守,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望耗跛。 院中可真熱鬧裕照,春花似錦、人聲如沸调塌。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)羔砾。三九已至负间,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間姜凄,已是汗流浹背政溃。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留檀葛,地道東北人玩祟。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像屿聋,于是被迫代替她去往敵國(guó)和親空扎。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容