本文是一個(gè)如何使用Spark的簡要教程呀癣。首先通過Spark的交互式Shell來介紹API(使用Python或Scala)奏瞬,然后展示如何用Java蛋辈,Scala和Python來寫Spark引用程序避凝。更完整的內(nèi)容請參考編程指南社痛。
為了跟隨這篇指南進(jìn)行學(xué)習(xí)坠七,請首先從Spark網(wǎng)站上下載一個(gè)Spark release包水醋。因?yàn)槲覀冞@里不會(huì)使用HDFS旗笔,所以下載一個(gè)用于任意Hadoop版本的包即可。
使用Spark Shell進(jìn)行交互式分析
基礎(chǔ)
Spark的Shell提供了一種學(xué)習(xí)API的簡單方法拄踪,同樣也是交互式分析數(shù)據(jù)的強(qiáng)大工具蝇恶。在Scala(運(yùn)行在Java虛擬機(jī)上,可以使用現(xiàn)有的Java庫)或Python中可用惶桐。在Spark目錄下運(yùn)行下面命令:
./bin/spark-shell
Spark的主要抽象是彈性分布式數(shù)據(jù)集(RDD)艘包。RDD可以通過Hadoop輸入格式(比如HDFS文件)或者其它RDD轉(zhuǎn)換來創(chuàng)建。讓我們使用Spark源路徑的README文件的文本內(nèi)容來創(chuàng)建一個(gè)RDD耀盗。
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25
RDD有可以返回值的actions想虎,有可以返回指向新RDD指針的transformations。讓我們從幾個(gè)actions開始:
scala> textFile.count() // Number of items in this RDD
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark
接下來使用transformation叛拷。我們使用filter transformation來返回一個(gè)包含文件條目子集的RDD舌厨。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27
可以把transformations和actions鏈接起來使用。
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多RDD操作
RDD的actions和transformations可用于更復(fù)雜的計(jì)算忿薇。如我們想找到單詞最多的行:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先使用map
函數(shù)將每一行轉(zhuǎn)換成一個(gè)整數(shù)裙椭,創(chuàng)建了一個(gè)新RDD。然后在這個(gè)新RDD上調(diào)用reduce
函數(shù)找出最大行的單詞數(shù)署浩。map
和reduce
的參數(shù)是Scala函數(shù)式(閉包)揉燃,并且可以使用任何語言特性或者Scala/Java庫。例如筋栋,可以很容易地調(diào)用聲明在其它地方的函數(shù)炊汤。可以使用Math.max()
來讓這段代碼更好理解弊攘。
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一種常見的數(shù)據(jù)流模式為MapReduce抢腐,是由Hadoop推廣的。Spark可以很容易地實(shí)現(xiàn)MapReduce:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28
這里我們結(jié)合了flatmap襟交,map和reduceByKey轉(zhuǎn)換來計(jì)算文件中每個(gè)單詞的數(shù)量迈倍,生成一個(gè)(String,Int)對的RDD捣域。
為了在我們的Shell中統(tǒng)計(jì)單詞數(shù)量啼染,可以使用collect action:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
緩存
Spark支持拉取數(shù)據(jù)集到集群范圍的內(nèi)存緩存中。當(dāng)數(shù)據(jù)需要重復(fù)訪問時(shí)會(huì)非常有用焕梅,比如查詢一個(gè)熱數(shù)據(jù)集或者運(yùn)行像PageRank這樣的迭代算法迹鹅。作為一個(gè)簡單的例子,讓我們把linesWithSpark
數(shù)據(jù)集標(biāo)記為緩存:
scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
用Spark來探索和緩存一個(gè)100行的文本文件看起來有點(diǎn)呆丘侠。有意思的是這些相同的函數(shù)可以用在非常龐大的數(shù)據(jù)集上徒欣,即使跨越數(shù)十或者數(shù)百個(gè)節(jié)點(diǎn)逐样。你也可以像編程指南中描述的一樣通過使用bin/spark-shell
連接到集群的方式蜗字,來交互式地做這件事打肝。
獨(dú)立的應(yīng)用程序
假設(shè)我們想用Spark API寫一個(gè)獨(dú)立的應(yīng)用程序∨膊叮可以使用Scala(with sbt)粗梭,Java(with Maven)和Python實(shí)現(xiàn)一個(gè)簡單的應(yīng)用程序。
在Scala中創(chuàng)建一個(gè)簡單的Spark應(yīng)用程序级零,命名為SimpleApp.scala
断医。
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
sc.stop()
}
}
可以看到應(yīng)用程序應(yīng)該定義main()
函數(shù)而不是擴(kuò)展scala.App
。使用scala.App
的子類可能會(huì)運(yùn)行不正常奏纪。這個(gè)程序就計(jì)算了Spark的README文件中包含a
的行數(shù)和包含b
的行數(shù)鉴嗤。注意把YOUR_SPARK_HOME
替換成Spark的安裝路徑。不像之前使用Spark Shell的例子會(huì)初始化自己的SparkContext序调,我們需要初始化SparkContext作為程序的一部分醉锅。
我們要傳一個(gè)包含了應(yīng)用程序信息的SparkConf對象給SparkContext構(gòu)造函數(shù)。
應(yīng)用程序需要依賴Spark API发绢,所以會(huì)包含一個(gè)sbt配置文件硬耍,simple.sbt
,里面說明了Spark是一個(gè)依賴边酒。這個(gè)文件也添加了一個(gè)Spark依賴的庫:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
為了讓sbt正確運(yùn)行经柴,我們需要根據(jù)經(jīng)典的目錄結(jié)構(gòu)來組織 SimpleApp.scala
和simple.sbt
。完成之后墩朦,我們可以創(chuàng)建一個(gè)包含我們應(yīng)用程序代碼的JAR包坯认,然后使用spark-submit
腳本來運(yùn)行我們的程序。
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
從這里開始
恭喜你運(yùn)行了第一個(gè)Spark應(yīng)用程序氓涣!
- 查看API的深入概述鹃操,從Spark編程指南開始,或者看其它組件的編程指南菜單春哨。
- 要在集群上運(yùn)行應(yīng)用程序荆隘,請看部署概述。
- 最后赴背,Spark在examples目錄中包含了很多示例(Scala椰拒,Java,Python凰荚,R)燃观,可以使用如下方式運(yùn)行:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R