Spark 概述
Support working sets (of data) through RDD
- Enabling reuse & fault‐tolerance
RDD Resilient Distributed Dataset 彈性分布式數(shù)據(jù)集
源碼顯示:
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging
1)RDD是一個抽象類
2)泛型的胰舆,可以支持多種類型: String宇整、Person履肃、User
RDD represents an immutable,partitioned collection of elements that can be operated on in parallel
簡單來說圾亏, RDD是Spark最基本的數(shù)據(jù)抽象,它是只讀的、分區(qū)記錄的集合缀踪,支持并行操作盈匾。
Propeties of RDD
Internally, each RDD is characterized by 5 main properties:
- A list of partitions 一個 RDD 由一個或者多個分區(qū)組成腾务,用戶可以在創(chuàng)建 RDD 時指定其分區(qū)個數(shù),如果沒有指定削饵,則默認采用程序所分配到的 CPU 的核心數(shù)
- A function for computing each split/partition 對一個RDD執(zhí)行一個函數(shù)岩瘦,就是對于一個RDD的所有分區(qū)執(zhí)行這個函數(shù)
- A list of dependencies on other RDDs 保存彼此間的依賴關(guān)系,子RDD丟失了可以從父RDD中重新得到
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)通過分區(qū)器決定數(shù)據(jù)被存儲在哪個分區(qū)窿撬,目前Spark支 HashPartitioner和 RangeParationer
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 優(yōu)先把作業(yè)調(diào)度到數(shù)據(jù)所在的節(jié)點進行計算:移動數(shù)據(jù)不如移動計算
另外后續(xù)會提到關(guān)于分區(qū):
one task on per partition --- 對于每一個分區(qū)启昧,其都有一個task去處理
one partition could be persisted --- 對于每一個分區(qū),其都可以被持久化
源碼體現(xiàn)特性:
def getPartitions: Array[Partition] 特性一
def compute(split: Partition, context: TaskContext): Iterator[T] 特性二
def getDependencies: Seq[Dependency[_]] = deps 特性三
val partitioner: Option[Partitioner] = None 特性四
def getPreferredLocations(split: Partition): Seq[String] = Nil 特性五
RDD 創(chuàng)建方式
1.Parallelized Collections 已有的集合創(chuàng)建
命令行運行
到 sprak的 bin目錄下 pyspark開啟shell操作
>>> sc # 命令行運行中系統(tǒng)會自動創(chuàng)建一個SparkContext 名稱為sc
<SparkContext master=local[*] appName=PySparkShell>
>>> sc.getConf() # sc對應的SparkConf
<pyspark.conf.SparkConf object at 0x7f26237eeba8>
>>> data = [1,2,3,4,5]
>>> dataRDD = sc.parallelize(data)
>>> dataRDD.collect()
[1, 2, 3, 4, 5]
>>> dataRDD2 = sc.parallelize(data,5) # 把data切成五份 5 個partition
# 運行一個partition有1個task劈伴,言下之意對應有5個partition的data密末,有5個task來執(zhí)行
>>> dataRDD2.collect()
[1, 2, 3, 4, 5]
WebUI上查看(4040端口)信息:http://hadoop000:4040
2.External Datasets 外部存儲資源讀取,例如本地文件系統(tǒng)跛璧,HDFS严里,HBase 或支持 Hadoop InputFormat 的任何數(shù)據(jù)源。
#本地上 讀本地文件 輸入的參數(shù)路徑格式: file://xxx
sc.textFile('file:///home/hadoop/data/hello.txt').collect()
如果是本地操作的話追城,文件必須要能夠被這個工作的節(jié)點訪問得到刹碾。
支持目錄路徑,支持壓縮文件座柱,支持使用通配符
textFile(dir) textFile(*.txt) textFile(*.gz)
textFile:其返回格式是 RDD[String] 迷帜,返回的是就是文件內(nèi)容,RDD 中每一個元素對應一行數(shù)據(jù)
#hadoop集群上讀取 輸入的參數(shù)路徑格式:hdfs://xxx
>>> sc.textFile("hdfs://hadoop000:8020/test/hello.txt").collect()
#還可以使用wholeTextFiles進行讀取
>>> sc.wholeTextFiles('file:///home/hadoop/data/hello.txt').collect()
[('file:/home/hadoop/data/hello.txt', 'hello spark\nhello pyspark\nhello sparksql\n')]
返回一個list list中每一個元素是一個tuple色洞,第一個是全路徑戏锹,第二個是內(nèi)容
wholeTextFiles:其返回格式是 RDD[(String, String)],元組中第一個參數(shù)是文件路徑锋玲,第二個參數(shù)是文件內(nèi)容
從HDFS上讀取文件時景用,Spark默認每個塊設(shè)置為一個分區(qū)
RDD 算子
- transformations
由于RDD是不可變的集合,所以需要通過轉(zhuǎn)換從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集
常用操作
1.map(func): 將func函數(shù)作用到數(shù)據(jù)集的每一個元素上,生成一個新的分布式的數(shù)據(jù)集返回
2.filter(func):選出所有func返回值為true的元素伞插,生成一個新的分布式的數(shù)據(jù)集返回
3.flatMap(func) 輸入的item能夠被map到0或者多個items輸出割粮,返回值是一個Sequence
4.groupByKey():把相同的key的數(shù)據(jù)分發(fā)到一起 返回的每一個item (key, ResultIterable)
5.reduceByKey(): 把相同的key的數(shù)據(jù)分發(fā)到一起并進行相應的計算
6.union
7.distinct
8.join 默認為innerjoin,可以使用leftjoin rightjoin媚污, fulljoin
一個簡單計算wordcount的寫法
sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(lambda line: line.split("\t")).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect()
- actions
action操作將數(shù)據(jù)集上運計算后將得到的值返回到driver program或者寫到external storage中去
常見操作
1. collect() 輸出 類似于print
2. take(n) 取前幾個
3. max() min() sum() count()
4. reduce(func)
5. saveAsTextFile()
6. foreach() 對每一個元素進行操作
- transformations are lazy execution, nothing actually happens until an action is called;
- action triggers the computation;
- action returns values to driver or writes data to external storage;
Spark的使用
from pyspark import SparkConf,SparkContext
# 1.創(chuàng)建SparkConf:設(shè)置Spark相關(guān)的參數(shù)信息
conf = SparkConf().setMaster("local[2]").setAppName("PySpark")
# 2.創(chuàng)建SparkContext
sc = SparkContext(conf=conf)
## 3.業(yè)務(wù)邏輯 RDD創(chuàng)建和操作
"""
xxxxxxxxx
"""
# 4.清除上下文
sc.stop()
Spark運行模式
- local (開發(fā)過程使用)
一般使用的參數(shù)有
--master 舀瓢, --name , --py-files
提交作業(yè)
./bin/spark-submit --master local[2] -- master url
--name spark-local -- app name
/home/hadoop/script/test.py -- py file
file:///home/hadoop/data/hello.txt -- 參數(shù)
file:///home/hadoop/wc/output
使用場景:一般是取小部分數(shù)據(jù) 通過 spark-submit --master local[*] xx.py [args]
在本地跑一跑看看效果
local模式不需要對spark進行任何配置耗美,但是standalone需要進行相應的配置京髓,standalone模式下有worker和master進程
1.配置slaves
$SPARK_HOME/conf/slaves
hadoop000
hadoop001 ...
2.啟動spark集群
$SPARK_HOME/sbin/start-all.sh
ps: 建議在spark-env.sh中添加JAVA_HOME,否則有可能報錯
檢測: jps 查看master和worker進程是否啟動成功
提交作業(yè)
./bin/spark-submit --master spark://hadoop000:7077 -- master url
--name spark-standalone -- app name
/home/hadoop/script/test.py -- py file
hdfs://hadoop000:8020/wc.txt -- 參數(shù)
hdfs://hadoop000:8020/wc/output
注意:使用standalone模式商架,而且節(jié)點個數(shù)大于1的時候堰怨,
使用本地文件測試,必須要保證每個節(jié)點上都有本地測試文件
spark://hadoop000:7077 表示spark提交作業(yè)的端口
WebUI地址端口: 8080
- yarn
yarn 和 standalone的比較:
https://www.runexception.com/q/3546
yarn: 只需要一個節(jié)點蛇摸,然后提交作業(yè)即可备图,不需要spark集群的(不需要啟動master和worker),spark 做為一個應用程序(application) 的方式運行在 yarn 上
standalone:spark集群上每個節(jié)點都需要部署spark赶袄,需要啟動spark集群(需要master和worker)
提交作業(yè)
./spark-submit --master yarn
--name spark-yarn
/home/hadoop/script/test.py
hdfs://hadoop000:8020/wc.txt
hdfs://hadoop000:8020/wc/output
Exception in thread "main" java.lang.Exception:When running with master
'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment
spark 想要跑在yarn 上必須要知道HDFS 和 yarn 的信息揽涮,不然 spark無法找到y(tǒng)arn
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which
contains the (client side) configuration files for the Hadoop cluster.
These configs are used to write to HDFS and connect to the YARN ResourceManager.
The configuration contained in this directory will be distributed to the YARN cluster
so that all containers used by the application use the same configuration.
部署模式 deploy-mode: client cluster: 決定driver啟動的位置(driver運行在哪)
client: driver運行在本地,提交作業(yè)的進程是不能停止饿肺,否則作業(yè)就掛了
cluster:提交完作業(yè)蒋困,那么提交作業(yè)端就可以斷開了,因為driver是運行在application master中
注意:Cluster deploy mode is not applicable to Spark shells cluster的部署模式不能在交互式的pyspark shell中實現(xiàn)
Spark 架構(gòu)
Application:基于Spark的應用程序 = 1 driver + executors
Driver:Application的 main() 方法的進程敬辣,并創(chuàng)建 SparkContext
Cluster Manager:集群資源管理 例如 spark-submit **--master local[2]** / **spark://hadoop000:7077/yarn** 這些就是集群的管理
Deploy mode:區(qū)分driver運行在哪雪标,cluster模式,運行在集群里面购岗,client模式汰聋,運行在集群外面(本地)
Worker node:執(zhí)行計算任務(wù)的工作節(jié)點(機器)
Executor:位于工作節(jié)點上的進程,負責執(zhí)行計算任務(wù)并且將輸出數(shù)據(jù)保存到內(nèi)存或者磁盤中
Task:Executor 中的工作單元 driver端發(fā)起喊积,發(fā)送到worker node上的executor上去執(zhí)行
Job:并行計算烹困,由多個task構(gòu)成,一個action對應一個job
Stage:一個stage的邊界往往是從某個地方取數(shù)據(jù)開始乾吻,到shuffle的結(jié)束
每個Application都有自己的一系列executors的進程髓梅,這樣的話保證了不同的Applications間的隔離,但同時也導致了不同Applications間數(shù)據(jù)的不可共享性绎签,如果要共享的話枯饿,只能寫到外部文件,然后再去訪問
Spark 緩存
cache transformation一樣诡必,采用lazy execution :沒有遇到action是不會提交作業(yè)到spark上運行
rdd.cache()
底層使用persist方法:
rdd.cache() = rdd.persist(StorageLevel.MEMORY_ONLY)
如果一個RDD在后續(xù)的計算中可能會被使用到奢方,那么建議cache
最底層其實調(diào)用的是StorageLevel類
Spark automatically monitors cache usage on each node
and drops out old data partitions in a least-recently-used (LRU) fashion.
# 手動刪除cache
rdd.unpersist() # unpersist是一個立即執(zhí)行的操作搔扁,不是lazy execution
- 緩存策略的選擇:權(quán)衡memory usage and CPU efficiency 一句話:memory 但副本->memory_ser 單副本 -> 硬盤 ->多副本
- 能用MEMORY_ONLY搞定就用默認的MEMORY_ONLY(most CPU-efficient option)
- 內(nèi)存不夠的話,嘗試使用MEMORY_ONLY_SER并且選擇一個比較快的序列化庫(省空間蟋字,但是反序列化開銷CPU時間稿蹲,且僅僅Java和Scala使用)
Spark Lineage 機制
Lineage: RDD間的依賴關(guān)系
窄依賴:一個父RDD的partition至多被子RDD的某個partition使用一次(pipline-able)
-
寬依賴:一個父RDD的partition會被子RDD的partition使用多次,有shuffle操作
依賴.png
有n個stage對應了n+1個shuffle
Spark shuffle
xxx
Spark 優(yōu)化
內(nèi)存管理 execution & storage
execution memory refers to that used for computation in shuffles, joins, sorts, and aggregations,
storage memory refers to that used for caching and propagating internal data across the cluster廣播變量 (20KB)
每個 Task會有自由變量的副本鹊奖,如果變量很大且 Task 任務(wù)很多的情況下苛聘,這必然會對網(wǎng)絡(luò) IO 造成壓力
廣播變量:就是不把副本變量分發(fā)到每個 Task 中,而是將其分發(fā)到每個 Executor忠聚,Executor 中的所有 Task 共享一個副本變量设哗。
sc.broadcast(data)
數(shù)據(jù)本地化 Data locality:how close data is to the code processing it 數(shù)據(jù)和代碼有多近
it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data
移動數(shù)據(jù)不如移動計算(代碼)
類型:
PROCESS_LOCAL:同一個JVM下
NODE_LOCAL:同一個節(jié)點
NO_PREF:所有地方都一樣
RACK_LOCAL:同一個機架
ANY:不同機架