load operation: mainly used to load data from a source and create a DataFrame
save operation: mainly used to save the data in a DataFrame out to files
Code example (the default data source type is parquet)
package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object GenericLoadSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GenericLoadSave")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // load reads parquet files by default
    val usersDF = sqlContext.read.load("hdfs://master:9000/student/2016113012/spark/users.parquet")
    // save likewise writes parquet by default
    usersDF.write.save("hdfs://master:9000/student/2016113012/parquet_out1")
  }
}
Submit to the cluster and run:
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.GenericLoadSave --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
After the job finishes, check whether the save succeeded:
hadoop@slave01:~$ hadoop fs -ls /student/2016113012/parquet_out1
17/02/03 12:06:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r-- 3 hadoop supergroup 0 2017-02-03 12:05 /student/2016113012/parquet_out1/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 476 2017-02-03 12:05 /student/2016113012/parquet_out1/_common_metadata
-rw-r--r-- 3 hadoop supergroup 841 2017-02-03 12:05 /student/2016113012/parquet_out1/_metadata
-rw-r--r-- 3 hadoop supergroup 864 2017-02-03 12:05 /student/2016113012/parquet_out1/part-r-00000-8025e2a8-ab06-4558-9d76-bb2cad0042cf.gz.parquet
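Besides listing the files, the output can be verified by loading the saved directory back with the same default source. A minimal sketch, reusing the sqlContext from the example above:

// Read the saved parquet directory back and print a few rows
val checkDF = sqlContext.read.load("hdfs://master:9000/student/2016113012/parquet_out1")
checkDF.show()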
Manually specifying the data source type (which makes format conversion easy)
If no data source type is specified, parquet is used by default; that default is itself configurable, as sketched below.
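The parquet default comes from the spark.sql.sources.default configuration property, so it can also be changed globally instead of calling format() on every read. A minimal sketch, assuming the same sqlContext setup as in the examples:

// Make json the default source for plain load()/save() calls
sqlContext.setConf("spark.sql.sources.default", "json")
// Now load() parses JSON without an explicit format("json")
val df = sqlContext.read.load("hdfs://master:9000/student/2016113012/people.json")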
Code example (manually specifying the data source type)
package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object ManuallySpecifyOptions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ManuallySpecifyOptions")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // To load a non-default format such as JSON, specify it with format() first
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // Writing works the same way: choose the output format, then save
    peopleDF.select("name").write.format("parquet").save("hdfs://master:9000/sudent/2016113012/people_out1")
  }
}
Submit to the cluster and run:
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.ManuallySpecifyOptions --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
Check whether it ran successfully:
hadoop@master:~/wujiadong$ hadoop fs -ls hdfs://master:9000/sudent/2016113012/people_out1
17/02/03 12:24:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r-- 3 hadoop supergroup 0 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 207 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_common_metadata
-rw-r--r-- 3 hadoop supergroup 327 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_metadata
-rw-r--r-- 3 hadoop supergroup 352 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/part-r-00000-4d1a62a4-f550-4bde-899f-35e9aabfdc0c.gz.parquet
Save Mode
SaveMode.ErrorIfExists (default): if data already exists at the target location, throw an exception
SaveMode.Append: if data already exists at the target location, append the new data to it
SaveMode.Overwrite: if data already exists at the target location, delete the existing data and replace it with the new data
SaveMode.Ignore: if data already exists at the target location, ignore the operation and do nothing (Append and Ignore are also sketched after code example 2 below)
Code example 1
package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object SaveModelTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveModelTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // The target path already exists, so ErrorIfExists makes the write fail
    peopleDF.write.mode(SaveMode.ErrorIfExists).save("hdfs://master:9000/student/2016113012/people.json")
  }
}
With this save mode the job throws an error, because the target path already exists.
Code example 2
package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object SaveModelTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveModelTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // Overwrite first deletes whatever already exists at the target path
    peopleDF.write.mode(SaveMode.Overwrite).save("hdfs://master:9000/student/2016113012/people.json")
  }
}
This mode simply overwrites the existing data.
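The other two modes are used the same way. A minimal sketch (the output path here is illustrative, not taken from the runs above):

// Append: add the new rows to whatever is already at the target path
peopleDF.write.mode(SaveMode.Append).save("hdfs://master:9000/student/2016113012/people_out2")
// Ignore: silently skip the write if the target path already exists
peopleDF.write.mode(SaveMode.Ignore).save("hdfs://master:9000/student/2016113012/people_out2")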