翻譯:Hadoop權(quán)威指南之Spark-3

本文原始地址

Resilient Distributed Datasets

RDD是每個(gè)spark程序的核心竿拆,本節(jié)我們來看看更多細(xì)節(jié)埠戳。

Creation

創(chuàng)建RDD有三種方式:從一個(gè)內(nèi)存中的對(duì)象集合昂秃,被稱為并行化(parallelizing) 一個(gè)集合败晴;使用一個(gè)外部存儲(chǔ)(比如HDFS)的數(shù)據(jù)集阶淘;轉(zhuǎn)變(transform)已存在的RDD未桥。在對(duì)少量的輸入數(shù)據(jù)并行地進(jìn)行CPU密集型運(yùn)算時(shí)笔刹,第一種方式非常有用。例如冬耿,下面執(zhí)行從1到10的獨(dú)立運(yùn)算:

val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputation)

函數(shù)performExpensiveComputation并行處理輸入數(shù)據(jù)舌菜。并行性的級(jí)別由屬性spark.default.parallelism決定,該屬性的默認(rèn)值取決于Spark的運(yùn)行方式亦镶。本地運(yùn)行時(shí)日月,是本地機(jī)器的核心數(shù)量,集群運(yùn)行時(shí)缤骨,是集群中所有執(zhí)行(executor)節(jié)點(diǎn)的核心總數(shù)量爱咬。

可以為某特定運(yùn)算設(shè)置并行性級(jí)別,指定parallelize()方法的第二個(gè)參數(shù)即可:

sc.parallelize(1 to 10, 10)

創(chuàng)建RDD的第二種方式绊起,是創(chuàng)建一個(gè)指向外部數(shù)據(jù)集的引用精拟。我們已經(jīng)見過怎樣為一個(gè)文本文件創(chuàng)建String對(duì)象的RDD:

val text:RDD[String] = sc.textFile(inputPath)

路徑inputPath可以是任意的Hadoop文件系統(tǒng)路徑,比如本地文件系統(tǒng)或HDFS上的一個(gè)文件虱歪。內(nèi)部來看蜂绎,Spark使用舊的MapReduce API中的TextInputFormat來讀取這個(gè)文件。這就意味著文件切分行為與Hadoop是一樣的笋鄙,因此在HDFS的情況下师枣,一個(gè)Spark分區(qū)對(duì)應(yīng)一個(gè)HDFS塊(block)。這個(gè)默認(rèn)行為可以改變萧落,傳入第二個(gè)參數(shù)來請(qǐng)求一個(gè)特殊的切分?jǐn)?shù)量:

sc.textFile(inputPath, 10)

另外一個(gè)方法允許把多個(gè)文本文件作為一個(gè)整體來處理践美,返回的RDD中洗贰,是成對(duì)的string,第一個(gè)string是文件的路徑拨脉,第二個(gè)string是文件的內(nèi)容哆姻。因?yàn)槊總€(gè)文件都會(huì)加載進(jìn)內(nèi)存,所以這種方式僅僅適合于小文件:

val files:RDD[(String, String)] = sc.wholeTextFiles(inputPath)

Spark能夠處理文本文件以外的其他文件格式玫膀,比如矛缨,序列文件可以這樣讀入:

sc.sequenceFile[IntWritable, Text](inputPath)

注意這里指定序列文件的鍵和值的Writable類型的方式。對(duì)于常用的Writable類型帖旨,Spark能夠映射到Java中的等價(jià)物箕昭,因此我們可以使用等價(jià)的方式:

sc.sequenceFile[Int, String](inputPath)

從任意的Hadoop InputFormat來創(chuàng)建RDD,有兩種方式:基于文件的格式解阅,使用hadoopFile()落竹,接收一個(gè)路徑;其他格式货抄,比如HBase的TableInputFormat述召,使用hadoopRDD()。這些方法使用舊的MapReduce API蟹地。如果要用新的MapReduce API积暖,使用newAPIHadoopFile()和newAPIHadoopRDD()。下面是讀取Avro數(shù)據(jù)文件的示例怪与,使用特定的API和一個(gè)WeatherRecord類:

val job = new Job()
AvroJob.setInputKeySchema(job, WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(inputPath,
    classOf[AvroKeyInputFormat[WeatherRecord]],
    classOf[AvroKey[WeatherRecord]], classOf[NullWritable],
    job.getConfiguration)

除了路徑之外夺刑,newAPIHadoopFile()方法還需要InputFormat的類型、鍵的類型分别、值的類型遍愿,再加上Hadoop配置,該配置中帶有Avro模式耘斩,在第二行我們使用AvroJob幫助類做的設(shè)置沼填。

創(chuàng)建RDD的第三種方式,是轉(zhuǎn)變(transform)已存在的RDD煌往。

Transformations and Actions

Spark提供兩種類型的操作:transformationsactions倾哺。transformations從已存在的RDD生成新的RDD,而actions會(huì)觸發(fā)運(yùn)算并輸出結(jié)果——返回給用戶刽脖,或者保存到外部存儲(chǔ)羞海。

Actions會(huì)立刻產(chǎn)生影響,而transformations不會(huì)——它們是懶惰的曲管,它們不做任何工作却邓,直到action被觸發(fā)。下面的例子院水,把文本文件中的每一行轉(zhuǎn)為小寫:

val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))

map()方法是個(gè)transformation腊徙,Spark內(nèi)部這樣處理:稍晚的時(shí)候简十,一個(gè)函數(shù)(這里是toLowerCase())會(huì)被調(diào)用,來處理RDD中的每一個(gè)元素撬腾。這個(gè)函數(shù)實(shí)際上沒有執(zhí)行螟蝙,直到foreach()方法(這是個(gè)action)被調(diào)用,然后Spark會(huì)運(yùn)行一個(gè)job民傻,讀取輸入的文件胰默,對(duì)文件中的每一行調(diào)用toLowerCase(),然后把結(jié)果寫到控制臺(tái)漓踢。

怎樣分辨一個(gè)操作究竟是transformation還是action呢牵署?一個(gè)方法是看它的返回類型:如果返回類型是RDD,這是個(gè)transformation喧半;否則就是action奴迅。當(dāng)你查閱RDD的文檔時(shí),這種方法是很有用的挺据。對(duì)RDD執(zhí)行的大多數(shù)操作取具,可以在RDD的文檔(org.apache.spark.rdd包)中找到,更多的操作在PairRDDFunctions里扁耐,這里包含了處理鍵值對(duì)RDD的transformations和actions者填。

Spark的庫(kù)中包含了豐富的操作,有transformations者諸如映射(mapping)做葵、分組(grouping)、聚合(aggregating)心墅、再分配(repartitioning)酿矢、取樣(sampling)、連接(joining)多個(gè)RDD怎燥、把RDDs作為集合(sets)對(duì)待瘫筐。還有actions者諸如把RDDs物化(materializing)為集合(collections)、對(duì)RDD進(jìn)行計(jì)算統(tǒng)計(jì)铐姚、從RDD中取樣出固定數(shù)目的元素策肝,把RDD保存到外部存儲(chǔ)。細(xì)節(jié)內(nèi)容隐绵,查看文檔之众。

MapReduce in Spark

盡管名字很有暗示性,Spark中的map()和reduce()操作依许,與Hadoop MapReduce中相同名字的函數(shù)棺禾,不是直接對(duì)應(yīng)的。Hadoop MapReduce中的map和reduce的通常形式是:

map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

從list標(biāo)記可以看出峭跳,這兩個(gè)函數(shù)都可以返回多個(gè)輸出對(duì)膘婶。這種操作在Spark(Scala)中被實(shí)現(xiàn)為flatMap()缺前,與map()很像,但是移除了一層嵌套:

scala> val l = List(1, 2, 3)
l: List[Int] = List(1, 2, 3)

scala> l.map(a => List(a))
res0: List[List[Int]] = List(List(1), List(2), List(3))

scala> l.flatMap(a => List(a))
res1: List[Int] = List(1, 2, 3)

有一種樸素的方式悬襟,可以在Spark中模擬Hadoop MapReduce衅码。用兩個(gè)flatMap()操作,中間用groupByKey()和sortByKey()來執(zhí)行MapReduce的混洗(shuffle)和排序:

val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)

這里key的類型K2要繼承自Scala的Ordering類型脊岳,以滿足sortByKey()逝段。

這個(gè)例子可以幫助我們理解MapReduce和Spark的關(guān)系,但是不能盲目應(yīng)用逸绎。首先惹恃,這里的語(yǔ)義和Hadoop的MapReduce有微小的差別,sortByKey()執(zhí)行的是全量排序棺牧。使用repartitionAndSortWithinPartitions()方法來執(zhí)行部分排序巫糙,可以避免這個(gè)問題。然而颊乘,這樣還是無(wú)效的参淹,因?yàn)镾park有兩次混洗的過程(一次groupByKey(),一次sort)乏悄。

與其重造MapReduce浙值,不如僅僅使用那些你實(shí)際需要的操作。比如檩小,如果不需要按key排序开呐,你可以省略sortByKey(),這在Hadoop MapReduce中是不可能的规求。

同樣的筐付,大多數(shù)情況下groupByKey()太普遍了。通常只在聚合數(shù)據(jù)時(shí)需要混洗阻肿,因此應(yīng)該使用reduceByKey()瓦戚,foldByKey(),或者aggregateByKey()丛塌,這些函數(shù)比groupByKey()更有效率较解,因?yàn)樗鼈兛梢栽趍ap任務(wù)中作為combiner運(yùn)行。最后赴邻,flatMap()可能總是不需要的印衔,如果總有一個(gè)返回值,map()是首選姥敛,如果有0或1個(gè)返回值当编,使用filter()。

Aggregation transformations

根據(jù)key來聚合鍵值對(duì)RDD的三個(gè)主要的transformations是reduceByKey(),foldByKey()忿偷,和aggregateByKey()金顿。它們的工作方式稍有不同,但它們都是根據(jù)鍵來聚合值的鲤桥,為每一個(gè)鍵生成一個(gè)單獨(dú)的值揍拆。對(duì)應(yīng)的actions是reduce(),fold()和aggregate()茶凳,它們以類似的方式運(yùn)行嫂拴,為整個(gè)RDD輸出一個(gè)單獨(dú)的值。

最簡(jiǎn)單的是reduceByKey()贮喧,它對(duì)成對(duì)兒的值反復(fù)執(zhí)行一個(gè)函數(shù)筒狠,直到生成一個(gè)單獨(dú)的值。例如:

val pairs: RDD[(String, Int)] =
    sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

鍵 a 對(duì)應(yīng)的值箱沦,使用相加函數(shù)(+)聚合起來辩恼,(3 + 1)+ 5 = 9,而鍵 b 對(duì)應(yīng)的值只有一個(gè)谓形,因此不需要聚合灶伊。一般來說,這些操作是分布式的寒跳,在RDD的不同分區(qū)對(duì)應(yīng)的任務(wù)中分別執(zhí)行聘萨,因此這些函數(shù)要具有互換性和連接性。換句話說童太,操作的順序和分組是不重要的米辐。這種情況下,聚合函數(shù)可以這樣執(zhí)行 5 +(3 + 1)书释,或者 3 + (1 + 5)儡循,都會(huì)返回相同的結(jié)果。

在assert語(yǔ)句中使用的三聯(lián)相等操作符(===)征冷,來自ScalaTest,比通常的 == 操作符提供更多有用的失敗信息誓琼。

下面是用foldByKey()來執(zhí)行相同的操作:

val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

注意到這次我們需要提供一個(gè)零值检激,整數(shù)相加時(shí)是0,但如果是別的類型和操作腹侣,零值將是其他不同的東西叔收。這一次,鍵 a 對(duì)應(yīng)的值聚合的方式是((0 + 3)+ 1)+ 5)= 9(也可能是其他的順序傲隶,不過加 0 總是第一個(gè)操作)饺律。對(duì)于 b 是0 + 7 = 7。

使用foldByKey()跺株,并不比reduceByKey()更強(qiáng)或更弱复濒。特別地脖卖,也不能改變聚合結(jié)果的值類型。為此我們需要aggregateByKey()巧颈,例如畦木,我們可以把那些整數(shù)值聚合到一個(gè)集合里:

val sets: RDD[(String, HashSet[Int])] =
    pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect.toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))

集合相加時(shí),零值是空集合砸泛,因此我們用new HashSet[Int]來創(chuàng)建一個(gè)新的可變集合十籍。我們需要向aggregateByKey()提供兩個(gè)函數(shù)作為參數(shù)。第一個(gè)函數(shù)用來控制怎樣把一個(gè)Int和一個(gè)HashSet[Int]相加唇礁,本例中我們用加等函數(shù) += 把整數(shù)加到集合里面(+ 會(huì)返回一個(gè)新集合勾栗,舊集合不會(huì)改變)。

第二個(gè)函數(shù)用來控制怎樣把兩個(gè)HashSet[Int]相加(這種情況發(fā)生在map任務(wù)的combiner執(zhí)行之后盏筐,reduce任務(wù)把兩個(gè)分區(qū)聚合之時(shí))围俘,這里我們使用 ++= 把第二個(gè)集合的所有元素加到第一個(gè)集合里。

對(duì)于鍵 a机断,操作的順序可能是:
(( ? + 3) + 1) + 5) = (1, 3, 5)
或者:
( ? + 3) + 1) ++ ( ? + 5) = (1, 3) ++ (5) = (1, 3, 5)
如果Spark使用了組合器(combiner)的話楷拳。

轉(zhuǎn)變后的RDD可以持久化到內(nèi)存中,因此后續(xù)的操作效率很高吏奸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末欢揖,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奋蔚,更是在濱河造成了極大的恐慌她混,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件泊碑,死亡現(xiàn)場(chǎng)離奇詭異坤按,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)馒过,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門臭脓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人腹忽,你說我怎么就攤上這事来累。” “怎么了窘奏?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵嘹锁,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我着裹,道長(zhǎng)领猾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮摔竿,結(jié)果婚禮上面粮,老公的妹妹穿的比我還像新娘。我一直安慰自己拯坟,他們只是感情好但金,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著郁季,像睡著了一般冷溃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上梦裂,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天似枕,我揣著相機(jī)與錄音,去河邊找鬼年柠。 笑死凿歼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的冗恨。 我是一名探鬼主播答憔,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼掀抹!你這毒婦竟也來了虐拓?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤傲武,失蹤者是張志新(化名)和其女友劉穎蓉驹,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體揪利,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡态兴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了疟位。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瞻润。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖甜刻,靈堂內(nèi)的尸體忽然破棺而出绍撞,到底是詐尸還是另有隱情,我是刑警寧澤罢吃,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站昭齐,受9級(jí)特大地震影響尿招,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一就谜、第九天 我趴在偏房一處隱蔽的房頂上張望怪蔑。 院中可真熱鬧黔州,春花似錦戳玫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)弓坞。三九已至,卻和暖如春车荔,著一層夾襖步出監(jiān)牢的瞬間渡冻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工忧便, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留族吻,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓珠增,卻偏偏與公主長(zhǎng)得像超歌,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蒂教,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容