1. Overview-概覽
? 每一個Spark應(yīng)用都是由包含一個main方法的driver program組成荞雏,并且能夠在一個集群上執(zhí)行一系列的并行操作轴合。Spark的第一個主要抽象概念是RDD(Resilient distributed dataset)-分布在集群的各個節(jié)點上能夠被并行操作的被分割的數(shù)據(jù)集创坞。RDD開始可以是由在hdfs(或其他hadoop支持的文件系統(tǒng))上的文件或者是driver program中的一個集合通過轉(zhuǎn)換來創(chuàng)建,用戶可以在內(nèi)存中persist一個RDD來允許它被高效的重復(fù)使用受葛,RDD具備自動恢復(fù)能力题涨。
? Spark的第二個抽象概念是:共享變量。共享變量可以在并行操作中被使用奔坟。默認(rèn)情況携栋,Spark通過在不同的節(jié)點以任務(wù)集的方式來運行并行操作函數(shù),spark會把在并行操作中用到的變量傳遞到每個節(jié)點上咳秉。有時婉支,一個變量需要在不同的任務(wù)之間共享,或者在任務(wù)與主程序driver program之間共享澜建。Spark支持兩種類型的共享變量:廣播變量(broadcast variables)-用來在所有的節(jié)點上緩存一個值向挖;accumulators-可進(jìn)行疊加操作的變量,比如計數(shù)和求和變量炕舵。
2. Resilient Distributed Datasets(RDDs)
? RDD的概念貫穿于Spark的整個生態(tài)系統(tǒng)理論中何之,RDD是一個以并行方式運行具有容錯性的元素集合。在Spark中有兩種方式來創(chuàng)建RDD數(shù)據(jù)集:并行化集合- parallelizing一個在driver program中定義的數(shù)據(jù)集合咽筋;外部數(shù)據(jù)集-指向引用一個外部存儲系統(tǒng)中的數(shù)據(jù)集溶推,比如一個共享文件系統(tǒng)上的文件、HDFS奸攻、HBase或者其他提供了Hadoop InputFormat特性接口的任意數(shù)據(jù)源蒜危。
2.1 并行化集合-Parallelized Collections
? 并行化集合通過在一個存在的java或者scala集合上調(diào)用JavaSparkContext的parallelize方法來創(chuàng)建。集合的元素被復(fù)制來生成一個可并行操作的分布式數(shù)據(jù)集睹耐。以下是一個創(chuàng)建并行化集合的樣例:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
創(chuàng)建完成辐赞,分布式數(shù)據(jù)集distData就可以被并行操作。比如硝训,我們可以調(diào)用distData.reduce((a,b) -> a + b)來計算集合的元素和响委。
? 并行化集合還有一個重要的參數(shù)是把一個集合切分成多少個partitions。Spark會在每個partition上運行一個任務(wù)窖梁。典型的在集群的每個CPU上會分配2-4個partitions赘风。Spark會根據(jù)配置自動把一個集合切分成多少個partition,我們也可以自己通過調(diào)用parallelize(data, 10)這個方法來手動設(shè)置你想切分的partition數(shù)窄绒。
2.2 外部數(shù)據(jù)集-External Datasets
? Spark可以從任何Hadoop支持的存儲源創(chuàng)建分布式數(shù)據(jù)集贝次;包括本地文件系統(tǒng)、HDFS彰导、HBase蛔翅、Cassandra、Amazon S3等等位谋。Spark支持文本文件山析、序列化文件和其他任何Hadoop支持的InputFormat格式。
? 文本文件的RDD可以使用SparkContext的textFile方法來創(chuàng)建掏父。這個方法根據(jù)提供的文件URI(可以是一個本地路徑或者是hdfs://, s3n://等形式的URI)將文件內(nèi)容讀取為文件中每個行的集合笋轨。下面也是一個樣例:
JavaRDD<String> distFile = sc.textFile("data.txt");
創(chuàng)建完成,distFile就可以執(zhí)行數(shù)據(jù)集的操作赊淑。比如:我們可以計算所有行的sizes:distFile.map(s -> s.length()).reduce((a, b) -> a + b)爵政。
Spark讀取文件需要注意的:
- 如果使用本地文件系統(tǒng)路徑,那么這個文件必須是要所有節(jié)點可訪問的陶缺〖匦拷貝這個文件到所有的節(jié)點或者是通過網(wǎng)絡(luò)掛載方式掛到一個共享文件系統(tǒng)上饱岸。
- Spark支持的文件輸入方式:文本文件掺出,目錄文件,壓縮文件苫费,以及通配符文件汤锨。例如:你可以使用textFile("/my/directory"), textFile("/my/directory/*.txt"), txtFile("/my/directory/**.gz")。
- textFile方法同樣也支持一個可選的第二個參數(shù)來控制partitions的數(shù)目百框。默認(rèn)的闲礼,Spark給每個文件塊(HDFS中的文件分塊)創(chuàng)建一個partition,當(dāng)然你也可以通過傳遞一個更大的值來要求更多的partitions铐维。但是partitions的數(shù)量不能夠比blocks的數(shù)量少柬泽。
3. RDD操作-RDD Operations
? RDDs支持兩種類型的操作:transformations(轉(zhuǎn)換) - 從一個存在的RDD上創(chuàng)建一個新的RDD;actions(動作) - 在數(shù)據(jù)集上執(zhí)行一個計算操作之后返回一個值給driver program方椎。例如聂抢,map是一個轉(zhuǎn)換操作,將數(shù)據(jù)集傳遞給一個函數(shù)并返回一個新的RDD結(jié)果棠众;reduce是一個動作琳疏,使用某些函數(shù)集合RDD的所有元素并返回一個最終的結(jié)構(gòu)給driver program。
? Spark所有的transformations操作時懶惰的闸拿,也就是說它們不會立刻計算它們的結(jié)果空盼,它們只會記住這些轉(zhuǎn)換。transformations操作只有當(dāng)一個action動作執(zhí)行并需要某個transformations操作的結(jié)果時新荤,這個transformation才會被計算揽趾。這種設(shè)計模式使得Spark運行更加高效。
? 默認(rèn)情況下苛骨,每個transformed RDD在你每次在它上面運行一個action時都會被重新計算篱瞎。然而苟呐,Spark提供了持久化方式,可以讓你把第一次transformation后的結(jié)果RDD保存在內(nèi)存或者磁盤上俐筋,這樣如果下次有需要這個transformed RDD的時候就不用再次計算從而加快整個計算的速度牵素。
3.1 基本操作-Basic
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行從一個外部文件創(chuàng)建了一個RDD。這個RDD并不會被加載到內(nèi)存中澄者,lines只是引用了這個文件而已笆呆。第二行的lineLengths是map轉(zhuǎn)換操作的結(jié)果,由于懶惰性這個結(jié)果不會馬上被計算粱挡。最后一行赠幕,當(dāng)執(zhí)行reduce操作時,由于這是一個action询筏,在這個時候榕堰,Spark會把這個計算分成多個任務(wù)分發(fā)到集群中的不同機器上,每個機器會執(zhí)行它本地的map和reduce操作屈留,然后返回它的結(jié)果值到driver program局冰。
如果我們要多次用到lineLengths的值,那么我們可以添加下面這一行代碼:
lineLengths.persist(StorageLevel.MEMORY_ONLY());
在執(zhí)行reduce操作前,上面這句代碼會在lineLengths第一次被計算出來后保存到內(nèi)存中灌危。
3.2 函數(shù)傳遞-Passing Functions to Spark
? Spark提供的API對于函數(shù)的傳遞具有嚴(yán)重的依賴性康二。在java里面,傳遞函數(shù)只能通過類來展現(xiàn)勇蝙。有兩種方式來創(chuàng)建這樣的函數(shù):
- 實現(xiàn)org.apache.spark.api.java.function.Function接口沫勿,或者是匿名內(nèi)部類;
- 在Java 8味混,使用lambda表達(dá)式來簡化這個實現(xiàn)产雹。
lanbda表達(dá)式的方式上面有樣例。下面是匿名內(nèi)部類和實現(xiàn)接口的方式來實現(xiàn)通上面代碼一樣的功能:
// 匿名內(nèi)部類
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
// 實現(xiàn)接口方式
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
4. 理解閉合-Understanding closures
? 在Spark中最難理解的一件事:當(dāng)在集群中執(zhí)行代碼時翁锡,變量和函數(shù)的生命周期和作用域的問題蔓挖。RDD操作在變量的作用域外能夠修改他們的值(注意對這一點的理解:是夸機器導(dǎo)致的這個問題出現(xiàn))是導(dǎo)致這件事發(fā)生的主要原因。
4.1 例子
? 考慮下面的RDD操作馆衔,可能在不同的環(huán)境下執(zhí)行會有不同的結(jié)果(取決與是否在同一個jvm上運行)瘟判。一種常見情況是在Spark的local模式和Spark的cluster模式運行時:
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
Local vs. cluster modes
? 上面代碼的行為是不確定的。為了執(zhí)行這個作業(yè)角溃,Spark會把RDD操作分配成不同的多個任務(wù)進(jìn)程拷获,每個任務(wù)進(jìn)程都由每個Worker node上的executor執(zhí)行器來執(zhí)行。在被每個executor執(zhí)行器執(zhí)行之前减细,Spark會計算每個任務(wù)的closure匆瓜。這個closure是只那些變量和方法-為了執(zhí)行在RDD上的計算必須讓executor可見的變量和方法。這些closure會被序列化并被發(fā)送到每個executor上面。
? 在closure中的變量現(xiàn)在被發(fā)送到了每個executor上驮吱,executor中有了這些變量的副本茧妒,當(dāng)counter變量在foreach函數(shù)中被引用的時候,這個counter變量不再是driver program所運行節(jié)點上的counter變量了糠馆,雖然在driver program節(jié)點上任然存在counter這個變量嘶伟,但是它的變量對所有的executors是不可見怎憋。executor只能夠訪問到從closure上復(fù)制過來的在本地機器上的counter又碌。所以,counter的最終結(jié)果還是零绊袋。
? 在local模式毕匀,某些條件下,foreach函數(shù)將會在一個相同的jvm虛擬機上運行癌别,可能會引用的同一個counter變量皂岔,在這種情況下counter的值可能會被更新。
? 在上面的場景中為了確保確定的行為發(fā)生展姐,我們應(yīng)該使用Accumulator躁垛。在Spark中Accumulator提供了一種機制來保證在集群中的夸節(jié)點并行任務(wù)能夠安全的更新變量。Accumulator會在稍后討論圾笨。
Printing elements of an RDD
? 一種另外的場景是使用rdd.foreach(println)來打印一個RDD中的所有元素教馆。在單機上,這個可以打印出RDD上的元素擂达。然而在集群中土铺,executor的標(biāo)準(zhǔn)輸出是寫到executor上的標(biāo)準(zhǔn)輸出而不是driver program節(jié)點上的標(biāo)準(zhǔn)輸出,所以并不會在顯示相要的結(jié)果板鬓。為了打印RDD上的所有元素悲敷,我們可以使用collect()方法來將RDD數(shù)據(jù)帶到driver program節(jié)點上:rdd.collect().foreach(println)。這個操作可能會造成driver program節(jié)點內(nèi)存溢出俭令,因為collect()會把RDD的所有數(shù)據(jù)抓到driver program單個節(jié)點上后德。如果你需要打印少量元素,一個安全的方式是使用:rdd.take(100).foreach(println)抄腔。
5. 鍵值對的RDD-Working with Key-Value Pairs
? Spark的大多數(shù)操作可以在任何類型的RDD上工作瓢湃,但是有少部分特殊的操作只能運行在key-value形式的RDD上。最常見的一個是“shuffle”操作妓柜,比如說:通過鍵來分組和聚合的操作箱季。
? key-value形式的RDD通過JavaPairRDD類來表示。我們可以使用mapToPair和flatMapToPair操作來從JavaRDD來構(gòu)建JavaPairRDD棍掐。例如藏雏,下面的代碼使用reduceByKey操作來計算一個文件中每一行文本出現(xiàn)的次數(shù):
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
Shuffle Performance Impact
? Shuffle操作時一個非常昂貴的操作,因為它涉及到磁盤I/O,數(shù)據(jù)序列化掘殴,網(wǎng)絡(luò)I/O赚瘦。同時也會浪費很多堆內(nèi)存,還會產(chǎn)生好多中間文件奏寨。這個部分簡化了起意,看得不是很懂。
6. RDD持久化-RDD Persistance
? Spark的一個重要能力是持久化或者緩存一個dataset在內(nèi)存中病瞳。當(dāng)我們持久化一個RDD揽咕,每個節(jié)點會存儲屬于這個RDD中的partitions,并且這個持久化的RDD能被多個需要它的action重復(fù)使用套菜。這個特點使得在以后執(zhí)行的action能夠更加快速亲善。
? 我們可以使用persist()和cache()方法來持久化一個RDD,這個RDD第一次被計算之后將會被保存到節(jié)點的內(nèi)存中逗柴。Spark的持久化是可容錯的-如果這個持久化RDD的任何partition丟失了蛹头,那么Spark會自動重新去計算。
? 此外戏溺,每個持久化RDD可以允許你存儲為不同的級別渣蜗。這些存儲級別可以通過StorageLevel得到。
存儲級別 | 描述 |
---|---|
MEMORY_ONLY | |
MEMORY_AND_DISK | |
MEMORY_ONLY_SER(Java and Scala) | |
MEMORY_AND_DISK_SER(Java and Scala) | |
DISK_ONLY | |
MEMORY_ONLY_2,MEMORY_AND_DISK_2 | |
OFF_HEAP(experimental) |
數(shù)據(jù)刪除
? Spark會自動監(jiān)控緩存信息并且刪除老的數(shù)據(jù)(使用的LRU least-recently-used算法)旷祸。如果要手動刪除耕拷,可以調(diào)用RDD.unpersist()方法。
7. 共享變量-Shared Variables
? 當(dāng)一個函數(shù)被傳遞給在遠(yuǎn)程集群節(jié)點運行的Spark的操作(比如map或者reduce)肋僧,函數(shù)所用到的變量都是一個獨立的副本斑胜。這些變量被復(fù)制到每個節(jié)點,而且在每個節(jié)點上的更新不會反饋到driver program上嫌吠。Spark提供兩種方式來限制共享變量:broadcast variables和accumulators止潘。
7.1 廣播變量-Broadcast Variables
? 廣播變量程序員緩存一個只讀變量在每個機器上,而不是傳遞副本到每個任務(wù)上辫诅。他們能被用來以一種有效方式給每個節(jié)點傳遞一個大數(shù)據(jù)集的拷貝凭戴。Spark也通過高效的廣播算法來降低廣播變量帶來的通信消耗。
? 廣播變量通過SparkContext.broadcast(v)的方式來創(chuàng)建炕矮,廣播變量的值可以通過value()方法獲得么夫。代碼如下:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
? 在一個廣播變量被創(chuàng)建以后,應(yīng)該使用broadcastVar而不要繼續(xù)使用v來操作肤视。此外档痪,為了確保所有的節(jié)點得到相同的廣播變量值,v的值在廣播之后不應(yīng)該再被修改邢滑。
7.2 Accumulators
? Accumulators變量只能通過聯(lián)想和交換操作(associative and commutative operation)來執(zhí)行added操作腐螟。Accumulators變量能夠用了實現(xiàn)計數(shù)和求和。Spark本身只支持?jǐn)?shù)據(jù)類型的Accumulators變量,程序員可以自己增加新的實現(xiàn)類型乐纸。
? 如果一個Accumulatos變量被創(chuàng)建衬廷,那么它能夠在Spark的UI中查看到。? 一個Accumulator變量可以通過SparkContext.accumulator(v)的方式來創(chuàng)建汽绢。然后每個任務(wù)可以通過add方法或者+=(這個操作只在Scala和Python中)操作來對他進(jìn)行操作吗跋。但是,每個任務(wù)不能都讀取Accumulator的值宁昭,只有driver program能夠讀取Accumulator變量的值跌宛。
下面代碼用通過Accumulator變量來計算一個數(shù)組中所有元素的和:
LongAccumulator accum = sc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
? Accumulator變量原生只支持?jǐn)?shù)值類型,程序員可以創(chuàng)建我們自己的Accumulator變量的數(shù)據(jù)類型久窟,通過實現(xiàn)AccumulatorParam接口秩冈。例如:
class VectorAccumulatorParam implements AccumulatorParam<Vector> {
public Vector zero(Vector initialValue) {
return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) {
v1.addInPlace(v2); return v1;
}
}
// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());
5. 結(jié)語
? 我也是剛剛接觸Spark,這篇文章也是基于官方文檔寫的斥扛。所以可能有很多細(xì)節(jié)和概念沒有寫清楚,但是對于Spark的一個基本理解入門丹锹,我覺得是可以的稀颁。這篇文章中有什么寫的不好和不到位的地方,還請大家多多指出來楣黍。