Spark序列化概述
在Spark的架構中赖瞒,在網絡中傳遞的或者緩存在內存愕贡、硬盤中的對象需要進行序列化操作穆律,序列化的作用主要是利用時間換空間:
- 分發(fā)給Executor上的Task
- 需要緩存的RDD(前提是使用序列化方式緩存)
- 廣播變量
- Shuffle過程中的數(shù)據(jù)緩存
- 使用receiver方式接收的流數(shù)據(jù)緩存
- 算子函數(shù)中使用的外部變量
? 上面的六種數(shù)據(jù)读慎,通過Java序列化(默認的序列化方式)形成一個二進制字節(jié)數(shù)組怕品,大大減少了數(shù)據(jù)在內存、硬盤中占用的空間易迹,減少了網絡數(shù)據(jù)傳輸?shù)拈_銷宰衙,并且可以精確的推測內存使用情況,降低GC頻率睹欲。
? 其好處很多供炼,但是缺陷也很明顯:
? 把數(shù)據(jù)序列化為字節(jié)數(shù)組、把字節(jié)數(shù)組反序列化為對象的操作,是會消耗CPU劲蜻、延長作業(yè)時間的陆淀,從而降低了Spark的性能。 至少默認的Java序列化方式在這方面是不盡如人意的先嬉。Java序列化很靈活但性能較差轧苫,同時序列化后占用的字節(jié)數(shù)也較多。
? 所以官方也推薦盡量使用Kryo的序列化庫(版本2)疫蔓。官文介紹含懊,Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫衅胀,是因為它不支持所有對象的序列化岔乔,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便滚躯。
相關配置參數(shù)
Property Name | Default |
---|---|
spark.serializer | org.apache.spark.serializer.JavaSerializer |
spark.kryoserializer.buffer | 64k |
spark.kryoserializer.buffer.max | 64m |
spark.kryo.classesToRegister | none |
spark.kryo.referenceTracking | true |
spark.kryo.registrationRequired | false |
spark.kryo.registrator | none |
spark.kryo.unsafe | false |
配置說明:
- spark.serializer:序列化時用的類雏门,需要申明為org.apache.spark.serializer.KryoSerializer。這個設置不僅控制各個worker節(jié)點之間的混洗數(shù)據(jù)序列化格式掸掏,同時還控制RDD存到磁盤上的序列化格式及廣播變量的序列化格式茁影。
- spark.kryoserializer.buffer:每個Executor中的每個core對應著一個序列化buffer。如果你的對象很大丧凤,可能需要增大該配置項募闲。其值不能超過spark.kryoserializer.buffer.max
- spark.kryoserializer.buffer.max:允許使用序列化buffer的最大值
- spark.kryo.classesToRegister:向Kryo注冊自定義的的類型,類名間用逗號分隔
- spark.kryo.referenceTracking:跟蹤對同一個對象的引用情況愿待,這對發(fā)現(xiàn)有循環(huán)引用或同一對象有多個副本的情況是很有用的浩螺。設置為false可以提高性能
- spark.kryo.registrationRequired:是否需要在Kryo登記注冊?如果為true仍侥,則序列化一個未注冊的類時會拋出異常
- spark.kryo.registrator:為Kryo設置這個類去注冊你自定義的類要出。最后,如果你不注冊需要序列化的自定義類型访圃,Kryo也能工作厨幻,不過每一個對象實例的序列化結果都會包含一份完整的類名相嵌,這有點浪費空間
- spark.kryo.unsafe:如果想更加提升性能腿时,可以使用Kryo unsafe方式
Kryo使用
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");//設置序列化方式
conf.registerKryoClasses(new Class[]{_KryoBean.class});//注冊使用kryo序列化的類
另外一種注冊的方法
//實現(xiàn)一個KryoRegistrator注冊類,在該類里面對自定義的序列化類進行注冊饭宾,然后在conf里面配置該類
public class _KryoRegistor implements KryoRegistrator{
@Override
public void registerClasses(Kryo kryo) {
kryo.register(_KryoBean.class, new FieldSerializer<>(kryo, _KryoBean.class));
kryo.register(xxx.class, new FieldSerializer<>(kryo, xxx.class));
...
...
}
}
// 在conf配置如下
SparkSession spark = SparkSession.builder().appName("").master("local")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", _KryoRegistor.class.getName())
.getOrCreate();