1 Problem Description
Running a Hive UDF through Spark SQL throws a NullPointerException (NPE), which aborts the job. The NPE stack trace is as follows:
Serialization trace:
fields (com.xiaoju.dataservice.api.hive.udf.LoadFromDataServiceMetricSetUDTF)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:686)
at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.deserializeObjectByKryo(HiveShim.scala:155)
at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.deserializePlan(HiveShim.scala:171)
at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.readExternal(HiveShim.scala:210)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1842)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1799)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:234)
at java.util.ArrayList.ensureCapacity(ArrayList.java:218)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:114)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
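2 Problem Analysis
2.1 Direct cause of the NPE
The stack trace above shows that the NPE occurs while Kryo is deserializing an ArrayList object.
Kryo is a fast, efficient serialization framework. It does not impose a schema or require data with particular characteristics; all such concerns are delegated to serializers. Different data types are handled by different serializers, and users can also register custom serializers. For collection types such as ArrayList, Kryo provides CollectionSerializer by default.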
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:234)
at java.util.ArrayList.ensureCapacity(ArrayList.java:218)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:114)
Combining the stack frames above with source-level debugging, we found that CollectionSerializer#read instantiates the ArrayList during deserialization, and that the NPE is thrown when it calls ensureCapacity to set the list's capacity. The debugger showed that the elementData field of the newly created ArrayList was not initialized.
Yet every ArrayList constructor initializes ArrayList#elementData. So why did the debugger show elementData as null? The only explanation is that the object was created without invoking any constructor, so the investigation turned to how the ArrayList was being created.
/**
 * Constructs an empty list with an initial capacity of ten.
 */
public ArrayList() {
    this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
// The other constructors also initialize elementData
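To make the hypothesis concrete, the following minimal sketch reproduces the same failure outside Kryo. It is not Kryo's or Objenesis's code: it assumes a HotSpot-style JDK where the unsupported sun.misc.Unsafe API is reachable through reflection, and it uses Unsafe.allocateInstance only as a stand-in for any mechanism that allocates an object without running a constructor. The class name ConstructorBypassDemo is made up for illustration.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;

// Illustration only. sun.misc.Unsafe is an unsupported JDK-internal API and is
// used here purely to mimic a constructor-bypassing instantiator.
final class ConstructorBypassDemo {

    // Allocates an instance of the given class without running any constructor.
    static Object allocateWithoutConstructor(Class<?> type) {
        try {
            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);
            return unsafe.allocateInstance(type);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot allocate " + type.getName(), e);
        }
    }

    // Returns true if ensureCapacity throws an NPE on an ArrayList whose
    // constructor never ran (so its elementData field is still null).
    static boolean ensureCapacityThrowsNpe() {
        ArrayList<?> list = (ArrayList<?>) allocateWithoutConstructor(ArrayList.class);
        try {
            list.ensureCapacity(16);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE on constructor-less ArrayList: " + ensureCapacityThrowsNpe());

        // A normally constructed list has elementData set, so the same call succeeds.
        ArrayList<String> normal = new ArrayList<>();
        normal.ensureCapacity(16);
        System.out.println("Normally constructed ArrayList handled ensureCapacity without error");
    }
}
```

The call uses a positive capacity on purpose: the failure comes from dereferencing the null elementData array while comparing it with the requested capacity.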
2.2 How the ArrayList object is created
As noted above, the elementData field of the created ArrayList is null even though every ArrayList constructor initializes it, which suggests that no constructor was used to create the object. With this hypothesis in mind, we debugged the program again.
// Method that creates the ArrayList object
/** Creates a new instance of a class using {@link Registration#getInstantiator()}. If the registration's instantiator is null,
 * a new one is set using {@link #newInstantiator(Class)}. */
public <T> T newInstance (Class<T> type) {
    Registration registration = getRegistration(type);
    ObjectInstantiator instantiator = registration.getInstantiator();
    if (instantiator == null) {
        instantiator = newInstantiator(type);
        registration.setInstantiator(instantiator);
    }
    return (T)instantiator.newInstance();
}
The ArrayList is instantiated by Kryo#newInstance. The instantiator used (the component that actually constructs the object) is the one specified when the type's Registration was registered with Kryo. If none was specified at registration time, an instantiator is created for the class type according to the configured InstantiatorStrategy. The implementation is as follows:
/** Returns a new instantiator for creating new instances of the specified type. By default, an instantiator is returned that
 * uses reflection if the class has a zero argument constructor, an exception is thrown. If a
 * {@link #setInstantiatorStrategy(InstantiatorStrategy) strategy} is set, it will be used instead of throwing an exception. */
protected ObjectInstantiator newInstantiator (final Class type) {
    // InstantiatorStrategy.
    return strategy.newInstantiatorOf(type);
}
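When Spark SQL serializes and deserializes a Hive UDF, it uses by default the Kryo instance defined in the Hive code. That instance's instantiator strategy is StdInstantiatorStrategy, which is used to create an instantiator whenever a Registration has none set. The definition is as follows: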
// Kryo is not thread-safe,
// Also new Kryo() is expensive, so we want to do it just once.
public static ThreadLocal<Kryo> runtimeSerializationKryo = new ThreadLocal<Kryo>() {
    @Override
    protected synchronized Kryo initialValue() {
        Kryo kryo = new Kryo();
        kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
        kryo.register(java.sql.Date.class, new SqlDateSerializer());
        kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
        kryo.register(Path.class, new PathSerializer());
        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
        ......
        return kryo;
    };
};
StdInstantiatorStrategy chooses how to create objects based on the JVM version and JVM vendor rather than on the class's own implementation, and it can create an object without invoking any of its constructors.
// Description of StdInstantiatorStrategy
/**
 * Guess the best instantiator for a given class. The instantiator will instantiate the class
 * without calling any constructor. Currently, the selection doesn't depend on the class. It relies
 * on the
 * <ul>
 * <li>JVM version</li>
 * <li>JVM vendor</li>
 * <li>JVM vendor version</li>
 * </ul>
 * However, instantiators are stateful and so dedicated to their class.
 *
 * @author Henri Tremblay
 * @see ObjectInstantiator
 */
public class StdInstantiatorStrategy extends BaseInstantiatorStrategy {
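We also found that Kryo does not explicitly set an instantiator on the Registration objects it creates for each class, so all of them are constructed via StdInstantiatorStrategy.
This confirms our hypothesis: the NPE occurs because the ArrayList is created without invoking any constructor, which leaves its elementData field uninitialized.
3 Why some Spark versions run without error
The same user program ran fine on the company's older Spark build but hit this bug on the newest build. Why? Our first guess was a difference in Kryo versions: the IDE's External Libraries view showed that the old Spark build used Kryo 2, while the newest build depends on Kryo 3.
However, comparing the two Kryo versions revealed no difference in how ArrayList is handled, so we started over. Because the failure occurs while deserializing a Hive UDF, we compared the Hive versions that the two Spark builds depend on.
Hive dependency of the company's older Spark build (the version that upstream Spark depends on, i.e. a trimmed-down fork):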
<hive.group>org.spark-project.hive</hive.group>
<!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1.spark</hive.version>
Hive dependency of the company's newer Spark build (essentially community Hive):
<hive.group>com.my corporation.hive</hive.group>
<!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1-200-spark</hive.version>
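Clearly, the two Spark builds depend on different Hive artifacts. Further investigation showed that the Hive dependency of community Spark, "org.spark-project.hive", is an independent project modified from the original Hive and contains its own Kryo component (that is, community Hive was trimmed down and ships its own Kryo implementation). The company's newer Spark build depends on community Hive, where Kryo is a third-party dependency (the official Kryo, with its package path relocated into hive-exec through the maven-shade-plugin relocation feature).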
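When two builds pull in different copies of the same library, it can help to ask the JVM which jar a class was actually loaded from instead of relying only on the IDE's dependency view. The sketch below is one way to do that with the standard library alone. ClassOrigin is a hypothetical helper, and the relocated class name used as a default in main is an assumption about the shaded package prefix in hive-exec; adjust it to match your build.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class was loaded from.
final class ClassOrigin {

    static final String NO_SOURCE = "<no code source (JDK class)>";
    static final String NOT_FOUND = "<not on classpath>";

    // Returns the jar or directory URL that the class was loaded from.
    static String of(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return NO_SOURCE;
        }
        return source.getLocation().toString();
    }

    // Looks the class up by name without initializing it.
    static String ofName(String className) {
        try {
            return of(Class.forName(className, false, ClassOrigin.class.getClassLoader()));
        } catch (ClassNotFoundException | LinkageError e) {
            return NOT_FOUND;
        }
    }

    public static void main(String[] args) {
        // Default names are examples only; the second one is an assumed relocation prefix.
        String[] names = args.length > 0 ? args : new String[] {
            "com.esotericsoftware.kryo.Kryo",
            "org.apache.hive.com.esotericsoftware.kryo.Kryo"
        };
        for (String name : names) {
            System.out.println(name + " -> " + ofName(name));
        }
    }
}
```

Running this on the driver or inside a task with the same classpath as the failing job shows whether the Kryo class in use comes from a standalone Kryo jar or from a Hive artifact.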
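A side-by-side comparison shows:
The Hive used by the company's older build (the Hive that community Spark depends on) modified Kryo's newInstantiator method: it does not set an InstantiatorStrategy and instead creates objects directly through the class's default constructor, so the objects it creates are properly initialized. As a result, elementData is initialized when an ArrayList is created.
The implementations that differ in a way that affects this issue:
- Kryo used by the Hive that the company's older Spark build depends on (the trimmed-down Hive in community Spark)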
protected ObjectInstantiator newInstantiator(final Class type) {
    if (!Util.isAndroid) {
        Class enclosingType = type.getEnclosingClass();
        boolean isNonStaticMemberClass = enclosingType != null && type.isMemberClass() && !Modifier.isStatic(type.getModifiers());
        if (!isNonStaticMemberClass) {
            try {
                // Obtain the no-argument constructor
                final ConstructorAccess access = ConstructorAccess.get(type);
                return new ObjectInstantiator() {
                    public Object newInstance() {
                        try {
                            return access.newInstance();
                        } catch (Exception var2) {
                            throw new KryoException("Error constructing instance of class: " + Util.className(type), var2);
                        }
                    }
                };
            } catch (Exception var7) {
                ;
            }
        }
    }
    ......
}
- Kryo used by the Hive that the company's newer Spark build depends on (actually community Hive) selects how to create objects according to the InstantiatorStrategy. This was described in Section 2.2 and is not repeated here.
/** Returns a new instantiator for creating new instances of the specified type. By default, an instantiator is returned that
 * uses reflection if the class has a zero argument constructor, an exception is thrown. If a
 * {@link #setInstantiatorStrategy(InstantiatorStrategy) strategy} is set, it will be used instead of throwing an exception. */
protected ObjectInstantiator newInstantiator (final Class type) {
    // InstantiatorStrategy.
    return strategy.newInstantiatorOf(type);
}
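4 Solution
From the analysis above, the root cause of the NPE is that Spark uses Hive's Kryo object, which is configured with StdInstantiatorStrategy. When it deserializes an ArrayList, none of the ArrayList constructors is invoked, so the created object is left uninitialized.
The problem could therefore be fixed in any one of Spark, Hive, or Kryo. Since it currently appears only with the Spark engine, we chose to fix it in Spark. The idea is to try the default no-argument-constructor strategy, DefaultInstantiatorStrategy, first, and to fall back to StdInstantiatorStrategy only if that fails to create the object: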
@transient
def deserializeObjectByKryo[T: ClassTag](
    kryo: Kryo,
    in: InputStream,
    clazz: Class[_]): T = {
  val inp = new Input(in)
  // Explicitly set the instantiator strategy: try the no-argument constructor first,
  // then fall back to StdInstantiatorStrategy
  kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy))
  val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
  inp.close()
  t
}
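The "constructor first, constructor-less allocation as a fallback" idea behind this fix can also be illustrated without Kryo on the classpath. The sketch below is a simplified stand-in and not the implementation of Kryo's DefaultInstantiatorStrategy: FallbackInstantiation and NoDefaultCtor are hypothetical names, and sun.misc.Unsafe (an unsupported JDK-internal API, assumed to be reachable through reflection) plays the role that StdInstantiatorStrategy plays in the real fix.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.ArrayList;

// Simplified illustration of a two-step instantiation policy.
final class FallbackInstantiation {

    // Hypothetical sample type that has no zero-argument constructor.
    static final class NoDefaultCtor {
        final String name;
        NoDefaultCtor(String name) { this.name = name; }
    }

    // Fallback path: allocate the object without running any constructor.
    static Object allocateWithoutConstructor(Class<?> type) {
        try {
            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);
            return unsafe.allocateInstance(type);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot allocate " + type.getName(), e);
        }
    }

    // Preferred path: use the zero-argument constructor when the class declares one,
    // so that fields such as ArrayList#elementData are initialized normally.
    static <T> T newInstance(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (NoSuchMethodException e) {
            // No zero-argument constructor: fall back to constructor-less allocation.
            return type.cast(allocateWithoutConstructor(type));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Constructor failed for " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        ArrayList<?> list = newInstance(ArrayList.class);
        list.ensureCapacity(16); // safe: the constructor ran
        System.out.println("ArrayList created through its constructor, size = " + list.size());

        NoDefaultCtor fallback = newInstance(NoDefaultCtor.class);
        System.out.println("Fallback object created, name field = " + fallback.name);
    }
}
```

With this ordering, classes such as ArrayList go through their own constructor, while types without a zero-argument constructor can still be created, at the cost of starting with all fields at their default values.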