spark 2.X與1.x的區(qū)別
spark sql 2.x以上版本和1.x版本有個很大的區(qū)別:spark1.x的sqlContext在spark2.0中被整合到sparkSession,故而利用spark-shell客戶端操作會有些許不同温赔,具體如下文所述。
載入外部數(shù)據(jù)的load方法
在spark sql中有一個DataStreamReader封裝了讀取各種格式的外部數(shù)據(jù)的方法奋隶,其中,format(str)用于傳數(shù)據(jù)格式赡若,比如csv,json达布,parquet,jdbc等逾冬;load(path)用于傳入數(shù)據(jù)的地址黍聂,其中可以傳入本地?cái)?shù)據(jù)路徑也可以是hdfs上的路徑,在官網(wǎng)給的demo中都是傳的本地?cái)?shù)據(jù)路徑:比如:
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
-
load(path)的源碼:注意:load不能l載入hive的數(shù)據(jù)身腻,hive數(shù)據(jù)需要使用table方法來載入产还。
def load(path: String): DataFrame = { option("path", path).load() } def load(): DataFrame = { if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Hive data source can only be used with tables, you can not " + "read files of Hive data source directly.") } val dataSource = DataSource( sparkSession, userSpecifiedSchema = userSpecifiedSchema, className = source, options = extraOptions.toMap) Dataset.ofRows(sparkSession, StreamingRelation(dataSource)) }
-
【hdfs路徑】寫入寫出hdfs上的路徑,則需要加入hdfs的完全路徑嘀趟,如:
studentDF.write.parquet("hdfs://h4:9000/test/spark/parquet")
studentDF.write.json("hdfs://h4:9000/test/spark/json")
spark sql與mysql 和hdfs交互的實(shí)戰(zhàn)
- 1.添加jar包
- 正常配置不再贅述脐区,這里如果需要讀取MySQL數(shù)據(jù),則需要在當(dāng)前用戶下的環(huán)境變量里額外加上JDBC的驅(qū)動jar包 例如我的是:mysql-connector-java-5.1.18-bin.jar 存放路徑是$SPARK_HOME/jars 所以需要額外配置環(huán)境變量
export PATH = $PATH:$SPARK_HOME/jars
-
2.啟動spark-shell
bin/spark-shell --master=spark://h4:7077 --driver-class-path=./jars/mysql-connector-java-5.1.18-bin.jar -- jars=./jars/mysql-connector-java-5.1.18-bin.jar
3.代碼
spark-sql采用sql方式執(zhí)行操作正常啟動之后可以先通過spark-sql建立數(shù)據(jù)庫并切換到當(dāng)前新建的數(shù)據(jù)庫
spark.sql("create database spark")
可以查看下是否新建成功
spark.sql("show databases ").show
創(chuàng)建成功之后切換數(shù)據(jù)庫
spark.sql("use spark")
現(xiàn)在開始讀取遠(yuǎn)程MySQL數(shù)據(jù)
val sql = """CREATE TABLE student USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://worker2:3306/spark", dbtable "student", user "root", password "root" )"""
執(zhí)行:
spark.sql(sql);
等待執(zhí)行完畢之后她按,將表數(shù)據(jù)存入緩存
spark.sql("cache table student")
此時即可進(jìn)行操作牛隅,例如:val studentDF = spark.sql("select id,name from student")
完成需求查詢之后,可將結(jié)果以parquet的格式保存到HDFS
studentDF.write.parquet("hdfs://h4:9000/test/spark/parquet")
也可以寫成json格式
studentDF.write.json("hdfs://h4:9000/test/spark/json")
- 4.性能:
集群狀態(tài)下酌泰,硬件配置32G內(nèi)存 2T硬盤媒佣,spark配了4核,內(nèi)存分配了20G的情況下陵刹,測試速度如下: 2700萬條記錄的表導(dǎo)入spark用時1秒以內(nèi) sparksql將其以json格式存入HDFS用時288秒默伍,共1.0G,將其以parquet格式存入HDFS用時207秒衰琐,共86.6M也糊,可見parquet的優(yōu)勢還是比較明顯