昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題木张,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯,他一問(wèn)我這個(gè)問(wèn)題的時(shí)候调违,本豬心里一驚窟哺,Spark**還能這么玩?我出于程序員的本能回復(fù)他肯定不行,但今天再回過(guò)來(lái)頭想了一想技肩,昨天腦子肯定進(jìn)水了且轨,回復(fù)太膚淺了,既然Spark可以通過(guò)編程方式注冊(cè)UDF虚婿,當(dāng)然把那位大哥的代碼邏輯使用反射加載進(jìn)去再調(diào)用不就行了旋奢?這不就是JVM的優(yōu)勢(shì)么,怪自己的反射沒(méi)學(xué)到家然痊,說(shuō)搞就搞起至朗。
分析過(guò)程
我會(huì)說(shuō)這波分析過(guò)程很無(wú)聊,你還會(huì)看么?
跟著本豬看一個(gè)Spark
注冊(cè)UDF
的例子
spark.udf.register(name, (a1: String) => a1.toUpperCase)
點(diǎn)擊register
的源碼進(jìn)去看
一個(gè)`A1`:參數(shù)類(lèi)型,`RT`:返回類(lèi)型
def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
def builder(e: Seq[Expression]) = if (e.length == 1) {
ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
} else {
...
}
...
}
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
...
1. func是上面方法的重點(diǎn)矗钟,既然想要?jiǎng)討B(tài)UDF
邏輯代碼,那我們把Function1
這個(gè)函數(shù)實(shí)現(xiàn)不就可以了嫌变?再利用JVM反射的技術(shù)調(diào)用吨艇,完美。
2. 順便還看出了在scala-2.10.x版本中case class
的元素是不能超過(guò) 個(gè)的腾啥。
上面的UDF
注冊(cè)的原型其實(shí)是
val udf = new Function1[String,String] {
override def apply(a1: String): String = {
a1.toUpperCase
}
}
spark.udf.register(name, udf)
到這里我有一個(gè) 膚淺 并且 大膽 的想法东涡,我把那位大哥的代碼放到apply方法里面調(diào)用不就行了?
val udf = new Function1[String,String] {
override def apply(a1: String): String = {
//method.invoke(instance) //使用反射加載代碼,把大哥動(dòng)態(tài)邏輯方法method拿出來(lái)調(diào)用凸舵。
}
}
1. 但是還有一些問(wèn)題要解決祖娘,我不能強(qiáng)制我的老大哥只能傳遞一個(gè)參數(shù)吧,那也太年輕不懂事了贞间,至少讓他可以隨意傳 參數(shù)贿条。
2. 唯一的解決方法,就是要控制Function1
到Function22
函數(shù)的動(dòng)態(tài)生成增热,找了半天沒(méi)發(fā)現(xiàn)Function
的動(dòng)態(tài)生成整以,然后還發(fā)現(xiàn)Spark也是根據(jù)參數(shù)長(zhǎng)度生成FunctionN
的,真**刷新本豬的三觀呀峻仇。
3. 既然實(shí)現(xiàn)方式找到了公黑,那就簡(jiǎn)單了,只要通過(guò)反射就能 上知天文,下知地理 摄咆。
既然是Spark
凡蚜,肯定要用Scala
去寫(xiě)反射了。
case class ClassInfo(clazz: Class[_], instance: Any, defaultMethod: Method, methods: Map[String, Method], func:String) {
def invoke[T](args: Object*): T = {
defaultMethod.invoke(instance, args: _*).asInstanceOf[T]
}
}
object ClassCreateUtils extends Logging{
private val clazzs = new util.HashMap[String, ClassInfo]()
private val classLoader = scala.reflect.runtime.universe.getClass.getClassLoader
private val toolBox = universe.runtimeMirror(classLoader).mkToolBox()
def apply(func: String): ClassInfo = this.synchronized {
var clazz = clazzs.get(func)
if (clazz == null) {
val (className, classBody) = wrapClass(func)
val zz = compile(prepareScala(className, classBody))
val defaultMethod = zz.getDeclaredMethods.head
val methods = zz.getDeclaredMethods
clazz = ClassInfo(
zz,
zz.newInstance(),
defaultMethod,
methods = methods.map { m => (m.getName, m) }.toMap,
func
)
clazzs.put(func, clazz)
logInfo(s"dynamic load class => $clazz")
}
clazz
}
def compile(src: String): Class[_] = {
val tree = toolBox.parse(src)
toolBox.compile(tree).apply().asInstanceOf[Class[_]]
}
def prepareScala(className: String, classBody: String): String = {
classBody + "\n" + s"scala.reflect.classTag[$className].runtimeClass"
}
def wrapClass(function: String): (String, String) = {
val className = s"dynamic_class_${UUID.randomUUID().toString.replaceAll("-", "")}"
val classBody =
s"""
|class $className{
| $function
|}
""".stripMargin
(className, classBody)
}
}
上面的代碼是小弟給大佬寫(xiě)好的吭从,不用大佬親自動(dòng)手了朝蜘。
使用方法就灰常簡(jiǎn)單了我的大佬們。
val infos = ClassCreateUtils(
"""
|def apply(name:String)=name.toUpperCase
""".stripMargin
)
println(infos.defaultMethod.invoke(infos.instance,"dounine 本豬會(huì)一點(diǎn)點(diǎn) spark"))
# 輸出結(jié)果不用猜也知道是
DOUNINE 本豬會(huì)一點(diǎn)點(diǎn) SPARK
# 也可以手動(dòng)指定方法
println(infos.methods("apply").invoke(infos.instance,"dounine 本豬會(huì)一點(diǎn)點(diǎn) spark"))
根據(jù)反射的方法信息生成FunctionN
object ScalaGenerateFuns {
def apply(func: String): (AnyRef, Array[DataType], DataType) = {
val (argumentTypes, returnType) = getFunctionReturnType(func)
(generateFunction(func, argumentTypes.length), argumentTypes, returnType)
}
//獲取方法的參數(shù)類(lèi)型及返回類(lèi)型
private def getFunctionReturnType(func: String): (Array[DataType], DataType) = {
val classInfo = ClassCreateUtils(func)
val method = classInfo.defaultMethod
val dataType = JavaTypeInference.inferDataType(method.getReturnType)._1
(method.getParameterTypes.map(JavaTypeInference.inferDataType).map(_._1), dataType)
}
//生成22個(gè)Function
def generateFunction(func: String, argumentsNum: Int): AnyRef = {
lazy val instance = ClassCreateUtils(func).instance
lazy val method = ClassCreateUtils(func).methods("apply")
argumentsNum match {
case 0 => new (() => Any) with Serializable with Logging {
override def apply(): Any = {
try {
method.invoke(instance)
} catch {
case e: Exception =>
logError(e.getMessage)
}
}
}
case 1 => new (Object => Any) with Serializable with Logging {
override def apply(v1: Object): Any = {
try {
method.invoke(instance, v1)
} catch {
case e: Exception =>
e.printStackTrace()
logError(e.getMessage)
null
}
}
}
case 2 => new ((Object, Object) => Any) with Serializable with Logging {
override def apply(v1: Object, v2: Object): Any = {
try {
method.invoke(instance, v1, v2)
} catch {
case e: Exception =>
logError(e.getMessage)
null
}
}
}
//... 麻煩大佬自己去寫(xiě)剩下的20個(gè)了涩金,這里裝不下了谱醇,不然瀏覽器會(huì)崩潰的,然后電腦會(huì)重啟的步做,為了大佬的電腦著想副渴。
}
前戲我們都做完了,高潮的環(huán)節(jié)來(lái)了全度。
我們最后再照著register
的實(shí)現(xiàn)方式煮剧,把我們動(dòng)態(tài)Function
注冊(cè)給Spark
1. val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
2. val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
3. def builder(e: Seq[Expression]) = if (e.length == 1) {
ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil),
Some(name), nullable, udfDeterministic = true)
}
4. functionRegistry.createOrReplaceTempFunction(name, builder)
1. 這句代碼比較好理解,就是獲取RT
返回值類(lèi)型,就是我們的returnType
2. 就是參數(shù)類(lèi)型勉盅,對(duì)應(yīng)的修改如下
val inputTypes = Try(argumentTypes.toList).toOption
3. 剛開(kāi)始看到這個(gè)時(shí)候佑颇,我是一臉???,后來(lái)看源碼才發(fā)現(xiàn)builder
是一種自定類(lèi)型,源碼如下
type FunctionBuilder = Seq[Expression] => Expression
改造方式如下
def builder(e: Seq[Expression]) = ScalaUDF(rf, returnType, e, inputTypes.getOrElse(Nil), Some(name))
4. 看到這句的時(shí)候我以為簡(jiǎn)單了菇篡,直接使用spark.sessionState.functionRegistry
發(fā)現(xiàn)編譯不過(guò)漩符,看到private[sql]
這個(gè)作用域的時(shí)候有點(diǎn)崩潰一喘,本來(lái)是想用下面的方式注冊(cè)的驱还。
val udf = UserDefinedFunction(rf, returnType, inputTypes).withName(name)
spark.udf.register(name, udf)
是小弟我想太多了,另辟捷徑凸克,做了那么多工作難道就白費(fèi)了议蟆?
發(fā)現(xiàn)下面這句代碼,瞬間找到了家的方向萎战。
functionRegistry.registerFunction(new FunctionIdentifier(name), builder)
人生巔峰
到此咐容,大豬的分析與編碼已經(jīng)完成,下面是今天給大哥的解決方案蚂维。
方法實(shí)現(xiàn)可以通過(guò)查詢sql得到戳粒,或者接口都渴以。
val spark = SparkSession
.builder()
.appName("test")
.master("local[*]")
.getOrCreate()
val name = "hello"
val (fun, argumentTypes, returnType) = ScalaSourceUDF(
"""
|def apply(name:String)=name+" => hi"
|""".stripMargin)
val inputTypes = Try(argumentTypes.toList).toOption
def builder(e: Seq[Expression]) = ScalaUDF(fun, returnType, e, inputTypes.getOrElse(Nil), Some(name))
spark.sessionState.functionRegistry.registerFunction(new FunctionIdentifier(name), builder)
val rdd = spark
.sparkContext
.parallelize(Array(("dounine", "20")))
.map(x => Row.fromSeq(Array(x._1, x._2)))
val types = StructType(
Array(
StructField("name", StringType),
StructField("age", StringType)
)
)
spark.createDataFrame(rdd, types).createTempView("log")
spark.sql("select hello(name) from log").show(false)
真打臉虫啥,昨天還說(shuō)不行的蔚约。