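zipWithIndex: Zips the RDD with element indices. The ordering is based first on the partition index and then on the position of the element within its partition, so the first element of the first partition gets index 0 and the last element of the last partition gets the largest index. The index type is Long, not Int. If the RDD has more than one partition, this method triggers a Spark job. Some RDDs, such as those returned by groupBy(), do not guarantee the order of elements within a partition, so the index assigned to each element is not guaranteed either. If every element must get a fixed index, sort the RDD first (for example with sortByKey()) or save it to a file.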
val seqRdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"), 2) // 2 partitions, as in the outputs below
seqRdd.mapPartitionsWithIndex{(index,iter) => iter.map(x => ("partitionIndex:0"+index,x))}.collect().foreach(println)
// (partitionIndex:00,Mary)
// (partitionIndex:00,Jim)
// (partitionIndex:01,Green)
// (partitionIndex:01,Jack)
// (partitionIndex:01,Tony)
seqRdd.zipWithIndex().collect().foreach(println)
// (Mary,0)
// (Jim,1)
// (Green,2)
// (Jack,3)
// (Tony,4)
seqRdd.zipWithIndex().mapPartitionsWithIndex{(index,iter) => iter.map(x => ("partitionIndex:0"+index,x))}.collect().foreach(println)
// (partitionIndex:00,(Mary,0))
// (partitionIndex:00,(Jim,1))
// (partitionIndex:01,(Green,2))
// (partitionIndex:01,(Jack,3))
// (partitionIndex:01,(Tony,4))
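The following plain-Scala sketch shows why zipWithIndex needs an extra Spark job when there are several partitions. It runs without Spark. It assumes the RDD is modelled as a List of partitions, where each partition is a List of elements. The names `zipWithIndexSketch` and `parts` are hypothetical. Each partition's start index is the running total of the sizes of the partitions before it, so those sizes have to be counted first, and that counting is the extra pass.

```scala
// Hypothetical model: an "RDD" is a List of partitions, each partition a List of elements.
def zipWithIndexSketch[T](partitions: List[List[T]]): List[List[(T, Long)]] = {
  // Pass 1: count the elements of every partition.
  val sizes: List[Long] = partitions.map(_.size.toLong)
  // Start index of each partition = sum of the sizes of all earlier partitions.
  val offsets: List[Long] = sizes.scanLeft(0L)(_ + _).init
  // Pass 2: number the elements inside each partition, starting from its offset.
  partitions.zip(offsets).map { case (part, start) =>
    part.zipWithIndex.map { case (x, i) => (x, start + i) }
  }
}

// Same 2/3 split as seqRdd above.
val parts = List(List("Mary", "Jim"), List("Green", "Jack", "Tony"))
zipWithIndexSketch(parts).flatten.foreach(println)
// (Mary,0) (Jim,1) (Green,2) (Jack,3) (Tony,4)
```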
zip:
Zips this RDD with another RDD and returns key-value pairs: the first element of each RDD, then the second element of each RDD, and so on. Both RDDs are assumed to have the same number of partitions and the same number of elements in each partition. If any pair of corresponding partitions differs in size, a SparkException is thrown: Can only zip RDDs with same number of elements in each partition
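This plain-Scala sketch shows the partition-by-partition pairing that zip assumes, with partitions modelled as a List of Lists. The names `zipSketch`, `names` and `skewed` are hypothetical. Where Spark throws a SparkException, the sketch throws a plain IllegalStateException.

```scala
// Hypothetical model: an "RDD" is a List of partitions.
def zipSketch[A, B](left: List[List[A]], right: List[List[B]]): List[List[(A, B)]] = {
  require(left.size == right.size, "different numbers of partitions")
  left.zip(right).map { case (l, r) =>
    // Stand-in for Spark's SparkException on a size mismatch.
    if (l.size != r.size)
      throw new IllegalStateException(
        "Can only zip RDDs with same number of elements in each partition")
    l.zip(r) // i-th element of one partition paired with the i-th of the other
  }
}

val names  = List(List("Mary", "Jim"), List("Green", "Jack", "Tony"))
val skewed = List(List("Mary", "Jim", "Green"), List("Jack", "Tony")) // same 5 elements, split 3/2

println(zipSketch(names, names).flatten)
// List((Mary,Mary), (Jim,Jim), (Green,Green), (Jack,Jack), (Tony,Tony))
println(scala.util.Try(zipSketch(names, skewed)).isFailure) // true: 2 vs 3 elements in partition 0
```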
collectAsMap: Returns the key-value pairs of this RDD to the driver (master) as a Map. If a key has multiple values, only one of them is kept in the returned Map.
seqRdd.zip(seqRdd).collectAsMap().foreach(println)
seqRdd.zip(seqRdd).foreach(println)
// (Jim,Jim) (Jack,Jack) (Green,Green) (Mary,Mary) (Tony,Tony)
// (Mary,Mary)(Jim,Jim)(Green,Green)(Jack,Jack)(Tony,Tony)
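Ordinary Scala collections show the same one-value-per-key behaviour as collectAsMap: when a Map is built from pairs that share a key, only one value survives. In this sketch the data `collected` is hypothetical, and Scala's `toMap` keeps the later pair. If every value is needed, group the values first. On an RDD, that would mean something like groupByKey() before collectAsMap().

```scala
// Hypothetical collected pairs with a duplicate key (1).
val collected = List((1, "Mary"), (2, "Jim"), (1, "Green"))

// Building a Map keeps one value per key; with toMap the later pair overwrites the earlier one.
val asMap: Map[Int, String] = collected.toMap
println(asMap(1)) // Green

// Keeping every value: group by key first.
val multi: Map[Int, List[String]] =
  collected.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
println(multi(1)) // List(Mary, Green)
```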
zipWithUniqueId:
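Zips this RDD with generated unique Long ids. The elements of the k-th partition (at positions 0, 1, 2, 3, ... within that partition) get the ids k, k+n, k+2*n, k+3*n, ..., where n is the number of partitions. The ids can therefore have gaps. Unlike zipWithIndex, this method does not trigger a Spark job; otherwise it behaves the same, including the caveat about element order.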
seqRdd.zipWithUniqueId().collect.foreach(println)
// (Mary,0)
// (Jim,2)
// (Green,1)
// (Jack,3)
// (Tony,5)
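A small plain-Scala sketch can check the id formula above. It again models the RDD as a hypothetical List of partitions (`zipWithUniqueIdSketch` and `parts` are illustrative names). No partition sizes need to be counted up front, because each id depends only on the partition number k, the position i inside the partition and the partition count n.

```scala
// Hypothetical model: an "RDD" is a List of partitions.
def zipWithUniqueIdSketch[T](partitions: List[List[T]]): List[(T, Long)] = {
  val n = partitions.size.toLong // number of partitions
  partitions.zipWithIndex.flatMap { case (part, k) =>
    // Element at position i of partition k gets id k + i * n.
    part.zipWithIndex.map { case (x, i) => (x, k + i * n) }
  }
}

val parts = List(List("Mary", "Jim"), List("Green", "Jack", "Tony"))
zipWithUniqueIdSketch(parts).foreach(println)
// (Mary,0) (Jim,2) (Green,1) (Jack,3) (Tony,5)
```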
// For operators such as combineByKey, when arguments are passed by name, the names must match the parameter names in the method's declaration.
val seq1Rdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))
val seq2Rdd = sc.parallelize(List(1,2,1,2,1))
val zipRdd = seq2Rdd.zip(seq1Rdd)
combineByKey: Combines the values of each key using a custom set of aggregation functions. (In the Spark API this simplified form is kept for backward compatibility.)
val combineRdd = zipRdd.combineByKey(createCombiner = (x:String) => List(x),mergeValue = (x:List[String],y:String) => x.:+(y),mergeCombiners = (x:List[String],y:List[String]) => x.:::(y))
combineRdd.foreach(println)
// (2,List(Jack, Jim))
// (1,List(Green, Tony, Mary))
Explanation:
combineByKey is a key-value (pair RDD) operator that performs aggregation. It is a transformation, so it does not by itself trigger job submission. It takes three main parameters:
createCombiner function: turns a value V of RDD[K,V] into the combiner type C; it is called for the first value of a key seen in a partition.
mergeValue function: merges a combiner C with another value V of the same key within the same partition; input (C, V), output a new C.
mergeCombiners function: merges two combiners C built in different partitions for the same key; input (C, C), output C.
val createCombine = (x:String) => List(x)
// Prepends an element to this list: 1 :: List(2,3) == List(2,3).::(1) == List(1,2,3)
val mergeValue = (x:List[String],y:String) => y :: x
// Prepends the elements of the given list: List(1,2) ::: List(3,4) == List(3,4).:::(List(1,2)) == List(1,2,3,4)
val mergeCombiners = (x:List[String],y:List[String]) => x ::: y
val combineRdd2 = zipRdd.combineByKey(createCombine,mergeValue,mergeCombiners)
combineRdd2.foreach(println)
// (2,List(Jim, Jack))
// (1,List(Mary, Tony, Green))
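The three roles described above can be traced without Spark. The sketch uses a hypothetical `combineByKeySketch` and models partitions as Lists. It applies createCombiner and mergeValue inside each partition, then mergeCombiners across partitions in partition order. Spark does not promise that merge order, which is why the order inside the lists can differ between runs. Assume zipRdd is split into the same two partitions as seqRdd. With the functions shown above, the sketch then reproduces the printed result.

```scala
// Hypothetical model: each inner List is one partition of (key, value) pairs.
def combineByKeySketch[K, V, C](
    partitions: List[List[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Map side: the first value of a key in a partition goes through createCombiner,
  // later values of the same key in that partition go through mergeValue.
  val perPartition: List[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))
        case None    => acc.updated(k, createCombiner(v))
      }
    }
  }
  // Reduce side: combiners of the same key from different partitions are merged
  // with mergeCombiners (here in partition order).
  perPartition.flatten.groupBy(_._1).map { case (k, kcs) =>
    k -> kcs.map(_._2).reduceLeft[C](mergeCombiners)
  }
}

// zipRdd assumed split as: [(1,Mary),(2,Jim)] and [(1,Green),(2,Jack),(1,Tony)]
val zipped = List(List((1, "Mary"), (2, "Jim")), List((1, "Green"), (2, "Jack"), (1, "Tony")))
val result = combineByKeySketch[Int, String, List[String]](
  zipped, v => List(v), (c, v) => v :: c, (c1, c2) => c1 ::: c2)
println(result(1)) // List(Mary, Tony, Green)
println(result(2)) // List(Jim, Jack)
```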
aggregateByKey:
Aggregates the values of each key using the given combine functions and a neutral "zero value". The result type U may differ from the RDD's value type V, so two operations are needed: one merges a V into a U, and one merges two U's (as in scala.TraversableOnce). The first operation is applied within a partition; the second is applied between partitions.
To avoid memory allocation, both functions are allowed to modify and return their first argument instead of creating a new U. The result depends heavily on how the data is partitioned, so pay attention to the partitions.
val seqRdd = sc.parallelize(List(("cat",2),("dog",12),("cat",12),("cat",5),("mouse",4),("mouse",2)), 2) // 2 partitions, as in the output below
seqRdd.mapPartitionsWithIndex{(index,iter) => iter.map(x => ("partitionIndex:0"+index,x))}.collect().foreach(println)
// (partitionIndex:00,(cat,2))
// (partitionIndex:00,(dog,12))
// (partitionIndex:00,(cat,12))
// (partitionIndex:01,(cat,5))
// (partitionIndex:01,(mouse,4))
// (partitionIndex:01,(mouse,2))
// seqOp's first argument is the running result (starting at zeroValue); the second is a value of the RDD
val aggregateFuncRdd = seqRdd.aggregateByKey[Int](zeroValue = 0)(seqOp = (acc:Int,value:Int) => math.max(acc,value),combOp = (value1:Int,value2:Int) => value1 + value2)
aggregateFuncRdd.collect.foreach(println)
// (dog,12) (cat,17) (mouse,4)
seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (acc:Int,value:Int) => math.max(acc,value),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)
// (dog,12) (cat,22) (mouse,10)
// Within each partition, seqOp folds every value of a key into a running result that starts at zeroValue:
// partition 0: cat = max(10, 2, 12) = 12, dog = max(10, 12) = 12
// partition 1: cat = max(10, 5) = 10, mouse = max(10, 4, 2) = 10
// combOp then adds the per-partition results of each key: cat = 12 + 10 = 22, dog = 12, mouse = 10
seqRdd.aggregateByKey[Int](zeroValue = 100)(seqOp = (acc:Int,value:Int) => math.max(acc,value),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)
// (dog,100) (cat,200) (mouse,100)
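The per-partition walk-through above can be replayed for all three zero values with a plain-Scala sketch. The names `aggregateByKeySketch` and `pets` are hypothetical, and the partition split is copied from the mapPartitionsWithIndex output of seqRdd. Because zeroValue is applied once per key per partition, raising it to 100 raises cat to 200: cat appears in both partitions.

```scala
// Hypothetical model of a pair RDD: each inner List is one partition.
def aggregateByKeySketch[K, V, U](partitions: List[List[(K, V)]], zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): Map[K, U] = {
  // Inside each partition, every key starts from zeroValue and its values are folded in with seqOp.
  val perPartition: List[Map[K, U]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
      acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
    }
  }
  // Across partitions, the per-partition results of the same key are merged with combOp.
  perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
    k -> kus.map(_._2).reduceLeft[U](combOp)
  }
}

// Same split as seqRdd: 3 pairs per partition.
val pets = List(
  List(("cat", 2), ("dog", 12), ("cat", 12)),
  List(("cat", 5), ("mouse", 4), ("mouse", 2)))

for (zero <- List(0, 10, 100)) {
  val r = aggregateByKeySketch(pets, zero)((acc, v) => math.max(acc, v), _ + _)
  println(s"zeroValue=$zero: dog=${r("dog")} cat=${r("cat")} mouse=${r("mouse")}")
}
// zeroValue=0: dog=12 cat=17 mouse=4
// zeroValue=10: dog=12 cat=22 mouse=10
// zeroValue=100: dog=100 cat=200 mouse=100
```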
countByKey:
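Counts the number of elements for each key and collects the result to the driver as a local Map. Use it only when the resulting Map is expected to be small, since all of it is loaded into the driver's memory. For large results, use rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[(K, Long)] instead of a Map.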
seqRdd.countByKey() // same as self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
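seqRdd.countByValue() // implemented as map(value => (value, null)).countByKey()
// Here "value" is not the value of a key-value pair but the whole (key, value) element: the result is a local Map of (element, count) for every distinct element. This is by design rather than a bug, since countByValue is defined on every RDD[T], not only on pair RDDs.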
seqRdd.map(_._2).countByValue()
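This plain-Scala sketch shows what the three counting calls compute on the same six pairs. It contrasts counting keys, counting whole (key, value) elements, and counting only the values. The names `pairs`, `byKey`, `byWholeElement` and `byValue` are hypothetical.

```scala
val pairs = List(("cat", 2), ("dog", 12), ("cat", 12), ("cat", 5), ("mouse", 4), ("mouse", 2))

// Like countByKey: map every pair to (key, 1L) and sum per key.
val byKey: Map[String, Long] =
  pairs.map { case (k, _) => (k, 1L) }.groupBy(_._1).map { case (k, ones) => k -> ones.map(_._2).sum }

// Like seqRdd.countByValue(): the whole (key, value) tuple is the element being counted.
val byWholeElement: Map[(String, Int), Long] =
  pairs.groupBy(identity).map { case (e, occ) => e -> occ.size.toLong }

// Like seqRdd.map(_._2).countByValue(): only the values are counted.
val byValue: Map[Int, Long] =
  pairs.map(_._2).groupBy(identity).map { case (v, occ) => v -> occ.size.toLong }

println(byKey("cat"))                   // 3
println(byWholeElement(("cat", 2)))     // 1
println(byValue(2))                     // 2  (cat,2) and (mouse,2)
```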
sortByKey():
sortByKey() performs a shuffle and range-partitions the data. Elements that sort earlier land in lower-numbered partitions and elements that sort later land in higher-numbered ones, so each partition holds a sorted range of elements. Calling collect or save on the result therefore returns or writes an ordered list of records. save writes multiple "part-X" files to the file system, in key order.
val ze = sc.parallelize(List("dog","tiger","tac","cat","gnu","panther"),2)
val zf = ze.map(x => (x.length,x))
zf.mapPartitionsWithIndex{(index,iter) => iter.map(x => ("partitionIndex:0"+index,x))}.collect().foreach(println)
// (partitionIndex:00,(3,dog))
// (partitionIndex:00,(5,tiger))
// (partitionIndex:00,(3,tac))
// (partitionIndex:01,(3,cat))
// (partitionIndex:01,(3,gnu))
// (partitionIndex:01,(7,panther))
zf.sortByKey().foreachPartition(x => println(("sortByKey....", x.toList.mkString(","))))
// (sortByKey....,(5,tiger),(7,panther))
// (sortByKey....,(3,dog),(3,tac),(3,cat),(3,gnu))
zf.sortByKey().mapPartitionsWithIndex{(index,iter) => iter.map(x => ("partitionIndex:0"+index,x))}.collect().foreach(println)
// (partitionIndex:00,(3,dog))
// (partitionIndex:00,(3,tac))
// (partitionIndex:00,(3,cat))
// (partitionIndex:00,(3,gnu))
// (partitionIndex:01,(5,tiger))
// (partitionIndex:01,(7,panther))
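The range-partitioning behaviour can be sketched in plain Scala. The function `rangePartitionAndSort` and the bound List(3) are hypothetical. Spark's RangePartitioner chooses its bounds by sampling the keys; here the single bound is fixed by hand so that keys up to 3 go to partition 0 and larger keys go to partition 1, matching the output above. Elements with equal keys keep their input order in this sketch, whereas in Spark that order depends on the shuffle.

```scala
// Hypothetical sketch: assign each pair to a key range, then sort each range locally.
def rangePartitionAndSort[K: Ordering, V](data: List[(K, V)], bounds: List[K]): List[List[(K, V)]] = {
  val ord = implicitly[Ordering[K]]
  // Partition index = number of (sorted) upper bounds strictly below the key.
  def partitionOf(k: K): Int = bounds.count(b => ord.lt(b, k))
  val grouped = data.groupBy { case (k, _) => partitionOf(k) }
  (0 to bounds.size).toList.map { p =>
    grouped.getOrElse(p, Nil).sortBy(_._1) // stable sort inside one partition
  }
}

val zf = List((3, "dog"), (5, "tiger"), (3, "tac"), (3, "cat"), (3, "gnu"), (7, "panther"))
rangePartitionAndSort(zf, List(3)).zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: ${part.mkString(",")}")
}
// partition 0: (3,dog),(3,tac),(3,cat),(3,gnu)
// partition 1: (5,tiger),(7,panther)
```

Concatenating the partitions in order, which is what collect does, yields the fully sorted sequence.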
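foldByKey(): Merges the values of each key using an associative function and a neutral "zero value" of the same type as the values. Within each partition, every key starts from the zero value, and the function is applied to that key's elements. The per-partition results for each key are then merged across partitions with the same function. The zero value is applied once per key per partition, so it should be truly neutral (for example "" for string concatenation). A non-neutral value such as "#" shows up once per partition.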
zf.foldByKey(zeroValue="#")((x:String,y:String) => x + y).mapPartitionsWithIndex{(index,iter) => iter.map(x => ("foldByKey:"+index,x))}.collect().foreach(println)
// (foldByKey:1,(3,#dogtac#catgnu))
// (foldByKey:1,(7,#panther))
// (foldByKey:1,(5,#tiger))
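The following plain-Scala sketch shows where the zero value enters: once per key in each partition, and not in the cross-partition merge. The names `foldByKeySketch` and `zfParts` are hypothetical, and the split matches zf's partitions shown earlier. With "#", the string for key 3 carries one "#" per partition, as in the output above; with the neutral "" it does not.

```scala
// Hypothetical model: each inner List is one partition of (key, value) pairs.
def foldByKeySketch[K, V](partitions: List[List[(K, V)]], zeroValue: V)(func: (V, V) => V): Map[K, V] = {
  // In each partition, every key starts from zeroValue.
  val perPartition: List[Map[K, V]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, func(acc.getOrElse(k, zeroValue), v))
    }
  }
  // Per-partition results are merged with the same func; zeroValue is not added again here.
  perPartition.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduceLeft[V](func) }
}

val zfParts = List(
  List((3, "dog"), (5, "tiger"), (3, "tac")),
  List((3, "cat"), (3, "gnu"), (7, "panther")))

println(foldByKeySketch(zfParts, "#")(_ + _)(3)) // #dogtac#catgnu
println(foldByKeySketch(zfParts, "")(_ + _)(3))  // dogtaccatgnu
```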
Join:
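fullOuterJoin: full outer join, (key,(Option[V],Option[W])), e.g. (key,(Some(v1),Some(v2))); a side with no match gives None
join: inner join, (key,(v1,v2))
leftOuterJoin: left outer join, (key,(v1,Option[W])), e.g. (key,(v1,Some(v2))) or (key,(v1,None))
rightOuterJoin: right outer join, (key,(Option[V],w)), e.g. (key,(Some(v1),v2)) or (key,(None,v2))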
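The plain-Scala sketch below compares the result shapes of the four joins on two tiny hypothetical datasets, `left` and `right`. All function names are hypothetical. The sketch mimics only the output types and the matching rules, not how Spark executes a join.

```scala
// Hypothetical sample data standing in for two pair RDDs.
val left: List[(String, Int)]     = List(("a", 1), ("b", 2))
val right: List[(String, String)] = List(("a", "x"), ("c", "y"))

// One Some per matching value, or a single None when there is no match.
def opt[A](xs: List[A]): List[Option[A]] = if (xs.isEmpty) List(None) else xs.map(Some(_))
def valuesOf[K, A](data: List[(K, A)], k: K): List[A] = data.filter(_._1 == k).map(_._2)

// join: keys present on both sides; (K, (V, W)).
def joinSketch[K, V, W](l: List[(K, V)], r: List[(K, W)]): List[(K, (V, W))] =
  for ((k, v) <- l; w <- valuesOf(r, k)) yield (k, (v, w))

// leftOuterJoin: every left pair is kept; (K, (V, Option[W])).
def leftOuterJoinSketch[K, V, W](l: List[(K, V)], r: List[(K, W)]): List[(K, (V, Option[W]))] =
  for ((k, v) <- l; w <- opt(valuesOf(r, k))) yield (k, (v, w))

// rightOuterJoin: every right pair is kept; (K, (Option[V], W)).
def rightOuterJoinSketch[K, V, W](l: List[(K, V)], r: List[(K, W)]): List[(K, (Option[V], W))] =
  for ((k, w) <- r; v <- opt(valuesOf(l, k))) yield (k, (v, w))

// fullOuterJoin: keys from either side; (K, (Option[V], Option[W])).
def fullOuterJoinSketch[K, V, W](l: List[(K, V)], r: List[(K, W)]): List[(K, (Option[V], Option[W]))] =
  for {
    k <- (l.map(_._1) ++ r.map(_._1)).distinct
    v <- opt(valuesOf(l, k))
    w <- opt(valuesOf(r, k))
  } yield (k, (v, w))

println(joinSketch(left, right))           // List((a,(1,x)))
println(leftOuterJoinSketch(left, right))  // List((a,(1,Some(x))), (b,(2,None)))
println(rightOuterJoinSketch(left, right)) // List((a,(Some(1),x)), (c,(None,y)))
println(fullOuterJoinSketch(left, right))  // List((a,(Some(1),Some(x))), (b,(Some(2),None)), (c,(None,Some(y))))
```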