Spark Operators


layout: post
title: Spark Operators
date: 2017-02-27 16:03:52
tags: [Spark Operators, Spark, Big Data]
categories: "Big Data"


Creating RDDs

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Creates an RDD from a Seq collection.
Parameter 1: the Seq collection; required.
Parameter 2: the number of partitions; defaults to the number of CPU cores allocated to this application (Spark will run one task for each partition of the cluster).

scala> val data = Array(1,2,3,4,5,6);
data: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val num = sc.parallelize(data);
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

scala> num.collect();
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val num1 = sc.parallelize(1 to 10);
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> num1.collect();
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> num1.partitions.size
res3: Int = 4

scala> val num2 = sc.parallelize(1 to 10 ,2);
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> num2.partitions.size
res4: Int = 2

  • makeRDD (skipped)

Exactly the same as parallelize.
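
As a quick illustration (a sketch, not a captured session), makeRDD can be used just like parallelize above:

val rdd = sc.makeRDD(1 to 5)   // behaves the same as sc.parallelize(1 to 5)
rdd.partitions.size            // same default partition count as parallelize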

Creating RDDs from external storage

  • textFile

Creates an RDD from an HDFS file.

scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> textFile.collect();
res6: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> textFile.count();
res7: Long = 3
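
textFile also accepts an optional minPartitions argument that asks for at least that many splits; a sketch reusing the same path (output not captured here):

val textFile4 = sc.textFile("/user/test/1.txt", 4)   // request at least 4 partitions
textFile4.partitions.size                            // typically >= 4 for a splittable text file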

Creating from other HDFS file formats (skipped)

  • hadoopFile
  • sequenceFile
  • objectFile
  • newAPIHadoopFile

Creating from the Hadoop API (skipped)

  • hadoopRDD
  • newAPIHadoopRDD

Basic RDD transformations (1) – map, flatMap, distinct

Data preparation

hadoop@hzbxs-bigdata16:~/hadoop-hp2$ bin/hadoop dfs -copyFromLocal ~/test/1.txt  /user/test/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

hadoop@hzbxs-bigdata16:~/hadoop-hp2$ cat ~/test/1.txt 
Hello World
Hello Yang
Hello SCALA
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ 
  • map

Maps each element of the RDD to a new element through the function passed to map. Input and output partitions are one-to-one: the output has exactly as many partitions as the input.

// load the file from HDFS
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[13] at textFile at <console>:24
// print the contents of textFile
scala> textFile.collect();
res7: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)               
// the map operator
scala> val mapdata = textFile.map(line => line.split("\\s+"));
mapdata: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:26
// print the contents of mapdata
scala> mapdata.collect();
res8: Array[Array[String]] = Array(Array(Hello, World), Array(Hello, Yang), Array(Hello, SCALA))
  • flatMap

Similar to map, but each input element may produce zero or more output elements, and the per-element results are flattened into a single RDD of elements (rather than an RDD of collections).

scala> val flatmapdata = textFile.flatMap(line => line.split("\\s+"));
flatmapdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26

scala> flatmapdata.collect();
res10: Array[String] = Array(Hello, World, Hello, Yang, Hello, SCALA)

  • distinct

Removes duplicate elements.

scala> val flatdistincedata = textFile.flatMap(line => line.split("\\s+")).distinct();
flatdistincedata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at distinct at <console>:26

scala> flatdistincedata.collect();
res0: Array[String] = Array(Hello, SCALA, World, Yang)
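
Putting flatMap and distinct together on the same file, the number of distinct words can be computed in one chain (a sketch, not a captured session):

// split into words, drop duplicates, then count
textFile.flatMap(_.split("\\s+")).distinct().count()
// with the sample 1.txt above this is 4: Hello, World, Yang, SCALA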

Basic RDD transformations (2) – coalesce, repartition

  • coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
Adjusts the RDD to numPartitions partitions. Only when shuffle is true can the new partition count be larger than the original.

scala> var data = sc.textFile("/user/test/1.txt");
data: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[16] at textFile at <console>:24

scala> data.collect();
res9: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> data.partitions.size;
res11: Int = 2

scala> var newdata = data.coalesce(1);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[17] at coalesce at <console>:26

scala> newdata.collect();
res12: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> newdata.partitions.size;
res13: Int = 1

// shuffle defaults to false, so requesting more partitions than the original has no effect
scala> var newdata = data.coalesce(3);  
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[18] at coalesce at <console>:26

scala> newdata.partitions.size;
res14: Int = 2

// with shuffle = true, the new partition count may exceed the original, so this takes effect
scala> var newdata = data.coalesce(3,true);
newdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at coalesce at <console>:26

scala> newdata.partitions.size;
res15: Int = 3

  • repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[String]): org.apache.spark.rdd.RDD[String]
Equivalent to coalesce with shuffle set to true.

scala> var newdata1 = data.repartition(3);
newdata1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at repartition at <console>:26

scala> newdata1.partition
partitioner   partitions

scala> newdata1.partitions.size
res16: Int = 3

Basic RDD transformations (3) – union (set union), intersection (set intersection), subtract (set difference)

  • union

def union(other: RDD[T]): RDD[T]
Combines the two RDDs into one.

scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(6 to 10,2);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd1.union(rdd2).collect();
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  • intersection

def intersection(other: org.apache.spark.rdd.RDD[Int],numPartitions: Int): org.apache.spark.rdd.RDD[Int]
def intersection(other: org.apache.spark.rdd.RDD[Int],partitioner: org.apache.spark.Partitioner)(implicit ord: Ordering[Int]): org.apache.spark.rdd.RDD[Int]
def intersection(other: org.apache.spark.rdd.RDD[Int]): org.apache.spark.rdd.RDD[Int]
Returns the intersection of the two RDDs; the number of partitions or a partitioner for the result can be specified. The output contains no duplicate elements.

scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(3 to 7,1);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24
scala> rdd1.intersection(rdd2).collect();
res28: Array[Int] = Array(4, 3, 5)
scala> var rdd3 = rdd1.intersection(rdd2,4);
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[50] at intersection at <console>:28
scala> rdd3.partitions.size
res31: Int = 4
  • subtract

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
Returns the elements that appear in this RDD but not in otherRDD; duplicates are not removed.

scala> var rdd1 = sc.makeRDD(Seq(1,1,2,2,3,3));
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at makeRDD at <console>:24

scala> var rdd2 = sc.makeRDD(Seq(3,3,4,4,5,5));
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at makeRDD at <console>:24

scala> rdd1.subtract(rdd2).collect();
res32: Array[Int] = Array(1, 1, 2, 2)

Basic RDD transformations (4) – mapPartitions, mapPartitionsWithIndex

  • mapPartitions

mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator< T > => Iterator< U > when running on an RDD of type T.
Similar to map, except that mapPartitions operates on each partition of the RDD, so func must implement a transformation from Iterator[T] to Iterator[U].

scala> var rdd = sc.makeRDD(1 to 10,4);
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at makeRDD at <console>:24
scala> var rdd1 = rdd.mapPartitions{ x => {
     | var result = List[Int]()
     |     var i = 0;
     |     while (x.hasNext){
     |        i += x.next()
     |     }
     |     result.::(i).iterator
     | }}
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[58] at mapPartitions at <console>:26

scala> rdd1.collect
res33: Array[Int] = Array(3, 12, 13, 27)

scala> rdd.partitions.size
res34: Int = 4
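
A typical reason to choose mapPartitions over map is to pay a setup cost once per partition rather than once per element; a minimal sketch (the formatter here is just an illustrative stand-in for any expensive resource):

val formatted = rdd.mapPartitions { iter =>
  val formatter = new java.text.DecimalFormat("#,###")  // created once per partition
  iter.map(n => formatter.format(n))                    // applied lazily to each element
}
formatted.collect()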

  • mapPartitionsWithIndex

mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator< T >) => Iterator< U > when running on an RDD of type T.
Similar to mapPartitions, except that func must implement a transformation from (Int, Iterator[T]) to Iterator[U]; the first integer is the index of the partition.

scala> var rdd1 = rdd.mapPartitionsWithIndex{ (index,x) => {
     |     var result = List[String]()
     |     var i = 0
     |     while (x.hasNext){
     |        i+=x.next()
     |     }
     |     result.::(index+"|"+i).iterator
     |  }
     | }
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at mapPartitionsWithIndex at <console>:26

scala> rdd1.collect
res38: Array[String] = Array(0|3, 1|12, 2|13, 3|27)

Basic RDD transformations (5) – groupByKey, reduceByKey, reduceByKeyLocally

  • groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] // specify the number of partitions
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

scala> var arr = Array(("1","YANG"),("1","YANG1"),("2","HELLO"),("2","HELLO2"),("3","WORLD"));
arr: Array[(String, String)] = Array((1,YANG), (1,YANG1), (2,HELLO), (2,HELLO2), (3,WORLD))

scala> var rdd = sc.makeRDD(arr);
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[61] at makeRDD at <console>:26

scala> var rdd1 = rdd.groupByKey();
rdd1: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[62] at groupByKey at <console>:28

scala> rdd1.collect
res39: Array[(String, Iterable[String])] = Array((1,CompactBuffer(YANG, YANG1)), (2,CompactBuffer(HELLO, HELLO2)), (3,CompactBuffer(WORLD)))
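
To illustrate the note above: if only a per-key count is needed, the groupByKey result can be post-processed with mapValues, but the reduceByKey route combines values before the shuffle and is usually much cheaper (a sketch on the same rdd, output not captured here):

// count values per key via groupByKey (groups everything first)
rdd.groupByKey().mapValues(_.size).collect()
// the same count via reduceByKey (aggregates map-side before shuffling)
rdd.mapValues(_ => 1).reduceByKey(_ + _).collect()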

  • reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

scala> var rdd = sc.makeRDD(Array(("A",1),("A",2),("A",3),("B",3),("B",3)));
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at makeRDD at <console>:24

scala> var rdd1 = rdd.reduceByKey((x,y) => x+y)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[64] at reduceByKey at <console>:26

scala> rdd1.collect
res40: Array[(String, Int)] = Array((A,6), (B,6))

scala> rdd.partitions.size
res41: Int = 4

scala> rdd1.partitions.size
res42: Int = 4

scala> var rdd1 = rdd.reduceByKey((x,y) => x+y,10);
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[65] at reduceByKey at <console>:26

scala> rdd1.collect
res43: Array[(String, Int)] = Array((A,6), (B,6))                               

scala> rdd1.partitions.size
res44: Int = 10
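
Combining the operators above, the classic word count over the sample 1.txt can be written as follows (a sketch, not a captured session):

val counts = sc.textFile("/user/test/1.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect()   // expected: (Hello,3), (World,1), (Yang,1), (SCALA,1) in some order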
  • reduceByKeyLocally

Similar to reduceByKey, but returns a Map instead of an RDD.

scala> rdd.collect
res45: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (B,3), (B,3))

scala> rdd.reduceByKeyLocally((x,y) => x+y)
res46: scala.collection.Map[String,Int] = Map(B -> 6, A -> 6)