Background
Recently, while working on Spark on K8s, I needed to integrate some features from a colleague. That in itself was nothing special, but it involved Hive classes (specifically, classes under the org.apache.hadoop.hive package). I found that the Hive classes were always loaded from the application jar first, ignoring the jars that ship with Spark, which caused me a lot of trouble. It took about one to two weeks to finally get to the bottom of the problem.
Problem Analysis
Let's go straight to the analysis.
As we know, when Spark submits an application, it obtains a URLClassLoader to load classes, as in the following code from SparkSubmit:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }
  if (args.verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    ....
The getSubmitClassLoader method here is what produces that URLClassLoader:
private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
  val loader =
    if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  loader
}
The difference is that ChildFirstURLClassLoader customizes the delegation order: it loads classes from the application's jars first. However, our application never set spark.{driver/executor}.userClassPathFirst, so this loader has nothing to do with how the Hive classes were being picked up.
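For contrast, here is a minimal sketch (not Spark's exact source; the class name is mine) of what a child-first loader does differently from the JVM's default parent-first delegation. Spark only installs its real ChildFirstURLClassLoader when spark.{driver/executor}.userClassPathFirst is set to true:

import java.net.{URL, URLClassLoader}

// Sketch: try our own URLs before falling back to the parent,
// reversing the JVM's default parent-first order.
class SketchChildFirstLoader(urls: Array[URL], realParent: ClassLoader)
  extends URLClassLoader(urls, null) { // null parent: skip automatic delegation

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    try {
      super.loadClass(name, resolve) // bootstrap classes + our own URLs
    } catch {
      case _: ClassNotFoundException =>
        realParent.loadClass(name)   // only then ask the real parent
    }
  }
}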
While I was still puzzling over this, I suddenly remembered how Spark achieves compatibility with different Hive metastore versions through class loaders (something I had written about before). That makes it worth digging into the details of the IsolatedClientLoader class:
private[hive] val classLoader: MutableURLClassLoader = {
  val isolatedClassLoader =
    if (isolationOn) {
      if (allJars.isEmpty) {
        // See HiveUtils; this is the Java 9+ + builtin mode scenario
        baseClassLoader
      } else {
        val rootClassLoader: ClassLoader =
          if (SystemUtils.JAVA_VERSION.split("\\.")(1).toInt >= 9) {
            // In Java 9, the boot classloader can see few JDK classes. The intended parent
            // classloader for delegation is now the platform classloader.
            // See http://java9.wtf/class-loading/
            val platformCL =
              classOf[ClassLoader].getMethod("getPlatformClassLoader").
                invoke(null).asInstanceOf[ClassLoader]
            // Check to make sure that the root classloader does not know about Hive.
            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
            platformCL
          } else {
            // The boot classloader is represented by null (the instance itself isn't accessible)
            // and before Java 9 can see all JDK classes
            null
          }
        new URLClassLoader(allJars, rootClassLoader) {
          override def loadClass(name: String, resolve: Boolean): Class[_] = {
            val loaded = findLoadedClass(name)
            if (loaded == null) doLoadClass(name, resolve) else loaded
          }
          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
            val classFileName = name.replaceAll("\\.", "/") + ".class"
            if (isBarrierClass(name)) {
              // For barrier classes, we construct a new copy of the class.
              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
              defineClass(name, bytes, 0, bytes.length)
            } else if (!isSharedClass(name)) {
              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
              super.loadClass(name, resolve)
            } else {
              // For shared classes, we delegate to baseClassLoader, but fall back in case the
              // class is not found.
              logDebug(s"shared class: $name")
              try {
                baseClassLoader.loadClass(name)
              } catch {
                case _: ClassNotFoundException =>
                  super.loadClass(name, resolve)
              }
            }
          }
        }
      }
    } else {
      baseClassLoader
    }
  // Right now, we create a URLClassLoader that gives preference to isolatedClassLoader
  // over its own parent.
  new NonClosableMutableURLClassLoader(isolatedClassLoader)
}
Under JDK 8, rootClassLoader is null, i.e. the bootstrap class loader. So when a Hive-related class is loaded, super.loadClass first delegates to the bootstrap loader, which knows nothing about Hive, and then falls through to URLClassLoader's findClass, which resolves the class from the registered URLs (that is, the jars added via the addURL method).
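To see why the null parent matters, here is a minimal, self-contained demonstration (the jar path is hypothetical; runnable e.g. in spark-shell). With a null parent, delegation only reaches the bootstrap loader, so any non-JDK class must be found in the loader's own URLs:

import java.net.{URL, URLClassLoader}

// Hypothetical jar path, for illustration only.
val hiveJar = new URL("file:/opt/spark/jars/hive-exec-2.3.7-core.jar")
val isolated = new URLClassLoader(Array(hiveJar), null)

// JDK classes still resolve through the bootstrap loader:
isolated.loadClass("java.lang.String")
// Hive classes can only come from the URLs above, never from the
// application class loader that created `isolated`:
isolated.loadClass("org.apache.hadoop.hive.conf.HiveConf")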
And in Spark, the final job submission goes through SparkSubmit's runMain method, as shown in the first code block above:
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
  addJarToClasspath(jar, loader)
}
Pay attention to where childClasspath comes from: when submitting a job you can see the Classpath elements output, which includes both the jars specified with --jars and the application jar itself. So when Hive-related classes are loaded, the corresponding classes are resolved from childClasspath first; this is the effect of the IsolatedClientLoader implementation. A quick way to inspect this is shown below.
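For example (a command sketch; the class and jar names are made up), submitting with --verbose triggers the Classpath elements log line from the runMain code above, showing every jar that ends up in childClasspath:

spark-submit --verbose \
  --class com.example.MyApp \
  --jars /path/to/colleague-feature.jar \
  /path/to/myapp.jar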
Solution
But what if the Hive-related classes the application uses are incompatible with the ones shipped with the system? One fix is to use the maven-shade-plugin to relocate (rename) them: the application jar then references the renamed classes, while everything else keeps using the system-provided Hive classes, unaffected.
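For instance, a relocation rule along these lines in pom.xml (the shaded prefix com.mycompany.shaded is just an illustrative choice) rewrites the Hive packages inside the application jar at build time:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.hadoop.hive</pattern>
            <shadedPattern>com.mycompany.shaded.org.apache.hadoop.hive</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

The plugin rewrites both the relocated class files and the bytecode references to them inside your jar, so your code calls the shaded copies while Spark keeps loading the stock Hive classes from its own jars.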
Conclusion
Hive-related classes (under the org.apache.hadoop.hive package) are not governed by spark.{driver/executor}.userClassPathFirst; their loading is determined by the IsolatedClientLoader implementation. To see exactly which jar each Hive class is loaded from, enable debug logging, for example as follows.
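Two simple options (sketches; the properties syntax below applies to Spark builds that still use log4j 1.x): raise the log level for the isolated client loader's package so the "hive class: ..." lines from the code above appear, or let the JVM itself report where every class comes from:

# conf/log4j.properties: surface the logDebug output of IsolatedClientLoader
log4j.logger.org.apache.spark.sql.hive.client=DEBUG

# or print the source of every class the JVM loads:
spark-submit --conf "spark.driver.extraJavaOptions=-verbose:class" ...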