本文是對Spark基礎知識的一個學習總結(jié),包含如下幾部分的內(nèi)容:
- 概述
- 運行模式
- Spark Shell
- RDD/DataFrame/DataSet
- 獨立可執(zhí)行程序
- 小結(jié)
參考資料:
1舟肉、Spark的核心代碼是用scala語言開發(fā)的劲弦,且提供了針對scala,java,python幾種語言的官方API打掘,在本文的示例中,我們采用的是基于scala語言的API条摸。所以需要對scala語言有個基礎的了解辰斋。可以參考scala系列文檔埠褪,如《Scala學習筆記(1)-快速起步》和《Scala學習筆記(2)-基礎語法》浓利。
2、Spark對數(shù)據(jù)的處理组橄,處處體現(xiàn)了函數(shù)式編程的思想荞膘,尤其是集合對象的幾個原子操作(filter,map,reduce),熟悉這些對理解spark的使用非常有幫助玉工,可參考文檔《函數(shù)式編程之集合操作》 和《Scala學習筆記(5)-函數(shù)式編程》羽资。
一、概述
(一)簡介
Apache Spark是一種通用的用于大數(shù)據(jù)處理的集群計算框架遵班。Spark是由UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)開發(fā)和開源的一個通用計算框架屠升,目前在大數(shù)據(jù)計算領(lǐng)域得到了廣泛的應用,形成了一個高速發(fā)展應用廣泛的生態(tài)系統(tǒng)狭郑。
spark的核心是采用scala語言開發(fā)的腹暖,同時它提供了多種編程語言(如java,scala.python)的客戶端Api接口。spark框架可以運行在各種操作系統(tǒng)上翰萨。
spark目前最新的版本是2.4.0脏答,是在2018年11月份發(fā)布的。本系列文章所介紹的都基于2.4.0版本亩鬼。
整個spark系統(tǒng)主要由兩部分組成:
1)spark core庫:包括支持Spark通用執(zhí)行引擎的API殖告,以及一些核心的用戶API。
2)spark擴展庫:基于spark core庫開發(fā)的一些重要的在不同場景下使用的功能雳锋,在實際的產(chǎn)品開發(fā)中黄绩,我們更多的基于這些擴展庫來開發(fā)自己的應用。
(二)spark擴展庫
在Spark Core的基礎上玷过,Spark提供了一系列面向不同應用需求的組件爽丹,主要有Spark SQL、StructuredStreaming/Spark Streaming辛蚊、MLlib粤蝎、GraphX。
下面分別簡單介紹下袋马。
1诽里、Spark SQL
Spark SQL是Spark用來操作結(jié)構(gòu)化數(shù)據(jù)的組件。通過Spark SQL飞蛹,用戶可以使用SQL或者Apache Hive版本的SQL方言(HQL)來查詢數(shù)據(jù)谤狡。Spark SQL支持多種數(shù)據(jù)源類型,例如Hive表卧檐、Parquet以及JSON等墓懂。Spark SQL不僅為Spark提供了一個SQL接口,還支持開發(fā)者將SQL語句融入到Spark應用程序開發(fā)過程中霉囚,無論是使用Python捕仔、Java還是Scala,用戶可以在單個的應用中同時進行SQL查詢和復雜的數(shù)據(jù)分析盈罐。
2榜跌、StructuredStreaming/Spark Streaming
Spark Streaming是spark中一個非常重要的擴展庫,它是Spark核心API的一個擴展盅粪,可以實現(xiàn)高吞吐量的钓葫、具備容錯機制的實時流數(shù)據(jù)的處理。支持從多種數(shù)據(jù)源獲取數(shù)據(jù)票顾,包括Kafk础浮、Flume、以及TCP socket等奠骄,從數(shù)據(jù)源獲取數(shù)據(jù)之后豆同,可以使用諸如map、reduce和window等高級函數(shù)進行復雜算法的處理含鳞。最后還可以將處理結(jié)果存儲到文件系統(tǒng)和數(shù)據(jù)庫等影锈。
但從Spark2.0開始,提出了新的實時流框架 Structured Streaming (2.0和2.1是實驗版本蝉绷,從Spark2.2開始為穩(wěn)定版本)來替代Spark streaming鸭廷,這時Spark streaming就進入維護模式。相比Spark Streaming潜必,Structured Streaming的Api更加好用靴姿,功能強大。
3磁滚、MLlib
MLlib是Spark提供的一個機器學習算法庫佛吓,其中包含了多種經(jīng)典、常見的機器學習算法垂攘,主要有分類维雇、回歸、聚類晒他、協(xié)同過濾等吱型。MLlib不僅提供了模型評估、數(shù)據(jù)導入等額外的功能陨仅,還提供了一些更底層的機器學習原語津滞,包括一個通用的梯度下降優(yōu)化基礎算法铝侵。所有這些方法都被設計為可以在集群上輕松伸縮的架構(gòu)。
4触徐、GraphX
GraphX是Spark面向圖計算提供的框架與算法庫咪鲜。GraphX中提出了彈性分布式屬性圖的概念,并在此基礎上實現(xiàn)了圖視圖與表視圖的有機結(jié)合與統(tǒng)一撞鹉;同時針對圖數(shù)據(jù)處理提供了豐富的操作疟丙,例如取子圖操作subgraph、頂點屬性操作mapVertices鸟雏、邊屬性操作mapEdges等享郊。GraphX還實現(xiàn)了與Pregel的結(jié)合,可以直接使用一些常用圖算法孝鹊,如PageRank炊琉、三角形計數(shù)等。
(三)版本下載與安裝
我們可以從官方網(wǎng)站https://spark.apache.org/downloads.html下載所需的版本惶室,我們是下載的二進制安裝包温自,版本是2.4.0,下載的壓縮文件是 spark-2.4.0-bin-hadoop2.7.tgz皇钞。然后把上面的壓縮文件解壓到某個目錄即可悼泌。
spark的運行依賴jdk環(huán)境,對于2.4.0版本夹界,需要java8及以上的版本馆里。對于python客戶端,需要Python 2.7 及以上版本 或 Python3.4及以上版本可柿。對于scala客戶端鸠踪,需要Scala 2.11版本。
因為spark是一個分布式集群系統(tǒng)复斥,我們需要在每臺節(jié)點上去安裝spark营密。當然,如果只是開發(fā)和學習目锭,只需在一臺機器上安裝评汰。下面章節(jié)會介紹spark的運行方式。
二痢虹、運行模式
spark有多種運行模式被去,可以在本地運行,也可以在分布式集群模式下運行奖唯,而且集群模式下可以支持多種集群管理器惨缆,下面一一介紹。
(一)local(本地模式)
只需要一臺機器,運行該模式非常簡單坯墨,只需要把Spark的安裝包解壓后寂汇,默認也不需修改任何配置文件,取默認值捣染。不用啟動Spark的Master健无、Worker守護進程( 只有集群的Standalone方式時,才需要這兩個角色)液斜,也不用啟動Hadoop的各服務(除非你要用到HDFS)。
運行客戶端程序(可以是spark自帶的命令行程序叠穆,如spark-shell少漆,也可以是程序員利用spark api編寫的程序),就可以完成相應的運行硼被。相當于這一個客戶端進程示损,充當了所有的角色。
這種模式嚷硫,只適合開發(fā)階段使用检访,我們可以在該模式下開發(fā)和測試代碼,使的代碼的邏輯沒問題仔掸,后面再提交到集群上去運行和測試脆贵。
本文中的例子主要是學習spark的一些核心API,為了搭建環(huán)境的簡化,采用的是獨立模式起暮。
在實際生產(chǎn)環(huán)境卖氨,spark會采用集群模式來運行,即分布式式運行负懦,spark可以使用多種集群資源管理器來管理自己的集群筒捺。
(二)standalone(集群模式之一)
Standalone模式,即獨立模式纸厉,自帶完整的服務系吭,使用spark自帶的集群資源管理功能】牌罚可單獨部署到一個集群中肯尺,無需依賴任何其他資源管理系統(tǒng)。即每臺機器上只需部署下載的Spark版本即可抛猫。
這種模式需要提前啟動spark的master和Worker守護進程蟆盹,才能運行spark客戶端程序。
因為Standalone模式不需要依賴任何第三方組件闺金,如果數(shù)據(jù)量比較小逾滥,且不需要hadoop(如不需要訪問hdfs服務),則使用Standalone模式是一種可選的簡單方便的方案。
(三)On YARN模式(集群模式之二)
該模式寨昙,使用hadoop的YARN作為集群資源管理器讥巡。這種模式下因為使用yarn的服務進行資源管理,所以不需要啟動Spark的Master舔哪、Worker守護進程欢顷。
如果你的應用不僅使用spark,還用到hadoop生態(tài)圈的其它服務捉蚤,從兼容性上考慮抬驴,使用Yarn作為統(tǒng)一的資源管理是更好的選擇,這樣選擇這種模式就比較適合缆巧。
(四)On Mesos模式(集群模式之三)
該模式布持,使用Mesos作為集群資源管理器。如果你的應用還使用了docker陕悬,則選擇此模式更加通用题暖。
(五)偽分布式集群模式
即在一臺機器上模擬集群下的分布式場景,會啟動多個進程捉超。上述的2,3,4三種集群模式都可以啟動偽分布式集群模式胧卤,當然要求機器的配置滿足要求。這種模式主要是開發(fā)階段和學習使用拼岳。
說明:因為本文主要是介紹spark的基礎知識枝誊,會通過一些實例介紹如何使用spark來進行數(shù)據(jù)分析和計算。為了簡單化裂问,我們采用的是lcoal模式侧啼。
三、Spark Shell
Spark 版本中提供了系列的交互式命令行程序堪簿,用它來進行spark API的學習是最方便的了痊乾,它支持scala、python椭更、R多種語言的api(在本文中哪审,我們使用scala語言),我們可以利用它來進行API的學習虑瀑;同時它也是一種以交互方式來進行數(shù)據(jù)分析的強大工具湿滓,我們可以直接利用它進行數(shù)據(jù)的分析。
spark提供了多個腳本程序(為不同的編程語言提供不同的腳本)舌狗,位于spark壓縮包解壓后目錄的bin目錄下叽奥。比如針對scala語言,linux下腳本文件名為spark-shell痛侍,windows下文件名為spark-shell.cmd朝氓。
在控制臺下,運行spark-shell,出現(xiàn)交互式界面赵哲,就可以輸入scala代碼待德,輸入:quit退出交互式程序(注意quit前要有冒號)。如下面的界面:
scala> val value=3+2
value: Int = 5
scala> val str = "hello"
str: String = hello
scala> str.length()
res14: Int = 5
scala> :quit
運行spark shell枫夺,會出現(xiàn)scala提示符将宪,然后就可以在該提示符下輸入scala代碼,按回車執(zhí)行橡庞。退出時輸入:quit命令较坛。
在下面的章節(jié)中,我們會在spark shell中來舉例說明spark的一些核心api的概念和使用方式扒最。
四燎潮、RDD /DataFrame/DataSet
(一)基本概念
spark是用來處理數(shù)據(jù)的,這樣就需要一種數(shù)據(jù)模型來表示數(shù)據(jù)扼倘。RDD,DsataFrame,DataSet是其版本發(fā)展過程出現(xiàn)的三種模型除呵,也就是三種API再菊。spark對數(shù)據(jù)的處理的主要操作就是圍繞這些API的處理。
其中RDD(Resilient Distributed Dataset)被稱為彈性的分布式數(shù)據(jù)集颜曾,是spark舊的核心API纠拔,在Spark 2.0之前,Spark的主要編程接口是RDD泛豪。但是在Spark 2.0之后稠诲,RDD被新的DataSet數(shù)據(jù)集取代,DataSet像RDD一樣強類型诡曙,但在底層有更豐富的優(yōu)化臀叙。DataSet與RDD一樣,內(nèi)置了各種函數(shù)操作价卤,通過函數(shù)式操作可以完成各種強大的計算劝萤。spark2.4.0版本仍然支持RDD接口,但是強烈建議切換到使用數(shù)據(jù)集慎璧,它具有比RDD更好的性能床嫌。
DataFrame開始是在spark1.3.0版本提出來的,開始時以RDD為基礎胸私,它在概念上等同于關(guān)系數(shù)據(jù)庫中的表或R / Python中的數(shù)據(jù)框厌处,但是進行了更豐富的優(yōu)化。DataFrame與RDD的主要區(qū)別在于岁疼,前者帶有schema元數(shù)據(jù)信息阔涉,既DataFrame所表示的二維數(shù)據(jù)集的每一列都帶有名稱和類型。而RDD中存儲的只是一個對象。如下圖所示:
如上圖所示洒敏,左側(cè)的RDD[Person]雖然以Person為類型參數(shù)龄恋,但Spark框架本身不了解Person類的內(nèi)部結(jié)構(gòu)。而右側(cè)的DataFrame卻提供了詳細的結(jié)構(gòu)信息凶伙,使得Spark SQL可以清楚地知道該數(shù)據(jù)集中包含哪些列郭毕,每列的名稱和類型各是什么。DataFrame多了數(shù)據(jù)的結(jié)構(gòu)信息函荣,即schema显押。
Dataset是分布式數(shù)據(jù)集合。是Spark1.6中添加的新接口傻挂,它提供了RDD的優(yōu)點以乘碑,并在性能上進行了優(yōu)化。在Spark2.0之后金拒,自Spark2.0之后兽肤,DataFrame和DataSet合并為更高級的DataSet,新的DataSet具有兩個不同的API特性:
非強類型(untyped)绪抛,DataSet[Row]是泛型對象的集合资铡,它的別名是DataFrame;
強類型(strongly-typed)幢码,DataSet[T]是具體對象的集合笤休,如scala和java中定義的類.
下面我們通過spark shell來舉例說明如何使用DataSet,因為RDD已經(jīng)全面被DataSet替換症副,本文中不再介紹RDD的使用店雅。DataFrame的使用我們將在Spark SQL學習筆記中介紹。
(二)創(chuàng)建DataSet對象
使用DataSet數(shù)據(jù)集贞铣,首先要創(chuàng)建DataSet闹啦,可以從Hadoop InputFormats(例如HDFS文件)或通過轉(zhuǎn)換其他數(shù)據(jù)集來創(chuàng)建DataSet。
不夠最簡單的方式是從列表對象來創(chuàng)建DataSet對象辕坝,如下面例子:
scala> val ds = Seq("hello","world").toDS()
ds: org.apache.spark.sql.Dataset[String] = [value: string]
上述代碼先利用Seq創(chuàng)建了一個列表亥揖,調(diào)用toDS方法生成一個Dataset對象。
我們可以調(diào)用Dataset對象的show方法來輸出其中數(shù)據(jù)圣勒,show方法會以表格的方式輸出费变,這里Dataset中的元素是字符串對象,默認的列名為value圣贸。
scala> ds.show()
+-----+
|value|
+-----+
|hello|
|world|
+-----+
我們也可以創(chuàng)建數(shù)值型的DataSet挚歧,如下面例子:
scala> val ds = Seq(12,15,16).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show()
+-----+
|value|
+-----+
| 12|
| 15|
| 16|
+-----+
下面我們來自定義一個類,DataSet中存放該類的對象吁峻,如下面例子代碼:
scala> case class Person(name:String,age:Long)
defined class Person
scala> val ds = Seq(Person("tom",12),Person("kad",22)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> ds.show()
+----+---+
|name|age|
+----+---+
| tom| 12|
| kad| 22|
+----+---+
上面代碼定義了一個Person類滑负,類中有兩個成員變量在张。調(diào)用show方法時,自動將類中的變量作為列名來顯示矮慕。
更多時候帮匾,我們會從外部數(shù)據(jù)源(如文件)來創(chuàng)建DataSet對象,下面舉例從Spark安裝目錄中的本地README文件的文本中創(chuàng)建一個新的DataSet痴鳄。如下面代碼:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
上面創(chuàng)建了一個DataSet[String]瘟斜,DataSet中存儲的是字符串對象,每個字符串對應README文件中的一行痪寻。
當然還有更多創(chuàng)建DataSet對象的方法螺句,上面只是一些最簡單的方式。下面我們來接著介紹DataSet的一些常見操作橡类。
(三)操作DataSet對象
DataSet類提供了豐富的方法來對數(shù)據(jù)進行計算蛇尚,下面通過舉例的方式來介紹一些常見的方法。
scala> textFile.count()
res1: Long = 104
上面代碼我們調(diào)用了DataSet的count函數(shù)顾画,獲取了DataSet中包含的對象個數(shù)取劫,返回的值就是README.md文件中的行數(shù)豁陆。
scala> textFile.first()
res4: String = # Apache Spark
上面代碼調(diào)用了DataSet的first函數(shù)像啼,返回了DataSet中的第一個對象,這里返回的是一個String對象埠巨,內(nèi)容是“# Apache Spark”义辕。
如果了解java8,python等編程語言中數(shù)據(jù)集合的高階函數(shù),這些語言的數(shù)據(jù)集(如列表)都支持高階函數(shù)filter,map等寓盗。同樣spark中的DataSet提供了強大的高階函數(shù)功能灌砖,也是我們使用DataSet主要的方式。如下面例子:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
scala> linesWithSpark.count()
res5: Long = 20
上面代碼先調(diào)用了DataSet的filter函數(shù)返回一個新的數(shù)據(jù)集傀蚌,然后調(diào)用count方法獲取新數(shù)據(jù)集中的對象個數(shù)基显。與其它語言中的filter函數(shù)一樣,spark中的filter函數(shù)是一個高階函數(shù)善炫,用于返回滿足條件的數(shù)據(jù)生成的一個新的數(shù)據(jù)集撩幽。上面的filter函數(shù)參數(shù)是一個Lambda表達式,只有字符串中包含”Spark“子串的字符串才不被過濾箩艺。
再比如窜醉,如果我們要返回所有字符串長度大于100的數(shù)據(jù)集,代碼如下:
val linesWithSpark = textFile.filter(line => line.length>100)
下面我們再看一個map函數(shù)的使用艺谆,map函數(shù)也是一個常見的集合操作中的高階函數(shù)榨惰,其作用是將一個數(shù)據(jù)集映射成一個新的數(shù)據(jù)集,map函數(shù)與filter函數(shù)不一樣静汤,它返回的新的數(shù)據(jù)集中的元素個數(shù)與原數(shù)據(jù)集一樣琅催,但數(shù)據(jù)集中的數(shù)據(jù)發(fā)生了變化居凶,可以是新的數(shù)據(jù)類型。如下面例子代碼:
scala> val linesWithSpark = textFile.map(line => line.length)
linesWithSpark: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> linesWithSpark.count()
res8: Long = 104
scala> linesWithSpark.first()
res9: Int = 14
上面代碼中藤抡,map函數(shù)返回一個新的數(shù)據(jù)集侠碧,新數(shù)據(jù)集中每個數(shù)據(jù)是原數(shù)據(jù)集中字符串對象的長度。
我們再來看集合操作中另一個常見的原子操作reduce缠黍,spark的DataSet也提供了reduce函數(shù)弄兜,reduce函數(shù)的作用用來對數(shù)據(jù)做匯總等計算,比如前面的例子中的count函數(shù)實際上就是一個reduce操作的特列嫁佳。我們可以直接使用reduce函數(shù)來統(tǒng)計DataSet中的元素個數(shù)挨队,代碼如:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textFile.count()
res11: Long = 104
scala> textFile.map(line=>1).reduce((re,a)=>re+a)
res12: Int = 104
上面代碼中最后一行代碼,我們先調(diào)用了map函數(shù)蒿往,返回一個新的數(shù)據(jù)集盛垦,數(shù)據(jù)集中每個元素都是數(shù)值1,然后對新的數(shù)據(jù)集調(diào)用reduce函數(shù)(這里是連在一起寫瓤漏,沒用單獨的變量腾夯,這正是函數(shù)式編程的常用方式,多個函數(shù)調(diào)用串在一起完成所需的功能)蔬充。reduce函數(shù)有兩個參數(shù)蝶俱,第1個參數(shù)是用來存放結(jié)果的,第2個參數(shù)是代表集合中元素饥漫。關(guān)于reduce函數(shù)的詳細含義這里不詳細介紹榨呆。
從上面結(jié)果,可以看出對數(shù)據(jù)集調(diào)用map和reduce函數(shù)得到的結(jié)果和直接調(diào)用count函數(shù)得到的結(jié)果是一樣的庸队。
我們再看一個稍微復雜的例子积蜻,textFile數(shù)據(jù)集中每個元素是對應文本文件中的一行,每行由多個英文單詞彻消,我們希望計算出含單詞數(shù)最多的行所包含的單詞數(shù)竿拆。這個功能的實現(xiàn)代碼如下:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res13: Int = 22
上面代碼先調(diào)用了DataSet的map函數(shù),返回一個新的數(shù)據(jù)集宾尚,數(shù)據(jù)集中的每個元素是文件中每行文本中包含的單詞數(shù)丙笋。然后調(diào)用reduce函數(shù),計算出新的數(shù)據(jù)集中的最大值煌贴。這里傳給reduce函數(shù)的Lambda表達式函數(shù)體是一個if語句(在scala中御板,if語句也是一個表達式),我們可以直接用scala庫中的Math類的max函數(shù)來代替牛郑,代碼如:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res19: Int = 22
可以看出稳吮,結(jié)果是一樣的。
我們在看一個更復雜的例子井濒,就是統(tǒng)計文件中的單詞重復出現(xiàn)的次數(shù)灶似,如果學習過hadoop的mapreduce功能的會知道列林,這是mapreduce應用的一個經(jīng)典例子,我們看看使用spark是怎么實現(xiàn)的酪惭?代碼如下:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
scala> wordCounts.collect()
res20: Array[(String, Long)] = Array((online,1), (graphs,1), (["Parallel,1), (["Building,1), (thread,1),......
上面代碼希痴,第一行語句,首先創(chuàng)建了一個DataSet春感,加載了文件中內(nèi)容砌创,這個上面例子已經(jīng)反復用過。
第2行語句鲫懒,先調(diào)用DataSet的flatMap函數(shù)嫩实,flatMap函數(shù)也是一個高階函數(shù),將行數(shù)據(jù)集轉(zhuǎn)換為單詞數(shù)據(jù)集(即新數(shù)據(jù)集中的元素是文件中的每個單詞)窥岩,再對新的數(shù)據(jù)集組合調(diào)用DataSet的groupByKey函數(shù)和count函數(shù)甲献,返回一個數(shù)據(jù)集,該數(shù)據(jù)集中的每個元素是一個二維元組(類似key-value鍵值對)颂翼,元組中第1個值是單詞本身晃洒,第2個值是數(shù)值1。
第3行語句朦乏,針對返回的數(shù)據(jù)集調(diào)用collect函數(shù)球及,該函數(shù)返回的是一個數(shù)組,數(shù)組中每個元素也是一個二維元組呻疹,元組中第1個值是單詞本身(不重復的單詞)吃引,第2個值是單詞出現(xiàn)的個數(shù)。
可以看出刽锤,用scala的Dataset來實現(xiàn)mapreduce的功能镊尺,比編寫Mapreduce代碼簡單多了,而且更加清晰明了姑蓝。關(guān)于Mapreduce程序的編寫可參考文檔《mapreduce學習筆記》。
五吕粗、獨立可執(zhí)行程序
下面我們用spark提供的scala api來編寫一個可獨立運行的scala程序纺荧,例子代碼如下:
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
上面代碼創(chuàng)建了一個scala的單例對象,包含main函數(shù)颅筋,是一個典型的可獨立運行的scala程序宙暇。可以看出议泵,相比在spark shell中執(zhí)行占贫,區(qū)別在于
編譯和運行spark的scala程序,最好方式是使用scala的sbt工具(類似java中的maven工具)先口。關(guān)于如何使用sbt型奥,我們這里不作詳細介紹瞳收。
六、小結(jié)
在本文中厢汹,我們介紹了Spark的基本概念螟深,并通過spark shell演示了spark中的核心Api DataSet的使用。在后面的文章中烫葬,我們將會介紹spark中兩個重要的擴展庫Spark SQL和StructruedStreaming,它們?yōu)閿?shù)據(jù)的處理提供了更加方便和強大的操作界弧。
需要說明的是,Spark依然處于快速發(fā)展階段中搭综,其提供的功能可能隨著版本的演進也會在不停的演進垢箕,就如RDD被DataSet替換,Spark Streaming被StructuredStreaming替換兑巾,我們需要關(guān)注其最新發(fā)展条获。shi