Spark SQL是Spark中的一個(gè)模塊贷掖,主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理刑赶。它提供的最核心的編程抽象锯厢,就是DataFrame谆膳。同時(shí)Spark SQL還可以作為分布式的SQL查詢引擎叭爱。Spark SQL最重要的功能之一,就是從Hive中查詢數(shù)據(jù)漱病。
DataFrame是以列的形式組織的买雾,分布式的結(jié)構(gòu)化數(shù)據(jù)集合。它其實(shí)和關(guān)系型數(shù)據(jù)庫中的表非常類似杨帽,但是底層做了很多的優(yōu)化漓穿。DataFrame可以通過很多來源進(jìn)行構(gòu)建,包括:結(jié)構(gòu)化的數(shù)據(jù)文件注盈,Hive中的表晃危,外部的關(guān)系型數(shù)據(jù)庫,以及RDD老客。有了DataFrame后僚饭,可以非常方便地使用DataFrame API和SQL來處理數(shù)據(jù)。
-
創(chuàng)建DataFrame(三種)
- 第一種
val dfUsers = spark.read.format("csv").option("header", "true").load("file:///root/data/user.csv") dfUsers.printSchema // root // |-- user_id: string (nullable = true) // |-- locale: string (nullable = true) // |-- birthyear: string (nullable = true) // |-- gender: string (nullable = true) // |-- joinedAt: string (nullable = true) // |-- location: string (nullable = true) // |-- timezone: string (nullable = true)
- 第二種
scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ scala> import org.apache.spark.sql.Row import org.apache.spark.sql.Row scala> import spark.implicits._ import spark.implicits._ // 讀取文件并轉(zhuǎn)換成RDD[Row]類型 scala> val uRdd = spark.sparkContext.textFile("file:///root/data/user.csv") .map(x = x.split(",")) .mapPartitionsWithIndex((index, iter) => if (index == 0) iter.drop(1) else iter) .map(Row.fromSeq(_)) uRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[26] at map at <console>:30 // 定義Schema scala> val schema = StructType(Array(StructField("user_id", StringType, true), StructField("locale", StringType, true),StructField("birthyear", StringType, true), StructField("gender",StringType, true), StructField("joinedAt", StringType, true), StructField("location", StringType, true), StructField("timezone", StringType, true))) schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,StringType,true), StructField(locale,StringType,true), StructField(birthyear,StringType,true), StructField(gender,StringType,true), StructField(joinedAt,StringType,true), StructField(location,StringType,true), StructField(timezone,StringType,true)) // 創(chuàng)建DataFrame scala> val dfUsers = spark.createDataFrame(uRdd, schema) dfUsers: org.apache.spark.sql.DataFrame = [user_id: string, locale: string ... 5 more fields] scala> dfUsers.printSchema // root // |-- user_id: string (nullable = true) // |-- locale: string (nullable = true) // |-- birthyear: string (nullable = true) // |-- gender: string (nullable = true) // |-- joinedAt: string (nullable = true) // |-- location: string (nullable = true) // |-- timezone: string (nullable = true) scala> dfUsers show 3 // +----------+------+---------+------+--------------------+------------------+--------+ // | user_id|locale|birthyear|gender| joinedAt| location|timezone| // +----------+------+---------+------+--------------------+------------------+--------+ // |3197468391| id_ID| 1993| male|2012-10-02T06:40:...| Medan Indonesia| 480| // |3537982273| id_ID| 1992| male|2012-09-29T18:03:...| Medan Indonesia| 420| // | 823183725| en_US| 1975| male|2012-10-06T03:14:...|Stratford Ontario| -240| // +----------+------+---------+------+--------------------+------------------+--------+
注:由于該文件首行是列名沿量,所以使用mapPartitionsWithIndex()函數(shù)過濾掉
- 第三種
scala> val dfUsers = spark.sparkContext.textFile("file:///root/data/users.csv") .map(_.split(",")) .mapPartitionsWithIndex((index, iter) => if (index == 0) iter.drop(1) else iter) .map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6))) .toDF("user_id", "locale", "birthyear", "gender", "joinedAt", "location", "timezone") dfUsers: org.apache.spark.sql.DataFrame = [user_id: string, locale: string ... 5 more fields] scala> dfUsers show 3 // +----------+------+---------+------+--------------------+------------------+--------+ // | user_id|locale|birthyear|gender| joinedAt| location|timezone| // +----------+------+---------+------+--------------------+------------------+--------+ // |3197468391| id_ID| 1993| male|2012-10-02T06:40:...| Medan Indonesia| 480| // |3537982273| id_ID| 1992| male|2012-09-29T18:03:...| Medan Indonesia| 420| // | 823183725| en_US| 1975| male|2012-10-06T03:14:...|Stratford Ontario| -240| // +----------+------+---------+------+--------------------+------------------+--------+
- 把DataFrame持久化到內(nèi)存
dfUsers.persist()
- 保存到Hive表
dfUser.write.mode("overwrite").saveAsTable("sparkdf.users")
-
DataFrame Operators
- 常用操作
map, flatMap sample, filter sort pipe groupBy, groupByKey, cogroup reduce, reduceByKey, fold partitionBy ? zip, union join, crossJoin, leftOuterJoin, rightOuterJoin count, save first, take
- 基本操作
1、 cache()同步數(shù)據(jù)的內(nèi)存 2冤荆、 columns 返回一個(gè)string類型的數(shù)組朴则,返回值是所有列的名字 3、 dtypes返回一個(gè)string類型的二維數(shù)組钓简,返回值是所有列的名字以及類型 4乌妒、 explan()打印執(zhí)行計(jì)劃 物理的 5、 explain(n:Boolean) 輸入值為 false 或者true 外邓,返回值是unit 默認(rèn)是false 撤蚊,如果輸入true 將會(huì)打印 邏輯的和物理的 6、 isLocal 返回值是Boolean類型损话,如果允許模式是local返回true 否則返回false 7侦啸、 persist(newlevel:StorageLevel) 返回一個(gè)dataframe.this.type 輸入存儲(chǔ)模型類型 8、 printSchema() 打印出字段名稱和類型 按照樹狀結(jié)構(gòu)來打印 9丧枪、 registerTempTable(tablename:String) 返回Unit 光涂,將df的對(duì)象只放在一張表里面,這 個(gè)表隨著對(duì)象的刪除而刪除了 10拧烦、 schema 返回structType 類型忘闻,將字段名稱和類型按照結(jié)構(gòu)體類型返回 11、 toDF()返回一個(gè)新的dataframe類型的 12恋博、 toDF(colnames:String*)將參數(shù)中的幾個(gè)字段返回一個(gè)新的dataframe類型的齐佳, 13私恬、 unpersist() 返回dataframe.this.type 類型,去除模式中的數(shù)據(jù) 14炼吴、 unpersist(blocking:Boolean)返回dataframe.this.type類型 true 和unpersist是一樣的作用false 是去除RDD
- 聚合查詢操作
1本鸣、 agg(expers:column*) 返回dataframe類型 ,同數(shù)學(xué)計(jì)算求值 df.agg(max("age"), avg("salary")) df.groupBy().agg(max("age"), avg("salary")) 2缺厉、 agg(exprs: Map[String, String]) 返回dataframe類型 永高,同數(shù)學(xué)計(jì)算求值 map類型的 df.agg(Map("age" -> "max", "salary" -> "avg")) df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) 3、 agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe類型 提针,同數(shù)學(xué)計(jì)算求值 df.agg(Map("age" -> "max", "salary" -> "avg")) df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) 4命爬、 apply(colName: String) 返回column類型,捕獲輸入進(jìn)去列的對(duì)象 5辐脖、 as(alias: String) 返回一個(gè)新的dataframe類型饲宛,就是原來的一個(gè)別名 6、 col(colName: String) 返回column類型嗜价,捕獲輸入進(jìn)去列的對(duì)象 7艇抠、 cube(col1: String, cols: String*) 返回一個(gè)GroupedData類型,根據(jù)某些字段來匯總 8久锥、 distinct 去重 返回一個(gè)dataframe類型 9家淤、 drop(col: Column) 刪除某列 返回dataframe類型 10、 dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個(gè)dataframe 11瑟由、 except(other: DataFrame) 返回一個(gè)dataframe絮重,返回在當(dāng)前集合存在的在其他集合不存在的 12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ? TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe類型歹苦,這個(gè) 將一個(gè)字段進(jìn)行更多行的拆分 df.explode("name","names") {name :String=> name.split(" ")}.show(); 將name字段根據(jù)空格來拆分青伤,拆分的字段放在names里面 13、 filter(conditionExpr: String): 刷選部分?jǐn)?shù)據(jù)殴瘦,返回dataframe類型 df.filter("age>10").show(); df.filter(df("age")>10).show(); df.where(df("age")>10).show(); 都可以 14狠角、 groupBy(col1: String, cols: String*) 根據(jù)某寫字段來匯總返回groupedate類型 df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以 15、 intersect(other: DataFrame) 返回一個(gè)dataframe蚪腋,在2個(gè)dataframe都存在的元素 16丰歌、 join(right: DataFrame, joinExprs: Column, joinType: String) 一個(gè)是關(guān)聯(lián)的dataframe,第二個(gè)關(guān)聯(lián)的條件屉凯,第三個(gè)關(guān)聯(lián)的類型:inner, outer, left_outer, right_outer, leftsemi df.join(ds,df("name")===ds("name") and df("age")===ds("age"),"outer").show(); 17动遭、 limit(n: Int) 返回dataframe類型 去n 條數(shù)據(jù)出來 18、 na: DataFrameNaFunctions 神得,可以調(diào)用dataframenafunctions的功能區(qū)做過濾 df.na.drop().show(); 刪除為空的行 19厘惦、 orderBy(sortExprs: Column*) 做alise排序 20、 select(cols:string*) dataframe 做字段的刷選 df.select($"colA", $"colB" + 1) 21、 selectExpr(exprs: String*) 做字段的刷選 df.selectExpr("name","name as names","upper(name)","age+1").show(); 22宵蕉、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默認(rèn)是asc 23酝静、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show(); 24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show(); 25羡玛、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();
- 行動(dòng)操作
1别智、 collect() ,返回值是一個(gè)數(shù)組,返回dataframe集合所有的行 2稼稿、 collectAsList() 返回值是一個(gè)java類型的數(shù)組薄榛,返回dataframe集合所有的行 3、 count() 返回一個(gè)number類型的让歼,返回dataframe集合的行數(shù) 4敞恋、 describe(cols: String*) 返回一個(gè)通過數(shù)學(xué)計(jì)算的類表值(count, mean, stddev, min, and max),這個(gè)可以傳多個(gè)參數(shù)谋右,中間用逗號(hào)分隔硬猫,如果有字段為空,那么不參與運(yùn)算改执,只這對(duì)數(shù)值類型的字段啸蜜。例如df.describe("age", "height").show() 5、 first() 返回第一行 辈挂,類型是row類型 6衬横、 head() 返回第一行 ,類型是row類型 7终蒂、 head(n:Int)返回n行 蜂林,類型是row 類型 8、 show()返回dataframe集合的值 默認(rèn)是20行后豫,返回類型是unit 9悉尾、 show(n:Int)返回n行突那,挫酿,返回值類型是unit 10、 table(n:Int) 返回n行 愕难,類型是row 類型