Spark

參考鏈接:https://github.com/wangzhiwubigdata/God-Of-BigData/blob/master/%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%A1%86%E6%9E%B6%E5%AD%A6%E4%B9%A0/Spark%E7%AE%80%E4%BB%8B.md

Spark 概述

Support working sets (of data) through RDD

  • Enabling reuse & fault‐tolerance

RDD Resilient Distributed Dataset 彈性分布式數(shù)據(jù)集

源碼顯示:
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging

1)RDD是一個抽象類
2)泛型的胰舆,可以支持多種類型: String宇整、Person履肃、User

RDD represents an immutablepartitioned collection of elements that can be operated on in parallel
簡單來說圾亏, RDD是Spark最基本的數(shù)據(jù)抽象,它是只讀的、分區(qū)記錄的集合缀踪,支持并行操作盈匾。

Propeties of RDD

Internally, each RDD is characterized by 5 main properties:

  • A list of partitions 一個 RDD 由一個或者多個分區(qū)組成腾务,用戶可以在創(chuàng)建 RDD 時指定其分區(qū)個數(shù),如果沒有指定削饵,則默認采用程序所分配到的 CPU 的核心數(shù)
  • A function for computing each split/partition 對一個RDD執(zhí)行一個函數(shù)岩瘦,就是對于一個RDD的所有分區(qū)執(zhí)行這個函數(shù)
  • A list of dependencies on other RDDs 保存彼此間的依賴關(guān)系,子RDD丟失了可以從父RDD中重新得到
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)通過分區(qū)器決定數(shù)據(jù)被存儲在哪個分區(qū)窿撬,目前Spark支 HashPartitioner和 RangeParationer
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 優(yōu)先把作業(yè)調(diào)度到數(shù)據(jù)所在的節(jié)點進行計算:移動數(shù)據(jù)不如移動計算

另外后續(xù)會提到關(guān)于分區(qū):
one task on per partition --- 對于每一個分區(qū)启昧,其都有一個task去處理
one partition could be persisted --- 對于每一個分區(qū),其都可以被持久化

源碼體現(xiàn)特性:

def getPartitions: Array[Partition]  特性一
def compute(split: Partition, context: TaskContext): Iterator[T] 特性二
def getDependencies: Seq[Dependency[_]] = deps  特性三
val partitioner: Option[Partitioner] = None  特性四
def getPreferredLocations(split: Partition): Seq[String] = Nil  特性五

RDD 創(chuàng)建方式

1.Parallelized Collections 已有的集合創(chuàng)建

命令行運行
到 sprak的 bin目錄下 pyspark開啟shell操作   
>>> sc # 命令行運行中系統(tǒng)會自動創(chuàng)建一個SparkContext 名稱為sc
<SparkContext master=local[*] appName=PySparkShell>
>>> sc.getConf() # sc對應的SparkConf
<pyspark.conf.SparkConf object at 0x7f26237eeba8>
>>> data = [1,2,3,4,5]
>>> dataRDD = sc.parallelize(data)
>>> dataRDD.collect()
[1, 2, 3, 4, 5]
>>> dataRDD2 = sc.parallelize(data,5) # 把data切成五份 5 個partition 
# 運行一個partition有1個task劈伴,言下之意對應有5個partition的data密末,有5個task來執(zhí)行
>>> dataRDD2.collect()
[1, 2, 3, 4, 5]

WebUI上查看(4040端口)信息:http://hadoop000:4040

image.png

2.External Datasets 外部存儲資源讀取,例如本地文件系統(tǒng)跛璧,HDFS严里,HBase 或支持 Hadoop InputFormat 的任何數(shù)據(jù)源。

#本地上 讀本地文件 輸入的參數(shù)路徑格式: file://xxx
sc.textFile('file:///home/hadoop/data/hello.txt').collect()
如果是本地操作的話追城,文件必須要能夠被這個工作的節(jié)點訪問得到刹碾。
支持目錄路徑,支持壓縮文件座柱,支持使用通配符
textFile(dir)   textFile(*.txt)    textFile(*.gz) 
textFile:其返回格式是 RDD[String] 迷帜,返回的是就是文件內(nèi)容,RDD 中每一個元素對應一行數(shù)據(jù)

#hadoop集群上讀取   輸入的參數(shù)路徑格式:hdfs://xxx
>>> sc.textFile("hdfs://hadoop000:8020/test/hello.txt").collect()

#還可以使用wholeTextFiles進行讀取
>>> sc.wholeTextFiles('file:///home/hadoop/data/hello.txt').collect()
[('file:/home/hadoop/data/hello.txt', 'hello spark\nhello pyspark\nhello sparksql\n')] 
返回一個list list中每一個元素是一個tuple色洞,第一個是全路徑戏锹,第二個是內(nèi)容
wholeTextFiles:其返回格式是 RDD[(String, String)],元組中第一個參數(shù)是文件路徑锋玲,第二個參數(shù)是文件內(nèi)容

從HDFS上讀取文件時景用,Spark默認每個塊設(shè)置為一個分區(qū)

RDD 算子

  1. transformations
    由于RDD是不可變的集合,所以需要通過轉(zhuǎn)換從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集
常用操作
1.map(func): 將func函數(shù)作用到數(shù)據(jù)集的每一個元素上,生成一個新的分布式的數(shù)據(jù)集返回 
2.filter(func):選出所有func返回值為true的元素伞插,生成一個新的分布式的數(shù)據(jù)集返回  
3.flatMap(func) 輸入的item能夠被map到0或者多個items輸出割粮,返回值是一個Sequence 
4.groupByKey():把相同的key的數(shù)據(jù)分發(fā)到一起 返回的每一個item (key, ResultIterable) 
5.reduceByKey(): 把相同的key的數(shù)據(jù)分發(fā)到一起并進行相應的計算
6.union
7.distinct
8.join 默認為innerjoin,可以使用leftjoin rightjoin媚污, fulljoin


一個簡單計算wordcount的寫法
sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(lambda line: line.split("\t")).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
  1. actions
    action操作將數(shù)據(jù)集上運計算后將得到的值返回到driver program或者寫到external storage中去
常見操作
1. collect() 輸出 類似于print
2. take(n) 取前幾個
3. max() min() sum() count()
4. reduce(func)
5. saveAsTextFile()
6. foreach() 對每一個元素進行操作
 
  • transformations are lazy execution, nothing actually happens until an action is called;
  • action triggers the computation;
  • action returns values to driver or writes data to external storage;

Spark的使用

from pyspark import SparkConf,SparkContext

# 1.創(chuàng)建SparkConf:設(shè)置Spark相關(guān)的參數(shù)信息
conf = SparkConf().setMaster("local[2]").setAppName("PySpark")

# 2.創(chuàng)建SparkContext
sc = SparkContext(conf=conf)

## 3.業(yè)務(wù)邏輯 RDD創(chuàng)建和操作
"""
xxxxxxxxx
"""
# 4.清除上下文
sc.stop()

Spark運行模式

  • local (開發(fā)過程使用)
    一般使用的參數(shù)有
    --master 舀瓢, --name , --py-files
提交作業(yè)
./bin/spark-submit            --master local[2]                    -- master url
                              --name spark-local                   -- app name
                              /home/hadoop/script/test.py          -- py file
                              file:///home/hadoop/data/hello.txt   -- 參數(shù)
                              file:///home/hadoop/wc/output

使用場景:一般是取小部分數(shù)據(jù) 通過 spark-submit --master local[*] xx.py [args]在本地跑一跑看看效果

standalone.png

local模式不需要對spark進行任何配置耗美,但是standalone需要進行相應的配置京髓,standalone模式下有worker和master進程

1.配置slaves
$SPARK_HOME/conf/slaves
hadoop000
hadoop001 ...

2.啟動spark集群
$SPARK_HOME/sbin/start-all.sh
ps: 建議在spark-env.sh中添加JAVA_HOME,否則有可能報錯
檢測: jps 查看master和worker進程是否啟動成功

提交作業(yè)
./bin/spark-submit            --master spark://hadoop000:7077        -- master url
                              --name spark-standalone                -- app name
                              /home/hadoop/script/test.py          -- py file
                              hdfs://hadoop000:8020/wc.txt    -- 參數(shù)
                              hdfs://hadoop000:8020/wc/output

注意:使用standalone模式商架,而且節(jié)點個數(shù)大于1的時候堰怨,
使用本地文件測試,必須要保證每個節(jié)點上都有本地測試文件

spark://hadoop000:7077 表示spark提交作業(yè)的端口
WebUI地址端口: 8080
WebUI.png

端口的問題.png
  • yarn

yarn 和 standalone的比較:
https://www.runexception.com/q/3546
yarn: 只需要一個節(jié)點蛇摸,然后提交作業(yè)即可备图,不需要spark集群的(不需要啟動master和worker),spark 做為一個應用程序(application) 的方式運行在 yarn 上
standalone:spark集群上每個節(jié)點都需要部署spark赶袄,需要啟動spark集群(需要master和worker)

提交作業(yè)
./spark-submit         --master yarn 
                       --name spark-yarn         
                       /home/hadoop/script/test.py 
                      hdfs://hadoop000:8020/wc.txt 
                      hdfs://hadoop000:8020/wc/output

Exception in thread "main" java.lang.Exception:When running with master 
'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment
 spark 想要跑在yarn 上必須要知道HDFS 和 yarn 的信息揽涮,不然 spark無法找到y(tǒng)arn 

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which 
contains the (client side) configuration files for the Hadoop cluster. 
These configs are used to write to HDFS and connect to the YARN ResourceManager.
The configuration contained in this directory will be distributed to the YARN cluster 
so that all containers used by the application use the same configuration.

部署模式 deploy-mode: client cluster: 決定driver啟動的位置(driver運行在哪)
client: driver運行在本地,提交作業(yè)的進程是不能停止饿肺,否則作業(yè)就掛了
cluster:提交完作業(yè)蒋困,那么提交作業(yè)端就可以斷開了,因為driver是運行在application master中
注意:Cluster deploy mode is not applicable to Spark shells cluster的部署模式不能在交互式的pyspark shell中實現(xiàn)

Spark 架構(gòu)

Glossary.png

Application:基于Spark的應用程序 = 1 driver + executors
Driver:Application的 main() 方法的進程敬辣,并創(chuàng)建 SparkContext
Cluster Manager:集群資源管理 例如 spark-submit **--master local[2]** / **spark://hadoop000:7077/yarn** 這些就是集群的管理
Deploy mode:區(qū)分driver運行在哪雪标,cluster模式,運行在集群里面购岗,client模式汰聋,運行在集群外面(本地)
Worker node:執(zhí)行計算任務(wù)的工作節(jié)點(機器)
Executor:位于工作節(jié)點上的進程,負責執(zhí)行計算任務(wù)并且將輸出數(shù)據(jù)保存到內(nèi)存或者磁盤中
Task:Executor 中的工作單元 driver端發(fā)起喊积,發(fā)送到worker node上的executor上去執(zhí)行
Job:并行計算烹困,由多個task構(gòu)成,一個action對應一個job
Stage:一個stage的邊界往往是從某個地方取數(shù)據(jù)開始乾吻,到shuffle的結(jié)束

每個Application都有自己的一系列executors的進程髓梅,這樣的話保證了不同的Applications間的隔離,但同時也導致了不同Applications間數(shù)據(jù)的不可共享性绎签,如果要共享的話枯饿,只能寫到外部文件,然后再去訪問


architecture.png

Spark 緩存

cache transformation一樣诡必,采用lazy execution :沒有遇到action是不會提交作業(yè)到spark上運行

rdd.cache()
底層使用persist方法:
rdd.cache() = rdd.persist(StorageLevel.MEMORY_ONLY)
如果一個RDD在后續(xù)的計算中可能會被使用到奢方,那么建議cache
最底層其實調(diào)用的是StorageLevel類

Spark automatically monitors cache usage on each node 
and drops out old data partitions in a least-recently-used (LRU) fashion. 

# 手動刪除cache
rdd.unpersist()    # unpersist是一個立即執(zhí)行的操作搔扁,不是lazy execution 
緩存級別 .png
  • 緩存策略的選擇:權(quán)衡memory usage and CPU efficiency 一句話:memory 但副本->memory_ser 單副本 -> 硬盤 ->多副本
    • 能用MEMORY_ONLY搞定就用默認的MEMORY_ONLY(most CPU-efficient option)
    • 內(nèi)存不夠的話,嘗試使用MEMORY_ONLY_SER并且選擇一個比較快的序列化庫(省空間蟋字,但是反序列化開銷CPU時間稿蹲,且僅僅Java和Scala使用)

Spark Lineage 機制

Lineage: RDD間的依賴關(guān)系

  • 窄依賴:一個父RDD的partition至多被子RDD的某個partition使用一次(pipline-able)

  • 寬依賴:一個父RDD的partition會被子RDD的partition使用多次,有shuffle操作


    依賴.png

有n個stage對應了n+1個shuffle


stage和shuffle.png

Spark shuffle

xxx

Spark 優(yōu)化

  • 序列化
    Java serialization
    Kryo serialization

  • 內(nèi)存管理 execution & storage
    execution memory refers to that used for computation in shuffles, joins, sorts, and aggregations,
    storage memory refers to that used for caching and propagating internal data across the cluster

  • 廣播變量 (20KB)
    每個 Task會有自由變量的副本鹊奖,如果變量很大且 Task 任務(wù)很多的情況下苛聘,這必然會對網(wǎng)絡(luò) IO 造成壓力
    廣播變量:就是不把副本變量分發(fā)到每個 Task 中,而是將其分發(fā)到每個 Executor忠聚,Executor 中的所有 Task 共享一個副本變量设哗。
    sc.broadcast(data)

  • 數(shù)據(jù)本地化 Data locality:how close data is to the code processing it 數(shù)據(jù)和代碼有多近
    it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data
    移動數(shù)據(jù)不如移動計算(代碼)
    類型:
    PROCESS_LOCAL:同一個JVM下
    NODE_LOCAL:同一個節(jié)點
    NO_PREF:所有地方都一樣
    RACK_LOCAL:同一個機架
    ANY:不同機架

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市两蟀,隨后出現(xiàn)的幾起案子网梢,更是在濱河造成了極大的恐慌,老刑警劉巖垫竞,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件澎粟,死亡現(xiàn)場離奇詭異,居然都是意外死亡欢瞪,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進店門徐裸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來遣鼓,“玉大人,你說我怎么就攤上這事重贺∑锼睿” “怎么了?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵气笙,是天一觀的道長次企。 經(jīng)常有香客問我,道長潜圃,這世上最難降的妖魔是什么缸棵? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮谭期,結(jié)果婚禮上堵第,老公的妹妹穿的比我還像新娘。我一直安慰自己隧出,他們只是感情好踏志,可當我...
    茶點故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著胀瞪,像睡著了一般针余。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天圆雁,我揣著相機與錄音忍级,去河邊找鬼。 笑死摸柄,一個胖子當著我的面吹牛颤练,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播驱负,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼嗦玖,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了跃脊?” 一聲冷哼從身側(cè)響起宇挫,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎酪术,沒想到半個月后器瘪,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡绘雁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年橡疼,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片庐舟。...
    茶點故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡欣除,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出挪略,到底是詐尸還是另有隱情历帚,我是刑警寧澤,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布杠娱,位于F島的核電站挽牢,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏摊求。R本人自食惡果不足惜禽拔,卻給世界環(huán)境...
    茶點故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望睹簇。 院中可真熱鬧奏赘,春花似錦、人聲如沸太惠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凿渊。三九已至梁只,卻和暖如春缚柳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背搪锣。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工秋忙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人构舟。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓灰追,卻偏偏與公主長得像,于是被迫代替她去往敵國和親狗超。 傳聞我的和親對象是個殘疾皇子弹澎,可洞房花燭夜當晚...
    茶點故事閱讀 45,876評論 2 361