layout: post
title: Spark Operators
date: 2017-02-27 16:03:52
tags: [Spark Operators, Spark, Big Data]
categories: "Big Data"
RDD creation operations
- parallelize
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Creates an RDD from a Seq collection.
Parameter 1: the Seq collection (required).
Parameter 2: the number of partitions; defaults to the number of CPU cores allocated to this application (Spark will run one task for each partition of the cluster).
scala> val data = Array(1,2,3,4,5,6);
data: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val num = sc.parallelize(data);
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26
scala> num.collect();
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val num1 = sc.parallelize(1 to 10);
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> num1.collect();
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> num1.partitions.size
res3: Int = 4
scala> val num2 = sc.parallelize(1 to 10 ,2);
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> num2.partitions.size
res4: Int = 2
- makeRDD (skipped)
Exactly the same as parallelize; a quick check is sketched below.
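A minimal sketch confirming the equivalence (makeRDD also has a second overload that takes preferred locations per element, not shown here):
// makeRDD takes the same (seq, numSlices) arguments as parallelize
val mk = sc.makeRDD(1 to 10, 2)
mk.partitions.size   // 2
mk.collect()         // Array(1, 2, ..., 10)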
Creating RDDs from external storage
- textFile
Create from an HDFS file.
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> textFile.collect();
res6: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
scala> textFile.count();
res7: Long = 3
Creating from other HDFS file formats (skipped; a brief objectFile sketch follows the list)
- hadoopFile
- sequenceFile
- objectFile
- newAPIHadoopFile
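Of these, objectFile is the simplest to try: it pairs with RDD.saveAsObjectFile for a round trip. A hedged sketch (the /user/test/nums path is made up for illustration):
// write an RDD out as serialized objects ...
sc.parallelize(1 to 100, 2).saveAsObjectFile("/user/test/nums")
// ... and read it back, supplying the element type explicitly
val nums = sc.objectFile[Int]("/user/test/nums")
nums.count()   // 100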
Creating from the Hadoop API (skipped; a rough sketch follows the list)
- hadoopRDD
- newAPIHadoopRDD
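For reference, newAPIHadoopRDD builds an RDD directly from a Hadoop Configuration plus InputFormat/key/value classes. A hedged sketch that reads the same text file through the new MapReduce API (assumes /user/test/1.txt exists):
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

// describe the input via a Hadoop Job, then hand its Configuration to Spark
val job = Job.getInstance(sc.hadoopConfiguration)
FileInputFormat.addInputPath(job, new Path("/user/test/1.txt"))
val kv = sc.newAPIHadoopRDD(job.getConfiguration,
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
kv.map(_._2.toString).collect()   // keys are byte offsets, values are the lines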
Basic RDD transformations (1) – map, flatMap, distinct
Data preparation
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ bin/hadoop dfs -copyFromLocal ~/test/1.txt /user/test/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ cat ~/test/1.txt
Hello World
Hello Yang
Hello SCALA
hadoop@hzbxs-bigdata16:~/hadoop-hp2$
- map
Applies the function passed to map to every element in the RDD, producing one new element per input element. Input and output partitions map one to one: there are exactly as many output partitions as input partitions (checked after the example below).
// Load the file from HDFS
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[13] at textFile at <console>:24
// Print the contents of textFile
scala> textFile.collect();
res7: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
// The map operator
scala> val mapdata = textFile.map(line => line.split("\\s+"));
mapdata: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:26
// Print the contents of mapdata
scala> mapdata.collect();
res8: Array[Array[String]] = Array(Array(Hello, World), Array(Hello, Yang), Array(Hello, SCALA))
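The one-to-one partition relationship can be verified directly; a quick check (the actual count depends on the default parallelism of the cluster):
// map never changes the number of partitions
textFile.partitions.size == mapdata.partitions.size   // true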
- flatMap
Similar to map, but the results for each element are flattened, so the output is a single RDD of individual elements rather than an RDD of collections.
scala> val flatmapdata = textFile.flatMap(line => line.split("\\s+"));
flatmapdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26
scala> flatmapdata.collect();
res10: Array[String] = Array(Hello, World, Hello, Yang, Hello, SCALA)
- distinct
Removes duplicate elements (an overload that also sets the result's partition count is sketched after the example).
scala> val flatdistincedata = textFile.flatMap(line => line.split("\\s+")).distinct();
flatdistincedata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at distinct at <console>:26
scala> flatdistincedata.collect();
res0: Array[String] = Array(Hello, SCALA, World, Yang)
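distinct(numPartitions: Int) additionally lets you choose how many partitions the deduplicated result should have; a minimal sketch:
// deduplicate and place the result into 2 partitions
val deduped = textFile.flatMap(_.split("\\s+")).distinct(2)
deduped.partitions.size   // 2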
Basic RDD transformations (2) – coalesce, repartition
- coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
Adjusts the RDD to numPartitions partitions. Only when shuffle is true can the new partition count exceed the original one.
scala> var data = sc.textFile("/user/test/1.txt");
data: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[16] at textFile at <console>:24
scala> data.collect();
res9: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
scala> data.partitions.size;
res11: Int = 2
scala> var newdata = data.coalesce(1);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[17] at coalesce at <console>:26
scala> newdata.collect();
res12: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
scala> newdata.partitions.size;
res13: Int = 1
// shuffle defaults to false, so a target partition count larger than the current one has no effect
scala> var newdata = data.coalesce(3);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[18] at coalesce at <console>:26
scala> newdata.partitions.size;
res14: Int = 2
// with shuffle = true, the target partition count may exceed the current one and takes effect
scala> var newdata = data.coalesce(3,true);
newdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at coalesce at <console>:26
scala> newdata.partitions.size;
res15: Int = 3
- repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Equivalent to coalesce with shuffle = true.
scala> var newdata1 = data.repartition(3);
newdata1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at repartition at <console>:26
scala> newdata1.partitions.size
res16: Int = 3
Basic RDD transformations (3) – union, intersection, subtract
- union
def union(other: RDD[T]): RDD[T]
Merges the two RDDs into one; duplicates are not removed (a quick check follows the example).
scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(6 to 10,2);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd1.union(rdd2).collect();
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
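Note that union keeps duplicates and that the result's partition count is the sum of the inputs' partition counts; a quick check on the RDDs above (rdd1 has 1 partition, rdd2 has 2):
rdd1.union(rdd2).partitions.size                               // 1 + 2 = 3
sc.makeRDD(Seq(1, 2)).union(sc.makeRDD(Seq(2, 3))).collect()   // Array(1, 2, 2, 3): the duplicate 2 is kept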
- intersection
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]
Returns the intersection of the two RDDs; the partition count or the partitioner of the result can be specified. The output contains no duplicate elements.
scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(3 to 7,1);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24
scala> rdd1.intersection(rdd2).collect();
res28: Array[Int] = Array(4, 3, 5)
scala> var rdd3 = rdd1.intersection(rdd2,4);
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[50] at intersection at <console>:28
scala> rdd3.partitions.size
res31: Int = 4
- subtract
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
Returns the elements that appear in this RDD but not in otherRDD; duplicates are not removed (a deduplicated variant is sketched after the example).
scala> var rdd1 = sc.makeRDD(Seq(1,1,2,2,3,3));
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Seq(3,3,4,4,5,5));
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at makeRDD at <console>:24
scala> rdd1.subtract(rdd2).collect();
res32: Array[Int] = Array(1, 1, 2, 2)
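If a deduplicated difference is wanted, distinct can be chained onto the result; a small sketch on the same RDDs:
// drop the duplicates that subtract keeps
rdd1.subtract(rdd2).distinct().collect()   // Array(1, 2), order may vary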
Basic RDD transformations (4) - mapPartitions, mapPartitionsWithIndex
- mapPartitions
mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator< T > => Iterator< U > when running on an RDD of type T.
Similar to map, except that mapPartitions runs over each partition of the RDD as a whole, so func must implement a transformation from Iterator[T] to Iterator[U] (a typical per-partition-resource use is sketched after the example).
scala> var rdd = sc.makeRDD(1 to 10,4);
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at makeRDD at <console>:24
scala> var rdd1 = rdd.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0;
| while (x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[58] at mapPartitions at <console>:26
scala> rdd1.collect
res33: Array[Int] = Array(3, 12, 13, 27)
scala> rdd.partitions.size
res34: Int = 4
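A common reason to prefer mapPartitions over map is to create an expensive object once per partition instead of once per element; a minimal sketch using a number formatter as the stand-in resource:
val formatted = rdd.mapPartitions { iter =>
  // created once per partition and reused for every element in it
  val fmt = new java.text.DecimalFormat("#,###")
  iter.map(n => fmt.format(n * 1000L))
}
formatted.collect()   // Array(1,000, 2,000, ..., 10,000)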
- mapPartitionsWithIndex
mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator< T >) => Iterator< U > when running on an RDD of type T.
Similar to mapPartitions, except that func must implement a transformation from (Int, Iterator[T]) to Iterator[U]; the integer argument is the index of the partition (another small use is sketched after the example).
scala> var rdd1 = rdd.mapPartitionsWithIndex{ (index,x) => {
| var result = List[String]()
| var i = 0
| while (x.hasNext){
| i+=x.next()
| }
| result.::(index+"|"+i).iterator
| }
| }
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at mapPartitionsWithIndex at <console>:26
scala> rdd1.collect
res38: Array[String] = Array(0|3, 1|12, 2|13, 3|27)
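mapPartitionsWithIndex is also a handy way to see which elements landed in which partition; a quick sketch on the same rdd:
// tag every element with the index of the partition it lives in
rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => s"$idx:$x")).collect()
// e.g. Array(0:1, 0:2, 1:3, 1:4, 1:5, 2:6, 2:7, 3:8, 3:9, 3:10)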
Basic RDD transformations (5) – groupByKey, reduceByKey, reduceByKeyLocally
- groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] // specify the number of partitions
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
scala> var arr = Array(("1","YANG"),("1","YANG1"),("2","HELLO"),("2","HELLO2"),("3","WORLD"));
arr: Array[(String, String)] = Array((1,YANG), (1,YANG1), (2,HELLO), (2,HELLO2), (3,WORLD))
scala> var rdd = sc.makeRDD(arr);
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[61] at makeRDD at <console>:26
scala> var rdd1 = rdd.groupByKey();
rdd1: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[62] at groupByKey at <console>:28
scala> rdd1.collect
res39: Array[(String, Iterable[String])] = Array((1,CompactBuffer(YANG, YANG1)), (2,CompactBuffer(HELLO, HELLO2)), (3,CompactBuffer(WORLD)))
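As the note above suggests, when the end goal is a per-key aggregation (here: counting the values for each key), reduceByKey avoids shipping every individual value through the shuffle; a minimal sketch on the same data:
// count values per key without materializing the full groups
rdd.map { case (k, _) => (k, 1) }.reduceByKey(_ + _).collect()
// Array((1,2), (2,2), (3,1)), order may vary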
- reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
scala> var rdd = sc.makeRDD(Array(("A",1),("A",2),("A",3),("B",3),("B",3)));
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at makeRDD at <console>:24
scala> var rdd1 = rdd.reduceByKey((x,y) => x+y)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[64] at reduceByKey at <console>:26
scala> rdd1.collect
res40: Array[(String, Int)] = Array((A,6), (B,6))
scala> rdd.partitions.size
res41: Int = 4
scala> rdd1.partitions.size
res42: Int = 4
scala> var rdd1 = rdd.reduceByKey((x,y) => x+y,10);
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[65] at reduceByKey at <console>:26
scala> rdd1.collect
res43: Array[(String, Int)] = Array((A,6), (B,6))
scala> rdd1.partitions.size
res44: Int = 10
- reduceByKeyLocally
Similar to reduceByKey, but the result is returned to the driver as a Map instead of an RDD.
scala> rdd.collect
res45: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (B,3), (B,3))
scala> rdd.reduceByKeyLocally((x,y) => x+y)
res46: scala.collection.Map[String,Int] = Map(B -> 6, A -> 6)