Spark編程快速入門

本教程基于Spark官網(wǎng)的快速入門教程改編而來盼忌,官方文檔和中文翻譯的傳送門見最下方。(注意,實(shí)際情況可能因?yàn)榘姹静煌仍蛴兴煌?/p>

如果對本文中的一些名詞感到疑惑目溉,可以看另外一篇入門文章:五分鐘大數(shù)據(jù):Spark入門

關(guān)鍵字:

使用 Spark Shell 進(jìn)行交互式分析


啟動(dòng)

Spark提供了一個(gè)學(xué)習(xí)API的簡單方式,就是Spark shell——同時(shí)也是一個(gè)強(qiáng)大數(shù)據(jù)交互式分析工具菱农$愿叮基于scala和python的Spark shell的啟動(dòng)方式如下:

Scala:./bin/spark-shell

Python:./bin/pyspark

注意,使用上面的方式登錄循未,實(shí)現(xiàn)要把目錄切換到Spark的安裝目錄下蛉腌;如果已經(jīng)配置好SPARK_HOME,那么也可以直接spark-shell或者pyspark啟動(dòng)

讀取文件

Spark 的主要抽象是一個(gè)稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉(zhuǎn)換來創(chuàng)建烙丛。

從外部讀取文件

Scala:

scala>val textFile = spark.read.textFile("README.md")

Python:

>>> textFile = spark.read.text("README.md")

處理Dataset(1):統(tǒng)計(jì)含有“Spark”的行數(shù)

Scala版:

scala> textFile.count() //計(jì)數(shù)

scala> textFile.first() //顯示第一行

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) //過濾出所需列舅巷,transform操作返回一個(gè)新的Dataset

scala> textFile.filter(line => line.contains("Spark")).count() //鏈?zhǔn)讲僮鱰ransform和action,返回所有含有"Spark"的行數(shù)

Python版:

>>> textFile.count()?

>>> textFile.first()

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

>>> textFile.filter(textFile.value.contains("Spark")).count()

處理Dataset(2):統(tǒng)計(jì)一行中的單詞最大數(shù)

Scala版

scala>?textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

//方法2:

scala>?import java.lang.Math

scala>?textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

Python版

>>> from pyspark.sql.functions import *

>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()

處理Dataset(3):實(shí)現(xiàn)mapreduce中的wordcount

Scala版

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

scala> wordCounts.collect()

Python版

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count()

>>> wordCounts.collect()

處理Dataset(4):使用緩存

因?yàn)镾park是基于內(nèi)存的運(yùn)算河咽,所以速度要比基于硬盤的MapReduce快很多钠右;為了高效使用內(nèi)存,Spark會(huì)將已經(jīng)使用過的空間回收忘蟹,但是有一些數(shù)據(jù)飒房,我們想要重復(fù)使用,這時(shí)候我們就可以用到緩存技術(shù)媚值,直接使用之前的計(jì)算結(jié)果狠毯。

Scala

scala> linesWithSpark.cache()

Python

>>> linesWithSpark.cache()

這里同時(shí)列出Scala和Python的實(shí)現(xiàn)方式,是為了大家更好的對比褥芒。需要注意的是嚼松,很多在Scala里的api,在python里都是不存在的锰扶,或者形式是完全不同的献酗,因?yàn)閜ython有自己的方法來處理。

獨(dú)立的應(yīng)用


學(xué)習(xí)完交互式探索之后坷牛,我們再來看一下獨(dú)立的Spark應(yīng)用如何去做罕偎。下面的demo的作用是統(tǒng)計(jì)一行中“a”“b”出現(xiàn)的次數(shù)。

代碼編寫

Scala版

/* SimpleApp.scala */

import org.apache.spark.sql.SparkSession

object SimpleApp {

? def main(args: Array[String]) {

? ? val logFile = "YOUR_SPARK_HOME/README.md"?

? ? val spark = SparkSession.builder.appName("Simple Application").getOrCreate() //首先需要建立與spark的連接

? ? val logData = spark.read.textFile(logFile).cache()

? ? val numAs = logData.filter(line => line.contains("a")).count()

? ? val numBs = logData.filter(line => line.contains("b")).count()

? ? println(s"Lines with a: $numAs, Lines with b: $numBs")

? ? spark.stop() //最后需要停止與spark的連接

? }

}

Python版

"""SimpleApp.py"""

from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md" ?

spark = SparkSession.builder().appName(appName).master(master).getOrCreate() #建立與spark的連接

logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()

numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop() ?#停止與spark的連接

任務(wù)提交

完成獨(dú)立程序的編程之后京闰,接下來就要將任務(wù)提交到spark來執(zhí)行颜及,執(zhí)行的方法如下:

# Scala 使用 run-example:

./bin/run-example SparkPi

# Python 使用 spark-submit:

./bin/spark-submit examples/src/main/python/pi.py


文集

Spark:理論與實(shí)踐

文章

五分鐘大數(shù)據(jù):Spark入門

Spark編程快速入門

Spark難點(diǎn)解析:Join實(shí)現(xiàn)原理

可視化發(fā)現(xiàn)Spark數(shù)據(jù)傾斜


補(bǔ)充資源

官方文檔:http://spark.apache.org/docs/latest/quick-start.html

官方文檔中文版:http://spark.apachecn.org/docs/cn/2.2.0/quick-start.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蹂楣,隨后出現(xiàn)的幾起案子器予,更是在濱河造成了極大的恐慌,老刑警劉巖捐迫,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乾翔,死亡現(xiàn)場離奇詭異,居然都是意外死亡施戴,警方通過查閱死者的電腦和手機(jī)反浓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赞哗,“玉大人雷则,你說我怎么就攤上這事》舅瘢” “怎么了月劈?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵度迂,是天一觀的道長。 經(jīng)常有香客問我猜揪,道長惭墓,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任而姐,我火速辦了婚禮腊凶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘拴念。我一直安慰自己钧萍,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布政鼠。 她就那樣靜靜地躺著风瘦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪公般。 梳的紋絲不亂的頭發(fā)上万搔,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機(jī)與錄音俐载,去河邊找鬼蟹略。 笑死登失,一個(gè)胖子當(dāng)著我的面吹牛遏佣,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播揽浙,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼状婶,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了馅巷?” 一聲冷哼從身側(cè)響起膛虫,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎钓猬,沒想到半個(gè)月后稍刀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡敞曹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年账月,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片澳迫。...
    茶點(diǎn)故事閱讀 40,090評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡局齿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出橄登,到底是詐尸還是另有隱情抓歼,我是刑警寧澤讥此,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站谣妻,受9級特大地震影響萄喳,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拌禾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一取胎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧湃窍,春花似錦闻蛀、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至茵休,卻和暖如春薪棒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背榕莺。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工俐芯, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人钉鸯。 一個(gè)月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓吧史,卻偏偏與公主長得像,于是被迫代替她去往敵國和親唠雕。 傳聞我的和親對象是個(gè)殘疾皇子贸营,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評論 2 355