RDD 操作一 基礎 洽损,放入方法庞溜,閉包,輸出元素,使用 Key-Value 工作
原文地址: http://spark.apache.org/docs/latest/programming-guide.html
僅限交流使用碑定,轉載請注明出處流码。如有錯誤,歡迎指出延刘!
Henvealf/譯
RDD 提供了兩種類型的操作:
- transformations :從已經存在的 RDD 中創(chuàng)建出一個新的 RDD漫试。
- actions: 在集群上運行了一個計算后,最終返回一個值給設備中的程序访娶。
transformation 的一個例子就是map商虐,對 RDD 中的每個元素進行相同的操作,返回一個新的 RDD崖疤。
action 的一個例子就是 reduce,使用相同的函數來聚合 RDD 中的元素典勇。
在 Spark 中劫哼,所有的 transformation 都是懶惰的(lazy),以至于他不會立刻計算出他們結果割笙。代替的是权烧,他們僅僅記住這個 transformation 應用在哪些基礎的數據集上(比如一個文件)。transformation 計算僅僅是在程序中的一個動作需要一個返回值的時候才開始伤溉。這個設計讓能夠讓 Spark 更加高效般码。舉個例子,我們能夠意識到一個 map 生成的數據集只會用在一個 reduce 上乱顾,并且僅僅返回 reduce 的結果給設備板祝,而不會是一個 map 后的很大的數據集給設備。
默認情況下走净,在你每次重新運行一個通過轉換(transforme)得到的RDD的action 的時候券时,轉換每次都可能重新再運行一次孤里。然而,你也可以使用 persist(或者 cache)方法將一個 RDD 持久化在內存中橘洞,這樣就能讓 Spark 把這些元素維持在集群中捌袜,讓下一次的存取速度變得飛快。這里也同樣支持持久話 RDD 在磁盤中炸枣,或者備份在多個節(jié)點中虏等。
基礎
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
lineLengths.persist()
val totalLength = lineLengths.reduce((a, b) => a + b)
往 Spark 中放入方法
scala
Spark 的 API 最信任的就是在集群上運行的方法上放方法。下面有兩個建議:
- 使用匿名方法語法适肠,能夠減少代碼量霍衫。
- 靜態(tài)化在全局的單例對象上的函數,就是定義一個object,可以把它理解為直接創(chuàng)建了一個對象,不需要 new 就可以使用迂猴。也可以把他理解為一個類慕淡,而其中的函數都默認為靜態(tài)的。里面有你用到所有函數/方法沸毁。比如峰髓,你可以定義 object MyFunctions ,之后通過 MyFunxtions 來使用方法:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意你也可能傳入一個引用給一個類(class) 中的函數(與單例 object 的做法是相反的)息尺,他需要向 Spark 中傳入包含了要使用的方法的類的對象携兵。比如下面:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
著這里,我們要 new 一個 MyClass 的對象才能使用 doSuff 方法搂誉。我們要將這一整個對象傳送入集群中才可以徐紧,然后書寫方式和 rdd.map(x => this.func1(x)) 很像。
用很相似的方式炭懊,外部的對象存取字段就會引用到整個對象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
可以發(fā)現這樣其實就是 rdd.map(x => this.field + x)并级, 這樣在外部就得到了他的 this 引用,這樣很不安全侮腹,容易出錯嘲碧,為了解決這個問題,下面有一個簡單方式, 就是先將字段賦給一個局部變量中:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
理解閉包(closures)
在 Spark 中比較難理解的就是當在集群上運行代碼的時候父阻,變量和方法的作用域與生命周期愈涩。在其作用域之外修改變量的 RDD 操作可以是一個混淆的常用資源( can be a frequent source of confusion)。下面我們使用 foreach 來遞增一個計數器加矛,相同的問題也能同樣出現在其他的操作上履婉。
例子
考慮原生的 RDD 元素的加和操作,這個操作在不同的虛擬機上執(zhí)行可能會呈現出不同的行為斟览。這個普通的例子是將 Spark 運行在 local 模式下(--master = local[n])的情況與運行在集群上的情況做比較(通過 park-submit 給 YARN)毁腿。
Scala
var counter = 0;
var rdd = sc.parallelize(data)
// Wrong: 不會執(zhí)行他
rdd.foreach(x => counter + x)
println("Counter: " + counter)
本地模式 Vs. 集群模式
在這之前先明確一下一些概念:
Driver: 驅動器,一個 job 只有一個,主要負責 job 的解析狸棍,與 task 的調度等身害。
Executor:執(zhí)行器,實際運行 task 的地方草戈,一個 job 有多個塌鸯。
上面這段代碼的行為是不確定的,可能不像預想中那樣工作唐片。為了執(zhí)行 job 丙猬,Spark 會將處理 RDD 的操作拆分到許多 task 中,且每一個 task 被一個執(zhí)行器執(zhí)行费韭。在執(zhí)行之前茧球, Spark 會計算 task 的閉包。閉包是一些必須讓執(zhí)行器可見的變量和方法星持,這樣執(zhí)行器才能執(zhí)行他們在 RDD 上的操作(這里就是 foreach)抢埋。這個閉包是被序列化并傳送到了每個執(zhí)行器。
在集群上的變量會立刻被送到每個執(zhí)行器中督暂,事實上揪垄,當 counter 被引用使用在 foreach 方法里面時,他就不再是驅動(driver)節(jié)點上的 counter 了逻翁。也就是說在驅動節(jié)點的內存中也一直會有一個 counter 饥努,可他對執(zhí)行器來說,已經不可見了八回。執(zhí)行器僅僅能看到序列化了的閉包中的拷貝酷愧。事實上, 驅動器上的 final 的 counter 的值在操作執(zhí)行的時候一直都是0缠诅,執(zhí)行器操作的只是引用的序列化的閉包中的值溶浴。
在 local 模式下,foreach 方法實際會運行在作為驅動器的 JVM 中管引,也就是說運行程序的 JVM 和運行驅動器的 JVM 是同一個戳葵。所以操作就會引用到原始的 counter, counter 的值就被改變了汉匙。
如果想要確保現在說的這種情況有確定的行為生蚁,一種就是使用一個 Accumulator(積累器)噩翠。Accumulator 常常使用于在執(zhí)行被分片到不同的 worker 時需要安全的對變量進行更新的情況。 Accumulator 以后詳細介紹邦投。
一般情況下伤锚,閉包--構建循環(huán)或者局部函數,應該不要用于改變一些全局的狀態(tài)志衣。 Spark 不能確定或者保證修改閉包之外的的對象引用時的行為屯援。一些代碼在本地模式下運行的好好的猛们,在放到集群上運行時就可能得不到期望的結果。如果需要使用全局的聚合狞洋,就使用一個 Accumulator 來代替他弯淘。
輸出一個 RDD 的元素
另一個老事件就是試圖使用 rdd.foreach(println) 或者 rdd.map(println) 打印出元素的值。在一個機器上吉懊,輸出 RDD 所有的元素的將會生成你期望的輸出庐橙。然而,在 cluster 模式下是借嗽,stdout 會由執(zhí)行器來調用态鳖,寫在了執(zhí)行器的標準輸出上,而不是驅動器上恶导。所以在驅動器上你就看不到 stdout 的輸出類了浆竭。
為了在驅動器上輸出所有的元素,一個你可以使用 collect 方法惨寿,先把這個 RDD 帶到驅動器節(jié)點上: rdd.collect().foreach(println)邦泄。不過這中方法容易造成內存不足。因為 collect() 會把 RDD 實體拿進一個單獨的機器中缤沦;如果僅僅需要輸出 RDD 的一小部分元素虎韵,最安全的方式是使用 take(): rdd.take().foreach(println).
使用 Key-Value 工作
Scala
RDDs 中包括任何類型的對象,有一些特殊的操作是能用于 RDDs 的鍵值對上缸废。 最普遍的就是集群上的 “洗牌” 過程包蓝,就是使用 key 來進行分組和聚合。
在 Scala 中企量,這些操作在 包含 ** Tuple2** (二元組)對象的 RDDs 中是自動(直接测萎?)可用的(在本語言中,之間寫一個(a,b) 就能創(chuàng)建 tuples )届巩。鍵值對操作可以在 PairRDDFunction 中得到硅瞧,他是自動包裝了一個元組RDD。
舉個例子,下面的代碼就在鍵值對上使用 reduceByKey 操作來計算當前文件的行數恕汇。
val lines = sc.textFile("data.txt")
val pairs = line.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我們也可以使用 counts.sortByKey(),在這個例子中腕唧,機會按字母排序這些簡直對,最后 count.collect() 就將他們帶會驅動器程序瘾英,作為一個對象數值使用枣接。
注意 :當你使用自定義的對象來作為鍵值對的鍵值,你必須保證這個自定義的該對象的 equals 方法和與之聯合匹配的 hashCode() 方法缺谴。