spark RDD常用函數(shù)/操作
文中的代碼均可以在spark-shell中運行里覆。
transformations
map(func)
集合內(nèi)的每個元素通過function映射為新元素
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.map( _ + 1)
注意對于所有transformation操作勇凭,生成的都是一個新的RDD(這里就是resultRdd),并不實際進(jìn)行運算极谊,只有對RDD進(jìn)行action操作時才會實際計算并產(chǎn)生結(jié)果:
scala> resultRdd.collect
res3: Array[Int] = Array(2, 3, 4, 5)
以下的transformation操作同理吐限。
filter(func)
通過func
過濾集合的元素澡为,func
的返回值必須是Boolean
類型
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.filter( _ > 1)
scala> resultRdd.collect
res4: Array[Int] = Array(2, 3, 4)
flatmap(func)
通過func
將集合內(nèi)的每一元素,映射為一個序列(具體的是TraversableOnce[?]摸屠,這里可以不用管這個類型谓罗,spark會自己作隱式轉(zhuǎn)換,一般的可以順序迭代的序列都可以)季二。
說起來可能不好理解檩咱,舉個例子。還是[1,2,3,4]
吧胯舷,假設(shè)func是這樣的:x => Array(x+0.1, x+0.2)
刻蚯,也就是返回一個序?qū)Γ琭latMap流程可以看作是先對每個元素執(zhí)行func桑嘶,得到
[(1.1, 1.2), (2.1, 2.2), (3.1, 3.2), (4.1, 4.2)]
最后將所有的序列展平炊汹,就得到:
[1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2]
代碼形式:
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.flatMap( x => Array(x+0.1,x+0.2) )
scala> resultRdd.collect
res8: Array[Double] = Array(1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2)
mapPartitions(func)
和map類似,但它是在RDD的每個分區(qū)分別運行逃顶,可以理解成將一個分區(qū)內(nèi)的元素映射成一個新分區(qū)讨便,最后將所有新分區(qū)拼起來成為一個新RDD充甚。
當(dāng)對T
類型的RDD使用此函數(shù)時,func
的簽名必須是Iterator<T> => Iterator<U>
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.mapPartitions(iter => iter.map(_+1)) // 不要將這里的iter.map和RDD的map弄混了器钟,這是scala內(nèi)置的針對集合的操作
scala> resultRdd.collect
res11: Array[Int] = Array(2, 3, 4, 5)
mapPartitionsWithIndex(func)
類似mapPartitions(func)
津坑,但是多提供一個分區(qū)的索引號信息
所以對于元素為T
類型的RDD妙蔗,func
的類型簽名應(yīng)該是(Int, Iterator<T>) => Iterator<U>
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.mapPartitionsWithIndex( (index, iter) => iter.map( x => (x,s"分區(qū)$index")) )
scala> resultRdd.collect
res14: Array[(Int, String)] = Array((1,分區(qū)0), (2,分區(qū)1), (3,分區(qū)2), (4,分區(qū)3))
可以看到各個分區(qū)的情況傲霸,這里在集群結(jié)構(gòu)不同的時候結(jié)果也會不同,這取決于RDD分布在哪些分區(qū)上眉反。
sample(withReplacement, fraction, seed)
采樣函數(shù)昙啄,以一定的概率對數(shù)據(jù)進(jìn)行采樣
- withReplacement: 第一個參數(shù)決定在采樣完成后是否將樣本再放回去,類似于抽簽完成后再把簽放回去留給后面的人抽寸五。
- fraction: 理解成概率比較好梳凛,也就是說每個元素以fraction的概率被抽到。這個答案解釋得比較好
- seed: 隨機(jī)數(shù)生成器的種子
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.sample(true,0.5)
resultRdd.collect
多次運行的結(jié)果不同:
scala> pa.sample(false,0.5).collect
res52: Array[Int] = Array(1, 3)
scala> pa.sample(false,0.5).collect
res53: Array[Int] = Array(1, 2)
withReplacement
設(shè)為true
梳杏,也就是可以放回的情況韧拒,這可能會產(chǎn)生重復(fù)元素:
scala> pa.sample(true,0.5).collect
res60: Array[Int] = Array(1, 2, 2)
scala> pa.sample(true,0.5).collect
res61: Array[Int] = Array(3, 4)
union(otherDataset)
求并集。
示例:
val pa = sc.parallelize( Array(1,2))
val pb = sc.parallelize(Array(3,4))
pa.union(pb).collect
結(jié)果:
res63: Array[Int] = Array(1, 2, 3十性, 4)
intersection(otherDataset)
求交集叛溢。
示例:
val pa = sc.parallelize( Array(1,2,3))
val pb = sc.parallelize(Array(3,4,5))
pa.intersection(pb).collect
結(jié)果:
res66: Array[Int] = Array(3)
distinct([numTasks]))
去重。numTasks
是可選參數(shù)劲适,表示分配成幾個任務(wù)執(zhí)行楷掉。
示例:
val pa = sc.parallelize(Array(0,1,1,2,2,3))
pa.distinct.collect
結(jié)果:
res92: Array[Int] = Array(0, 1, 2, 3)
groupByKey([numTasks])
分組函數(shù)。
- 對類型 (K, V) 的數(shù)據(jù)集使用霞势,返回(K, Iterable<V>)類型的數(shù)據(jù)集
- 如果想在分組后使用
sum,average
等聚合函數(shù)烹植,最好使用reduceByKey
或aggregateByKey
,這將獲得更好的性能 - 默認(rèn)的并行度依賴于父RDD愕贡,也可以傳入可選參數(shù)
numTasks
指定并行任務(wù)數(shù)量草雕。
示例:
val pa = sc.parallelize(Array( "a" -> 1,"a" ->2, "b" -> 3))
pa.groupByKey.collect
結(jié)果:
res99: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))
reduceByKey(func, [numTasks])
按照key分組然后聚集,類似于SQL中的groupby之后再使用聚集函數(shù)固以。
當(dāng)一個 (K, V) 類型的數(shù)據(jù)集調(diào)用此函數(shù)墩虹, 返回一個同樣是(K, V) 類型的數(shù)據(jù)集。
示例:
val pa = sc.parallelize(Array( "a" -> 1,"a" ->2,"a" ->3, "b" -> 4))
pa.reduceByKey( (x,y) => x+y).collect
結(jié)果:
res110: Array[(String, Int)] = Array((a,6), (b,4))
其實就是先分組一下嘴纺,再對每一組內(nèi)進(jìn)行reduce败晴。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
先分組,再在每個分組內(nèi)調(diào)用aggregate
進(jìn)行折疊栽渴,與reduceByKey
的區(qū)別在于折疊結(jié)果可以是不同的類型尖坤。
對(K, V) 的數(shù)據(jù)集調(diào)用,返回 (K, U)類型的數(shù)據(jù)集闲擦。
可以參考aggregate
慢味。
val pa = sc.parallelize(Array("a" -> 1,"a" ->2,"a" ->3, "b" -> 4))
val r = pa.aggregateByKey("")(
(x:String,y:Int) => x+y.toString*2,
(x:String,y:String) => x+y
)
結(jié)果:
scala> r.collect
res135: Array[(String, String)] = Array((a,112233), (b,44))
sortByKey([ascending], [numTasks])
排序
- ascending: 可選场梆,是否升序排序
- numTasks: 可選,并發(fā)任務(wù)數(shù)量
對于 (K, V) 的數(shù)據(jù)集進(jìn)行操作纯路,返回同樣是(K, V)類型的數(shù)據(jù)集或油,其中K實現(xiàn)了Ordered
trait,也就是可以排序驰唬。
val pa = sc.parallelize(Array("b" -> 1,"d" ->2,"a" ->3, "c" -> 4))
pa.sortByKey().collect
結(jié)果:
res139: Array[(String, Int)] = Array((a,3), (b,1), (c,4), (d,2))
join(otherDataset, [numTasks])
兩個集合的內(nèi)積顶岸,對應(yīng)數(shù)據(jù)庫里的inner join
。
對 一個(K, V)和 (K, W)類型的數(shù)據(jù)集操作叫编,返回 (K, (V, W)) 類型的數(shù)據(jù)集辖佣。
另外還有對外積的支持:leftOuterJoin
、rightOuterJoin
搓逾、fullOuterJoin
卷谈,與數(shù)據(jù)庫里的相應(yīng)概念相同
看一個例子:
val pa = sc.parallelize(
Array("a" -> 1,
"b" -> 2, "b" -> 3)
)
val pb = sc.parallelize(
Array("b" -> 2, "b" -> 3,
"d" -> 4)
)
內(nèi)積:
scala> pa.join(pb).collect
res140: Array[(String, (Int, Int))] = Array((b,(2,2)), (b,(2,3)), (b,(3,2)), (b,(3,3)))
左外積:
scala> pa.leftOuterJoin(pb).collect
res141: Array[(String, (Int, Option[Int]))] = Array((a,(1,None)), (b,(2,Some(2))), (b,(2,Some(3))), (b,(3,Some(2))), (b,(3,Some(3))))
右外積:
scala> pa.rightOuterJoin(pb).collect
res142: Array[(String, (Option[Int], Int))] = Array((d,(None,4)), (b,(Some(2),2)), (b,(Some(2),3)), (b,(Some(3),2)), (b,(Some(3),3)))
全外積:
scala> pa.fullOuterJoin(pb).collect
res143: Array[(String, (Option[Int], Option[Int]))] = Array((d,(None,Some(4))), (a,(Some(1),None)), (b,(Some(2),Some(2))), (b,(Some(2),Some(3))), (b,(Some(3),Some(2))), (b,(Some(3),Some(3))))
需要注意的是外積結(jié)果的元素變成了Option
類型
cogroup(otherDataset, [numTasks])
對兩個數(shù)據(jù)集分別進(jìn)行分組,然后把分組結(jié)果連接來作為元素霞篡。
對(K, V) 和 (K, W)類型操作世蔗,返回 (K, (Iterable<V>, Iterable<W>)) 類型,別名groupWith朗兵。
val pa = sc.parallelize(Array("a" -> 1, "b" -> 2, "b" -> 3))
val pb = sc.parallelize(Array("a" -> "a", "b" -> "b", "b" -> "c"))
pa.cogroup(pb).collect
結(jié)果:
res152: Array[(String, (Iterable[Int], Iterable[String]))] = Array((a,(CompactBuffer(1),CompactBuffer(a))), (b,(CompactBuffer(2, 3),CompactBuffer(b, c))))
cartesian(otherDataset)
對兩個集合求笛卡爾積污淋,對T 和 U類型操作,返回 (T, U)類型矛市。
val pa = sc.parallelize(Array(1,2))
val pb = sc.parallelize(Array(3,4))
pa.cartesian(pb).collect
結(jié)果:
res156: Array[(Int, Int)] = Array((1,3), (1,4), (2,3), (2,4))
pipe(command, [envVars])
把每個分區(qū)輸出到stdin芙沥,然后執(zhí)行命令,最后讀回stdout浊吏,以每行為元素而昨,生成新的RDD。注意這里執(zhí)行命令的單位是分區(qū)找田,不是元素歌憨。
在/home/zeta/目錄新建一個腳本test.sh:
#!/bin/bash
while read LINE; do
echo e${LINE}
done
然后spark-shell里執(zhí)行:
val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.pipe("/home/zeta/test.sh").collect
結(jié)果:
res181: Array[String] = Array(e1, e2, e3, e4, e5, e6)
coalesce(numPartitions)
將RDD的分區(qū)數(shù)量減少到numPartitions
個,在對一個大數(shù)據(jù)集進(jìn)行filter
操作之后墩衙,調(diào)用一下減少分區(qū)數(shù)量可以提高效率务嫡。
repartition(numPartitions)
隨機(jī)打亂RDD內(nèi)全部分區(qū)的數(shù)據(jù),并且平衡一下漆改。
actions
collect()
以數(shù)組的形式返回集合內(nèi)的所有元素
count()
返回數(shù)據(jù)集內(nèi)的元素個數(shù)
foreach(func)
對數(shù)據(jù)集的每個元素執(zhí)行func心铃。
注意幾點:
- 副作用:分布式情況下,每個executor擁有自己的執(zhí)行空間挫剑,所以變量不是全局共享的去扣,對變量的副作用將導(dǎo)致未定義行為。這時候應(yīng)該使用 Accumulator樊破。
- 閉包:另外注意一些閉包引用的問題Understanding closures
first()
返回數(shù)據(jù)集內(nèi)的第一個元素
take(n)
以數(shù)組的形式返回數(shù)據(jù)集內(nèi)前n個元素
reduce(func)
reduce操作愉棱,為了在并行計算下可以得到正確結(jié)果唆铐,這個函數(shù)要滿足交換律和結(jié)合律,也就是數(shù)據(jù)集和這個運算必須構(gòu)成一個交換群奔滑。
val pa = sc.parallelize(Array(1,2,3))
pa.reduce( (x,y) => x+y )
結(jié)果:
res184: Int = 6
aggregate(zeroValue)(seqOp, combOp)
這個函數(shù)的操作流程可以看作兩步
- 在RDD的各個分區(qū)內(nèi)調(diào)用
seqOp
操作進(jìn)行折疊艾岂,它類似于fold - 調(diào)用
combOp
將各個分區(qū)的結(jié)果聚合起來
它的簽名是這樣的:
def aggregate[U](zeroValue: U)(
seqOp: (U, Int) => U,
combOp: (U, U) => U
)
要解釋太麻煩了。朋其。王浴。總之就是在各個分區(qū)內(nèi)fold一下令宿,再將結(jié)果聚合叼耙。
例子:
val pa = sc.parallelize(Array(1,2,3,4))
def seqOp(x:String,y:Int) = x+y.toString
def combOp(x:String,y:String) = x+y
pa.aggregate("")(seqOp,combOp)
由于聚合時分區(qū)的順序是不一定的,所以上面代碼的執(zhí)行結(jié)果也是不確定的:
scala> pa.aggregate("")(seqOp,combOp)
res201: String = 2413
scala> pa.aggregate("")(seqOp,combOp)
res202: String = 1234
takeSample(withReplacement, num, seed)
隨機(jī)取num
個樣本粒没,
- withReplacement: 抽樣后是否將元素放回
- num: 抽樣個數(shù)
val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.takeSample(false,3)
隨機(jī)結(jié)果:
res185: Array[Int] = Array(3, 2, 5)
takeOrdered[T](n:Int)(implicit ord: Ordering[T])
返回排序后排前n的元素,第二個隱式參數(shù)ordering
簇爆,編譯器會自行尋找癞松,也可由用戶自定義。
val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.takeOrdered(3)
結(jié)果:
res193: Array[Int] = Array(1, 2, 3)
嘗試自定義一個比較器:
object Ord extends Ordering[Int] {
override def compare(x:Int,y:Int):Int = {
if(x<y) 1 else -1;
}
}
val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.takeOrdered(3)(Ord)
這次結(jié)果變成了:
res195: Array[Int] = Array(6, 5, 4)
countByKey()
對鍵值對類型RDD有效入蛆,統(tǒng)計每個鍵對應(yīng)的元素個數(shù)响蓉。
saveAsTextFile(path)
每個元素作為一行,寫入一個文本文件(或一系列文本文件)哨毁。由參數(shù)path
指定寫入的目錄枫甲,支持本地文件系統(tǒng)、HDFS以及其它任何Hadoop支持的文件系統(tǒng)扼褪。
saveAsSequenceFile(path)
支持Java和Scala)想幻,將所有元素寫入一個 Hadoop SequenceFile, 支持 本地文件系統(tǒng) 话浇、HDFS 和 Hadoop支持的任何文件系統(tǒng)脏毯。
只有實現(xiàn) HadoopWritable
接口的鍵值對類型的RDD支持此操作。
在Scala里, 可以隱式轉(zhuǎn)換到Writable
的類型也支持這個操作幔崖, (Spark對基本類型Int, Double, String等都寫好了隱式轉(zhuǎn)換)食店。
saveAsObjectFile(path)
使用Java的序列化格式序列化對象,支持Java 和 Scala赏寇,要加載回來的話使用 SparkContext.objectFile()吉嫩。