Spark2.0 Programming Guide(Spark2.0編程指導(dǎo))

1. Overview-概覽

? 每一個Spark應(yīng)用都是由包含一個main方法的driver program組成荞雏,并且能夠在一個集群上執(zhí)行一系列的并行操作轴合。Spark的第一個主要抽象概念是RDD(Resilient distributed dataset)-分布在集群的各個節(jié)點上能夠被并行操作的被分割的數(shù)據(jù)集创坞。RDD開始可以是由在hdfs(或其他hadoop支持的文件系統(tǒng))上的文件或者是driver program中的一個集合通過轉(zhuǎn)換來創(chuàng)建,用戶可以在內(nèi)存中persist一個RDD來允許它被高效的重復(fù)使用受葛,RDD具備自動恢復(fù)能力题涨。

? Spark的第二個抽象概念是:共享變量。共享變量可以在并行操作中被使用奔坟。默認(rèn)情況携栋,Spark通過在不同的節(jié)點以任務(wù)集的方式來運行并行操作函數(shù),spark會把在并行操作中用到的變量傳遞到每個節(jié)點上咳秉。有時婉支,一個變量需要在不同的任務(wù)之間共享,或者在任務(wù)與主程序driver program之間共享澜建。Spark支持兩種類型的共享變量:廣播變量(broadcast variables)-用來在所有的節(jié)點上緩存一個值向挖;accumulators-可進(jìn)行疊加操作的變量,比如計數(shù)和求和變量炕舵。

2. Resilient Distributed Datasets(RDDs)

? RDD的概念貫穿于Spark的整個生態(tài)系統(tǒng)理論中何之,RDD是一個以并行方式運行具有容錯性的元素集合。在Spark中有兩種方式來創(chuàng)建RDD數(shù)據(jù)集:并行化集合- parallelizing一個在driver program中定義的數(shù)據(jù)集合咽筋;外部數(shù)據(jù)集-指向引用一個外部存儲系統(tǒng)中的數(shù)據(jù)集溶推,比如一個共享文件系統(tǒng)上的文件、HDFS奸攻、HBase或者其他提供了Hadoop InputFormat特性接口的任意數(shù)據(jù)源蒜危。

2.1 并行化集合-Parallelized Collections

? 并行化集合通過在一個存在的java或者scala集合上調(diào)用JavaSparkContextparallelize方法來創(chuàng)建。集合的元素被復(fù)制來生成一個可并行操作的分布式數(shù)據(jù)集睹耐。以下是一個創(chuàng)建并行化集合的樣例:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

創(chuàng)建完成辐赞,分布式數(shù)據(jù)集distData就可以被并行操作。比如硝训,我們可以調(diào)用distData.reduce((a,b) -> a + b)來計算集合的元素和响委。

? 并行化集合還有一個重要的參數(shù)是把一個集合切分成多少個partitions。Spark會在每個partition上運行一個任務(wù)窖梁。典型的在集群的每個CPU上會分配2-4個partitions赘风。Spark會根據(jù)配置自動把一個集合切分成多少個partition,我們也可以自己通過調(diào)用parallelize(data, 10)這個方法來手動設(shè)置你想切分的partition數(shù)窄绒。

2.2 外部數(shù)據(jù)集-External Datasets

? Spark可以從任何Hadoop支持的存儲源創(chuàng)建分布式數(shù)據(jù)集贝次;包括本地文件系統(tǒng)、HDFS彰导、HBase蛔翅、Cassandra、Amazon S3等等位谋。Spark支持文本文件山析、序列化文件和其他任何Hadoop支持的InputFormat格式。

? 文本文件的RDD可以使用SparkContext的textFile方法來創(chuàng)建掏父。這個方法根據(jù)提供的文件URI(可以是一個本地路徑或者是hdfs://, s3n://等形式的URI)將文件內(nèi)容讀取為文件中每個行的集合笋轨。下面也是一個樣例:

JavaRDD<String> distFile = sc.textFile("data.txt");

創(chuàng)建完成,distFile就可以執(zhí)行數(shù)據(jù)集的操作赊淑。比如:我們可以計算所有行的sizes:distFile.map(s -> s.length()).reduce((a, b) -> a + b)爵政。

Spark讀取文件需要注意的:

  • 如果使用本地文件系統(tǒng)路徑,那么這個文件必須是要所有節(jié)點可訪問的陶缺〖匦拷貝這個文件到所有的節(jié)點或者是通過網(wǎng)絡(luò)掛載方式掛到一個共享文件系統(tǒng)上饱岸。
  • Spark支持的文件輸入方式:文本文件掺出,目錄文件,壓縮文件苫费,以及通配符文件汤锨。例如:你可以使用textFile("/my/directory"), textFile("/my/directory/*.txt"), txtFile("/my/directory/**.gz")。
  • textFile方法同樣也支持一個可選的第二個參數(shù)來控制partitions的數(shù)目百框。默認(rèn)的闲礼,Spark給每個文件塊(HDFS中的文件分塊)創(chuàng)建一個partition,當(dāng)然你也可以通過傳遞一個更大的值來要求更多的partitions铐维。但是partitions的數(shù)量不能夠比blocks的數(shù)量少柬泽。

3. RDD操作-RDD Operations

? RDDs支持兩種類型的操作:transformations(轉(zhuǎn)換) - 從一個存在的RDD上創(chuàng)建一個新的RDD;actions(動作) - 在數(shù)據(jù)集上執(zhí)行一個計算操作之后返回一個值給driver program方椎。例如聂抢,map是一個轉(zhuǎn)換操作,將數(shù)據(jù)集傳遞給一個函數(shù)并返回一個新的RDD結(jié)果棠众;reduce是一個動作琳疏,使用某些函數(shù)集合RDD的所有元素并返回一個最終的結(jié)構(gòu)給driver program

? Spark所有的transformations操作時懶惰的闸拿,也就是說它們不會立刻計算它們的結(jié)果空盼,它們只會記住這些轉(zhuǎn)換。transformations操作只有當(dāng)一個action動作執(zhí)行并需要某個transformations操作的結(jié)果時新荤,這個transformation才會被計算揽趾。這種設(shè)計模式使得Spark運行更加高效。

? 默認(rèn)情況下苛骨,每個transformed RDD在你每次在它上面運行一個action時都會被重新計算篱瞎。然而苟呐,Spark提供了持久化方式,可以讓你把第一次transformation后的結(jié)果RDD保存在內(nèi)存或者磁盤上俐筋,這樣如果下次有需要這個transformed RDD的時候就不用再次計算從而加快整個計算的速度牵素。

3.1 基本操作-Basic

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行從一個外部文件創(chuàng)建了一個RDD。這個RDD并不會被加載到內(nèi)存中澄者,lines只是引用了這個文件而已笆呆。第二行的lineLengths是map轉(zhuǎn)換操作的結(jié)果,由于懶惰性這個結(jié)果不會馬上被計算粱挡。最后一行赠幕,當(dāng)執(zhí)行reduce操作時,由于這是一個action询筏,在這個時候榕堰,Spark會把這個計算分成多個任務(wù)分發(fā)到集群中的不同機器上,每個機器會執(zhí)行它本地的map和reduce操作屈留,然后返回它的結(jié)果值到driver program局冰。

如果我們要多次用到lineLengths的值,那么我們可以添加下面這一行代碼:

lineLengths.persist(StorageLevel.MEMORY_ONLY());

在執(zhí)行reduce操作前,上面這句代碼會在lineLengths第一次被計算出來后保存到內(nèi)存中灌危。

3.2 函數(shù)傳遞-Passing Functions to Spark

? Spark提供的API對于函數(shù)的傳遞具有嚴(yán)重的依賴性康二。在java里面,傳遞函數(shù)只能通過類來展現(xiàn)勇蝙。有兩種方式來創(chuàng)建這樣的函數(shù):

  • 實現(xiàn)org.apache.spark.api.java.function.Function接口沫勿,或者是匿名內(nèi)部類;
  • 在Java 8味混,使用lambda表達(dá)式來簡化這個實現(xiàn)产雹。

lanbda表達(dá)式的方式上面有樣例。下面是匿名內(nèi)部類和實現(xiàn)接口的方式來實現(xiàn)通上面代碼一樣的功能:

// 匿名內(nèi)部類
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
// 實現(xiàn)接口方式
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

4. 理解閉合-Understanding closures

? 在Spark中最難理解的一件事:當(dāng)在集群中執(zhí)行代碼時翁锡,變量和函數(shù)的生命周期和作用域的問題蔓挖。RDD操作在變量的作用域外能夠修改他們的值(注意對這一點的理解:是夸機器導(dǎo)致的這個問題出現(xiàn))是導(dǎo)致這件事發(fā)生的主要原因。

4.1 例子

? 考慮下面的RDD操作馆衔,可能在不同的環(huán)境下執(zhí)行會有不同的結(jié)果(取決與是否在同一個jvm上運行)瘟判。一種常見情況是在Spark的local模式和Spark的cluster模式運行時:

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Local vs. cluster modes

? 上面代碼的行為是不確定的。為了執(zhí)行這個作業(yè)角溃,Spark會把RDD操作分配成不同的多個任務(wù)進(jìn)程拷获,每個任務(wù)進(jìn)程都由每個Worker node上的executor執(zhí)行器來執(zhí)行。在被每個executor執(zhí)行器執(zhí)行之前减细,Spark會計算每個任務(wù)的closure匆瓜。這個closure是只那些變量和方法-為了執(zhí)行在RDD上的計算必須讓executor可見的變量和方法。這些closure會被序列化并被發(fā)送到每個executor上面。

? 在closure中的變量現(xiàn)在被發(fā)送到了每個executor上驮吱,executor中有了這些變量的副本茧妒,當(dāng)counter變量在foreach函數(shù)中被引用的時候,這個counter變量不再是driver program所運行節(jié)點上的counter變量了糠馆,雖然在driver program節(jié)點上任然存在counter這個變量嘶伟,但是它的變量對所有的executors是不可見怎憋。executor只能夠訪問到從closure上復(fù)制過來的在本地機器上的counter又碌。所以,counter的最終結(jié)果還是零绊袋。

? 在local模式毕匀,某些條件下,foreach函數(shù)將會在一個相同的jvm虛擬機上運行癌别,可能會引用的同一個counter變量皂岔,在這種情況下counter的值可能會被更新。

? 在上面的場景中為了確保確定的行為發(fā)生展姐,我們應(yīng)該使用Accumulator躁垛。在Spark中Accumulator提供了一種機制來保證在集群中的夸節(jié)點并行任務(wù)能夠安全的更新變量。Accumulator會在稍后討論圾笨。

Printing elements of an RDD

? 一種另外的場景是使用rdd.foreach(println)來打印一個RDD中的所有元素教馆。在單機上,這個可以打印出RDD上的元素擂达。然而在集群中土铺,executor的標(biāo)準(zhǔn)輸出是寫到executor上的標(biāo)準(zhǔn)輸出而不是driver program節(jié)點上的標(biāo)準(zhǔn)輸出,所以并不會在顯示相要的結(jié)果板鬓。為了打印RDD上的所有元素悲敷,我們可以使用collect()方法來將RDD數(shù)據(jù)帶到driver program節(jié)點上:rdd.collect().foreach(println)。這個操作可能會造成driver program節(jié)點內(nèi)存溢出俭令,因為collect()會把RDD的所有數(shù)據(jù)抓到driver program單個節(jié)點上后德。如果你需要打印少量元素,一個安全的方式是使用:rdd.take(100).foreach(println)抄腔。

5. 鍵值對的RDD-Working with Key-Value Pairs

? Spark的大多數(shù)操作可以在任何類型的RDD上工作瓢湃,但是有少部分特殊的操作只能運行在key-value形式的RDD上。最常見的一個是“shuffle”操作妓柜,比如說:通過鍵來分組和聚合的操作箱季。

? key-value形式的RDD通過JavaPairRDD類來表示。我們可以使用mapToPair和flatMapToPair操作來從JavaRDD來構(gòu)建JavaPairRDD棍掐。例如藏雏,下面的代碼使用reduceByKey操作來計算一個文件中每一行文本出現(xiàn)的次數(shù):

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

Shuffle Performance Impact

? Shuffle操作時一個非常昂貴的操作,因為它涉及到磁盤I/O,數(shù)據(jù)序列化掘殴,網(wǎng)絡(luò)I/O赚瘦。同時也會浪費很多堆內(nèi)存,還會產(chǎn)生好多中間文件奏寨。這個部分簡化了起意,看得不是很懂

6. RDD持久化-RDD Persistance

? Spark的一個重要能力是持久化或者緩存一個dataset在內(nèi)存中病瞳。當(dāng)我們持久化一個RDD揽咕,每個節(jié)點會存儲屬于這個RDD中的partitions,并且這個持久化的RDD能被多個需要它的action重復(fù)使用套菜。這個特點使得在以后執(zhí)行的action能夠更加快速亲善。

? 我們可以使用persist()和cache()方法來持久化一個RDD,這個RDD第一次被計算之后將會被保存到節(jié)點的內(nèi)存中逗柴。Spark的持久化是可容錯的-如果這個持久化RDD的任何partition丟失了蛹头,那么Spark會自動重新去計算。

? 此外戏溺,每個持久化RDD可以允許你存儲為不同的級別渣蜗。這些存儲級別可以通過StorageLevel得到。

存儲級別 描述
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER(Java and Scala)
MEMORY_AND_DISK_SER(Java and Scala)
DISK_ONLY
MEMORY_ONLY_2,MEMORY_AND_DISK_2
OFF_HEAP(experimental)

數(shù)據(jù)刪除

? Spark會自動監(jiān)控緩存信息并且刪除老的數(shù)據(jù)(使用的LRU least-recently-used算法)旷祸。如果要手動刪除耕拷,可以調(diào)用RDD.unpersist()方法。

7. 共享變量-Shared Variables

? 當(dāng)一個函數(shù)被傳遞給在遠(yuǎn)程集群節(jié)點運行的Spark的操作(比如map或者reduce)肋僧,函數(shù)所用到的變量都是一個獨立的副本斑胜。這些變量被復(fù)制到每個節(jié)點,而且在每個節(jié)點上的更新不會反饋到driver program上嫌吠。Spark提供兩種方式來限制共享變量:broadcast variablesaccumulators止潘。

7.1 廣播變量-Broadcast Variables

? 廣播變量程序員緩存一個只讀變量在每個機器上,而不是傳遞副本到每個任務(wù)上辫诅。他們能被用來以一種有效方式給每個節(jié)點傳遞一個大數(shù)據(jù)集的拷貝凭戴。Spark也通過高效的廣播算法來降低廣播變量帶來的通信消耗。

? 廣播變量通過SparkContext.broadcast(v)的方式來創(chuàng)建炕矮,廣播變量的值可以通過value()方法獲得么夫。代碼如下:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

? 在一個廣播變量被創(chuàng)建以后,應(yīng)該使用broadcastVar而不要繼續(xù)使用v來操作肤视。此外档痪,為了確保所有的節(jié)點得到相同的廣播變量值,v的值在廣播之后不應(yīng)該再被修改邢滑。

7.2 Accumulators

? Accumulators變量只能通過聯(lián)想和交換操作(associative and commutative operation)來執(zhí)行added操作腐螟。Accumulators變量能夠用了實現(xiàn)計數(shù)和求和。Spark本身只支持?jǐn)?shù)據(jù)類型的Accumulators變量,程序員可以自己增加新的實現(xiàn)類型乐纸。

? 如果一個Accumulatos變量被創(chuàng)建衬廷,那么它能夠在Spark的UI中查看到。
Accumulators in the Spark UI

? 一個Accumulator變量可以通過SparkContext.accumulator(v)的方式來創(chuàng)建汽绢。然后每個任務(wù)可以通過add方法或者+=(這個操作只在Scala和Python中)操作來對他進(jìn)行操作吗跋。但是,每個任務(wù)不能都讀取Accumulator的值宁昭,只有driver program能夠讀取Accumulator變量的值跌宛。

下面代碼用通過Accumulator變量來計算一個數(shù)組中所有元素的和:

LongAccumulator accum = sc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

? Accumulator變量原生只支持?jǐn)?shù)值類型,程序員可以創(chuàng)建我們自己的Accumulator變量的數(shù)據(jù)類型久窟,通過實現(xiàn)AccumulatorParam接口秩冈。例如:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {
  public Vector zero(Vector initialValue) {
    return Vector.zeros(initialValue.size());
  }
  public Vector addInPlace(Vector v1, Vector v2) {
    v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());

5. 結(jié)語

? 我也是剛剛接觸Spark,這篇文章也是基于官方文檔寫的斥扛。所以可能有很多細(xì)節(jié)和概念沒有寫清楚,但是對于Spark的一個基本理解入門丹锹,我覺得是可以的稀颁。這篇文章中有什么寫的不好和不到位的地方,還請大家多多指出來楣黍。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末匾灶,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子租漂,更是在濱河造成了極大的恐慌阶女,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哩治,死亡現(xiàn)場離奇詭異秃踩,居然都是意外死亡,警方通過查閱死者的電腦和手機业筏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門憔杨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蒜胖,你說我怎么就攤上這事消别。” “怎么了台谢?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵龙亲,是天一觀的道長。 經(jīng)常有香客問我锨天,道長囱淋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮怀读,結(jié)果婚禮上诉位,老公的妹妹穿的比我還像新娘。我一直安慰自己菜枷,他們只是感情好苍糠,可當(dāng)我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著啤誊,像睡著了一般岳瞭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蚊锹,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天瞳筏,我揣著相機與錄音,去河邊找鬼牡昆。 笑死姚炕,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丢烘。 我是一名探鬼主播柱宦,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼播瞳!你這毒婦竟也來了掸刊?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤赢乓,失蹤者是張志新(化名)和其女友劉穎忧侧,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體牌芋,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡蚓炬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了姜贡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片试吁。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖楼咳,靈堂內(nèi)的尸體忽然破棺而出熄捍,到底是詐尸還是另有隱情,我是刑警寧澤母怜,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布余耽,位于F島的核電站,受9級特大地震影響苹熏,放射性物質(zhì)發(fā)生泄漏碟贾。R本人自食惡果不足惜币喧,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望袱耽。 院中可真熱鬧杀餐,春花似錦、人聲如沸朱巨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冀续。三九已至琼讽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間洪唐,已是汗流浹背钻蹬。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留凭需,地道東北人问欠。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像功炮,于是被迫代替她去往敵國和親溅潜。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容