PySpark is the Python API for Spark.
Public classes:
SparkContext: Main entry point for Spark functionality.
RDD: A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Broadcast: A broadcast variable that gets reused across tasks.
Accumulator: An "add-only" shared variable that tasks can only add values to.
SparkConf: For configuring Spark.
SparkFiles: Access files shipped with jobs.
StorageLevel: Finer-grained cache persistence levels.
TaskContext: Information about the task currently being executed, available on the workers; currently experimental.
class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)[source]
Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
Most of the time, you would create a SparkConf object, which will also load values from the Java system properties. In that case, any parameters you set directly on the SparkConf object take priority over the system properties.
For unit tests, you can also call SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.
All setter methods in SparkConf support chaining. For example, you can write:
conf.setMaster("local").setAppName("My app")
Note:
Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.
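A minimal sketch of the chaining and cloning behaviour described above; the master URL, application name, and memory value below are placeholders chosen for illustration:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[2]")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)          # from here on the conf is cloned and frozen
print(sc.getConf().toDebugString())   # one key=value pair per line
sc.stop()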
contains(key)[source]
Does this configuration contain the given key?
get(key, defaultValue=None)[source]
Get the configured value for some key, or return a default otherwise.
getAll()[source]
Get all values as a list of key-value pairs.
set(key, value)[source]
Set a configuration property.
setAll(pairs)[source]
Set multiple parameters, passed as a list of key-value pairs.
Parameters: pairs – list of key-value pairs to set
setAppName(value)[source]
Set application name.
setExecutorEnv(key=None, value=None, pairs=None)[source]
Set an environment variable to be passed to executors.
setIfMissing(key, value)[source]
Set a configuration property, if not already set.
setMaster(value)[source]
Set the master URL to connect to.
setSparkHome(value)[source]
Set the path where Spark is installed on worker nodes.
toDebugString()[source]
Returns a printable version of the configuration, as a list of key=value pairs, one per line.
class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)[source]
Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
accumulator(value, accum_param=None)[source]
Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how values of that type are added together. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
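A minimal sketch of the default integer accumulator, assuming an existing SparkContext named sc:

acc = sc.accumulator(0)

def add_to_acc(x):
    acc.add(x)                     # tasks may only add to the accumulator

sc.parallelize([1, 2, 3, 4]).foreach(add_to_acc)
print(acc.value)                   # only the driver can read the value: 10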
addFile(path, recursive=False)[source]
Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file, a file in HDFS (or another Hadoop-supported filesystem), or an HTTP, HTTPS or FTP URI.
To access the file in Spark jobs, use SparkFiles.get(fileName) to find its download location.
A directory can be given if the recursive option is set to True. Currently directories are only supported for Hadoop-supported filesystems.
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...     with open(SparkFiles.get("test.txt")) as testFile:
...         fileVal = int(testFile.readline())
...         return [x * fileVal for x in iterator]
>>> sc.parallelize([1,2,3,4]).mapPartitions(func).collect()
[100, 200, 300, 400]
addPyFile(path)[source]
Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or another Hadoop-supported filesystem), or an HTTP, HTTPS or FTP URI.
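A hedged sketch of how a shipped module might be used; the path, the file name helpers.py, and the function helpers.normalize are hypothetical, not part of the API:

sc.addPyFile("/path/to/helpers.py")      # placeholder path

def use_helper(x):
    import helpers                       # importable on the workers after addPyFile
    return helpers.normalize(x)          # hypothetical helper function

# sc.parallelize(data).map(use_helper).collect()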
applicationId
A unique identifier for the Spark application. Its format depends on the scheduler implementation.
In the case of a local Spark app it looks like 'local-1433865536131'.
In the case of YARN it looks like 'application_1433865536131_34483'.
binaryFiles(path, minPartitions=None)[source]
Note
Experimental
Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
Note
Small files are preferred; large files are also allowable, but may cause bad performance.
binaryRecords(path, recordLength)[source]
Note
Experimental
Load data from a flat binary file, assuming each record is a set of numbers with the specified numerical format (see ByteBuffer), and the number of bytes per record is constant.
Parameters: path – Directory to the input data files
recordLength – The length at which to split the records
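A minimal sketch of reading fixed-length records, assuming the same tempdir used by the doctests in this section; the file name ints.bin is a placeholder:

import os
import struct

rec_path = os.path.join(tempdir, "ints.bin")
with open(rec_path, "wb") as f:
    for i in (1, 2, 3):
        f.write(struct.pack("<i", i))          # three 4-byte little-endian ints

records = sc.binaryRecords(rec_path, 4)        # one record per 4 bytes
print(records.map(lambda rec: struct.unpack("<i", rec)[0]).collect())   # [1, 2, 3]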
broadcast(value)[source]
Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster node only once.
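A minimal sketch of broadcasting a lookup table once and reading it inside tasks:

lookup = sc.broadcast({"a": 1, "b": 2})

result = (sc.parallelize(["a", "b", "a"])
            .map(lambda k: lookup.value[k])    # read-only access on the workers
            .collect())
print(result)                                  # [1, 2, 1]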
cancelAllJobs()[source]
Cancel all jobs that have been scheduled or are running.
cancelJobGroup(groupId)[source]
Cancel active jobs for the specified group. See SparkContext.setJobGroup for more information.
defaultMinPartitions
Default min number of partitions for Hadoop RDDs when not given by user
defaultParallelism
Default level of parallelism to use when not given by user (e.g. for reduce tasks)
dump_profiles(path)[source]
Dump the profile stats into directory path
emptyRDD()[source]
Create an RDD that has no partitions or elements.
getConf()[source]
getLocalProperty(key)[source]
Get a local property set in this thread, or null if it is missing. See setLocalProperty.
classmethod getOrCreate(conf=None)[source]
Get or instantiate a SparkContext and register it as a singleton object.
Parameters: conf – SparkConf (optional)
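A small sketch of the singleton behaviour; the master URL and application name are placeholders:

from pyspark import SparkConf, SparkContext

sc1 = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("demo"))
sc2 = SparkContext.getOrCreate()    # returns the already-running context
assert sc1 is sc2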
hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read an ‘old’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
Parameters: path – path to Hadoop file
inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)
keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
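A hedged sketch of reading plain text through the ‘old’ Hadoop API; the path is a placeholder. Keys are byte offsets (LongWritable) and values are lines (Text); newAPIHadoopFile below works the same way with the "mapreduce" InputFormat class names:

rdd = sc.hadoopFile(
    "hdfs:///data/input.txt",
    inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.Text")
# rdd is an RDD of (offset, line) pairs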
hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read an ‘old’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Parameters: inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)
keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
Parameters: path – path to Hadoop file
inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]
Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Parameters: inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
keyConverter – (None by default)
valueConverter – (None by default)
conf – Hadoop configuration, passed in as a dict (None by default)
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
parallelize(c, numSlices=None)[source]
Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.
>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
pickleFile(name, minPartitions=None)[source]
Load an RDD previously saved using RDD.saveAsPickleFile method.
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
range(start, end=None, step=1, numSlices=None)[source]
Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as python’s built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.
Parameters: start – the start value
end – the end value (exclusive)
step – the incremental step (default: 1)
numSlices – the number of partitions of the new RDD
Returns: An RDD of int
>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]
runJob(rdd, partitionFunc, partitions=None, allowLocal=False)[source]
Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements.
If ‘partitions’ is not specified, this will run over all partitions.
>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]
>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)[source]
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows:
A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
Serialization is attempted via Pyrolite pickling
If this fails, the fallback is to call ‘toString’ on each key and value
PickleSerializer?is used to deserialize pickled objects on the Python side
Parameters: path – path to sequencefile
keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
keyConverter –
valueConverter –
minSplits – minimum splits in dataset (default min(2, sc.defaultParallelism))
batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
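A minimal sketch of writing and reading back a SequenceFile of (int, string) pairs, assuming the same tempdir used by the doctests in this section; the output directory name is a placeholder:

import os

seq_path = os.path.join(tempdir, "pairs-seq")
sc.parallelize([(1, "a"), (2, "b")]).saveAsSequenceFile(seq_path)
print(sorted(sc.sequenceFile(seq_path).collect()))     # roughly [(1, 'a'), (2, 'b')]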
setCheckpointDir(dirName)[source]
Set the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster.
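A minimal sketch of checkpointing an RDD; the directory below is a placeholder and must be an HDFS path when running on a cluster:

sc.setCheckpointDir("/tmp/spark-checkpoints")
rdd = sc.parallelize(range(100)).map(lambda x: x * x)
rdd.checkpoint()
rdd.count()                    # materializes the RDD and writes the checkpoint
print(rdd.isCheckpointed())    # True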
setJobDescription(value)[source]
Set a human readable description of the current job.
setJobGroup(groupId, description, interruptOnCancel=False)[source]
Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.
Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group.
The application can use SparkContext.cancelJobGroup to cancel all running jobs in this group.
>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job’s executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
setLocalProperty(key, value)[source]
Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
setLogLevel(logLevel)[source]
Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
classmethod setSystemProperty(key, value)[source]
Set a Java system property, such as spark.executor.memory. This must be invoked before instantiating SparkContext.
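A minimal sketch; the memory value and application name are placeholders:

from pyspark import SparkContext

SparkContext.setSystemProperty("spark.executor.memory", "2g")
sc = SparkContext("local", "demo")   # picks up the property at startup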
show_profiles()[source]
Print the profile stats to stdout
sparkUser()[source]
Get SPARK_USER for user who is running SparkContext.
startTime
Return the epoch time when the Spark Context was started.
statusTracker()[source]
Return StatusTracker object
stop()[source]
Shut down the SparkContext.
textFile(name, minPartitions=None, use_unicode=True)[source]
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']
uiWebUrl
Return the URL of the SparkUI instance started by this SparkContext
union(rdds)[source]
Build the union of a list of RDDs.
This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer:
>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']
version
The version of Spark on which this application is running.
wholeTextFiles(path, minPartitions=None, use_unicode=True)[source]
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)
For example, if you have the following files:
hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn
Do rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), then rdd contains:
(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)
Note
Small files are preferred, as each file will be loaded fully in memory.
>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...     _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...     _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]
class pyspark.SparkFiles[source]
Resolves paths to files added through SparkContext.addFile().
SparkFiles contains only classmethods; users should not create SparkFiles instances.
classmethod get(filename)[source]
Get the absolute path of a file added through SparkContext.addFile().
classmethod getRootDirectory()[source]
Get the root directory that contains files added through SparkContext.addFile().
SparkFiles mainly solves the problem of resolving files added to Spark; such files are made available on every node, and Spark presumably keeps them in a temporary directory of its own (see getRootDirectory()).
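A minimal sketch tying SparkFiles to SparkContext.addFile, assuming the same tempdir used by the doctests above; the file name app.conf is a placeholder:

import os
from pyspark import SparkFiles

cfg = os.path.join(tempdir, "app.conf")
with open(cfg, "w") as f:
    _ = f.write("threshold=0.5")

sc.addFile(cfg)
print(SparkFiles.getRootDirectory())   # temporary root directory holding added files
print(SparkFiles.get("app.conf"))      # absolute path, usable on driver and workers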