序列化在分布式系統(tǒng)中扮演著重要的角色亚茬,優(yōu)化Spark程序時(shí),首當(dāng)其沖的就是對序列化方式的優(yōu)化夜郁。Spark為使用者提供兩種序列化方式:
Java Serialization: 默認(rèn)的序列化方式钩蚊。
Kryo Serialization: 相較于 Java Serialization 的方式,速度更快蹈矮,空間占用更小砰逻,但并不支持所有的序列化格式,同時(shí)使用的時(shí)候需要注冊class泛鸟。spark-sql中默認(rèn)使用的是kyro的序列化方式蝠咆。
下文將會講解kryo的使用方式并對比性能。
配置
可以在spark-default.conf
設(shè)置全局參數(shù)北滥,也可以代碼中初始化時(shí)對SparkConf設(shè)置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
刚操,該參數(shù)會同時(shí)作用于機(jī)器之間數(shù)據(jù)的shuffle操作以及序列化rdd到磁盤,內(nèi)存再芋。
Spark不將Kyro設(shè)置成默認(rèn)的序列化方式是因?yàn)樗枰獙︻愡M(jìn)行注冊菊霜,官方強(qiáng)烈建議在一些網(wǎng)絡(luò)數(shù)據(jù)傳輸很大的應(yīng)用中使用kyro序列化占卧。
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
val sc = new SparkContext(conf)
如果你要序列化的對象比較大,可以增加參數(shù)spark.kryoserializer.buffer
所設(shè)置的值联喘。
如果你沒有注冊需要序列化的class华蜒,Kyro依然可以照常工作,但會存儲每個(gè)對象的全類名(full class name)豁遭,這樣的使用方式往往比默認(rèn)的 Java serialization 還要浪費(fèi)更多的空間捂蕴。
可以設(shè)置 spark.kryo.registrationRequired
參數(shù)為 true
,使用kyro時(shí)如果在應(yīng)用中有類沒有進(jìn)行注冊則會報(bào)錯(cuò):
如上這個(gè)錯(cuò)誤需要添加
sparkConf.registerKryoClasses(
Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
classOf[MyClass]))
下面的 demo 將會演示不同方式的序列化對空間占用的情況闪幽。
DEMO
case class Info(name: String ,age: Int,gender: String,addr: String)
object KyroTest {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Info]))
val sc = new SparkContext(conf)
val arr = new ArrayBuffer[Info]()
val nameArr = Array[String]("lsw","yyy","lss")
val genderArr = Array[String]("male","female")
val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")
for(i <- 1 to 1000000){
val name = nameArr(Random.nextInt(3))
val age = Random.nextInt(100)
val gender = genderArr(Random.nextInt(2))
val address = addressArr(Random.nextInt(5))
arr.+=(Info(name,age,gender,address))
}
val rdd = sc.parallelize(arr)
//序列化的方式將rdd存到內(nèi)存
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count()
}
}
結(jié)果
可以在web ui中看到緩存的rdd大猩侗妗:
序列化方式 | 是否注冊 | 空間占用 |
---|---|---|
kyro | 是 | 21.1 MB |
kyro | 否 | 38.3 MB |
Java | 無 | 25.1 MB |