http://spark.apache.org/docs/latest/api/python/index.html
pyspark package
Subpackages
Contents
PySpark is the Python API for Spark.
Public classes:
SparkContext:
Main entry point for Spark functionality.
RDD:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Broadcast:
A broadcast variable that gets reused across tasks.
Accumulator:
An "add-only" shared variable that tasks can only add values to.
SparkConf:
For configuring Spark.
SparkFiles:
Access files shipped with jobs.
StorageLevel:
Finer-grained cache persistence levels.
TaskContext:
Information about the current running task, available on the workers and experimental.
class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)[source]
Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
Most of the time, you would create a SparkConf object with SparkConf(), which will load values from spark.* Java system properties as well. In this case, any parameters you set directly on the SparkConf object take priority over system properties.
For unit tests, you can also call SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.
All setter methods in this class support chaining. For example, you can write conf.setMaster("local").setAppName("My app").
Note
Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.
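A minimal usage sketch (the master URL, application name, and memory setting below are illustrative placeholders, not values required by the API):
>>> from pyspark import SparkConf, SparkContext
>>> conf = SparkConf().setMaster("local[2]").setAppName("My app")
>>> conf = conf.set("spark.executor.memory", "1g")
>>> sc = SparkContext(conf=conf)  # from here on the conf is cloned and read-only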
contains(key)[source]
Does this configuration contain a given key?
get(key, defaultValue=None)[source]
Get the configured value for some key, or return a default otherwise.
getAll()[source]
Get all values as a list of key-value pairs.
set(key, value)[source]
Set a configuration property.
setAll(pairs)[source]
Set multiple parameters, passed as a list of key-value pairs.
Parameters: pairs – list of key-value pairs to set
setAppName(value)[source]
Set application name.
setExecutorEnv(key=None, value=None, pairs=None)[source]
Set an environment variable to be passed to executors.
setIfMissing(key, value)[source]
Set a configuration property, if not already set.
setMaster(value)[source]
Set master URL to connect to.
setSparkHome(value)[source]
Set path where Spark is installed on worker nodes.
toDebugString()[source]
Returns a printable version of the configuration, as a list of key=value pairs, one per line.
class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)[source]
Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
accumulator(value, accum_param=None)[source]
Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
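A short sketch of typical use (tasks add to the accumulator with add() or +=, and only the driver reads value):
>>> accum = sc.accumulator(0)
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10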
addFile(path, recursive=False)[source]
Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
To access the file in Spark jobs, use L{SparkFiles.get(fileName)} to find its download location.
A directory can be given if the recursive option is set to True. Currently directories are only supported for Hadoop-supported filesystems.
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...    with open(SparkFiles.get("test.txt")) as testFile:
...        fileVal = int(testFile.readline())
...        return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]
addPyFile(path)[source]
Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
applicationId
A unique identifier for the Spark application. Its format depends on the scheduler implementation.
In case of a local Spark app, something like 'local-1433865536131'
In case of YARN, something like 'application_1433865536131_34483'
>>> sc.applicationId
u'local-...'
binaryFiles(path, minPartitions=None)[source]
Note
Experimental
Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file and the value is the content of each file.
Note
Small files are preferred; large files are also allowed, but may cause bad performance.
binaryRecords(path, recordLength)[source]
Note
Experimental
Load data from a flat binary file, assuming each record is a set of numbers with the specified numerical format (see ByteBuffer), and the number of bytes per record is constant.
Parameters: path – directory to the input data files
recordLength – the length at which to split the records
broadcast(value)[source]
Broadcast a read-only variable to the cluster, returning a L{Broadcast} object for reading it in distributed functions. The variable will be sent to each cluster only once.
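For example (the same pattern appears again under pyspark.Broadcast further below):
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]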
cancelAllJobs()[source]
Cancel all jobs that have been scheduled or are running.
cancelJobGroup(groupId)[source]
Cancel active jobs for the specified group. See SparkContext.setJobGroup for more information.
defaultMinPartitions
Default min number of partitions for Hadoop RDDs when not given by user
defaultParallelism
Default level of parallelism to use when not given by user (e.g. for reduce tasks)
dump_profiles(path)[source]
Dump the profile stats into directory path
emptyRDD()[source]
Create an RDD that has no partitions or elements.
getConf()[source]
getLocalProperty(key)[source]
Get a local property set in this thread, or null if it is missing. See setLocalProperty.
classmethod getOrCreate(conf=None)[source]
Get or instantiate a SparkContext and register it as a singleton object.
Parameters: conf – SparkConf (optional)
hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
Parameters: path – path to Hadoop file
inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. "org.apache.hadoop.mapred.TextInputFormat")
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.LongWritable")
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
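A hedged sketch of a call (the HDFS path is a placeholder; the classes are the stock Hadoop text input format and its usual key/value Writables):
>>> rdd = sc.hadoopFile("hdfs:///data/input",
...                     "org.apache.hadoop.mapred.TextInputFormat",
...                     "org.apache.hadoop.io.LongWritable",
...                     "org.apache.hadoop.io.Text")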
hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Parameters: inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. "org.apache.hadoop.mapred.TextInputFormat")
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.LongWritable")
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
Parameters: path – path to Hadoop file
inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.LongWritable")
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Parameters: inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.LongWritable")
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
parallelize(c, numSlices=None)[source]
Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range, for performance.
>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
pickleFile(name, minPartitions=None)[source]
Load an RDD previously saved using the RDD.saveAsPickleFile method.
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
range(start, end=None, step=1, numSlices=None)[source]
Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as python's built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.
Parameters: start – the start value
end – the end value (exclusive)
step – the incremental step (default: 1)
numSlices – the number of partitions of the new RDD
Returns: an RDD of int
>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]
runJob(rdd, partitionFunc, partitions=None, allowLocal=False)[source]
Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements.
If 'partitions' is not specified, this will run over all partitions.
>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]
>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)[source]
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows:
A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
Serialization is attempted via Pyrolite pickling
If this fails, the fallback is to call 'toString' on each key and value
PickleSerializer is used to deserialize pickled objects on the Python side
Parameters: path – path to sequencefile
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.LongWritable")
keyConverter –
valueConverter –
minSplits – minimum splits in dataset (default min(2, sc.defaultParallelism))
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
setCheckpointDir(dirName)[source]
Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster.
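A small sketch (the local path is only suitable for local mode and is an arbitrary example):
>>> sc.setCheckpointDir("/tmp/spark-checkpoints")
>>> rdd = sc.parallelize(range(10)).map(lambda x: x * x)
>>> rdd.checkpoint()
>>> rdd.count()
10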
setJobDescription(value)[source]
Set a human readable description of the current job.
setJobGroup(groupId, description, interruptOnCancel=False)[source]
Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.
Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group.
The application can use SparkContext.cancelJobGroup to cancel all running jobs in this group.
>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should be cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
setLocalProperty(key, value)[source]
Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
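For example, to route jobs submitted from the current thread to a particular Fair Scheduler pool (the pool name is an arbitrary example):
>>> sc.setLocalProperty("spark.scheduler.pool", "production")
>>> # jobs submitted from this thread are now tagged with that pool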
setLogLevel(logLevel)[source]
Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
classmethod setSystemProperty(key, value)[source]
Set a Java system property, such as spark.executor.memory. This must be invoked before instantiating SparkContext.
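For example (the memory value is illustrative; this only has an effect if called before the SparkContext is created):
>>> from pyspark import SparkContext
>>> SparkContext.setSystemProperty("spark.executor.memory", "2g")
>>> sc = SparkContext("local", "App Name")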
show_profiles()[source]
Print the profile stats to stdout
sparkUser()[source]
Get SPARK_USER for the user running SparkContext.
startTime
Return the epoch time when the Spark Context was started.
statusTracker()[source]
Return StatusTracker object
stop()[source]
Shut down the SparkContext.
textFile(name, minPartitions=None, use_unicode=True)[source]
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
If use_unicode is False, the strings will be kept as?str?(encoding as?utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']
uiWebUrl
Return the URL of the SparkUI instance started by this SparkContext
union(rdds)[source]
Build the union of a list of RDDs.
This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer:
>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
[u'Hello', 'World!']
version
The version of Spark on which this application is running.
wholeTextFiles(path,?minPartitions=None,?use_unicode=True)[source]
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
If use_unicode is False, the strings will be kept as?str?(encoding as?utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)
For example, if you have the following files:
hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn
Do rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"), then rdd contains:
(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)
Note
Small files are preferred, as each file will be loaded fully in memory.
>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
class?pyspark.SparkFiles[source]
Resolves paths to files added through L{SparkContext.addFile()}.
SparkFiles contains only classmethods; users should not create SparkFiles instances.
classmethod?get(filename)[source]
Get the absolute path of a file added through?SparkContext.addFile().
classmethod?getRootDirectory()[source]
Get the root directory that contains files added through?SparkContext.addFile().
class?pyspark.RDD(jrdd,?ctx,?jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))[source]
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
aggregate(zeroValue,?seqOp,?combOp)[source]
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”
The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash>)[source]
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
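This entry ships without a doctest; a small hedged example, computing a per-key (sum, count) pair in the same style as the aggregate() example above:
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
>>> seqFunc = (lambda acc, v: (acc[0] + v, acc[1] + 1))
>>> combFunc = (lambda a, b: (a[0] + b[0], a[1] + b[1]))
>>> sorted(rdd.aggregateByKey((0, 0), seqFunc, combFunc).collect())
[('a', (3, 2)), ('b', (1, 1))]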
cache()[source]
Persist this RDD with the default storage level (MEMORY_ONLY).
cartesian(other)[source]
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements?(a, b)?where?a?is in?self?and?b?is in?other.
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
checkpoint()[source]
Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with?SparkContext.setCheckpointDir()?and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
coalesce(numPartitions,?shuffle=False)[source]
Return a new RDD that is reduced into?numPartitions?partitions.
>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]
cogroup(other,?numPartitions=None)[source]
For each key k in?self?or?other, return a resulting RDD that contains a tuple with the list of values for that key in?self?as well as?other.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]
collect()[source]
Return a list that contains all of the elements in this RDD.
Note
This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
collectAsMap()[source]
Return the key-value pairs in this RDD to the master as a dictionary.
Note
This method should only be used if the resulting data is expected to be small, as all the data is loaded into the driver's memory.
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
>>> m[3]
4
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)[source]
Generic function to combine the elements for each key using a custom set of aggregation functions.
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a 'combined type' C.
Users provide three functions:
createCombiner, which turns a V into a C (e.g., creates a one-element list)
mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
mergeCombiners, to combine two C's into a single one (e.g., merges the lists)
To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.
In addition, users can control the partitioning of the output RDD.
Note
V and C can be different; for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
>>> def to_list(a):
...     return [a]
...
>>> def append(a, b):
...     a.append(b)
...     return a
...
>>> def extend(a, b):
...     a.extend(b)
...     return a
...
>>> sorted(x.combineByKey(to_list, append, extend).collect())
[('a', [1, 2]), ('b', [1])]
context
The SparkContext that this RDD was created on.
count()[source]
Return the number of elements in this RDD.
>>> sc.parallelize([2, 3, 4]).count()
3
countApprox(timeout,?confidence=0.95)[source]
Note
Experimental
Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished.
>>> rdd = sc.parallelize(range(1000), 10)
>>> rdd.countApprox(1000, 1.0)
1000
countApproxDistinct(relativeSD=0.05)[source]
Note
Experimental
Return approximate number of distinct elements in the RDD.
The algorithm used is based on streamlib’s implementation of?“HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm”, available here.
Parameters:relativeSD?– Relative accuracy. Smaller values create counters that require more space. It must be greater than 0.000017.
>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
>>> 900 < n < 1100
True
>>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
>>> 16 < n < 30
True
countByKey()[source]
Count the number of elements for each key, and return the result to the master as a dictionary.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
countByValue()[source]
Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]
distinct(numPartitions=None)[source]
Return a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
filter(f)[source]
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
first()[source]
Return the first element in this RDD.
>>> sc.parallelize([2, 3, 4]).first()
2
>>> sc.parallelize([]).first()
Traceback (most recent call last):
    ...
ValueError: RDD is empty
flatMap(f,?preservesPartitioning=False)[source]
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
flatMapValues(f)[source]
Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD’s partitioning.
>>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
>>> def f(x): return x
>>> x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
fold(zeroValue,?op)[source]
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”
The function?op(t1, t2)?is allowed to modify?t1?and return it as its result value to avoid object allocation; however, it should not modify?t2.
This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
foldByKey(zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash>)[source]
Merge the values for each key using an associative function “func” and a neutral “zeroValue” which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication.).
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
>>> sorted(rdd.foldByKey(0, add).collect())
[('a', 2), ('b', 1)]
foreach(f)[source]
Applies a function to all elements of this RDD.
>>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
foreachPartition(f)[source]
Applies a function to each partition of this RDD.
>>> def f(iterator):
...     for x in iterator:
...         print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
fullOuterJoin(other,?numPartitions=None)[source]
Perform a full outer join of self and other.
For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.
Similarly, for each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in self, or the pair (k, (None, w)) if no elements in self have key k.
Hash-partitions the resulting RDD into the given number of partitions.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("c", 8)])
>>> sorted(x.fullOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
getCheckpointFile()[source]
Gets the name of the file to which this RDD was checkpointed
Not defined if RDD is checkpointed locally.
getNumPartitions()[source]
Returns the number of partitions in RDD
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.getNumPartitions()
2
getStorageLevel()[source]
Get the RDD’s current storage level.
>>> rdd1 = sc.parallelize([1, 2])
>>> rdd1.getStorageLevel()
StorageLevel(False, False, False, False, 1)
>>> print(rdd1.getStorageLevel())
Serialized 1x Replicated
glom()[source]
Return an RDD created by coalescing all elements within each partition into a list.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>)[source]
Return an RDD of grouped items.
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)[source]
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
Note
If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
groupWith(other,?*others)[source]
Alias for cogroup but with support for multiple RDDs.
>>> w = sc.parallelize([("a", 5), ("b", 6)])
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> z = sc.parallelize([("b", 42)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
histogram(buckets)[source]
Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1.
If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) insertion to O(1) per element (where n is the number of buckets).
Buckets must be sorted, not contain any duplicates, and have at least two elements.
If?buckets?is a number, it will generate buckets which are evenly spaced between the minimum and maximum of the RDD. For example, if the min value is 0 and the max is 100, given?buckets?as 2, the resulting buckets will be [0,50) [50,100].?buckets?must be at least 1. An exception is raised if the RDD contains infinity. If the elements in the RDD do not vary (max == min), a single bucket will be used.
The return value is a tuple of buckets and histogram.
>>> rdd = sc.parallelize(range(51))
>>> rdd.histogram(2)
([0, 25, 50], [25, 26])
>>> rdd.histogram([0, 5, 25, 50])
([0, 5, 25, 50], [5, 20, 26])
>>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
([0, 15, 30, 45, 60], [15, 15, 15, 6])
>>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
>>> rdd.histogram(("a", "b", "c"))
(('a', 'b', 'c'), [2, 2])
id()[source]
A unique ID for this RDD (within its SparkContext).
intersection(other)[source]
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
Note
This method performs a shuffle internally.
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]
isCheckpointed()[source]
Return whether this RDD is checkpointed and materialized, either reliably or locally.
isEmpty()[source]
Returns true if and only if the RDD contains no elements at all.
Note
an RDD may be empty even when it has at least 1 partition.
>>> sc.parallelize([]).isEmpty()
True
>>> sc.parallelize([1]).isEmpty()
False
isLocallyCheckpointed()[source]
Return whether this RDD is marked for local checkpointing.
Exposed for testing.
join(other,?numPartitions=None)[source]
Return an RDD containing all pairs of elements with matching keys in?self?and?other.
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in?self?and (k, v2) is in?other.
Performs a hash join across the cluster.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]
keyBy(f)[source]
Creates tuples of the elements in this RDD by applying?f.
>>> x = sc.parallelize(range(0, 3)).keyBy(lambda x: x * x)
>>> y = sc.parallelize(zip(range(0, 5), range(0, 5)))
>>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
[(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
keys()[source]
Return an RDD with the keys of each tuple.
>>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
>>> m.collect()
[1, 3]
leftOuterJoin(other,?numPartitions=None)[source]
Perform a left outer join of?self?and?other.
For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.
Hash-partitions the resulting RDD into the given number of partitions.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.leftOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None))]
localCheckpoint()[source]
Mark this RDD for local checkpointing using Spark’s existing caching layer.
This method is for users who wish to truncate RDD lineages while skipping the expensive step of replicating the materialized data in a reliable distributed file system. This is useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed data is written to ephemeral local storage in the executors instead of to a reliable, fault-tolerant storage. The effect is that if an executor fails during the computation, the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
This is NOT safe to use with dynamic allocation, which removes executors along with their cached blocks. If you must use both features, you are advised to set?spark.dynamicAllocation.cachedExecutorIdleTimeout?to a high value.
The checkpoint directory set through?SparkContext.setCheckpointDir()?is not used.
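A small hedged sketch; the local checkpoint is materialized by the first action that runs after the RDD is marked:
>>> rdd = sc.parallelize(range(100), 4).map(lambda x: x * 2)
>>> rdd.localCheckpoint()
>>> rdd.count()
100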
lookup(key)[source]
Return the list of values in the RDD for key?key. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.
>>> l = range(1000)
>>> rdd = sc.parallelize(zip(l, l), 10)
>>> rdd.lookup(42)  # slow
[42]
>>> sorted = rdd.sortByKey()
>>> sorted.lookup(42)  # fast
[42]
>>> sorted.lookup(1024)
[]
>>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
>>> list(rdd2.lookup(('a', 'b'))[0])
['c']
map(f,?preservesPartitioning=False)[source]
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
mapPartitions(f,?preservesPartitioning=False)[source]
Return a new RDD by applying a function to each partition of this RDD.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
mapPartitionsWithIndex(f,?preservesPartitioning=False)[source]
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
6
mapPartitionsWithSplit(f,?preservesPartitioning=False)[source]
Deprecated: use mapPartitionsWithIndex instead.
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithSplit(f).sum()
6
mapValues(f)[source]
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.
>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
[('a', 3), ('b', 1)]
max(key=None)[source]
Find the maximum item in this RDD.
Parameters:key?– A function used to generate key for comparing
>>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
>>> rdd.max()
43.0
>>> rdd.max(key=str)
5.0
mean()[source]
Compute the mean of this RDD’s elements.
>>> sc.parallelize([1, 2, 3]).mean()
2.0
meanApprox(timeout,?confidence=0.95)[source]
Note
Experimental
Approximate operation to return the mean within a timeout or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(range(1000)) / 1000.0
>>> abs(rdd.meanApprox(1000) - r) / r < 0.05
True
min(key=None)[source]
Find the minimum item in this RDD.
Parameters:key?– A function used to generate key for comparing
>>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
>>> rdd.min()
2.0
>>> rdd.min(key=str)
10.0
name()[source]
Return the name of this RDD.
partitionBy(numPartitions, partitionFunc=<function portable_hash>)[source]
Return a copy of the RDD partitioned using the specified partitioner.
>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
>>> len(set(sets[0]).intersection(set(sets[1])))
0
persist(storageLevel=StorageLevel(False,?True,?False,?False,?1))[source]
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
True
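To pick a non-default level, pass one of the pyspark.StorageLevel constants listed further below, for example:
>>> from pyspark import StorageLevel
>>> rdd2 = sc.parallelize(["b", "a", "c"])
>>> rdd2.persist(StorageLevel.MEMORY_AND_DISK).is_cached
True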
pipe(command,?env=None,?checkCode=False)[source]
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']
Parameters:checkCode?– whether or not to check the return value of the shell command.
randomSplit(weights,?seed=None)[source]
Randomly splits this RDD with the provided weights.
Parameters:weights?– weights for splits, will be normalized if they don’t sum to 1
seed?– random seed
Returns:split RDDs in a list
>>> rdd = sc.parallelize(range(500), 1)
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
>>> len(rdd1.collect() + rdd2.collect())
500
>>> 150 < rdd1.count() < 250
True
>>> 250 < rdd2.count() < 350
True
reduce(f)[source]
Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10
>>> sc.parallelize([]).reduce(add)
Traceback (most recent call last):
    ...
ValueError: Can not reduce() empty RDD
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)[source]
Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with?numPartitions?partitions, or the default parallelism level if?numPartitions?is not specified. Default partitioner is hash-partition.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
reduceByKeyLocally(func)[source]
Merge the values for each key using an associative and commutative reduce function, but return the results immediately to the master as a dictionary.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKeyLocally(add).items())
[('a', 2), ('b', 1)]
repartition(numPartitions)[source]
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using?coalesce, which can avoid performing a shuffle.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10
repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=<function portable_hash>, ascending=True, keyfunc=<function RDD.<lambda>>)[source]
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.
>>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
>>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)
>>> rdd2.glom().collect()
[[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
rightOuterJoin(other,?numPartitions=None)[source]
Perform a right outer join of?self?and?other.
For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) if no elements in self have key k.
Hash-partitions the resulting RDD into the given number of partitions.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(y.rightOuterJoin(x).collect())
[('a', (2, 1)), ('b', (None, 4))]
sample(withReplacement,?fraction,?seed=None)[source]
Return a sampled subset of this RDD.
Parameters:withReplacement?– can elements be sampled multiple times (replaced when sampled out)
fraction – expected size of the sample as a fraction of this RDD's size. Without replacement: the probability that each element is chosen; fraction must be [0, 1]. With replacement: the expected number of times each element is chosen; fraction must be >= 0
seed?– seed for the random number generator
Note
This is not guaranteed to provide exactly the fraction specified of the total count of this RDD.
>>> rdd = sc.parallelize(range(100), 4)
>>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
True
sampleByKey(withReplacement,?fractions,?seed=None)[source]
Return a subset of this RDD sampled by key (via stratified sampling). Create a sample of this RDD using variable sampling rates for different keys as specified by fractions, a key to sampling rate map.
>>> fractions = {"a": 0.2, "b": 0.1}
>>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
>>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
>>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
True
>>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
True
>>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
True
sampleStdev()[source]
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleStdev()
1.0
sampleVariance()[source]
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleVariance()
1.0
saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)[source]
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys/values are converted for output using either user specified converters or, by default, org.apache.spark.api.python.JavaToWritableConverter.
Parameters: conf – Hadoop job configuration, passed in as a dict
keyConverter – (None by default)
valueConverter – (None by default)
saveAsHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)[source]
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Key and value types will be inferred if not specified. Keys and values are converted for output using either user specified converters or org.apache.spark.api.python.JavaToWritableConverter. The conf is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
Parameters: path – path to Hadoop file
outputFormatClass – fully qualified classname of Hadoop OutputFormat (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.IntWritable", None by default)
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.Text", None by default)
keyConverter – (None by default)
valueConverter – (None by default)
conf – (None by default)
compressionCodecClass – (None by default)
saveAsNewAPIHadoopDataset(conf, keyConverter=None, valueConverter=None)[source]
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are converted for output using either user specified converters or, by default, org.apache.spark.api.python.JavaToWritableConverter.
Parameters: conf – Hadoop job configuration, passed in as a dict
keyConverter – (None by default)
valueConverter – (None by default)
saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)[source]
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types will be inferred if not specified. Keys and values are converted for output using either user specified converters or org.apache.spark.api.python.JavaToWritableConverter. The conf is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
Parameters: path – path to Hadoop file
outputFormatClass – fully qualified classname of Hadoop OutputFormat (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
keyClass – fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.IntWritable", None by default)
valueClass – fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.Text", None by default)
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop job configuration, passed in as a dict (None by default)
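A hedged sketch of a call (the output path is a placeholder, and it assumes a pair RDD whose keys and values map onto the named Writable classes):
>>> pairs = sc.parallelize([(1, "a"), (2, "b")])
>>> pairs.saveAsNewAPIHadoopFile(
...     "hdfs:///data/out",
...     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
...     keyClass="org.apache.hadoop.io.IntWritable",
...     valueClass="org.apache.hadoop.io.Text")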
saveAsPickleFile(path, batchSize=10)[source]
Save this RDD as a SequenceFile of serialized objects. The serializer used is pyspark.serializers.PickleSerializer, with the default batch size of 10.
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
>>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
['1', '2', 'rdd', 'spark']
saveAsSequenceFile(path,?compressionCodecClass=None)[source]
Output a Python RDD of key-value pairs (of form?RDD[(K, V)]) to any Hadoop file system, using the?org.apache.hadoop.io.Writable?types that we convert from the RDD’s key and value types. The mechanism is as follows:
Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
Keys and values of this Java RDD are converted to Writables and written out.
Parameters:path?– path to sequence file
compressionCodecClass?– (None by default)
saveAsTextFile(path,?compressionCodecClass=None)[source]
Save this RDD as a text file, using string representations of elements.
Parameters:path?– path to text file
compressionCodecClass?– (None by default) string i.e. “org.apache.hadoop.io.compress.GzipCodec”
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
>>> from fileinput import input
>>> from glob import glob
>>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
Empty lines are tolerated when saving to text files.
>>> tempFile2 = NamedTemporaryFile(delete=True)
>>> tempFile2.close()
>>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
>>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
'\n\n\nbar\nfoo\n'
Using compressionCodecClass
>>> tempFile3 = NamedTemporaryFile(delete=True)
>>> tempFile3.close()
>>> codec = "org.apache.hadoop.io.compress.GzipCodec"
>>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
>>> from fileinput import input, hook_compressed
>>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
>>> b''.join(result).decode('utf-8')
u'bar\nfoo\n'
setName(name)[source]
Assign a name to this RDD.
>>> rdd1 = sc.parallelize([1, 2])
>>> rdd1.setName('RDD1').name()
u'RDD1'
sortBy(keyfunc,?ascending=True,?numPartitions=None)[source]
Sorts this RDD by the given keyfunc
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)[source]
Sorts this RDD, which is assumed to consist of (key, value) pairs.
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey().first()
('1', 3)
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey(True, 2).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
>>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
stats()[source]
Return a?StatCounter?object that captures the mean, variance and count of the RDD’s elements in one operation.
stdev()[source]
Compute the standard deviation of this RDD’s elements.
>>> sc.parallelize([1, 2, 3]).stdev()
0.816...
subtract(other,?numPartitions=None)[source]
Return each value in?self?that is not contained in?other.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
subtractByKey(other,?numPartitions=None)[source]
Return each (key, value) pair in?self?that has no pair with matching key in?other.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
sum()[source]
Add up the elements in this RDD.
>>> sc.parallelize([1.0, 2.0, 3.0]).sum()
6.0
sumApprox(timeout,?confidence=0.95)[source]
Note
Experimental
Approximate operation to return the sum within a timeout or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(range(1000))
>>> abs(rdd.sumApprox(1000) - r) / r < 0.05
True
take(num)[source]
Take the first num elements of the RDD.
It works by first scanning one partition, and uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
Translated from the Scala implementation in RDD#take().
Note
this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]
takeOrdered(num,?key=None)[source]
Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.
Note
this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[10, 9, 7, 6, 5, 4]
takeSample(withReplacement,?num,?seed=None)[source]
Return a fixed-size sampled subset of this RDD.
Note
This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
>>> len(rdd.takeSample(False, 5, 2))
5
>>> len(rdd.takeSample(False, 15, 3))
10
toDebugString()[source]
A description of this RDD and its recursive dependencies for debugging.
toLocalIterator()[source]
Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.
>>> rdd = sc.parallelize(range(10))
>>> [x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
top(num,?key=None)[source]
Get the top N elements from an RDD.
Note
This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
Note
It returns the list sorted in descending order.
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
[12]
>>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
[6, 5]
>>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
[4, 3, 2]
treeAggregate(zeroValue,?seqOp,?combOp,?depth=2)[source]
Aggregates the elements of this RDD in a multi-level tree pattern.
Parameters:depth?– suggested depth of the tree (default: 2)
>>> add = lambda x, y: x + y
>>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
>>> rdd.treeAggregate(0, add, add)
-5
>>> rdd.treeAggregate(0, add, add, 1)
-5
>>> rdd.treeAggregate(0, add, add, 2)
-5
>>> rdd.treeAggregate(0, add, add, 5)
-5
>>> rdd.treeAggregate(0, add, add, 10)
-5
treeReduce(f,?depth=2)[source]
Reduces the elements of this RDD in a multi-level tree pattern.
Parameters:depth?– suggested depth of the tree (default: 2)
>>> add = lambda x, y: x + y
>>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
>>> rdd.treeReduce(add)
-5
>>> rdd.treeReduce(add, 1)
-5
>>> rdd.treeReduce(add, 2)
-5
>>> rdd.treeReduce(add, 5)
-5
>>> rdd.treeReduce(add, 10)
-5
union(other)[source]
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
unpersist()[source]
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
values()[source]
Return an RDD with the values of each tuple.
>>> m = sc.parallelize([(1, 2), (3, 4)]).values()
>>> m.collect()
[2, 4]
variance()[source]
Compute the variance of this RDD’s elements.
>>> sc.parallelize([1, 2, 3]).variance()
0.666...
zip(other)[source]
Zips this RDD with another one, returning key-value pairs consisting of the first element in each RDD, the second element in each RDD, and so on. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
>>> x = sc.parallelize(range(0, 5))
>>> y = sc.parallelize(range(1000, 1005))
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
zipWithIndex()[source]
Zips this RDD with its element indices.
The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
This method needs to trigger a Spark job when this RDD contains more than one partition.
>>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
zipWithUniqueId()[source]
Zips this RDD with generated unique Long ids.
Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a Spark job, which is different from zipWithIndex.
>>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)[source]
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple nodes. Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. Since the data is always serialized on the Python side, all the constants use the serialized formats.
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None)[source]
A broadcast variable created with SparkContext.broadcast(). Access its value through value.
Examples:
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
>>> large_broadcast = sc.broadcast(range(10000))
destroy()[source]
Destroy all data and metadata related to this broadcast variable. Use this with caution; once a broadcast variable has been destroyed, it cannot be used again. This method blocks until destroy has completed.
dump(value, f)[source]
load(path)[source]
unpersist(blocking=False)[source]
Delete cached copies of this broadcast on the executors. If the broadcast is used after this is called, it will need to be re-sent to each executor.
Parameters: blocking – Whether to block until unpersisting has completed
value
Return the broadcasted value
class pyspark.Accumulator(aid, value, accum_param)[source]
A shared variable that can be accumulated, i.e., has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value. Updates from the workers get propagated automatically to the driver program.
While SparkContext supports accumulators for primitive data types like int and float, users can also define accumulators for custom types by providing a custom AccumulatorParam object. Refer to the doctest of this module for an example.
add(term)[source]
Adds a term to this accumulator's value
value
Get the accumulator's value; only usable in driver program
class pyspark.AccumulatorParam[source]
Helper object that defines how to accumulate values of a given type.
addInPlace(value1, value2)[source]
Add two values of the accumulator's data type, returning a new value; for efficiency, can also update value1 in place and return it.
zero(value)[source]
Provide a "zero value" for the type, compatible in dimensions with the provided value (e.g., a zero vector)
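A hedged sketch of a custom AccumulatorParam that accumulates fixed-length Python lists element-wise (the class name is illustrative):
>>> from pyspark import AccumulatorParam
>>> class ListAccumulatorParam(AccumulatorParam):
...     def zero(self, value):
...         return [0.0] * len(value)
...     def addInPlace(self, v1, v2):
...         return [a + b for a, b in zip(v1, v2)]
...
>>> acc = sc.accumulator([1.0, 2.0, 3.0], ListAccumulatorParam())
>>> sc.parallelize([[1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]).foreach(lambda v: acc.add(v))
>>> acc.value
[3.0, 4.0, 5.0]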
class pyspark.MarshalSerializer[source]
Serializes objects using Python's Marshal serializer:
http://docs.python.org/2/library/marshal.html
This serializer is faster than PickleSerializer but supports fewer datatypes.
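A hedged usage sketch, selecting this serializer when the SparkContext is created:
>>> from pyspark import SparkContext, MarshalSerializer
>>> sc = SparkContext("local", "test", serializer=MarshalSerializer())
>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()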
dumps(obj)[source]
loads(obj)[source]
class pyspark.PickleSerializer[source]
Serializes objects using Python's pickle serializer:
http://docs.python.org/2/library/pickle.html
This serializer supports nearly any Python object, but may not be as fast as more specialized serializers.
dumps(obj)[source]
loads(obj, encoding=None)[source]
class?pyspark.StatusTracker(jtracker?)[source]
Low-level status reporting APIs for monitoring job and stage progress.
These APIs intentionally provide very weak consistency semantics; consumers of these APIs should be prepared to handle empty / missing information. For example, a job’s stage ids may be known but the status API may not have any information about the details of those stages, so?getStageInfo?could potentially return?None?for a valid stage id.
To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs will provide information for the last spark.ui.retainedStages stages and spark.ui.retainedJobs jobs.
getActiveJobsIds()[source]
Returns an array containing the ids of all active jobs.
getActiveStageIds()[source]
Returns an array containing the ids of all active stages.
getJobIdsForGroup(jobGroup=None)[source]
Return a list of all known jobs in a particular job group. If?jobGroup?is None, then returns all known jobs that are not associated with a job group.
The returned list may contain running, failed, and completed jobs, and may vary across invocations of this method. This method does not guarantee the order of the elements in its result.
getJobInfo(jobId)[source]
Returns a?SparkJobInfo?object, or None if the job info could not be found or was garbage collected.
getStageInfo(stageId)[source]
Returns a?SparkStageInfo?object, or None if the stage info could not be found or was garbage collected.
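A small hedged sketch of polling the tracker from the driver; no return values are asserted here because they depend on what is currently running:
>>> tracker = sc.statusTracker()
>>> job_ids = tracker.getActiveJobsIds()        # may be empty between jobs
>>> stage_ids = tracker.getActiveStageIds()
>>> info = tracker.getJobInfo(job_ids[0]) if job_ids else None  # None if unknown or evicted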
class?pyspark.SparkJobInfo[source]
Exposes information about Spark Jobs.
class?pyspark.SparkStageInfo[source]
Exposes information about Spark Stages.
class?pyspark.Profiler(ctx)[source]
Note
DeveloperApi
PySpark supports custom profilers, this is to allow for different profilers to be used as well as outputting to different formats than what is provided in the BasicProfiler.
A custom profiler has to define or inherit the following methods:
profile - will produce a system profile of some sort.
stats - return the collected stats.
dump - dumps the profiles to a path
add - adds a profile to the existing accumulated profile
The profiler class is chosen when creating a SparkContext
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark import BasicProfiler
>>> class MyCustomProfiler(BasicProfiler):
...     def show(self, id):
...         print("My custom profiles for RDD:%s" % id)
...
>>> conf = SparkConf().set("spark.python.profile", "true")
>>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
>>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.parallelize(range(1000)).count()
1000
>>> sc.show_profiles()
My custom profiles for RDD:1
My custom profiles for RDD:3
>>> sc.stop()
dump(id, path)[source]
Dump the profile into path, id is the RDD id
profile(func)[source]
Do profiling on the function func
show(id)[source]
Print the profile stats to stdout, id is the RDD id
stats()[source]
Return the collected profiling stats (pstats.Stats)
class pyspark.BasicProfiler(ctx)[source]
BasicProfiler is the default profiler, which is implemented based on cProfile and Accumulator
profile(func)[source]
Runs and profiles the method to_profile passed in. A profile object is returned.
stats()[source]
class pyspark.TaskContext[source]
Note
Experimental
Contextual information about a task which can be read or mutated during execution. To access the TaskContext for a running task, use: TaskContext.get().
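A hedged sketch of reading the context from inside a task:
>>> from pyspark import TaskContext
>>> def tag_with_partition(iterator):
...     ctx = TaskContext.get()
...     yield (ctx.partitionId(), sum(1 for _ in iterator))
...
>>> sc.parallelize(range(10), 2).mapPartitions(tag_with_partition).collect()
[(0, 5), (1, 5)]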
attemptNumber()[source]
How many times this task has been attempted. The first task attempt will be assigned attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
classmethod get()[source]
Return the currently active TaskContext. This can be called inside of user functions to access contextual information about running tasks.
Note
Must be called on the worker, not the driver. Returns None if not initialized.
partitionId()[source]
The ID of the RDD partition that is computed by this task.
stageId()[source]
The ID of the stage that this task belongs to.
taskAttemptId()[source]
An ID that is unique to this task attempt (within the same SparkContext, no two task attempts will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.