Resilient Distributed Datasets
RDD是每個(gè)spark程序的核心竿拆,本節(jié)我們來看看更多細(xì)節(jié)埠戳。
Creation
創(chuàng)建RDD有三種方式:從一個(gè)內(nèi)存中的對(duì)象集合昂秃,被稱為并行化(parallelizing) 一個(gè)集合败晴;使用一個(gè)外部存儲(chǔ)(比如HDFS)的數(shù)據(jù)集阶淘;轉(zhuǎn)變(transform)已存在的RDD未桥。在對(duì)少量的輸入數(shù)據(jù)并行地進(jìn)行CPU密集型運(yùn)算時(shí)笔刹,第一種方式非常有用。例如冬耿,下面執(zhí)行從1到10的獨(dú)立運(yùn)算:
val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputation)
函數(shù)performExpensiveComputation并行處理輸入數(shù)據(jù)舌菜。并行性的級(jí)別由屬性spark.default.parallelism決定,該屬性的默認(rèn)值取決于Spark的運(yùn)行方式亦镶。本地運(yùn)行時(shí)日月,是本地機(jī)器的核心數(shù)量,集群運(yùn)行時(shí)缤骨,是集群中所有執(zhí)行(executor)節(jié)點(diǎn)的核心總數(shù)量爱咬。
可以為某特定運(yùn)算設(shè)置并行性級(jí)別,指定parallelize()方法的第二個(gè)參數(shù)即可:
sc.parallelize(1 to 10, 10)
創(chuàng)建RDD的第二種方式绊起,是創(chuàng)建一個(gè)指向外部數(shù)據(jù)集的引用精拟。我們已經(jīng)見過怎樣為一個(gè)文本文件創(chuàng)建String對(duì)象的RDD:
val text:RDD[String] = sc.textFile(inputPath)
路徑inputPath可以是任意的Hadoop文件系統(tǒng)路徑,比如本地文件系統(tǒng)或HDFS上的一個(gè)文件虱歪。內(nèi)部來看蜂绎,Spark使用舊的MapReduce API中的TextInputFormat來讀取這個(gè)文件。這就意味著文件切分行為與Hadoop是一樣的笋鄙,因此在HDFS的情況下师枣,一個(gè)Spark分區(qū)對(duì)應(yīng)一個(gè)HDFS塊(block)。這個(gè)默認(rèn)行為可以改變萧落,傳入第二個(gè)參數(shù)來請(qǐng)求一個(gè)特殊的切分?jǐn)?shù)量:
sc.textFile(inputPath, 10)
另外一個(gè)方法允許把多個(gè)文本文件作為一個(gè)整體來處理践美,返回的RDD中洗贰,是成對(duì)的string,第一個(gè)string是文件的路徑拨脉,第二個(gè)string是文件的內(nèi)容哆姻。因?yàn)槊總€(gè)文件都會(huì)加載進(jìn)內(nèi)存,所以這種方式僅僅適合于小文件:
val files:RDD[(String, String)] = sc.wholeTextFiles(inputPath)
Spark能夠處理文本文件以外的其他文件格式玫膀,比如矛缨,序列文件可以這樣讀入:
sc.sequenceFile[IntWritable, Text](inputPath)
注意這里指定序列文件的鍵和值的Writable類型的方式。對(duì)于常用的Writable類型帖旨,Spark能夠映射到Java中的等價(jià)物箕昭,因此我們可以使用等價(jià)的方式:
sc.sequenceFile[Int, String](inputPath)
從任意的Hadoop InputFormat來創(chuàng)建RDD,有兩種方式:基于文件的格式解阅,使用hadoopFile()落竹,接收一個(gè)路徑;其他格式货抄,比如HBase的TableInputFormat述召,使用hadoopRDD()。這些方法使用舊的MapReduce API蟹地。如果要用新的MapReduce API积暖,使用newAPIHadoopFile()和newAPIHadoopRDD()。下面是讀取Avro數(shù)據(jù)文件的示例怪与,使用特定的API和一個(gè)WeatherRecord類:
val job = new Job()
AvroJob.setInputKeySchema(job, WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(inputPath,
classOf[AvroKeyInputFormat[WeatherRecord]],
classOf[AvroKey[WeatherRecord]], classOf[NullWritable],
job.getConfiguration)
除了路徑之外夺刑,newAPIHadoopFile()方法還需要InputFormat的類型、鍵的類型分别、值的類型遍愿,再加上Hadoop配置,該配置中帶有Avro模式耘斩,在第二行我們使用AvroJob幫助類做的設(shè)置沼填。
創(chuàng)建RDD的第三種方式,是轉(zhuǎn)變(transform)已存在的RDD煌往。
Transformations and Actions
Spark提供兩種類型的操作:transformations和actions倾哺。transformations從已存在的RDD生成新的RDD,而actions會(huì)觸發(fā)運(yùn)算并輸出結(jié)果——返回給用戶刽脖,或者保存到外部存儲(chǔ)羞海。
Actions會(huì)立刻產(chǎn)生影響,而transformations不會(huì)——它們是懶惰的曲管,它們不做任何工作却邓,直到action被觸發(fā)。下面的例子院水,把文本文件中的每一行轉(zhuǎn)為小寫:
val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))
map()方法是個(gè)transformation腊徙,Spark內(nèi)部這樣處理:稍晚的時(shí)候简十,一個(gè)函數(shù)(這里是toLowerCase())會(huì)被調(diào)用,來處理RDD中的每一個(gè)元素撬腾。這個(gè)函數(shù)實(shí)際上沒有執(zhí)行螟蝙,直到foreach()方法(這是個(gè)action)被調(diào)用,然后Spark會(huì)運(yùn)行一個(gè)job民傻,讀取輸入的文件胰默,對(duì)文件中的每一行調(diào)用toLowerCase(),然后把結(jié)果寫到控制臺(tái)漓踢。
怎樣分辨一個(gè)操作究竟是transformation還是action呢牵署?一個(gè)方法是看它的返回類型:如果返回類型是RDD,這是個(gè)transformation喧半;否則就是action奴迅。當(dāng)你查閱RDD的文檔時(shí),這種方法是很有用的挺据。對(duì)RDD執(zhí)行的大多數(shù)操作取具,可以在RDD的文檔(org.apache.spark.rdd包)中找到,更多的操作在PairRDDFunctions里扁耐,這里包含了處理鍵值對(duì)RDD的transformations和actions者填。
Spark的庫(kù)中包含了豐富的操作,有transformations者諸如映射(mapping)做葵、分組(grouping)、聚合(aggregating)心墅、再分配(repartitioning)酿矢、取樣(sampling)、連接(joining)多個(gè)RDD怎燥、把RDDs作為集合(sets)對(duì)待瘫筐。還有actions者諸如把RDDs物化(materializing)為集合(collections)、對(duì)RDD進(jìn)行計(jì)算統(tǒng)計(jì)铐姚、從RDD中取樣出固定數(shù)目的元素策肝,把RDD保存到外部存儲(chǔ)。細(xì)節(jié)內(nèi)容隐绵,查看文檔之众。
MapReduce in Spark
盡管名字很有暗示性,Spark中的map()和reduce()操作依许,與Hadoop MapReduce中相同名字的函數(shù)棺禾,不是直接對(duì)應(yīng)的。Hadoop MapReduce中的map和reduce的通常形式是:
map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
從list標(biāo)記可以看出峭跳,這兩個(gè)函數(shù)都可以返回多個(gè)輸出對(duì)膘婶。這種操作在Spark(Scala)中被實(shí)現(xiàn)為flatMap()缺前,與map()很像,但是移除了一層嵌套:
scala> val l = List(1, 2, 3)
l: List[Int] = List(1, 2, 3)
scala> l.map(a => List(a))
res0: List[List[Int]] = List(List(1), List(2), List(3))
scala> l.flatMap(a => List(a))
res1: List[Int] = List(1, 2, 3)
有一種樸素的方式悬襟,可以在Spark中模擬Hadoop MapReduce衅码。用兩個(gè)flatMap()操作,中間用groupByKey()和sortByKey()來執(zhí)行MapReduce的混洗(shuffle)和排序:
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)
這里key的類型K2要繼承自Scala的Ordering類型脊岳,以滿足sortByKey()逝段。
這個(gè)例子可以幫助我們理解MapReduce和Spark的關(guān)系,但是不能盲目應(yīng)用逸绎。首先惹恃,這里的語(yǔ)義和Hadoop的MapReduce有微小的差別,sortByKey()執(zhí)行的是全量排序棺牧。使用repartitionAndSortWithinPartitions()方法來執(zhí)行部分排序巫糙,可以避免這個(gè)問題。然而颊乘,這樣還是無(wú)效的参淹,因?yàn)镾park有兩次混洗的過程(一次groupByKey(),一次sort)乏悄。
與其重造MapReduce浙值,不如僅僅使用那些你實(shí)際需要的操作。比如檩小,如果不需要按key排序开呐,你可以省略sortByKey(),這在Hadoop MapReduce中是不可能的规求。
同樣的筐付,大多數(shù)情況下groupByKey()太普遍了。通常只在聚合數(shù)據(jù)時(shí)需要混洗阻肿,因此應(yīng)該使用reduceByKey()瓦戚,foldByKey(),或者aggregateByKey()丛塌,這些函數(shù)比groupByKey()更有效率较解,因?yàn)樗鼈兛梢栽趍ap任務(wù)中作為combiner運(yùn)行。最后赴邻,flatMap()可能總是不需要的印衔,如果總有一個(gè)返回值,map()是首選姥敛,如果有0或1個(gè)返回值当编,使用filter()。
Aggregation transformations
根據(jù)key來聚合鍵值對(duì)RDD的三個(gè)主要的transformations是reduceByKey(),foldByKey()忿偷,和aggregateByKey()金顿。它們的工作方式稍有不同,但它們都是根據(jù)鍵來聚合值的鲤桥,為每一個(gè)鍵生成一個(gè)單獨(dú)的值揍拆。對(duì)應(yīng)的actions是reduce(),fold()和aggregate()茶凳,它們以類似的方式運(yùn)行嫂拴,為整個(gè)RDD輸出一個(gè)單獨(dú)的值。
最簡(jiǎn)單的是reduceByKey()贮喧,它對(duì)成對(duì)兒的值反復(fù)執(zhí)行一個(gè)函數(shù)筒狠,直到生成一個(gè)單獨(dú)的值。例如:
val pairs: RDD[(String, Int)] =
sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))
鍵 a 對(duì)應(yīng)的值箱沦,使用相加函數(shù)(+)聚合起來辩恼,(3 + 1)+ 5 = 9,而鍵 b 對(duì)應(yīng)的值只有一個(gè)谓形,因此不需要聚合灶伊。一般來說,這些操作是分布式的寒跳,在RDD的不同分區(qū)對(duì)應(yīng)的任務(wù)中分別執(zhí)行聘萨,因此這些函數(shù)要具有互換性和連接性。換句話說童太,操作的順序和分組是不重要的米辐。這種情況下,聚合函數(shù)可以這樣執(zhí)行 5 +(3 + 1)书释,或者 3 + (1 + 5)儡循,都會(huì)返回相同的結(jié)果。
在assert語(yǔ)句中使用的三聯(lián)相等操作符(===)征冷,來自ScalaTest,比通常的 == 操作符提供更多有用的失敗信息誓琼。
下面是用foldByKey()來執(zhí)行相同的操作:
val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))
注意到這次我們需要提供一個(gè)零值检激,整數(shù)相加時(shí)是0,但如果是別的類型和操作腹侣,零值將是其他不同的東西叔收。這一次,鍵 a 對(duì)應(yīng)的值聚合的方式是((0 + 3)+ 1)+ 5)= 9(也可能是其他的順序傲隶,不過加 0 總是第一個(gè)操作)饺律。對(duì)于 b 是0 + 7 = 7。
使用foldByKey()跺株,并不比reduceByKey()更強(qiáng)或更弱复濒。特別地脖卖,也不能改變聚合結(jié)果的值類型。為此我們需要aggregateByKey()巧颈,例如畦木,我們可以把那些整數(shù)值聚合到一個(gè)集合里:
val sets: RDD[(String, HashSet[Int])] =
pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect.toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))
集合相加時(shí),零值是空集合砸泛,因此我們用new HashSet[Int]來創(chuàng)建一個(gè)新的可變集合十籍。我們需要向aggregateByKey()提供兩個(gè)函數(shù)作為參數(shù)。第一個(gè)函數(shù)用來控制怎樣把一個(gè)Int和一個(gè)HashSet[Int]相加唇礁,本例中我們用加等函數(shù) += 把整數(shù)加到集合里面(+ 會(huì)返回一個(gè)新集合勾栗,舊集合不會(huì)改變)。
第二個(gè)函數(shù)用來控制怎樣把兩個(gè)HashSet[Int]相加(這種情況發(fā)生在map任務(wù)的combiner執(zhí)行之后盏筐,reduce任務(wù)把兩個(gè)分區(qū)聚合之時(shí))围俘,這里我們使用 ++= 把第二個(gè)集合的所有元素加到第一個(gè)集合里。
對(duì)于鍵 a机断,操作的順序可能是:
(( ? + 3) + 1) + 5) = (1, 3, 5)
或者:
( ? + 3) + 1) ++ ( ? + 5) = (1, 3) ++ (5) = (1, 3, 5)
如果Spark使用了組合器(combiner)的話楷拳。
轉(zhuǎn)變后的RDD可以持久化到內(nèi)存中,因此后續(xù)的操作效率很高吏奸。