Apache Spark 簡(jiǎn)介
Apache Spark 是什么
Apache Spark是一個(gè)分布式計(jì)算框架蜀涨,旨在簡(jiǎn)化運(yùn)行于計(jì)算機(jī)集群上的并行程序的編寫(xiě)瞎嬉。該框架對(duì)資源調(diào)度,任務(wù)的提交勉盅、執(zhí)行和跟蹤佑颇,節(jié)點(diǎn)間的通信以及數(shù)據(jù)并行處理的內(nèi)在底層操作都進(jìn)行了抽象。它提供了一個(gè)更高級(jí)別的API用于處理分布式數(shù)據(jù)草娜。下面的引用是Apache Spark自己的說(shuō)明挑胸。
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Apache Spark 起源
Spark起源于加利福利亞大學(xué)伯克利分校的一個(gè)研究項(xiàng)目。學(xué)校當(dāng)時(shí)關(guān)注分布式機(jī)器學(xué)習(xí)算法的應(yīng)用情況宰闰。因此茬贵,Spark從一開(kāi)始便為應(yīng)對(duì)迭代式應(yīng)用的高性能需求而設(shè)計(jì)簿透。在這類應(yīng)用中,相同的數(shù)據(jù)會(huì)被多次訪問(wèn)解藻。該設(shè)計(jì)主要靠利用數(shù)據(jù)集內(nèi)存緩存以及啟動(dòng)任務(wù)時(shí)的低延遲和低系統(tǒng)開(kāi)銷來(lái)實(shí)現(xiàn)高性能老充。再加上其容錯(cuò)性、靈活的分布式數(shù)據(jù)結(jié)構(gòu)和強(qiáng)大的函數(shù)式編程接口螟左,Spark在各類基于機(jī)器學(xué)習(xí)和迭代分析的大規(guī)模數(shù)據(jù)處理任務(wù)上有廣泛的應(yīng)用啡浊,這也表明了其實(shí)用性。
Apache Spark 運(yùn)行模式
Apache Spark共支持四種運(yùn)行模式胶背,每種模式各有其特點(diǎn)巷嚣,為了方便起見(jiàn),本文基于的運(yùn)行模式為本地單機(jī)模式钳吟。
- 本地單機(jī)模式:所有Spark進(jìn)程都運(yùn)行在同一個(gè)Java虛擬機(jī)(Java Vitural Machine廷粒,JVM)中
- 集群?jiǎn)螜C(jī)模式:使用Spark自己內(nèi)置的任務(wù)調(diào)度框架
- 基于Mesos:Mesos是一個(gè)流行的開(kāi)源集群計(jì)算框架
- 基于YARN:即Hadoop 2,它是一個(gè)與Hadoop關(guān)聯(lián)的集群計(jì)算和資源調(diào)度框架
Apache Spark 環(huán)境搭建
Spark能通過(guò)內(nèi)置的單機(jī)集群調(diào)度器來(lái)在本地運(yùn)行红且。此時(shí)坝茎,所有的Spark進(jìn)程運(yùn)行在同一個(gè)Java虛擬機(jī)中。這實(shí)際上構(gòu)造了一個(gè)獨(dú)立暇番、多線程版本的Spark環(huán)境嗤放。本地模式很適合程序的原型設(shè)計(jì)、開(kāi)發(fā)奔誓、調(diào)試及測(cè)試斤吐。同樣,它也適應(yīng)于在單機(jī)上進(jìn)行多核并行計(jì)算的實(shí)際場(chǎng)景厨喂。
本地構(gòu)建Spark環(huán)境的第一步是下載其版本包, 本文以spark-1.6.1-bin-hadoop2.4.tgz為例進(jìn)行安裝演示。下載完上述版本包后庄呈,解壓蜕煌,并在終端進(jìn)入解壓時(shí)新建的主目錄。Spark的運(yùn)行依賴Scala編程語(yǔ)言诬留,好在預(yù)編譯的二進(jìn)制包中已包含Scala運(yùn)行環(huán)境斜纪,我們不需要另外安裝Scala便可運(yùn)行Spark。但是文兑,JRE(Java運(yùn)行時(shí)環(huán)境)或JDK(Java開(kāi)發(fā)套件)是要安裝的盒刚。
>tar xfvz spark-1.6.1-bin-hadoop2.4.tgz
>cd spark-1.6.1-bin-hadoop2.4
用戶運(yùn)行Spark的腳本在該目錄的bin目錄下。我們可以運(yùn)行Spark附帶的一個(gè)示例程序來(lái)測(cè)試是否一切正常:
>./bin/run-example org.apache.spark.examples.SparkPi
該命令將在本地單機(jī)模式下執(zhí)行SparkPi這個(gè)示例绿贞。在該模式下因块,所有的Spark進(jìn)程均運(yùn)行于同一個(gè)JVM中,而并行處理則通過(guò)多線程來(lái)實(shí)現(xiàn)籍铁。默認(rèn)情況下涡上,該示例會(huì)啟用與本地系統(tǒng)的CPU核心數(shù)目相同的線程趾断。示例運(yùn)行完,應(yīng)可在輸出的結(jié)尾看到類似如下的提示,
Pi is roughly 3.14248
Apache Spark 基本概念
前一部分介紹了Apache Spark的安裝過(guò)程吩愧,接下來(lái)我們一起體驗(yàn)下在Spark上編程的樂(lè)趣芋酌。就像之前介紹的,Spark支持多種編程語(yǔ)言雁佳,包括Java脐帝、Scala、Python 和 R等糖权。接下來(lái)首先介紹下Spark的編程模型腮恩,然后通過(guò)使用這四種不同的語(yǔ)言來(lái)演示Spark的編程運(yùn)行過(guò)程。
Spark 編程模型
任何Spark程序的編寫(xiě)都是從SparkContext(或用Java編寫(xiě)時(shí)的JavaSparkContext)開(kāi)始的,SparkContext的初始化需要一個(gè)SparkConf對(duì)象温兼,后者包含了Spark集群配置的各種參數(shù)(比如主節(jié)點(diǎn)的URL)秸滴。初始化后,我們便可用SparkContext對(duì)象所包含的各種方法來(lái)創(chuàng)建和操作分布式數(shù)據(jù)集和共享變量募判。Spark shell(在Scala和Python下可以荡含,但不支持Java)能自動(dòng)完成上述初始化。
Spark Shell
Spark支持用Scala或Python REPL(Read-Eval-Print-Loop届垫,即交互式shell)來(lái)進(jìn)行交互式的程序編寫(xiě)释液。由于輸入的代碼會(huì)被立即計(jì)算,shell能在輸入代碼時(shí)給出實(shí)時(shí)反饋装处。在Scala shell里误债,命令執(zhí)行結(jié)果的值與類型在代碼執(zhí)行完后也會(huì)顯示出來(lái)。
要想通過(guò)Scala來(lái)使用Spark shell妄迁,只需從Spark的主目錄執(zhí)行./bin/spark-shell寝蹈。它會(huì)啟動(dòng)Scala shell并初始化一個(gè)SparkContext對(duì)象。我們可以通過(guò)sc這個(gè)Scala值來(lái)調(diào)用這個(gè)對(duì)象登淘。
要想在Python shell中使用Spark箫老,直接運(yùn)行./bin/pyspark命令即可。與Scala shell類似黔州, Python下的SparkContext對(duì)象可以通過(guò)Python變量sc來(lái)調(diào)用耍鬓。
彈性分布式數(shù)據(jù)集(RDD)
上文提到的分布式數(shù)據(jù)集其實(shí)就是指RDD。RDD(Resilient Distributed Dataset流妻,彈性分布式數(shù)據(jù)集)是Spark的核心概念之一牲蜀。一個(gè)RDD代表一系列的“記錄”(嚴(yán)格來(lái)說(shuō),某種類型的對(duì)象)绅这。這些記錄被分配或分區(qū)到一個(gè)集群的多個(gè)節(jié)點(diǎn)上(在本地模式下涣达,可以類似地理解為單個(gè)進(jìn)程里的多個(gè)線程上)。Spark中的RDD具備容錯(cuò)性,即當(dāng)某個(gè)節(jié)點(diǎn)或任務(wù)失敗時(shí)(因非用戶代碼錯(cuò)誤的原因而引起峭判,如硬件故障开缎、網(wǎng)絡(luò)不通等),RDD會(huì)在余下的節(jié)點(diǎn)上自動(dòng)重建林螃,以便任務(wù)能最終完成奕删。
創(chuàng)建RDD后,我們便有了一個(gè)可供操作的分布式記錄集疗认。在Spark編程模式下完残,所有的操作被分為轉(zhuǎn)換(transformation)和執(zhí)行(action)兩種。一般來(lái)說(shuō)横漏,轉(zhuǎn)換操作是對(duì)一個(gè)數(shù)據(jù)集里的所有記錄執(zhí)行某種函數(shù)谨设,從而使記錄發(fā)生改變;而執(zhí)行通常是運(yùn)行某些計(jì)算或聚合操作缎浇,并將結(jié)果返回運(yùn)行SparkContext的那個(gè)驅(qū)動(dòng)程序扎拣。
Apache Spark 編程入門
下面我們通過(guò)依次用Java、Python等種語(yǔ)言來(lái)編寫(xiě)一個(gè)簡(jiǎn)單的Spark數(shù)據(jù)處理程序素跺。假設(shè)一存在一個(gè)名為UserPurchaseHistory.csv的文件二蓝,內(nèi)容如下所示。文件的每一行對(duì)應(yīng)一條購(gòu)買記錄指厌,從左到右的各列值依次為客戶名稱刊愚、商品名以及商品價(jià)格。
John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49
Spark 程序?qū)嵗?Java)
/**
* Created by hackx on 9/11/16.
*/
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
* 用Java編寫(xiě)的一個(gè)簡(jiǎn)單的Spark應(yīng)用
*/
public class JavaApp {
public static void main(String[] args) {
/*正如在Scala項(xiàng)目中一樣踩验,我們首先需要初始化一個(gè)上下文對(duì)象鸥诽。值得注意的是,
這里所使用的是JavaSparkContext類而不是之前的SparkContext箕憾。類似地牡借,調(diào)用
JavaSparkContext對(duì)象,利用textFile函數(shù)來(lái)訪問(wèn)數(shù)據(jù)厕九,然后將各行輸入分割成
多個(gè)字段蓖捶。請(qǐng)注意下面代碼的高亮部分是如何使用匿名類來(lái)定義一個(gè)分割函數(shù)的。
該函數(shù)確定了如何對(duì)各行字符串進(jìn)行分割扁远。*/
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
// 將CSV格式的原始數(shù)據(jù)轉(zhuǎn)化為(user,product,price)格式的記錄集
JavaRDD data = sc.textFile("data/UserPurchaseHistory.csv").map(new Function<String, String[]>() {
public String[] call(String s) throws Exception {
return s.split(",");
}
});
/*現(xiàn)在可以算一下用Scala時(shí)計(jì)算過(guò)的指標(biāo)。這里有兩點(diǎn)值得注意的地方刻像,一是
下面Java API中有些函數(shù)(比如distinct和count)實(shí)際上和在Scala API中
一樣畅买,二是我們定義了一個(gè)匿名類并將其傳給map函數(shù)。匿名類的定義方式可參
見(jiàn)代碼的高亮部分细睡。*/
// 求總購(gòu)買次數(shù)
long numPurchases = data.count();
// 求有多少個(gè)不同客戶購(gòu)買過(guò)商品
long uniqueUsers = data.map(new Function<String[], String>() {
public String call(String[] strings) throws Exception {
return strings[0];
}
}).distinct().count();
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
}
}
Spark 程序?qū)嵗?Python)
"""用Python編寫(xiě)的一個(gè)簡(jiǎn)單Spark應(yīng)用"""
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# 將CSV格式的原始數(shù)據(jù)轉(zhuǎn)化為(user,product,price)格式的記錄集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:
line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求總購(gòu)買次數(shù)
numPurchases = data.count()
# 求有多少不同客戶購(gòu)買過(guò)商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出總收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最暢銷的產(chǎn)品是什么
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])
運(yùn)行該腳本的最好方法是在腳本目錄下運(yùn)行如下命令:
>$SPARK_HOME/bin/spark-submit pythonapp.py
參考資料
Spark官網(wǎng)
Spark機(jī)器學(xué)習(xí)