Spark基礎(chǔ)二之RDD編程
RDD彈性分布式數(shù)據(jù)集
Python牵啦、Java亚情、Scala中任意類型的對象或者用戶自定義的對象,形成的不可變的對象集合哈雏。每個(gè)RDD可以被分成多個(gè)分區(qū)運(yùn)行在集群中不同節(jié)點(diǎn)之上楞件。所謂彈性是指當(dāng)某個(gè)節(jié)點(diǎn)掛掉衫生,Spark可以利用重算特性算出丟掉的分區(qū)數(shù)據(jù)。
一土浸、創(chuàng)建RDD
(1)讀取外部數(shù)據(jù)罪针,每一行就是一個(gè)集合元素
// 返回的是一個(gè)RDD對象
val inputRDD = sc.textFile("/opt/spark/test/log.txt")
(2)程序中對集合進(jìn)行并行化
val inputRDD = sc.parallelize(List("java","scala","python"))
二、RDD操作
轉(zhuǎn)化操作:返回一個(gè)新的RDD類型黄伊,如map()泪酱、filter()
行動操作:觸發(fā)實(shí)際計(jì)算,返回其他類型的結(jié)果Arrar[String]还最,如count()西篓、first()、collect()
//惰性計(jì)算:只有在一次行動操作中用到時(shí)憋活,才會執(zhí)行之前的轉(zhuǎn)化操作岂津。譬如textFile就不會一開是加載整個(gè)日志文件,而是明白了整個(gè)行動目的后悦即,才會加載文件到內(nèi)存吮成,而且讀到有10行error日志即停止。若當(dāng)某個(gè)RDD需要反復(fù)被用到辜梳,那么可以使用rdd.persist()強(qiáng)制該RDD完整加載到內(nèi)存中
val errorRDD = inputRDD.filter(line => line.contains("error"))
errorRDD.take(10).foreach(println)
//向Spark傳遞函數(shù):大部分轉(zhuǎn)化操作與一部分行動操作需要依賴用戶傳遞的函數(shù)來計(jì)算粱甫。當(dāng)出現(xiàn)NotSerializableException,通常是因?yàn)槲覀儌鬟f了一個(gè)不可序列化的類中的函數(shù)或字段作瞄。還有當(dāng)傳遞的對象包含某個(gè)超大對象的引用茶宵,spark會把整個(gè)超大對象發(fā)布到工作節(jié)點(diǎn)而導(dǎo)致內(nèi)存不夠失敗
scala> def containsErrors(s:String) = s.contains("error")
containsErrors: (s: String)Boolean
scala> inputRDD.filter(containsErrors)
res8: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at filter at <console>:28
常見轉(zhuǎn)化操作:
常見行動操作:
RDD類型轉(zhuǎn)化:
有些函數(shù)只能用于特定類型的 RDD,比如 mean() 和 variance() 只能用在數(shù)值 RDD 上宗挥, 而 join() 只能用在鍵值對 RDD 上乌庶。Scala中通過隱式轉(zhuǎn)換可以把 RDD[Double] 轉(zhuǎn)為DoubleRDDFunctions,然后調(diào)用 mean()
持久化:
Scala和 Java中契耿,默認(rèn)情況下persist() 會把數(shù)據(jù)以序列化的形式緩存在 JVM 的堆空間中瞒大。當(dāng)然也可以自定義緩存級別。RDD還有一個(gè)方法叫作unpersist()搪桂,調(diào)用該方法可以手動把持久化的 RDD 從緩存中移除
val result = input.map(x => x * x)
// 緩存透敌,避免了兩次行動操作導(dǎo)致加載兩次RDD
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))