從零開始學(xué)習(xí)Spark(三)RDD編程

RDD編程

RDD (Resilient Distributed Dataset 彈性分布式數(shù)據(jù)集)是Spark中最核心的概念沦零。學(xué)好了RDD就理解了Spark餐曼,這一章就是通過一些最簡單的例子來實(shí)現(xiàn)RDD的Scala編程默辨。RDD的核心操作有三個德频,創(chuàng)建,轉(zhuǎn)化操作缩幸,行動操作壹置。

1. 概述

下面這個最簡單的例子代表了RDD的一系列典型操作。下面為Python代碼表谊,Scala也基本一樣钞护。

lines = sc.textFile("README.md") //創(chuàng)建
pythonLines = lines.filter(lambda line: "Python" in line) //轉(zhuǎn)化操作
pythonLines.first() //行動操作
  • 第一行創(chuàng)建了一個RDD

  • 第二行對RDD執(zhí)行了一個轉(zhuǎn)化操作,返回的還是RDD爆办。注意Spark 只會惰性計(jì)算這些 RDD难咕,意味著程序只是記錄了你的操作,但并不會實(shí)際執(zhí)行它距辆,文件并沒有被讀取余佃,filter操作并沒有執(zhí)行。RDD 的轉(zhuǎn)化操作是返回一 個新的RDD 的操作跨算,比如 map()和filter()咙冗。

  • 第三行對應(yīng)了行動操作,此時(shí)才會實(shí)際讀取文件漂彤,執(zhí)行操作雾消。行動操作則是向驅(qū)動器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng)的操作灾搏,會觸發(fā)實(shí)際的計(jì)算,比如count()和first()立润。而返回的也會是類似于Int狂窑,Array,String等數(shù)據(jù)類型桑腮。

2. 創(chuàng)建RDD

第一種. 讀取外部數(shù)據(jù)集

val lines = sc.textFile("README.md")

第二種. 利用已有集合

創(chuàng)建 RDD 最簡單的方式就是把程序中一個已有的集合傳給 SparkContext 的 parallelize() 方法

除了開發(fā)原型和測試時(shí)泉哈,這種方式用得并不多,畢竟這種方式需要把你的整個數(shù)據(jù)集先放在一臺機(jī)器的內(nèi)存中破讨。

val lines = sc.parallelize(List("pandas", "i like pandas"))

3. 轉(zhuǎn)化操作

針對各個元素的轉(zhuǎn)化操作

  1. map()接收一個函數(shù)丛晦,把這個函數(shù)用于 RDD中的每個元素,將函數(shù)的返回結(jié)果作為結(jié)果提陶。
 val input = sc.parallelize(List(1, 2, 3, 4))
 val result = input.map(x => x * x)

如果有一個字符串 RDD烫沙,并且我們的map() 函數(shù)是用來把字符串解析并返回一個Double值的,那么此時(shí)我們的輸入 RDD 類型就是 RDD[String]隙笆,而輸出類型是RDD[Double]锌蓄。

  1. filter() 則接收一個函數(shù),并將 RDD 中滿足該函數(shù)的元素放入新的 RDD 中返回撑柔,注意傳遞的函數(shù)返回值必須為布爾型瘸爽。
newlines = lines.filter(line => line.contains("error")) 
  1. flatMap()的函數(shù)被分別應(yīng)用到了輸入RDD的每個元素上。不過返回的不是一個元素铅忿,而是一個返回值序列的迭代器剪决。輸出的 RDD 倒不是由迭代器組成的。我們得到的是一個包含各個迭代器可訪問的所有元素的RDD檀训。
val lines = sc.parallelize(List("hello world", "hi")) 
val words = lines.flatMap(line => line.split(" ")) 
words.first() // 返回"hello"
  1. distinct()操作可以去除重復(fù)元素柑潦,不過這涉及到了數(shù)據(jù)混洗,效率十分低下肢扯。

偽集合操作妒茬,對兩個RDD操作

下面的四個方法分別實(shí)現(xiàn)了并集担锤,交集蔚晨,差集和笛卡爾積。

val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))
  1. union(other)是集合操作肛循,它會返回一個包含兩個RDD中所有元素的RDD铭腕。union不會去重。

  2. intersection(other) 方法多糠,只返回兩個RDD中都有的元素累舷。注意intersection()也會涉及到數(shù)據(jù)混洗,效率十分低下夹孔。

  3. subtract(other) 函數(shù)接收另一個 RDD作為參數(shù)被盈,返回一個由只存在于第一個 RDD 中而不存在于第二個 RDD 中的所有元素組成的 RDD析孽。

  4. cartesian(other) 轉(zhuǎn)化操作會返回 所有可能的(a, b)對

a.cartesian(b) //{(1, 3), (1, 4), ... (3, 5)},注意返回的還是RDD哦

4. 行動操作

  1. collect()函數(shù)只怎,可以用來獲取整個RDD中的數(shù)據(jù)袜瞬。只有當(dāng)你的整個數(shù)據(jù)集能在單臺機(jī)器的內(nèi)存中放得下時(shí),才能使用collect()身堡,因此邓尤,collect()不能用在大規(guī)模數(shù)據(jù)集上。

  2. reduce()函數(shù)贴谎,它接收一個函數(shù)作為參數(shù)汞扎,這個函數(shù)要操作兩個RDD 的元素類型的數(shù)據(jù)并返回一個同樣類型的新元素。

val sum = rdd.reduce((x, y) => x + y) //這個操作實(shí)現(xiàn)了求和
  1. fold()和reduce()類似擅这,接收一個與reduce()接收的函數(shù)簽名相同的函數(shù)澈魄,再加上一個 “初始值”來作為每個分區(qū)第一次調(diào)用時(shí)的結(jié)果。
rdd.fold(0)((x, y) => x + y)
  1. aggregate()函數(shù)則把我們從返回值類型必須與所操作的 RDD 類型相同的限制中解放出來蕾哟。例如一忱,在計(jì)算平均值時(shí),需要記錄遍歷過程中的計(jì)數(shù)以及元素的數(shù)量谭确,這就需要我們返回一個二元組帘营。關(guān)于具體使用方法看這里
val result = input.aggregate((0, 0))(
                     (acc, value) => (acc._1 + value, acc._2 + 1),
                     (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
  1. take(n)返回RDD中的n個元素逐哈,并且嘗試只訪問盡量少的分區(qū)芬迄,因此該操作會得到一個不均衡的集合。

  2. top()從RDD中獲取前幾個元素昂秃。

  3. foreach()行動操作來對RDD中的每個元素進(jìn)行操作禀梳,而不需要把RDD發(fā)回本地。foreach沒有返回值肠骆。

line.foreach(println)

8. 有些函數(shù)只能用于特定類型的RDD算途,比如mean()和variance()只能用在數(shù)值RDD上, 而join()只能用在鍵值對RDD上蚀腿。

## 5. persist(緩存)

Spark RDD是惰性求值的嘴瓤,而有時(shí)我們希望能多次使用同一個RDD。如果簡單地對RDD調(diào)用行動操作莉钙,Spark 每次都會重算RDD以及它的所有依賴廓脆。這在迭代算法中消耗格外大,因?yàn)榈惴ǔ3啻问褂猛唤M數(shù)據(jù)磁玉。使用persist就可以解決這個問題停忿,傳入的參數(shù)決定了緩存級別。

```scala
  val result = input.map(x => x * x)
  result.persist(StorageLevel.DISK_ONLY)
  println(result.count())
  println(result.collect().mkString(","))

6. 一個需要注意的點(diǎn)

當(dāng)你傳遞的對象是某個對象的成員蚊伞,或者包含了對某個對象中一個字段的引用時(shí)(例 如 self.field)席赂,Spark 就會把整個對象發(fā)到工作節(jié)點(diǎn)上吮铭,這可能比你想傳遞的東西大得多

python正確寫法

class WordFunctions(object):
        ...
  def getMatchesNoReference(self, rdd):
  # 安全:只把需要的字段提取到局部變量中 query = self.query
  return rdd.filter(lambda x: query in x)

Scala正確寫法

def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
  // 安全:只把我們需要的字段拿出來放入局部變量中 val query_ = this.query
  rdd.map(x => x.split(query_))
}

7. Spark與Hadoop的一個區(qū)別

在類似Hadoop MapReduce的系統(tǒng)中,開發(fā)者常陈#花費(fèi)大量時(shí)間考慮如何把操作組合到一起沐兵,以減少 MapReduce 的周期數(shù)。而在Spark中便监,寫出一個非常復(fù)雜的映射并不見得能比使用很多簡單的連續(xù)操作獲得好很多的性能扎谎。根本原因就是Spark的轉(zhuǎn)化操作是惰性操作。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末烧董,一起剝皮案震驚了整個濱河市毁靶,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌逊移,老刑警劉巖预吆,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異胳泉,居然都是意外死亡拐叉,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門扇商,熙熙樓的掌柜王于貴愁眉苦臉地迎上來凤瘦,“玉大人,你說我怎么就攤上這事案铺∈呓妫” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵控汉,是天一觀的道長笔诵。 經(jīng)常有香客問我,道長姑子,這世上最難降的妖魔是什么乎婿? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮街佑,結(jié)果婚禮上谢翎,老公的妹妹穿的比我還像新娘。我一直安慰自己舆乔,他們只是感情好岳服,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布剂公。 她就那樣靜靜地躺著希俩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪纲辽。 梳的紋絲不亂的頭發(fā)上颜武,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天璃搜,我揣著相機(jī)與錄音,去河邊找鬼鳞上。 笑死这吻,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的篙议。 我是一名探鬼主播唾糯,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼鬼贱!你這毒婦竟也來了移怯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤这难,失蹤者是張志新(化名)和其女友劉穎舟误,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體姻乓,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嵌溢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蹋岩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赖草。...
    茶點(diǎn)故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖剪个,靈堂內(nèi)的尸體忽然破棺而出疚顷,到底是詐尸還是另有隱情,我是刑警寧澤禁偎,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布腿堤,位于F島的核電站,受9級特大地震影響如暖,放射性物質(zhì)發(fā)生泄漏笆檀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一盒至、第九天 我趴在偏房一處隱蔽的房頂上張望酗洒。 院中可真熱鬧,春花似錦枷遂、人聲如沸樱衷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽矩桂。三九已至,卻和暖如春痪伦,著一層夾襖步出監(jiān)牢的瞬間侄榴,已是汗流浹背雹锣。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留癞蚕,地道東北人蕊爵。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像桦山,于是被迫代替她去往敵國和親攒射。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容