pyspark.sql module
Module context
Spark SQL和DataFrames中的重要類:
- pyspark.sql.SparkSession - DataFrame和SQL功能的主要入口點刽宪。
- pyspark.sql.DataFrame - 分布式數(shù)據(jù)集合分組到命名的列。
- pyspark.sql.Column - DataFrame中的列表達式腌乡。
- pyspark.sql.Row - DataFrame中的一行數(shù)據(jù)苗缩。
- pyspark.sql.GroupedData - 由DataFrame.groupBy()返回的聚合方法怯屉。
- pyspark.sql.DataFrameNaFunctions - 處理缺失數(shù)據(jù)的方法(空值)急凰。
- pyspark.sql.DataFrameStatFunctions - 統(tǒng)計功能的方法正压。
- pyspark.sql.functions - 可用于DataFrame的內(nèi)置函數(shù)列表乍炉。
- pyspark.sql.types - 可用的數(shù)據(jù)類型列表馋贤。
- pyspark.sql.Window - 用于處理窗口函數(shù)赞别。
class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)
使用Dataset和DataFrame API編程Spark的入口點。
SparkSession可用于創(chuàng)建DataFrame配乓,將DataFrame注冊為表格仿滔,在表格上執(zhí)行SQL,緩存表格以及讀取parquet文件犹芹。 要創(chuàng)建SparkSession崎页,請使用以下構建器模式:
spark = SparkSession.builder.master('spark://cn01:7077').appName("Word Count").getOrCreate()
class Builder
SparkSession的生成器。
- appName(name)
為應用程序設置一個名稱腰埂,該名稱將顯示在Spark Web UI中窟她。
如果沒有設置應用程序名稱插掂,則會隨機生成名稱榨呆。 - config(key=None, value=None, conf=None)
設置一個配置選項倦炒。 使用此方法設置的選項會自動傳播到SparkConf和SparkSession自己的配置中。
對于現(xiàn)有的SparkConf刁卜,請使用conf參數(shù)志电。
>>> from pyspark import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
<pyspark.sql.session.Builder object at 0x2ab7d2ab7650>
- enableHiveSupport()
啟用Hive支持,包括連接到持久化的Hive Metastore蛔趴,支持Hive serdes和Hive用戶定義的功能挑辆。 - getOrCreate()
獲取現(xiàn)有的SparkSession,或者孝情,如果沒有現(xiàn)有的SparkSession鱼蝉,則根據(jù)此構建器中設置的選項創(chuàng)建一個新的SparkSession。
此方法首先檢查是否存在有效的全局默認SparkSession箫荡,如果是魁亦,則返回該值。 如果不存在有效的全局默認SparkSession羔挡,則該方法創(chuàng)建一個新的SparkSession洁奈,并將新創(chuàng)建的SparkSession指定為全局默認值间唉。 - master(master)
設置要連接到的Spark master URL,例如本地運行的“l(fā)ocal”利术,本地運行4核的“l(fā)ocal [4]”或運行在Spark獨立群集上的“spark:// master:7077”呈野。
SparkSession.builder = <pyspark.sql.session.Builder object at 0x7f51f134a110>
SparkSession.catalog
用戶可以通過它創(chuàng)建,刪除印叁,修改或查詢底層數(shù)據(jù)庫被冒,表格,函數(shù)等的接口SparkSession.conf
Spark的運行時配置接口轮蜕。
這是用戶可以獲取并設置與Spark SQL相關的所有Spark和Hadoop配置的接口昨悼。 獲取配置的值時,默認為基礎SparkContext中設置的值(如果有)跃洛。SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
從RDD幔戏,列表或pandas.DataFrame創(chuàng)建一個DataFrame。
當schema是列名稱的列表時税课,每列的類型將從數(shù)據(jù)中推斷出來。
當schema為None時痊剖,它將嘗試從數(shù)據(jù)中推斷出schema(列名和類型)韩玩,數(shù)據(jù)應該是Row的RDD,或者是namedtuple或者dict陆馁。
當schema是pyspark.sql.types.DataType或數(shù)據(jù)類型字符串時找颓,它必須匹配真實數(shù)據(jù),否則將在運行時引發(fā)異常叮贩。 如果給定的schema不是pyspark.sql.types.StructType击狮,它將被封裝成一個pyspark.sql.types.StructType作為唯一的字段,字段名稱將是“值”益老,每個記錄也將被包裝成一個 元組彪蓬,可以稍后轉(zhuǎn)換為行。
如果需要(schema)模式推斷捺萌,則使用samplingRatio來確定用于模式推斷的行的比例档冬。 如果samplingRatio為None,則使用第一行桃纯。
Parameters:
- data - 任何類型的SQL數(shù)據(jù)表示d的RDD(例如行酷誓,元組,int态坦,布爾等)或列表或pandas.DataFrame盐数。
- schema - 一個pyspark.sql.types.DataType或一個數(shù)據(jù)類型字符串或列名稱列表,默認值為None伞梯。 數(shù)據(jù)類型字符串格式等于pyspark.sql.types.DataType.simpleString玫氢,除了頂層結(jié)構類型可以省略struct <>帚屉,原子類型使用typeName()作為它們的格式。 使用字節(jié)而不是tinyint為pyspark.sql.types.ByteType琐旁。 我們也可以使用int作為IntegerType的簡稱涮阔。
- samplingRatio - 用于推斷行的樣本比例。
- verifySchema - 根據(jù)模式驗證每一行的數(shù)據(jù)類型灰殴。
Returns: DataFrame
# list to DataFrame
>>> l = [("name", 1), ("Bob", 2)]
>>> spark.createDataFrame(l, ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> d = [{'name': 'Alice', 'age': 1}, {"name": "Bob", "age": 2}]
>>> spark.createDataFrame(d, ["name", "age"]).collect()
[Row(name=1, age=u'Alice'), Row(name=2, age=u'Bob')]
#RDD to DataFrame
>>> spark.createDataFrame(sc.parallelize(l), ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
#pandas.DataFrame to DataFrame
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(0=1, 1=2)]
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)])
>>> spark.createDataFrame(l, schema).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> spark.createDataFrame(l, "name: string, age: int").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> spark.createDataFrame(l, Person).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
- SparkSession.newSession()
以新會話形式返回一個新的SparkSession敬特,它具有單獨的SQLConf,注冊的臨時視圖和UDF牺陶,但共享SparkContext和表緩存伟阔。 - SparkSession.range(start, end=None, step=1, numPartitions=None)
>>> spark.range(1,7,2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
- SparkSession.read
返回可用于讀取DataFrame中的數(shù)據(jù)的DataFrameReader。 - SparkSession.readStream
返回一個DataStreamReader掰伸,它可以用來讀取數(shù)據(jù)流作為一個數(shù)據(jù)流DataFrame皱炉。 - SparkSession.sparkContext
返回底層的SparkContext。 - SparkSession.sql(sqlQuery)
返回表示給定查詢結(jié)果的DataFrame狮鸭。
>>> l
[('name', 1), ('Bob', 2)]
>>> df = spark.createDataFrame(l, ["name", "age"])
#使用dataFrame(df)創(chuàng)建或替換本地臨時視圖合搅。
>>> df.createOrReplaceTempView("table1")
>>> spark.sql("select * from table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
- SparkSession.stop()
停止底層的SparkContext。 - SparkSession.streams
返回一個StreamingQueryManager歧蕉,它允許管理所有的StreamingQuery 灾部,在此上下文中激活的StreamingQueries。 - SparkSession.table(tableName)
以DataFrame的形式返回指定的表惯退。
>>> spark.table("table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
- SparkSession.udf
返回UDF注冊的UDFRegistration赌髓。 - SparkSession.version
運行此應用程序的Spark版本。
class pyspark.sql.SQLContext(sparkContext, sparkSession=None, jsqlContext=None)
在Spark 1.x中使用Spark中結(jié)構化數(shù)據(jù)(行和列)的入口點催跪。
從Spark 2.0開始锁蠕,它被SparkSession所取代。 但是懊蒸,為了向后兼容荣倾,我們在這里保留這個類。
可以使用SQLContext創(chuàng)建DataFrame骑丸,將DataFrame注冊為表逃呼,在表上執(zhí)行SQL,緩存表和讀取parquet文件者娱。
- cacheTable(tableName)
在內(nèi)存中緩存指定的表抡笼。 - clearCache()
從內(nèi)存緩存中刪除所有緩存的表。 - createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
- createExternalTable(tableName, path=None, source=None, schema=None, **options)
根據(jù)數(shù)據(jù)源中的數(shù)據(jù)集創(chuàng)建外部表黄鳍。 - dropTempTable(tableName)
從目錄中刪除臨時表推姻。 - getConf(key, defaultValue=None)
返回Spark SQL配置屬性中給定鍵的值。 - classmethod getOrCreate(sc)
獲取現(xiàn)有的SQLContext或使用給定的SparkContext創(chuàng)建一個新的SQLContext框沟。 - newSession()
將新的SQLContext作為新會話返回藏古,它具有單獨的SQLConf增炭,注冊的臨時視圖和UDF,但是共享SparkContext和表緩存拧晕。 - range(start, end=None, step=1, numPartitions=None)
- read
- readStream
- registerDataFrameAsTable(df, tableName)
將給定的DataFrame注冊為目錄中的臨時表隙姿。
臨時表僅在此SQLContext實例的生命周期中存在。 - registerFunction(name, f, returnType=StringType)
將一個python函數(shù)(包括lambda函數(shù))注冊為UDF(自定義函數(shù))厂捞,以便在SQL語句中使用输玷。
除了名稱和函數(shù)本身之外,還可以指定返回類型靡馁。 當返回類型沒有給出它默認為一個字符串和轉(zhuǎn)換將自動完成欲鹏。 對于任何其他返回類型,生成的對象必須匹配指定的類型臭墨。
Parameters:
- name - udf的名字
- f - python 函數(shù)
- returnType - 一個pyspark.sql.types.DataType對象
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
- registerJavaFunction(name, javaClassName, returnType=None)
注冊一個Java UDF赔嚎,以便在SQL語句中使用它。 - setConf(key, value)
設置給定的Spark SQL配置屬性胧弛。 - sql(sqlQuery)
- streams
- table(tableName)
- tableNames(dbName=None)
返回數(shù)據(jù)庫dbName中表的名稱列表尤误。 - tables(dbName=None)
返回包含給定數(shù)據(jù)庫中表的名稱的DataFrame。
如果未指定dbName结缚,則將使用當前數(shù)據(jù)庫袄膏。 - udf
- uncacheTable(tableName)
從內(nèi)存緩存中刪除指定的表。
class pyspark.sql.HiveContext(sparkContext, jhiveContext=None)
Spark SQL的一個變體掺冠,與存儲在Hive中的數(shù)據(jù)整合在一起。
Hive配置是從classpath的hive-site.xml中讀取的码党。 它支持同時運行SQL和HiveQL命令德崭。
- refreshTable(tableName)
class pyspark.sql.UDFRegistration(sqlContext)
用戶自定義函數(shù)注冊的包裝器。
- register(name, f, returnType=StringType)
將一個python函數(shù)(包括lambda函數(shù))注冊為UDF揖盘,以便在SQL語句中使用眉厨。
class pyspark.sql.DataFrame(jdf, sql_ctx)
分布式數(shù)據(jù)集合分組到命名的列。
DataFrame相當于Spark SQL中的關系表兽狭,可以使用SQLContext中的各種函數(shù)創(chuàng)建:
創(chuàng)建后憾股,可以使用DataFrame,Column中定義的各種domain-specific-language(DSL)函數(shù)對其進行操作箕慧。
- agg(*exprs)
在沒有組的情況下匯總整個DataFrame(df.groupBy.agg()的簡寫)服球。
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=2)]
>>> from pyspark.sql import functions as f
>>> df.agg(f.min(df.age)).collect()
[Row(min(age)=1)]
- alias(alias)
返回一個帶有別名集的新DataFrame。 - approxQuantile(col, probabilities, relativeError)
計算DataFrame的數(shù)值列的近似分位數(shù)颠焦。 - cache()
使用默認存儲級別(MEMORY_AND_DISK)存儲DataFrame斩熊。 - checkpoint(eager=True)
返回此數(shù)據(jù)集的檢查點版本。 檢查點可用于截斷此DataFrame的邏輯計劃伐庭,這在計劃可能呈指數(shù)增長的迭代算法中特別有用粉渠。 它將被保存到使用SparkContext.setCheckpointDir()設置的檢查點目錄內(nèi)的文件中分冈。 - coalesce(numPartitions)
返回具有完全numPartitions分區(qū)的新DataFrame。 - collect()
- columns
以列表形式返回所有列名稱霸株。
>>> df.columns
['name', 'age']
- corr(col1, col2, method=None)
以雙精度值計算DataFrame的兩列的相關性雕沉。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的別名去件。
Parameters:
- col1 - 第一列的名稱
- col2 - 第二列的名稱
- method - 相關方法坡椒。 目前只支持“皮爾森”
- count()
返回此DataFrame中的行數(shù)。
>>> df.count()
2
- cov(col1, col2)
計算給定列的樣本協(xié)方差(由它們的名稱指定)作為雙精度值箫攀。 DataFrame.cov()和DataFrameStatFunctions.cov()是別名肠牲。 - createGlobalTempView(name)
使用此DataFrame創(chuàng)建全局臨時視圖。
這個臨時視圖的生命周期與這個Spark應用程序有關靴跛。 如果視圖名稱已經(jīng)存在于目錄中缀雳,則拋出TempTableAlreadyExistsException。 - createOrReplaceGlobalTempView(name)
使用給定名稱創(chuàng)建或替換全局臨時視圖梢睛。
這個臨時視圖的生命周期與這個Spark應用程序有關肥印。 - createOrReplaceTempView(name)
使用此DataFrame創(chuàng)建或替換本地臨時視圖。
此臨時表的生命周期與用于創(chuàng)建此DataFrame的SparkSession綁定绝葡。 - createTempView(name)
使用此DataFrame創(chuàng)建本地臨時視圖深碱。
此臨時表的生命周期與用于創(chuàng)建此DataFrame的SparkSession綁定。如果視圖名稱已經(jīng)存在于目錄中藏畅,拋出TempTableAlreadyExistsException敷硅。 - crossJoin(other)
用另一個DataFrame相互作用返回笛卡爾積。
>>> df1 = spark.createDataFrame([("Alice", 1), ("Bob", 5)], ["name", "age"])
>>> df2 = spark.createDataFrame([("Alice", 66), ("Bob", 88)], ["name", "height"])
>>> df1.select(["name", "age"]).collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> df2.select(["name", "height"]).collect()
[Row(name=u'Alice', height=66), Row(name=u'Bob', height=88)]
>>> df1.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=1, name=u'Alice', height=66), Row(age=1, name=u'Alice', height=88), Row(age=5, name=u'Bob', height=66), Row(age=5, name=u'Bob', height=88)]
- crosstab(col1, col2)
- cube(*cols)
使用指定的列為當前的DataFrame創(chuàng)建一個多維數(shù)據(jù)集愉阎,所以我們可以對它們進行聚合绞蹦。
>>> df.cube("name", df.age).count().orderBy("name", "age").show()
+----+----+-----+
|name| age|count|
+----+----+-----+
|null|null| 2|
|null| 1| 1|
|null| 2| 1|
| Bob|null| 1|
| Bob| 2| 1|
|name|null| 1|
|name| 1| 1|
+----+----+-----+
- describe(*cols)
計算數(shù)字和字符串列的統(tǒng)計信息。
這包括count榜旦,mean幽七,stddev,min和max溅呢。 如果未給出具體的列名澡屡,則此函數(shù)計算所有數(shù)字或字符串列的統(tǒng)計信息。
>>> df.describe(["age"]).show()
+-------+------------------+
|summary| age|
+-------+------------------+
| count| 2|
| mean| 1.5|
| stddev|0.7071067811865476|
| min| 1|
| max| 2|
+-------+------------------+
- distinct()
返回包含此DataFrame中不相同行的新DataFrame咐旧。(去除相同的行) - drop(*cols)
返回刪除指定列的新DataFrame驶鹉。 如果模式不包含給定的列名,這是一個無意義操作铣墨。 - dropDuplicates(subset=None)
返回刪除重復行的新DataFrame梁厉,可選地僅考慮某些列。
>>> df3 = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
>>> df3.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
- drop_duplicates(subset=None)
dropDuplicates()的別名。 - dropna(how='any', thresh=None, subset=None)
返回一個新的DataFrame词顾,省略含有空值的行八秃。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的別名。
Parameters:
- how - “any”或“all”肉盹。 如果“any”昔驱,如果它包含任何空值,則刪除一行上忍。 如果'all'骤肛,只有當所有的值都為null時才刪除一行。
- thresh -
- subset -
- dtypes
以列表形式返回所有列名稱及其數(shù)據(jù)類型窍蓝。
>>> df3.dtypes
[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]
- explain(extended=False)
打右傅摺(邏輯和物理)計劃到控制臺進行調(diào)試。
Parameters:
- extended - 布爾值吓笙,默認為False淑玫。 如果為False,則僅打印物理計劃面睛。
>>> df3.explain()
== Physical Plan ==
Scan ExistingRDD[age#277L,height#278L,name#279]
- fillna(value, subset=None)
替換空值絮蒿,na.fill()的別名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的別名叁鉴。 - filter(condition)
使用給定的條件過濾行土涝。
where()是filter()的別名。
>>> df.filter(df.age > 1).collect()
[Row(name=u'Bob', age=2)]
>>> df.filter("age > 1").collect()
[Row(name=u'Bob', age=2)]
- first()
將第一行作為Row返回幌墓。
>>> df.first()
Row(name=u'name', age=1)
- foreach(f)
將f函數(shù)應用于此DataFrame的所有行但壮。
這是df.rdd.foreach()的簡寫。 - foreachPartition(f)
將f函數(shù)應用于此DataFrame的每個分區(qū)常侣。
這是df.rdd.foreachPartition()的簡寫蜡饵。 - freqItems(cols, support=None)
找到列的頻繁項,可能有誤報袭祟。 - groupBy(*cols)
使用指定的列對DataFrame進行分組,所以我們可以對它們進行聚合捞附。 有關所有可用的聚合函數(shù)巾乳,請參閱GroupedData。groupby()是groupBy()的別名鸟召。
>>> df.groupBy("name").agg({"age":"mean"}).collect()
[Row(name=u'name', avg(age)=1.0), Row(name=u'Bob', avg(age)=2.0)]
>>> df.groupBy(["name",df.age]).count().collect()
[Row(name=u'Bob', age=2, count=1), Row(name=u'name', age=1, count=1)]
- groupby(*cols)
- head(n=None)
返回前n行胆绊。 - hint(name, *parameters)
在當前的DataFrame上指定一些提示。 - intersect(other)
僅返回包含此frame和另一frame中的行的新DataFrame欧募。(兩者的交集) - isLocal()
如果collect()和take()方法可以在本地運行(沒有任何Spark執(zhí)行器)压状,則返回True。 - isStreaming
如果此Dataset包含一個或多個在到達時連續(xù)返回數(shù)據(jù)的源,則返回true种冬。 從流源讀取數(shù)據(jù)的數(shù)據(jù)集必須使用DataStreamWriter中的start()方法作為StreamingQuery執(zhí)行镣丑。 返回單個答案的方法(例如,count()或collect())將在存在流式源時引發(fā)AnalysisException娱两。 - join(other, on=None, how=None)
使用給定的連接表達式與另一個DataFrame進行連接莺匠。
Parameters:
- other -
- on - 連接列名稱的字符串,列名稱列表十兢,連接表達式(列)或列的列表趣竣。 如果on是一個字符串或者一個表示連接列名的字符串列表,那么這個列必須存在于兩邊旱物,并且執(zhí)行一個等連接遥缕。
- how - str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
- limit(num)
將結(jié)果計數(shù)限制為指定的數(shù)字。 - na
返回一個DataFrameNaFunctions來處理缺失的值宵呛。 - orderBy(*cols, **kwargs)
返回按指定列排序的新DataFrame单匣。
>>> df1.orderBy(["name","age"],ascending=[0,1]).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=1)]
- persist(storageLevel=StorageLevel(True, True, False, False, 1))
- printSchema()
以樹形結(jié)構打印schema。
>>> df1.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
- randomSplit(weights, seed=None)
用提供的權重隨機分割這個DataFrame烤蜕。 - rdd
將內(nèi)容作為行的pyspark.RDD返回封孙。 - registerTempTable(name)
使用給定名稱將此RDD注冊為臨時表。
此臨時表的生命周期與用于創(chuàng)建此DataFrame的SQLContext相關聯(lián)讽营。
>>> df1.registerTempTable("people")
>>> spark.sql("select * from people").collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> spark.catalog.dropTampView("people")
- repartition(numPartitions, *cols)
返回由給定分區(qū)表達式分區(qū)的新DataFrame虎忌。 生成的DataFrame是hash分區(qū)的。
numPartitions可以是一個int來指定目標分區(qū)數(shù)量或一個Column橱鹏。 如果它是一個列混狠,它將被用作第一個分區(qū)列。 如果未指定匀哄,則使用默認的分區(qū)數(shù)量倔喂。 - replace(to_replace, value=None, subset=None)
返回一個新的DataFrame,用另一個值替換一個值糖荒。 DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的別名杉辙。
>>> df1.replace(["Alice", "Bob"], ["A", "B"]).show()
+----+---+
|name|age|
+----+---+
| A| 1|
| B| 5|
+----+---+
- rollup(*cols)
使用指定的列為當前的DataFrame創(chuàng)建一個多維匯總,所以我們可以在它上運行聚合函數(shù)捶朵。 - sample(withReplacement, fraction, seed=None)
- sampleBy(col, fractions, seed=None)
- schema
以pyspark.sql.types.StructType的形式返回此DataFrame的schema蜘矢。
>>> df1.schema
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
- select(*cols)
投影一組表達式并返回一個新的DataFrame。
>>> df.select(df.name, (df.age + 10).alias("height")).show()
+----+------+
|name|height|
+----+------+
|name| 11|
| Bob| 12|
+----+------+
- selectExpr(*expr)
這是接受SQL表達式的select()的變體综看。 - show(n=20, truncate=True)
將前n行打印到控制臺品腹。 - sort(*cols, **kwargs)
返回按指定列排序的新DataFrame。 - sortWithinPartitions(*cols, **kwargs)
返回一個新的DataFrame红碑,每個分區(qū)按指定的列排序舞吭。 - stat
為統(tǒng)計函數(shù)返回一個DataFrameStatFunctions。 - storageLevel
獲取DataFrame的當前存儲級別。
>>> df1.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df1.cache().storageLevel
StorageLevel(True, True, False, True, 1)
- subtract(other)
返回一個新的DataFrame羡鸥,它包含這個frame中的行蔑穴,但不包含在另一個frame中。 - take(num)
- toDF(*cols)
返回一個新的類:帶有新指定列名的DataFrame兄春。 - toJSON(use_unicode=True)
將DataFrame轉(zhuǎn)換為字符串的RDD澎剥。
每行都被轉(zhuǎn)換成一個JSON文檔作為返回的RDD中的一個元素。
>>> df1.toJSON().collect()
[u'{"name":"Alice","age":1}', u'{"name":"Bob","age":5}']
- toLocalIterator()
返回包含此DataFrame中所有行的迭代器赶舆。 迭代器將占用與此DataFrame中最大分區(qū)一樣多的內(nèi)存哑姚。 - toPandas()
以Pandas中的pandas.DataFrame的形式返回此DataFrame的內(nèi)容。 - union(other)
在這個和另一個frame中返回一個包含行聯(lián)合的新DataFrame芜茵。 - unpersist(blocking=False)
將DataFrame標記為非持久性叙量,并從內(nèi)存和磁盤中刪除所有的塊。 - where(condition)
與filter()相同九串。 - withColumn(colName, col)
通過添加列或替換具有相同名稱的現(xiàn)有列來返回新的DataFrame绞佩。
>>> df1.withColumn("height", df1.age + 50).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 1| 51|
| Bob| 5| 55|
+-----+---+------+
- withColumnRenamed(existing, new)
通過重命名現(xiàn)有列來返回新的DataFrame。 如果模式不包含給定的列名猪钮,則這是一個無意義操作品山。 - withWatermark(eventTime, delayThreshold)
為此DataFrame定義事件時間水印。 一個水印跟蹤一個時間點烤低,在這個時間點之前肘交,我們假設沒有更晚的數(shù)據(jù)將要到達。 - write
用于將非流式DataFrame的內(nèi)容保存到外部存儲器的接口扑馁。 - writeStream
用于將流式DataFrame的內(nèi)容保存到外部存儲的接口涯呻。
class pyspark.sql.GroupedData(jgd, sql_ctx)
由DataFrame.groupBy()創(chuàng)建的DataFrame上的一組聚合方法。
- Note: 實驗階段
- agg(*exprs)
計算聚合并將結(jié)果作為DataFrame返回腻要。
可用的集合函數(shù)是avg复罐,max,min雄家,sum效诅,count。
如果exprs是從字符串到字符串的單個字典映射趟济,則key是要執(zhí)行聚合的列名乱投,并且該value是聚合函數(shù)名。
或者咙好,exprs也可以是聚合列表達式的列表篡腌。 - avg(*cols)
計算每個組的每個數(shù)字列的平均值褐荷。
mean()是avg()的別名勾效。 - count()
統(tǒng)計每個組的記錄數(shù)。 - max(*cols)
計算每個組的每個數(shù)字列的最大值。 - mean(*cols)
計算每個組的每個數(shù)字列的平均值层宫。 - min(*cols)
計算每個組的每個數(shù)字列的最小值杨伙。 - pivot(pivot_col, values=None)
旋轉(zhuǎn)當前[[DataFrame]]的列并執(zhí)行指定的聚合。 有兩個版本的透視函數(shù):一個需要調(diào)用者指定不同值的列表以進行透視萌腿,另一個不支持限匣。 后者更簡潔但效率更低,因為Spark需要首先在內(nèi)部計算不同值的列表毁菱。
Parameters:
- pivot_col - 要轉(zhuǎn)移的列的名稱米死。
- values - 將被轉(zhuǎn)換為輸出DataFrame中的列的值的列表。
- sum(*cols)
計算每個組的每個數(shù)字列的總和贮庞。
class pyspark.sql.Column(jc)
DataFrame中的一列峦筒。
- alias(*alias, **kwargs)
使用新名稱返回此列的別名。 - asc()
基于給定列名稱的升序返回一個排序表達式窗慎。 - astype(dataType)
astype()是cast()的別名物喷。 - between(lowerBound, upperBound)
一個布爾表達式,如果此表達式的值位于給定列之間遮斥,則該表達式的值為true峦失。
>>> df1.select(d1.name, df1.age.between(2, 4)).show()
+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice| false|
| Bob| false|
+-----+---------------------------+
5 .bitwiseAND(other)
二元運算符
- bitwiseOR(other)
二元運算符 - bitwiseXOR(other)
二元運算符 - cast(dataType)
將列轉(zhuǎn)換為dataType類型。(轉(zhuǎn)換某列的類型)
>>> df.select(df.name, df.age.cast("string").alias("ages")).collect()
[Row(name=u'Alice', ages=u'1'), Row(name=u'Bob', ages=u'5')]
- contains(other)
二元運算符 - desc()
基于給定列名稱的降序返回一個排序表達式术吗。 - endswith(other)
根據(jù)匹配的字符串結(jié)尾返回一個布爾列尉辑。
>>> df.filter(df.name.endswith("ce")).collect()
[Row(name=u'Alice', age=1)]
- getField(name)
在StructField中通過名稱獲取字段的表達式。 - getItem(key)
從列表中獲取位置序號的項藐翎,或者通過字典獲取項的表達式材蹬。 - isNotNull()
如果當前表達式為null,則為真吝镣。 通常結(jié)合DataFrame.filter()來選擇具有非空值的行堤器。
>>> df2 = sc.parallelize([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]).toDF()
>>> df2.filter(df2.height.isNotNull()).collect()
[Row(height=80, name=u'Tom')]
- isNull()
如果當前表達式為null,則為真末贾。 通常與DataFrame.filter()結(jié)合來選擇具有空值的行闸溃。 - isin(*cols)
一個布爾表達式,如果此表達式的值由參數(shù)的評估值包含拱撵,則該值被評估為true辉川。
>>> df[df.age.isin([1,2,3])].collect()
[Row(name=u'Alice', age=1)]
- like(other)
返回基于SQL LIKE匹配的布爾列。
>>> df.filter(df.name.like("Al%")).collect()
[Row(name=u'Alice', age=1)]
- name(*alias, **kwargs)
name()是alias()的別名拴测。 - otherwise(value)
評估條件列表并返回多個可能的結(jié)果表達式之一乓旗。 如果不調(diào)用Column.otherwise(),則不匹配條件返回None集索。
>>> from pyspark.sql import functions as f
>>> df.select(df.name, f.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice| 0|
| Bob| 1|
+-----+-------------------------------------+
- over(window)
定義一個窗口列屿愚。 - rlike(other)
基于正則表達式匹配返回一個布爾列汇跨。
>>> df.filter(df.name.rlike('ice$')).collect()
[Row(age=2, name=u'Alice')]
- startswith(other)
根據(jù)字符串匹配返回一個布爾列。 - substr(startPos, length)
返回一個列妆距,它是該列的一個子字符串穷遂。 - when(condition, value)
評估條件列表并返回多個可能的結(jié)果表達式之一。 如果不調(diào)用Column.otherwise()娱据,則不匹配條件返回None蚪黑。
>>> df.select(df.name, f.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+------------------------------------------------------------+
class pyspark.sql.Row
DataFrame中的一行。 其中的字段可以被訪問:row.key
或者row[key]
中剩。
>>> from pyspark.sql import Row
>>> row = Row(name="Alice", age=1)
>>> row.name
'Alice'
>>> row["age"]
1
>>> row
Row(age=1, name='Alice')
>>> "name" in row
True
- asDict(recursive=False)
recursive - 將嵌套的行轉(zhuǎn)換為字典(默認為False)忌穿。
>>> row = Row(name="Alice",value=Row(age=1, height=88))
>>> row.asDict()
{'name': 'Alice', 'value': Row(age=1, height=88)}
>>> row.asDict(True)
{'name': 'Alice', 'value': {'age': 1, 'height': 88}}
class pyspark.sql.DataFrameNaFunctions(df)
在DataFrame中處理丟失的數(shù)據(jù)的功能。
- drop(how='any', thresh=None, subset=None)
返回一個新的DataFrame结啼,省略含有空值的行伴网。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的別名。 - fill(value, subset=None)
替換空值妆棒,na.fill()的別名澡腾。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的別名。 - replace(to_replace, value, subset=None)
class pyspark.sql.DataFrameStatFunctions(df)
DataFrame的統(tǒng)計函數(shù)的功能糕珊。
- approxQuantile(col, probabilities, relativeError)
計算DataFrame的數(shù)值列的近似分位數(shù)动分。 - corr(col1, col2, method=None)
以雙精度值計算DataFrame的兩列的相關性。 目前只支持Pearson Correlation Coefficient红选。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的別名澜公。 - cov(col1, col2)
計算給定列的樣本協(xié)方差(由它們的名稱指定)作為雙精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是別名喇肋。 - crosstab(col1, col2)
計算給定列的成對頻率表坟乾。 也被稱為應急表。 - freqItems(cols, support=None)
找到列的頻繁項蝶防,可能有誤報甚侣。 - sampleBy(col, fractions, seed=None)
根據(jù)每層上給出的分數(shù)返回一個沒有更換的分層樣本。
class pyspark.sql.Window
用于在DataFrame中定義窗口的實用函數(shù)间学。
>>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
- Note: 實驗階段
currentRow = 0
static orderBy(cols) - 用定義的順序創(chuàng)建一個WindowSpec殷费。
static partitionBy(cols) - 用定義的分區(qū)創(chuàng)建一個WindowSpec。
static rangeBetween(start, end) -
static rowsBetween(start, end) -
unboundedFollowing = 9223372036854775807L
unboundedPreceding = -9223372036854775808L
class pyspark.sql.WindowSpec(jspec)
定義 partitioning, ordering, and frame的窗口規(guī)范低葫。
使用Window
中的靜態(tài)方法創(chuàng)建一個WindowSpec
详羡。
- orderBy(*cols)
定義WindowSpec中的排序列。 - partitionBy(*cols)
定義WindowSpec中的分區(qū)列嘿悬。 - rangeBetween(start, end)
定義從開始(包含)到結(jié)束(包含)的框架邊界实柠。 - rowsBetween(start, end)
定義從開始(包含)到結(jié)束(包含)的框架邊界。
class pyspark.sql.DataFrameReader(spark)
用于從外部存儲系統(tǒng)(例如文件系統(tǒng)善涨,鍵值存儲等)加載DataFrame的接口窒盐。 使用spark.read()來訪問它茶行。
- csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
加載CSV文件并將結(jié)果作為DataFrame返回。
>>> df = spark.read.csv("file:/home/spark_sql_test.csv",header=True)
>>> df.show()
+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
| 1|Alice| 11| 111|
| 2| Bob| 22| 222|
+---+-----+---+------+
- format(source)
指定輸入數(shù)據(jù)源格式登钥。
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
- jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
Parameters:
- url – a JDBC URL of the form jdbc:subprotocol:subname
- table – the name of the table
- json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
加載JSON文件并將結(jié)果作為DataFrame返回。
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
- load(path=None, format=None, schema=None, **options)
從數(shù)據(jù)源加載數(shù)據(jù)并將其作為:class DataFrame返回娶靡。 - option(key, value)
為基礎數(shù)據(jù)源添加一個輸入選項牧牢。
您可以設置以下選項來讀取文件:
- timeZone: 設置指示用于分析時間戳的時區(qū)的字符串
在JSON / CSV數(shù)據(jù)源或分區(qū)值。 如果沒有設置姿锭,它使用默認值塔鳍,會話本地時區(qū)。
- options(**options)
- orc(path)
加載ORC文件呻此,將結(jié)果作為DataFrame返回轮纫。 - parquet(*paths)
加載Parquet文件,將結(jié)果作為DataFrame返回焚鲜。 - schema(schema)
指定輸入模式掌唾。 - table(tableName)
以DataFrame的形式返回指定的表。 - text(paths)
加載文本文件并返回一個DataFrame忿磅,該DataFrame的架構以名為“value”的字符串列開頭糯彬,如果有的話,后跟分區(qū)列葱她。
class pyspark.sql.DataFrameWriter(df)
用于將DataFrame寫入外部存儲系統(tǒng)(例如文件系統(tǒng)撩扒,鍵值存儲等)的接口。 使用DataFrame.write()來訪問這個吨些。
- csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None)
以指定的路徑以CSV格式保存DataFrame的內(nèi)容搓谆。
>>> df1 = spark.createDataFrame([(3,"Tom",33,333),],["id","name","age","salary"])
>>> df1.show()
+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
| 3| Tom| 33| 333|
+---+----+---+------+
>>> df1.write.csv("file:/home/spark_sql_test",mode="overwrite",header=True)
- format(source)
指定基礎輸出數(shù)據(jù)源。
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
- insertInto(tableName, overwrite=False)
將DataFrame的內(nèi)容插入到指定的表中豪墅。
它要求類的架構:DataFrame與表的架構相同泉手。
可以覆蓋任何現(xiàn)有的數(shù)據(jù)。 - jdbc(url, table, mode=None, properties=None)
將DataFrame的內(nèi)容通過JDBC保存到外部數(shù)據(jù)庫表中偶器。 - json(path, mode=None, compression=None, dateFormat=None, timestampFormat=None)
將DataFrame的內(nèi)容以JSON格式保存在指定的路徑中螃诅。 - mode(saveMode)
指定數(shù)據(jù)或表已經(jīng)存在的行為。
選項包括:
- append:將此DataFrame的內(nèi)容附加到現(xiàn)有數(shù)據(jù)状囱。
- overwrite:覆蓋現(xiàn)有數(shù)據(jù)术裸。
- ignore: 如果數(shù)據(jù)已經(jīng)存在,靜默地忽略這個操作亭枷。
- error:如果數(shù)據(jù)已經(jīng)存在袭艺,則拋出異常。
- option(key, value)
- options(**options)
- orc(path, mode=None, partitionBy=None, compression=None)
以指定的路徑以ORC格式保存DataFrame的內(nèi)容叨粘。 - parquet(path, mode=None, partitionBy=None, compression=None)
將DataFrame的內(nèi)容以Parquet格式保存在指定的路徑中猾编。 - partitionBy(*cols)
按文件系統(tǒng)上的給定列對輸出進行分區(qū)瘤睹。
如果指定,則輸出將在文件系統(tǒng)上進行布局答倡,類似于Hive的分區(qū)方案轰传。 - save(path=None, format=None, mode=None, partitionBy=None, **options)
將DataFrame的內(nèi)容保存到數(shù)據(jù)源。
數(shù)據(jù)源由格式和一組選項指定瘪撇。 如果未指定format获茬,則將使用由spark.sql.sources.default配置的缺省數(shù)據(jù)源。 - saveAsTable(name, format=None, mode=None, partitionBy=None, **options)
將DataFrame的內(nèi)容保存為指定的表格倔既。 - text(path, compression=None)
將DataFrame的內(nèi)容保存在指定路徑的文本文件中恕曲。
pyspark.sql.types module
class pyspark.sql.types.DataType
數(shù)據(jù)類型的基類。
- fromInternal(obj)
將內(nèi)部SQL對象轉(zhuǎn)換為本地Python對象渤涌。 - json()
- jsonValue()
- needConversion()
這種類型是否需要在Python對象和內(nèi)部SQL對象之間進行轉(zhuǎn)換佩谣?
這用于避免ArrayType / MapType / StructType的不必要的轉(zhuǎn)換。 - simpleString()
- toInternal(obj)
將Python對象轉(zhuǎn)換為內(nèi)部SQL對象实蓬。 - classmethod typeName()
class pyspark.sql.types.NullType
空類型茸俭。
表示None的數(shù)據(jù)類型,用于無法推斷的類型安皱。
class pyspark.sql.types.StringType
字符串數(shù)據(jù)類型瓣履。
class pyspark.sql.types.BinaryType
二進制(字節(jié)數(shù)組)數(shù)據(jù)類型。
class pyspark.sql.types.BooleanType
布爾數(shù)據(jù)類型练俐。
class pyspark.sql.types.DateType
Date(datetime.date)數(shù)據(jù)類型袖迎。
EPOCH_ORDINAL = 719163
- fromInternal(v)
- needConversion()
- toInternal(d)
class pyspark.sql.types.TimestampType
時間戳(datetime.datetime)數(shù)據(jù)類型。
- fromInternal(ts)
- needConversion()
- toInternal(dt)
class pyspark.sql.types.DecimalType(precision=10, scale=0)
十進制(decimal.Decimal)數(shù)據(jù)類型腺晾。
- jsonValue()
- simpleString()
class pyspark.sql.types.DoubleType
雙數(shù)據(jù)類型燕锥,表示雙精度浮點數(shù)。
class pyspark.sql.types.FloatType
浮點數(shù)據(jù)類型悯蝉,表示單精度浮點數(shù)归形。
class pyspark.sql.types.ByteType
字節(jié)數(shù)據(jù)類型,即單個字節(jié)中的有符號整數(shù)鼻由。
- simpleString()
class pyspark.sql.types.IntegerType
Int數(shù)據(jù)類型暇榴,即有符號的32位整數(shù)。
- simpleString()
class pyspark.sql.types.LongType
長數(shù)據(jù)類型蕉世,即有符號的64位整數(shù)蔼紧。
- simpleString()
class pyspark.sql.types.ShortType
短數(shù)據(jù)類型,即有符號的16位整數(shù)狠轻。
- simpleString()
class pyspark.sql.types.ArrayType(elementType, containsNull=True)
數(shù)組數(shù)據(jù)類型奸例。
- fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)
Map數(shù)據(jù)類型。
- fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)
StructType中的一個字段向楼。
- fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
class pyspark.sql.types.StructType(fields=None)
結(jié)構類型查吊,由StructField的列表組成谐区。
這是表示一個行的數(shù)據(jù)類型。
- add(field, data_type=None, nullable=True, metadata=None)
通過添加新元素來構造一個StructType來定義模式逻卖。 該方法接受:
一個參數(shù)是一個StructField對象宋列;介于2到4之間的參數(shù)(name,data_type评也,nullable(可選)炼杖,metadata(可選))。data_type參數(shù)可以是String或DataType對象仇参。 - fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
pyspark.sql.functions module
內(nèi)建函數(shù)的集合
- pyspark.sql.functions.abs(col)
計算絕對值。 - pyspark.sql.functions.acos(col)
計算給定值的余弦逆; 返回的角度在0到π的范圍內(nèi)婆殿。 - pyspark.sql.functions.add_months(start, months)
返回開始后幾個月的日期
>>> from pyspark.sql import functions as f
>>> df = spark.createDataFrame([("2017-12-25",)],["d"])
>>> df.select(f.add_months(df.d,1).alias("d")).collect()
[Row(d=datetime.date(2018, 1, 25))]
- pyspark.sql.functions.approx_count_distinct(col, rsd=None)
返回col的近似不同計數(shù)的新列诈乒。 - pyspark.sql.functions.array(*cols)
創(chuàng)建一個新的數(shù)組列。 - pyspark.sql.functions.array_contains(col, value)
集合函數(shù):如果數(shù)組為null婆芦,則返回null;如果數(shù)組包含給定值怕磨,則返回true;否則返回false。 - pyspark.sql.functions.asc(col)
基于給定列名稱的升序返回一個排序表達式消约。 - pyspark.sql.functions.ascii(col)
計算字符串列的第一個字符的數(shù)值肠鲫。 - pyspark.sql.functions.asin(col)
計算給定值的正弦倒數(shù); 返回的角度在- π/ 2到π/ 2的范圍內(nèi)。 - pyspark.sql.functions.atan(col)
計算給定值的正切倒數(shù)或粮。 - pyspark.sql.functions.atan2(col1, col2)
返回直角坐標(x导饲,y)到極坐標(r,theta)轉(zhuǎn)換的角度theta氯材。 - pyspark.sql.functions.avg(col)
聚合函數(shù):返回組中的值的平均值渣锦。 - pyspark.sql.functions.base64(col)
計算二進制列的BASE64編碼并將其作為字符串列返回。 - pyspark.sql.functions.bin(col)
返回給定列的二進制值的字符串表示形式氢哮。 - pyspark.sql.functions.bitwiseNOT(col)
不按位計算袋毙。 - pyspark.sql.functions.broadcast(df)
將DataFrame標記為足夠小以用于廣播連接。 - pyspark.sql.functions.bround(col, scale=0)
如果scale> = 0冗尤,則使用HALF_EVEN舍入模式對給定值進行四舍五入以縮放小數(shù)點;如果scale <0听盖,則將其舍入到整數(shù)部分。 - pyspark.sql.functions.cbrt(col)
計算給定值的立方根裂七。 - pyspark.sql.functions.ceil(col)
計算給定值的上限皆看。 - pyspark.sql.functions.coalesce(*cols)
返回不為空的第一列。 - pyspark.sql.functions.col(col)
根據(jù)給定的列名返回一個列背零。 - pyspark.sql.functions.collect_list(col)
聚合函數(shù):返回重復對象的列表悬蔽。 - pyspark.sql.functions.collect_set(col)
聚合函數(shù):返回一組消除重復元素的對象。 - pyspark.sql.functions.column(col)
根據(jù)給定的列名返回一個列捉兴。 - pyspark.sql.functions.concat(*cols)
將多個輸入字符串列連接成一個字符串列蝎困。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(f.concat(df.s, df.d).alias('s')).collect()
[Row(s=u'abcd123')]
- pyspark.sql.functions.concat_ws(sep, *cols)
使用給定的分隔符將多個輸入字符串列連接到一個字符串列中录语。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]
- pyspark.sql.functions.conv(col, fromBase, toBase)
將字符串列中的數(shù)字從一個進制轉(zhuǎn)換為另一個進制。
>>> df = spark.createDataFrame([("010101",)], ['n'])
>>> df.select(conv(df.n, 2, 16).alias('hex')).collect()
[Row(hex=u'15')]
- pyspark.sql.functions.corr(col1, col2)
返回col1和col2的Pearson相關系數(shù)的新列禾乘。 - pyspark.sql.functions.cos(col)
計算給定值的余弦澎埠。 - pyspark.sql.functions.cosh(col)
計算給定值的雙曲余弦。 - pyspark.sql.functions.count(col)
聚合函數(shù):返回組中的項目數(shù)量。 - pyspark.sql.functions.countDistinct(col, *cols)
返回col或col的不同計數(shù)的新列。 - pyspark.sql.functions.covar_pop(col1, col2)
返回col1和col2的總體協(xié)方差的新列盔粹。 - pyspark.sql.functions.covar_samp(col1, col2)
返回col1和col2的樣本協(xié)方差的新列椰弊。 - pyspark.sql.functions.crc32(col)
計算二進制列的循環(huán)冗余校驗值(CRC32),并將該值作為bigint返回萨咳。 - pyspark.sql.functions.create_map(*cols)
創(chuàng)建一個新的地圖列。 - pyspark.sql.functions.cume_dist()
窗口函數(shù):返回窗口分區(qū)內(nèi)值的累積分布,即在當前行下面的行的分數(shù)祥国。 - pyspark.sql.functions.current_date()
以日期列的形式返回當前日期。 - pyspark.sql.functions.current_timestamp()
將當前時間戳作為時間戳列返回晾腔。 - pyspark.sql.functions.date_add(start, days)
返回開始后幾天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]
- pyspark.sql.functions.date_format(date, format)
將日期/時間戳/字符串轉(zhuǎn)換為由第二個參數(shù)給定日期格式指定格式的字符串值舌稀。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
[Row(date=u'04/08/2015')]
- pyspark.sql.functions.date_sub(start, days)
返回開始前幾天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_sub(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 7))]
- pyspark.sql.functions.datediff(end, start)
返回從開始到結(jié)束的天數(shù)。
>>> df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]
- pyspark.sql.functions.dayofmonth(col)
將給定日期的月份的日期解壓為整數(shù)灼擂。(一月中第幾天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofmonth('a').alias('day')).collect()
[Row(day=8)]
- pyspark.sql.functions.dayofyear(col)
將給定日期的年份中的某一天提取為整數(shù)壁查。(一年中第幾天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofyear('a').alias('day')).collect()
[Row(day=98)]
- pyspark.sql.functions.decode(col, charset)
Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). - pyspark.sql.functions.degrees(col)
將以弧度度量的角度轉(zhuǎn)換為以度數(shù)度量的近似等效角度。 - pyspark.sql.functions.dense_rank()
窗口函數(shù):返回窗口分區(qū)內(nèi)的行的等級剔应,沒有任何間隙 - pyspark.sql.functions.desc(col)
基于給定列名稱的降序返回一個排序表達式睡腿。 - pyspark.sql.functions.encode(col, charset)
Computes the first argument into a binary from a string using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). - pyspark.sql.functions.exp(col)
計算給定值的指數(shù)。 - pyspark.sql.functions.explode(col)
返回給定數(shù)組或映射中每個元素的新行峻贮。 - pyspark.sql.functions.expm1(col)
計算給定值的指數(shù)減1嫉到。 - pyspark.sql.functions.expr(str)
將表達式字符串解析到它表示的列中 - pyspark.sql.functions.factorial(col)
計算給定值的階乘。
>>> df = spark.createDataFrame([(5,)], ['n'])
>>> df.select(factorial(df.n).alias('f')).collect()
[Row(f=120)]
- pyspark.sql.functions.first(col, ignorenulls=False)
聚合函數(shù):返回組中的第一個值月洛。 - pyspark.sql.functions.floor(col)
計算給定值的floor何恶。 - pyspark.sql.functions.format_number(col, d)
將數(shù)字X格式化為像'#, - #嚼黔, - #.-'這樣的格式细层,用HALF_EVEN舍入模式四舍五入到小數(shù)點后的位置,然后以字符串形式返回結(jié)果唬涧。 - pyspark.sql.functions.format_string(format, *cols)
以printf-style格式化參數(shù)疫赎,并將結(jié)果作為字符串列返回。 - pyspark.sql.functions.from_json(col, schema, options={})
使用指定的模式將包含JSON字符串的列解析為[[StructType]]的[[StructType]]或[[ArrayType]]碎节。 在不可解析的字符串的情況下返回null捧搞。 - pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
將來自unix時期(1970-01-01 00:00:00 UTC)的秒數(shù)轉(zhuǎn)換為以給定格式表示當前系統(tǒng)時區(qū)中該時刻的時間戳的字符串。 - pyspark.sql.functions.from_utc_timestamp(timestamp, tz)
給定一個時間戳,對應于UTC中的某個特定時間胎撇,返回對應于給定時區(qū)中同一時間的另一個時間戳介粘。 - pyspark.sql.functions.get_json_object(col, path)
從基于指定的json路徑的json字符串中提取json對象,并返回提取的json對象的json字符串晚树。 如果輸入的json字符串無效姻采,它將返回null。 - pyspark.sql.functions.greatest(*cols)
返回列名稱列表的最大值爵憎,跳過空值慨亲。 該功能至少需要2個參數(shù)。 如果所有參數(shù)都為空宝鼓,它將返回null刑棵。 - pyspark.sql.functions.grouping(col)
聚合函數(shù):指示GROUP BY列表中的指定列是否被聚合,在結(jié)果集中返回1表示聚合或0表示未聚合愚铡。 - pyspark.sql.functions.grouping_id(*cols)
聚合函數(shù):返回分組的級別蛉签,等于(grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) - pyspark.sql.functions.hash(*cols)
計算給定列的哈希碼,并將結(jié)果作為int列返回茂附。
>>> spark.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect()
[Row(hash=-757602832)]
- pyspark.sql.functions.hex(col)
計算給定列的十六進制值正蛙,可能是pyspark.sql.types.StringType督弓,pyspark.sql.types.BinaryType营曼,pyspark.sql.types.IntegerType或pyspark.sql.types.LongType。
>>> spark.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect()
[Row(hex(a)=u'414243', hex(b)=u'3')]
- pyspark.sql.functions.hour(col)
將給定日期的小時數(shù)提取為整數(shù)愚隧。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(hour('a').alias('hour')).collect()
[Row(hour=13)]
- pyspark.sql.functions.hypot(col1, col2)
計算sqrt(a ^ 2 + b ^ 2)蒂阱,無中間上溢或下溢。 - pyspark.sql.functions.initcap(col)
在句子中將每個單詞的第一個字母翻譯成大寫狂塘。
>>> spark.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v=u'Ab Cd')]
- pyspark.sql.functions.input_file_name()
為當前Spark任務的文件名創(chuàng)建一個字符串列录煤。 - pyspark.sql.functions.instr(str, substr)
找到給定字符串中第一次出現(xiàn)substr列的位置。 如果其中任一參數(shù)為null荞胡,則返回null妈踊。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(instr(df.s, 'b').alias('s')).collect()
[Row(s=2)]
- pyspark.sql.functions.isnan(col)
如果列是NaN,則返回true的表達式泪漂。 - pyspark.sql.functions.isnull(col)
如果列為空廊营,則返回true的表達式。 - pyspark.sql.functions.json_tuple(col, *fields)
根據(jù)給定的字段名稱為json列創(chuàng)建一個新行萝勤。
Parameters:
- col - json格式的字符串列
- fields - 要提取的字段列表
- pyspark.sql.functions.kurtosis(col)
聚合函數(shù):返回組中值的峰度露筒。 - pyspark.sql.functions.lag(col, count=1, default=None)
窗口函數(shù):返回當前行之前的偏移行值;如果當前行之前的行數(shù)小于偏移量,則返回defaultValue敌卓。例如慎式,若偏移量為1,將返回窗口分區(qū)中任何給定點的前一行。
這相當于SQL中的LAG函數(shù)瘪吏。
Parameters:
- col - 列名或表達式的名稱
- count - 要擴展的行數(shù)
- default - 默認值
- pyspark.sql.functions.last(col, ignorenulls=False)
聚合函數(shù):返回組中的最后一個值癣防。
該函數(shù)默認返回它看到的最后一個值。 當ignoreNulls設置為true時肪虎,它將返回它看到的最后一個非null值劣砍。 如果所有值都為空,則返回null扇救。 - pyspark.sql.functions.last_day(date)
返回給定日期所屬月份的最后一天刑枝。
>>> df = spark.createDataFrame([('1997-02-10',)], ['d'])
>>> df.select(last_day(df.d).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
- pyspark.sql.functions.lead(col, count=1, default=None)
Window函數(shù):返回當前行之后的偏移行值;如果當前行之后的行數(shù)小于偏移行,則返回defaultValue迅腔。 例如装畅,偏移量為1,將返回窗口分區(qū)中任意給定點的下一行沧烈。
這相當于SQL中的LEAD函數(shù)掠兄。 - pyspark.sql.functions.least(*cols)
返回多列中的最小值,跳過空值锌雀。 該功能至少需要2個參數(shù)蚂夕,及最少需要兩個列名。 如果所有參數(shù)都為空腋逆,它將返回null婿牍。
>>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
[Row(least=1)]
- pyspark.sql.functions.length(col)
計算字符串或二進制表達式的長度。
>>> spark.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect()
[Row(length=3)]
- pyspark.sql.functions.levenshtein(left, right)
計算兩個給定字符串的Levenshtein距離惩歉。
Levenshtein距離(編輯距離)等脂,是指兩個字串之間,由一個轉(zhuǎn)成另一個所需的最少編輯操作次數(shù)撑蚌。具體可自行百度上遥。
>>> df0 = spark.createDataFrame([('kitten', 'sitting',)], ['l', 'r'])
>>> df0.select(levenshtein('l', 'r').alias('d')).collect()
[Row(d=3)]
- pyspark.sql.functions.lit(col)
創(chuàng)建一個字面值的列。 - pyspark.sql.functions.locate(substr, str, pos=1)
在str字符串列中找到在pos位置后面第一個出現(xiàn)substr的位置争涌。
- Note: 該位置不是從零開始的粉楚,而是從1開始的。 如果在str中找不到substr亮垫,則返回0模软。
Parameters: - substr - 要查找的字符串
- str - pyspark.sql.types.StringType的列
- pos - 起始位置
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]
- pyspark.sql.functions.log(arg1, arg2=None)
返回第二個參數(shù)的基于第一個參數(shù)的對數(shù)。
如果只有一個參數(shù)包警,那么這個參數(shù)就是自然對數(shù)撵摆。
>>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
>>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
- pyspark.sql.functions.log10(col)
計算給定一個數(shù)以10為底的對數(shù)。 - pyspark.sql.functions.log1p(col)
Computes the natural logarithm of the given value plus one. - pyspark.sql.functions.log2(col)
返回參數(shù)的基數(shù)為2的對數(shù)害晦。
>>> spark.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
[Row(log2=2.0)]
- pyspark.sql.functions.lower(col)
將字符串列轉(zhuǎn)換為小寫特铝。 - pyspark.sql.functions.lpad(col, len, pad)
左填充到指定長度暑中。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(lpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'##abcd')]
- pyspark.sql.functions.ltrim(col)
去掉字符串左邊的空格。 - pyspark.sql.functions.max(col)
聚合函數(shù):返回組中表達式的最大值鲫剿。 - pyspark.sql.functions.md5(col)
計算某給定值的MD5值鳄逾,將該值作為32個字符的十六進制字符串返回。
>>> spark.createDataFrame([('ABC',)], ['a']).select(md5('a').alias('hash')).collect()
[Row(hash=u'902fbdd2b1df0c4f70b4a5d23525e932')]
- pyspark.sql.functions.mean(col)
聚合函數(shù):返回組中所有值的平均值灵莲。 - pyspark.sql.functions.min(col)
聚合函數(shù):返回組中表達式的最小值雕凹。 - pyspark.sql.functions.minute(col)
提取給定日期的分鐘數(shù)為整數(shù)。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(minute('a').alias('minute')).collect()
[Row(minute=8)]
- pyspark.sql.functions.monotonically_increasing_id()
生成單調(diào)遞增的64位整數(shù)的列政冻。
生成的ID保證是單調(diào)遞增和唯一的枚抵,但不是連續(xù)的。 當前的實現(xiàn)將分區(qū)ID放在高31位明场,并將每個分區(qū)內(nèi)的記錄號放在低33位汽摹。 假設數(shù)據(jù)框的分區(qū)少于10億個,每個分區(qū)少于80億條記錄苦锨。
作為一個例子逼泣,考慮一個帶有兩個分區(qū)的DataFrame,每個分區(qū)有三個記錄舟舒。 該表達式將返回以下ID:0,1,2,8589934592(1L << 33)拉庶,8589934593,8589934594秃励。
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
>>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
- pyspark.sql.functions.month(col)
將給定日期的月份提取為整數(shù)氏仗。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(month('a').alias('month')).collect()
[Row(month=4)]
- pyspark.sql.functions.months_between(date1, date2)
返回date1和date2之間的月數(shù)。 - pyspark.sql.functions.nanvl(col1, col2)
如果col1不是NaN莺治,則返回col1;如果col1是NaN廓鞠,則返回col2帚稠。
兩個輸入都應該是浮點列(DoubleType或FloatType)谣旁。
>>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
- pyspark.sql.functions.next_day(date, dayOfWeek)
返回晚于日期列值的第一個日期。
星期幾參數(shù)不區(qū)分大小寫滋早,并接受:“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”.
>>> df = spark.createDataFrame([('2015-07-27',)], ['d'])
>>> df.select(next_day(df.d, 'Sun').alias('date')).collect()
[Row(date=datetime.date(2015, 8, 2))]
- pyspark.sql.functions.ntile(n)
窗口函數(shù):在有序的窗口分區(qū)中返回ntile組ID(從1到n)榄审。 例如,如果n是4杆麸,則第一季度行將得到值1搁进,第二季度將得到2,第三季度將得到3昔头,并且最后一個季度將得到4饼问。
這相當于SQL中的NTILE函數(shù)。
Parameters: n – an integer - pyspark.sql.functions.percent_rank()
窗口函數(shù):返回窗口分區(qū)內(nèi)的行的相對等級(即百分比)揭斧。 - pyspark.sql.functions.posexplode(col)
為給定數(shù)組或映射中的每個元素返回一個新行莱革。
>>> from pyspark.sql import Row
>>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>>> eDF.select(posexplode(eDF.intlist)).collect()
[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
>>> eDF.select(posexplode(eDF.mapfield)).show()
+---+---+-----+
|pos|key|value|
+---+---+-----+
| 0| a| b|
+---+---+-----+
- pyspark.sql.functions.pow(col1, col2)
返回col1的col2次方的值。 - pyspark.sql.functions.quarter(col)
提取給定日期所屬的季度值。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(quarter('a').alias('quarter')).collect()
[Row(quarter=2)]
- pyspark.sql.functions.radians(col)
將以度數(shù)度量的角度轉(zhuǎn)換為以弧度測量的近似等效角度盅视。 - pyspark.sql.functions.rand(seed=None)
從U [0.0捐名,1.0]生成一個具有獨立且分布相同(i.i.d.)樣本的隨機列。 - pyspark.sql.functions.randn(seed=None)
從標準正態(tài)分布生成具有獨立且分布相同(i.i.d.)樣本的列闹击。 - pyspark.sql.functions.rank()
窗口函數(shù):返回窗口分區(qū)內(nèi)的行的等級镶蹋。
rank和dense_rank之間的區(qū)別在于,當有tie時赏半,dense_rank在排序順序上沒有差距贺归。也就是說,如果你使用dense_rank排名比賽断箫,并且有三個人排在第二位牧氮,那么你會說這三個都排在第二位,下一個排在第三位瑰枫。排名會給我連續(xù)的數(shù)字踱葛,使排在第三位(關系之后)的人將登記為第五名。
這相當于SQL中的RANK函數(shù)光坝。 - pyspark.sql.functions.regexp_extract(str, pattern, idx)
從指定的字符串列中提取由Java正則表達式匹配的特定組尸诽。 如果正則表達式不匹配,或者指定的組不匹配盯另,則返回空字符串性含。
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)-(\d+)', 1).alias('d')).collect()
[Row(d=u'100')]
>>> df = spark.createDataFrame([('foo',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)', 1).alias('d')).collect()
[Row(d=u'')]
>>> df = spark.createDataFrame([('aaaac',)], ['str'])
>>> df.select(regexp_extract('str', '(a+)(b)?(c)', 2).alias('d')).collect()
[Row(d=u'')]
- pyspark.sql.functions.regexp_replace(str, pattern, replacement)
將與regexp匹配的指定字符串值的所有子字符串替換為rep。
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_replace('str', '(\d+)', '+').alias('d')).collect()
[Row(d=u'+-+')]
- pyspark.sql.functions.repeat(col, n)
重復一個字符串列n次鸳惯,并將其作為新的字符串列返回商蕴。
>>> df = spark.createDataFrame([('ab',)], ['s',])
>>> df.select(repeat(df.s, 3).alias('s')).collect()
[Row(s=u'ababab')]
- pyspark.sql.functions.reverse(col)
反轉(zhuǎn)字符串列并將其作為新的字符串列返回。 - pyspark.sql.functions.rint(col)
返回值最接近參數(shù)的double值芝发,等于一個數(shù)學整數(shù)绪商。 - pyspark.sql.functions.round(col, scale=0)
如果scale> = 0,則使用HALF_UP舍入模式對給定值進行四舍五入以縮放小數(shù)點;如果scale <0辅鲸,則將其舍入到整數(shù)部分格郁。
>>> spark.createDataFrame([(2.5,)], ['a']).select(round('a', 0).alias('r')).collect()
[Row(r=3.0)]
- pyspark.sql.functions.row_number()
窗口函數(shù):返回窗口分區(qū)內(nèi)從1開始的連續(xù)編號。 - pyspark.sql.functions.rpad(col, len, pad)
右填充到指定長度独悴。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(rpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'abcd##')]
- pyspark.sql.functions.rtrim(col)
去除字符串右邊的空格例书。 - pyspark.sql.functions.second(col)
提取給定日期的秒數(shù)為整數(shù)。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(second('a').alias('second')).collect()
[Row(second=15)]
- pyspark.sql.functions.sha1(col)
返回SHA-1的十六進制字符串結(jié)果刻炒。
>>> spark.createDataFrame([('ABC',)], ['a']).select(sha1('a').alias('hash')).collect()
[Row(hash=u'3c01bdbb26f358bab27f267924aa2c9a03fcfdb8')]
- pyspark.sql.functions.sha2(col, numBits)
返回SHA-2系列散列函數(shù)(SHA-224决采,SHA-256,SHA-384和SHA-512)的十六進制字符串結(jié)果坟奥。 numBits表示結(jié)果的所需位長度树瞭,其值必須為224,256,384,512或0(相當于256)暂幼。
>>> digests = df.select(sha2(df.name, 256).alias('s')).collect()
>>> digests[0]
Row(s=u'3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043')
>>> digests[1]
Row(s=u'cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961')
- pyspark.sql.functions.shiftLeft(col, numBits)
將給定值col左移numBits位。
>>> spark.createDataFrame([(21,)], ['a']).select(shiftLeft('a', 1).alias('r')).collect()
[Row(r=42)]
- pyspark.sql.functions.shiftRight(col, numBits)
將給定值col右移numBits位(Signed)移迫。 - pyspark.sql.functions.shiftRightUnsigned(col, numBits)
將給定值col右移numBits位(Unsigned)旺嬉。
>>> df = spark.createDataFrame([(-42,)], ['a'])
>>> df.select(shiftRightUnsigned('a', 1).alias('r')).collect()
[Row(r=9223372036854775787)]
- pyspark.sql.functions.signum(col)
計算給定值的正負號。 - pyspark.sql.functions.sin(col)
計算給定值的正弦值厨埋。 - pyspark.sql.functions.sinh(col)
計算給定值的雙曲正弦值邪媳。 - pyspark.sql.functions.size(col)
集合函數(shù):返回存儲在列中的數(shù)組或映射的長度。
>>> df = spark.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
>>> df.select(size(df.data)).collect()
[Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]
- pyspark.sql.functions.skewness(col)
聚合函數(shù):返回組中值的偏度荡陷。 - pyspark.sql.functions.sort_array(col, asc=True)
Collection函數(shù):對輸入數(shù)組進行升序或降序排序雨效。
>>> df = spark.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
>>> df.select(sort_array(df.data).alias('r')).collect()
[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
>>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
- pyspark.sql.functions.soundex(col)
返回字符串的SoundEx編碼。
>>> df = spark.createDataFrame([("Peters",),("Uhrbach",)], ['name'])
>>> df.select(soundex(df.name).alias("soundex")).collect()
[Row(soundex=u'P362'), Row(soundex=u'U612')]
- pyspark.sql.functions.spark_partition_id()
列所在的分區(qū)id
- Note: 這是不確定的废赞,因為它依賴于數(shù)據(jù)分區(qū)和任務調(diào)度徽龟。
>>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
- pyspark.sql.functions.split(str, pattern)
切分字符串。
>>> df = spark.createDataFrame([('ab12cd',)], ['s',])
>>> df.select(split(df.s, '[0-9]+').alias('s')).collect()
[Row(s=[u'ab', u'cd'])]
- pyspark.sql.functions.sqrt(col)
計算指定浮點值的平方根唉地。 - pyspark.sql.functions.stddev(col)
聚合函數(shù):返回組中表達式的無偏樣本標準差据悔。 - pyspark.sql.functions.stddev_pop(col)
聚合函數(shù):返回一個組中表達式的總體標準差。 - pyspark.sql.functions.stddev_samp(col)
聚合函數(shù):返回組中表達式的無偏樣本標準差耘沼。 - pyspark.sql.functions.struct(*cols)
創(chuàng)建一個新的結(jié)構列极颓。
>>> df.select(struct('age', 'name').alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
>>> df.select(struct([df.age, df.name]).alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
- pyspark.sql.functions.substring(str, pos, len)
返回在str中從pos位置開始的長度為len值的substring。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s=u'ab')]
- pyspark.sql.functions.substring_index(str, delim, count)
在計數(shù)定界符delimiter之前群嗤,返回字符串str的子串菠隆。 如果count是正數(shù),則返回最后一個分隔符左邊的數(shù)字(從左數(shù)起)狂秘。 如果計數(shù)為負數(shù)骇径,則返回最后一個分隔符右邊的數(shù)字(從右數(shù)起)。 substring_index搜索delim時執(zhí)行區(qū)分大小寫的匹配者春。
>>> df = spark.createDataFrame([('a.b.c.d',)], ['s'])
>>> df.select(substring_index(df.s, '.', 2).alias('s')).collect()
[Row(s=u'a.b')]
>>> df.select(substring_index(df.s, '.', -3).alias('s')).collect()
[Row(s=u'b.c.d')]
- pyspark.sql.functions.sum(col)
聚合函數(shù):返回表達式中所有值的總和破衔。 - pyspark.sql.functions.sumDistinct(col)
聚合函數(shù):返回表達式中不同值的總和。 - pyspark.sql.functions.tan(col)
計算給定值的正切值碧查。 - pyspark.sql.functions.tanh(col)
計算給定值的雙曲正切运敢。 - pyspark.sql.functions.to_date(col, format=None)
使用可選的指定格式將pyspark.sql.types.StringType或pyspark.sql.types.TimestampType的列轉(zhuǎn)換為pyspark.sql.types.DateType校仑。默認格式是'yyyy-MM-dd'忠售。 根據(jù)SimpleDateFormats指定格式。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
- pyspark.sql.functions.to_json(col, options={})
將包含[[StructType]]的[[StructType]]或[[ArrayType]]的列轉(zhuǎn)換為JSON字符串迄沫。 在不支持的類型的情況下會引發(fā)異常稻扬。
Parameters:
- col - 包含結(jié)構體或結(jié)構體數(shù)組的列的名稱
- options - 控制轉(zhuǎn)換的選項。 接受與json數(shù)據(jù)源相同的選項
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"age":2,"name":"Alice"}')]
>>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
- pyspark.sql.functions.to_timestamp(col, format=None)
使用可選的指定格式將pyspark.sql.types.StringType或pyspark.sql.types.TimestampType的列轉(zhuǎn)換為pyspark.sql.types.DateType羊瘩。 默認格式是'yyyy-MM-dd HH:mm:ss'泰佳。 根據(jù)SimpleDateFormats指定格式盼砍。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_timestamp(df.t).alias('dt')).collect()
[Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
[Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
- pyspark.sql.functions.to_utc_timestamp(timestamp, tz)
給定一個時間戳,它對應于給定時區(qū)中的特定時間逝她,返回對應于UTC中同一時間的另一個時間戳浇坐。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_utc_timestamp(df.t, "PST").alias('t')).collect()
[Row(t=datetime.datetime(1997, 2, 28, 18, 30))]
- pyspark.sql.functions.translate(srcCol, matching, replace)
一個函數(shù)通過匹配中的一個字符來轉(zhuǎn)換srcCol中的任何字符。 替換中的字符對應于匹配的字符黔宛。 當字符串中的任何字符與匹配中的字符匹配時近刘,翻譯將發(fā)生。
>>> spark.createDataFrame([('translate',)], ['a']).select(translate('a', "rnlt", "123").alias('r')).collect()
[Row(r=u'1a2s3ae')]
- pyspark.sql.functions.trim(col)
去除字符串兩邊的空格臀晃。 - pyspark.sql.functions.trunc(date, format)
返回截斷到格式指定單位的日期觉渴。
Parameters: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
>>> df = spark.createDataFrame([('1997-02-28',)], ['d'])
>>> df.select(trunc(df.d, 'year').alias('year')).collect()
[Row(year=datetime.date(1997, 1, 1))]
>>> df.select(trunc(df.d, 'mon').alias('month')).collect()
[Row(month=datetime.date(1997, 2, 1))]
- pyspark.sql.functions.udf(f=None, returnType=StringType)
創(chuàng)建一個表示用戶定義函數(shù)(UDF)的列表達式。
*Note: 用戶定義的函數(shù)必須是確定性的徽惋。 由于優(yōu)化案淋,可能會消除重復的調(diào)用,甚至可能會調(diào)用該函數(shù)的次數(shù)超過查詢中的次數(shù)险绘。
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> :udf
... def to_upper(s):
... if s is not None:
... return s.upper()
...
>>> :udf(returnType=IntegerType())
... def add_one(x):
... if x is not None:
... return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+
- pyspark.sql.functions.unbase64(col)
解碼BASE64編碼的字符串列并將其作為二進制列返回踢京。 - pyspark.sql.functions.unhex(col)
十六進制的反轉(zhuǎn)。 將每對字符解釋為十六進制數(shù)字并轉(zhuǎn)換為數(shù)字的字節(jié)表示宦棺。
>>> spark.createDataFrame([('414243',)], ['a']).select(unhex('a')).collect()
[Row(unhex(a)=bytearray(b'ABC'))]
- pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')
使用默認時區(qū)和默認語言環(huán)境漱挚,將具有給定模式的時間字符串(默認為'yyyy-MM-dd HH:mm:ss')轉(zhuǎn)換為Unix時間戳(以秒為單位),如果失敗則返回null渺氧。
如果時間戳記為None旨涝,則返回當前時間戳。 - pyspark.sql.functions.upper(col)
將字符串列轉(zhuǎn)換為大寫侣背。 - pyspark.sql.functions.var_pop(col)
聚合函數(shù):返回組中值的總體方差白华。 - pyspark.sql.functions.var_samp(col)
聚合函數(shù):返回組中值的無偏差。 - pyspark.sql.functions.variance(col)
聚合函數(shù):返回組中值的總體方差贩耐。 - pyspark.sql.functions.weekofyear(col)
返回指定時間是一年中的第幾周弧腥。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(weekofyear(df.a).alias('week')).collect()
[Row(week=15)]
- pyspark.sql.functions.when(condition, value)
評估條件列表并返回多個可能的結(jié)果表達式之一。 如果不調(diào)用Column.otherwise()潮太,則不匹配條件返回None管搪。(條件判斷)
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]
>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]
- pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
>>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
>>> w.select(w.window.start.cast("string").alias("start"),
... w.window.end.cast("string").alias("end"), "sum").collect()
[Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]
- pyspark.sql.functions.year(col)
將給定日期的年份提取為整數(shù)。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(year('a').alias('year')).collect()
[Row(year=2015)]
pyspark.sql.streaming module
class pyspark.sql.streaming.StreamingQuery(jsq)
當新數(shù)據(jù)到達時铡买,在后臺執(zhí)行的查詢的句柄更鲁。 所有這些方法都是線程安全的。
- awaitTermination(timeout=None)
通過query.stop()或異常來等待查詢的終止奇钞。如果由于異常而查詢終止澡为,則會拋出異常。 如果設置了超時景埃,則會在超時秒數(shù)內(nèi)返回查詢是否終止媒至。如果查詢已被終止顶别,則對該方法的所有后續(xù)調(diào)用將立即返回(如果查詢已由stop()終止),或者立即拋出異常(如果查詢已由異常終止)拒啰。如果由于一個異常使這個查詢已經(jīng)終止了驯绎,則會拋出StreamingQueryException。 - exception()
Returns: StreamingQueryException(如果查詢由異常終止)或None谋旦。 - explain(extended=False)
打犹跖瘛(邏輯和物理)計劃到控制臺進行調(diào)試。
Parameters: 布爾值蛤织,默認為False赴叹。 如果為False,則僅打印物理計劃指蚜。 - id
返回檢查點數(shù)據(jù)重新啟動時持續(xù)存在的此查詢的唯一標識乞巧。 也就是說,這個ID是在第一次啟動查詢時生成的摊鸡,每次從檢查點數(shù)據(jù)重新啟動時都會一樣绽媒。 Spark群集中只能有一個查詢具有相同的激活碼。 另請參閱runId免猾。 - isActive
此流式查詢當前是否處于活動狀態(tài)是辕。 - lastProgress
返回此流式查詢的最新StreamingQueryProgress更新;如果沒有進度更新,則返回None:return:一個映射猎提。 - name
給此查詢起一個名字获三,如果未指定,則返回null锨苏。 這個名稱可以在org.apache.spark.sql.streaming.DataStreamWriter中指定為dataframe.writeStream.queryName(“query”).start()疙教。 該名稱(如果已設置)在所有活動查詢中必須唯一。 - processAllAvailable()
阻塞直到源中的所有可用數(shù)據(jù)都被處理并提交到接收器伞租。 此方法用于測試贞谓。
- Note: 在連續(xù)到達數(shù)據(jù)的情況下,這種方法可能永遠阻塞葵诈。 另外份帐,只有在調(diào)用之前辐啄,這個方法才被保證阻塞焕参,直到數(shù)據(jù)已經(jīng)同步地將數(shù)據(jù)附加到流源霞势。 (即getOffset必須立即反映添加)。
- recentProgress
返回此查詢的最新[[StreamingQueryProgress]]更新數(shù)組徊都。 為每個流保留的進度更新數(shù)由Spark會話配置spark.sql.streaming.numRecentProgressUpdates進行配置沪斟。 - runId
返回此查詢的唯一標識,該標識在重新啟動時不會保留暇矫。 也就是說主之,每個啟動(或從檢查點重新啟動)的查詢將具有不同的runId。 - status
返回查詢的當前狀態(tài)李根。 - stop()
停止此流式查詢槽奕。
class pyspark.sql.streaming.StreamingQueryManager(jsqm)
一個來管理所有的StreamingQuery StreamingQueries活動的類。
- active
返回與此SQLContext關聯(lián)的活動查詢的列表房轿。 - awaitAnyTermination(timeout=None)
等到相關SQLContext的任何查詢自上下文創(chuàng)建以來粤攒,或者自調(diào)用resetTerminated()以來已終止。 如果有任何查詢由于異常而終止囱持,那么異常將被拋出夯接。 如果設置了超時,則會在超時秒數(shù)內(nèi)返回查詢是否終止纷妆。 - get(id)
返回來自此SQLContext的活動查詢盔几,或者如果具有此名稱的活動查詢不存在,則會拋出異常掩幢。 - resetTerminated()
忘記過去已終止的查詢逊拍,以便awaitAnyTermination()可以再次用于等待新的終止。
class pyspark.sql.streaming.DataStreamReader(spark)
用于從外部存儲系統(tǒng)(例如文件系統(tǒng)际邻,鍵值存儲等)加載流式DataFrame的接口芯丧。 使用spark.readStream()來訪問它。
- csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
加載CSV文件流并將結(jié)果作為DataFrame返回世曾。 - format(source)
指定輸入數(shù)據(jù)源格式缨恒。
Parameters: source - 字符串,數(shù)據(jù)源的名稱轮听,例如 'json'肿轨,'parquet'。
>>> s = spark.readStream.format("text")
- json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
加載JSON文件流并將結(jié)果作為DataFrame返回蕊程。 - load(path=None, format=None, schema=None, **options)
從數(shù)據(jù)源加載數(shù)據(jù)流并將其作為:class DataFrame 返回椒袍。 - option(key, value)
為基礎數(shù)據(jù)源添加一個輸入選項。 - options(**options)
為底層數(shù)據(jù)源添加多個輸入選項藻茂。 - parquet(path)
加載Parquet文件流驹暑,將結(jié)果作為DataFrame返回。 - schema(schema)
指定輸入模式辨赐。
某些數(shù)據(jù)源(例如JSON)可以從數(shù)據(jù)自動推斷輸入模式优俘。 通過在這里指定模式,底層數(shù)據(jù)源可以跳過模式推斷步驟掀序,從而加速數(shù)據(jù)加載帆焕。
Parameters: schema – 一個pyspark.sql.types.StructType 對象。 - text(path)
加載一個文本文件流并返回一個DataFrame,其架構以一個名為“value”的字符串列開始叶雹,如果有的話财饥,后跟分區(qū)列。
文本文件中的每一行都是生成的DataFrame中的新行折晦。
class pyspark.sql.streaming.DataStreamWriter(df)
用于將流式DataFrame寫入外部存儲系統(tǒng)(例如文件系統(tǒng)钥星,鍵值存儲等)的接口。 使用DataFrame.writeStream()來訪問這個满着。
- format(source)
指定基礎輸出數(shù)據(jù)源谦炒。 - option(key, value)
添加一個底層數(shù)據(jù)源的輸出選項。 - options(**options)
為底層數(shù)據(jù)源添加多個輸出選項风喇。 - outputMode(outputMode)
指定如何將DataFrame/Dataset流數(shù)據(jù)寫入流式接收器宁改。
Options include:
- append - 只有DataFrame / Dataset流數(shù)據(jù)中的新行才會寫入接收器
- complete - DataFrame / Dataset流中的所有行將在每次更新時寫入接收器
- update - 每當有更新時,只有在DataFrame / Dataset流數(shù)據(jù)中更新的行才會被寫入接收器魂莫。 如果查詢不包含聚合还蹲,它將相當于追加模式。
writer = sdf.writeStream.outputMode('append')
- partitionBy(*cols)
按文件系統(tǒng)上的給定列對輸出進行分區(qū)豁鲤。
如果指定秽誊,則輸出將在文件系統(tǒng)上進行布局,類似于Hive的分區(qū)方案琳骡。 - queryName(queryName)
指定可以用start()啟動的StreamingQuery的名稱锅论。 該名稱在相關聯(lián)的SparkSession中的所有當前活動查詢中必須是唯一的。 - start(path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options)
將DataFrame的內(nèi)容流式傳輸?shù)綌?shù)據(jù)源楣号。
數(shù)據(jù)源由格式和一組選項指定最易。 如果未指定format,則將使用由spark.sql.sources.default配置的缺省數(shù)據(jù)源炫狱。 - trigger(*args, **kwargs)
設置流查詢的觸發(fā)器藻懒。 如果沒有設置,它將盡可能快地運行查詢视译,這相當于將觸發(fā)器設置為processingTime ='0秒'嬉荆。
Parameters: processingTime – a processing time interval as a string, e.g. ‘5 seconds’, ‘1 minute’.
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just once batch of data
>>> writer = sdf.writeStream.trigger(once=True)