Convert an RDD[Map[String, String]] into a flattened DataFrame, similar to the effect of calling toDF on a dict structure in PySpark.
Input:

val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
  Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),
  Map("name" -> "lisi", "age" -> "20", "addr" -> "hz")
))
Output:

name      age  addr
zhangsan  18   bj
lisi      20   hz
1. Fixed set of Map keys
When every Map has the same three entries:
import spark.implicits._  // needed for toDF

// take the column names from the first Map
val columns = mapRDD.take(1).flatMap(_.keys)
val resultantDF = mapRDD.filter(_.nonEmpty).map { m =>
  val seq = m.values.toSeq
  // assumes every Map yields its values in the same key order
  (seq(0), seq(1), seq(2))
}.toDF(columns: _*)
resultantDF.show()
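Approach 1 trusts m.values to come back in the same key order for every Map. Scala's small immutable Maps (up to four entries) do preserve insertion order, but that is an implementation detail; looking the keys up explicitly is safer. A minimal, Spark-free sketch of the lookup-based tuple extraction, using the column names from the example above:

```scala
object LookupTuples {
  def main(args: Array[String]): Unit = {
    val maps = Seq(
      Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),
      // same keys, written in a different order
      Map("addr" -> "hz", "name" -> "lisi", "age" -> "20")
    )
    // explicit lookups keep tuple positions stable regardless of insertion order
    val tuples = maps.map(m => (m("name"), m("age"), m("addr")))
    tuples.foreach(println)  // prints (zhangsan,18,bj) then (lisi,20,hz)
  }
}
```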
2. Variable Map entries
RDD[Map[String, String]] -> RDD[Row] -> DataFrame
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def map2DF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
  // take the column names from the first Map
  val cols = rdd.take(1).flatMap(_.keys)
  val resRDD = rdd.filter(_.nonEmpty).map { m =>
    // look each column up by name, so a Map with missing or
    // reordered keys still lines up with the schema (null fills gaps)
    Row.fromSeq(cols.map(c => m.getOrElse(c, null)).toSeq)
  }
  val fields = cols.map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)
  spark.createDataFrame(resRDD, schema)
}
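Since map2DF takes its column list from the first Map, a later Map that is missing a key (or lists keys in a different order) can misalign values against the schema. Building each row by looking the columns up, with null for absent keys, avoids this. A Spark-free sketch of that row-building step (alignRow is a hypothetical helper name, not part of the original):

```scala
object AlignRowDemo {
  // hypothetical helper: align one Map to a fixed column order, null for missing keys
  def alignRow(cols: Seq[String], m: Map[String, String]): Seq[String] =
    cols.map(c => m.getOrElse(c, null))

  def main(args: Array[String]): Unit = {
    val cols = Seq("name", "age", "addr")
    println(alignRow(cols, Map("name" -> "lisi", "addr" -> "hz")))
    // prints List(lisi, null, hz): the missing "age" becomes null
    // instead of shifting "hz" into the wrong column
  }
}
```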