Spark Python API Docs(part one)

pyspark package

subpackages

  • pyspark.sql module
  • pyspark.streaming module
  • pyspark.ml package
  • pyspark.mllib package

contents

PySpark是Spark的Python API稽亏。
Public classes:

  • SparkContext:
    Spark功能的主要入口點(diǎn)。
  • RDD:
    彈性分布式數(shù)據(jù)集(RDD)犹菱,Spark中的基本抽象朗儒。
  • Broadcast:
    一個廣播變量颊乘,可以在任務(wù)間重用参淹。
  • Accumulator:
    任務(wù)只能添加值的“add-only”共享變量。
  • SparkConf:
    用于配置Spark乏悄。
  • SparkFiles:
    訪問作業(yè)附帶的文件浙值。
  • StorageLevel:
    更細(xì)粒度的緩存持久化級別。
  • TaskContext:
    有關(guān)當(dāng)前正在運(yùn)行的任務(wù)的信息檩小,可在workers和實(shí)驗(yàn)室中獲得开呐。

class pyspark.SpackConf(loadDefaults=True,_jvm=None,_jconf=None)

Spark應(yīng)用程序的配置。 用于設(shè)置spark變量规求,參數(shù)以鍵值對的形式傳遞筐付。大多數(shù)情況下,您將使用SparkConf()創(chuàng)建一個SparkConf對象阻肿,該對象將從spark.* Java系統(tǒng)屬性中加載值瓦戚。 在這種情況下,您直接在SparkConf對象上設(shè)置的任何參數(shù)都優(yōu)先于系統(tǒng)屬性丛塌。

對于單元測試较解,您也可以調(diào)用SparkConf(false)來跳過加載外部設(shè)置并獲取相同的配置,而不管系統(tǒng)屬性是什么赴邻。這個類中的所有setter方法都支持鏈接哨坪。 例如,你可以寫conf.setMaster("local").setAppName("Myapp")乍楚。

  • 注意:一旦SparkConf對象被傳遞給Spark,它就被克隆届慈,不能再被用戶修改徒溪。
  1. contains(key)
    這個配置是否包含給定的key?
  2. get(key, defaultValue=None)
    獲取配置中某個key的value金顿,否則返回默認(rèn)值臊泌。
  3. getAll()
    獲取所有的值,以key-value pairs list的形式揍拆。
  4. set(key, value)
    設(shè)置配置屬性渠概。
  5. setAll(pairs)
    設(shè)置多個參數(shù),以key-value pairs list的形式傳遞嫂拴。
  6. setAppName(value)
    設(shè)置application的name播揪。
  7. setExecutorEnv(key=None, value=None, pairs=None)
    設(shè)置要傳遞給executors的環(huán)境變量。
  8. setIfMissing(key,value)
    設(shè)置配置屬性(如果尚未設(shè)置)筒狠。
  9. setMaster(value)
    設(shè)置要連接到的 master URL猪狈。
  10. setSparkHome(value)
    設(shè)置在worker nodes安裝spark的path
  11. toDebugString()
    以key=value pairs list的形式返回配置的可打印版本,每行一個辩恼。

class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)

Spark功能的主要入口點(diǎn)雇庙。 SparkContext表示與Spark集群的連接谓形,可用于在該集群上創(chuàng)建RDD和廣播變量。
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

  1. accumulator(value, accum_param=None)
    用給定的初始值創(chuàng)建一個累加器(accumulator)疆前,如果提供的話寒跳,使用給定的AccumulatorParam 幫助對象來定義如何 add 某種數(shù)據(jù)類型的值。 默認(rèn)AccumulatorParams用于整數(shù)和浮點(diǎn)數(shù)竹椒,如果你沒有提供童太。 對于其他類型,可以使用自定義的AccumulatorParam碾牌。
  2. addFile(path, recursive=False)
    在每個節(jié)點(diǎn)上添加一個文件康愤,用這個Spark job下載。傳遞的路徑可以是本地文件舶吗,HDFS中的文件(或其他Hadoop支持的文件系統(tǒng))征冷,也可以是HTTP,HTTPS或FTP URI誓琼。要訪問Spark jobs中的文件检激,請使用文件名L {SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}來查找其下載位置。如果遞歸(recursive)選項(xiàng)設(shè)置為True腹侣,則可以給出一個目錄叔收。 當(dāng)前目錄僅支持Hadoop支持的文件系統(tǒng)。
#下面代碼通過鍵入 bin/pysaprk 命令進(jìn)入交互式模式運(yùn)行傲隶,sc為sparkContext()的實(shí)例對象,spark已經(jīng)為我們自動創(chuàng)建好了饺律。
import os
from pyspark import SparkFiles
with open('/home/test.txt', 'w') as f: #在本地建一個文件并寫入內(nèi)容
    f.write('100')
#加載文件,若是hdfs中的文件可以這樣寫:
# hdfs://hostname:port/file/path
# 例如 hdfs://master:9000/user/hadoop/test.txt
sc.addFile('/home/test.txt') 
def func(iterator):
    with open(SparkFiles.get('test.txt')) as testFile:
        fileVal = int(testFile.readline())
        return [x * fileVal for x in iterator]
sc.parallelize([1,2,3,4]).mapPartitions(func).collect() #創(chuàng)建一個RDD并把函數(shù)func作用到每個元素上跺株,最后輸出結(jié)果复濒。
[100, 200, 300, 400] #output
  1. addPyFile(path)
    為將來在此SparkContext上執(zhí)行的所有tasks添加一個.py或.zip依賴項(xiàng)。 傳遞的路徑(path)可以是本地文件乒省,HDFS中的文件(或Hadoop支持的其他文件系統(tǒng))巧颈,也可以是HTTP,HTTPS或FTP URI袖扛。
  2. applicationId
    Spark應(yīng)用程序的唯一標(biāo)識符砸泛。 其格式取決于調(diào)度程序的實(shí)現(xiàn)。
  • 例如 local spark app something like 'local-1433865536131'
  • in case of YARN something like ‘a(chǎn)pplication_1433865536131_34483’
sc.applicationId
u'local-1513691270280' #output
  1. binaryFiles(path, minPartitions=None)
    從HDFS蛆封,本地文件系統(tǒng)(所有節(jié)點(diǎn)上都可用)或Hadoop支持的任何文件系統(tǒng)URI讀取二進(jìn)制文件的目錄作為字節(jié)數(shù)組唇礁。 每個被讀取的文件作為單個記錄,并返回一個key-value pair娶吞,其中key是每個文件的路徑垒迂,value是每個文件的內(nèi)容。
  • Note:小文件是首選妒蛇,大文件也是允許的机断,但可能會導(dǎo)致性能不佳楷拳。
  1. binaryRecords(path, recordLength)
  • Note: 實(shí)驗(yàn)階段(Experimental)
    從一個扁平的二進(jìn)制文件加載數(shù)據(jù),假定每個記錄都是一組具有指定數(shù)字格式的數(shù)字(請參閱ByteBuffer)吏奸,并且每個記錄的字節(jié)數(shù)是恒定的欢揖。
    參數(shù)(Parmenters):
    path - 輸入數(shù)據(jù)文件的目錄;
    recordLength - 記錄的分割長度
  1. broadcast(value)
    將一個只讀變量廣播到集群,返回一個L {Broadcast <pyspark.broadcast.Broadcast>}對象來在分布式函數(shù)中讀取它奋蔚。 變量只會發(fā)送到每個群集一次她混。
  2. cancelAllJobs()
    取消所有已被調(diào)度的或正在運(yùn)行的作業(yè)。
  3. cancelJobGroup(groupId)
    取消指定組的活動作業(yè)泊碑。 有關(guān)更多信息坤按,請參見SparkContext.setJobGroup。
  4. defaultMinPartitions
    當(dāng)用戶不給定時馒过,Hadoop RDD分區(qū)的默認(rèn)最小數(shù)量臭脓。
  5. defaultParallelism
    當(dāng)用戶沒有給出時,默認(rèn)的并行度級別(例如 reduce tasks)
  6. dump_profiles(path)
    將配置文件轉(zhuǎn)儲到目錄路徑
  7. emptyRDD()
    創(chuàng)建一個沒有分區(qū)或元素的RDD腹忽。
  8. getConf()
  9. getLocalProperty(key)
    獲取在此線程中設(shè)置的本地屬性来累,如果缺失,則返回null窘奏。 請參閱setLocalProperty
    classmethod getOrCreate(conf=None)
    獲取或?qū)嵗粋€SparkContext并將其注冊為一個單例對象嘹锁。
    parameters:
    conf - SparkConf(optional)
  10. hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None,batchSize=0)
    使用HDFS,本地文件系統(tǒng)(所有節(jié)點(diǎn)上都可用)或任何Hadoop支持的文件系統(tǒng)URI讀取具有任意鍵和值類的“old”Hadoop InputFormat着裹。 機(jī)制與sc.sequenceFile相同领猾。Hadoop配置可以作為Python字典的形式傳入。 這將被轉(zhuǎn)換成Java中的配置骇扇。
    parameters:
    path - hadoop 文件的路徑瘤运;
    inputFormatClass - Hadoop InputFormat的完全限定類名(e.g. "org.apache.hadoop.mapred.TextInputFormat");
    keyClass - key Writable類的完全限定類名(e.g. "org.apache.hadoop.io.Text")匠题;
    valueClass - value Writable類的完全限定類名(e.g. "org.apache.hadoop.io.LongWritable");
    keyConverter - (None by default)但金;
    valueConverter - (None by default)韭山;
    conf - 配置Hadoop,以dict的形式傳入(None by default)冷溃;
    batchSize - 表示為單個Java對象的Python對象的數(shù)量钱磅。(默認(rèn)為0,batchSize自動選擇)
  11. hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
    從任意的Hadoop配置中讀取具有任意鍵和值類的“old”Hadoop InputFormat似枕,以Python dict 的形式傳入盖淡。 這將被轉(zhuǎn)換成Java中的配置。 機(jī)制與sc.sequenceFile相同凿歼。
  12. newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
    使用HDFS褪迟,本地文件系統(tǒng)(所有節(jié)點(diǎn)上都可用)或任何Hadoop支持的文件系統(tǒng)URI讀取具有任意鍵和值類的“new API”Hadoop InputFormat冗恨。 機(jī)制與sc.sequenceFile相同。hadoop 配置可以以python中dict的形式被傳入味赃,這將被轉(zhuǎn)換成Java中的配置掀抹。
  13. newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
    從任意的Hadoop配置中讀取具有任意鍵和值類的“new API”Hadoop InputFormat,以Python dict 的形式傳入心俗。 這將被轉(zhuǎn)換成Java中的配置傲武。 機(jī)制與sc.sequenceFile相同。
  14. parallelize(c, numSlices=None)
    分發(fā)本地Python集合轉(zhuǎn)成RDD形式城榛。 如果輸入表示性能范圍揪利,則建議使用xrange。
sc.parallelize([0, 2, 3, 4, 6], 5) #分成5個partitions
[[0], [2], [3], [4], [6]] # output
sc.parallelize(xrange(0, 6, 2),5)
[[], [0], [], [2], [4]] # output
  1. pickleFile(name, minPartitions=None)
    加載之前使用RDD.saveAsPickleFile方法保存的RDD狠持。
from tempfile import NamedTemporaryFile
#創(chuàng)建一個有名字的臨時文件,并自動刪除
tmpFile = NamedTemporaryFile(delete=True) 
tmpFile.close() #關(guān)閉這個臨時文件
sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] #output
  1. range(start, end=None, step=1, numSlices=None)
    創(chuàng)建一個新的包含從開始到結(jié)束(獨(dú)占)的元素的int類型的RDD疟位,通過step增加每個元素。 可以像python內(nèi)置的range()函數(shù)一樣調(diào)用工坊。 如果使用單個參數(shù)調(diào)用献汗,參數(shù)被解釋為結(jié)束,并且start被設(shè)置為0王污。numSlices - 新RDD的分區(qū)數(shù)量罢吃,返回Int類型的RDD。
sc.range(5).collect()
[0, 1, 2, 3, 4]
sc.range(2, 4).collect()
[2, 3]
sc.range(1, 7, 2).collect()
[1, 3, 5]
  1. runJob(rdd, partitionFunc, partitions=None, allowLocal=False)
    在指定的一組分區(qū)上執(zhí)行給定的partitionFunc昭齐,并將結(jié)果作為元素?cái)?shù)組的形式返回尿招。如果未指定“partitions”,則將在所有分區(qū)上運(yùn)行阱驾。
myRDD = sc.parallelize(range(6), 3)
sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]
myRDD =  sc.parallelize(range(6), 3)
myRDD.glom().collect()
[[0, 1], [2, 3], [4, 5]]
#匿名函數(shù)只作用在0和2兩個分區(qū)上
sc.runJob(myRDD, lambda part: [x * x for x in part],[0,2], True) 
[0, 1, 16, 25]
  1. sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
    從HDFS就谜,本地文件系統(tǒng)(所有節(jié)點(diǎn)都可用)或任何Hadoop支持的文件系統(tǒng)URI讀取任意鍵和值 Writable class 的Hadoop SequenceFile。minSplits - 數(shù)據(jù)集中的最小分割(default min(2, sc.defaultParallelism))
  2. setCheckpointDir(dirName)
    設(shè)置將RDD作為檢查點(diǎn)的目錄里覆。 如果在群集上運(yùn)行丧荐,該目錄必須是HDFS路徑。
  3. setJobGroup(groupId, description, interruptOnCancel=False)
    為此線程啟動的所有作業(yè)分配一個組ID喧枷,直到組ID被設(shè)置為不同的值或清除虹统。通常,應(yīng)用程序中的執(zhí)行單元由多個Spark actions或jobs組成隧甚。 應(yīng)用程序員可以使用這種方法將所有這些作業(yè)分組在一起车荔,并給出一個組描述。 一旦設(shè)置戚扳,Spark Web UI將把這些作業(yè)與這個組關(guān)聯(lián)起來忧便。應(yīng)用程序可以使用SparkContext.cancelJobGroup取消該組中的所有正在運(yùn)行的jobs。
import threading
from time import sleep
result = "Not Set"
lock = threading.Lock() #定義一個鎖
def map_func(x):
    sleep(100)
    raise Exception("Task should have been cancelled")
def start_job(x):
    global result
    try:
        sc.setJobGroup("job_to_cancel", "some description")
        result = sc.parallelize(range(x)).map(map_func).collect()
    except Exception as e:
        result = "Cancelled"
    lock.release() #打開鎖帽借,釋放資源
def stop_job():
    sleep(5)
    sc.cancelJobGroup("job_to_cancel")
supress = lock.acquire() #鎖定資源
supress = threading.Thread(target=start_job, args=(10, )).start()
supress = threading.Thread(target=stop_job).start()
supress = lock.acquire()
print(result)
Cancelled #output

如果作業(yè)組的interruptOnCancel設(shè)置為true珠增,那么作業(yè)取消將導(dǎo)致在作業(yè)的執(zhí)行程序線程上調(diào)用Thread.interrupt()超歌。 這有助于確保任務(wù)實(shí)際上被及時停止,但由于HDFS-1208的原因切平,默認(rèn)情況下會關(guān)閉握础,HDFS可能會通過將節(jié)點(diǎn)標(biāo)記為死亡來響應(yīng)Thread.interrupt()。

  1. setLocalProperty(key, value)
    設(shè)置一個本地屬性悴品,將影響從此線程提交的作業(yè)禀综,例如Spark Fair Scheduler pool。
  2. setLogLevel(logLevel)
    控制我們的logLevel苔严。 這將覆蓋任何用戶定義的日志設(shè)置定枷。 有效的日志級別包括:ALL,DEBUG届氢,ERROR欠窒,F(xiàn)ATAL,INFO退子,OFF岖妄,TRACE,WARN
    classmethod setSystemProperty(key, value)
    設(shè)置一個Java系統(tǒng)屬性寂祥,如spark.executor.memory荐虐。 這必須在實(shí)例化SparkContext之前調(diào)用。
  3. show_profiles()
    將配置文件統(tǒng)計(jì)信息打印到標(biāo)準(zhǔn)輸出
  4. sparkUser()
    為用戶獲取SPARK_USER丸凭,運(yùn)行SparkContext的用戶福扬。
  5. startTime
    返回Spark Context啟動時的紀(jì)元時間。
  6. statusTracker()
    返回StatusTracker對象
  7. stop()
    終止SparkContext
  8. textFile(name, minPartitions=None, use_unicode=True)
    從HDFS惜犀、本地文件系統(tǒng)(所有節(jié)點(diǎn)都可用)或任何Hadoop支持的文件系統(tǒng)URI讀取文本文件铛碑,并將其作為字符串的RDD返回。如果use_unicode為False虽界,則字符串將保持為str(編碼為utf-8)汽烦,這比unicode更快更小。(在Spark 1.2中添加)
with open('/home/test.txt', 'w') as testFile:
    testFile.write("Hello world!")
#讀取本地文件莉御。例子: file:/home/test.txt
testFile = sc.textFile('file:/home/test.txt')
testFile.collect()
[u'Hello world!'] #output
  1. uiWebUrl
    返回由此SparkContext啟動的SparkUI實(shí)例的URL
  2. union(rdds)
    建立一個RDD列表的聯(lián)合刹缝。這支持具有不同序列化格式的RDDs的unions(),盡管這迫使它們使用默認(rèn)序列化器被重新序列化颈将。
path = os.path.join('/home/', 'union-text.txt')
with open(path, 'w') as testFile:
    testFile.write("hello")
textFile = sc.textFile(path)
textFile.collect()
[u'Hello'] #output
parallelize = sc.parallelize(["World!"])
#union類似于列表的append方法
sorted(sc.union([textFile, parallelize]).collect())
[u'Hello', 'World!'] #output
  1. version
    運(yùn)行此應(yīng)用程序的Spark版本。
  2. wholeTextFiles(path, minPartitions=None, use_unicode=True)
    從HDFS言疗,本地文件系統(tǒng)(所有節(jié)點(diǎn)都可用)或任何支持Hadoop的文件系統(tǒng)URI讀取文本文件的目錄晴圾。 每個文件被讀取為單個記錄,并返回一個鍵值對噪奄,其中鍵是每個文件的路徑死姚,值是每個文件的內(nèi)容人乓。如果use_unicode為False,則字符串將保持為str(編碼為utf-8)都毒,這比unicode更快更小色罚,(在Spark 1.2中添加)。小文件是首選账劲,因?yàn)槊總€文件將被完全加載到內(nèi)存中戳护。
dirPath = os.path.join(/home/, 'files')
os.mkdir(dirPath)
with open(os.path.join(dirPath, '1.txt'), 'w') as file1:
    file1.write('1')
with open(os.path.join(dirPath, '2.txt'), 'w') as file2:
    file2.write('2')
textFiles = sc.wholeTextFiles(dirPath)
sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]

class pyspark.SparkFiles

解析通過L {SparkContext.addFile()<pyspark.context.SparkContext.addFile>}添加的文件的路徑。parkFiles僅包含類方法; 用戶不應(yīng)該創(chuàng)建SparkFiles實(shí)例瀑焦。
classmethod get(filename)
獲取通過SparkContext.addFile()添加的文件的絕對路徑腌且。
classmethod getRootDirectory()
獲取通過SparkContext.addFile()添加的文件的根目錄。

class pyspark.RDD(jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))

彈性分布式數(shù)據(jù)集(RDD)榛瓮,Spark中的基本抽象铺董。 表示可以并行操作的不可變分區(qū)元素集合。

  1. aggregate(zeroValue, seqOp, combOp)
    先聚合每個分區(qū)中的元素禀晓,然后使用給定的組合函數(shù)和一個中性“零值”對所有分區(qū)的結(jié)果進(jìn)行聚合精续。
    函數(shù)op(t1,t2)允許修改t1并將其返回為結(jié)果值粹懒,以避免對象分配; 但是重付,該函數(shù)不應(yīng)該修改t2。
    第一個函數(shù)(seqOp)可以返回與此RDD類型不同的結(jié)果類型U. 因此崎淳,我們需要一個合并T到U的操作和一個合并兩個U的操作堪夭。
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4) #output
sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0) #output
解釋如下:
假設(shè)[1,2,3,4]被分成兩個分區(qū),為 分區(qū)1([1,2])拣凹,分區(qū)2([3,4])
seqOp作用在每一個分區(qū)上
首先用seqOp對分區(qū)1進(jìn)行操作:
       x=(0,0)   y=1    -->   (1,1)   #對分區(qū)進(jìn)行第一次seqOp操作時森爽,x為zero value
       x=(1,1)   y=2    -->   (3,2)   #對分區(qū)進(jìn)行的第二次及以后的seqOp操作,x為前一次seqOp的執(zhí)行結(jié)果
       同樣對分區(qū)2進(jìn)行操作:
       x=(0,0)   y=3    -->   (3,1) 
       x=(3,1)   y=4    -->   (7,2)
然后用combOp對兩個分區(qū)seqOp作用后的結(jié)果進(jìn)行操作:
      分區(qū)1:
      x=(0,0)   y=(3,2)   > (3,2)  #對第一個分區(qū)進(jìn)行combOp操作時嚣镜,x為zero value
      分區(qū)2:
      x=(3,2)   y=(7,2)   > (10,4) #對第二個及以后分區(qū)進(jìn)行combOp操作時爬迟,x為前一分區(qū)combOp處理后的結(jié)果
  1. aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
    使用給定的combine(組合)函數(shù)和一個中性的“零值”來聚合每個鍵的值。這個函數(shù)可以返回一個不同的結(jié)果類型U菊匿,比這個RDD中的值的類型V.因此狼渊,我們需要一個操作來合并一個V到一個U和一個合并兩個U的操作躏筏,前一個操作用于 合并分區(qū)內(nèi)的值,后者用于合并分區(qū)之間的值。 為了避免內(nèi)存分配良瞧,這兩個函數(shù)都允許修改并返回它們的第一個參數(shù),而不是創(chuàng)建一個新的U.
  2. cache()
    使用默認(rèn)存儲級別(MEMORY_ONLY)持久化RDD蚊惯。
  3. cartesian(other)
    返回一個RDD和另一個RDD的笛卡爾乘積吆倦,即所有元素對(a,b)的RDD,其中a在自身中潮瓶,b在另一個中陶冷。
rdd = sc.parallelize([1, 2])
sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)] #output
  1. checkpoint()
    將此RDD標(biāo)記為檢查點(diǎn)。 它將被保存到使用SparkContext.setCheckpointDir()設(shè)置的檢查點(diǎn)目錄內(nèi)的文件中毯辅,并且對其父RDD的所有引用都將被刪除埂伦。 在此RDD上執(zhí)行任何作業(yè)之前,必須調(diào)用此函數(shù)思恐。 強(qiáng)烈建議將此RDD保存在內(nèi)存中沾谜,否則將其保存在文件中將需要重新計(jì)算。
  2. coalesce(numPartitions, shuffle=False)
    返回一個減少到numPartitions分區(qū)的新RDD壁袄。
    把RDD的分區(qū)數(shù)降低到通過參數(shù)numPartitions指定的值类早。在得到的更大一些數(shù)據(jù)集上執(zhí)行操作,會更加高效嗜逻。
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]
  1. cogroup(other, numPartitions=None)
    對于自己或其他中的每個關(guān)鍵字k涩僻,返回一個包含一個元組的結(jié)果RDD,以及該關(guān)鍵字在自身以及其他值中的值列表栈顷。
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]
  1. collect()
    返回包含此RDD中所有元素的列表逆日。
    在驅(qū)動程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素萄凤。通常用于filter或其它產(chǎn)生了大量小數(shù)據(jù)集的情況室抽。
  • Note:因?yàn)樗械臄?shù)據(jù)都被加載到驅(qū)動程序的內(nèi)存中,所以只能在結(jié)果數(shù)組很小的情況下使用此方法靡努。
  1. collectAsMap()
    將此RDD中的鍵值對作為字典返回給master坪圾。
  • Note: 因?yàn)樗械臄?shù)據(jù)都被加載到驅(qū)動程序的內(nèi)存中,所以只有在結(jié)果數(shù)據(jù)很小的情況下才能使用此方法惑朦。
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
print(m)
{1: 2, 3: 4}
m[1]
2
  1. combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
    通用函數(shù)使用一組自定義的聚合函數(shù)來組合每個鍵的元素兽泄。
    將RDD [(K,V)]轉(zhuǎn)換為類型RDD [(K漾月,C)]的結(jié)果病梢,以用于“combined type”C.
    使用者提供三個函數(shù):
  • createCombiner,which turns a V into a C (e.g., creates a one-element list)
  • mergeValue梁肿,to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners蜓陌,to combine two C’s into a single one (e.g., merges the lists)
    為了避免內(nèi)存分配,mergeValue和mergeCombiners都允許修改并返回它們的第一個參數(shù)吩蔑,而不是創(chuàng)建一個新的C.
    另外钮热,用戶可以控制輸出RDD的分區(qū)。
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
  return [a]
def append(a, b):
  a.append(b)
  return a
def extend(a, b):
  a.extend(b)
  return a
sorted(x.combineByKey(to_list, append, extend).collect())
[('a', [1, 2]), ('b', [1])]
解釋如下:
由于聚合操作會遍歷分區(qū)中所有的元素烛芬,
因此每個元素(這里指的是鍵值對)的鍵只有兩種情況:出現(xiàn)過和沒出現(xiàn)過隧期。
如果以前沒出現(xiàn)過痴奏,則執(zhí)行的是createCombiner函數(shù),
在該例子中首先是('a',1)中key:a沒有出現(xiàn)過厌秒,
我們執(zhí)行createConmbiner函數(shù)作用在value:1上,變成了列表[1]擅憔。('b',1)同理鸵闪。
如果key以前出現(xiàn)過就執(zhí)行mergeValue函數(shù)作用在value上,
('a',2)暑诸,由于key:a以前出現(xiàn)過所以就把value:2 append到列表[1]上蚌讼,結(jié)果為[1,2]。
mergeCombiners函數(shù)是作用在每個分區(qū)上个榕,因?yàn)橹挥幸粋€分區(qū)篡石,所以并沒有用上這個函數(shù)。
  1. context
    此RDD創(chuàng)建的SparkContext西采。
  2. count()
    返回此RDD中的元素?cái)?shù)量凰萨。
sc.parallelize([2, 3, 4]).count()
3
  1. countApprox(timeout, confidence=0.95)
  • Note: 實(shí)驗(yàn)階段
    count()的近似版本,即使并非所有任務(wù)都已完成械馆,在超時內(nèi)返回潛在的不完整結(jié)果胖眷。
  1. countApproxDistinct(relativeSD=0.05)
  • Note: 實(shí)驗(yàn)階段
    返回RDD中不同元素的近似數(shù)量。
  1. countByKey()
    計(jì)算每個鍵的元素?cái)?shù)量霹崎,并將結(jié)果作為字典返回給master珊搀。
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
  1. countByValue()
    將此RDD中每個唯一值的計(jì)數(shù)返回為(值,計(jì)數(shù))對的字典尾菇。
    sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
    [(1, 2), (2, 3)]
  2. distinct(numPartitions=None)
    返回包含此RDD中不同元素的新RDD境析。去重。
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1,2,3]
  1. filter(f)
    返回僅包含滿足f的元素的新RDD派诬。
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()
[2,4]
  1. first()
    返回此RDD中的第一個元素劳淆。
sc.parallelize([2, 3, 4]).first()
2
  1. flatMap(f, preservesPartitioning=False)
    通過首先將一個函數(shù)應(yīng)用于此RDD的所有元素,然后展平結(jié)果千埃,返回一個新的RDD憔儿。
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
[1, 1, 2, 1, 2, 3]
sc.parallelize([(1,2),(3,4)]).flatMap(lambda x: x).collect()
[1, 2, 3, 4]
sc.parallelize([(1,2),(3,4)]).map(lambda x: x).collect()
[(1, 2), (3, 4)]
  1. flatMapValues(f)
    通過flatMap函數(shù)傳遞鍵值對RDD中的每個值而不更改鍵; 這也保留了原來的RDD的分區(qū)。
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): 
  return x
x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
  1. fold(zeroValue, op)
    使用給定的關(guān)聯(lián)函數(shù)和中性“零值”來聚合每個分區(qū)的元素放可,然后聚合所有分區(qū)的結(jié)果谒臼。
    函數(shù)op(t1,t2)允許修改t1并將其作為結(jié)果值返回耀里,以避免對象分配; 但是蜈缤,它不應(yīng)該修改t2。
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
  1. foldByKey(zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
from operator import add
sorted(rdd.foldByKey(0, add).collect())
[('a', 2), ('b', 1)]
  1. foreach(f)
    將一個函數(shù)應(yīng)用于此RDD的所有元素冯挎。
def f(x): 
  print(x)
sc.parallelize([1, 2, 3]).foreach(f)
3
2
1
  1. foreachPartition(f)
    將此功能應(yīng)用于此RDD的每個分區(qū)底哥。
  2. fullOuterJoin(other, numPartitions=None)
    執(zhí)行self和other的右外連接。
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(x.fullOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
  1. getCheckpointFile()
    獲取此RDD檢查點(diǎn)所在的文件的名稱
    RDD在本地檢查點(diǎn)不定義。
  2. getNumPartitions()
    返回RDD中的分區(qū)數(shù)量
  3. getStorageLevel()
    獲取RDD的當(dāng)前存儲級別趾徽。
  4. glom()
    將通過合并每個分區(qū)內(nèi)的所有元素創(chuàng)建的RDD返回到列表中续滋。
rdd = sc.parallelize([1, 2, 3, 4], 2)
sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
  1. groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
    通過函數(shù)f對RDD進(jìn)行分組。
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
  1. groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
  2. groupWith(other, *others)
    cogroup的別名孵奶,但支持多個RDD疲酌。
  3. histogram(buckets)
    使用提供的桶計(jì)算直方圖。 除了最后一個關(guān)閉之外了袁,桶都向右開放朗恳。
    桶必須被排序,不包含任何重復(fù)载绿,并且至少有兩個元素粥诫。
rdd = sc.parallelize(range(51))
rdd.histogram(2) #兩個桶
([0, 25, 50], [25, 26]) 
#第一個桶在區(qū)間[1,25),包含25個元素,第二個桶在區(qū)間[25,50],包含26個元素崭庸。
  1. id()
    此RDD的唯一ID(在其SparkContext中)怀浆。
  2. intersection(other)
    返回此RDD與另一個的交集。
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()
[1, 2, 3]
  1. isCheckpointed()
    返回這個RDD是否是檢查點(diǎn)和物化的冀自,可靠的或本地的揉稚。
  2. isEmpty()
    當(dāng)且僅當(dāng)RDD完全不包含任何元素時才返回true。
  3. isLocallyCheckpointed()
    返回此RDD是否標(biāo)記為本地檢查點(diǎn)熬粗。
  4. join(other, numPartitions=None)
    返回一個RDD搀玖,其中包含所有自身和其他匹配鍵的元素對。
    用于操作兩個鍵值對格式的數(shù)據(jù)集驻呐,操作兩個數(shù)據(jù)集(K,V)和(K,W)返回(K, (V, W))格式的數(shù)據(jù)集灌诅。通過leftOuterJoin、rightOuterJoin含末、fullOuterJoin完成外連接操作猜拾。
    每一對元素將作為(k,(v1佣盒,v2))元組返回挎袜,其中(k,v1)是自身的肥惭,(k盯仪,v2)是另一個的。
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]
  1. keyBy(f)
    通過應(yīng)用函數(shù)f來創(chuàng)建此RDD中元素的元組蜜葱。
x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
x.collect()
[(0, 0), (1, 1), (4, 2)]
  1. keys()
    每個元組的鍵返回一個RDD全景。
m = sc.parallelize([(1, 2), (3, 4)]).keys()
m.collect()
[1, 3]
  1. leftOuterJoin(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
x.fullOuterJoin(y).collect()
[('a', (1, 2)), ('c', (None, 8)), ('b', (4, None))]
x.leftOuterJoin(y).collect()
[('a', (1, 2)), ('b', (4, None))]
x.rightOuterJoin(y).collect()
[('a', (1, 2)), ('c', (None, 8))]
  1. localCheckpoint()
    使用Spark的現(xiàn)有緩存層將此RDD標(biāo)記為本地檢查點(diǎn)。
  2. lookup(key)
    返回RDD中鍵值的列表牵囤。 如果RDD具有已知的分區(qū)程序爸黄,則只需搜索鍵映射到的分區(qū)即可高效地執(zhí)行此操作滞伟。
 sc.parallelize([('a',1),('b',2)]).lookup('b')
[2]
  1. map(f, preservesPartitioning=False)
    通過對這個RDD的每個元素應(yīng)用一個函數(shù)來返回一個新的RDD。
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
  1. mapPartitions(f, preservesPartitioning=False)
    通過對該RDD的每個分區(qū)應(yīng)用一個函數(shù)來返回一個新的RDD炕贵。
rdd = sc.parallelize([1, 2, 3, 4], 2)
rdd.glom().collect()
[[1, 2], [3, 4]]
def f(iterator): 
  yield sum(iterator)
rdd.mapPartitions(f).collect()
[3, 7]
  1. mapPartitionsWithIndex(f, preservesPartitioning=False)
    通過對該RDD的每個分區(qū)應(yīng)用函數(shù)f梆奈,同時跟蹤原始分區(qū)的索引,返回新的RDD称开。
  2. mapPartitionsWithSplit(f, preservesPartitioning=False)
  • 棄用鉴裹,用mapPartitionsWithIndex(f, preservesPartitioning=False)代替
  1. mapValues(f)
    通過映射函數(shù)傳遞鍵值對RDD中的每個值,而不更改鍵; 這也保留了原來的RDD的分區(qū)钥弯。
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
[('a', 3), ('b', 1)]
  1. max(key=None)
    找到這個RDD中的最大項(xiàng)。
  2. mean()
    計(jì)算這個RDD元素的平均值督禽。
  3. meanApprox(timeout, confidence=0.95)
  • 實(shí)驗(yàn)階段
  1. min(key=None)
    找到此RDD中的最小項(xiàng)脆霎。
  2. name()
    返回此RDD的名稱。
  3. partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
    使用指定的分區(qū)程序返回分區(qū)的RDD副本狈惫。
  4. persist(storageLevel=StorageLevel(False, True, False, False, 1))
    設(shè)置此RDD的存儲級別以便在第一次計(jì)算之后在操作之間保留其值睛蛛。 如果RDD尚未設(shè)置存儲級別,則只能用于分配新的存儲級別胧谈。 如果未指定存儲級別忆肾,則默認(rèn)為(MEMORY_ONLY)。
  5. pipe(command, env=None, checkCode=False)
    以管道(pipe)方式將 RDD的各個分區(qū)(partition)使用 shell命令處理(比如一個 Perl或 bash腳本)菱肖。 RDD的元素會被寫入進(jìn)程的標(biāo)準(zhǔn)輸入(stdin)客冈,將進(jìn)程返回的一個字符串型 RDD(RDD of strings),以一行文本的形式寫入進(jìn)程的標(biāo)準(zhǔn)輸出(stdout)中稳强。
  6. randomSplit(weights, seed=None)
    根據(jù)提供的權(quán)重隨機(jī)分配這個RDD场仲。
rdd = sc.parallelize(range(500), 1)
rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
rdd1.count()
213
rdd2.count()
287
  1. reduce(f)
    使用函數(shù)func聚集數(shù)據(jù)集中的元素,這個函數(shù)func輸入為兩個元素退疫,返回為一個元素渠缕。這個函數(shù)應(yīng)該符合結(jié)合律和交換律,這樣才能保證數(shù)據(jù)集中各個元素計(jì)算的正確性褒繁。
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
  1. reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
sorted(rdd.reduceByKey(add).collect())
[('a', 3), ('b', 1)]
  1. reduceByKeyLocally(func)
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.reduceByKeyLocally(add)
{'a': 3, 'b': 1}
  1. repartition(numPartitions)
  2. repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>, ascending=True, keyfunc=<function <lambda> at 0x7f51f1ab3ed8>)
    根據(jù)給定的分區(qū)器對RDD進(jìn)行重新分區(qū)亦鳞,在每個結(jié)果分區(qū)中,按照key值對記錄排序棒坏。這在每個分區(qū)中比先調(diào)用repartition再排序效率更高燕差,因?yàn)樗梢詫⑴判蜻^程在shuffle操作的機(jī)器上進(jìn)行。
  3. rightOuterJoin(other, numPartitions=None)
  4. sample(withReplacement, fraction, seed=None)
    對數(shù)據(jù)采樣俊抵。用戶可以設(shè)定是否有放回(withReplacement)谁不、采樣的百分比(fraction)徽诲、隨機(jī)種子(seed)刹帕。
  5. sampleByKey(withReplacement, fractions, seed=None)
  6. sampleStdev()
    計(jì)算RDD元素的樣本方差(通過除以N-1而不是N來估計(jì)方差中的偏差)吵血。
  7. saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)
  8. saveAsHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)
  9. saveAsNewAPIHadoopDataset(conf, keyConverter=None, valueConverter=None)
  10. saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)
  11. saveAsPickleFile(path, batchSize=10)
  12. saveAsSequenceFile(path, compressionCodecClass=None)
  13. saveAsTextFile(path, compressionCodecClass=None)
    將此RDD保存為文本文件,使用元素的字符串表示形式偷溺。
sc.parallelize(range(10)).saveAsTextFile('file:/home/test')
  1. setName(name)
    為此RDD指定一個名稱蹋辅。
  2. sortBy(keyfunc, ascending=True, numPartitions=None)
    按給定的keyfunc對此RDD進(jìn)行排序。
  3. sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7f51f1ab5050>)
  4. stats()
    返回一個StatCounter對象挫掏,在一次操作中捕獲RDD元素的均值侦另,方差和計(jì)數(shù)。
sc.parallelize([1,2,3,4,5]).stats()
(count: 5, mean: 3.0, stdev: 1.41421356237, max: 5, min: 1)
  1. stdev()
    計(jì)算這個RDD元素的標(biāo)準(zhǔn)偏差尉共。
  2. subtract(other, numPartitions=None)
    返回一個值褒傅,這個值包含在self中,且不包含在other中袄友。
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
  1. subtractByKey(other, numPartitions=None)
  2. sum()
  3. sumApprox(timeout, confidence=0.95)
  • 實(shí)驗(yàn)階段
  1. take(num)
    返回RDD中前num個元素殿托。
  2. takeOrdered(num, key=None)
    返回排序之后的前num個元素。
    返回RDD按自然順序或自定義順序排序后的前n個元素剧蚣。
  3. takeSample(withReplacement, num, seed=None)
    隨機(jī)取num個元素支竹。withReplacement若為True表示有放回的取樣。
    對一個數(shù)據(jù)集隨機(jī)抽樣鸠按,返回一個包含num個隨機(jī)抽樣元素的數(shù)組礼搁,參數(shù)withReplacement指定是否有放回抽樣,參數(shù)seed指定生成隨機(jī)數(shù)的種子目尖。
  4. toDebugString()
  5. toLocalIterator()
    返回包含此RDD中所有元素的迭代器馒吴。 迭代器將消耗與RDD中最大分區(qū)一樣多的內(nèi)存。
rdd = sc.parallelize(range(10))
[x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  1. top(num, key=None)
  2. treeAggregate(zeroValue, seqOp, combOp, depth=2)
    以多級樹形模式聚合此RDD的元素瑟曲。
  3. treeReduce(f, depth=2)
  4. union(other)
rdd = sc.parallelize([1, 1, 2, 3])
rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
  1. unpersist()
    將RDD標(biāo)記為非持久性募书,并從內(nèi)存和磁盤中刪除所有塊。
  2. values()
m = sc.parallelize([(1, 2), (3, 4)]).values()
m.collect()
[2, 4]
  1. variance()
    計(jì)算RDD元素的方差测蹲。
  2. zip(other)
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
  1. zipWithIndex()
  2. zipWithUniqueId()

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)

用于控制RDD存儲的標(biāo)志莹捡。 每個StorageLevel記錄是否使用內(nèi)存,如果內(nèi)存不足扣甲,是否將RDD丟棄到磁盤篮赢,是否以特定于JAVA的序列化格式將數(shù)據(jù)保存在內(nèi)存中,以及是否在多個節(jié)點(diǎn)上復(fù)制RDD分區(qū)琉挖。 還包含一些常用存儲級別MEMORY_ONLY的靜態(tài)常量启泣。 由于數(shù)據(jù)總是在Python端序列化,所有的常量使用序列化的格式示辈。

DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)

class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None)

使用SparkContext.broadcast()創(chuàng)建的廣播變量寥茫。 通過value方法獲取值。

  1. destroy()
    銷毀與此廣播變量相關(guān)的所有數(shù)據(jù)和元數(shù)據(jù)矾麻。 謹(jǐn)慎使用; 廣播變量一旦被銷毀纱耻,就不能再使用芭梯。 這個方法阻塞直到銷毀完成。
  2. dump(value, f)
  3. load(path)
  4. unpersist(blocking=False)
    在每個executor上刪除這個廣播的緩存副本弄喘。 如果在調(diào)用之后使用廣播玖喘,則需要將其重新發(fā)送給每個executor。
  5. value
    返回廣播的值

class pyspark.Accumulator(aid, value, accum_param)

可以累積的共享變量蘑志,即具有可交換和關(guān)聯(lián)的“add”操作累奈。 Spark群集上的worker tasks可以使用+ =操作符將值添加到累加器,但只有驅(qū)動程序可以訪問其值急但。 workers的更新會自動傳播給驅(qū)動程序澎媒。
雖然SparkContext支持像int和float這樣的原始數(shù)據(jù)類型的累加器,但是用戶也可以通過提供一個自定義的AccumulatorParam對象來為自定義類型定義累加器波桩。

  1. add(term)
  2. value
    獲取累加器的值; 只能在driver program中使用旱幼。

class pyspark.AccumulatorParam

幫助對象,定義如何累積給定類型的值突委。

  1. addInPlace(value1, value2)
    添加累加器數(shù)據(jù)類型的兩個值,返回一個新值; 為了效率冬三,也可以更新value1并返回它匀油。
  2. zero(value)

class pyspark.MarshalSerializer

使用Python的Marshal序列化器序列化對象:
http://docs.python.org/2/library/marshal.html
這個序列化器比PickleSerializer更快,但支持更少的數(shù)據(jù)類型勾笆。

  1. dumps(obj)
  2. loads(obj)

class pyspark.PickleSerializer

使用Python的pickle序列化器序列化對象:
http://docs.python.org/2/library/pickle.html
這個序列化程序支持幾乎所有的Python對象敌蚜,但是可能不像更專用的序列化程序那么快。

  1. dumps(obj)
  2. loads(obj, encoding=None)

class pyspark.StatusTracker(jtracker)

用于monitoring job and stage progress的Low-level報(bào)告API窝爪。
這些API有意提供非常弱的一致性語義; 這些API的消費(fèi)者應(yīng)該準(zhǔn)備好處理空的/丟失的信息弛车。 例如,作業(yè)的階段id可能是已知的蒲每,但狀態(tài)API可能沒有關(guān)于這些階段的細(xì)節(jié)的任何信息纷跛,所以getStageInfo可能會返回?zé)o效的有效階段id。
為了限制內(nèi)存使用量邀杏,這些API只提供最近作業(yè)/階段的信息贫奠。 這些API將提供最后一個spark.ui.retainedStages階段和spark.ui.retainedJobs作業(yè)的信息。

  1. getActiveJobsIds()
    返回包含所有活動jobs的ids的數(shù)組望蜡。
  2. getActiveStageIds()
    返回一個包含所有活動stages的id的數(shù)組唤崭。
  3. getJobIdsForGroup(jobGroup=None)
    返回特定工作組中所有已知作業(yè)的列表。 如果jobGroup為None脖律,則返回所有與作業(yè)組不相關(guān)的作業(yè)谢肾。
    返回的列表可能包含正在運(yùn)行的,失敗的和已完成的作業(yè)小泉,并且可能因該方法的調(diào)用而有所不同芦疏。 該方法不保證結(jié)果中元素的順序冕杠。
  4. getJobInfo(jobId)
    返回一個SparkJobInfo對象,如果作業(yè)信息找不到或被垃圾回收眯分,則返回None拌汇。
  5. getStageInfo(stageId)
    返回SparkStageInfo對象,如果找不到階段信息或垃圾收集弊决,則返回None噪舀。

class pyspark.SparkJobInfo

公開有關(guān)Spark作業(yè)的信息。

class pyspark.SparkStageInfo

公開有關(guān)Spark Stage的信息飘诗。

class pyspark.Profiler(ctx)

PySpark支持自定義分析器与倡,這是為了允許使用不同的分析器,以及輸出不同于BasicProfiler中提供的格式昆稿。

  1. dump(id, path)
    將配置文件轉(zhuǎn)儲到路徑中纺座,id是RDD id。
  2. profile(func)
    做函數(shù)func的分析
  3. show(id)
    將配置文件統(tǒng)計(jì)信息打印到標(biāo)準(zhǔn)輸出溉潭,id是RDD id净响。
  4. stats()
    返回收集的性能分析統(tǒng)計(jì)信息(pstats.Stats)

class pyspark.BasicProfiler(ctx)

BasicProfiler是默認(rèn)的profiler,它是基于cProfile和Accumulator實(shí)現(xiàn)的喳瓣。

  1. profile(func)
    運(yùn)行并分析傳入的方法to_profile馋贤。返回配置文件對象。
  2. stats()

class pyspark.TaskContext

  • 實(shí)驗(yàn)階段
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末畏陕,一起剝皮案震驚了整個濱河市配乓,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌惠毁,老刑警劉巖犹芹,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異鞠绰,居然都是意外死亡腰埂,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進(jìn)店門蜈膨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來盐固,“玉大人,你說我怎么就攤上這事丈挟〉蟛罚” “怎么了?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵曙咽,是天一觀的道長蛔趴。 經(jīng)常有香客問我,道長例朱,這世上最難降的妖魔是什么孝情? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任雅宾,我火速辦了婚禮拨扶,結(jié)果婚禮上御板,老公的妹妹穿的比我還像新娘喉誊。我一直安慰自己,他們只是感情好羔挡,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布洁奈。 她就那樣靜靜地躺著,像睡著了一般绞灼。 火紅的嫁衣襯著肌膚如雪利术。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天低矮,我揣著相機(jī)與錄音印叁,去河邊找鬼。 笑死军掂,一個胖子當(dāng)著我的面吹牛轮蜕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蝗锥,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼跃洛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了玛追?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤闲延,失蹤者是張志新(化名)和其女友劉穎痊剖,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體垒玲,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡陆馁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了合愈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叮贩。...
    茶點(diǎn)故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖佛析,靈堂內(nèi)的尸體忽然破棺而出益老,到底是詐尸還是另有隱情,我是刑警寧澤寸莫,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布捺萌,位于F島的核電站,受9級特大地震影響膘茎,放射性物質(zhì)發(fā)生泄漏桃纯。R本人自食惡果不足惜酷誓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望态坦。 院中可真熱鬧盐数,春花似錦、人聲如沸伞梯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壮锻。三九已至琐旁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間猜绣,已是汗流浹背灰殴。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留掰邢,地道東北人牺陶。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像辣之,于是被迫代替她去往敵國和親掰伸。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,685評論 2 360