一個多月的地鐵閱讀時光,閱讀《Spark for python developers》電子書溉知,不動筆墨不看書鬼譬,隨手在evernote中做了一下翻譯娜膘,多年不習(xí)英語,自娛自樂拧簸。周末整理了一下劲绪,發(fā)現(xiàn)再多做一點(diǎn)就可基本成文了,于是開始這個地鐵譯系列盆赤。
我們將為開發(fā)搭建一個獨(dú)立的虛擬環(huán)境,通過Spark和Anaconda提供的PyData 庫為該環(huán)境補(bǔ)充能力歉眷。 這些庫包括Pandas,Scikit-Learn, Blaze, Matplotlib, Seaborn, 和 Bokeh. 我們的做法如下:
- 使用Anaconda 的Python 發(fā)布包搭建開發(fā)環(huán)境牺六,包括使用 IPython Notebook 環(huán)境來完成我們的數(shù)據(jù)探索任務(wù)。
- 安裝并使Spark以及 PyData 庫正常工作汗捡,例如Pandas淑际,Scikit-Learn, Blaze, Matplotlib, 和 Bokeh.
- 構(gòu)建一個 word count例子程序來保證一切工作正常.
近些年來涌現(xiàn)出了很多數(shù)據(jù)驅(qū)動型的大公司,例如Amazon, Google, Twitter, LinkedIn, 和 Facebook. 這些公司,通過傳播分享扇住,透漏它們的基礎(chǔ)設(shè)施概念春缕,軟件實(shí)踐,以及數(shù)據(jù)處理框架艘蹋,已經(jīng)培育了一個生機(jī)勃勃的開源軟件社區(qū)锄贼,形成了企業(yè)的技術(shù),系統(tǒng)和軟件架構(gòu)女阀,還包括新型的基礎(chǔ)架構(gòu)宅荤,DevOps,虛擬化浸策,云計(jì)算和軟件定義網(wǎng)絡(luò)冯键。
受到Google File System (GFS)啟發(fā),開源的分布式計(jì)算框架Hadoop和MapReduce被開發(fā)出來處理PB級數(shù)據(jù)庸汗。在保持低成本的同時克服了擴(kuò)展的復(fù)雜性惫确,著也導(dǎo)致了數(shù)據(jù)存儲的新生,例如近來的數(shù)據(jù)庫技術(shù),列存儲數(shù)據(jù)庫 Cassandra, 文檔型數(shù)據(jù)庫 MongoDB, 以及圖譜數(shù)據(jù)庫Neo4J改化。
Hadoop, 歸功于他處理大數(shù)據(jù)集的能力昧诱,培育了一個巨大的生態(tài)系統(tǒng),通過Pig, Hive, Impala, and Tez完成數(shù)據(jù)的迭代和交互查詢所袁。
當(dāng)只使用MapReduce的批處理模式時盏档,Hadoop的操作是笨重而繁瑣的。Spark 創(chuàng)造了數(shù)據(jù)分析和處理界的革命燥爷,克服了MapReduce 任務(wù)磁盤IO和帶寬緊張的缺陷蜈亩。Spark 是用 Scala實(shí)現(xiàn)的, 同時原生地集成了 Java Virtual Machine (JVM) 的生態(tài)系統(tǒng). Spark 很早就提供了Python API 并使用PySpark. 基于Java系統(tǒng)的強(qiáng)健表現(xiàn),使 Spark 的架構(gòu)和生態(tài)系統(tǒng)具有內(nèi)在的多語言性.
本書聚焦于PySpark 和 PyData 生態(tài)系統(tǒng) Python 在數(shù)據(jù)密集型處理的學(xué)術(shù)和科學(xué)社區(qū)是一個優(yōu)選編程語言. Python已經(jīng)發(fā)展成了一個豐富多彩的生態(tài)系統(tǒng). Pandas 和 Blaze提供了數(shù)據(jù)處理的工具庫 Scikit-Learn專注在機(jī)器學(xué)習(xí) Matplotlib, Seaborn, 和 Bokeh完成數(shù)據(jù)可視化 因此, 本書的目的是使用Spark and Python為數(shù)據(jù)密集型應(yīng)用構(gòu)建一個端到端系統(tǒng)架構(gòu). 為了把這些概念付諸實(shí)踐 我們將分析諸如 Twitter, GitHub, 和 Meetup.這樣的社交網(wǎng)絡(luò).我們通過訪問這些網(wǎng)站來關(guān)注Spark 和開源軟件社區(qū)的社交活動與交互.
構(gòu)建數(shù)據(jù)密集型應(yīng)用需要高度可擴(kuò)展的基礎(chǔ)架構(gòu)前翎,多語言存儲, 無縫的數(shù)據(jù)集成, 多元分析處理, 和有效的可視化. 下面要描述的數(shù)據(jù)密集型應(yīng)用的架構(gòu)藍(lán)圖將貫穿本書的始終. 這是本書的骨干.
我們將發(fā)現(xiàn)spark在廣闊的PyData 生態(tài)系統(tǒng)中的應(yīng)用場景.
理解數(shù)據(jù)密集型應(yīng)用的架構(gòu)
為了理解數(shù)據(jù)密集型應(yīng)用的架構(gòu) 使用了下面的概念框架 該架構(gòu) 被設(shè)計(jì)成5層:
? 基礎(chǔ)設(shè)施層
? 持久化層
? 集成層
? 分析層
? 參與層
下圖描述了數(shù)據(jù)密集型應(yīng)用框架的五個分層:
從下往上 我們遍歷各層的主要用途.
基礎(chǔ)設(shè)施層(Infrastructure layer)
基礎(chǔ)設(shè)施層主要關(guān)注虛擬化稚配,擴(kuò)展性和持續(xù)集成. 在實(shí)踐中, 虛擬化一詞, 我們指的是開發(fā)環(huán)境 的VirtualBox以及Spark 和Anaconda 的虛擬機(jī)環(huán)境。 如果擴(kuò)展它港华,我們可以在云端創(chuàng)建類似的環(huán)境道川。創(chuàng)建一個隔離的開發(fā)環(huán)境,然后遷移到測試環(huán)境立宜,通過DevOps 工具冒萄,還可以作為持續(xù)集成的一部分被部署到生產(chǎn)環(huán)境,例如 Vagrant, Chef, Puppet, 和Docker. Docker 是一個非常流行的開源項(xiàng)目橙数,可以輕松的實(shí)現(xiàn)新環(huán)境的部署和安裝尊流。本書局限于使用VirtualBox構(gòu)建虛擬機(jī). 從數(shù)據(jù)密集型應(yīng)用架構(gòu)看,我們將在關(guān)注擴(kuò)展性和持續(xù)集成前提下只闡述虛擬化的基本步驟.
持久化層(Persistence layer)
持久化層管理了適應(yīng)于數(shù)據(jù)需要和形態(tài)的各種倉庫灯帮。它保證了多元數(shù)據(jù)存儲的建立和管理崖技。 這包括關(guān)系型數(shù)據(jù)庫如 MySQL和 PostgreSQL;key-value數(shù)據(jù)存儲 Hadoop, Riak, 和 Redis ;列存儲數(shù)據(jù)庫如HBase 和 Cassandra; 文檔型數(shù)據(jù)庫 MongoDB 和 Couchbase; 圖譜數(shù)據(jù)庫如 Neo4j. 持久化層還管理了各種各樣的文件系統(tǒng)钟哥,如 Hadoop's HDFS. 它與各種各樣的存儲系統(tǒng)交互迎献,從原始硬盤到 Amazon S3. 它還管理了各種各樣的文件存儲格式 如 csv, json, 和parquet(這是一個面向列的格式).
集成層(Integration layer)
集成層專注于數(shù)據(jù)的獲取、轉(zhuǎn)移腻贰、質(zhì)量吁恍、持久化、消費(fèi)和控制.基本上由以下的5C來驅(qū)動: connect, collect, correct, compose和consume.這五個步驟描述了數(shù)據(jù)的生命周期银受。它們聚焦于如何獲取有興趣的數(shù)據(jù)集践盼、探索數(shù)據(jù)、反復(fù)提煉使采集的信息更豐富宾巍,為數(shù)據(jù)消費(fèi)做好準(zhǔn)備. 因此, 這些步驟執(zhí)行如下的操作:
- Connect: 目標(biāo)是從各種各樣數(shù)據(jù)源選擇最好的方法.如果存在的話,這些數(shù)據(jù)源會提供APIs,輸入格式,數(shù)據(jù)采集的速率,和提供者的限制.
- Correct: 聚焦于數(shù)據(jù)轉(zhuǎn)移以便于進(jìn)一步處理 同時保證維護(hù)數(shù)據(jù)的質(zhì)量和一致性
- Collect: 哪些數(shù)據(jù)存儲在哪 用什么格式 方便后面階段的組裝和消費(fèi)
- Compose: 集中關(guān)注如何對已采集的各種數(shù)據(jù)集的混搭, 豐富這些信息能夠構(gòu)建一個引入入勝的數(shù)據(jù)驅(qū)動產(chǎn)品咕幻。
- Consume: 關(guān)注數(shù)據(jù)的使用、渲染以及如何使正確的數(shù)據(jù)在正確的時間達(dá)到正確的效果顶霞。
- Control: 這是隨著數(shù)據(jù)肄程、組織锣吼、參與者的增長,早晚需要的第六個附加步驟蓝厌,它保證了數(shù)據(jù)的管控玄叠。
下圖描述了數(shù)據(jù)獲取以及提煉消費(fèi)的迭代過程:
分析層(Analytics layer)
分析層是Spark 處理數(shù)據(jù)的地方,通過各種模型, 算法和機(jī)器學(xué)習(xí)管道從而得出有用的見解. 對我們而言, 本書的分析層使用的是Spark. 我們將在接下來的章節(jié)深入挖掘Spark的優(yōu)良特性. 簡而言之,我們使它足夠強(qiáng)大以致于在單個同一平臺完成多周范式的分析處理拓提。 它允許批處理, 流處理和交互式分析. 在大數(shù)據(jù)集上的批處理盡管有較長的時延單使我們能夠提取模式和見解读恃,也可以在流模式中處理實(shí)時事件。 交互和迭代分析更適合數(shù)據(jù)探索. Spark 提供了Python 和R語言的綁定API代态,通過SparkSQL 模塊和Spark Dataframe, 它提供了非常熟悉的分析接口.
參與層(Engagement layer)
參與層完成與用戶的交互,提供了 Dashboards,交互的可視化和告警. 我們將聚焦在 PyData 生態(tài)系統(tǒng)提供的工具如Matplotlib, Seaborn, 和Bokeh.
理解Spark
Hadoop 隨著數(shù)據(jù)的增長水平擴(kuò)展寺惫,可以運(yùn)行在普通的硬件上, 所以是低成本的. 數(shù)據(jù)密集型應(yīng)用利用可擴(kuò)展的分布處理框架在大規(guī)模商業(yè)集群上分析PB級的數(shù)據(jù). Hadoop 是第一個map-reduce的開源實(shí)現(xiàn). Hadoop 依賴的分布式存儲框架叫做 HDFS(Hadoop Distributed File System). Hadoop 在批處理中運(yùn)行map-reduce任務(wù).Hadoop 要求在每個 map, shuffle,和reduce 處理步驟中將數(shù)據(jù)持久化到硬盤. 這些批處理工作的過載和延遲明顯地影響了性能.
Spark 是一個面向大規(guī)模數(shù)據(jù)處理的快速、分布式蹦疑、通用的分析計(jì)算引擎. 主要不同于Hadoop的特點(diǎn)在于Spark 通過數(shù)據(jù)管道的內(nèi)存處理允許不同階段共享數(shù)據(jù). Spark 的獨(dú)特之處在于允許四種不同的數(shù)據(jù)分析和處理風(fēng)格. Spark能夠用在:
- Batch: 該模式用于處理大數(shù)據(jù)集典型的是執(zhí)行大規(guī)模map-reduce 任務(wù)西雀。
- Streaming: 該模式用于近限處理流入的信息。
- Iterative: 這種模式是機(jī)器學(xué)習(xí)算法歉摧,如梯度下降的數(shù)據(jù)訪問重復(fù)以達(dá)到數(shù)據(jù)收斂艇肴。
- Interactive: 這種模式用于數(shù)據(jù)探索,有用大數(shù)據(jù)塊位于內(nèi)存中叁温,所以Spark的響應(yīng)時間非吃俚浚快。
下圖描述了數(shù)據(jù)處理的4種方式:
Spark 有三種部署方式: 單機(jī)單節(jié)點(diǎn)和兩種分布式集群方式Y(jié)arn(Hadoop 的分布式資源管理器)或者M(jìn)esos(Berkeley 開發(fā)的開源資源管理器券盅,同時可用于Spark):
Spark 提供了一個Scala, Java, Python, and R的多語言接口.
Spark libraries
Spark 時一個完整的解決方案, 有很多強(qiáng)大的庫:
- SparkSQL: 提供 類SQL 的能力 來訪問結(jié)構(gòu)化數(shù)據(jù)帮哈,并交互性地探索大數(shù)據(jù)集
- SparkMLLIB: 用于機(jī)器學(xué)習(xí)的大量算法和一個管道框架
- Spark Streaming: 使用微型批處理和滑動窗口對進(jìn)入的流數(shù)據(jù)T實(shí)現(xiàn)近限分析
- Spark GraphX: 對于復(fù)雜連接的尸體和關(guān)系提供圖處理和計(jì)算
PySpark實(shí)戰(zhàn)
Spark是使用Scala實(shí)現(xiàn)的,整個Spark生態(tài)系統(tǒng)既充分利用了JVM環(huán)境也充分利用了原生的HDFS. Hadoop HDFS是Spark支持的眾多數(shù)據(jù)存儲之一锰镀。 Spark與其相互作用多數(shù)據(jù)源、類型和格式無關(guān).
PySpark 不是Spark的一個Python轉(zhuǎn)寫咖刃,如同Jython 相對于Java泳炉。PySpark 提供了綁定Spark的集成 API,能夠在所有的集群節(jié)點(diǎn)中通過pickle序列化充分使用Python 生態(tài)系統(tǒng)嚎杨,更重要的是, 能夠訪問由Python機(jī)器學(xué)習(xí)庫形成的豐富的生態(tài)系統(tǒng)花鹅,如Scikit-Learn 或者象Pandas那樣的數(shù)據(jù)處理。
當(dāng)我們著有一個Spark 程序的時候, 程序第一件必需要做的事情是創(chuàng)建一個SparkContext 對象枫浙,來告訴Spark如何防蚊雞群刨肃。Python程序會創(chuàng)建PySparkContext。Py4J 是一個網(wǎng)關(guān)將Spark JVM SparkContex于python程序綁定箩帚。應(yīng)用代碼JVM SparkContextserializes
和閉包把他們發(fā)送給集群執(zhí)行.
集群管理器分配資源真友,調(diào)度,運(yùn)送這些閉包給集群上的 Spark workers紧帕,這需要激活 Python 虛擬機(jī).
在每一臺機(jī)器上, 管理 Spark Worker 執(zhí)行器負(fù)責(zé)控制盔然,計(jì)算桅打,存儲和緩存.
這個例子展示了 Spark driver 在本地文件系統(tǒng)上如何管理PySpark context 和Spark context以及如何通過集群管理器與 Spark worker完成交互。
彈性分布數(shù)據(jù)集(RDS愈案,Resilient Distributed Dataset)
Spark 應(yīng)用包含了一個驅(qū)動程序來運(yùn)行用戶的主函數(shù),在集群上創(chuàng)建分布式數(shù)據(jù)集, 并在這些數(shù)據(jù)集上執(zhí)行各種并行操作
(轉(zhuǎn)換和動作 )挺尾。 Spark 應(yīng)用運(yùn)行在獨(dú)立的進(jìn)程集合, 與一個驅(qū)動程序中的一個 SparkContext 協(xié)調(diào)工作。SparkContext 將從集群管理器中分配系統(tǒng)資源 (主機(jī), 內(nèi)存, CPU)站绪。
SparkContext管理執(zhí)行器遭铺,執(zhí)行器來管理集群上的多個worker .驅(qū)動程序中有需要運(yùn)行的Spark 工作。這些工作被分拆成多個任務(wù)恢准,提交給執(zhí)行器來完成魂挂。執(zhí)行器負(fù)責(zé)每臺機(jī)器的計(jì)算,存儲和緩存顷歌。Spark 中的核心構(gòu)建塊是 RDD (Resilient Distributed Dataset). 一個已選元素的數(shù)據(jù)集锰蓬。分布意味著數(shù)據(jù)集可以位于集群的任何節(jié)點(diǎn)。彈性意味著數(shù)據(jù)集在不傷害數(shù)據(jù)計(jì)算進(jìn)程的條件下可以全部或部分丟失眯漩,spark 將重新計(jì)算內(nèi)存中的數(shù)據(jù)關(guān)系芹扭,例如操作 DAG (Directed Acyclic Graph) 基本上,Spark 將RDD的一個狀態(tài)的內(nèi)存快照放入緩存赦抖。如果一臺計(jì)算機(jī)在操作中掛了舱卡, Spark 將從緩存的RDD中重建并操作DAG,從而使RDD從節(jié)點(diǎn)故障中恢復(fù)队萤。
這里有兩類的RDD 操作:
? Transformations: 數(shù)據(jù)轉(zhuǎn)換使用現(xiàn)存的RDD轮锥,并生產(chǎn)一個新轉(zhuǎn)換后的RDD指針。一個RDD是不可變的要尔,一旦創(chuàng)建舍杜,不能更改。 每次轉(zhuǎn)換生成新的RDD. 數(shù)據(jù)轉(zhuǎn)換的延遲計(jì)算的赵辕,只有當(dāng)一個動作發(fā)生時執(zhí)行既绩。如果發(fā)生故障,轉(zhuǎn)換的數(shù)據(jù)世系重建RDD
.
? Actions: 動作是一個RDD觸發(fā)了Spark job还惠,并纏上一個值饲握。一個動作操作引發(fā)Spark 執(zhí)行數(shù)據(jù)轉(zhuǎn)換操作,需要計(jì)算動作返回的RDD蚕键。動作導(dǎo)致操作的一個DAG救欧。 DAG 被編譯到不同階段,每個階段執(zhí)行一系列任務(wù)锣光。 一個任務(wù)是基礎(chǔ)的工作單元笆怠。
這是關(guān)于RDD的有用信息:
- RDD 從一個數(shù)據(jù)源創(chuàng)建,例如一個HDFS文件或一個數(shù)據(jù)庫查詢 .
有三種方法創(chuàng)建 RDD:
∞從數(shù)據(jù)存儲中讀取
∞ 從一個現(xiàn)存的RDD轉(zhuǎn)換
∞使用內(nèi)存中的集合
- RDDs 的轉(zhuǎn)換函數(shù)有 map 或 filter, 它們生成一個新的RDD.
- 一個RDD上的一個動作包括 first, take, collect, 或count 將發(fā)送結(jié)果到Spark 驅(qū)動程序. Spark驅(qū)動程序是用戶與Spark集群交互的客戶端嫉晶。
下圖描述了RDD 數(shù)據(jù)轉(zhuǎn)換和動作:
理解 Anaconda
Anaconda 是由 Continuum(https://www.continuum.io/)維護(hù)的被廣泛使用的Python分發(fā)包. 我們將使用 Anaconda 提供的流行的軟件棧來生成我們的應(yīng)用. 本書中,使用 PySpark和PyData生態(tài)系統(tǒng)骑疆。PyData生態(tài)系統(tǒng)由Continuum維護(hù)田篇,支持并升級,并提供 Anaconda Python 分發(fā)包箍铭。Anaconda
Python分發(fā)包基本避免了python 環(huán)境的安裝過程惡化從而節(jié)約了時間泊柬;我們用它與Spark對接. Anaconda 有自己的包管理工具可以替代傳統(tǒng)的 pip install 和easy_install. Anaconda 也是完整的解決方案,包括一下有名的包如 Pandas, Scikit-Learn, Blaze, Matplotlib, and Bokeh. 通過一個簡單的命令久可以升級任何已經(jīng)安裝的庫:
$ conda update
通過命令可以我們環(huán)境中已安裝庫的列表:
$ conda list
主要組件如下:
- Anaconda: 這是一個免費(fèi)的Python分發(fā)包包含了科學(xué),數(shù)學(xué)诈火,工程和數(shù)據(jù)分析的200多個Python包
- Conda: 包管理器負(fù)責(zé)安裝復(fù)雜軟件棧的所有依賴兽赁,不僅限于 Python ,也可以管理R和其它語言的安裝進(jìn)程冷守。
- Numba: 通過共性能函數(shù)和及時編譯刀崖,提供了加速Python代碼的能力。
- Blaze: 通過統(tǒng)一和適配的接口來訪問提供者的數(shù)據(jù)來實(shí)現(xiàn)大規(guī)模數(shù)據(jù)分析拍摇,包括Python 流處理, Pandas, SQLAlchemy, 和Spark.
- Bokeh: 為巨型流數(shù)據(jù)集提供了交互數(shù)據(jù)的可視化.
- Wakari: 允許我們在一個托管環(huán)境中分享和部署 IPython Notebooks和其它應(yīng)用
下圖展示了 Anaconda 軟件棧中的組件:
搭建Spark 環(huán)境
本節(jié)我們學(xué)習(xí)搭建 Spark環(huán)境:
- 在Ubuntu 14.04的虛擬機(jī)上創(chuàng)建隔離的開發(fā)環(huán)境亮钦,可以不影響任何現(xiàn)存的系統(tǒng)
- 安裝 Spark 1.3.0 及其依賴.
- 安裝Anaconda Python 2.7 環(huán)境包含了所需的庫 例如Pandas, Scikit-Learn, Blaze, and Bokeh, 使用PySpark, 可以通過IPython Notebooks訪問
- 在我們的環(huán)境中搭建后端或數(shù)據(jù)存儲,使用MySQL作為關(guān)系型數(shù)據(jù)庫充活;MongoDB作文檔存儲蜂莉;Cassandra 作為列存儲數(shù)據(jù)庫。 根據(jù)所需處理數(shù)據(jù)的需要混卵,每種存儲服務(wù)于不同的特殊目的映穗。MySQL RDBMs可以使用SQL輕松地完成表信息查詢;如果我們處理各種API獲得的大量JSON類型數(shù)據(jù), 最簡單的方法是把它們存儲在一個文檔里幕随;對于實(shí)時和時間序列信息蚁滋,Cassandra 是最合適的列存儲數(shù)據(jù)庫.
下圖給出了我們將要構(gòu)建的環(huán)境視圖 將貫穿本書的使用:
在Oracle VirtualBox 搭建Ubuntu
搭建一個運(yùn)行Ubuntu 14.04的virtualbox環(huán)境是搭建開發(fā)環(huán)境最安全的辦法,可以避免與現(xiàn)存庫的沖突赘淮,還可以用類似的命令將環(huán)境復(fù)制到云端辕录。
為了搭建Anaconda和Spark的環(huán)境,我們要創(chuàng)建一個運(yùn)行Ubuntu 14.04的virtual box虛擬機(jī).
步驟如下:
- Oracle VirtualBox VM從 https://www.virtualbox.org/wiki/Downloads 免費(fèi)下載,徑直安裝就可以了.
- 裝完 VirtualBox,打開Oracle VM VirtualBox Manager梢卸,點(diǎn)擊按鈕New.
- 給新的VM指定一個名字, 選擇Linux 類型和Ubuntu(64 bit)版本.
- 需要從Ubuntu的官網(wǎng)下載ISO的文件分配足夠的內(nèi)存(4GB推薦) 和硬盤(20GB推薦).我們使用Ubuntu 14.04.1 LTS版本,下載地址: http://www.ubuntu.com/download/desktop.
- 一旦安裝完成, 就可以安裝VirtualBox Guest Additions了 (從VirtualBox 菜單,選擇新運(yùn)行的VM) Devices|Insert Guest Additions CD image. 由于windows系統(tǒng)限制了用戶界面踏拜,可能會導(dǎo)致安裝失敗.
- 一旦鏡像安裝完成,重啟VM,就已經(jīng)可用了.打開共享剪貼板功能是非常有幫助的。選擇VM點(diǎn)擊 Settings, 然后General|Advanced|Shared Clipboard 再點(diǎn)擊 Bidirectional.
安裝Anaconda的Python 2.7版本
PySpark當(dāng)前只能運(yùn)行在Python 2.7(社區(qū)需求升級到Python 3.3)低剔,安裝Anaconda, 按照以下步驟:
- 下載 Linux 64-bit Python 2.7的Anaconda安裝器 http://continuum.io/downloads#all.
- 下載完Anaconda 安裝器后, 打開 terminal進(jìn)入到它的安裝位置.在這里運(yùn)行下面的命令, 在命令中替換2.x.x為安裝器的版本號:
#install anaconda 2.x.x
#bash Anaconda-2.x.x-Linux-x86[_64].sh
- 接受了協(xié)議許可后, 將讓你確定安裝的路徑 (默認(rèn)為 ~/anaconda).
- 自解壓完成后, 需要添加 anaconda 執(zhí)行路徑到 PATH 的環(huán)境變量:
# add anaconda to PATH
bash Anaconda-2.x.x-Linux-x86[_64].sh
安裝 Java 8
Spark運(yùn)行在 JVM之上所以需要安裝Java SDK而不只是JRE, 這是我們構(gòu)建spark應(yīng)用所要求的. 推薦的版本是Java Version 7 or higher. Java 8 是最合適的, 它包安裝 Java 8, 安裝以下步驟:
- 安裝 Oracle Java 8 使用的命令如下:
# install oracle java 8
$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
- 設(shè)置 JAVA_HOME 環(huán)境變量,保證Java 執(zhí)行程序在PATH中.
3.檢查JAVA_HOME 是否被正確安裝:
#
$ echo JAVA_HOME
安裝 Spark
首先瀏覽一下Spark的下載頁 http://spark.apache.org/downloads.
html.
它提供了多種可能來下載Spark的早期版本,不同的分發(fā)包和下載類型肮塞。 我們選擇最新的版本. pre-built for Hadoop 2.6 and later. 安裝 Spark 的最簡方法是使用 Spark
package prebuilt for Hadoop 2.6 and later, 而不是從源代碼編譯襟齿,然后 移動 ~/spark 到根目錄下。下載最新版本 Spark—Spark 1.5.2, released on November 9, 2015:
選擇Spark 版本 1.5.2 (Nov 09 2015),
選擇包類型 Prebuilt for Hadoop 2.6 and later,
選擇下載類型 Direct Download,
下載spark: spark-1.5.2-bin-hadoop2.6.tgz,
- 驗(yàn)證 1.3.0 簽名校驗(yàn)枕赵,也可以運(yùn)行:
# download spark
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.5.2-bin-hadoop2.6.tgz
接下來, 我們將提取和清理文件:
`
extract,clean up,move the unzipped files under the spark directory
$ rm spark-1.5.2-bin-hadoop2.6.tgz
$ sudo mv spark-* spark
`
現(xiàn)在猜欺,我們能夠運(yùn)行 Spark 的 Python 解釋器了:
# run spark
$ cd ~/spark
./bin/pyspark
應(yīng)該可以看到類似這樣的效果:
解釋器已經(jīng)提供了一個Spark context 對象, sc, 我們可以看到:
`
>>>print(sc) <pyspark.context.SparkContext object at 0x7f34b61c4e50>
`
使用 IPython Notebook
IPython Notebook 比控制臺擁有更更友好的用戶體驗(yàn).
可以使用如新命令啟動IPython Notebook:
$ IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark
在目錄 examples/AN_Spark,啟動PySpark和IPYNB或者在Jupyter或
IPython Notebooks 的安裝目錄啟動:
# cd to /home/an/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark
$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/
pyspark --packages com.databricks:spark-csv_2.11:1.2.0
# launch command using python 3.4 and the spark-csv package:
$ IPYTHON_OPTS='notebook' PYSPARK_PYTHON=python3
/home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0
構(gòu)建在 PySpark上的第一個應(yīng)用
我們已經(jīng)檢查了一切工作正常拷窜,將word
count 作為本書的第一個實(shí)驗(yàn)是義不容辭的:
# Word count on 1st Chapter of the Book using PySpark
import re
# import add from operator module
from operator import add
# read input file
file_in = sc.textFile('/home/an/Documents/A00_Documents/Spark4Py
20150315')
# count lines
print('number of lines in file: %s' % file_in.count())
# add up lengths of each line
chars = file_in.map(lambda s: len(s)).reduce(add)
print('number of characters in file: %s' % chars)
# Get words from the input file
words =file_in.flatMap(lambda line: re.split('\W+', line.lower().
strip()))
# words of more than 3 characters
swords = words.filter(lambda x: len(x) > 3)
# set count 1 per word
words = words.map(lambda w: (w,1))
# reduce phase - sum count all the words
words = words.reduceByKey(add)
在這個程序中, 首先從目錄 /home/an/
Documents/A00_Documents/Spark4Py 20150315 中讀取文件到 file_in. 然后計(jì)算文件的行數(shù)以及每行的字符數(shù).
我們把文件拆分成單詞并變成小寫开皿。 為了統(tǒng)計(jì)單詞的目的, 我們選擇多于三個字符的單詞來避免象 the, and, for 這樣的高頻詞. 一般地, 這些被認(rèn)為是停詞涧黄,應(yīng)該被語言處理任務(wù)過濾掉 .
在該階段,我們準(zhǔn)備了 MapReduce 步驟赋荆,每個單詞 map 為值1笋妥, 計(jì)算所有唯一單詞的出現(xiàn)數(shù)量.
這是IPython Notebook中的代碼描述. 最初的 10 cells是在數(shù)據(jù)集上的單詞統(tǒng)計(jì)預(yù)處理 數(shù)據(jù)集在本地文件中提取.
以(count, word)格式交換詞頻統(tǒng)計(jì)元組是為了把count作為元組的key 進(jìn)行排序 :
# create tuple (count, word) and sort in descending
words = words.map(lambda x: (x[1], x[0])).sortByKey(False)
# take top 20 words by frequency
words.take(20)
未來顯示結(jié)果, 我們創(chuàng)建(count, word) 元組來以逆序顯示詞頻出現(xiàn)最高的20個詞:
生成直方圖:
# create function for histogram of most frequent words
% matplotlib inline
import matplotlib.pyplot as plt
#
def histogram(words):
count = map(lambda x: x[1], words)m
word = map(lambda x: x[0], words)
plt.barh(range(len(count)), count,color = 'grey')
plt.yticks(range(len(count)), word)
# Change order of tuple (word, count) from (count, word)
words = words.map(lambda x:(x[1], x[0]))
words.take(25)
# display histogram
histogram(words.take(25))
我們可以看到以直方圖形式畫出的高頻詞,我們已經(jīng)交換了初識元組 (count,word) 為(word, count):

所以,我們也已經(jīng)回顧了本章所有的高頻詞 Spark, Data 和 Anaconda.