Spark - 動(dòng)態(tài)注冊(cè)UDF

昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題木张,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯,他一問(wèn)我這個(gè)問(wèn)題的時(shí)候调违,本豬心里一驚窟哺,Spark**還能這么玩?我出于程序員的本能回復(fù)他肯定不行,但今天再回過(guò)來(lái)頭想了一想技肩,昨天腦子肯定進(jìn)水了且轨,回復(fù)太膚淺了,既然Spark可以通過(guò)編程方式注冊(cè)UDF虚婿,當(dāng)然把那位大哥的代碼邏輯使用反射加載進(jìn)去再調(diào)用不就行了旋奢?這不就是JVM的優(yōu)勢(shì)么,怪自己的反射沒(méi)學(xué)到家然痊,說(shuō)搞就搞起至朗。

分析過(guò)程

我會(huì)說(shuō)這波分析過(guò)程很無(wú)聊,你還會(huì)看么?


想看更多Spark有趣的文章來(lái)關(guān)注本豬剧浸,包你跟我一樣肥锹引。

跟著本豬看一個(gè)Spark注冊(cè)UDF的例子

spark.udf.register(name, (a1: String) => a1.toUpperCase)

點(diǎn)擊register的源碼進(jìn)去看

一個(gè)`A1`:參數(shù)類(lèi)型,`RT`:返回類(lèi)型
def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
    val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
    def builder(e: Seq[Expression]) = if (e.length == 1) {
      ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
    } else {
      ...
    }
    ...
  }
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
...

1. func是上面方法的重點(diǎn)矗钟,既然想要?jiǎng)討B(tài)UDF邏輯代碼,那我們把Function1這個(gè)函數(shù)實(shí)現(xiàn)不就可以了嫌变?再利用JVM反射的技術(shù)調(diào)用吨艇,完美。
2. 順便還看出了在scala-2.10.x版本中case class的元素是不能超過(guò) \color{#FF0000}{22} 個(gè)的腾啥。


上面的UDF注冊(cè)的原型其實(shí)是

val udf = new Function1[String,String] {
      override def apply(a1: String): String = {
        a1.toUpperCase
      }
}
spark.udf.register(name, udf)

到這里我有一個(gè) 膚淺 并且 大膽 的想法东涡,我把那位大哥的代碼放到apply方法里面調(diào)用不就行了?

再打一個(gè)廣告倘待,快關(guān)注我吧疮跑。

val udf = new Function1[String,String] {
      override def apply(a1: String): String = {
        //method.invoke(instance) //使用反射加載代碼,把大哥動(dòng)態(tài)邏輯方法method拿出來(lái)調(diào)用凸舵。
      }
}

1. 但是還有一些問(wèn)題要解決祖娘,我不能強(qiáng)制我的老大哥只能傳遞一個(gè)參數(shù)吧,那也太年輕不懂事了贞间,至少讓他可以隨意傳 \color{#FF0000}{22} 參數(shù)贿条。
2. 唯一的解決方法,就是要控制Function1Function22函數(shù)的動(dòng)態(tài)生成增热,找了半天沒(méi)發(fā)現(xiàn)Function的動(dòng)態(tài)生成整以,然后還發(fā)現(xiàn)Spark也是根據(jù)參數(shù)長(zhǎng)度生成FunctionN的,真**刷新本豬的三觀呀峻仇。
3. 既然實(shí)現(xiàn)方式找到了公黑,那就簡(jiǎn)單了,只要通過(guò)反射就能 上知天文,下知地理 摄咆。


既然是Spark凡蚜,肯定要用Scala去寫(xiě)反射了。

case class ClassInfo(clazz: Class[_], instance: Any, defaultMethod: Method, methods: Map[String, Method], func:String) {
  def invoke[T](args: Object*): T = {
    defaultMethod.invoke(instance, args: _*).asInstanceOf[T]
  }
}
object ClassCreateUtils extends Logging{
  private val clazzs = new util.HashMap[String, ClassInfo]()
  private val classLoader = scala.reflect.runtime.universe.getClass.getClassLoader
  private val toolBox = universe.runtimeMirror(classLoader).mkToolBox()
  def apply(func: String): ClassInfo = this.synchronized {
    var clazz = clazzs.get(func)
    if (clazz == null) {
      val (className, classBody) = wrapClass(func)
      val zz = compile(prepareScala(className, classBody))
      val defaultMethod = zz.getDeclaredMethods.head
      val methods = zz.getDeclaredMethods
      clazz = ClassInfo(
        zz,
        zz.newInstance(),
        defaultMethod,
        methods = methods.map { m => (m.getName, m) }.toMap,
        func
      )
      clazzs.put(func, clazz)
      logInfo(s"dynamic load class => $clazz")
    }
    clazz
  }
  def compile(src: String): Class[_] = {
    val tree = toolBox.parse(src)
    toolBox.compile(tree).apply().asInstanceOf[Class[_]]
  }
  def prepareScala(className: String, classBody: String): String = {
    classBody + "\n" + s"scala.reflect.classTag[$className].runtimeClass"
  }
  def wrapClass(function: String): (String, String) = {
    val className = s"dynamic_class_${UUID.randomUUID().toString.replaceAll("-", "")}"
    val classBody =
      s"""
         |class $className{
         |  $function
         |}
            """.stripMargin
    (className, classBody)
  }
}

上面的代碼是小弟給大佬寫(xiě)好的吭从,不用大佬親自動(dòng)手了朝蜘。


Spark 大數(shù)據(jù)更多技術(shù)文章,here

使用方法就灰常簡(jiǎn)單了我的大佬們。

val infos = ClassCreateUtils(
      """
        |def apply(name:String)=name.toUpperCase
      """.stripMargin
)
    
println(infos.defaultMethod.invoke(infos.instance,"dounine 本豬會(huì)一點(diǎn)點(diǎn) spark"))
# 輸出結(jié)果不用猜也知道是
DOUNINE 本豬會(huì)一點(diǎn)點(diǎn) SPARK
# 也可以手動(dòng)指定方法
println(infos.methods("apply").invoke(infos.instance,"dounine 本豬會(huì)一點(diǎn)點(diǎn) spark"))

根據(jù)反射的方法信息生成FunctionN

object ScalaGenerateFuns {

  def apply(func: String): (AnyRef, Array[DataType], DataType) = {
    val (argumentTypes, returnType) = getFunctionReturnType(func)
    (generateFunction(func, argumentTypes.length), argumentTypes, returnType)
  }

  //獲取方法的參數(shù)類(lèi)型及返回類(lèi)型
  private def getFunctionReturnType(func: String): (Array[DataType], DataType) = {
    val classInfo = ClassCreateUtils(func)
    val method = classInfo.defaultMethod
    val dataType = JavaTypeInference.inferDataType(method.getReturnType)._1
    (method.getParameterTypes.map(JavaTypeInference.inferDataType).map(_._1), dataType)
  }

  //生成22個(gè)Function
  def generateFunction(func: String, argumentsNum: Int): AnyRef = {
    lazy val instance = ClassCreateUtils(func).instance
    lazy val method = ClassCreateUtils(func).methods("apply")

    argumentsNum match {
      case 0 => new (() => Any) with Serializable with Logging {
        override def apply(): Any = {
          try {
            method.invoke(instance)
          } catch {
            case e: Exception =>
              logError(e.getMessage)
          }
        }
      }
      case 1 => new (Object => Any) with Serializable with Logging {
        override def apply(v1: Object): Any = {
          try {
            method.invoke(instance, v1)
          } catch {
            case e: Exception =>
              e.printStackTrace()
              logError(e.getMessage)
              null
          }
        }
      }
      case 2 => new ((Object, Object) => Any) with Serializable with Logging {
        override def apply(v1: Object, v2: Object): Any = {
          try {
            method.invoke(instance, v1, v2)
          } catch {
            case e: Exception =>
              logError(e.getMessage)
              null
          }
        }
      }
      //... 麻煩大佬自己去寫(xiě)剩下的20個(gè)了涩金,這里裝不下了谱醇,不然瀏覽器會(huì)崩潰的,然后電腦會(huì)重啟的步做,為了大佬的電腦著想副渴。
}

前戲我們都做完了,高潮的環(huán)節(jié)來(lái)了全度。

Spark 動(dòng)態(tài)加載代碼注冊(cè)UDF

我們最后再照著register的實(shí)現(xiàn)方式煮剧,把我們動(dòng)態(tài)Function注冊(cè)給Spark

1. val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
2. val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
3. def builder(e: Seq[Expression]) = if (e.length == 1) {
  ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), 
    Some(name), nullable, udfDeterministic = true)
}
4. functionRegistry.createOrReplaceTempFunction(name, builder)

1. 這句代碼比較好理解,就是獲取RT返回值類(lèi)型,就是我們的returnType
2. 就是參數(shù)類(lèi)型勉盅,對(duì)應(yīng)的修改如下

val inputTypes = Try(argumentTypes.toList).toOption

3. 剛開(kāi)始看到這個(gè)時(shí)候佑颇,我是一臉???,后來(lái)看源碼才發(fā)現(xiàn)builder是一種自定類(lèi)型,源碼如下

type FunctionBuilder = Seq[Expression] => Expression

改造方式如下

def builder(e: Seq[Expression]) = ScalaUDF(rf, returnType, e, inputTypes.getOrElse(Nil), Some(name))

4. 看到這句的時(shí)候我以為簡(jiǎn)單了菇篡,直接使用spark.sessionState.functionRegistry發(fā)現(xiàn)編譯不過(guò)漩符,看到private[sql]這個(gè)作用域的時(shí)候有點(diǎn)崩潰一喘,本來(lái)是想用下面的方式注冊(cè)的驱还。

val udf = UserDefinedFunction(rf, returnType, inputTypes).withName(name)
spark.udf.register(name, udf)

是小弟我想太多了,另辟捷徑凸克,做了那么多工作難道就白費(fèi)了议蟆?


關(guān)注可以安慰小弟

發(fā)現(xiàn)下面這句代碼,瞬間找到了家的方向萎战。

functionRegistry.registerFunction(new FunctionIdentifier(name), builder)

人生巔峰

到此咐容,大豬的分析與編碼已經(jīng)完成,下面是今天給大哥的解決方案蚂维。
方法實(shí)現(xiàn)可以通過(guò)查詢sql得到戳粒,或者接口都渴以。

    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    val name = "hello"

    val (fun, argumentTypes, returnType) = ScalaSourceUDF(
      """
        |def apply(name:String)=name+" => hi"
        |""".stripMargin)

    val inputTypes = Try(argumentTypes.toList).toOption

    def builder(e: Seq[Expression]) = ScalaUDF(fun, returnType, e, inputTypes.getOrElse(Nil), Some(name))

    spark.sessionState.functionRegistry.registerFunction(new FunctionIdentifier(name), builder)

    val rdd = spark
      .sparkContext
      .parallelize(Array(("dounine", "20")))
      .map(x => Row.fromSeq(Array(x._1, x._2)))

    val types = StructType(
      Array(
        StructField("name", StringType),
        StructField("age", StringType)
      )
    )

    spark.createDataFrame(rdd, types).createTempView("log")

    spark.sql("select hello(name) from log").show(false)

真打臉虫啥,昨天還說(shuō)不行的蔚约。



?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市涂籽,隨后出現(xiàn)的幾起案子苹祟,更是在濱河造成了極大的恐慌,老刑警劉巖评雌,帶你破解...
    沈念sama閱讀 219,589評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件树枫,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡景东,警方通過(guò)查閱死者的電腦和手機(jī)砂轻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)斤吐,“玉大人搔涝,你說(shuō)我怎么就攤上這事∏酰” “怎么了体谒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,933評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)臼婆。 經(jīng)常有香客問(wèn)我抒痒,道長(zhǎng),這世上最難降的妖魔是什么颁褂? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,976評(píng)論 1 295
  • 正文 為了忘掉前任故响,我火速辦了婚禮傀广,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘彩届。我一直安慰自己伪冰,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布樟蠕。 她就那樣靜靜地躺著贮聂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪寨辩。 梳的紋絲不亂的頭發(fā)上吓懈,一...
    開(kāi)封第一講書(shū)人閱讀 51,775評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音靡狞,去河邊找鬼耻警。 笑死,一個(gè)胖子當(dāng)著我的面吹牛甸怕,可吹牛的內(nèi)容都是我干的甘穿。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼梢杭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼温兼!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起式曲,我...
    開(kāi)封第一講書(shū)人閱讀 39,359評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤妨托,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后吝羞,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體兰伤,經(jīng)...
    沈念sama閱讀 45,854評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評(píng)論 3 338
  • 正文 我和宋清朗相戀三年钧排,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了敦腔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,146評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡恨溜,死狀恐怖符衔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情糟袁,我是刑警寧澤判族,帶...
    沈念sama閱讀 35,826評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站项戴,受9級(jí)特大地震影響形帮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評(píng)論 3 331
  • 文/蒙蒙 一辩撑、第九天 我趴在偏房一處隱蔽的房頂上張望界斜。 院中可真熱鬧,春花似錦合冀、人聲如沸各薇。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,029評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)峭判。三九已至,卻和暖如春晰洒,著一層夾襖步出監(jiān)牢的瞬間朝抖,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,153評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工谍珊, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人急侥。 一個(gè)月前我還...
    沈念sama閱讀 48,420評(píng)論 3 373
  • 正文 我出身青樓砌滞,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親坏怪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子贝润,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評(píng)論 2 356

推薦閱讀更多精彩內(nèi)容

  • 昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯铝宵,他一問(wèn)我這個(gè)問(wèn)題的時(shí)...
    kikiki2閱讀 148評(píng)論 0 2
  • 昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題打掘,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯,他一問(wèn)我這個(gè)問(wèn)題的時(shí)...
    大豬大豬閱讀 159評(píng)論 0 1
  • 昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題鹏秋,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯尊蚁,他一問(wèn)我這個(gè)問(wèn)題的時(shí)...
    大豬大豬閱讀 324評(píng)論 0 1
  • 昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯侣夷,他一問(wèn)我這個(gè)問(wèn)題的時(shí)...
    kikiki2閱讀 209評(píng)論 0 1
  • 昨天有位大哥問(wèn)小弟一個(gè)Spark問(wèn)題横朋,他們想在不停Spark程序的情況下動(dòng)態(tài)更新UDF的邏輯,他一問(wèn)我這個(gè)問(wèn)題的時(shí)...
    kikiki2閱讀 293評(píng)論 0 3