In Spark SQL you can create user-defined functions (UDFs) to operate on a DataFrame. A UDF is a one-to-one mapping, typically used when you want to add a new column to a DataFrame: each invocation receives one row's values (one or more columns as arguments) and, after whatever computation, returns exactly one value for the new column.
A UDF can be registered in two ways:
- spark.udf.register() // spark is the SparkSession object
- udf() // requires import org.apache.spark.sql.functions._
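Both registration APIs take an ordinary Scala function value. When a method is passed as `methodName _`, eta-expansion converts it into a `Function1`, which is the shape `spark.udf.register()` and `udf()` expect. A minimal pure-Scala sketch of that mechanism (no SparkSession needed; `band` is a hypothetical stand-in for a real UDF body):

```scala
object EtaExpansionSketch {
  // Hypothetical stand-in for a UDF body: one input value in, one value out.
  def band(age: Int): String = if (age >= 61) "elderly" else "other"

  def main(args: Array[String]): Unit = {
    // `band _` eta-expands the method into a function value (Int => String),
    // which is what both UDF registration APIs accept.
    val f: Int => String = band _
    println(f(70))
    println(f(20))
  }
}
```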
The full example code is shown below:
import com.longi.bigdata.spark.utils.SparkSessionCreate
import org.apache.spark.sql.functions._

/**
  * Author: whn
  * Date: 2019-12-19 18:12
  * Version: 1.0
  * Function:
  */
object TwoWaysOfUsingUDF {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSessionCreate.getSparkSession("TwoWaysOfUsingUDF", "local")
    import spark.implicits._
    // people.txt contains:
    // Michael, 10
    // Andy, 60
    // Justin, 40
    val inputDF = spark.sparkContext
      .textFile("file:\\E:\\ideaProjects\\sparketl\\people.txt")
      .map(_.split(","))
      .map((attributes: Array[String]) => Person(attributes(0), attributes(1).trim.toInt)) // convert each Array[String] into the custom Person type
      .toDF("name", "age") // convert the RDD to a DataFrame, naming the columns in the same order as the case class fields

    // TODO Way 1: register the UDF via spark.udf.register, for use in SQL syntax
    spark.udf.register("age_judge", ageJudge _)
    // selectExpr is a DataFrame (DSL) method, but it can parse SQL expressions
    val res1 = inputDF.selectExpr("name", "age", "age_judge(age) AS age_divide")
    res1.show()

    inputDF.createOrReplaceTempView("temp")
    val res2 = spark.sql(
      """
        |SELECT name, age, age_judge(age) AS age_divide
        |FROM temp
      """.stripMargin)
    res2.show()

    // TODO Way 2: register via udf(), for use in DSL methods such as withColumn
    val ageJudgeUDF = udf(ageJudge _)
    val res3 = inputDF.withColumn("age_divide", ageJudgeUDF(inputDF("age")))
    res3.show()

    spark.stop()
  }

  // Map an age to an age band; ages outside 1..130 are flagged as invalid data.
  def ageJudge(age: Int): String = age match {
    case a if a >= 1 && a <= 3    => "infant"
    case a if a >= 4 && a <= 18   => "juvenile"
    case a if a >= 19 && a <= 45  => "young adult"
    case a if a >= 46 && a <= 60  => "middle-aged"
    case a if a >= 61 && a <= 130 => "elderly"
    case _                        => "invalid data"
  }
}
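Because the UDF body is plain Scala, the age-banding rules can be checked without starting a SparkSession at all. A small self-contained sketch (`AgeBandCheck` is a hypothetical helper, using English band labels for illustration):

```scala
object AgeBandCheck {
  // Same banding rules as ageJudge in the example above, written as a plain function.
  def ageJudge(age: Int): String =
    if (age >= 1 && age <= 3) "infant"
    else if (age >= 4 && age <= 18) "juvenile"
    else if (age >= 19 && age <= 45) "young adult"
    else if (age >= 46 && age <= 60) "middle-aged"
    else if (age >= 61 && age <= 130) "elderly"
    else "invalid data"

  def main(args: Array[String]): Unit = {
    // Exercise the boundary value of each band, plus out-of-range inputs.
    assert(ageJudge(1) == "infant" && ageJudge(3) == "infant")
    assert(ageJudge(4) == "juvenile" && ageJudge(18) == "juvenile")
    assert(ageJudge(19) == "young adult" && ageJudge(45) == "young adult")
    assert(ageJudge(46) == "middle-aged" && ageJudge(60) == "middle-aged")
    assert(ageJudge(61) == "elderly" && ageJudge(130) == "elderly")
    assert(ageJudge(0) == "invalid data" && ageJudge(131) == "invalid data")
    println("all bands ok")
  }
}
```

Testing the function directly like this catches banding bugs (for example, an unreachable match case or an off-by-one boundary) far faster than running the full Spark job.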