這個有兩種方法
1 使用zipWithUniqueId
獲取id 并重建 DataFrame.
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df =Seq(("a", -1.0), ("b", -2.0), ("c", -3.0)).toDF("foo", "bar")
// 獲取df 的表頭
val s = df.schema
// 將原表轉(zhuǎn)換成帶有rdd,
//再轉(zhuǎn)換成帶有id的rdd,
//再展開成Seq方便轉(zhuǎn)化成 Dataframe
val rows = df.rdd.zipWithUniqueId.map{case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
// 再由 row 根據(jù)原表頭進行轉(zhuǎn)換
val dfWithPK = spark.createDataFrame( rows, StructType(StructField("id", LongType, false) +: s.fields))
2 直接使用spark 自己的api,monotonicallyIncreasingId
這個id雖然是唯一的,但是不能從零開始,也不是順序排列,可以簡單理解為是隨機產(chǎn)生的標(biāo)識碼
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// |foo| bar| id|
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|