Spark學(xué)習(xí)之路 (三)Spark之RDD(轉(zhuǎn))

一挪凑、RDD的概述

1.1 什么是RDD役衡?

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象嘁捷,它代表一個(gè)不可變凤类、可分區(qū)、里面的元素可并行計(jì)算的集合普气。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)谜疤、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中现诀,后續(xù)的查詢能夠重用工作集夷磕,這極大地提升了查詢速度。

1.2 RDD的屬性

(1)一組分片(Partition)仔沿,即數(shù)據(jù)集的基本組成單位坐桩。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理封锉,并決定并行計(jì)算的粒度绵跷。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定成福,那么就會(huì)采用默認(rèn)值碾局。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

(3)RDD之間的依賴關(guān)系奴艾。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD净当,所以RDD之間就會(huì)形成類(lèi)似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)像啼,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算俘闯。

(3)RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD忽冻,所以RDD之間就會(huì)形成類(lèi)似于流水線一樣的前后依賴關(guān)系真朗。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)僧诚,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算遮婶。

(4)一個(gè)Partitioner,即RDD的分片函數(shù)振诬。當(dāng)前Spark中實(shí)現(xiàn)了兩種類(lèi)型的分片函數(shù)蹭睡,一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner赶么。只有對(duì)于于key-value的RDD肩豁,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None辫呻。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量清钥,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

(5)一個(gè)列表放闺,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)祟昭。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置怖侦。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念篡悟,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置匾寝。

1.3 WordCount粗圖解RDD

image

其中hello.txt

image

二搬葬、RDD的創(chuàng)建方式

2.1 通過(guò)讀取文件生成的

由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng)艳悔,還有所有Hadoop支持的數(shù)據(jù)集急凰,比如HDFS、Cassandra猜年、HBase等

scala> val file = sc.textFile("/spark/hello.txt")
image

2.2 通過(guò)并行化的方式創(chuàng)建RDD

由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建抡锈。

scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26

scala>
image

2.3 其他方式

讀取數(shù)據(jù)庫(kù)等等其他的操作。也可以生成RDD乔外。

RDD可以通過(guò)其他的RDD轉(zhuǎn)換而來(lái)的床三。

三、RDD編程API

Spark支持兩個(gè)類(lèi)型(算子)操作:Transformation和Action

3.1 Transformation

主要做的是就是將一個(gè)已有的RDD生成另外一個(gè)RDD袁稽。Transformation具有l(wèi)azy特性(延遲加載)勿璃。Transformation算子的代碼不會(huì)真正被執(zhí)行。只有當(dāng)我們的程序里面遇到一個(gè)action算子的時(shí)候推汽,代碼才會(huì)真正的被執(zhí)行补疑。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。

常用的Transformation:

序號(hào) 轉(zhuǎn)換 含義
1 map(func) 返回一個(gè)新的RDD歹撒,該RDD由每一個(gè)輸入元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成
2 filter(func) 返回一個(gè)新的RDD莲组,該RDD由經(jīng)過(guò)func函數(shù)計(jì)算后返回值為true的輸入元素組成
3 flatMap(func) 類(lèi)似于map,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列暖夭,而不是單一元素)
4 mapPartitions(func) 類(lèi)似于map锹杈,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行,因此在類(lèi)型為T(mén)的RDD上運(yùn)行時(shí)迈着,func的函數(shù)類(lèi)型必須是Iterator[T] => Iterator[U]
5 mapPartitionsWithIndex(func) 類(lèi)似于mapPartitions竭望,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值,因此在類(lèi)型為T(mén)的RDD上運(yùn)行時(shí)裕菠,func的函數(shù)類(lèi)型必須是
6 (Int, Interator[T]) => Iterator[U]sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣咬清,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子
7 union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
8 intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
9 distinct([numTasks])) 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD
10 groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用奴潘,返回一個(gè)(K, Iterator[V])的RDD
11 reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用旧烧,返回一個(gè)(K,V)的RDD,使用指定的reduce函數(shù)画髓,將相同key的值聚合到一起掘剪,與groupByKey類(lèi)似,reduce任務(wù)的個(gè)數(shù)可以通過(guò)第二個(gè)可選的參數(shù)來(lái)設(shè)置
12 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分區(qū)聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對(duì)k/y的RDD進(jìn)行操作
13 sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用奈虾,K必須實(shí)現(xiàn)Ordered接口夺谁,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
14 sortBy(func,[ascending], [numTasks]) 與sortByKey類(lèi)似,但是更靈活 第一個(gè)參數(shù)是根據(jù)什么排序 第二個(gè)是怎么排序 false倒序 第三個(gè)排序后分區(qū)數(shù) 默認(rèn)與原RDD一樣
15 join(otherDataset, [numTasks]) 在類(lèi)型為(K,V)和(K,W)的RDD上調(diào)用肉微,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD 相當(dāng)于內(nèi)連接(求交集)
16 cogroup(otherDataset, [numTasks]) 在類(lèi)型為(K,V)和(K,W)的RDD上調(diào)用匾鸥,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類(lèi)型的RDD
17 cartesian(otherDataset) 兩個(gè)RDD的笛卡爾積 的成很多個(gè)K/V
18 pipe(command, [envVars]) 調(diào)用外部程序
19 coalesce(numPartitions) 重新分區(qū) 第一個(gè)參數(shù)是要分多少區(qū),第二個(gè)參數(shù)是否shuffle 默認(rèn)false 少分區(qū)變多分區(qū) true 多分區(qū)變少分區(qū) false
20 repartition(numPartitions) 重新分區(qū) 必須shuffle 參數(shù)是要分多少區(qū) 少變多
21 repartitionAndSortWithinPartitions(partitioner) 重新分區(qū)+排序 比先分區(qū)再排序效率高 對(duì)K/V的RDD進(jìn)行操作
22 foldByKey(zeroValue)(seqOp) 該函數(shù)用于K/V做折疊浪册,合并處理 扫腺,與aggregate類(lèi)似 第一個(gè)括號(hào)的參數(shù)應(yīng)用于每個(gè)V值 第二括號(hào)函數(shù)是聚合例如:+
23 combineByKey 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
24 partitionBy(partitioner) 對(duì)RDD進(jìn)行分區(qū) partitioner是分區(qū)器 例如new HashPartition(2
25 cache RDD緩存,可以避免重復(fù)計(jì)算從而減少時(shí)間村象,區(qū)別:cache內(nèi)部調(diào)用了persist算子笆环,cache默認(rèn)就一個(gè)緩存級(jí)別MEMORY-ONLY ,而persist則可以選擇緩存級(jí)別
26 persist
27 Subtract(rdd) 返回前rdd元素不在后rdd的rdd
28 leftOuterJoin leftOuterJoin類(lèi)似于SQL中的左外關(guān)聯(lián)left outer join厚者,返回結(jié)果以前面的RDD為主躁劣,關(guān)聯(lián)不上的記錄為空。只能用于兩個(gè)RDD之間的關(guān)聯(lián)库菲,如果要多個(gè)RDD關(guān)聯(lián)账忘,多關(guān)聯(lián)幾次即可。
29 rightOuterJoin rightOuterJoin類(lèi)似于SQL中的有外關(guān)聯(lián)right outer join,返回結(jié)果以參數(shù)中的RDD為主鳖擒,關(guān)聯(lián)不上的記錄為空溉浙。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián)蒋荚,多關(guān)聯(lián)幾次即可
30 subtractByKey substractByKey和基本轉(zhuǎn)換操作中的subtract類(lèi)似只不過(guò)這里是針對(duì)K的戳稽,返回在主RDD中出現(xiàn),并且不在otherRDD中出現(xiàn)的元素

3.2 Action

序號(hào) 動(dòng)作 含義
1 reduce(func) 通過(guò)func函數(shù)聚集RDD中的所有元素期升,這個(gè)功能必須是課交換且可并聯(lián)的
2 collect() 在驅(qū)動(dòng)程序中惊奇,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
3 count() 返回RDD的元素個(gè)數(shù)
4 first() 返回RDD的第一個(gè)元素(類(lèi)似于take(1))
5 take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
6 takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成播赁,可以選擇是否用隨機(jī)數(shù)替換不足的部分颂郎,seed用于指定隨機(jī)數(shù)生成器種子
7 takeOrdered(n, [ordering])
8 saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對(duì)于每個(gè)元素容为,Spark將會(huì)調(diào)用toString方法乓序,將它裝換為文件中的文本
9 saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)舟奠。
10 saveAsObjectFile(path)
11 countByKey() 針對(duì)(K,V)類(lèi)型的RDD竭缝,返回一個(gè)(K,Int)的map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)沼瘫。
12 foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上抬纸,運(yùn)行函數(shù)func進(jìn)行更新。
13 aggregate 先對(duì)分區(qū)進(jìn)行操作耿戚,在總體操作
14 reduceByKeyLocally
15 lookup
16 top
17 fold
18 foreachPartition

觸發(fā)代碼的運(yùn)行湿故,我們一段spark代碼里面至少需要有一個(gè)action操作。

3.3 Spark WordCount代碼編寫(xiě)

使用maven進(jìn)行項(xiàng)目構(gòu)建

(1)使用scala進(jìn)行編寫(xiě)

查看官方網(wǎng)站膜蛔,需要導(dǎo)入2個(gè)依賴包


image

詳細(xì)代碼

SparkWordCountWithScala.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountWithScala {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    /**
      * 如果這個(gè)參數(shù)不設(shè)置坛猪,默認(rèn)認(rèn)為你運(yùn)行的是集群模式
      * 如果設(shè)置成local代表運(yùn)行的是local模式
      */
    conf.setMaster("local")
    //設(shè)置任務(wù)名
    conf.setAppName("WordCount")
    //創(chuàng)建SparkCore的程序入口
    val sc = new SparkContext(conf)
    //讀取文件 生成RDD
    val file: RDD[String] = sc.textFile("E:\\hello.txt")
    //把每一行數(shù)據(jù)按照,分割
    val word: RDD[String] = file.flatMap(_.split(","))
    //讓每一個(gè)單詞都出現(xiàn)一次
    val wordOne: RDD[(String, Int)] = word.map((_,1))
    //單詞計(jì)數(shù)
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
    //按照單詞出現(xiàn)的次數(shù) 降序排序
    val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false)
    //將最終的結(jié)果進(jìn)行保存
    sortRdd.saveAsTextFile("E:\\result")

    sc.stop()
  }

運(yùn)行結(jié)果


image

(2)使用java jdk7進(jìn)行編寫(xiě)

SparkWordCountWithJava7.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkWordCountWithJava7 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> fileRdd = sc.textFile("E:\\hello.txt");

        JavaRDD<String> wordRDD = fileRdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(",")).iterator();
            }
        });

        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });

        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);

        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });

        resultRDD.saveAsTextFile("E:\\result7");

    }
}

(3)使用java jdk8進(jìn)行編寫(xiě)

lambda表達(dá)式

SparkWordCountWithJava8.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCountWithJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WortCount");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("E:\\hello.txt");
        JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y);
        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        resultRDD.saveAsTextFile("E:\\result8");

    }

3.4 WordCount執(zhí)行過(guò)程圖

image

四皂股、RDD的寬依賴和窄依賴

4.1 RDD依賴關(guān)系的本質(zhì)內(nèi)幕

由于RDD是粗粒度的操作數(shù)據(jù)集墅茉,每個(gè)Transformation操作都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類(lèi)似流水線的前后依賴關(guān)系呜呐;RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類(lèi)型就斤,即窄依賴(narrow dependency)和寬依賴(wide dependency)。如圖所示顯示了RDD之間的依賴關(guān)系蘑辑。

image

從圖中可知:

窄依賴:是指每個(gè)父RDD的一個(gè)Partition最多被子RDD的一個(gè)Partition所使用洋机,例如map、filter洋魂、union等操作都會(huì)產(chǎn)生窄依賴绷旗;(獨(dú)生子女)

寬依賴:是指一個(gè)父RDD的Partition會(huì)被多個(gè)子RDD的Partition所使用喜鼓,例如groupByKey、reduceByKey衔肢、sortByKey等操作都會(huì)產(chǎn)生寬依賴庄岖;(超生)

需要特別說(shuō)明的是對(duì)join操作有兩種情況:

(1)圖中左半部分join:如果兩個(gè)RDD在進(jìn)行join操作時(shí),一個(gè)RDD的partition僅僅和另一個(gè)RDD中已知個(gè)數(shù)的Partition進(jìn)行join膀懈,那么這種類(lèi)型的join操作就是窄依賴顿锰,例如圖1中左半部分的join操作(join with inputs co-partitioned)谨垃;

(2)圖中右半部分join:其它情況的join操作就是寬依賴,例如圖1中右半部分的join操作(join with inputs not co-partitioned)启搂,由于是需要父RDD的所有partition進(jìn)行join的轉(zhuǎn)換,這就涉及到了shuffle刘陶,因此這種類(lèi)型的join操作也是寬依賴胳赌。

總結(jié):

在這里我們是從父RDD的partition被使用的個(gè)數(shù)來(lái)定義窄依賴和寬依賴,因此可以用一句話概括下:如果父RDD的一個(gè)Partition被子RDD的一個(gè)Partition所使用就是窄依賴匙隔,否則的話就是寬依賴疑苫。因?yàn)槭谴_定的partition數(shù)量的依賴關(guān)系,所以RDD之間的依賴關(guān)系就是窄依賴纷责;由此我們可以得出一個(gè)推論:即窄依賴不僅包含一對(duì)一的窄依賴捍掺,還包含一對(duì)固定個(gè)數(shù)的窄依賴。

一對(duì)固定個(gè)數(shù)的窄依賴的理解:即子RDD的partition對(duì)父RDD依賴的Partition的數(shù)量不會(huì)隨著RDD數(shù)據(jù)規(guī)模的改變而改變再膳;換句話說(shuō)挺勿,無(wú)論是有100T的數(shù)據(jù)量還是1P的數(shù)據(jù)量,在窄依賴中喂柒,子RDD所依賴的父RDD的partition的個(gè)數(shù)是確定的不瓶,而寬依賴是shuffle級(jí)別的,數(shù)據(jù)量越大灾杰,那么子RDD所依賴的父RDD的個(gè)數(shù)就越多蚊丐,從而子RDD所依賴的父RDD的partition的個(gè)數(shù)也會(huì)變得越來(lái)越多。

4.2 依賴關(guān)系下的數(shù)據(jù)流視圖

image

在spark中艳吠,會(huì)根據(jù)RDD之間的依賴關(guān)系將DAG圖(有向無(wú)環(huán)圖)劃分為不同的階段麦备,對(duì)于窄依賴,由于partition依賴關(guān)系的確定性昭娩,partition的轉(zhuǎn)換處理就可以在同一個(gè)線程里完成凛篙,窄依賴就被spark劃分到同一個(gè)stage中,而對(duì)于寬依賴题禀,只能等父RDD shuffle處理完成后鞋诗,下一個(gè)stage才能開(kāi)始接下來(lái)的計(jì)算。

因此spark劃分stage的整體思路是:從后往前推迈嘹,遇到寬依賴就斷開(kāi)削彬,劃分為一個(gè)stage全庸;遇到窄依賴就將這個(gè)RDD加入該stage中。因此在圖2中RDD C,RDD D,RDD E,RDDF被構(gòu)建在一個(gè)stage中,RDD A被構(gòu)建在一個(gè)單獨(dú)的Stage中,而RDD B和RDD G又被構(gòu)建在同一個(gè)stage中融痛。

在spark中壶笼,Task的類(lèi)型分為2種:ShuffleMapTask和ResultTask;

簡(jiǎn)單來(lái)說(shuō)雁刷,DAG的最后一個(gè)階段會(huì)為每個(gè)結(jié)果的partition生成一個(gè)ResultTask覆劈,即每個(gè)Stage里面的Task的數(shù)量是由該Stage中最后一個(gè)RDD的Partition的數(shù)量所決定的!而其余所有階段都會(huì)生成ShuffleMapTask沛励;之所以稱之為ShuffleMapTask是因?yàn)樗枰獙⒆约旱挠?jì)算結(jié)果通過(guò)shuffle到下一個(gè)stage中责语;也就是說(shuō)上圖中的stage1和stage2相當(dāng)于mapreduce中的Mapper,而ResultTask所代表的stage3就相當(dāng)于mapreduce中的reducer。

在之前動(dòng)手操作了一個(gè)wordcount程序目派,因此可知坤候,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過(guò)區(qū)別在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據(jù)Key進(jìn)行reduce企蹭,但spark除了這兩個(gè)算子還有其他的算子白筹;因此從這個(gè)意義上來(lái)說(shuō),Spark比Hadoop的計(jì)算算子更為豐富谅摄。

原博客:https://www.cnblogs.com/qingyunzong/p/8899715.html#_label0_0

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末徒河,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子送漠,更是在濱河造成了極大的恐慌顽照,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件螺男,死亡現(xiàn)場(chǎng)離奇詭異棒厘,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)下隧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)奢人,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人淆院,你說(shuō)我怎么就攤上這事何乎。” “怎么了土辩?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵支救,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我拷淘,道長(zhǎng)各墨,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任启涯,我火速辦了婚禮贬堵,結(jié)果婚禮上恃轩,老公的妹妹穿的比我還像新娘。我一直安慰自己黎做,他們只是感情好叉跛,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著蒸殿,像睡著了一般筷厘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宏所,一...
    開(kāi)封第一講書(shū)人閱讀 51,365評(píng)論 1 302
  • 那天酥艳,我揣著相機(jī)與錄音,去河邊找鬼楣铁。 笑死玖雁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的盖腕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼浓镜,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼溃列!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起膛薛,我...
    開(kāi)封第一講書(shū)人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤听隐,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后哄啄,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體雅任,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年咨跌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了沪么。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡锌半,死狀恐怖禽车,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情刊殉,我是刑警寧澤殉摔,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站记焊,受9級(jí)特大地震影響逸月,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜遍膜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一碗硬、第九天 我趴在偏房一處隱蔽的房頂上張望腐缤。 院中可真熱鬧,春花似錦肛响、人聲如沸岭粤。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)剃浇。三九已至,卻和暖如春猎物,著一層夾襖步出監(jiān)牢的瞬間虎囚,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工蔫磨, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淘讥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓堤如,卻偏偏與公主長(zhǎng)得像蒲列,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子搀罢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容