1. Value 型
(1) 輸入骆膝、輸出分區(qū) 一對(duì)一
-
map
// 將數(shù)據(jù)逐個(gè)迭代,生成新的值或鍵值對(duì) val rdd1 = sc.parallelize(1 to 3, 3) rdd1.map(x => (x, 1))
-
flatMap
// 合并每個(gè)分區(qū)中的元素致燥,生成一個(gè)新的集合 val rdd1 = sc.parallelize(1 to 3, 3) rdd1.flatMap(x => x) // => Array( 1, 2, 3 )
-
mapPartitions
// 將每個(gè)分區(qū)進(jìn)行一次map val rdd1 = spark.sparkContext.parallelize(1 to 9, 3) rdd1.mapPartitions{iter => val list = ListBuffer[Int]() while(iter.hasNext){ val item = iter.next() list.append(item * 2) } list.toIterator // 返回迭代器 }.collect().foreach(println)
-
glom
// 分區(qū) => 數(shù)組 val rdd1 = sc.parallelize(1 to 3, 3) rdd1.glom.collect // => Array( Array(1), Array(2), Array(3) )
(2) 輸入莲祸、輸出分區(qū) 多對(duì)一
-
union:并集(不去重)
// 合并多個(gè) RDD rdd1.union(rdd2) // rdd1 + rdd2
-
intersection:交集(去重)
rdd1.intersection(rdd2) // rdd1 * rdd2
-
subtract:差集
rdd1.subtract(rdd2) // rdd1 - rdd2
-
cartesian:笛卡爾積
// 對(duì) RDD 進(jìn)行笛卡爾積計(jì)算 rdd1.cartesian(rdd2)
(3) 輸入酪穿、輸出分區(qū) 多對(duì)多
-
groupBy
// 分組 val rdd1 = sc.parallelize(1 to 9, 3) rdd1.groupBy(x => { if(x % 2 == 0) "偶數(shù)" else "奇數(shù)" })
(4) 輸出為輸入的子集
-
filter
// 按一定條件過濾數(shù)據(jù) val rdd1 = sc.parallelize(1 to 9, 3) rdd1.filter(x => x % 2 == 0)
-
distinct
// 去重 rdd1.distinct
-
sample
// 采樣 val rdd1 = sc.parallelize(1 to 100000, 3) /** * 參數(shù) * (1) withReplacement: Boolean 是否有放回采樣 * (2) fraction: Double 采樣比例 * (3) seed: Long 種子 */ rdd1.sample(false, 0.1, 0L)
-
takeSample
// takeSample 與 sample 不同的是:不使用相對(duì)比例采樣族淮,而是按設(shè)定的個(gè)數(shù)進(jìn)行采樣 rdd1.takeSample(true, 20, 0L)
- Cache型
- cache
- persist
2. Key-Value 型
(1) 輸入辫红、輸出分區(qū)一對(duì)一
-
mapValues
// 對(duì) value 進(jìn)行操作 val rdd1 = sc.parallelize(List("dog","cat","tiger","lion","mouse"), 2) val rdd2 = rdd1.map(x => (x.length, x)) // Array( (3, dog), (3, cat), ... ) rdd2.mapValues(x => "#" + x + "#") // Array( (3, #dog#), (3, #cat#), ... ) // 只有一個(gè)參數(shù)時(shí),可以使用通配符 "_" rdd2.mapValues("#" + _ + "#")
-
zip
// 合并兩個(gè) RDD 為鍵值對(duì) // 注意分區(qū)數(shù)祝辣、元素?cái)?shù)量 必須相同 val rdd1 = sc.parallelize(List("dog","cat","tiger","lion","mouse"), 2) val rdd2 = sc.parallelize(List(1,1,2,3,3), 2) val rdd3 = rdd2.zip(rdd1) // Array((1,dog), (1,cat), (2,tiger), (3,lion), (3,mouse))
(2) 對(duì)單個(gè) RDD 聚集
-
combineByKey
val rdd1 = sc.parallelize(List("dog","cat","tiger","lion","mouse"), 2) val rdd2 = sc.parallelize(List(1,1,2,3,3), 2) val rdd3 = rdd2.zip(rdd1) // Array((1,dog), (1,cat), (2,tiger), (3,lion), (3,mouse)) // 根據(jù) Key 合并 val rdd4 = rdd3.combineByKey( List(_), // createCombiner (x: List[String], y: String) => y::x, // mergeValue (x: List[String], y: List[String]) => x:::y // mergeCombiners ) // Array((3,List(rabbit, cat)), (5,List(bee)), (2,List(dog, bear)))
-
reduceByKey
val rdd1 = sc.parallelize(List("dog","cat","tiger","dog","mouse"), 2) val rdd2 = sc.parallelize(List(1,1,2,3,3), 2) val rdd3 = rdd1.zip(rdd2) // Array((dog,1), (cat,1), (tiger,2), (dog,3), (mouse,3)) // 根據(jù) Key 相減 val rdd4 = rdd3.reduceByKey(_+_) // Array((dog,4), (cat,1), (tiger,2), (mouse,3))
- partitionByKey
(3) 對(duì)兩個(gè) RDD 聚集
Cogroup
-
連接
- join
- leftOutJoin
- rightOutJoin
3. Action 算子
- 無返回值
- foreach
- HDFS 輸出
- saveAsTextFile
- saveAsObjectFile
- 集合贴妻、數(shù)據(jù)類型
- collect
- collectAsMap
- reduceByKeyLocally
- lookup
- count
- top
- reduce
- fold
- aggregate
4. 數(shù)據(jù)加載算子
- 文件讀取
- textFile
- 內(nèi)存生成
- makeRDD
- parallelize