RDD編程
RDD (Resilient Distributed Dataset 彈性分布式數(shù)據(jù)集)是Spark中最核心的概念沦零。學(xué)好了RDD就理解了Spark餐曼,這一章就是通過一些最簡單的例子來實(shí)現(xiàn)RDD的Scala編程默辨。RDD的核心操作有三個德频,創(chuàng)建,轉(zhuǎn)化操作缩幸,行動操作壹置。
1. 概述
下面這個最簡單的例子代表了RDD的一系列典型操作。下面為Python代碼表谊,Scala也基本一樣钞护。
lines = sc.textFile("README.md") //創(chuàng)建
pythonLines = lines.filter(lambda line: "Python" in line) //轉(zhuǎn)化操作
pythonLines.first() //行動操作
第一行創(chuàng)建了一個RDD
第二行對RDD執(zhí)行了一個轉(zhuǎn)化操作,返回的還是RDD爆办。注意Spark 只會惰性計(jì)算這些 RDD难咕,意味著程序只是記錄了你的操作,但并不會實(shí)際執(zhí)行它距辆,文件并沒有被讀取余佃,filter操作并沒有執(zhí)行。RDD 的轉(zhuǎn)化操作是返回一 個新的RDD 的操作跨算,比如 map()和filter()咙冗。
第三行對應(yīng)了行動操作,此時(shí)才會實(shí)際讀取文件漂彤,執(zhí)行操作雾消。行動操作則是向驅(qū)動器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng)的操作灾搏,會觸發(fā)實(shí)際的計(jì)算,比如count()和first()立润。而返回的也會是類似于Int狂窑,Array,String等數(shù)據(jù)類型桑腮。
2. 創(chuàng)建RDD
第一種. 讀取外部數(shù)據(jù)集
val lines = sc.textFile("README.md")
第二種. 利用已有集合
創(chuàng)建 RDD 最簡單的方式就是把程序中一個已有的集合傳給 SparkContext 的 parallelize() 方法
除了開發(fā)原型和測試時(shí)泉哈,這種方式用得并不多,畢竟這種方式需要把你的整個數(shù)據(jù)集先放在一臺機(jī)器的內(nèi)存中破讨。
val lines = sc.parallelize(List("pandas", "i like pandas"))
3. 轉(zhuǎn)化操作
針對各個元素的轉(zhuǎn)化操作
- map()接收一個函數(shù)丛晦,把這個函數(shù)用于 RDD中的每個元素,將函數(shù)的返回結(jié)果作為結(jié)果提陶。
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
如果有一個字符串 RDD烫沙,并且我們的map() 函數(shù)是用來把字符串解析并返回一個Double值的,那么此時(shí)我們的輸入 RDD 類型就是 RDD[String]隙笆,而輸出類型是RDD[Double]锌蓄。
- filter() 則接收一個函數(shù),并將 RDD 中滿足該函數(shù)的元素放入新的 RDD 中返回撑柔,注意傳遞的函數(shù)返回值必須為布爾型瘸爽。
newlines = lines.filter(line => line.contains("error"))
- flatMap()的函數(shù)被分別應(yīng)用到了輸入RDD的每個元素上。不過返回的不是一個元素铅忿,而是一個返回值序列的迭代器剪决。輸出的 RDD 倒不是由迭代器組成的。我們得到的是一個包含各個迭代器可訪問的所有元素的RDD檀训。
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // 返回"hello"
- distinct()操作可以去除重復(fù)元素柑潦,不過這涉及到了數(shù)據(jù)混洗,效率十分低下肢扯。
偽集合操作妒茬,對兩個RDD操作
下面的四個方法分別實(shí)現(xiàn)了并集担锤,交集蔚晨,差集和笛卡爾積。
val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))
union(other)是集合操作肛循,它會返回一個包含兩個RDD中所有元素的RDD铭腕。union不會去重。
intersection(other) 方法多糠,只返回兩個RDD中都有的元素累舷。注意intersection()也會涉及到數(shù)據(jù)混洗,效率十分低下夹孔。
subtract(other) 函數(shù)接收另一個 RDD作為參數(shù)被盈,返回一個由只存在于第一個 RDD 中而不存在于第二個 RDD 中的所有元素組成的 RDD析孽。
cartesian(other) 轉(zhuǎn)化操作會返回 所有可能的(a, b)對
a.cartesian(b) //{(1, 3), (1, 4), ... (3, 5)},注意返回的還是RDD哦
4. 行動操作
collect()函數(shù)只怎,可以用來獲取整個RDD中的數(shù)據(jù)袜瞬。只有當(dāng)你的整個數(shù)據(jù)集能在單臺機(jī)器的內(nèi)存中放得下時(shí),才能使用collect()身堡,因此邓尤,collect()不能用在大規(guī)模數(shù)據(jù)集上。
reduce()函數(shù)贴谎,它接收一個函數(shù)作為參數(shù)汞扎,這個函數(shù)要操作兩個RDD 的元素類型的數(shù)據(jù)并返回一個同樣類型的新元素。
val sum = rdd.reduce((x, y) => x + y) //這個操作實(shí)現(xiàn)了求和
- fold()和reduce()類似擅这,接收一個與reduce()接收的函數(shù)簽名相同的函數(shù)澈魄,再加上一個 “初始值”來作為每個分區(qū)第一次調(diào)用時(shí)的結(jié)果。
rdd.fold(0)((x, y) => x + y)
- aggregate()函數(shù)則把我們從返回值類型必須與所操作的 RDD 類型相同的限制中解放出來蕾哟。例如一忱,在計(jì)算平均值時(shí),需要記錄遍歷過程中的計(jì)數(shù)以及元素的數(shù)量谭确,這就需要我們返回一個二元組帘营。關(guān)于具體使用方法看這里。
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
take(n)返回RDD中的n個元素逐哈,并且嘗試只訪問盡量少的分區(qū)芬迄,因此該操作會得到一個不均衡的集合。
top()從RDD中獲取前幾個元素昂秃。
foreach()行動操作來對RDD中的每個元素進(jìn)行操作禀梳,而不需要把RDD發(fā)回本地。foreach沒有返回值肠骆。
line.foreach(println)
8. 有些函數(shù)只能用于特定類型的RDD算途,比如mean()和variance()只能用在數(shù)值RDD上, 而join()只能用在鍵值對RDD上蚀腿。
## 5. persist(緩存)
Spark RDD是惰性求值的嘴瓤,而有時(shí)我們希望能多次使用同一個RDD。如果簡單地對RDD調(diào)用行動操作莉钙,Spark 每次都會重算RDD以及它的所有依賴廓脆。這在迭代算法中消耗格外大,因?yàn)榈惴ǔ3啻问褂猛唤M數(shù)據(jù)磁玉。使用persist就可以解決這個問題停忿,傳入的參數(shù)決定了緩存級別。
```scala
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
6. 一個需要注意的點(diǎn)
當(dāng)你傳遞的對象是某個對象的成員蚊伞,或者包含了對某個對象中一個字段的引用時(shí)(例 如 self.field)席赂,Spark 就會把整個對象發(fā)到工作節(jié)點(diǎn)上吮铭,這可能比你想傳遞的東西大得多
python正確寫法
class WordFunctions(object):
...
def getMatchesNoReference(self, rdd):
# 安全:只把需要的字段提取到局部變量中 query = self.query
return rdd.filter(lambda x: query in x)
Scala正確寫法
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// 安全:只把我們需要的字段拿出來放入局部變量中 val query_ = this.query
rdd.map(x => x.split(query_))
}
7. Spark與Hadoop的一個區(qū)別
在類似Hadoop MapReduce的系統(tǒng)中,開發(fā)者常陈#花費(fèi)大量時(shí)間考慮如何把操作組合到一起沐兵,以減少 MapReduce 的周期數(shù)。而在Spark中便监,寫出一個非常復(fù)雜的映射并不見得能比使用很多簡單的連續(xù)操作獲得好很多的性能扎谎。根本原因就是Spark的轉(zhuǎn)化操作是惰性操作。