DataFrame的創(chuàng)建&操作


Spark SQL是Spark中的一個(gè)模塊贷掖,主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理刑赶。它提供的最核心的編程抽象锯厢,就是DataFrame谆膳。同時(shí)Spark SQL還可以作為分布式的SQL查詢引擎叭爱。Spark SQL最重要的功能之一,就是從Hive中查詢數(shù)據(jù)漱病。

DataFrame是以列的形式組織的买雾,分布式的結(jié)構(gòu)化數(shù)據(jù)集合。它其實(shí)和關(guān)系型數(shù)據(jù)庫中的表非常類似杨帽,但是底層做了很多的優(yōu)化漓穿。DataFrame可以通過很多來源進(jìn)行構(gòu)建,包括:結(jié)構(gòu)化的數(shù)據(jù)文件注盈,Hive中的表晃危,外部的關(guān)系型數(shù)據(jù)庫,以及RDD老客。有了DataFrame后僚饭,可以非常方便地使用DataFrame API和SQL來處理數(shù)據(jù)。

  • 創(chuàng)建DataFrame(三種)
    • 第一種
    val dfUsers = spark.read.format("csv").option("header", "true").load("file:///root/data/user.csv")
    
    dfUsers.printSchema
    
    // root
    // |-- user_id: string (nullable = true)
    // |-- locale: string (nullable = true)
    // |-- birthyear: string (nullable = true)
    // |-- gender: string (nullable = true)
    // |-- joinedAt: string (nullable = true)
    // |-- location: string (nullable = true)
    // |-- timezone: string (nullable = true)
    
    
    • 第二種
    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    
    scala> import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    
    scala> import spark.implicits._
    import spark.implicits._
    
    // 讀取文件并轉(zhuǎn)換成RDD[Row]類型
    scala> val uRdd = spark.sparkContext.textFile("file:///root/data/user.csv")
                         .map(x = x.split(","))
                         .mapPartitionsWithIndex((index, iter) => if (index == 0) iter.drop(1) else iter)
                         .map(Row.fromSeq(_))
    uRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[26]  at map at <console>:30
    
    // 定義Schema
    scala> val schema = StructType(Array(StructField("user_id", StringType, true),       
     StructField("locale", StringType, true),StructField("birthyear", StringType, true),   
     StructField("gender",StringType, true), StructField("joinedAt", StringType, true), 
     StructField("location", StringType, true), StructField("timezone", StringType, true)))
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,StringType,true), StructField(locale,StringType,true), StructField(birthyear,StringType,true), StructField(gender,StringType,true), StructField(joinedAt,StringType,true), StructField(location,StringType,true), StructField(timezone,StringType,true))
    
    // 創(chuàng)建DataFrame
    scala> val dfUsers = spark.createDataFrame(uRdd, schema)
    dfUsers: org.apache.spark.sql.DataFrame = [user_id: string, locale: string ... 5 more fields]
    
    scala> dfUsers.printSchema
    
    // root
    // |-- user_id: string (nullable = true)
    // |-- locale: string (nullable = true)
    // |-- birthyear: string (nullable = true)
    // |-- gender: string (nullable = true)
    // |-- joinedAt: string (nullable = true)
    // |-- location: string (nullable = true)
    // |-- timezone: string (nullable = true)
    
    scala> dfUsers show 3
    // +----------+------+---------+------+--------------------+------------------+--------+
    // |   user_id|locale|birthyear|gender|            joinedAt|          location|timezone|
    // +----------+------+---------+------+--------------------+------------------+--------+
    // |3197468391| id_ID|     1993|  male|2012-10-02T06:40:...|  Medan  Indonesia|     480|
    // |3537982273| id_ID|     1992|  male|2012-09-29T18:03:...|  Medan  Indonesia|     420|
    // | 823183725| en_US|     1975|  male|2012-10-06T03:14:...|Stratford  Ontario|    -240|
    // +----------+------+---------+------+--------------------+------------------+--------+
    
    
    注:由于該文件首行是列名沿量,所以使用mapPartitionsWithIndex()函數(shù)過濾掉
    • 第三種
    scala> val dfUsers = spark.sparkContext.textFile("file:///root/data/users.csv")
                   .map(_.split(","))
                   .mapPartitionsWithIndex((index, iter) => if (index == 0) iter.drop(1) else iter)
                   .map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6)))
                   .toDF("user_id", "locale", "birthyear", "gender", "joinedAt", "location", "timezone")
    dfUsers: org.apache.spark.sql.DataFrame = [user_id: string, locale: string ... 5 more  fields]
    
    scala> dfUsers show 3
    // +----------+------+---------+------+--------------------+------------------+--------+
    // |   user_id|locale|birthyear|gender|            joinedAt|          location|timezone|
    // +----------+------+---------+------+--------------------+------------------+--------+
    // |3197468391| id_ID|     1993|  male|2012-10-02T06:40:...|  Medan  Indonesia|     480|
    // |3537982273| id_ID|     1992|  male|2012-09-29T18:03:...|  Medan  Indonesia|     420|
    // | 823183725| en_US|     1975|  male|2012-10-06T03:14:...|Stratford  Ontario|    -240|
    // +----------+------+---------+------+--------------------+------------------+--------+
    
    
    • 把DataFrame持久化到內(nèi)存
      dfUsers.persist()
      
    • 保存到Hive表
      dfUser.write.mode("overwrite").saveAsTable("sparkdf.users")
      

  • DataFrame Operators

    • 常用操作
    map, flatMap 
    sample, filter 
    sort 
    pipe 
    groupBy, groupByKey, cogroup 
    reduce, reduceByKey, fold 
    partitionBy ? zip, union 
    join, crossJoin, leftOuterJoin, rightOuterJoin 
    count, save 
    first, take
    
    • 基本操作
    1、 cache()同步數(shù)據(jù)的內(nèi)存
    2冤荆、 columns 返回一個(gè)string類型的數(shù)組朴则,返回值是所有列的名字
    3、 dtypes返回一個(gè)string類型的二維數(shù)組钓简,返回值是所有列的名字以及類型
    4乌妒、 explan()打印執(zhí)行計(jì)劃  物理的
    5、 explain(n:Boolean) 輸入值為 false 或者true 外邓,返回值是unit  默認(rèn)是false 撤蚊,如果輸入true 將會(huì)打印 邏輯的和物理的
    6、 isLocal 返回值是Boolean類型损话,如果允許模式是local返回true 否則返回false
    7侦啸、 persist(newlevel:StorageLevel) 返回一個(gè)dataframe.this.type 輸入存儲(chǔ)模型類型
    8、 printSchema() 打印出字段名稱和類型 按照樹狀結(jié)構(gòu)來打印
    9丧枪、 registerTempTable(tablename:String) 返回Unit 光涂,將df的對(duì)象只放在一張表里面,這  個(gè)表隨著對(duì)象的刪除而刪除了
    10拧烦、 schema 返回structType 類型忘闻,將字段名稱和類型按照結(jié)構(gòu)體類型返回
    11、 toDF()返回一個(gè)新的dataframe類型的
    12恋博、 toDF(colnames:String*)將參數(shù)中的幾個(gè)字段返回一個(gè)新的dataframe類型的齐佳,
    13私恬、 unpersist() 返回dataframe.this.type 類型,去除模式中的數(shù)據(jù)
    14炼吴、 unpersist(blocking:Boolean)返回dataframe.this.type類型 true 和unpersist是一樣的作用false 是去除RDD
    
    • 聚合查詢操作
    1本鸣、 agg(expers:column*) 返回dataframe類型 ,同數(shù)學(xué)計(jì)算求值
          df.agg(max("age"), avg("salary"))
          df.groupBy().agg(max("age"), avg("salary"))
    2缺厉、 agg(exprs: Map[String, String])  返回dataframe類型 永高,同數(shù)學(xué)計(jì)算求值 map類型的
        df.agg(Map("age" -> "max", "salary" -> "avg"))
        df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe類型 提针,同數(shù)學(xué)計(jì)算求值
        df.agg(Map("age" -> "max", "salary" -> "avg"))
        df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    4命爬、 apply(colName: String) 返回column類型,捕獲輸入進(jìn)去列的對(duì)象
    5辐脖、 as(alias: String) 返回一個(gè)新的dataframe類型饲宛,就是原來的一個(gè)別名
    6、 col(colName: String)  返回column類型嗜价,捕獲輸入進(jìn)去列的對(duì)象
    7艇抠、 cube(col1: String, cols: String*) 返回一個(gè)GroupedData類型,根據(jù)某些字段來匯總
    8久锥、 distinct 去重 返回一個(gè)dataframe類型
    9家淤、 drop(col: Column) 刪除某列 返回dataframe類型
    10、 dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個(gè)dataframe
    11瑟由、 except(other: DataFrame) 返回一個(gè)dataframe絮重,返回在當(dāng)前集合存在的在其他集合不存在的
    12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ? TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe類型歹苦,這個(gè) 將一個(gè)字段進(jìn)行更多行的拆分
        df.explode("name","names") {name :String=> name.split(" ")}.show();
        將name字段根據(jù)空格來拆分青伤,拆分的字段放在names里面
    13、 filter(conditionExpr: String): 刷選部分?jǐn)?shù)據(jù)殴瘦,返回dataframe類型 df.filter("age>10").show();  df.filter(df("age")>10).show();   df.where(df("age")>10).show(); 都可以
    14狠角、 groupBy(col1: String, cols: String*) 根據(jù)某寫字段來匯總返回groupedate類型   df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以
    15、 intersect(other: DataFrame) 返回一個(gè)dataframe蚪腋,在2個(gè)dataframe都存在的元素
    16丰歌、 join(right: DataFrame, joinExprs: Column, joinType: String)
      一個(gè)是關(guān)聯(lián)的dataframe,第二個(gè)關(guān)聯(lián)的條件屉凯,第三個(gè)關(guān)聯(lián)的類型:inner, outer, left_outer, right_outer, leftsemi
        df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
    17动遭、 limit(n: Int) 返回dataframe類型  去n 條數(shù)據(jù)出來
    18、 na: DataFrameNaFunctions 神得,可以調(diào)用dataframenafunctions的功能區(qū)做過濾 df.na.drop().show(); 刪除為空的行
    19厘惦、 orderBy(sortExprs: Column*) 做alise排序
    20、 select(cols:string*) dataframe 做字段的刷選 df.select($"colA", $"colB" + 1)
    21、 selectExpr(exprs: String*) 做字段的刷選 df.selectExpr("name","name as names","upper(name)","age+1").show();
    22宵蕉、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默認(rèn)是asc
    23酝静、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();
    24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
    25羡玛、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();
    
    • 行動(dòng)操作
    1别智、 collect() ,返回值是一個(gè)數(shù)組,返回dataframe集合所有的行
    2稼稿、 collectAsList() 返回值是一個(gè)java類型的數(shù)組薄榛,返回dataframe集合所有的行
    3、 count() 返回一個(gè)number類型的让歼,返回dataframe集合的行數(shù)
    4敞恋、 describe(cols: String*) 返回一個(gè)通過數(shù)學(xué)計(jì)算的類表值(count, mean, stddev, min, and max),這個(gè)可以傳多個(gè)參數(shù)谋右,中間用逗號(hào)分隔硬猫,如果有字段為空,那么不參與運(yùn)算改执,只這對(duì)數(shù)值類型的字段啸蜜。例如df.describe("age", "height").show()
    5、 first() 返回第一行 辈挂,類型是row類型
    6衬横、 head() 返回第一行 ,類型是row類型
    7终蒂、 head(n:Int)返回n行  蜂林,類型是row 類型
    8、 show()返回dataframe集合的值 默認(rèn)是20行后豫,返回類型是unit
    9悉尾、 show(n:Int)返回n行突那,挫酿,返回值類型是unit
    10、 table(n:Int) 返回n行  愕难,類型是row 類型
    
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末早龟,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子猫缭,更是在濱河造成了極大的恐慌葱弟,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件猜丹,死亡現(xiàn)場(chǎng)離奇詭異芝加,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)射窒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門藏杖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來将塑,“玉大人,你說我怎么就攤上這事蝌麸〉懔龋” “怎么了?”我有些...
    開封第一講書人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵来吩,是天一觀的道長敢辩。 經(jīng)常有香客問我,道長弟疆,這世上最難降的妖魔是什么戚长? 我笑而不...
    開封第一講書人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮兽间,結(jié)果婚禮上历葛,老公的妹妹穿的比我還像新娘。我一直安慰自己嘀略,他們只是感情好恤溶,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著帜羊,像睡著了一般咒程。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上讼育,一...
    開封第一講書人閱讀 52,246評(píng)論 1 308
  • 那天帐姻,我揣著相機(jī)與錄音,去河邊找鬼奶段。 笑死饥瓷,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的痹籍。 我是一名探鬼主播呢铆,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼蹲缠!你這毒婦竟也來了棺克?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤线定,失蹤者是張志新(化名)和其女友劉穎娜谊,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斤讥,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡纱皆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片派草。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡撑帖,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出澳眷,到底是詐尸還是另有隱情胡嘿,我是刑警寧澤,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布钳踊,位于F島的核電站衷敌,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏拓瞪。R本人自食惡果不足惜缴罗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望祭埂。 院中可真熱鬧面氓,春花似錦、人聲如沸蛆橡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽泰演。三九已至呻拌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間睦焕,已是汗流浹背藐握。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留垃喊,地道東北人猾普。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像本谜,于是被迫代替她去往敵國和親初家。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容