本教程基于Spark官網(wǎng)的快速入門教程改編而來盼忌,官方文檔和中文翻譯的傳送門見最下方。(注意,實(shí)際情況可能因?yàn)榘姹静煌仍蛴兴煌?/p>
如果對本文中的一些名詞感到疑惑目溉,可以看另外一篇入門文章:五分鐘大數(shù)據(jù):Spark入門
關(guān)鍵字:
使用 Spark Shell 進(jìn)行交互式分析
啟動(dòng)
Spark提供了一個(gè)學(xué)習(xí)API的簡單方式,就是Spark shell——同時(shí)也是一個(gè)強(qiáng)大數(shù)據(jù)交互式分析工具菱农$愿叮基于scala和python的Spark shell的啟動(dòng)方式如下:
Scala:./bin/spark-shell
Python:./bin/pyspark
注意,使用上面的方式登錄循未,實(shí)現(xiàn)要把目錄切換到Spark的安裝目錄下蛉腌;如果已經(jīng)配置好SPARK_HOME,那么也可以直接spark-shell或者pyspark啟動(dòng)
讀取文件
Spark 的主要抽象是一個(gè)稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉(zhuǎn)換來創(chuàng)建烙丛。
從外部讀取文件
Scala:
scala>val textFile = spark.read.textFile("README.md")
Python:
>>> textFile = spark.read.text("README.md")
處理Dataset(1):統(tǒng)計(jì)含有“Spark”的行數(shù)
Scala版:
scala> textFile.count() //計(jì)數(shù)
scala> textFile.first() //顯示第一行
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) //過濾出所需列舅巷,transform操作返回一個(gè)新的Dataset
scala> textFile.filter(line => line.contains("Spark")).count() //鏈?zhǔn)讲僮鱰ransform和action,返回所有含有"Spark"的行數(shù)
Python版:
>>> textFile.count()?
>>> textFile.first()
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
>>> textFile.filter(textFile.value.contains("Spark")).count()
處理Dataset(2):統(tǒng)計(jì)一行中的單詞最大數(shù)
Scala版
scala>?textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
//方法2:
scala>?import java.lang.Math
scala>?textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
Python版
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
處理Dataset(3):實(shí)現(xiàn)mapreduce中的wordcount
Scala版
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
scala> wordCounts.collect()
Python版
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count()
>>> wordCounts.collect()
處理Dataset(4):使用緩存
因?yàn)镾park是基于內(nèi)存的運(yùn)算河咽,所以速度要比基于硬盤的MapReduce快很多钠右;為了高效使用內(nèi)存,Spark會(huì)將已經(jīng)使用過的空間回收忘蟹,但是有一些數(shù)據(jù)飒房,我們想要重復(fù)使用,這時(shí)候我們就可以用到緩存技術(shù)媚值,直接使用之前的計(jì)算結(jié)果狠毯。
Scala
scala> linesWithSpark.cache()
Python
>>> linesWithSpark.cache()
這里同時(shí)列出Scala和Python的實(shí)現(xiàn)方式,是為了大家更好的對比褥芒。需要注意的是嚼松,很多在Scala里的api,在python里都是不存在的锰扶,或者形式是完全不同的献酗,因?yàn)閜ython有自己的方法來處理。
獨(dú)立的應(yīng)用
學(xué)習(xí)完交互式探索之后坷牛,我們再來看一下獨(dú)立的Spark應(yīng)用如何去做罕偎。下面的demo的作用是統(tǒng)計(jì)一行中“a”“b”出現(xiàn)的次數(shù)。
代碼編寫
Scala版
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
? def main(args: Array[String]) {
? ? val logFile = "YOUR_SPARK_HOME/README.md"?
? ? val spark = SparkSession.builder.appName("Simple Application").getOrCreate() //首先需要建立與spark的連接
? ? val logData = spark.read.textFile(logFile).cache()
? ? val numAs = logData.filter(line => line.contains("a")).count()
? ? val numBs = logData.filter(line => line.contains("b")).count()
? ? println(s"Lines with a: $numAs, Lines with b: $numBs")
? ? spark.stop() //最后需要停止與spark的連接
? }
}
Python版
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" ?
spark = SparkSession.builder().appName(appName).master(master).getOrCreate() #建立與spark的連接
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop() ?#停止與spark的連接
任務(wù)提交
完成獨(dú)立程序的編程之后京闰,接下來就要將任務(wù)提交到spark來執(zhí)行颜及,執(zhí)行的方法如下:
# Scala 使用 run-example:
./bin/run-example SparkPi
# Python 使用 spark-submit:
./bin/spark-submit examples/src/main/python/pi.py
文集
文章
Spark難點(diǎn)解析:Join實(shí)現(xiàn)原理
可視化發(fā)現(xiàn)Spark數(shù)據(jù)傾斜
補(bǔ)充資源
官方文檔:http://spark.apache.org/docs/latest/quick-start.html
官方文檔中文版:http://spark.apachecn.org/docs/cn/2.2.0/quick-start.html