《Spark指南》一节值、快速開始

本文主要翻譯至鏈接且不局限于該文內(nèi)容,也加入了筆者實(shí)踐內(nèi)容榜聂,翻譯水平有限搞疗,歡迎指正,轉(zhuǎn)載請注明出處须肆。

本教程提供了使用Spark的快速介紹匿乃。 我們將首先通過Spark的交互式shell(在Python或Scala中)介紹部分API桩皿,然后演示如何使用Java,Scala和Python編寫應(yīng)用程序幢炸。 有關(guān)更完整的參考泄隔,請參閱編程指南

你可以先從Spark網(wǎng)站下載Spark的打包版本阳懂。 由于本文中我們不會使用HDFS梅尤,因此下載時(shí)不需要關(guān)注Hadoop的版本。

使用Spark Shell進(jìn)行交互式分析

基礎(chǔ)

Spark的shell環(huán)境提供了一個(gè)簡單的方法來學(xué)習(xí)API岩调,同時(shí)它也是一個(gè)強(qiáng)大的交互式分析數(shù)據(jù)的工具巷燥。 它可以在Scala(Scala在Java VM上運(yùn)行,因此可以方便的使用現(xiàn)有的Java庫)或Python中使用号枕。 (本文以Scala語言為例)通過在Spark目錄中運(yùn)行以下命令來啟動(dòng)它:

./bin/spark-shell

Spark上運(yùn)行的主要抽象是一個(gè)稱為RDD(Resilient Distributed Dataset缰揪,彈性分布式數(shù)據(jù)集)的集合,RDDs可以從Hadoop的InputFormats(例如HDFS文件)中創(chuàng)建葱淳,或者從其他的RDDs轉(zhuǎn)換钝腺。我們先用如下命令以Spark目錄下的README文件作為數(shù)據(jù)源創(chuàng)建一個(gè)RDD:

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25

返回的RDDs很一些方法可以執(zhí)行,參考文檔1:actions文檔2:transformations赞厕,其中actions返回普通的值艳狐,transformations返回新的RDD。例如皿桑,下面是兩個(gè)action:

scala> textFile.count() // RDD中有多少行數(shù)
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // RDD的第一行
res1: String = # Apache Spark

下面這個(gè)例子使用filter轉(zhuǎn)換毫目,并返回一個(gè)新的RDD,它是README文件的一個(gè)子集:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

我們也可以鏈?zhǔn)秸{(diào)用這些方法:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多的一些RDD操作

RDD的actions和transformations可以用來執(zhí)行更復(fù)雜的運(yùn)算诲侮,例如我們想找出出現(xiàn)最多的單詞:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一行創(chuàng)建了一個(gè)新的RDD镀虐,然后將每一行映射到一個(gè)整數(shù)值。reduce函數(shù)鏈?zhǔn)教幚碓揜DD并找到最大行計(jì)數(shù)沟绪。 其中map和reduce的參數(shù)是Scala中的語法(閉包)刮便,這里也可以使用任何Scala / Java語言的其他特性或庫。 例如绽慈,下面的例子中恨旱,我們使用Math.max()函數(shù)來使這段代碼更容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一種常見的數(shù)據(jù)流模式是MapReduce,是從Hadoop流行起來的坝疼。 Spark可以輕松實(shí)現(xiàn)MapReduce流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28

在這里搜贤,我們將flatMap,map和reduceByKey命令結(jié)合起來裙士,作為(String入客,Int)對的RDD來計(jì)算文件中的每個(gè)字計(jì)數(shù)管毙。 要在我們的shell中收集字?jǐn)?shù)腿椎,我們可以使用collect操作:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存

Spark也允許對處理中的數(shù)據(jù)集進(jìn)行緩存桌硫,數(shù)據(jù)可以緩存在集群范圍內(nèi)的節(jié)點(diǎn)內(nèi)存中,以便可以對一些“熱數(shù)據(jù)”快速訪問啃炸。示例代碼如下:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

Spark的功能絕對不僅限于處理這種只有幾百行的小數(shù)據(jù)铆隘,更具有吸引力的是所有這些函數(shù)都支持在超大規(guī)模的數(shù)據(jù)集上工作,即使這些數(shù)據(jù)分布在數(shù)十或數(shù)百個(gè)節(jié)點(diǎn)上南用。你可以通過bin/spark-shell腳本連接的Spark集群中操作這些數(shù)據(jù)膀钠,詳細(xì)的描述請參考編程指南

自包含應(yīng)用程序

假設(shè)我們想要使用Spark API寫一段自包含的應(yīng)用程序裹虫,下面依次看幾段示例代碼:

Scala(使用sbt構(gòu)建)

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}

上面這個(gè)例子創(chuàng)建了一個(gè)名為SimpleApp.scala的文件肿嘲。注意,應(yīng)用程序應(yīng)該定義一個(gè)main()方法筑公,而不是繼承scala.App雳窟,繼承的這種方式可能無法正常工作。

該程序?qū)崿F(xiàn)的功能是計(jì)算Spark README文件中包含字符‘a(chǎn)’的行數(shù)和包含字符‘b’的行數(shù)匣屡。如果要執(zhí)行這個(gè)程序封救,請?zhí)鎿Q正確的YOUR_SPARK_HOME路徑。與前面的Spark shell初始化自己的SparkContext的例子不同捣作,這里我們需要手動(dòng)初始化一個(gè)SparkContext誉结。程序的配置信息則通過一個(gè)SparkConf對象傳遞給SparkContext的構(gòu)造器。

我們的程序依賴Spark API券躁,因此我們需要準(zhǔn)備一個(gè)sbt的配置文件惩坑,simple.sbt,它將描述Spark是程序的依賴項(xiàng)嘱朽。這個(gè)文件也添加了一個(gè)Spark依賴的存儲庫:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

為了使sbt正常工作旭贬,我們按正常的代碼目錄格式分布文件SimpleApp.scala和simple.sbt,完成后搪泳,我們就可以將該應(yīng)用程序打包成一個(gè)jar文件稀轨,然后使用spark-submit腳本提交到Spark執(zhí)行。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

Java(使用Maven構(gòu)建)

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
    sc.stop();
  }
}

這個(gè)列子使用了Maven來編譯和構(gòu)建一個(gè)名為SimpleApp.java的JAR程序岸军,它實(shí)現(xiàn)的功能和上文Scala一致奋刽。你也可以使用其他任意可行的構(gòu)建系統(tǒng)。

與上文Scala一樣艰赞,我們需要初始化一個(gè)SparkContext對象佣谐,上文的例子中使用了一個(gè)更友好的JavaSparkContext對象,然后創(chuàng)建了RDDs對象(即JavaRDD)并在他們上執(zhí)行了transformations方法方妖。最后狭魂,我們給Spark傳遞了繼承至spark.api.java.function.Function的匿名類來執(zhí)行作業(yè)。更詳細(xì)的功能請參考Spark編程指南

為了構(gòu)建這個(gè)程序雌澄,我們需要編寫一個(gè)pom.xml文件并添加Spark作為依賴項(xiàng)斋泄,注意,Spark的artifacts使用了Scala的版本標(biāo)記(2.11表示的是scala的版本)镐牺。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
  </dependencies>
</project>

按目錄組織這些文件炫掐,如:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

然后,使用maven命令進(jìn)行編譯和構(gòu)建睬涧,之后就可以使用spark-submit腳本提交到Spark上執(zhí)行:

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Python

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

sc.stop()

類似的募胃,python版本創(chuàng)建了一個(gè)SimpleApp.py(使用了pyspark,Spark Python API)畦浓,功能與前述一致痹束。python版本同樣需要?jiǎng)?chuàng)建一個(gè)SparkContext,然后用它來創(chuàng)建RDDs讶请,之后向它傳遞lambda表示的函數(shù)参袱。如果應(yīng)用程序使用了第三方的庫,則需要我們將它們達(dá)成zip包秽梅,并在執(zhí)行spark-submit時(shí)添加--py-files選項(xiàng)抹蚀。在這個(gè)例子中,由于沒有依賴第三方庫企垦,因此我們可以直接提交應(yīng)用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

進(jìn)一步學(xué)習(xí)

如果你按照前文進(jìn)行了實(shí)踐环壤,那么恭喜你已經(jīng)成功運(yùn)行了你的第一個(gè)Spark應(yīng)用程序。接下來钞诡,你可以:

  • 學(xué)習(xí)Spark programming guide以進(jìn)一步了解如果編寫更豐富的功能
  • 想要了解如果在集群中提交應(yīng)用程序郑现,可以參考deployment overview
  • 最后,Spark的安裝包也包含了一些實(shí)例荧降,位于example目錄(Scala, Java, Python, R)接箫,你可以像下面這樣執(zhí)行它們:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

相關(guān)的文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末废累,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子脱盲,更是在濱河造成了極大的恐慌邑滨,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件钱反,死亡現(xiàn)場離奇詭異掖看,居然都是意外死亡匣距,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門哎壳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來墨礁,“玉大人,你說我怎么就攤上這事耳峦。” “怎么了焕毫?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵蹲坷,是天一觀的道長。 經(jīng)常有香客問我邑飒,道長循签,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任疙咸,我火速辦了婚禮县匠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘撒轮。我一直安慰自己乞旦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布题山。 她就那樣靜靜地躺著兰粉,像睡著了一般。 火紅的嫁衣襯著肌膚如雪顶瞳。 梳的紋絲不亂的頭發(fā)上玖姑,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機(jī)與錄音慨菱,去河邊找鬼焰络。 笑死,一個(gè)胖子當(dāng)著我的面吹牛符喝,可吹牛的內(nèi)容都是我干的闪彼。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼协饲,長吁一口氣:“原來是場噩夢啊……” “哼备蚓!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起囱稽,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤郊尝,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后战惊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體流昏,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了况凉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谚鄙。...
    茶點(diǎn)故事閱讀 37,997評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖刁绒,靈堂內(nèi)的尸體忽然破棺而出闷营,到底是詐尸還是另有隱情,我是刑警寧澤知市,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布傻盟,位于F島的核電站,受9級特大地震影響嫂丙,放射性物質(zhì)發(fā)生泄漏娘赴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一跟啤、第九天 我趴在偏房一處隱蔽的房頂上張望诽表。 院中可真熱鬧,春花似錦隅肥、人聲如沸竿奏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽议双。三九已至,卻和暖如春捉片,著一層夾襖步出監(jiān)牢的瞬間平痰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工伍纫, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留宗雇,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓莹规,卻偏偏與公主長得像赔蒲,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子良漱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容