轉(zhuǎn)載自:https://www.cnblogs.com/mr-bigdata/p/14426049.html
RDD折欠、DataFrame贝或、DataSet介紹
我們每天都在基于框架開發(fā)吼过,對于我們來說,一套易于使用的API太重要了咪奖。對于Spark來說盗忱,有三套API。
分別是:
RDD
DataFrame
DataSet
三套的API羊赵,開發(fā)人員就要學(xué)三套趟佃。不過,從Spark 2.2開始昧捷,DataFrame和DataSet的API已經(jīng)統(tǒng)一了闲昭。而編寫Spark程序的時候,RDD已經(jīng)慢慢退出我們的視野了料身。
但Spark既然提供三套API汤纸,我們到底什么時候用RDD、什么時候用DataFrame芹血、或者DataSet呢贮泞?我們先來了解下這幾套API。
RDD
RDD的概念
RDD是Spark 1.1版本開始引入的幔烛。
RDD是Spark的基本數(shù)據(jù)結(jié)構(gòu)啃擦。
RDD是Spark的彈性分布式數(shù)據(jù)集,它是不可變的(Immutable)饿悬。
RDD所描述的數(shù)據(jù)分布在集群的各個節(jié)點(diǎn)中令蛉,基于RDD提供了很多的轉(zhuǎn)換的并行處理操作。
RDD具備容錯性狡恬,在任何節(jié)點(diǎn)上出現(xiàn)了故障珠叔,RDD是能夠進(jìn)行容錯恢復(fù)的。
RDD專注的是How弟劲!就是如何處理數(shù)據(jù)祷安,都由我們自己來去各種算子來實現(xiàn)。
什么時候使用RDD兔乞?
應(yīng)該避免使用RDD汇鞭!
RDD的短板
集群間通信都需要將JVM中的對象進(jìn)行序列化和反序列化,RDD開銷較大
頻繁創(chuàng)建和銷毀對象會增加GC庸追,GC的性能開銷較大
Spark 2.0開始霍骄,RDD不再是一等公民
從Apache Spark 2.0開始,RDD已經(jīng)被降級為二等公民淡溯,RDD已經(jīng)被棄用了读整。而且,我們一會就會發(fā)現(xiàn)咱娶,DataFrame/DataSet是可以和RDD相互轉(zhuǎn)換的米间,DataFrame和DataSet也是建立在RDD上煎楣。
DataFrame
DataFrame概念
DataFrame是從Spark 1.3版本開始引入的。
通過DataFrame可以簡化Spark程序的開發(fā)车伞,讓Spark處理結(jié)構(gòu)化數(shù)據(jù)變得更簡單择懂。DataFrame可以使用SQL的方式來處理數(shù)據(jù)。例如:業(yè)務(wù)分析人員可以基于編寫Spark SQL來進(jìn)行數(shù)據(jù)開發(fā)另玖,而不僅僅是Spark開發(fā)人員困曙。
DataFrame和RDD有一些共同點(diǎn),也是不可變的分布式數(shù)據(jù)集谦去。但與RDD不一樣的是慷丽,DataFrame是有schema的,有點(diǎn)類似于關(guān)系型數(shù)據(jù)庫中的表鳄哭,每一行的數(shù)據(jù)都是一樣的要糊,因為。有了schema妆丘,這也表明了DataFrame是比RDD提供更高層次的抽象锄俄。
DataFrame支持各種數(shù)據(jù)格式的讀取和寫入,例如:CSV勺拣、JSON奶赠、AVRO、HDFS药有、Hive表毅戈。
DataFrame使用Catalyst進(jìn)行優(yōu)化。
DataFrame專注的是What愤惰!苇经,而不是How!
DataFrame的優(yōu)點(diǎn)
因為DataFrame是有統(tǒng)一的schema的宦言,所以序列化和反序列無需存儲schema扇单。這樣節(jié)省了一定的空間。
DataFrame存儲在off-heap(堆外內(nèi)存)中蜡励,由操作系統(tǒng)直接管理(RDD是JVM管理)令花,可以將數(shù)據(jù)直接序列化為二進(jìn)制存入off-heap中阻桅。操作數(shù)據(jù)也是直接操作off-heap凉倚。
DataFrane的短板
DataFrame不是類型安全的
API也不是面向?qū)ο蟮?/p>
Apache Spark 2.0 統(tǒng)一API
從Spark 2.0開始,DataFrame和DataSet的API合并在一起嫂沉,實現(xiàn)了跨庫統(tǒng)一成為一套API稽寒。這樣,開發(fā)人員的學(xué)習(xí)成本就降低了趟章。只需要學(xué)習(xí)一個High Level的杏糙、類型安全的DataSet API就可以了慎王。——這對于Spark開發(fā)人員來說宏侍,是一件好事赖淤。
上圖我們可以看到,從Spark 2.0開始谅河,Dataset提供了兩組不同特性的API:
非類型安全
類型安全
其中非類型安全就是DataSet[Row]咱旱,我們可以對Row中的字段取別名。這不就是DataFrame嗎绷耍?而類型安全就是JVM對象的集合吐限,類型就是scala的樣例類,或者是Java的實體類褂始。
有Spark 2.0源碼為證:
packageobjectsql{// ...typeDataFrame=Dataset[Row]}
也就是說诸典,每當(dāng)我們用導(dǎo)DataFrame其實就是在使用Dataset。
針對Python或者R崎苗,不提供類型安全的DataSet狐粱,只能基于DataFrame API開發(fā)。
什么時候使用DataFrame
DataSet
DataSet是從Spark 1.6版本開始引入的胆数。
DataSet具有RDD和DataFrame的優(yōu)點(diǎn)脑奠,既提供了更有效率的處理、以及類型安全的API幅慌。
DataSet API都是基于Lambda函數(shù)宋欺、以及JVM對象來進(jìn)行開發(fā),所以在編譯期間就可以快速檢測到錯誤胰伍,節(jié)省開發(fā)時間和成本齿诞。
DataSet使用起來很像,但它的執(zhí)行效率骂租、空間資源效率都要比RDD高很多祷杈。可以很方便地使用DataSet處理結(jié)構(gòu)化渗饮、和非結(jié)構(gòu)數(shù)據(jù)但汞。
DataSet API的優(yōu)點(diǎn)
DataSet結(jié)合了RDD和DataFrame的優(yōu)點(diǎn)。
當(dāng)序列化數(shù)據(jù)時互站,Encoder生成的字節(jié)碼可以直接與堆交互私蕾,實現(xiàn)對數(shù)據(jù)按需訪問,而無需反序列化整個對象胡桃。
類型安全
寫過Java或者C#的同學(xué)都會知道踩叭,一旦在代碼中類型使用不當(dāng),編譯都編譯不過去。日常開發(fā)中容贝,我們更多地是使用泛型自脯。因為一旦我們使用非類型安全的類型,軟件的維護(hù)周期一長斤富,如果集合中放入了一些不合適的類型膏潮,就會出現(xiàn)嚴(yán)重的故障。這也是為什么Java满力、C#還有C++都要去支持泛型的原因戏罢。
在Spark中也會有類型安全的問題。而且脚囊,一旦在運(yùn)行時出現(xiàn)類型安全問題龟糕,會影響整個大規(guī)模計算作業(yè)。這種作業(yè)的錯誤排除難度悔耘,要比單機(jī)故障排查起來更復(fù)雜讲岁。如果在運(yùn)行時期間就能發(fā)現(xiàn)問題,這很美好啊衬以。
DataFrame中編寫SQL進(jìn)行數(shù)據(jù)處理分析缓艳,在編譯時是不做檢查的,只有在Spark程序運(yùn)行起來看峻,才會檢測到問題阶淘。
SQLDataFrameDataset
語法錯誤運(yùn)行時編譯時編譯時
解析錯誤運(yùn)行時運(yùn)行時編譯時
對結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)的High Level抽象
例如:我們有一個較大的網(wǎng)站流量日志JSON數(shù)據(jù)集,可以很容易的使用DataSet[WebLog]來處理互妓,強(qiáng)類型操作可以讓處理起來更加簡單溪窒。
以RDD更易用的API
DataSet引入了更豐富的、更容易使用的API操作冯勉。這些操作是基于High Level抽象的澈蚌,而且基于實體類的操作,例如:進(jìn)行g(shù)roupBy灼狰、agg宛瞄、select、sum交胚、avg份汗、filter等操作會容易很多。
性能優(yōu)化
使用DataFrame和DataSet API在性能和空間使用率上都有大幅地提升蝴簇。
DataFrame和DataSet API是基于Spark SQL引擎之上構(gòu)建的杯活,會使用Catalyst生成優(yōu)化后的邏輯和物理執(zhí)行計劃。尤其是無類型的DataSet[Row](DataFrame)军熏,它的速度更快轩猩,很適合交互式查詢卷扮。
由于Spark能夠理解DataSet中的JVM對象類型荡澎,所以Spark會將將JVM對象映射為Tungsten的內(nèi)部內(nèi)存方式存儲均践。而Tungsten編碼器可以讓JVM對象更有效地進(jìn)行序列化和反序列化,生成更緊湊摩幔、更有效率的字節(jié)碼彤委。
通過上圖可以看到,DataSet的空間存儲效率是RDD的4倍或衡。RDD要使用60GB的空間焦影,而DataSet只需要使用不到15GB就可以了。
Youtube視頻分析案例
數(shù)據(jù)集
去Kaggle下載youtube地址:
https://www.kaggle.com/datasnaek/youtube-new?select=USvideos.csv
每個字段的含義都有說明封断。
Maven開發(fā)環(huán)境準(zhǔn)備
882.123.0.1centralhttp://maven.aliyun.com/nexus/content/groups/public/centralhttp://maven.aliyun.com/nexus/content/groups/public/org.apache.sparkspark-core_${scala.version}${spark.version}org.apache.sparkspark-sql_${scala.version}${spark.version}com.opencsvopencsv5.3src/main/scala
RDD開發(fā)
/**
* Spark RDD處理示例
*/objectRddAnalysis{defmain(args:Array[String]):Unit= {valconf =newSparkConf().setAppName("RDD Process").setMaster("local[*]")valsc =newSparkContext(conf)// 讀取本地文件創(chuàng)建RDDvalyoutubeVideosRDD = {? ? ? ? ? ? sc.textFile("""E:\05.git_project\dataset\youtube""")? ? ? ? }// 統(tǒng)計不同分類Youtube視頻的喜歡人數(shù)斯辰、不喜歡人數(shù)// 1. 添加行號// 創(chuàng)建計數(shù)器valrownumAcc = sc.longAccumulator("rownum")// 帶上行號youtubeVideosRDD.map(line => {? ? ? ? ? ? ? ? rownumAcc.add(1)? ? ? ? ? ? ? ? rownumAcc.value -> line? ? ? ? ? ? })// 過濾掉第一行.filter(_._1 !=1)// 去除行號.map(_._2)// 過濾掉非法的數(shù)據(jù).filter(line => {valfields = line.split("\001")valtry1 = scala.util.Try(fields(8).toLong)valtry2 = scala.util.Try(fields(9).toLong)if(try1.isFailure || try2.isFailure)falseelsetrue})// 讀取三個字段(視頻分類、喜歡的人數(shù)坡疼、不喜歡的人數(shù).map(line => {// 按照\001解析CSVvalfields = line.split("\001")// 取第4個(分類)彬呻、第8個(喜歡人數(shù))、第9個(不喜歡人數(shù))// (分類id, 喜歡人數(shù), 不喜歡人數(shù))(fields(4), fields(8).toLong, fields(9).toLong)? ? ? ? ? ? })// 按照分類id分組.groupBy(_._1)? ? ? ? ? ? .map(t => {valresult = t._2.reduce((r1, r2) => {? ? ? ? ? ? ? ? ? ? (r1._1, r1._2 + r2._2, r1._3 + r2._3)? ? ? ? ? ? ? ? })? ? ? ? ? ? ? ? result? ? ? ? ? ? })? ? ? ? ? ? .foreach(println)? ? }}
運(yùn)行結(jié)果如下:
("BBC Three",8980120,149525)
("Ryan Canty",11715543,80544)
("Al Jazeera English",34427,411)
("FBE",9003314,191819)
("Sugar Pine 7",1399232,81062)
("Rob Scallon",11652652,704748)
("CamilaCabelloVEVO",19077166,1271494)
("Grist",3133,37)
代碼中做了一些數(shù)據(jù)的過濾柄瑰,然后進(jìn)行了分組排序闸氮。如果Spark都要這么來寫的話,業(yè)務(wù)人員幾乎是沒法寫了教沾。著代碼完全解釋了How蒲跨,而不是What。每一個處理的細(xì)節(jié)授翻,都要我們自己親力親為或悲。實現(xiàn)起來臃腫。
查看下基于RDD的DAG
打開瀏覽器堪唐,輸入:localhost:4040隆箩,來看下DAG。
DAG非常的直觀羔杨,按照shuffle分成了兩個Stage來執(zhí)行捌臊。Stage中依次執(zhí)行了每個Operator。程序沒有經(jīng)過任何優(yōu)化兜材。我把每一個操作都和DAG上的節(jié)點(diǎn)對應(yīng)了起來理澎。
DataFrame開發(fā)
objectDataFrameAnalysis{defmain(args:Array[String]):Unit= {valspark =SparkSession.builder()? ? ? ? ? ? .appName("Youtube Analysis")? ? ? ? ? ? .master("local[*]")? ? ? ? ? ? .config("spark.sql.shuffle.partitions",1)? ? ? ? ? ? .getOrCreate()importspark.sqlContext.implicits._// 讀取CSVvalyoutubeVideoDF = spark.read.option("header",true).csv("""E:\05.git_project\dataset\USvideos.csv""")importorg.apache.spark.sql.functions._// 按照category_id分組聚合youtubeVideoDF.select($"category_id", $"likes".cast(LongType), $"dislikes".cast(LongType))? ? ? ? ? ? .where($"likes".isNotNull)? ? ? ? ? ? .where( $"dislikes".isNotNull)? ? ? ? ? ? .groupBy($"category_id")? ? ? ? ? ? .agg(sum("likes"), sum("dislikes"))? ? ? ? ? ? .show()? ? }}
大家可以看到,現(xiàn)在實現(xiàn)方式非常的簡單曙寡,而且清晰糠爬。
查看下基于DataFrame的執(zhí)行計劃與DAG
但我們運(yùn)行上面的Spark程序時,其實運(yùn)行了兩個JOB举庶。
下面這個是第一個Job的DAG执隧。我們看到只有一個Stage。這個DAG我們看得不是特別清楚做了什么,因為Spark SQL是做過優(yōu)化的镀琉,我們需要查看Query的詳細(xì)信息峦嗤,才能看到具體執(zhí)行的工作。
第一個Job的詳細(xì)執(zhí)行信息如下:
哦屋摔,原來這個JOB掃描了所有的行烁设,然后執(zhí)行了一個Filter過濾操作。再查看下查詢計劃:
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
? +- Filter (length(trim(value#6, None)) > 0)
? ? ? +- Project [value#0 AS value#6]
? ? ? ? +- Project [value#0]
? ? ? ? ? ? +- Relation[value#0] text
== Analyzed Logical Plan ==
value: string
GlobalLimit 1
+- LocalLimit 1
? +- Filter (length(trim(value#6, None)) > 0)
? ? ? +- Project [value#0 AS value#6]
? ? ? ? +- Project [value#0]
? ? ? ? ? ? +- Relation[value#0] text
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
? +- Filter (length(trim(value#0, None)) > 0)
? ? ? +- Relation[value#0] text
== Physical Plan ==
CollectLimit 1
+- *(1) Filter (length(trim(value#0, None)) > 0)
? +- FileScan text [value#0] Batched: false, DataFilters: [(length(trim(value#0, None)) > 0)], Format: Text, Location: InMemoryFileIndex[file:/E:/05.git_project/dataset/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
可以非常清晰地看到钓试,我們說看到的DAG是經(jīng)過優(yōu)化后的装黑。
第二個JOB的DAG如下,同樣弓熏,我們也只能看到個大概恋谭。例如:Scan csv讀取csv文件,然后執(zhí)行Spark SQL自動生成挽鞠、優(yōu)化后的Codegen階段箕别,再執(zhí)行了一次Shuffle(Exchange),然后再執(zhí)行Spark SQL的codegen滞谢,最后執(zhí)行mapPartition操作串稀。
為了一探究竟,我們依然得去查看Query Detail狮杨。這個Query Detail圖稍微長一點(diǎn)母截。我們很兩個部分來講解。
第一部分:
掃描csv文件橄教,一共讀取了一個文件清寇,大小是59.8MB,一共有41035行护蝶。鼠標(biāo)移上去华烟,可以看到讀取的文件路徑、讀取的schema是什么持灰。
執(zhí)行過濾操作(Filter)過濾出來的結(jié)果是40949行盔夜。把鼠標(biāo)放在該操作,可以看到具體過濾的內(nèi)容堤魁。
執(zhí)行Project投影查詢喂链。其實就是執(zhí)行select語句。
然后開始執(zhí)行Hash聚合妥泉。按照category_id進(jìn)行分組椭微,并執(zhí)行了partial_sum。
第二部分:
Exchange表示進(jìn)行數(shù)據(jù)交換(其實就是shuffle)盲链,shuffle一共讀取了122行蝇率。
接著進(jìn)行Hash聚合迟杂,按照category分組,并進(jìn)行sum求和本慕,計算得到最終結(jié)果排拷。
最后輸出21行,多出來的一行顯示的第頭部间狂。
雖然DataFrame我們使用的是DSL方式攻泼,但我們可以感受這個過程處理起來比較簡單火架。根據(jù)列進(jìn)行分組聚合的時候鉴象,在編譯時期是對類型不敏感的、非安全的何鸡。我們要保證列名纺弊、類型都是正確的。同時骡男,我們可以清晰的看到Spark SQL對程序執(zhí)行過程的優(yōu)化淆游。
DataSet開發(fā)
要使用DataSet開發(fā),我們先來看一下csv讀取數(shù)據(jù)成為DataFrame的spark源碼隔盛。
defcsv(path:String):DataFrame= {// This method ensures that calls that explicit need single argument works, see SPARK-16009csv(Seq(path): _*)}
我們可以看到csv返回的是一個DataFrame類型犹菱。而進(jìn)一步查看DataFrame的源碼,我們發(fā)現(xiàn):
typeDataFrame=Dataset[Row]
而Row是非類型安全的吮炕,就有點(diǎn)像JDBC里面的ResultSet那樣腊脱。我們?yōu)榱瞬僮髌饋砀樖忠恍x一個實體類來開發(fā)龙亲。
上代碼:
caseclassYoutubeVideo(video_id:String, trending_date:String, title:String, channel_title:String, category_id:String, publish_time:String, tags:String, views:Long, likes:Long, dislikes:Long, comment_count:String, thumbnail_link:String, comments_disabled:Boolean, ratings_disabled:Boolean, video_error_or_removed:String, description:String)caseclassCategoryResult(categoryId:String, totalLikes:Long, totalDislikes:Long)objectDataSetAnalysis{defmain(args:Array[String]):Unit= {valspark =SparkSession.builder()? ? ? ? ? ? .appName("Youtube Analysis")? ? ? ? ? ? .master("local[*]")? ? ? ? ? ? .config("spark.sql.shuffle.partitions",1)? ? ? ? ? ? .getOrCreate()importspark.sqlContext.implicits._// 讀取CSVvalyoutubeVideoDF:DataFrame= spark.read.option("header",true).csv("""E:\05.git_project\dataset\USvideos.csv""")// 轉(zhuǎn)換為DataSetyoutubeVideoDF.printSchema()// 轉(zhuǎn)換為Dataset[YoutubeVideo]valyoutubeVideoDS = youtubeVideoDF.filter(row => {if(row.getString(7) !=null&& !row.getString(7).isBlank? ? ? ? ? ? ? ? && row.getString(8) !=null&& !row.getString(8).isBlank? ? ? ? ? ? ? ? && row.getString(9) !=null&& !row.getString(9).isBlank) {if(util.Try(row.getString(7).toLong).isSuccess? ? ? ? ? ? ? ? ? ? && util.Try(row.getString(8).toLong).isSuccess? ? ? ? ? ? ? ? ? ? && util.Try(row.getString(9).toLong).isSuccess) {true}else{false}? ? ? ? ? ? }else{false}? ? ? ? })? ? ? ? .map(row =>YoutubeVideo(row.getString(0)? ? ? ? ? ? , row.getString(1)? ? ? ? ? ? , row.getString(2)? ? ? ? ? ? , row.getString(3)? ? ? ? ? ? , row.getString(4)? ? ? ? ? ? , row.getString(5)? ? ? ? ? ? , row.getString(6)? ? ? ? ? ? , row.getString(7).toLong? ? ? ? ? ? , row.getString(8).toLong? ? ? ? ? ? , row.getString(9).toLong? ? ? ? ? ? , row.getString(10)? ? ? ? ? ? , row.getString(11)? ? ? ? ? ? , row.getString(12).toLowerCase().toBoolean? ? ? ? ? ? , row.getString(13).toLowerCase().toBoolean? ? ? ? ? ? , row.getString(14)? ? ? ? ? ? , row.getString(15)? ? ? ? ))? ? ? ? youtubeVideoDS.groupByKey(_.category_id)? ? ? ? ? ? .mapValues(y =>CategoryResult(y.category_id, y.likes, y.dislikes))? ? ? ? ? ? .reduceGroups{(cr1, cr2) => {CategoryResult(cr1.categoryId, cr1.totalLikes + cr2.totalLikes, cr1.totalDislikes + cr2.totalDislikes)? ? ? ? ? ? }}// 只獲取Value部分陕凹,key部分過濾掉.map(t => t._2)? ? ? ? ? ? .toDF()? ? ? ? ? ? .show()TimeUnit.HOURS.sleep(1)? ? }}
可以看到,我們對DataFrame進(jìn)行了類型的安全轉(zhuǎn)換鳄炉。來看一下Spark SQL執(zhí)行的JOB杜耙。
同樣,基于DataSet的代碼拂盯,也執(zhí)行了兩個JOB佑女。
第一個JOB是一樣的,因為我們一樣要處理CSV的header谈竿。
而第二部分珊豹,命名我們了用了很多的groupByKey、mapValues榕订、reduceGroups店茶、map等操作。但其底層劫恒,執(zhí)行的還是與DataFrame一樣高效的DAG贩幻。
很明顯轿腺,這個部門是我們編寫的DSL得到的DAG代碼。查看詳細(xì)的執(zhí)行過程:
Spark依然給我們做了不少的一些優(yōu)化動作丛楚。
看一下執(zhí)行計劃族壳。
基于DataSet依然是有執(zhí)行計劃的。依然會基于Catalyst進(jìn)行優(yōu)化趣些。但可以看到仿荆,這個實現(xiàn)明顯比基于DataFrame的邏輯更加復(fù)雜,雖然做的事情差不太多坏平。
對比RDD和DataSet的API
RDD的操作都是最底層的拢操,Spark不會做任何的優(yōu)化。是low level的API舶替,無法執(zhí)行schema的高階聲明式操作
DataSet支持很多類似于RDD的功能函數(shù)令境,而且支持DataFrame的所有操作。其實我們前面看到了DataFrame就是一種特殊的顾瞪、能力稍微弱一點(diǎn)的DataSet舔庶。DataSet是一種High Level的API,在效率上比RDD有很大的提升陈醒。
對比RDD惕橙、DataFrame、DataSet
RDDDataFrameDataSet
schema無
需要自己建立shcema
有
支持自動識別schema
有schema
支持自動識別schema
聚合操作慢最快快
自動性能優(yōu)化無
開發(fā)人員自己優(yōu)化
有有
類型安全安全非安全安全
序列化Java序列化钉跷,存儲/讀取整個Java對象Tungsten弥鹦,堆外內(nèi)存,可以按需存儲訪問屬性Tungsten尘应,堆外內(nèi)存惶凝,可以按需存儲訪問屬性
內(nèi)存使用率低高高
GC創(chuàng)建和銷毀每一個對象都有GC開銷無需GC,使用堆外存儲無需GC犬钢,使用堆外存儲
懶執(zhí)行