driver進(jìn)程用于運(yùn)行用戶的主程序,然后在集群的機(jī)子上分布執(zhí)行并行操作犀斋。
概念
RDD
RDD resilient distributed dataset,是分布在集群節(jié)點(diǎn)中的各數(shù)據(jù)元素分片的集合,可被并行地操作楚里。
RDD是通過讀取hdfs中的文件或是通過已經(jīng)存在的集合轉(zhuǎn)換。
shared variables
在分布式執(zhí)行時(shí)优训,傳遞的是變量的復(fù)制朵你,如果需要在任務(wù)之間共享的:
broadcast variables
accumulators
連接
SparkContext是用于告知Spark如何連接到集群中
conf = SparkConf().setAppName(appName)
# 但是首先得創(chuàng)建一個(gè)SparkConf
# 可以在此處直接調(diào)用setmaster設(shè)置運(yùn)行方式 但是一般會(huì)在運(yùn)行時(shí)通過參數(shù)設(shè)置
sc = SparkContext(conf=conf)
如果使用的是shell,則已經(jīng)有了創(chuàng)建好的SparkContext sc來使用揣非,不能再次創(chuàng)建抡医。可在運(yùn)行是加--py-file早敬、--packages忌傻、--repositories來添加python依賴。
RDD
創(chuàng)建的兩種方式
parallelized collections
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# 會(huì)根據(jù)集群的配置情況自動(dòng)分片
# 然后復(fù)制到各節(jié)點(diǎn)來形成分布式的數(shù)據(jù)集 可以并行地操作
值得注意的是parallelize可接受第二個(gè)參數(shù)來設(shè)置分片的數(shù)量
parallelize(data, 10)
external dataset
distFile = sc.textFile("data.txt")
# 讀取text文件 可以使用hdfs s3n的uri
# 如果使用的是本地文件路徑 需要所有worker的對(duì)應(yīng)路徑上都有
# 支持文件路徑 文件名稱通配符 壓縮
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
textfile也可以接受第二個(gè)參數(shù)聲明文件分片大小 默認(rèn)是128MB
除了textfile外還可以使用
wholeTextFiles可以讀取路徑下的所有文件作為鍵值對(duì)返回(一般是處理目錄下包含多個(gè)小文件的情況)
saveAsPickleFile pickleFile 可以按python的Pickle方式存取 默認(rèn)的batch大小是10
rdd.saveAsSequenceFile()
sc.sequenceFile()
sequenceFile和HDFS
operation
有兩種操作類型:
變換:從已存在的dataset中創(chuàng)建出來
動(dòng)作:通過一定的操作計(jì)算后的返回值
basic
lines = sc.textFile("")
lineLength = lines.map(lambda s: len(s))
lineLength.persist()
totalLength = lineLength.reduce(lambda a, b: a+b)
傳遞
lambda
本地函數(shù)(作用域內(nèi)定義的函數(shù))
全局函數(shù)
雖然說可以傳遞類的方法搞监,但是這樣會(huì)傳遞整個(gè)對(duì)象水孩。如果用到了類,最好是把使用到的類中的東西接出來到局部變量中然后傳遞琐驴。
作用域
如果定義了一個(gè)函數(shù)俘种,然后通過rdd的foreach傳遞運(yùn)行該函數(shù),如果在函數(shù)中引用的是driver的全局變量绝淡,則可能會(huì)有問題宙刘。
在調(diào)用分布式函數(shù)之前,spark會(huì)計(jì)算該任務(wù)的作用域牢酵,即必須對(duì)執(zhí)行器可見的變量和方法悬包,然后把該作用域序列化并傳遞給各個(gè)執(zhí)行器。
傳遞給執(zhí)行器的是一份復(fù)制的變量馍乙,每個(gè)執(zhí)行器操作的是他自己的變量玉罐,所以driver中的全局變量不變。
但是如果是在本地運(yùn)行的同時(shí)是在一個(gè)jvm中潘拨,那么全局變量可能是會(huì)被修改的吊输。但是應(yīng)該是accumulater來實(shí)現(xiàn)這一功能。
對(duì)于變量的打印铁追,如果在集群模式下運(yùn)行季蚂,打印的輸出是各個(gè)節(jié)點(diǎn)。正確的方式是先調(diào)用collect方法來收集到本地琅束。如果只想看一些元素扭屁,可以調(diào)用take
變換
map(func) 把func作用于rdd中的每個(gè)元素 返回
filter(func) 返回func為true的元素
flatmap(func) func的返回值是 seq,把func作用到rdd中的每個(gè)元素
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed) 抽樣
union(rdd) 合集
intersection(rdd) 交集
distinct([numTasks])
groupByKey reduceByKey aggregateByKey SortByKey
join
cogroup
cartesian
pipe
coalesce
repartition
repartitionAndSortWithinPartitions
動(dòng)作
reduce(func) func接受兩個(gè)參數(shù)然后返回一個(gè)值
collect()
count()
first()
take(n)
takeSample(withReplacement, num, seed)
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func)
shuffle operation
是spark重新分布數(shù)據(jù)的機(jī)制 通常會(huì)觸發(fā)執(zhí)行器和機(jī)器的數(shù)據(jù)復(fù)制涩禀,是一個(gè)耗時(shí)料滥、復(fù)雜的動(dòng)作,包含:repartition coalesce groupByKey reduceByKey cogroup join
RDD持久化
MEOMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2 MEMORY_AND_DISK_2...
spark會(huì)在一些shuffle操作時(shí)自動(dòng)持久化艾船,例如reducebykey
可以顯式調(diào)用unpersist
Shared Variable
Broadcast Variables
broadcast = sc.broadcast([1, 2, 3])
broadcast.value
Accumulators
只有driver可以讀取accumulator的數(shù)據(jù)葵腹,其他執(zhí)行器只能加