合并元數(shù)據(jù)
如同ProtocolBuffer狱从,Avro膨蛮,Thrift一樣,Parquet也是支持元數(shù)據(jù)合并的季研。用戶可以在一開始就定義一個簡單的元數(shù)據(jù)敞葛,然后隨著業(yè)務(wù)需要,逐漸往元數(shù)據(jù)中添加更多的列与涡。在這種情況下制肮,用戶可能會創(chuàng)建多個Parquet文件,有著多個不同的但是卻互相兼容的元數(shù)據(jù)递沪。Parquet數(shù)據(jù)源支持自動推斷出這種情況,并且進(jìn)行多個Parquet文件的元數(shù)據(jù)的合并综液。
因為元數(shù)據(jù)合并是一種相對耗時的操作款慨,而且在大多數(shù)情況下不是一種必要的特性,從Spark 1.5.0版本開始谬莹,默認(rèn)是關(guān)閉Parquet文件的自動合并元數(shù)據(jù)的特性的檩奠。可以通過以下兩種方式開啟Parquet數(shù)據(jù)源的自動合并元數(shù)據(jù)的特性:
1附帽、讀取Parquet文件時埠戳,將數(shù)據(jù)源的選項,mergeSchema蕉扮,設(shè)置為true
2整胃、使用SQLContext.setConf()方法,將spark.sql.parquet.mergeSchema參數(shù)設(shè)置為true
案例:合并學(xué)生的基本信息喳钟,和成績信息的元數(shù)據(jù)
Java版本
public class ParquetMergeSchema {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("ParquetMergeSchemaJava").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sparkContext);
// 創(chuàng)建一個DataFrame屁使,作為學(xué)生的基本信息在岂,并寫入一個parquet文件中
List<String> studentWithNameAndAge = new ArrayList<String>();
studentWithNameAndAge.add("zhaojun,18");
studentWithNameAndAge.add("fengxiangbin,17");
JavaRDD<String> studentWithNameAndAgeRDD = sparkContext.parallelize(studentWithNameAndAge, 2);
JavaRDD<Row> studentWithNameAndAgeRowRDD = studentWithNameAndAgeRDD.map(new Function<String, Row>() {
@Override
public Row call(String v1) throws Exception {
return RowFactory.create(v1.split(",")[0], Integer.parseInt(v1.split(",")[1]));
}
});
List<StructField> fieldList = new ArrayList<StructField>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType structType = DataTypes.createStructType(fieldList);
DataFrame studentWithNameAndAgeDF = sqlContext.createDataFrame(studentWithNameAndAgeRowRDD, structType);
studentWithNameAndAgeDF.write().format("parquet").mode(SaveMode.Append).save("E:\\testdata\\sparksql\\student");
// 創(chuàng)建第二個DataFrame,作為學(xué)生的成績信息蛮寂,并寫入一個parquet文件中
List<String> studentWithNameAndGrade = new ArrayList<String>();
studentWithNameAndGrade.add("zhaoj,B");
studentWithNameAndGrade.add("fengxiang,A");
JavaRDD<String> studentWithNameAndGradeRDD = sparkContext.parallelize(studentWithNameAndGrade, 2);
JavaRDD<Row> studentWithNameAndGradeRowRDD = studentWithNameAndGradeRDD.map(new Function<String, Row>() {
@Override
public Row call(String v1) throws Exception {
return RowFactory.create(v1.split(",")[0], v1.split(",")[1]);
}
});
fieldList = new ArrayList<StructField>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("grade", DataTypes.StringType, true));
structType = DataTypes.createStructType(fieldList);
DataFrame studentWithNameAndGradeDF = sqlContext.createDataFrame(studentWithNameAndGradeRowRDD, structType);
studentWithNameAndGradeDF.write().format("parquet").mode(SaveMode.Append).save("E:\\testdata\\sparksql\\student");
// 首先蔽午,第一個DataFrame和第二個DataFrame的元數(shù)據(jù)肯定是不一樣的吧
// 一個是包含了name和age兩個列,一個是包含了name和grade兩個列
// 所以酬蹋, 這里期望的是及老,讀取出來的表數(shù)據(jù),自動合并兩個文件的元數(shù)據(jù)范抓,出現(xiàn)三個列骄恶,name、age尉咕、grade
// 用mergeSchema的方式叠蝇,讀取students表中的數(shù)據(jù)年缎,進(jìn)行元數(shù)據(jù)的合并
DataFrame df = sqlContext.read().option("mergeSchema", "true").parquet("E:\\testdata\\sparksql\\student");
df.schema();
df.show();
}
}
Scala版本
object ParquetMergeSchema {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("ParquetPartitionDiscoveryScala")
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
// 創(chuàng)建一個DataFrame悔捶,作為學(xué)生的基本信息,并寫入一個parquet文件中
val studentWithNameAndAge = Array(("Zhao Jun", 18),("Feng Xiangbin", 17))
val studentWithNameAndAgeDF = sparkContext.parallelize(studentWithNameAndAge, 2).toDF("name", "age")
studentWithNameAndAgeDF.write.format("parquet").mode(SaveMode.Append).save("E:\\testdata\\sparksql\\student_scala")
// 創(chuàng)建第二個DataFrame单芜,作為學(xué)生的成績信息蜕该,并寫入一個parquet文件中
val studentWithNameAndGrade = Array(("Zhao Jun", "B"),("Feng Xiangbin", "A"))
val studentWithNameAndGradeDF = sparkContext.parallelize(studentWithNameAndGrade, 2).toDF("name", "grade")
studentWithNameAndGradeDF.write.format("parquet").mode(SaveMode.Append).save("E:\\testdata\\sparksql\\student_scala")
// 首先,第一個DataFrame和第二個DataFrame的元數(shù)據(jù)肯定是不一樣的吧
// 一個是包含了name和age兩個列洲鸠,一個是包含了name和grade兩個列
// 所以堂淡, 這里期望的是,讀取出來的表數(shù)據(jù)扒腕,自動合并兩個文件的元數(shù)據(jù)绢淀,出現(xiàn)三個列,name瘾腰、age皆的、grade
val df = sqlContext.read.option("mergeSchema", "true").load("E:\\testdata\\sparksql\\student_scala")
df.schema
df.show()
}
}