【譯】MLXTEND之StackingCVRegressor

www.DataCamp.com 中有很多數(shù)據(jù)科學(xué)家的cheat sheet兔辅,可以放在手邊參考呀酸,大部分情況就夠用了次酌,以下是個(gè)人整理的一些詳細(xì)的例子酣栈。

spark中通常使用rdd擅笔,但是這樣代碼可讀性差志衣,目前rdd的很多方法已經(jīng)不再更新了。dataframe大部分使用Spark SQL操作猛们,速度會比rdd的方法更快念脯,dataset是dataframe的子集,大部分api是互通的弯淘,目前主流是在使用Spark SQL绿店。

Spark SQL概述

  1. SQL只是Spark SQL的一個(gè)功能而已
  2. 可以訪問hive、json、parquet等文件的數(shù)據(jù)
  3. Spark SQL 提供了SQL假勿、Dataframe和Dataset的API

DataFrames具有如下特點(diǎn):

  • Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster(支持單機(jī)KB級到集群PB級的數(shù)據(jù)處理)
  • Support for a wide array of data formats and storage systems(支持多種數(shù)據(jù)格式和存儲系統(tǒng)借嗽,如圖所示)
  • State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer(通過Spark SQL Catalyst優(yōu)化器可以進(jìn)行高效的代碼生成和優(yōu)化)
  • Seamless integration with all big data tooling and infrastructure via Spark(能夠無縫集成所有的大數(shù)據(jù)處理工具)
  • APIs for Python, Java, Scala, and R (in development via SparkR)

生成一個(gè)DF進(jìn)行測試

 import sparkSession.implicits._
 var test_df = Seq((1,Array("1.0")),(2,Array("2.0")),(3,Array("3.0"))).toDF("imei","feature")    //可以直接將schema寫在toDF里面。

rdd轉(zhuǎn)化為DF

在Spark SQL中SQLContext是創(chuàng)建DataFrames和執(zhí)行SQL的入口转培,在spark-1.5.2中已經(jīng)內(nèi)置了一個(gè)sqlContext
1.在本地創(chuàng)建一個(gè)文件恶导,有三列,分別是id浸须、name惨寿、age,用空格分隔删窒,然后上傳到hdfs上
hdfs dfs -put person.txt /
2.在spark shell執(zhí)行下面命令裂垦,讀取數(shù)據(jù),將每一行的數(shù)據(jù)使用列分隔符分割
val lineRDD = sc.textFile("hdfs://node1:9000/person.txt").map(_.split(" "))
3.定義case class(相當(dāng)于表的schema)
case class Person(id:Int, name:String, age:Int)
4.將RDD和case class關(guān)聯(lián)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
5.將RDD轉(zhuǎn)換成DataFrame
val personDF = personRDD.toDF
6.對DataFrame進(jìn)行處理
personDF.show

1.選出若干列(字段)

1.1 選出一列(字段)

imei_features_df.col("id")
org.apache.spark.sql.Column
(col返回為Column,select 返回為dataframe肌索,df(col_name)為Column類型)

imei_features_df.select((imei_features_df.col("value") + 10).as("value2")).show//修改了把一列的值加十蕉拢,再修改列名
org.apache.spark.sql.DataFrame

imei_features_df.select("value").show

imei_features_df.selectExpr("value") 選出名字為value的一列。
查看兩列(兩個(gè)字段)
import org.apache.spark.sql.functions
imei_features_df.select(functions.col("name"),functions.col("value")).show
df.select(df("name"),df("value")+1).show()
df.select("name","value").show() select操作诚亚,典型的弱類型企量,untyped操作
不能寫成 df.select("name","value"+1).show()這樣。
df.select($"name", $"value" + 1).show() // 使用表達(dá)式亡电,scala的語法届巩,要用$符號作為前綴
另外selectExpr:也可以對指定字段進(jìn)行特殊處理:
df.selectExpr("id", "col2 as time", "round(col3)").show(false)

用apply:獲取指定字段(只能一個(gè),返回為Column)

     val id1 = df.apply("id")
     val id2 = df("id")

1.2 選出兩列份乒,把其中一列加1

imei_features_df.select(col("name"),col("value") + 1).show

imei_features_df.select(imei_features_df("name"),imei_features_df("value") + 1).show

1.3 獲取指定字段統(tǒng)計(jì)信息

stat方法可以用于計(jì)算指定字段或指定字段之間的統(tǒng)計(jì)信息恕汇,比如方差,協(xié)方差等或辖。這個(gè)方法返回一個(gè)DataFramesStatFunctions類型對象瘾英。
df.stat.freqItems(Seq ("c1","c2")).show()
  下面代碼演示根據(jù)c4字段,統(tǒng)計(jì)c1字段值出現(xiàn)頻率在30%以上的內(nèi)容颂暇。在df中字段c1的內(nèi)容為"a, b, a, c, d, b"缺谴。其中a和b出現(xiàn)的頻率為2 / 6,大于0.3耳鸯,(注意:這個(gè)方法算出來的結(jié)果經(jīng)常出錯(cuò))
df.stat.freqItems(Seq ("c1") , 0.3).show()

2.選取若干行記錄

2.1 first, head, take, takeAsList:獲取若干行記錄

(1)first獲取第一行
(2)head獲取第一行湿蛔,head(n: Int)獲取前n行記錄
(3)take(n: Int)獲取前n行數(shù)據(jù)
(4)takeAsList(n: Int)獲取前n行數(shù)據(jù),并以List的形式展現(xiàn)
  以Row或者Array[Row]的形式返回一行或多行數(shù)據(jù)县爬。first和head功能相同阳啥。
  take和takeAsList方法會將獲得到的數(shù)據(jù)返回到Driver端,所以财喳,使用這兩個(gè)方法時(shí)需要注意數(shù)據(jù)量察迟,以免Driver發(fā)生OutOfMemoryError

2.2 limit

limit方法獲取指定DataFrame的前n行記錄斩狱,得到一個(gè)新的DataFrame對象。和take與head不同的是扎瓶,limit方法不是Action操作所踊。
df.limit(3).show

3.刪除操作

3.1.刪除指定字段(列),保留其他字段

df.drop("id")
df.drop(df("id"))

3.2.取出前n行記錄概荷,得到一個(gè)新的dataframe污筷。take和head是Action操作,limit則不是乍赫。

df.limit(n)

3.3.刪除空值

df.na.drop() 刪除帶有空值的行

4.排序

生成數(shù)據(jù)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DfSortDesc").master("local").getOrCreate()
val data = Array((7, 2, 3), (1, 8, 6), (4, 5, 9))
val df = spark.createDataFrame(data).toDF("col1", "col2", "col3")

4.1 orderBy和sort: 按指定字段排序瓣蛀,默認(rèn)為升序,用法相同

//升序排列,只對數(shù)字類型和日期類型生效
df.orderBy("col2").show()
//降序排列雷厂,只對數(shù)字類型和日期類型生效
df.orderBy( - df("col2")).show(false)//加"-"表示降序排列
df.orderBy(df("col2").desc).show
df.orderBy(desc("col2")).show
df.orderBy(-col("col2")).show
df.orderBy(col("col2").desc).show
//sort降序惋增,sort用法和orderBy相同,可以直接替換
df.sort(desc("col2")).show
//一列升序改鲫,一列降序诈皿,同時(shí)調(diào)整兩列
df.sort(desc("col2"),asc("col1")).show

4.2 sortWithinPartitions

和sort類似,但是是使用Partition來對其他字段排序

5. 去重

(1)distinct:返回一個(gè)不包含重復(fù)記錄的Dataframe(整體去重)像棘,結(jié)果和dropDuplicates不傳入指定字段的結(jié)果相同稽亏。
返回當(dāng)前不重復(fù)的row(行)記錄df.distinct()
(2)dropDuplicates:根據(jù)指定字段去重
根據(jù)指定字段去重。
df.dropDuplicates(Seq("col_name"))

6.聚合(groupBy&agg)

6.1.group by

6.1.1 groupBy: 根據(jù)字段進(jìn)行g(shù)roup by操作

groupBy方法有兩種調(diào)用方式缕题,可以傳入String類型的字段名截歉,也可傳入Column類型的對象:

df.groupBy("col1")
df.groupBy(df("col"))

//分組計(jì)數(shù)(各個(gè)年齡的人出現(xiàn)了多少次)
df.groupBy("age").count().show()

6.1.2 cube和rollup:group by的擴(kuò)展

功能和SQL中的group by cube/rollup類似

6.1.3 GroupedData對象

該方法得到的是GroupedData類型對象,在GroupedData的API中提供了group by之后的操作烟零,比如瘪松,
max(colNames: String)方法,數(shù)字類型字段最大值
min(colNames: String
)方法锨阿,數(shù)字類型字段的最小值
mean(colNames: String)方法宵睦,數(shù)字類型字段的平均值
sum(colNames: String
)方法,數(shù)字類型字段的和
count()方法墅诡,分組中元素個(gè)數(shù)
agg方法壳嚎,對指定字段進(jìn)行聚合

6.2 agg (aggregate)

聚合操作調(diào)用的是agg方法,該方法有多種調(diào)用方式末早。通常和groupBy方法配合使用烟馅。
//輸入col,輸出dataframe,對id字段求最大荐吉,對col2求和焙糟。
agg(expers:column*)
df.agg("id" -> "max", "col2" -> "sum")
df.agg(max("age"),avg("salary"))
df.groupBy().agg(max("age"), avg("salary"))

agg(exprs: Map[String, String]) 返回dataframe類型 ,同數(shù)學(xué)計(jì)算求值 map類型的
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe類型 样屠,同數(shù)學(xué)計(jì)算求值
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

6.3 給一個(gè)日常使用的例子

假設(shè)df是一個(gè)關(guān)于ctr預(yù)估的矩陣,存在物料的各個(gè)特征字段和點(diǎn)擊字段,點(diǎn)擊字段值為[0,1]痪欲,代表點(diǎn)與不點(diǎn)悦穿。
隨便生成個(gè)數(shù)據(jù),再轉(zhuǎn)成dataframe业踢。
val data = Array(("1", "2", "3", "4", 0), ("1", "7", "8", "9", 1),("2", "7", "8", "9", 1),("2", "7", "8", "9", 0),("1", "7", "8", "9", 1),("3", "7", "8", "9", 1))

df.groupBy("feature_col1").agg( mean("click"), count("click"))
這個(gè)表就能看出點(diǎn)擊和特征(feature_col1)的關(guān)系栗柒,可以判斷出此特征的重要性。
這個(gè)特征為3的時(shí)候知举,點(diǎn)擊都是1瞬沦。
特征為2的時(shí)候,點(diǎn)擊平均值是0.67
特征為1的時(shí)候雇锡,點(diǎn)擊平均值是0.5
如果count(click)的樣本足夠多的情況下逛钻,以上平均值就能反映點(diǎn)擊和這個(gè)特征的關(guān)系。
即:此特征為3和2的時(shí)候點(diǎn)擊的概率比較大锰提,為1的時(shí)候跟點(diǎn)擊似乎無關(guān)曙痘。
+----+------------------+-----------+
|feature_col1|         avg(click)|count(click)|
+----+------------------+-----------+
|   3|               1.0|          1|
|   1|0.6666666666666666|          3|
|   2|               0.5|          2|
+----+------------------+-----------+

7. 數(shù)據(jù)的分割、拼接(上下合并)和采樣立肘,join&union&randomSplit&sample


(1)笛卡爾積
DF1.crossJoin(DF2)
(2)一個(gè)字段形式
需要兩個(gè)DataFrame中有相同的一個(gè)列名边坤,默認(rèn)是"inner"
DF1.join(DF2, "id")
(3)多個(gè)字段形式 ,多個(gè)字段時(shí)最好指定join的類型,有:“full”,”outer”,”full_outer”,”fullouter”代表全連接谅年,“l(fā)eft”,”left_outer”或者”leftouter”代表左連接茧痒,“right”,”right_outer”及“rightouter”代表右連接,還有" leftsemi(左半連接)"等等融蹂。
val df = df1.join(df2, Seq("key1","key2")) // 基于兩個(gè)公共字段key1和key的等值連接
DF1.join(DF2, Seq("id", "name")文黎,"inner") // 多列
val df = df1.join(df2, df1("key1") === df2("key1"), "outer")
DF1.join(DF2, DF1("app_id") === DF2("item_id"), "left")

(4)當(dāng)兩個(gè)字段名字不一樣時(shí)候
DF1.join(DF2 , DF1("id" ) === DF2( "ID")岛宦,"inner")

(5)
如果出現(xiàn)類似這樣的報(bào)錯(cuò):Reference 'ID' is ambiguous
說明表里有兩個(gè)名字為“ID”列蔓涧,這個(gè)ID列就是join的連接字段,操作的時(shí)候會出現(xiàn)歧義蝉稳。
即使連接字段名稱不相同淋纲,比如一個(gè)"id"一個(gè)"ID"劳闹,表中兩個(gè)字段為id和ID
仍然會報(bào)上面的錯(cuò)誤
DF1.join(DF2 , DF1("id" ) === DF2( "ID"),"inner")這樣也會報(bào)錯(cuò)
正確做法是合并兩個(gè)代表ID的字段為一個(gè)洽瞬,改寫成
df2.join(df, Seq("ID"),"left")

8.1 union

DF1.union(DF2)
DF1.unionAll(DF2) //unionAll以及棄用本涕,新版本一般使用union

8.2 randomSplit

例如:
val Array(train, test) = DF.randomSplit(Array(0.8,0.2))

8 交叉:

獲取兩個(gè)dataframe共有的紀(jì)錄
df.intersect(df.limit(1)).show
獲取一個(gè)dataframe中有另一個(gè)daraframe中沒有的紀(jì)錄
df.except(df.limit(1)).show

9.生成一個(gè)dataframe&&重命名Dataframe中指定字段名

import sqlContext.implicits._
val df = Seq(
  (12, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (234, "Second Value", java.sql.Date.valueOf("2010-02-01")),
(224, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")

指定字段名不存在則不進(jìn)行操作,id字段名改為ID
df.withColumnRenamed("id","ID")

10. 讀取數(shù)據(jù)用“|”切分伙窃,轉(zhuǎn)成rdd菩颖,再把rdd(Array(s0,s1,s2))轉(zhuǎn)化為dataframe

例如: 123123|閃電球|label_2

  case class firstlevel(id: String, name: String, result: String)
## “|”需要兩個(gè)轉(zhuǎn)義字符
  val textFile_DF = sc.textFile("/user/result")
          .map(v => v.toString.split("\\|"))
          .map{case Array(s0, s1, s2) => firstlevel(s0, s1, s2)}.toDF

11. spark sql和DataFrame中使用UDF(User Defined Function),用戶自定義函數(shù)

11.1.測試數(shù)據(jù)

構(gòu)造測試數(shù)據(jù)-(name为障,age)
val userData = Array(("Leo",16),("Marry",21),("Jack",14),("Tom",18))

創(chuàng)建測試df

val user_df = spark.createDataFrame(userData).toDF("name","age")
user_df.show(false)

注冊user表
user_df.createOrReplaceTempView("user")

11.2.Spark.sql中的用法(hive 中的udf,Spark Sql的dataframe中無效)

11.2.1 通過匿名函數(shù)注冊UDF
下面的UDF的功能是計(jì)算某列的長度晦闰,該列的類型為String
11.2.1.1 注冊
spark.udf.register("strLen",(str:String) => str.length())

11.2.2 通過實(shí)名函數(shù)注冊UDF
實(shí)名函數(shù)注冊有點(diǎn)不同放祟,要在后面加 " _"(空格+下劃線)
定義一個(gè)實(shí)名函數(shù)

 def isAdult(age: Int) = {
  if (age < 18) {
    false
  } else {
    true
  }
}

11.2.2.1 注冊
spark.udf.register("isAdult", isAdult _)
11.2.2.2 使用
spark.sql("select name,strLen(name) as name_len from user").show

11.3.DataFrame中的udf用法
DataFrame的udf方法雖然和Spark.sql的名字一樣,但是屬于不同的類呻右,它在org.apache.spark.sql.functions里
11.3.1 注冊

import org.apache.spark.sql.functions.udf
//注冊自定義函數(shù)(通過匿名函數(shù))
val strLen = udf((str: String) => str.length())
//注冊自定義函數(shù)(通過實(shí)名函數(shù))
val udf_isAdult = udf(isAdult _)

11.3.2 使用
可通過withColumn和select使用
給user表添加兩列跪妥。

//通過withColumn添加列
user_df.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
//通過select添加列
user_df.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show

12.添加一列

添加一列有很多方法:

12.1. withColumn

通常使用withColumn,他會先判斷DataFrame里有沒有這個(gè)列名声滥,如果有的話就會替換掉原來的列眉撵,沒有的話就用調(diào)用select方法增加一列,所以如果我們的需求是增加一列的話落塑,但這個(gè)函數(shù)只能對原有列做處理生成新列纽疟。兩者實(shí)現(xiàn)的功能一樣,且最終都是調(diào)用select方法憾赁,但是withColumn會提前做一些判斷處理污朽,所以withColumn的性能不如select好。
df.withColumn("Column Name", value)
df.withColumn("feature", set_feature(imei_label_feature_df("feature")))

12.2 使用udf

可以用udf寫自定義函數(shù)新增列

import org.apache.spark.sql.functions.udf
// 新建一個(gè)dataFrame
val tempDataFrame = spark.createDataFrame(Seq(
  ("a", "asf"),
  ("b", "2143"),
  ("c", "rfds")
)).toDF("id", "content")
// 自定義udf的函數(shù)
val code = (arg: String) => {
      if (arg.getClass.getName == "java.lang.String") 1 else 0
    }

val addCol = udf(code)
// 增加一列
val addColDataframe = tempDataFrame.withColumn("addcol", addCol(tempDataFrame("id")))
addColDataframe.show(10, false)

12.3 添加常量列時(shí)缠沈,需要使用lit

例如:添加所有行的值均為0的一列
import org.apache.spark.sql.functions.{col, concat_ws, udf,lit}
df.withColumn("Column Name", lit(0))
例如:添加所有行的值均為字符串"imei"的一列
df.withColumn("Column Name", lit("imei"))

Spark 2.2引入了typedLit來支持Seq膘壶、list等等

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, .0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

另外可以使用alias來改變列名,類似sql中的as

df.withColumn(
    "some_struct",
    struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
 )

可以使用cast函數(shù)來改變類型

df.withColumn(
    "some_struct", 
    struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
 )

12.3 添加值為null的一列

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val df = spark.createDataFrame(List(
  (1.2, 1),
  (3.1, 2)))
  .toDF("col1", "col2")
val udf_null = udf((s: Any) => null) //有時(shí)候這個(gè)會報(bào)錯(cuò)
//scala.MatchError: Null (of class scala.reflect.internal.Types$ClassNoArgsTypeRef)
//出現(xiàn)這個(gè)錯(cuò)誤時(shí)洲愤,是因?yàn)閟cala不知道要給null轉(zhuǎn)換成何種類型
//可以強(qiáng)制給一個(gè)類型
//val udf_null = udf((s: Any) => null.asInstanceOf[String])

val df_res = df.withColumn("col_name", udf_null(col("col1")).cast(types. StringType))
df_res.show

12.4 一般需要把null的列轉(zhuǎn)換成需要的新列

比如已經(jīng)新增了一個(gè)全為null的列颓芭,我們需要把這列轉(zhuǎn)化成uuid的隨機(jī)數(shù)列

    val code: (String => String) = (arg: String) => { if (arg == null) UUID.randomUUID().toString else "error"}
    val add_uuid_Col = udf(code)
    df_res.withColumn("col_name", add_uuid_Col(df("col_name"))

13.更改一列的列名

df.withColumnRenamed("x", "y")

14. 過濾操作(filter and where)

14.1.filter & filterNot

filter方法返回了所有使假設(shè)條件為真的集合元素組成的新集合。還有一個(gè)方法filterNot柬赐,可以返回所有使假設(shè)條件返回false的元素組成的新集合亡问,用法一樣。

val df = sc.parallelize(Seq(("a", 2), ("b", 5), ("c", 2), ("d", 3), ("e", 1))).toDF("id", "num")
整數(shù)類型
邏輯運(yùn)算符:>, <, ===

import org.apache.spark.sql.functions
val number = 2;
df.filter(functions.col("num") === 2)
df.filter(df.col("num") === 2)
df.filter(df("num") === 2)
df.filter($"num">number)    //傳遞參數(shù)過濾(新版本scala可能不能用)
df.filter($"num"<2)

或者

df.filter("num=2")
df.filter("num>2")
df.filter("num is null")

字符串類型
df.filter(functions.col("id") === ("aaaaaa"))等于
df.filter($"id".equalTo("aaaaaa")) 等于
df.filter(functions.col("id") =!= ("aaaaaa")) 不等于

df.filter($"id".length>1)這種寫法已經(jīng)棄用了肛宋,改成df.filter(length(col("id")) >1)

Map州藕、Array類型判斷相等需要使用sameElements方法
df .filter(col("array1" sameElements "array2"))

傳遞參數(shù)過濾

val str = s"a"

df.filter($"id"equalTo(str))

當(dāng)dataframe沒有字段名時(shí),可以用默認(rèn)的字段名[_1, _2, .....]來進(jìn)行判斷

多條件判斷
邏輯連接符 &&(并)酝陈、||(或)

df.filter($"num"===2 && $"id".equalTo("a")
df.filter($"num"===1 || $"num"===3)
val df1 = df
       .filter(row => row.getAs("col1")!= "") // 過濾掉異常數(shù)據(jù)
       // 如果存在床玻,過濾掉col2日期不在當(dāng)前執(zhí)行周期內(nèi)的數(shù)據(jù)
       .filter(row => row.getAs("col2") >= row.getAs("col1").split("_")(0) &&
        row.getAs("col2") <= row.getAs("col1").split("_")(1))

這樣寫會報(bào)錯(cuò)
因?yàn)槭褂胓etAs函數(shù)獲取某列的數(shù)據(jù)時(shí)沒有指明具體的類型,導(dǎo)致無法判斷是否支持"<="或 ">="這類的運(yùn)算符沉帮,因而會報(bào)錯(cuò)

val df1 = df
       .filter(row => row.getAs[String]("col1")!= "") // 過濾掉異常數(shù)據(jù)
       // 如果存在锈死,過濾掉col2日期不在當(dāng)前執(zhí)行周期內(nèi)的數(shù)據(jù)
       .filter(row => row.getAs[String]("col2") >= row.getAs[String]("col1").split("_")(0) &&
        row.getAs[String]("col2") <= row.getAs("col1").split("_")(1))

14.2.where

df.where("id = 1 or sex = 'male' ").show
用法和filter相同

15.轉(zhuǎn)化列類型

15.1單列轉(zhuǎn)換

import org.apache.spark.sql.types._
//import org.apache.spark.sql.types.{DoubleType, IntegerType}

val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
val df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")

import org.apache.spark.sql.functions._
df.select(col("col1").cast(DoubleType)).show()

15.2使用withColumn循環(huán)多列轉(zhuǎn)換(遍歷)

import org.apache.spark.sql.types
var df1 = df
val colNames = df.columns

for (colName <- colNames) {
  df1 = df1.withColumn(colName, col(colName).cast(types.DoubleType))
// 有些type名稱會和其他類里的名稱起沖突,例如StringType穆壕,調(diào)用的時(shí)候手動加上類名如types.StringType調(diào)用待牵。
}
df1.show()

15.3 當(dāng)一列類型為Array,需要轉(zhuǎn)成字符串

平時(shí)都是使用Array.mkString(',')這種喇勋。在dataframe中需要使用concat_ws缨该,詳細(xì)見【20.1.數(shù)據(jù)的合并】。

16.創(chuàng)建一個(gè)空dataframe

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types
import org.apache.spark.sql.Row
import org.apache.spark.rdd.EmptyRDD

/**
 * Spark創(chuàng)建空DataFrame示例
 */
object EmptyDataFrame {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("EmptyDataFrame").master("local").getOrCreate()

    /**
     * 創(chuàng)建一個(gè)空的DataFrame川背,代表用戶
     * 有四列贰拿,分別代表ID蛤袒、名字、年齡壮不、生日
     */
    val colNames = Array("id", "name", "age", "birth")
    //為了簡單起見汗盘,字段類型都為String
    val schema = StructType(colNames.map(fieldName => StructField(fieldName, types.StringType, true)))
    //主要是利用了spark.sparkContext.emptyRDD
    val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    emptyDf.show

    /**
   也可以給每列指定相對應(yīng)的類型(使用這種會比較多)
   通過StructType直接指定每個(gè)字段的schema
     */
    val schema1 = StructType(
      Seq(
        StructField("id", types.IntegerType, true),
        StructField("name", types.StringType, true),
        StructField("age", types.IntegerType, true),
        StructField("birth", types.StringType, true)))
    val emptyDf1 = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema1)
    emptyDf1.show

    //還有一種空的DataFrame皱碘,沒有任何行任何列
    spark.emptyDataFrame.show

    spark.stop()
  }
}

17.指定列缺失值填充询一、刪除以及替換

17.1 填充

    val df1 = source_df.na.fill("新值")
    val df2 = source_df.na.fill(10)?
    val df3 = source_df.na.fill(value = 0.0,cols=Array("col1","col2"))
    val df4 = source_df.na.fill(value = "wang",cols=Array("col1","col2"))
//傳入一個(gè)值,以及所有需要用此值填充的Array癌椿,或者使用Map健蕊,按照列進(jìn)行不同填充
    val df5 = source_df.na.fill(value = "male",cols=Array("col1"))
    val df6 = source_df.na.fill(Map("col1"->"wang","col2"->"yao"))

17.2 刪除

val df7 = source_df.na.drop()
val df8 = source_df.na.drop(Array("col1"))
val df9 = source_df.na.drop(10,Array("col1","col2"))    //刪除某一或幾列的非空非NaN但是值低于10的

17.3 替換

df.na.replace("col1",Map(1->2))         //將col1列的值為1替換為2.
df.na.replace(Array("col1","col2"),Map(1->2))??
df.na.replace[Int]("col1",Map(1->2))   //可以添加泛型,Map中的key和value類型必須與其保持一致踢俄。

18.根據(jù)列分割成多行 explode

explode 用來根據(jù)某列分割成多行
通常用來展開array或map為多行缩功。

拆分List格式的列。
df = df.withColumn("entityPair", explode(col("List_col")));

拆分Map格式的列都办。
df = df.select(explode(col("data"))).toDF("key", "value");
可以看到嫡锌,這里和List有一個(gè)不同的地方是需要在explode后接一個(gè)toDF操作,是因?yàn)镸ap進(jìn)行展開操作后自然會得到兩列琳钉,我們需要將其轉(zhuǎn)化為DataFrame格式的兩列势木,列名可以自己指定。

18.2 網(wǎng)上的例子1:https://blog.csdn.net/strongyoung88/article/details/52227568

有的時(shí)候JSON數(shù)據(jù)歌懒,會包含嵌套關(guān)系啦桌,比如像如下的JSON數(shù)據(jù):

{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}

用sparkql讀取以后

df.show()
df.printSchema()
+---+--------------------+-------+
|age|             myScore|   name|
+---+--------------------+-------+
| 25|  [[23,19], [50,58]]|Michael|
| 30|[[33,29], [52,38]...|   Andy|
| 19|  [[43,39], [53,28]]| Justin|
+---+--------------------+-------+
root
 |-- age: long (nullable = true)
 |-- myScore: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- score2: long (nullable = true)
 |    |    |-- score1: long (nullable = true)
 |-- name: string (nullable = true)

使用explode函數(shù)把myScore的數(shù)組類型展開

val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
dfScore.show()

此時(shí),會得到如下結(jié)果及皂,這個(gè)時(shí)候的表甫男,就跟我們平時(shí)看到的關(guān)系型數(shù)據(jù)庫的表是一樣的了,接下來验烧,我們就可以執(zhí)行相關(guān)的sql查詢了板驳。

+-------+-----------------+------------------+
|   name|           score1|            score2|
+-------+-----------------+------------------+
|Michael|               19|                23|
|Michael|               58|                50|
|   Andy|               29|                33|
|   Andy|               38|                52|
|   Andy|               88|                71|
| Justin|               39|                43|
| Justin|               28|                53|
+-------+-----------------+------------------+

18.3 網(wǎng)上的例子2:

有時(shí)候需要根據(jù)某個(gè)字段內(nèi)容進(jìn)行分割,然后生成多行碍拆,這時(shí)可以使用explode方法
根據(jù)c3字段中空格將字段內(nèi)容分割若治,分割內(nèi)容存在c3_字段中
分割前:


df.explode( "c3" , "c3_" ){time: String => time.split( " " )}
結(jié)果:

19.對dataframe中的值進(jìn)行替換

我們需要把第二列中的“Tesla”改成“S”,“Ford”改成“F”

//初始化一個(gè)dataframe
//import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.{DataFrame,SparkSession}

val sparkSession =  SparkSession.builder().enableHiveSupport().getOrCreate()
val rdd = sc.parallelize(
      List( (2012,"Tesla","S"), (1997,"Ford","E350"), (2015,"Chevy","Volt"))
  )
 import sparkSession.implicits._
  val df = rdd.toDF("year","trademark","model_number")

使用functions.udf

val makeSIfTesla = udf {(trademark: String) => 
  if(trademark == "Tesla") "S"  else if(trademark == "Ford")"F"
 else trademark
}
var df1 = df.withColumn("trademark", makeSIfTesla(df("trademark")))

使用functions.when

val df2 = df.withColumn("trademark", F.when(col("trademark").equalTo("Tesla"), "S").when(col("trademark").equalTo("Ford"), "F").otherwise(col("trademark")))

val df3 = df.withColumn("trademark", F.when(col("trademark") === "Tesla", "S").when(col("trademark") === "Ford", "F").otherwise(col("trademark")))

20.數(shù)據(jù)的拆分與合并

20.1.數(shù)據(jù)的合并

concat合并列,列類型需要為字符串 
concat_ws按照特殊符號分隔來合并列倔监,返回col格式
連接字符串:
concat_ws("_", field1, field2)直砂,輸出結(jié)果將會是:“field1_field2”。

數(shù)組元素連接:
concat_ws("_", [a,b,c])浩习,輸出結(jié)果將會是:"a_b_c"静暂。
多個(gè)列連接:
concat_ws(",",col(col_name1),col(col_name2),col(col_name3))
import org.apache.spark.sql.types._
import org.apache.spark.sql.types
import org.apache.spark.sql.{DataFrame, Row}
//import org.apache.spark.sql.types.{DoubleType, StringType}

val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
val df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")

//合并多個(gè)列,加在原dataframe的后面
val new_df = df.withColumn("new_col_name", concat_ws(",", col("col1"),col("col2"),col("col3")).cast(types.StringType))

//使用自定義UDF函數(shù)
    // 編寫udf函數(shù)谱秽,對行做處理
val separator = "洽蛀,"
    def mergeCols(merge_row: Row): String = {
        merge_row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }
    val mergeColsUDF = udf(mergeCols _)
    df.select($"col4",$"col5",mergeColsUDF(struct($"col1", $"col2", $"col3")).as("value")).show

結(jié)果:
+----+----+-----+
|col4|col5|value|
+----+----+-----+
|   4|   5|1摹迷,2,3|
|   9|  10|6郊供,7峡碉,8|
+----+----+-----+

20.2.數(shù)據(jù)的拆分

把一列(字符串),按照特殊的分隔符號切分成多列

生成數(shù)據(jù)
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
//import org.apache.spark.sql.types.{DoubleType, StringType}

val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
var df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")
val separator = "驮审,"
    def mergeCols(merge_row: Row): String = {
        merge_row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
   df = df.select(mergeColsUDF(struct($"col1", $"col2", $"col3",$"col4",$"col5")).as("value"))
使用內(nèi)置函數(shù)split鲫寄,然后遍歷添加列
    val separator = ","
    lazy val first = df.first()
使用lazy val first疯淫,原因是構(gòu)造順序問題地来,在使用.length等方法時(shí),first可能還沒有被賦值熙掺,所以先使用lazy val生成未斑,等到用到這個(gè)變量的時(shí)候再初始化。
    val numAttrs = first.toString().split(separator).length
tabulate返回指定長度數(shù)組币绩,每個(gè)數(shù)組元素為指定函數(shù)的返回值蜡秽,默認(rèn)從0開始。
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列缆镣,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()

結(jié)果:
+----------+----------------+-----+-----+-----+-----+-----+
|     value|       splitCols|col_0|col_1|col_2|col_3|col_4|
+----------+----------------+-----+-----+-----+-----+-----+
| 1芽突,2,3费就,4诉瓦,5| [1, 2, 3, 4, 5]|    1|    2|    3|    4|    5|
|6,7力细,8睬澡,9,10|[6, 7, 8, 9, 10]|    6|    7|    8|    9|   10|
+----------+----------------+-----+-----+-----+-----+-----+

21.查看分區(qū)以及重分區(qū)

分區(qū)數(shù)太少眠蚂,不能充分利用集群中所有可用的內(nèi)核煞聪。 【影響較大】
分區(qū)數(shù)太多,將管理許多小任務(wù)逝慧,使得產(chǎn)生過多的開銷(更多的抓取昔脯,更多的磁盤搜索,driver app需要跟蹤每個(gè)任務(wù)的狀態(tài))笛臣。 【小于1000的分區(qū)計(jì)數(shù)云稚,調(diào)度太多的小任務(wù)對這一點(diǎn)影響相對較小】
查看分區(qū)
df.rdd.getNumPartitions
重分區(qū)(10個(gè)分區(qū))repartition可以用來減少或增多分區(qū)數(shù)
df.repartition(10).rdd.getNumPartitions
重分區(qū)(1個(gè))coalesce為repartition的優(yōu)化版本,只能用來減少分區(qū)數(shù)
df.coalesce(1).rdd.getNumPartitions
如果重分區(qū)的數(shù)目大于原來的分區(qū)數(shù)沈堡,那么必須指定shuffle參數(shù)為true
df.coalesce(14,true) repartition相當(dāng)于帶true的coalesce

注意:增加分區(qū)數(shù)會產(chǎn)生shuffle静陈,但是減少分區(qū)數(shù)不會產(chǎn)生shuffle,重分區(qū)可以緩解數(shù)據(jù)傾斜的問題,一般指減少分區(qū)數(shù)

22.Spark SQL符合類型struct

df.selectExpr("(col1,col2) as new_col","*").show(1)
df.selectExpr("struct(col1,col2) as new_col","*").show(1)
val complexDF=df.select(struct("col1","col2").as("new_col3"))
complexDF.createOrReplaceTempView("complexDF")
complexDF.select("new_col3.col1").show(1)
complexDF.select(col("new_col3").getField("col2")).show(1)
complexDF.select("new_col3.*").show(1)
spark.sql("select new_col3.* from complexDF").show(1)

23.注冊成表鲸拥,并進(jìn)行SparkSQL操作拐格,以及df的map操作

df.show
這個(gè)數(shù)據(jù)代表用戶有沒有車,label = 1為有車刑赶。

  • 將DataFrame注冊成表
    df.registerTempTable("people_car_table")

  • 利用sql方法進(jìn)行SparkSQL操作捏浊,選出有車的用戶
    val people_car = sparkSession.sql("SELECT imei,label FROM people_car_table WHERE label = 1")

  • 將返回結(jié)果看作是數(shù)據(jù)庫操作的一行,(0)表示第一列撞叨,依次類推
    這里把imei前面加上字符串為“IMEI:”的前綴
    people_car.map(t => "IMEI: " + t(0)).collect().foreach(println)


    people_car.map(t => "IMEI: " + t(0)).show

    同時(shí)改變兩列
    people_car.map(t => ("imei:"+t(0),"label:"+t(1))).show

  • 通過域的名稱獲取信息,結(jié)果和people_car.map(t => "IMEI: " + t(0)).show一樣
    使用getAs金踪。
    people_car.map(t => "Name: " + t.getAs[String]("imei")).show

上面兩張圖可以看出dataframe的map操作和rdd基本是一樣的,但是dataframe相比rdd的優(yōu)勢在于帶有schema谒所,更加直觀热康,使用map以后schema就變化了沛申,需要重新定義schema劣领。
  • 通過sql語句讀取hive表,dataframe格式
    val car_df = sparkSession.sql(select_sql)

24.檢查數(shù)據(jù)

先生成數(shù)據(jù)

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
//import org.apache.spark.sql.types.{DoubleType, StringType}

val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))

var df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")

檢查數(shù)據(jù)

24.1

df.dtypes 返回字段名和數(shù)據(jù)類型
結(jié)果:
res1: Array[(String, String)] = Array((col1,StringType), (col2,StringType), (col3,StringType), (col4,StringType), (col5,StringType))

24.2

【show方法參數(shù)默認(rèn)為true铁材,如果字段太長則會略去尖淘,改為false,會顯示整個(gè)字段】
df.show(false) 返回?cái)?shù)據(jù)內(nèi)容
結(jié)果:

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   1|   2|   3|   4|   5|
|   6|   7|   8|   9|  10|
+----+----+----+----+----+
24.3

df.schema 返回schema
結(jié)果:
res3: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(col4,StringType,true), StructField(col5,StringType,true))

24.4

df.columns    //返回列名
df.count()      //返回行數(shù)
df.distinct().count() //不重復(fù)行數(shù)
df.printSchema()   //打印schema
df.describe().show() 
//導(dǎo)入數(shù)據(jù)后執(zhí)行的第一個(gè)操作是了解它們的大致情況著觉。
//對于數(shù)字列, 了解描述性摘要統(tǒng)計(jì)信息對理解數(shù)據(jù)的分布有很大幫助村生。
//可以使用describe函數(shù)來返回一個(gè)DataFrame, 其中會包含非空項(xiàng)目數(shù), 平均值, 標(biāo)準(zhǔn)偏差以及每個(gè)數(shù)字列的最小值和最大值等信息。

25.Spark SQL 加載數(shù)據(jù)

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// 轉(zhuǎn)化rdd中的行為Row格式
import org.apache.spark.sql.Row
val rowRDD = peopleRDD.map(x => Row(x))

val schemaString = "name age" //兩列

// 根據(jù)schemaString 生成schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// 創(chuàng)建臨時(shí)表
peopleDF.createOrReplaceTempView("people")

// 可以直接使用sql操作dataframe了
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

Reference

https://blog.csdn.net/strongyoung88/article/details/52227568
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
https://stackoverflow.com/questions/32357774/scala-how-can-i-replace-value-in-dataframes-using-scala
https://stackoverflow.com/questions/32788322/how-to-add-a-constant-column-in-a-spark-dataframe
https://www.cnblogs.com/xiaoma0529/p/7126923.html
https://dongkelun.com/2018/08/14/sparkEmptyDataFrame/
https://dongkelun.com/2018/08/02/sparkUDF/
https://blog.csdn.net/sinat_36121406/article/details/82755516

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末饼丘,一起剝皮案震驚了整個(gè)濱河市趁桃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌肄鸽,老刑警劉巖卫病,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異典徘,居然都是意外死亡蟀苛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門逮诲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來帜平,“玉大人,你說我怎么就攤上這事梅鹦●伤Γ” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵齐唆,是天一觀的道長嗤栓。 經(jīng)常有香客問我,道長蝶念,這世上最難降的妖魔是什么抛腕? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任芋绸,我火速辦了婚禮,結(jié)果婚禮上担敌,老公的妹妹穿的比我還像新娘摔敛。我一直安慰自己,他們只是感情好全封,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布马昙。 她就那樣靜靜地躺著,像睡著了一般刹悴。 火紅的嫁衣襯著肌膚如雪行楞。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天土匀,我揣著相機(jī)與錄音子房,去河邊找鬼。 笑死就轧,一個(gè)胖子當(dāng)著我的面吹牛证杭,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播妒御,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼解愤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了乎莉?” 一聲冷哼從身側(cè)響起送讲,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎惋啃,沒想到半個(gè)月后哼鬓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肥橙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年魄宏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片存筏。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宠互,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出椭坚,到底是詐尸還是另有隱情予跌,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布善茎,位于F島的核電站券册,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜烁焙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一航邢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧骄蝇,春花似錦膳殷、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至岔激,卻和暖如春勒极,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背虑鼎。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工辱匿, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人震叙。 一個(gè)月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓掀鹅,卻偏偏與公主長得像,于是被迫代替她去往敵國和親媒楼。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容