本教程提供了如何使用 Spark 的快速入門介紹箫攀。首先通過運(yùn)行 Spark 交互式的 shell(在 Python 或 Scala 中)來介紹 API, 然后展示如何使用 Java , Scala 和 Python 來編寫應(yīng)用程序。
為了繼續(xù)閱讀本指南, 首先從Spark 官網(wǎng)下載 Spark 的發(fā)行包紧索。因?yàn)槲覀儗⒉皇褂?HDFS, 所以你可以下載一個任何 Hadoop 版本的軟件包。
請注意, 在 Spark 2.0 之前, Spark 的主要編程接口是彈性分布式數(shù)據(jù)集(RDD)。 在 Spark 2.0 之后, RDD 被 Dataset 替換, 它是像RDD 一樣的 strongly-typed(強(qiáng)類型), 但是在引擎蓋下更加優(yōu)化狠裹。 RDD 接口仍然受支持, 您可以在RDD 編程指南中獲得更完整的參考胡岔。 但是, 我們強(qiáng)烈建議您切換到使用 Dataset(數(shù)據(jù)集), 其性能要更優(yōu)于 RDD漓穿。 請參閱SQL 編程指南獲取更多有關(guān) Dataset 的信息。
Spark shell 提供了一種來學(xué)習(xí)該 API 比較簡單的方式, 以及一個強(qiáng)大的來分析數(shù)據(jù)交互的工具残黑。在 Scala(運(yùn)行于 Java 虛擬機(jī)之上, 并能很好的調(diào)用已存在的 Java 類庫)或者 Python 中它是可用的馍佑。通過在 Spark 目錄中運(yùn)行以下的命令來啟動它:
./bin/spark-shell
Spark 的主要抽象是一個稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉(zhuǎn)換來創(chuàng)建梨水。讓我們從 Spark 源目錄中的 README 文件來創(chuàng)建一個新的 Dataset:
scala>valtextFile=spark.read.textFile("README.md")textFile:org.apache.spark.sql.Dataset[String]=[value:string]
您可以直接從 Dataset 中獲取 values(值), 通過調(diào)用一些 actions(動作), 或者 transform(轉(zhuǎn)換)Dataset 以獲得一個新的拭荤。更多細(xì)節(jié), 請參閱API doc。
scala>textFile.count()// Number of items in this Datasetres0:Long=126// May be different from yours as README.md will change over time, similar to other outputsscala>textFile.first()// First item in this Datasetres1:String=#ApacheSpark
現(xiàn)在讓我們 transform 這個 Dataset 以獲得一個新的疫诽。我們調(diào)用filter以返回一個新的 Dataset, 它是文件中的 items 的一個子集舅世。
scala>vallinesWithSpark=textFile.filter(line=>line.contains("Spark"))linesWithSpark:org.apache.spark.sql.Dataset[String]=[value:string]
我們可以鏈?zhǔn)讲僮?transformation(轉(zhuǎn)換)和 action(動作):
scala>textFile.filter(line=>line.contains("Spark")).count()// How many lines contain "Spark"?res3:Long=15
Dataset actions(操作)和 transformations(轉(zhuǎn)換)可以用于更復(fù)雜的計(jì)算。例如, 統(tǒng)計(jì)出現(xiàn)次數(shù)最多的單詞 :
scala>textFile.map(line=>line.split(" ").size).reduce((a,b)=>if(a>b)aelseb)res4:Long=15
第一個 map 操作創(chuàng)建一個新的 Dataset, 將一行數(shù)據(jù) map 為一個整型值奇徒。在 Dataset 上調(diào)用reduce來找到最大的行計(jì)數(shù)雏亚。參數(shù)map與reduce是 Scala 函數(shù)(closures), 并且可以使用 Scala/Java 庫的任何語言特性。例如, 我們可以很容易地調(diào)用函數(shù)聲明, 我們將定義一個 max 函數(shù)來使代碼更易于理解 :
scala>importjava.lang.Mathimportjava.lang.Mathscala>textFile.map(line=>line.split(" ").size).reduce((a,b)=>Math.max(a,b))res5:Int=15
一種常見的數(shù)據(jù)流模式是被 Hadoop 所推廣的 MapReduce摩钙。Spark 可以很容易實(shí)現(xiàn) MapReduce:
scala>valwordCounts=textFile.flatMap(line=>line.split(" ")).groupByKey(identity).count()wordCounts:org.apache.spark.sql.Dataset[(String,Long)]=[value:string,count(1):bigint]
在這里, 我們調(diào)用了flatMap以 transform 一個 lines 的 Dataset 為一個 words 的 Dataset, 然后結(jié)合groupByKey和count來計(jì)算文件中每個單詞的 counts 作為一個 (String, Long) 的 Dataset pairs罢低。要在 shell 中收集 word counts, 我們可以調(diào)用collect:
scala>wordCounts.collect()res6:Array[(String,Int)]=Array((means,1),(under,2),(this,3),(Because,1),(Python,2),(agree,1),(cluster.,1),...)
Spark 還支持 Pulling(拉取)數(shù)據(jù)集到一個群集范圍的內(nèi)存緩存中腺律。例如當(dāng)查詢一個小的 “hot” 數(shù)據(jù)集或運(yùn)行一個像 PageRANK 這樣的迭代算法時, 在數(shù)據(jù)被重復(fù)訪問時是非常高效的奕短。舉一個簡單的例子, 讓我們標(biāo)記我們的linesWithSpark數(shù)據(jù)集到緩存中:
scala>linesWithSpark.cache()res7:linesWithSpark.type=[value:string]scala>linesWithSpark.count()res8:Long=15scala>linesWithSpark.count()res9:Long=15
使用 Spark 來探索和緩存一個 100 行的文本文件看起來比較愚蠢。有趣的是, 即使在他們跨越幾十或者幾百個節(jié)點(diǎn)時, 這些相同的函數(shù)也可以用于非常大的數(shù)據(jù)集匀钧。您也可以像編程指南. 中描述的一樣通過連接bin/spark-shell到集群中, 使用交互式的方式來做這件事情。
假設(shè)我們希望使用 Spark API 來創(chuàng)建一個獨(dú)立的應(yīng)用程序谬返。我們在 Scala(SBT), Java(Maven)和 Python 中練習(xí)一個簡單應(yīng)用程序之斯。
我們將在 Scala 中創(chuàng)建一個非常簡單的 Spark 應(yīng)用程序 - 很簡單的, 事實(shí)上, 它名為SimpleApp.scala:
/* SimpleApp.scala */importorg.apache.spark.sql.SparkSessionobjectSimpleApp{defmain(args:Array[String]){vallogFile="YOUR_SPARK_HOME/README.md"http:// Should be some file on your systemvalspark=SparkSession.builder.appName("Simple Application").getOrCreate()vallogData=spark.read.textFile(logFile).cache()valnumAs=logData.filter(line=>line.contains("a")).count()valnumBs=logData.filter(line=>line.contains("b")).count()println(s"Lines with a:$numAs, Lines with b:$numBs")spark.stop()}}
注意, 這個應(yīng)用程序我們應(yīng)該定義一個main()方法而不是去擴(kuò)展scala.App。使用scala.App的子類可能不會正常運(yùn)行遣铝。
該程序僅僅統(tǒng)計(jì)了 Spark README 文件中每一行包含 ‘a(chǎn)’ 的數(shù)量和包含 ‘b’ 的數(shù)量佑刷。注意, 您需要將 YOUR_SPARK_HOME 替換為您 Spark 安裝的位置莉擒。不像先前使用 spark shell 操作的示例, 它們初始化了它們自己的 SparkContext, 我們初始化了一個 SparkContext 作為應(yīng)用程序的一部分。
我們調(diào)用SparkSession.builder以構(gòu)造一個 [[SparkSession]], 然后設(shè)置 application name(應(yīng)用名稱), 最終調(diào)用getOrCreate以獲得 [[SparkSession]] 實(shí)例瘫絮。
我們的應(yīng)用依賴了 Spark API, 所以我們將包含一個名為build.sbt的 sbt 配置文件, 它描述了 Spark 的依賴涨冀。該文件也會添加一個 Spark 依賴的 repository:
name:="Simple Project"version:="1.0"scalaVersion:="2.11.8"libraryDependencies+="org.apache.spark"%%"spark-sql"%"2.2.0"
為了讓 sbt 正常的運(yùn)行, 我們需要根據(jù)經(jīng)典的目錄結(jié)構(gòu)來布局SimpleApp.scala和build.sbt文件。在成功后, 我們可以創(chuàng)建一個包含應(yīng)用程序代碼的 JAR 包, 然后使用spark-submit腳本來運(yùn)行我們的程序麦萤。
# Your directory layout should look like this$ find .../build.sbt./src./src/main./src/main/scala./src/main/scala/SimpleApp.scala# Package a jar containing your application$ sbt package...[info]Packaging{..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar# Use spark-submit to run your application$ YOUR_SPARK_HOME/bin/spark-submit\--class"SimpleApp"\--master local[4]\target/scala-2.11/simple-project_2.11-1.0.jar...Lines with a:46, Lines with b:23
恭喜您成功的運(yùn)行了您的第一個 Spark 應(yīng)用程序鹿鳖!
更多 API 的深入概述, 從RDD programming guide和SQL programming guide這里開始, 或者看看 “編程指南” 菜單中的其它組件。
為了在集群上運(yùn)行應(yīng)用程序, 請前往deployment overview.
最后, 在 Spark 的examples目錄中包含了一些 (Scala,Java,Python,R) 示例壮莹。您可以按照如下方式來運(yùn)行它們:
# 針對 Scala 和 Java, 使用 run-example:./bin/run-example SparkPi# 針對 Python 示例, 直接使用 spark-submit:./bin/spark-submit examples/src/main/python/pi.py# 針對 R 示例, 直接使用 spark-submit:./bin/spark-submit examples/src/main/r/dataframe.R
原文地址:?http://spark.apachecn.org/docs/cn/2.2.0/quick-start.html
網(wǎng)頁地址:?http://spark.apachecn.org/
github:?https://github.com/apachecn/spark-doc-zh(覺得不錯麻煩給個?Star翅帜,謝謝!~)