PySpark API Notes (Part 1)

PySpark is the Python API for Spark.

Public classes:

SparkContext:

Main entry point for Spark functionality.

RDD:

A Resilient Distributed Dataset, the basic abstraction in Spark.

Broadcast:

A broadcast variable that can be reused across tasks.

Accumulator:

An add-only variable shared among tasks; tasks can only add values to it.

SparkConf:

For configuring Spark.

SparkFiles:

Access files shipped with jobs.

StorageLevel:

Finer-grained persistence levels.

TaskContext:

Information about the currently running task, available on the worker nodes. Currently experimental.

class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)[source]

Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

Most of the time, you would create a SparkConf object, which also loads values from the Java system properties. In that case, any parameters you set directly on the SparkConf object take priority over the system properties.

For unit tests, you can always call SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.

All setter methods in SparkConf support chaining. For example, you can write:

conf.setMaster("local").setAppName("My app")

Note:

Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.
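
A minimal sketch of the usage described above, assuming no SparkContext is running yet; the master URL, application name and memory setting are placeholder values, not taken from the original text:

>>> from pyspark import SparkConf, SparkContext
>>> conf = SparkConf().setMaster("local[2]").setAppName("My app")
>>> conf = conf.setIfMissing("spark.executor.memory", "1g")
>>> sc = SparkContext(conf=conf)
>>> sc.getConf().get("spark.app.name")
'My app'

Once sc has been created, further calls to conf.set() no longer affect it, which is exactly the cloning behaviour noted above.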

contains(key)[source]

Does this configuration contain a given key?

get(key, defaultValue=None)[source]

Get the configured value for a key, or return a default otherwise.

getAll()[source]

Get all values as a list of key-value pairs.

set(key, value)[source]

Set a configuration property.

setAll(pairs)[source]

Set multiple parameters, passed as a list of key-value pairs.

Parameters: pairs – list of key-value pairs to set

setAppName(value)[source]

Set application name.

setExecutorEnv(key=None, value=None, pairs=None)[source]

Set an environment variable to be passed to executors.

setIfMissing(key, value)[source]

Set a configuration property, if it is not already set.

setMaster(value)[source]

Set the master URL to connect to.

setSparkHome(value)[source]

Set the path where Spark is installed on worker nodes.

toDebugString()[source]

Return a printable version of the configuration, as a list of key=value pairs, one per line.
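
A short, hedged illustration of setAll, contains, get and toDebugString; the keys are standard Spark properties, but the values here are made up for the example:

>>> conf = SparkConf(False)
>>> conf = conf.setAll([("spark.app.name", "demo"), ("spark.master", "local[*]")])
>>> conf.contains("spark.app.name")
True
>>> conf.get("spark.master")
'local[*]'
>>> print(conf.toDebugString())  # prints one key=value pair per line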


class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)[source]

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

accumulator(value, accum_param=None)[source]

Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how values of a given type are added together. Default AccumulatorParams are used for integer and floating-point values if you do not provide one. For other types, a custom AccumulatorParam can be used.
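
A minimal sketch of the default integer case described above, assuming an existing SparkContext named sc:

>>> acc = sc.accumulator(0)
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
>>> acc.value
10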

addFile(path, recursive=False)[source]

Add a file to be downloaded with this Spark job on every node. The path can be a local file, a file in HDFS (or another Hadoop-supported file system), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use SparkFiles.get(fileName) to find its download location.

A directory can be given if the recursive option is set to True. Currently directories are only supported for Hadoop-supported file systems.

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...     with open(SparkFiles.get("test.txt")) as testFile:
...         fileVal = int(testFile.readline())
...         return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]

addPyFile(path)[source]

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path can be a local file, a file in HDFS (or another Hadoop-supported file system), or an HTTP, HTTPS or FTP URI.
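
A hedged sketch of the typical workflow; mymodule.py and its transform function are hypothetical names introduced for illustration, not part of the original text:

>>> sc.addPyFile("/path/to/mymodule.py")   # hypothetical dependency, shipped to every executor
>>> import mymodule                        # now importable on the driver and inside tasks
>>> sc.parallelize([1, 2, 3]).map(mymodule.transform).collect()  # mymodule.transform is assumed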

applicationId

A unique identifier for the Spark application. Its format depends on the scheduler implementation.

In case of a local Spark app: something like ‘local-1433865536131’.

In case of YARN: something like ‘application_1433865536131_34483’.

binaryFiles(path, minPartitions=None)[source]

Note: Experimental

Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

Note: Small files are preferred; large files are also allowed, but may cause poor performance.

binaryRecords(path, recordLength)[source]

Note: Experimental

Load data from a flat binary file, assuming each record is a set of numbers with the specified numerical format (see ByteBuffer), and the number of bytes per record is constant.

Parameters: path – Directory to the input data files

recordLength – The length at which to split the records
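
A hedged sketch of reading fixed-length records; the file name, the big-endian int format, and the 4-byte record length are assumptions made for illustration:

>>> import struct
>>> path = os.path.join(tempdir, "ints.bin")          # hypothetical input file
>>> with open(path, "wb") as f:
...     _ = f.write(struct.pack(">4i", 1, 2, 3, 4))   # four big-endian 32-bit ints
>>> records = sc.binaryRecords(path, 4).collect()     # one 4-byte record per element
>>> [struct.unpack(">i", r)[0] for r in records]
[1, 2, 3, 4]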

broadcast(value)[source]

Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each node only once.
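
A minimal sketch, assuming an existing SparkContext named sc:

>>> b = sc.broadcast([1, 2, 3])
>>> b.value
[1, 2, 3]
>>> sc.parallelize([0, 0]).map(lambda x: b.value).collect()
[[1, 2, 3], [1, 2, 3]]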

cancelAllJobs()[source]

Cancel all jobs that have been scheduled or are running.

cancelJobGroup(groupId)[source]

Cancel active jobs for the specified group. See SparkContext.setJobGroup for more information.

defaultMinPartitions

Default min number of partitions for Hadoop RDDs when not given by user

defaultParallelism

Default level of parallelism to use when not given by user (e.g. for reduce tasks)

dump_profiles(path)[source]

Dump the profile stats into directory path

emptyRDD()[source]

Create an RDD that has no partitions or elements.

getConf()[source]

getLocalProperty(key)[source]

Get a local property set in this thread, or null if it is missing. See setLocalProperty.

classmethod getOrCreate(conf=None)[source]

Get or instantiate a SparkContext and register it as a singleton object.

Parameters: conf – SparkConf (optional)
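
A hedged sketch of the singleton behaviour: if a SparkContext already exists, the same instance is returned and the passed SparkConf has no effect.

>>> sc1 = SparkContext.getOrCreate()
>>> sc2 = SparkContext.getOrCreate(SparkConf().setAppName("ignored if a context exists"))
>>> sc1 is sc2
True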

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read an ‘old’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.

A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.

Parameters: path – path to Hadoop file

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
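
A hedged sketch of reading a plain text file through the old InputFormat API; the path is hypothetical and the classes are the standard Hadoop text classes:

>>> lines = sc.hadoopFile("/path/to/data.txt",                     # hypothetical path
...                       "org.apache.hadoop.mapred.TextInputFormat",
...                       "org.apache.hadoop.io.LongWritable",     # key: byte offset of the line
...                       "org.apache.hadoop.io.Text")             # value: the line itself
>>> lines.values().take(2)                                         # drop the offsets, keep the text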

hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read an ‘old’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.

Parameters: inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.

A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java

Parameters: path – path to Hadoop file

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.

Parameters: inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

parallelize(c, numSlices=None)[source]

Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

pickleFile(name, minPartitions=None)[source]

Load an RDD previously saved using RDD.saveAsPickleFile method.

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

range(start, end=None, step=1, numSlices=None)[source]

Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as python’s built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.

Parameters: start – the start value

end – the end value (exclusive)

step – the incremental step (default: 1)

numSlices – the number of partitions of the new RDD

Returns: An RDD of int

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

runJob(rdd, partitionFunc, partitions=None, allowLocal=False)[source]

Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements.

If ‘partitions’ is not specified, this will run over all partitions.

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)[source]

Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows:

A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes

Serialization is attempted via Pyrolite pickling

If this fails, the fallback is to call ‘toString’ on each key and value

PickleSerializer is used to deserialize pickled objects on the Python side

Parameters: path – path to sequencefile

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter –

valueConverter –

minSplits – minimum splits in dataset (default min(2, sc.defaultParallelism))

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
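
A hedged round-trip sketch: write a SequenceFile with RDD.saveAsSequenceFile and read it back; the output directory name is illustrative:

>>> seqDir = os.path.join(tempdir, "seq")
>>> sc.parallelize([(1, "a"), (2, "b")]).saveAsSequenceFile(seqDir)
>>> sorted(sc.sequenceFile(seqDir).collect())
[(1, 'a'), (2, 'b')]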

setCheckpointDir(dirName)[source]

Set the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster.
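
A hedged sketch of checkpointing an RDD once the directory is set; the directory name is illustrative, and a local path only works outside a cluster:

>>> sc.setCheckpointDir(os.path.join(tempdir, "checkpoints"))
>>> rdd = sc.parallelize(range(5)).map(lambda x: x * x)
>>> rdd.checkpoint()   # mark the RDD for checkpointing
>>> rdd.count()        # an action materializes the RDD and writes the checkpoint
5
>>> rdd.isCheckpointed()
True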

setJobDescription(value)[source]

Set a human readable description of the current job.

setJobGroup(groupId, description, interruptOnCancel=False)[source]

Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.

Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group.

The application can use SparkContext.cancelJobGroup to cancel all running jobs in this group.

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job’s executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.

setLocalProperty(key, value)[source]

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.

setLogLevel(logLevel)[source]

Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

classmethod setSystemProperty(key, value)[source]

Set a Java system property, such as spark.executor.memory. This must be invoked before instantiating SparkContext.

show_profiles()[source]

Print the profile stats to stdout

sparkUser()[source]

Get SPARK_USER for user who is running SparkContext.

startTime

Return the epoch time when the Spark Context was started.

statusTracker()[source]

Return StatusTracker object

stop()[source]

Shut down the SparkContext.

textFile(name, minPartitions=None, use_unicode=True)[source]

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

uiWebUrl

Return the URL of the SparkUI instance started by this SparkContext

union(rdds)[source]

Build the union of a list of RDDs.

This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer:

>>> path=os.path.join(tempdir,"union-text.txt")>>> withopen(path,"w")astestFile:... _=testFile.write("Hello")>>> textFile=sc.textFile(path)>>> textFile.collect()['Hello']>>> parallelized=sc.parallelize(["World!"])>>> sorted(sc.union([textFile,parallelized]).collect())['Hello', 'World!']

version

The version of Spark on which this application is running.

wholeTextFiles(path, minPartitions=None, use_unicode=True)[source]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

For example, if you have the following files:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

Do rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), then rdd contains:

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

Note: Small files are preferred, as each file will be loaded fully in memory.

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...     _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...     _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]

class pyspark.SparkFiles[source]

Resolves paths to files added through SparkContext.addFile().

SparkFiles contains only classmethods; users should not create SparkFiles instances.

classmethod get(filename)[source]

Get the absolute path of a file added through SparkContext.addFile().

classmethod getRootDirectory()[source]

Get the root directory that contains files added through SparkContext.addFile().

SparkFiles mainly deals with files added to Spark: such a file is shipped to every node, presumably stored under a temporary directory that Spark manages itself.
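
A hedged sketch combining SparkContext.addFile() with the two classmethods above; the file name is illustrative:

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "config.txt")
>>> with open(path, "w") as f:
...     _ = f.write("42")
>>> sc.addFile(path)
>>> SparkFiles.get("config.txt").startswith(SparkFiles.getRootDirectory())
True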
