本文主要翻譯至鏈接且不局限于該文內(nèi)容,也加入了筆者實(shí)踐內(nèi)容榜聂,翻譯水平有限搞疗,歡迎指正,轉(zhuǎn)載請注明出處须肆。
本教程提供了使用Spark的快速介紹匿乃。 我們將首先通過Spark的交互式shell(在Python或Scala中)介紹部分API桩皿,然后演示如何使用Java,Scala和Python編寫應(yīng)用程序幢炸。 有關(guān)更完整的參考泄隔,請參閱編程指南。
你可以先從Spark網(wǎng)站下載Spark的打包版本阳懂。 由于本文中我們不會使用HDFS梅尤,因此下載時(shí)不需要關(guān)注Hadoop的版本。
使用Spark Shell進(jìn)行交互式分析
基礎(chǔ)
Spark的shell環(huán)境提供了一個(gè)簡單的方法來學(xué)習(xí)API岩调,同時(shí)它也是一個(gè)強(qiáng)大的交互式分析數(shù)據(jù)的工具巷燥。 它可以在Scala(Scala在Java VM上運(yùn)行,因此可以方便的使用現(xiàn)有的Java庫)或Python中使用号枕。 (本文以Scala語言為例)通過在Spark目錄中運(yùn)行以下命令來啟動(dòng)它:
./bin/spark-shell
Spark上運(yùn)行的主要抽象是一個(gè)稱為RDD(Resilient Distributed Dataset缰揪,彈性分布式數(shù)據(jù)集)的集合,RDDs可以從Hadoop的InputFormats(例如HDFS文件)中創(chuàng)建葱淳,或者從其他的RDDs轉(zhuǎn)換钝腺。我們先用如下命令以Spark目錄下的README文件作為數(shù)據(jù)源創(chuàng)建一個(gè)RDD:
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25
返回的RDDs很一些方法可以執(zhí)行,參考文檔1:actions和文檔2:transformations赞厕,其中actions返回普通的值艳狐,transformations返回新的RDD。例如皿桑,下面是兩個(gè)action:
scala> textFile.count() // RDD中有多少行數(shù)
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // RDD的第一行
res1: String = # Apache Spark
下面這個(gè)例子使用filter轉(zhuǎn)換毫目,并返回一個(gè)新的RDD,它是README文件的一個(gè)子集:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27
我們也可以鏈?zhǔn)秸{(diào)用這些方法:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多的一些RDD操作
RDD的actions和transformations可以用來執(zhí)行更復(fù)雜的運(yùn)算诲侮,例如我們想找出出現(xiàn)最多的單詞:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
第一行創(chuàng)建了一個(gè)新的RDD镀虐,然后將每一行映射到一個(gè)整數(shù)值。reduce函數(shù)鏈?zhǔn)教幚碓揜DD并找到最大行計(jì)數(shù)沟绪。 其中map和reduce的參數(shù)是Scala中的語法(閉包)刮便,這里也可以使用任何Scala / Java語言的其他特性或庫。 例如绽慈,下面的例子中恨旱,我們使用Math.max()函數(shù)來使這段代碼更容易理解:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一種常見的數(shù)據(jù)流模式是MapReduce,是從Hadoop流行起來的坝疼。 Spark可以輕松實(shí)現(xiàn)MapReduce流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28
在這里搜贤,我們將flatMap,map和reduceByKey命令結(jié)合起來裙士,作為(String入客,Int)對的RDD來計(jì)算文件中的每個(gè)字計(jì)數(shù)管毙。 要在我們的shell中收集字?jǐn)?shù)腿椎,我們可以使用collect操作:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
緩存
Spark也允許對處理中的數(shù)據(jù)集進(jìn)行緩存桌硫,數(shù)據(jù)可以緩存在集群范圍內(nèi)的節(jié)點(diǎn)內(nèi)存中,以便可以對一些“熱數(shù)據(jù)”快速訪問啃炸。示例代碼如下:
scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
Spark的功能絕對不僅限于處理這種只有幾百行的小數(shù)據(jù)铆隘,更具有吸引力的是所有這些函數(shù)都支持在超大規(guī)模的數(shù)據(jù)集上工作,即使這些數(shù)據(jù)分布在數(shù)十或數(shù)百個(gè)節(jié)點(diǎn)上南用。你可以通過bin/spark-shell腳本連接的Spark集群中操作這些數(shù)據(jù)膀钠,詳細(xì)的描述請參考編程指南。
自包含應(yīng)用程序
假設(shè)我們想要使用Spark API寫一段自包含的應(yīng)用程序裹虫,下面依次看幾段示例代碼:
Scala(使用sbt構(gòu)建)
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
sc.stop()
}
}
上面這個(gè)例子創(chuàng)建了一個(gè)名為SimpleApp.scala的文件肿嘲。注意,應(yīng)用程序應(yīng)該定義一個(gè)main()方法筑公,而不是繼承scala.App雳窟,繼承的這種方式可能無法正常工作。
該程序?qū)崿F(xiàn)的功能是計(jì)算Spark README文件中包含字符‘a(chǎn)’的行數(shù)和包含字符‘b’的行數(shù)匣屡。如果要執(zhí)行這個(gè)程序封救,請?zhí)鎿Q正確的YOUR_SPARK_HOME路徑。與前面的Spark shell初始化自己的SparkContext的例子不同捣作,這里我們需要手動(dòng)初始化一個(gè)SparkContext誉结。程序的配置信息則通過一個(gè)SparkConf對象傳遞給SparkContext的構(gòu)造器。
我們的程序依賴Spark API券躁,因此我們需要準(zhǔn)備一個(gè)sbt的配置文件惩坑,simple.sbt,它將描述Spark是程序的依賴項(xiàng)嘱朽。這個(gè)文件也添加了一個(gè)Spark依賴的存儲庫:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
為了使sbt正常工作旭贬,我們按正常的代碼目錄格式分布文件SimpleApp.scala和simple.sbt,完成后搪泳,我們就可以將該應(yīng)用程序打包成一個(gè)jar文件稀轨,然后使用spark-submit腳本提交到Spark執(zhí)行。
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
Java(使用Maven構(gòu)建)
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
sc.stop();
}
}
這個(gè)列子使用了Maven來編譯和構(gòu)建一個(gè)名為SimpleApp.java的JAR程序岸军,它實(shí)現(xiàn)的功能和上文Scala一致奋刽。你也可以使用其他任意可行的構(gòu)建系統(tǒng)。
與上文Scala一樣艰赞,我們需要初始化一個(gè)SparkContext對象佣谐,上文的例子中使用了一個(gè)更友好的JavaSparkContext對象,然后創(chuàng)建了RDDs對象(即JavaRDD)并在他們上執(zhí)行了transformations方法方妖。最后狭魂,我們給Spark傳遞了繼承至spark.api.java.function.Function的匿名類來執(zhí)行作業(yè)。更詳細(xì)的功能請參考Spark編程指南。
為了構(gòu)建這個(gè)程序雌澄,我們需要編寫一個(gè)pom.xml文件并添加Spark作為依賴項(xiàng)斋泄,注意,Spark的artifacts使用了Scala的版本標(biāo)記(2.11表示的是scala的版本)镐牺。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
</project>
按目錄組織這些文件炫掐,如:
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
然后,使用maven命令進(jìn)行編譯和構(gòu)建睬涧,之后就可以使用spark-submit腳本提交到Spark上執(zhí)行:
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
Python
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
sc.stop()
類似的募胃,python版本創(chuàng)建了一個(gè)SimpleApp.py(使用了pyspark,Spark Python API)畦浓,功能與前述一致痹束。python版本同樣需要?jiǎng)?chuàng)建一個(gè)SparkContext,然后用它來創(chuàng)建RDDs讶请,之后向它傳遞lambda表示的函數(shù)参袱。如果應(yīng)用程序使用了第三方的庫,則需要我們將它們達(dá)成zip包秽梅,并在執(zhí)行spark-submit時(shí)添加--py-files選項(xiàng)抹蚀。在這個(gè)例子中,由于沒有依賴第三方庫企垦,因此我們可以直接提交應(yīng)用程序:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
進(jìn)一步學(xué)習(xí)
如果你按照前文進(jìn)行了實(shí)踐环壤,那么恭喜你已經(jīng)成功運(yùn)行了你的第一個(gè)Spark應(yīng)用程序。接下來钞诡,你可以:
- 學(xué)習(xí)Spark programming guide以進(jìn)一步了解如果編寫更豐富的功能
- 想要了解如果在集群中提交應(yīng)用程序郑现,可以參考deployment overview
- 最后,Spark的安裝包也包含了一些實(shí)例荧降,位于example目錄(Scala, Java, Python, R)接箫,你可以像下面這樣執(zhí)行它們:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R