RDD序列化
閉包檢查
從計(jì)算的角度秘症,**算子以外的代碼都是在Driver端執(zhí)行垒棋,算子里面的代碼都是在Executor端執(zhí)行爬范。**那么在scala函數(shù)式編程中掸茅,就會(huì)導(dǎo)致算子內(nèi)經(jīng)常會(huì)用到算子外的數(shù)據(jù),這樣就形成了閉包的效果柠逞,如果使用的算子外的數(shù)據(jù)無(wú)法序列化昧狮,就意味著無(wú)法傳值給Executor端執(zhí)行,從而發(fā)生錯(cuò)誤板壮,所以需要在執(zhí)行任務(wù)計(jì)算前逗鸣,檢測(cè)閉包內(nèi)對(duì)象是否可以進(jìn)行序列化,這個(gè)操作我們稱之為閉包檢測(cè)绰精。
Kryo序列化框架
Java的序列化能夠序列化任何的類慕购。單比較重(字節(jié)多),序列化后茬底,對(duì)象的提交也比較大沪悲。Spark2.0開(kāi)始支持Kryo序列化機(jī)制。Kryo速度是Serializable的10倍阱表。當(dāng)RDD子shuffle數(shù)據(jù)時(shí)殿如,簡(jiǎn)單數(shù)據(jù)類型、數(shù)組最爬、字符串類型已經(jīng)在Spark內(nèi)部使用Kryo來(lái)序列化涉馁。
object SerTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("serTest")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(List("hello spark","hello scala"))
val person:Person = new Person("hel")
val rdd2 = person.getMatchedRDD1(rdd1)
rdd2.collect().foreach(println)
}
}
class Person(str:String) extends Serializable {
def isMatch(s:String)={
s.contains(str)
}
def getMatchedRDD1(rdd:RDD[String]): RDD[String] ={
rdd.filter(isMatch)
}
}
RDD依賴關(guān)系
1.血緣關(guān)系
圖中的rdd之間存在血緣關(guān)系
rdd.toDebugString
2.依賴關(guān)系
每個(gè)相鄰rdd之間存在依賴關(guān)系
rdd.dependencies
1)rdd窄依賴
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
2)rdd寬依賴
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]]
rdd階段劃分&&rdd任務(wù)劃分
url
rdd緩存和檢查點(diǎn)
rdd.cache
rdd.persist(StorageLevel.緩存方式)
sc.checkpointDir(path)
rdd.checkpoint
注:以上操作要執(zhí)行行動(dòng)算子后才進(jìn)行存儲(chǔ)。因?yàn)橹挥行袆?dòng)算子被執(zhí)行爱致,driver端才會(huì)拿到executor端計(jì)算的數(shù)據(jù)烤送。
緩存和檢查的區(qū)別
1) Cache 緩存只是將數(shù)據(jù)保存起來(lái),不切斷血緣依賴糠悯。 Checkpoint 檢查點(diǎn)切斷血緣依賴帮坚。
2) Cache 緩存的數(shù)據(jù)通常存儲(chǔ)在磁盤、內(nèi)存等地方互艾,可靠性低试和。 Checkpoint 的數(shù)據(jù)通常存
儲(chǔ)在 HDFS 等容錯(cuò)、高可用的文件系統(tǒng)纫普,可靠性高阅悍。
3)建議對(duì) checkpoint()的 RDD 使用 Cache 緩存,這樣 checkpoint 的 job 只需從 Cache 緩存
中讀取數(shù)據(jù)即可昨稼,否則需要再?gòu)念^計(jì)算一次 RDD节视。
自定義分區(qū)(類似mr的分區(qū))
1.繼承Partitioner抽象類
2.實(shí)現(xiàn)其中方法
方法1:numPartitions:Int,返回分區(qū)數(shù)量即可
方法2:getPartition(key:Any): Boolean 寫具體分區(qū)邏輯。
文件讀寫
1.text文件
2.sequence文件:Hadoop用來(lái)存儲(chǔ)二進(jìn)制形式的key-value而設(shè)計(jì)的一種平面文件(FlatFile)假栓。
3.object對(duì)象文件:對(duì)象文件是將對(duì)象序列化后保存的文件寻行,采用Java的序列化機(jī)制。