1 Spark core concepts
1 Application
An application built on Spark = 1 driver + n executors
User program built on Spark.
Consists of a driver program and executors on the cluster.
For example: a .py script, or pyspark/spark-shell
2 Application jar
This appears in Java or Scala development; it is rarely seen with Python.
A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
3 Driver program
Runs the main() method and creates a SparkContext (sc)
The process running the main() function of the application and creating the SparkContext
4 Cluster manager
An external service that acquires resources on the cluster; you can specify how many resources are needed
An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)
An application has to request resources, e.g. driver memory and executor memory.
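As a rough sketch (the app name and values below are illustrative assumptions, not from these notes), such resource requests can be expressed through SparkConf when building the SparkContext:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("resource-request-demo")   # illustrative name
        .set("spark.executor.memory", "2g")    # memory per executor
        .set("spark.executor.cores", "2")      # cores per executor
        .set("spark.driver.memory", "1g"))     # usually passed via spark-submit instead,
                                               # since the driver JVM is already running here
sc = SparkContext(conf=conf)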
5 Deploy mode
Deploy mode: where the driver process runs
Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
6 Worker node
In standalone mode this corresponds to a slave node; on YARN it corresponds to a NodeManager node
Any node that can run application code in the cluster
7 Executor
A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Executors are not shared between different applications.
8 Task
A unit of work that will be sent to one executor
e.g. map tasks and shuffle tasks
9 Job
One action corresponds to one job
A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.
10 Stage
A stage's boundary runs from where data is fetched to where the shuffle ends
Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.
2 Spark runtime architecture and important caveats
[Important]
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark's own standalone cluster manager, Mesos or YARN), which allocate resources across applications.
Step 1: Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
After the connection is made, the application's work is handed off to the executors, which handle task execution and data storage.
Step 2: Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.
Next, the SparkContext ships the application code to the executors.
Step 3: Finally, SparkContext sends tasks to the executors to run.
The SparkContext sends tasks to the executors to run; these are the concrete units of work being executed.
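A minimal driver-program sketch mapping onto the three steps above (the master URL is an assumed standalone address; the file path is reused from the cache example later in these notes):

from pyspark import SparkContext

sc = SparkContext(master="spark://master:7077", appName="arch-demo")  # Step 1: connect and acquire executors
rdd = sc.textFile("file:///home/hadoop/data/page_views.dat")           # transformation only, nothing runs yet
print(rdd.count())                                                     # Steps 2-3: code and tasks are shipped to executors
sc.stop()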
Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
Each application runs in its own executor processes and threads; applications are isolated from each other, and data cannot be shared across applications.
Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).
Spark does not care which cluster manager it runs on, as long as it can acquire executor processes.
The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port in the network config section). As such, the driver program must be network addressable from the worker nodes.
The driver listens for incoming connections from its executors for its whole lifetime, so it must be reachable over the network from the worker nodes.
Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.
The driver should run as close to the worker nodes as possible; to send requests to the cluster remotely, open an RPC to the driver instead of running the driver far away from the workers.
3 Distinguishing key Spark and Hadoop concepts
Hadoop
1 One MapReduce program = one Job
2 One Job = 1 to N Tasks (Map/Reduce)
3 One Task corresponds to one process
4 A process is started when a Task runs and destroyed when the Task finishes; with many Tasks this overhead is significant (JVM sharing does not help with this)
Spark
1 Application = Driver (the main() method creates the SparkContext) + Executors
2 One Application = 0 to n Jobs
3 One Job = one Action
4 One Job = 1 to n Stages
5 One Stage = 1 to n Tasks
6 One Task runs in a single thread; multiple Tasks can run in parallel inside one Executor process
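A small PySpark sketch of these relationships (assuming an existing SparkContext sc; the numbers are arbitrary):

rdd = sc.parallelize(range(100), 4)   # 4 partitions -> 4 tasks per stage
doubled = rdd.map(lambda x: x * 2)    # transformation: no job yet
total = doubled.sum()                 # action #1 -> job #1 (single stage, no shuffle)
n = doubled.count()                   # action #2 -> job #2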
4 Spark cache details
How to call it: rdd.cache() / rdd.persist()
Enabling this sets the RDD's storageLevel to MEMORY_ONLY.
It is lazy: no job is submitted until an action is encountered.
If an RDD will be reused in later computations, caching it is recommended, for example:
from pyspark import StorageLevel

lines = sc.textFile("file:///home/hadoop/data/page_views.dat")
lines.count()                              # first action: reads the file from disk
lines.cache()                              # mark for caching (MEMORY_ONLY); lazy, nothing is stored yet
lines.count()                              # this action populates the cache; later actions read from memory
lines.persist(StorageLevel.MEMORY_ONLY_2)  # alternative: explicit storage level with 2 replicas
With persistence, the source data is read only once.
rdd.unpersist() is not lazy (it takes effect immediately).
RDD Persistence
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
cache() is implemented on top of persist(); the cache is fault-tolerant and supports multiple replicas.
If memory is limited, a serialized storage level can be used to save memory.
persist() StorageLevel options: MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.
Persistence strategy: in memory, single replica, serialized.
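A short follow-up sketch (reusing sc and the file path from above; the printed value is indicative) showing an explicit storage level, checking it, and removing it:

from pyspark import StorageLevel

logs = sc.textFile("file:///home/hadoop/data/page_views.dat")
logs.persist(StorageLevel.MEMORY_AND_DISK)   # keep in memory, spill to disk if memory runs short
logs.count()                                 # an action is needed before anything is actually cached
print(logs.getStorageLevel())                # e.g. StorageLevel(True, True, False, False, 1)
logs.unpersist()                             # not lazy: cached blocks are dropped immediately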
P.S. Check storage status in the web UI at 192.168.199.102:4040 (Storage tab).
5 Spark lineage details
Lineage: the dependency relationships between RDDs.
For fault tolerance, recomputation is done per partition; there is no need to recompute everything.
Q1: Without persistence, if a partition is lost, does it still have to be re-read from disk and recomputed?
Q2: Is the computation cost of persistence high?
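These questions are left open here; as an aside, the lineage itself can be inspected with toDebugString (a sketch reusing the earlier file path; the decode() assumes it returns bytes, as PySpark does):

wc = (sc.textFile("file:///home/hadoop/data/page_views.dat")
        .flatMap(lambda line: line.split())
        .map(lambda w: (w, 1))
        .reduceByKey(lambda a, b: a + b))
print(wc.toDebugString().decode())   # prints the chain of parent RDDs this RDD depends on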
6 Spark Dependency
Narrow dependency (pipeline-able):
Each partition of the parent RDD is used by at most one partition of the child RDD.
Examples: map, filter, union, join with inputs co-partitioned
Wide dependency (requires a shuffle):
A partition of the parent RDD may be used by multiple partitions of the child RDD.
Examples: groupByKey on non-partitioned data, join with inputs not co-partitioned, repartition, coalesce, cogroup, reduceByKey
The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key.
The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.
Difference
With a narrow dependency, a failure only requires recomputing the affected parent partitions; with a wide dependency, all the parent partitions may need to be recomputed.
Q: reduceByKey redistributes the data, so into how many partitions should its output be split?
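A hedged sketch related to this question (not from the notes): reduceByKey accepts an optional numPartitions argument; when it is omitted, Spark picks a default based on the parent partitioning / spark.default.parallelism. Compare a narrow map with a wide reduceByKey:

pairs = sc.parallelize(range(10), 2).map(lambda x: (x % 3, x))   # narrow: still 2 partitions
summed = pairs.reduceByKey(lambda a, b: a + b, 3)                # wide: shuffled into 3 partitions
print(pairs.getNumPartitions(), summed.getNumPartitions())       # 2 3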
Coding exercise
lines = sc.textFile("file:///home/hadoop/data/page_views.dat")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.collect()   # action: triggers one job
# chained form: sc.textFile(...).flatMap(...).map(...).reduceByKey(...)
You can see two stages in the web UI (the shuffle from reduceByKey splits the job into two).
[Homework]: read the shuffle section of the official documentation.
Tips
进程: process
线程: thread
血缘: lineage