Dataset = RDD + schema
A Dataset is essentially an RDD plus a schema. The schema is usually inferred automatically; the simplest schema contains a single column named value, whose type can be String, Int, and so on.
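For example (a small sketch; like the shell sessions below it assumes a SparkSession named spark), a Dataset of plain Strings gets exactly that single value column:
import spark.implicits._

// A Dataset of a primitive type uses the default single-column schema
val names = Seq("bluejoe", "alex").toDS   // Dataset[String]
names.printSchema()
// root
//  |-- value: string (nullable = true)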
The following code creates a Dataset of tuples:
scala> import spark.implicits._
import spark.implicits._
scala> val ds = Seq(("bluejoe", 100), ("alex", 200)).toDS
ds: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
scala> ds.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(_1,StringType,true), StructField(_2,IntegerType,false))
scala> ds.collect
res1: Array[(String, Int)] = Array((bluejoe,100), (alex,200))
This Dataset contains two rows, and each row is a Tuple2, e.g. (bluejoe,100).
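To make the "RDD + schema" view concrete (a small sketch reusing the ds defined above), the underlying RDD is always available via rdd:
// The typed records as a plain RDD; the schema stays with the Dataset
val rdd = ds.rdd              // org.apache.spark.rdd.RDD[(String, Int)]
rdd.collect()                 // Array((bluejoe,100), (alex,200))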
You can run SQL-style queries against this Dataset:
scala> ds.select("_1").collect
res4: Array[org.apache.spark.sql.Row] = Array([bluejoe], [alex])
scala> ds.show
+-------+---+
| _1| _2|
+-------+---+
|bluejoe|100|
| alex|200|
+-------+---+
scala> ds.select("_1").show
+-------+
| _1|
+-------+
|bluejoe|
| alex|
+-------+
scala> ds.select(ds("_1")).show
+-------+
| _1|
+-------+
|bluejoe|
| alex|
+-------+
scala> ds.select($"_1").show
+-------+
| _1|
+-------+
|bluejoe|
| alex|
+-------+
ds.select("_1")與ds.select(ds("_1"))劲件,以及ds.select($"_1")等價(jià)
$"_1"神奇嗎?一點(diǎn)都不神奇约急,$()其實(shí)是一個(gè)函數(shù):
implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    new ColumnName(sc.s(args: _*))
  }
}
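In other words, $"_1" is ordinary Scala string interpolation: it expands to StringContext("_1").$(), which builds a ColumnName. A small sketch of the equivalent forms:
import org.apache.spark.sql.ColumnName

val c1 = $"_1"                    // via the StringToColumn implicit above
val c2 = new ColumnName("_1")     // what the $ method constructs
ds.select(c1).show()
ds.select(c2).show()              // identical output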
Columns can also take part in arithmetic expressions:
scala> ds.select(ds("_2")+10).show
+---------+
|(_2 + 10)|
+---------+
| 110|
| 210|
+---------+
scala> ds.select($"_2"+10).show
+---------+
|(_2 + 10)|
+---------+
| 110|
| 210|
+---------+
Operators such as + and - are defined on Column (which ColumnName extends), so we will not go through them one by one here.
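Since these operators are just methods on Column, the usual comparison and aliasing helpers work the same way (a small sketch):
ds.select(($"_2" * 2).as("doubled")).show()   // arithmetic plus a column alias
ds.filter($"_2" > 100).show()                 // comparison used as a filter condition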
You can also reshape a Dataset with map():
scala> ds.map(x=>(x._1.toUpperCase, x._2+10)).show
+-------+---+
| _1| _2|
+-------+---+
|BLUEJOE|110|
| ALEX|210|
+-------+---+
As you can see, map() generates a new schema:
scala> ds.map(x=>(x._1.toUpperCase, x._2+10, true)).show
+-------+---+----+
| _1| _2| _3|
+-------+---+----+
|BLUEJOE|110|true|
| ALEX|210|true|
+-------+---+----+
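The generated columns get the default names _1, _2, ...; if you want meaningful names you can, for example, rename them with toDF(colNames...) (a small sketch):
// Rename the auto-generated tuple columns after a map()
ds.map(x => (x._1.toUpperCase, x._2 + 10)).toDF("name", "age").show()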
Besides turning one tuple into another, you can also map each row to a bean-like object, such as a case class:
scala> case class Person(name:String,age:Int){};
defined class Person
scala> val ds2=ds.map(x=>Person(x._1.toUpperCase, x._2+10))
ds2: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
scala> ds2.show
+-------+---+
| name|age|
+-------+---+
|BLUEJOE|110|
| ALEX|210|
+-------+---+
Note that each row of this new Dataset is now a Person object:
scala> ds2.collect
res36: Array[Person] = Array(Person(BLUEJOE,110), Person(ALEX,210))
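Because ds2 is a typed Dataset[Person], the lambdas you pass to it receive Person objects and can use the fields directly (a small sketch):
ds2.filter(_.age > 150).show()           // keeps only ALEX (age 210)
ds2.map(p => p.name.length).collect()    // Array(7, 4)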
Note, however, that not just any object can be stored in a Dataset:
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> ds.map(x=>Row(x._1.toUpperCase, x._2+10)).show
<console>:32: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
ds.map(x=>Row(x._1.toUpperCase, x._2+10)).show
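The complaint is about the missing Encoder. One way around it (a sketch, using RowEncoder from org.apache.spark.sql.catalyst.encoders, exactly as we will do for DataFrame.map() below) is to pass the encoder explicitly:
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// With an explicit encoder, Row becomes an acceptable result type
ds.map(x => Row(x._1.toUpperCase, x._2 + 10))(RowEncoder(ds.schema)).show()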
DataFrame is an alias for Dataset[Row]
As the Spark documentation puts it, "A DataFrame is a Dataset organized into named columns."
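In the Spark source this is literally a type alias, declared in the org.apache.spark.sql package object:
type DataFrame = Dataset[Row]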
A Dataset can be converted into a DataFrame:
scala> val df=ds.toDF
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> df.collect
res33: Array[org.apache.spark.sql.Row] = Array([bluejoe,100], [alex,200])
Note that each row of the DataFrame really is a Row. Looking at the source code of toDF():
def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
In effect, toDF() uses a RowEncoder to convert each tuple into a Row.
You can also convert to a DataFrame with as(), passing a RowEncoder explicitly:
scala> ds.as[Row](RowEncoder(ds.schema)).collect
res55: Array[org.apache.spark.sql.Row] = Array([bluejoe,100], [alex,200])
DataFrame's map() has some pitfalls. Because a DataFrame is still just a Dataset, each of its rows can be mapped to any encodable object, even one that is not a Row:
scala> df.map(x=>(x(0).asInstanceOf[String].toLowerCase, x(1).asInstanceOf[Int]-10)).collect
res43: Array[(String, Int)] = Array((bluejoe,90), (alex,190))
See that? The result of this map() is no longer a DataFrame! If you insist on getting a DataFrame back, you have to resort to the awkward toDF():
scala> df.map(x=>(x(0).asInstanceOf[String].toLowerCase, x(1).asInstanceOf[Int]-10)).toDF.collect
res44: Array[org.apache.spark.sql.Row] = Array([bluejoe,90], [alex,190])
Or specify an Encoder explicitly:
scala> df.map{x:Row=>Row(x(0).asInstanceOf[String].toLowerCase, x(1).asInstanceOf[Int]-10)}(RowEncoder(ds.schema)).collect
res52: Array[org.apache.spark.sql.Row] = Array([bluejoe,90], [alex,190])
Awkward? Yes, really awkward!
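If this feels too clumsy, one escape hatch (only a sketch, assuming spark.implicits._ is in scope) is to go back to a typed Dataset first, work with plain tuples, and return to a DataFrame only at the end:
// Re-type the DataFrame, transform with tuples, then go back to Rows
df.as[(String, Int)]
  .map { case (name, score) => (name.toLowerCase, score - 10) }
  .toDF()
  .collect()                  // Array([bluejoe,90], [alex,190])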