Spark Python API Docs(part two)

pyspark.sql module

Module context

Spark SQL和DataFrames中的重要類:

  • pyspark.sql.SparkSession - DataFrame和SQL功能的主要入口點刽宪。
  • pyspark.sql.DataFrame - 分布式數(shù)據(jù)集合分組到命名的列。
  • pyspark.sql.Column - DataFrame中的列表達式腌乡。
  • pyspark.sql.Row - DataFrame中的一行數(shù)據(jù)苗缩。
  • pyspark.sql.GroupedData - 由DataFrame.groupBy()返回的聚合方法怯屉。
  • pyspark.sql.DataFrameNaFunctions - 處理缺失數(shù)據(jù)的方法(空值)急凰。
  • pyspark.sql.DataFrameStatFunctions - 統(tǒng)計功能的方法正压。
  • pyspark.sql.functions - 可用于DataFrame的內(nèi)置函數(shù)列表乍炉。
  • pyspark.sql.types - 可用的數(shù)據(jù)類型列表馋贤。
  • pyspark.sql.Window - 用于處理窗口函數(shù)赞别。

class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)

使用Dataset和DataFrame API編程Spark的入口點。
SparkSession可用于創(chuàng)建DataFrame配乓,將DataFrame注冊為表格仿滔,在表格上執(zhí)行SQL,緩存表格以及讀取parquet文件犹芹。 要創(chuàng)建SparkSession崎页,請使用以下構建器模式:

spark = SparkSession.builder.master('spark://cn01:7077').appName("Word Count").getOrCreate()

class Builder

SparkSession的生成器。

  1. appName(name)
    為應用程序設置一個名稱腰埂,該名稱將顯示在Spark Web UI中窟她。
    如果沒有設置應用程序名稱插掂,則會隨機生成名稱榨呆。
  2. config(key=None, value=None, conf=None)
    設置一個配置選項倦炒。 使用此方法設置的選項會自動傳播到SparkConf和SparkSession自己的配置中。
    對于現(xiàn)有的SparkConf刁卜,請使用conf參數(shù)志电。
>>> from pyspark import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
<pyspark.sql.session.Builder object at 0x2ab7d2ab7650>
  1. enableHiveSupport()
    啟用Hive支持,包括連接到持久化的Hive Metastore蛔趴,支持Hive serdes和Hive用戶定義的功能挑辆。
  2. getOrCreate()
    獲取現(xiàn)有的SparkSession,或者孝情,如果沒有現(xiàn)有的SparkSession鱼蝉,則根據(jù)此構建器中設置的選項創(chuàng)建一個新的SparkSession。
    此方法首先檢查是否存在有效的全局默認SparkSession箫荡,如果是魁亦,則返回該值。 如果不存在有效的全局默認SparkSession羔挡,則該方法創(chuàng)建一個新的SparkSession洁奈,并將新創(chuàng)建的SparkSession指定為全局默認值间唉。
  3. master(master)
    設置要連接到的Spark master URL,例如本地運行的“l(fā)ocal”利术,本地運行4核的“l(fā)ocal [4]”或運行在Spark獨立群集上的“spark:// master:7077”呈野。
  1. SparkSession.builder = <pyspark.sql.session.Builder object at 0x7f51f134a110>

  2. SparkSession.catalog
    用戶可以通過它創(chuàng)建,刪除印叁,修改或查詢底層數(shù)據(jù)庫被冒,表格,函數(shù)等的接口

  3. SparkSession.conf
    Spark的運行時配置接口轮蜕。
    這是用戶可以獲取并設置與Spark SQL相關的所有Spark和Hadoop配置的接口昨悼。 獲取配置的值時,默認為基礎SparkContext中設置的值(如果有)跃洛。

  4. SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
    從RDD幔戏,列表或pandas.DataFrame創(chuàng)建一個DataFrame。
    當schema是列名稱的列表時税课,每列的類型將從數(shù)據(jù)中推斷出來。
    當schema為None時痊剖,它將嘗試從數(shù)據(jù)中推斷出schema(列名和類型)韩玩,數(shù)據(jù)應該是Row的RDD,或者是namedtuple或者dict陆馁。
    當schema是pyspark.sql.types.DataType或數(shù)據(jù)類型字符串時找颓,它必須匹配真實數(shù)據(jù),否則將在運行時引發(fā)異常叮贩。 如果給定的schema不是pyspark.sql.types.StructType击狮,它將被封裝成一個pyspark.sql.types.StructType作為唯一的字段,字段名稱將是“值”益老,每個記錄也將被包裝成一個 元組彪蓬,可以稍后轉(zhuǎn)換為行。
    如果需要(schema)模式推斷捺萌,則使用samplingRatio來確定用于模式推斷的行的比例档冬。 如果samplingRatio為None,則使用第一行桃纯。
    Parameters:

  • data - 任何類型的SQL數(shù)據(jù)表示d的RDD(例如行酷誓,元組,int态坦,布爾等)或列表或pandas.DataFrame盐数。
  • schema - 一個pyspark.sql.types.DataType或一個數(shù)據(jù)類型字符串或列名稱列表,默認值為None伞梯。 數(shù)據(jù)類型字符串格式等于pyspark.sql.types.DataType.simpleString玫氢,除了頂層結(jié)構類型可以省略struct <>帚屉,原子類型使用typeName()作為它們的格式。 使用字節(jié)而不是tinyint為pyspark.sql.types.ByteType琐旁。 我們也可以使用int作為IntegerType的簡稱涮阔。
  • samplingRatio - 用于推斷行的樣本比例。
  • verifySchema - 根據(jù)模式驗證每一行的數(shù)據(jù)類型灰殴。
    Returns: DataFrame
# list to DataFrame
>>> l = [("name", 1), ("Bob", 2)]
>>> spark.createDataFrame(l, ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> d = [{'name': 'Alice', 'age': 1}, {"name": "Bob", "age": 2}]
>>> spark.createDataFrame(d, ["name", "age"]).collect()
[Row(name=1, age=u'Alice'), Row(name=2, age=u'Bob')]
#RDD to DataFrame
>>> spark.createDataFrame(sc.parallelize(l), ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
#pandas.DataFrame to DataFrame
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
[Row(0=1, 1=2)]
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)])
>>> spark.createDataFrame(l, schema).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> spark.createDataFrame(l, "name: string, age: int").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> spark.createDataFrame(l, Person).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
  1. SparkSession.newSession()
    以新會話形式返回一個新的SparkSession敬特,它具有單獨的SQLConf,注冊的臨時視圖和UDF牺陶,但共享SparkContext和表緩存伟阔。
  2. SparkSession.range(start, end=None, step=1, numPartitions=None)
>>> spark.range(1,7,2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
  1. SparkSession.read
    返回可用于讀取DataFrame中的數(shù)據(jù)的DataFrameReader。
  2. SparkSession.readStream
    返回一個DataStreamReader掰伸,它可以用來讀取數(shù)據(jù)流作為一個數(shù)據(jù)流DataFrame皱炉。
  3. SparkSession.sparkContext
    返回底層的SparkContext。
  4. SparkSession.sql(sqlQuery)
    返回表示給定查詢結(jié)果的DataFrame狮鸭。
>>> l
[('name', 1), ('Bob', 2)]
>>> df = spark.createDataFrame(l, ["name", "age"])
#使用dataFrame(df)創(chuàng)建或替換本地臨時視圖合搅。
>>> df.createOrReplaceTempView("table1")
>>> spark.sql("select * from table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
  1. SparkSession.stop()
    停止底層的SparkContext。
  2. SparkSession.streams
    返回一個StreamingQueryManager歧蕉,它允許管理所有的StreamingQuery 灾部,在此上下文中激活的StreamingQueries。
  3. SparkSession.table(tableName)
    以DataFrame的形式返回指定的表惯退。
>>> spark.table("table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
  1. SparkSession.udf
    返回UDF注冊的UDFRegistration赌髓。
  2. SparkSession.version
    運行此應用程序的Spark版本。

class pyspark.sql.SQLContext(sparkContext, sparkSession=None, jsqlContext=None)

在Spark 1.x中使用Spark中結(jié)構化數(shù)據(jù)(行和列)的入口點催跪。
從Spark 2.0開始锁蠕,它被SparkSession所取代。 但是懊蒸,為了向后兼容荣倾,我們在這里保留這個類。
可以使用SQLContext創(chuàng)建DataFrame骑丸,將DataFrame注冊為表逃呼,在表上執(zhí)行SQL,緩存表和讀取parquet文件者娱。

  1. cacheTable(tableName)
    在內(nèi)存中緩存指定的表抡笼。
  2. clearCache()
    從內(nèi)存緩存中刪除所有緩存的表。
  3. createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
  4. createExternalTable(tableName, path=None, source=None, schema=None, **options)
    根據(jù)數(shù)據(jù)源中的數(shù)據(jù)集創(chuàng)建外部表黄鳍。
  5. dropTempTable(tableName)
    從目錄中刪除臨時表推姻。
  6. getConf(key, defaultValue=None)
    返回Spark SQL配置屬性中給定鍵的值。
  7. classmethod getOrCreate(sc)
    獲取現(xiàn)有的SQLContext或使用給定的SparkContext創(chuàng)建一個新的SQLContext框沟。
  8. newSession()
    將新的SQLContext作為新會話返回藏古,它具有單獨的SQLConf增炭,注冊的臨時視圖和UDF,但是共享SparkContext和表緩存拧晕。
  9. range(start, end=None, step=1, numPartitions=None)
  10. read
  11. readStream
  12. registerDataFrameAsTable(df, tableName)
    將給定的DataFrame注冊為目錄中的臨時表隙姿。
    臨時表僅在此SQLContext實例的生命周期中存在。
  13. registerFunction(name, f, returnType=StringType)
    將一個python函數(shù)(包括lambda函數(shù))注冊為UDF(自定義函數(shù))厂捞,以便在SQL語句中使用输玷。
    除了名稱和函數(shù)本身之外,還可以指定返回類型靡馁。 當返回類型沒有給出它默認為一個字符串和轉(zhuǎn)換將自動完成欲鹏。 對于任何其他返回類型,生成的對象必須匹配指定的類型臭墨。
    Parameters:
  • name - udf的名字
  • f - python 函數(shù)
  • returnType - 一個pyspark.sql.types.DataType對象
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
  1. registerJavaFunction(name, javaClassName, returnType=None)
    注冊一個Java UDF赔嚎,以便在SQL語句中使用它。
  2. setConf(key, value)
    設置給定的Spark SQL配置屬性胧弛。
  3. sql(sqlQuery)
  4. streams
  5. table(tableName)
  6. tableNames(dbName=None)
    返回數(shù)據(jù)庫dbName中表的名稱列表尤误。
  7. tables(dbName=None)
    返回包含給定數(shù)據(jù)庫中表的名稱的DataFrame。
    如果未指定dbName结缚,則將使用當前數(shù)據(jù)庫袄膏。
  8. udf
  9. uncacheTable(tableName)
    從內(nèi)存緩存中刪除指定的表。

class pyspark.sql.HiveContext(sparkContext, jhiveContext=None)

Spark SQL的一個變體掺冠,與存儲在Hive中的數(shù)據(jù)整合在一起。
Hive配置是從classpath的hive-site.xml中讀取的码党。 它支持同時運行SQL和HiveQL命令德崭。

  1. refreshTable(tableName)

class pyspark.sql.UDFRegistration(sqlContext)

用戶自定義函數(shù)注冊的包裝器。

  1. register(name, f, returnType=StringType)
    將一個python函數(shù)(包括lambda函數(shù))注冊為UDF揖盘,以便在SQL語句中使用眉厨。

class pyspark.sql.DataFrame(jdf, sql_ctx)

分布式數(shù)據(jù)集合分組到命名的列。
DataFrame相當于Spark SQL中的關系表兽狭,可以使用SQLContext中的各種函數(shù)創(chuàng)建:
創(chuàng)建后憾股,可以使用DataFrame,Column中定義的各種domain-specific-language(DSL)函數(shù)對其進行操作箕慧。

  1. agg(*exprs)
    在沒有組的情況下匯總整個DataFrame(df.groupBy.agg()的簡寫)服球。
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=2)]
>>> from pyspark.sql import functions as f
>>> df.agg(f.min(df.age)).collect()
[Row(min(age)=1)]
  1. alias(alias)
    返回一個帶有別名集的新DataFrame。
  2. approxQuantile(col, probabilities, relativeError)
    計算DataFrame的數(shù)值列的近似分位數(shù)颠焦。
  3. cache()
    使用默認存儲級別(MEMORY_AND_DISK)存儲DataFrame斩熊。
  4. checkpoint(eager=True)
    返回此數(shù)據(jù)集的檢查點版本。 檢查點可用于截斷此DataFrame的邏輯計劃伐庭,這在計劃可能呈指數(shù)增長的迭代算法中特別有用粉渠。 它將被保存到使用SparkContext.setCheckpointDir()設置的檢查點目錄內(nèi)的文件中分冈。
  5. coalesce(numPartitions)
    返回具有完全numPartitions分區(qū)的新DataFrame。
  6. collect()
  7. columns
    以列表形式返回所有列名稱霸株。
>>> df.columns
['name', 'age']
  1. corr(col1, col2, method=None)
    以雙精度值計算DataFrame的兩列的相關性雕沉。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的別名去件。
    Parameters:
  • col1 - 第一列的名稱
  • col2 - 第二列的名稱
  • method - 相關方法坡椒。 目前只支持“皮爾森”
  1. count()
    返回此DataFrame中的行數(shù)。
>>> df.count()
2
  1. cov(col1, col2)
    計算給定列的樣本協(xié)方差(由它們的名稱指定)作為雙精度值箫攀。 DataFrame.cov()和DataFrameStatFunctions.cov()是別名肠牲。
  2. createGlobalTempView(name)
    使用此DataFrame創(chuàng)建全局臨時視圖。
    這個臨時視圖的生命周期與這個Spark應用程序有關靴跛。 如果視圖名稱已經(jīng)存在于目錄中缀雳,則拋出TempTableAlreadyExistsException。
  3. createOrReplaceGlobalTempView(name)
    使用給定名稱創(chuàng)建或替換全局臨時視圖梢睛。
    這個臨時視圖的生命周期與這個Spark應用程序有關肥印。
  4. createOrReplaceTempView(name)
    使用此DataFrame創(chuàng)建或替換本地臨時視圖。
    此臨時表的生命周期與用于創(chuàng)建此DataFrame的SparkSession綁定绝葡。
  5. createTempView(name)
    使用此DataFrame創(chuàng)建本地臨時視圖深碱。
    此臨時表的生命周期與用于創(chuàng)建此DataFrame的SparkSession綁定。如果視圖名稱已經(jīng)存在于目錄中藏畅,拋出TempTableAlreadyExistsException敷硅。
  6. crossJoin(other)
    用另一個DataFrame相互作用返回笛卡爾積。
>>> df1 = spark.createDataFrame([("Alice", 1), ("Bob", 5)], ["name", "age"])
>>> df2 = spark.createDataFrame([("Alice", 66), ("Bob", 88)], ["name", "height"])
>>> df1.select(["name", "age"]).collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> df2.select(["name", "height"]).collect()
[Row(name=u'Alice', height=66), Row(name=u'Bob', height=88)]
>>> df1.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=1, name=u'Alice', height=66), Row(age=1, name=u'Alice', height=88), Row(age=5, name=u'Bob', height=66), Row(age=5, name=u'Bob', height=88)]
  1. crosstab(col1, col2)
  2. cube(*cols)
    使用指定的列為當前的DataFrame創(chuàng)建一個多維數(shù)據(jù)集愉阎,所以我們可以對它們進行聚合绞蹦。
>>> df.cube("name", df.age).count().orderBy("name", "age").show()
+----+----+-----+                                                               
|name| age|count|
+----+----+-----+
|null|null|    2|
|null|   1|    1|
|null|   2|    1|
| Bob|null|    1|
| Bob|   2|    1|
|name|null|    1|
|name|   1|    1|
+----+----+-----+
  1. describe(*cols)
    計算數(shù)字和字符串列的統(tǒng)計信息。
    這包括count榜旦,mean幽七,stddev,min和max溅呢。 如果未給出具體的列名澡屡,則此函數(shù)計算所有數(shù)字或字符串列的統(tǒng)計信息。
>>> df.describe(["age"]).show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               1.5|
| stddev|0.7071067811865476|
|    min|                 1|
|    max|                 2|
+-------+------------------+
  1. distinct()
    返回包含此DataFrame中不相同行的新DataFrame咐旧。(去除相同的行)
  2. drop(*cols)
    返回刪除指定列的新DataFrame驶鹉。 如果模式不包含給定的列名,這是一個無意義操作铣墨。
  3. dropDuplicates(subset=None)
    返回刪除重復行的新DataFrame梁厉,可選地僅考慮某些列。
>>> df3 = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
>>> df3.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
  1. drop_duplicates(subset=None)
    dropDuplicates()的別名。
  2. dropna(how='any', thresh=None, subset=None)
    返回一個新的DataFrame词顾,省略含有空值的行八秃。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的別名。
    Parameters:
  • how - “any”或“all”肉盹。 如果“any”昔驱,如果它包含任何空值,則刪除一行上忍。 如果'all'骤肛,只有當所有的值都為null時才刪除一行。
  • thresh -
  • subset -
  1. dtypes
    以列表形式返回所有列名稱及其數(shù)據(jù)類型窍蓝。
>>> df3.dtypes
[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]
  1. explain(extended=False)
    打右傅摺(邏輯和物理)計劃到控制臺進行調(diào)試。
    Parameters:
  • extended - 布爾值吓笙,默認為False淑玫。 如果為False,則僅打印物理計劃面睛。
>>> df3.explain()
== Physical Plan ==
Scan ExistingRDD[age#277L,height#278L,name#279]
  1. fillna(value, subset=None)
    替換空值絮蒿,na.fill()的別名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的別名叁鉴。
  2. filter(condition)
    使用給定的條件過濾行土涝。
    where()是filter()的別名。
>>> df.filter(df.age > 1).collect()
[Row(name=u'Bob', age=2)]
>>> df.filter("age > 1").collect()
[Row(name=u'Bob', age=2)]
  1. first()
    將第一行作為Row返回幌墓。
>>> df.first()
Row(name=u'name', age=1)
  1. foreach(f)
    將f函數(shù)應用于此DataFrame的所有行但壮。
    這是df.rdd.foreach()的簡寫。
  2. foreachPartition(f)
    將f函數(shù)應用于此DataFrame的每個分區(qū)常侣。
    這是df.rdd.foreachPartition()的簡寫蜡饵。
  3. freqItems(cols, support=None)
    找到列的頻繁項,可能有誤報袭祟。
  4. groupBy(*cols)
    使用指定的列對DataFrame進行分組,所以我們可以對它們進行聚合捞附。 有關所有可用的聚合函數(shù)巾乳,請參閱GroupedData。groupby()是groupBy()的別名鸟召。
>>> df.groupBy("name").agg({"age":"mean"}).collect()
[Row(name=u'name', avg(age)=1.0), Row(name=u'Bob', avg(age)=2.0)]
>>> df.groupBy(["name",df.age]).count().collect()
[Row(name=u'Bob', age=2, count=1), Row(name=u'name', age=1, count=1)]           
  1. groupby(*cols)
  2. head(n=None)
    返回前n行胆绊。
  3. hint(name, *parameters)
    在當前的DataFrame上指定一些提示。
  4. intersect(other)
    僅返回包含此frame和另一frame中的行的新DataFrame欧募。(兩者的交集)
  5. isLocal()
    如果collect()和take()方法可以在本地運行(沒有任何Spark執(zhí)行器)压状,則返回True。
  6. isStreaming
    如果此Dataset包含一個或多個在到達時連續(xù)返回數(shù)據(jù)的源,則返回true种冬。 從流源讀取數(shù)據(jù)的數(shù)據(jù)集必須使用DataStreamWriter中的start()方法作為StreamingQuery執(zhí)行镣丑。 返回單個答案的方法(例如,count()或collect())將在存在流式源時引發(fā)AnalysisException娱两。
  7. join(other, on=None, how=None)
    使用給定的連接表達式與另一個DataFrame進行連接莺匠。
    Parameters:
  • other -
  • on - 連接列名稱的字符串,列名稱列表十兢,連接表達式(列)或列的列表趣竣。 如果on是一個字符串或者一個表示連接列名的字符串列表,那么這個列必須存在于兩邊旱物,并且執(zhí)行一個等連接遥缕。
  • how - str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
  1. limit(num)
    將結(jié)果計數(shù)限制為指定的數(shù)字。
  2. na
    返回一個DataFrameNaFunctions來處理缺失的值宵呛。
  3. orderBy(*cols, **kwargs)
    返回按指定列排序的新DataFrame单匣。
>>> df1.orderBy(["name","age"],ascending=[0,1]).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=1)]
  1. persist(storageLevel=StorageLevel(True, True, False, False, 1))
  2. printSchema()
    以樹形結(jié)構打印schema。
>>> df1.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
  1. randomSplit(weights, seed=None)
    用提供的權重隨機分割這個DataFrame烤蜕。
  2. rdd
    將內(nèi)容作為行的pyspark.RDD返回封孙。
  3. registerTempTable(name)
    使用給定名稱將此RDD注冊為臨時表。
    此臨時表的生命周期與用于創(chuàng)建此DataFrame的SQLContext相關聯(lián)讽营。
>>> df1.registerTempTable("people")
>>> spark.sql("select * from people").collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> spark.catalog.dropTampView("people")
  1. repartition(numPartitions, *cols)
    返回由給定分區(qū)表達式分區(qū)的新DataFrame虎忌。 生成的DataFrame是hash分區(qū)的。
    numPartitions可以是一個int來指定目標分區(qū)數(shù)量或一個Column橱鹏。 如果它是一個列混狠,它將被用作第一個分區(qū)列。 如果未指定匀哄,則使用默認的分區(qū)數(shù)量倔喂。
  2. replace(to_replace, value=None, subset=None)
    返回一個新的DataFrame,用另一個值替換一個值糖荒。 DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的別名杉辙。
>>> df1.replace(["Alice", "Bob"], ["A", "B"]).show()
+----+---+
|name|age|
+----+---+
|   A|  1|
|   B|  5|
+----+---+
  1. rollup(*cols)
    使用指定的列為當前的DataFrame創(chuàng)建一個多維匯總,所以我們可以在它上運行聚合函數(shù)捶朵。
  2. sample(withReplacement, fraction, seed=None)
  3. sampleBy(col, fractions, seed=None)
  4. schema
    以pyspark.sql.types.StructType的形式返回此DataFrame的schema蜘矢。
>>> df1.schema
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
  1. select(*cols)
    投影一組表達式并返回一個新的DataFrame。
>>> df.select(df.name, (df.age + 10).alias("height")).show()
+----+------+
|name|height|
+----+------+
|name|    11|
| Bob|    12|
+----+------+
  1. selectExpr(*expr)
    這是接受SQL表達式的select()的變體综看。
  2. show(n=20, truncate=True)
    將前n行打印到控制臺品腹。
  3. sort(*cols, **kwargs)
    返回按指定列排序的新DataFrame。
  4. sortWithinPartitions(*cols, **kwargs)
    返回一個新的DataFrame红碑,每個分區(qū)按指定的列排序舞吭。
  5. stat
    為統(tǒng)計函數(shù)返回一個DataFrameStatFunctions。
  6. storageLevel
    獲取DataFrame的當前存儲級別。
>>> df1.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df1.cache().storageLevel
StorageLevel(True, True, False, True, 1)
  1. subtract(other)
    返回一個新的DataFrame羡鸥,它包含這個frame中的行蔑穴,但不包含在另一個frame中。
  2. take(num)
  3. toDF(*cols)
    返回一個新的類:帶有新指定列名的DataFrame兄春。
  4. toJSON(use_unicode=True)
    將DataFrame轉(zhuǎn)換為字符串的RDD澎剥。
    每行都被轉(zhuǎn)換成一個JSON文檔作為返回的RDD中的一個元素。
>>> df1.toJSON().collect()
[u'{"name":"Alice","age":1}', u'{"name":"Bob","age":5}']
  1. toLocalIterator()
    返回包含此DataFrame中所有行的迭代器赶舆。 迭代器將占用與此DataFrame中最大分區(qū)一樣多的內(nèi)存哑姚。
  2. toPandas()
    以Pandas中的pandas.DataFrame的形式返回此DataFrame的內(nèi)容。
  3. union(other)
    在這個和另一個frame中返回一個包含行聯(lián)合的新DataFrame芜茵。
  4. unpersist(blocking=False)
    將DataFrame標記為非持久性叙量,并從內(nèi)存和磁盤中刪除所有的塊。
  5. where(condition)
    與filter()相同九串。
  6. withColumn(colName, col)
    通過添加列或替換具有相同名稱的現(xiàn)有列來返回新的DataFrame绞佩。
>>> df1.withColumn("height", df1.age + 50).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  1|    51|
|  Bob|  5|    55|
+-----+---+------+
  1. withColumnRenamed(existing, new)
    通過重命名現(xiàn)有列來返回新的DataFrame。 如果模式不包含給定的列名猪钮,則這是一個無意義操作品山。
  2. withWatermark(eventTime, delayThreshold)
    為此DataFrame定義事件時間水印。 一個水印跟蹤一個時間點烤低,在這個時間點之前肘交,我們假設沒有更晚的數(shù)據(jù)將要到達。
  3. write
    用于將非流式DataFrame的內(nèi)容保存到外部存儲器的接口扑馁。
  4. writeStream
    用于將流式DataFrame的內(nèi)容保存到外部存儲的接口涯呻。

class pyspark.sql.GroupedData(jgd, sql_ctx)

由DataFrame.groupBy()創(chuàng)建的DataFrame上的一組聚合方法。

  • Note: 實驗階段
  1. agg(*exprs)
    計算聚合并將結(jié)果作為DataFrame返回腻要。
    可用的集合函數(shù)是avg复罐,max,min雄家,sum效诅,count。
    如果exprs是從字符串到字符串的單個字典映射趟济,則key是要執(zhí)行聚合的列名乱投,并且該value是聚合函數(shù)名。
    或者咙好,exprs也可以是聚合列表達式的列表篡腌。
  2. avg(*cols)
    計算每個組的每個數(shù)字列的平均值褐荷。
    mean()是avg()的別名勾效。
  3. count()
    統(tǒng)計每個組的記錄數(shù)。
  4. max(*cols)
    計算每個組的每個數(shù)字列的最大值。
  5. mean(*cols)
    計算每個組的每個數(shù)字列的平均值层宫。
  6. min(*cols)
    計算每個組的每個數(shù)字列的最小值杨伙。
  7. pivot(pivot_col, values=None)
    旋轉(zhuǎn)當前[[DataFrame]]的列并執(zhí)行指定的聚合。 有兩個版本的透視函數(shù):一個需要調(diào)用者指定不同值的列表以進行透視萌腿,另一個不支持限匣。 后者更簡潔但效率更低,因為Spark需要首先在內(nèi)部計算不同值的列表毁菱。
    Parameters:
  • pivot_col - 要轉(zhuǎn)移的列的名稱米死。
  • values - 將被轉(zhuǎn)換為輸出DataFrame中的列的值的列表。
  1. sum(*cols)
    計算每個組的每個數(shù)字列的總和贮庞。

class pyspark.sql.Column(jc)

DataFrame中的一列峦筒。

  1. alias(*alias, **kwargs)
    使用新名稱返回此列的別名。
  2. asc()
    基于給定列名稱的升序返回一個排序表達式窗慎。
  3. astype(dataType)
    astype()是cast()的別名物喷。
  4. between(lowerBound, upperBound)
    一個布爾表達式,如果此表達式的值位于給定列之間遮斥,則該表達式的值為true峦失。
>>> df1.select(d1.name, df1.age.between(2, 4)).show()
+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice|                      false|
|  Bob|                      false|
+-----+---------------------------+

5 .bitwiseAND(other)
二元運算符

  1. bitwiseOR(other)
    二元運算符
  2. bitwiseXOR(other)
    二元運算符
  3. cast(dataType)
    將列轉(zhuǎn)換為dataType類型。(轉(zhuǎn)換某列的類型)
>>> df.select(df.name, df.age.cast("string").alias("ages")).collect()
[Row(name=u'Alice', ages=u'1'), Row(name=u'Bob', ages=u'5')]
  1. contains(other)
    二元運算符
  2. desc()
    基于給定列名稱的降序返回一個排序表達式术吗。
  3. endswith(other)
    根據(jù)匹配的字符串結(jié)尾返回一個布爾列尉辑。
>>> df.filter(df.name.endswith("ce")).collect()
[Row(name=u'Alice', age=1)]
  1. getField(name)
    在StructField中通過名稱獲取字段的表達式。
  2. getItem(key)
    從列表中獲取位置序號的項藐翎,或者通過字典獲取項的表達式材蹬。
  3. isNotNull()
    如果當前表達式為null,則為真吝镣。 通常結(jié)合DataFrame.filter()來選擇具有非空值的行堤器。
>>> df2 = sc.parallelize([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]).toDF()
>>> df2.filter(df2.height.isNotNull()).collect()
[Row(height=80, name=u'Tom')]
  1. isNull()
    如果當前表達式為null,則為真末贾。 通常與DataFrame.filter()結(jié)合來選擇具有空值的行闸溃。
  2. isin(*cols)
    一個布爾表達式,如果此表達式的值由參數(shù)的評估值包含拱撵,則該值被評估為true辉川。
>>> df[df.age.isin([1,2,3])].collect()
[Row(name=u'Alice', age=1)]
  1. like(other)
    返回基于SQL LIKE匹配的布爾列。
>>> df.filter(df.name.like("Al%")).collect()
[Row(name=u'Alice', age=1)]
  1. name(*alias, **kwargs)
    name()是alias()的別名拴测。
  2. otherwise(value)
    評估條件列表并返回多個可能的結(jié)果表達式之一乓旗。 如果不調(diào)用Column.otherwise(),則不匹配條件返回None集索。
>>> from pyspark.sql import functions as f
>>> df.select(df.name, f.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice|                                    0|
|  Bob|                                    1|
+-----+-------------------------------------+
  1. over(window)
    定義一個窗口列屿愚。
  2. rlike(other)
    基于正則表達式匹配返回一個布爾列汇跨。
>>> df.filter(df.name.rlike('ice$')).collect()
[Row(age=2, name=u'Alice')]
  1. startswith(other)
    根據(jù)字符串匹配返回一個布爾列。
  2. substr(startPos, length)
    返回一個列妆距,它是該列的一個子字符串穷遂。
  3. when(condition, value)
    評估條件列表并返回多個可能的結(jié)果表達式之一。 如果不調(diào)用Column.otherwise()娱据,則不匹配條件返回None蚪黑。
>>> df.select(df.name, f.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice|                                                          -1|
|  Bob|                                                           1|
+-----+------------------------------------------------------------+

class pyspark.sql.Row

DataFrame中的一行。 其中的字段可以被訪問:row.key或者row[key]中剩。

>>> from pyspark.sql import Row
>>> row = Row(name="Alice", age=1)
>>> row.name
'Alice'
>>> row["age"]
1
>>> row
Row(age=1, name='Alice')
>>> "name" in row
True
  1. asDict(recursive=False)
    recursive - 將嵌套的行轉(zhuǎn)換為字典(默認為False)忌穿。
>>> row = Row(name="Alice",value=Row(age=1, height=88))
>>> row.asDict()
{'name': 'Alice', 'value': Row(age=1, height=88)}
>>> row.asDict(True)
{'name': 'Alice', 'value': {'age': 1, 'height': 88}}

class pyspark.sql.DataFrameNaFunctions(df)

在DataFrame中處理丟失的數(shù)據(jù)的功能。

  1. drop(how='any', thresh=None, subset=None)
    返回一個新的DataFrame结啼,省略含有空值的行伴网。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的別名。
  2. fill(value, subset=None)
    替換空值妆棒,na.fill()的別名澡腾。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的別名。
  3. replace(to_replace, value, subset=None)

class pyspark.sql.DataFrameStatFunctions(df)

DataFrame的統(tǒng)計函數(shù)的功能糕珊。

  1. approxQuantile(col, probabilities, relativeError)
    計算DataFrame的數(shù)值列的近似分位數(shù)动分。
  2. corr(col1, col2, method=None)
    以雙精度值計算DataFrame的兩列的相關性。 目前只支持Pearson Correlation Coefficient红选。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的別名澜公。
  3. cov(col1, col2)
    計算給定列的樣本協(xié)方差(由它們的名稱指定)作為雙精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是別名喇肋。
  4. crosstab(col1, col2)
    計算給定列的成對頻率表坟乾。 也被稱為應急表。
  5. freqItems(cols, support=None)
    找到列的頻繁項蝶防,可能有誤報甚侣。
  6. sampleBy(col, fractions, seed=None)
    根據(jù)每層上給出的分數(shù)返回一個沒有更換的分層樣本。

class pyspark.sql.Window

用于在DataFrame中定義窗口的實用函數(shù)间学。

>>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
  • Note: 實驗階段
    currentRow = 0
    static orderBy(cols) - 用定義的順序創(chuàng)建一個WindowSpec殷费。
    static partitionBy(
    cols) - 用定義的分區(qū)創(chuàng)建一個WindowSpec。
    static rangeBetween(start, end) -
    static rowsBetween(start, end) -
    unboundedFollowing = 9223372036854775807L
    unboundedPreceding = -9223372036854775808L

class pyspark.sql.WindowSpec(jspec)

定義 partitioning, ordering, and frame的窗口規(guī)范低葫。
使用Window中的靜態(tài)方法創(chuàng)建一個WindowSpec详羡。

  1. orderBy(*cols)
    定義WindowSpec中的排序列。
  2. partitionBy(*cols)
    定義WindowSpec中的分區(qū)列嘿悬。
  3. rangeBetween(start, end)
    定義從開始(包含)到結(jié)束(包含)的框架邊界实柠。
  4. rowsBetween(start, end)
    定義從開始(包含)到結(jié)束(包含)的框架邊界。

class pyspark.sql.DataFrameReader(spark)

用于從外部存儲系統(tǒng)(例如文件系統(tǒng)善涨,鍵值存儲等)加載DataFrame的接口窒盐。 使用spark.read()來訪問它茶行。

  1. csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
    加載CSV文件并將結(jié)果作為DataFrame返回。
>>> df = spark.read.csv("file:/home/spark_sql_test.csv",header=True)
>>> df.show()
+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  1|Alice| 11|   111|
|  2|  Bob| 22|   222|
+---+-----+---+------+
  1. format(source)
    指定輸入數(shù)據(jù)源格式登钥。
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
  1. jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
    Parameters:
  • url – a JDBC URL of the form jdbc:subprotocol:subname
  • table – the name of the table
  1. json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
    加載JSON文件并將結(jié)果作為DataFrame返回。
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
  1. load(path=None, format=None, schema=None, **options)
    從數(shù)據(jù)源加載數(shù)據(jù)并將其作為:class DataFrame返回娶靡。
  2. option(key, value)
    為基礎數(shù)據(jù)源添加一個輸入選項牧牢。
    您可以設置以下選項來讀取文件:
  • timeZone: 設置指示用于分析時間戳的時區(qū)的字符串
    在JSON / CSV數(shù)據(jù)源或分區(qū)值。 如果沒有設置姿锭,它使用默認值塔鳍,會話本地時區(qū)。
  1. options(**options)
  2. orc(path)
    加載ORC文件呻此,將結(jié)果作為DataFrame返回轮纫。
  3. parquet(*paths)
    加載Parquet文件,將結(jié)果作為DataFrame返回焚鲜。
  4. schema(schema)
    指定輸入模式掌唾。
  5. table(tableName)
    以DataFrame的形式返回指定的表。
  6. text(paths)
    加載文本文件并返回一個DataFrame忿磅,該DataFrame的架構以名為“value”的字符串列開頭糯彬,如果有的話,后跟分區(qū)列葱她。

class pyspark.sql.DataFrameWriter(df)

用于將DataFrame寫入外部存儲系統(tǒng)(例如文件系統(tǒng)撩扒,鍵值存儲等)的接口。 使用DataFrame.write()來訪問這個吨些。

  1. csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None)
    以指定的路徑以CSV格式保存DataFrame的內(nèi)容搓谆。
>>> df1 = spark.createDataFrame([(3,"Tom",33,333),],["id","name","age","salary"])
>>> df1.show()
+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
|  3| Tom| 33|   333|
+---+----+---+------+
>>> df1.write.csv("file:/home/spark_sql_test",mode="overwrite",header=True)
  1. format(source)
    指定基礎輸出數(shù)據(jù)源。
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
  1. insertInto(tableName, overwrite=False)
    將DataFrame的內(nèi)容插入到指定的表中豪墅。
    它要求類的架構:DataFrame與表的架構相同泉手。
    可以覆蓋任何現(xiàn)有的數(shù)據(jù)。
  2. jdbc(url, table, mode=None, properties=None)
    將DataFrame的內(nèi)容通過JDBC保存到外部數(shù)據(jù)庫表中偶器。
  3. json(path, mode=None, compression=None, dateFormat=None, timestampFormat=None)
    將DataFrame的內(nèi)容以JSON格式保存在指定的路徑中螃诅。
  4. mode(saveMode)
    指定數(shù)據(jù)或表已經(jīng)存在的行為。
    選項包括:
  • append:將此DataFrame的內(nèi)容附加到現(xiàn)有數(shù)據(jù)状囱。
  • overwrite:覆蓋現(xiàn)有數(shù)據(jù)术裸。
  • ignore: 如果數(shù)據(jù)已經(jīng)存在,靜默地忽略這個操作亭枷。
  • error:如果數(shù)據(jù)已經(jīng)存在袭艺,則拋出異常。
  1. option(key, value)
  2. options(**options)
  3. orc(path, mode=None, partitionBy=None, compression=None)
    以指定的路徑以ORC格式保存DataFrame的內(nèi)容叨粘。
  4. parquet(path, mode=None, partitionBy=None, compression=None)
    將DataFrame的內(nèi)容以Parquet格式保存在指定的路徑中猾编。
  5. partitionBy(*cols)
    按文件系統(tǒng)上的給定列對輸出進行分區(qū)瘤睹。
    如果指定,則輸出將在文件系統(tǒng)上進行布局答倡,類似于Hive的分區(qū)方案轰传。
  6. save(path=None, format=None, mode=None, partitionBy=None, **options)
    將DataFrame的內(nèi)容保存到數(shù)據(jù)源。
    數(shù)據(jù)源由格式和一組選項指定瘪撇。 如果未指定format获茬,則將使用由spark.sql.sources.default配置的缺省數(shù)據(jù)源。
  7. saveAsTable(name, format=None, mode=None, partitionBy=None, **options)
    將DataFrame的內(nèi)容保存為指定的表格倔既。
  8. text(path, compression=None)
    將DataFrame的內(nèi)容保存在指定路徑的文本文件中恕曲。

pyspark.sql.types module

class pyspark.sql.types.DataType

數(shù)據(jù)類型的基類。

  1. fromInternal(obj)
    將內(nèi)部SQL對象轉(zhuǎn)換為本地Python對象渤涌。
  2. json()
  3. jsonValue()
  4. needConversion()
    這種類型是否需要在Python對象和內(nèi)部SQL對象之間進行轉(zhuǎn)換佩谣?
    這用于避免ArrayType / MapType / StructType的不必要的轉(zhuǎn)換。
  5. simpleString()
  6. toInternal(obj)
    將Python對象轉(zhuǎn)換為內(nèi)部SQL對象实蓬。
  7. classmethod typeName()

class pyspark.sql.types.NullType

空類型茸俭。
表示None的數(shù)據(jù)類型,用于無法推斷的類型安皱。

class pyspark.sql.types.StringType

字符串數(shù)據(jù)類型瓣履。

class pyspark.sql.types.BinaryType

二進制(字節(jié)數(shù)組)數(shù)據(jù)類型。

class pyspark.sql.types.BooleanType

布爾數(shù)據(jù)類型练俐。

class pyspark.sql.types.DateType

Date(datetime.date)數(shù)據(jù)類型袖迎。
EPOCH_ORDINAL = 719163

  1. fromInternal(v)
  2. needConversion()
  3. toInternal(d)

class pyspark.sql.types.TimestampType

時間戳(datetime.datetime)數(shù)據(jù)類型。

  1. fromInternal(ts)
  2. needConversion()
  3. toInternal(dt)

class pyspark.sql.types.DecimalType(precision=10, scale=0)

十進制(decimal.Decimal)數(shù)據(jù)類型腺晾。

  1. jsonValue()
  2. simpleString()

class pyspark.sql.types.DoubleType

雙數(shù)據(jù)類型燕锥,表示雙精度浮點數(shù)。

class pyspark.sql.types.FloatType

浮點數(shù)據(jù)類型悯蝉,表示單精度浮點數(shù)归形。

class pyspark.sql.types.ByteType

字節(jié)數(shù)據(jù)類型,即單個字節(jié)中的有符號整數(shù)鼻由。

  1. simpleString()

class pyspark.sql.types.IntegerType

Int數(shù)據(jù)類型暇榴,即有符號的32位整數(shù)。

  1. simpleString()

class pyspark.sql.types.LongType

長數(shù)據(jù)類型蕉世,即有符號的64位整數(shù)蔼紧。

  1. simpleString()

class pyspark.sql.types.ShortType

短數(shù)據(jù)類型,即有符號的16位整數(shù)狠轻。

  1. simpleString()

class pyspark.sql.types.ArrayType(elementType, containsNull=True)

數(shù)組數(shù)據(jù)類型奸例。

  1. fromInternal(obj)
  2. classmethod fromJson(json)
  3. jsonValue()
  4. needConversion()
  5. simpleString()
  6. toInternal(obj)

class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)

Map數(shù)據(jù)類型。

  1. fromInternal(obj)
  2. classmethod fromJson(json)
  3. jsonValue()
  4. needConversion()
  5. simpleString()
  6. toInternal(obj)

class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)

StructType中的一個字段向楼。

  1. fromInternal(obj)
  2. classmethod fromJson(json)
  3. jsonValue()
  4. needConversion()
  5. simpleString()
  6. toInternal(obj)

class pyspark.sql.types.StructType(fields=None)

結(jié)構類型查吊,由StructField的列表組成谐区。
這是表示一個行的數(shù)據(jù)類型。

  1. add(field, data_type=None, nullable=True, metadata=None)
    通過添加新元素來構造一個StructType來定義模式逻卖。 該方法接受:
    一個參數(shù)是一個StructField對象宋列;介于2到4之間的參數(shù)(name,data_type评也,nullable(可選)炼杖,metadata(可選))。data_type參數(shù)可以是String或DataType對象仇参。
  2. fromInternal(obj)
  3. classmethod fromJson(json)
  4. jsonValue()
  5. needConversion()
  6. simpleString()
  7. toInternal(obj)

pyspark.sql.functions module

內(nèi)建函數(shù)的集合

  1. pyspark.sql.functions.abs(col)
    計算絕對值。
  2. pyspark.sql.functions.acos(col)
    計算給定值的余弦逆; 返回的角度在0到π的范圍內(nèi)婆殿。
  3. pyspark.sql.functions.add_months(start, months)
    返回開始后幾個月的日期
>>> from pyspark.sql import functions as f
>>> df = spark.createDataFrame([("2017-12-25",)],["d"])
>>> df.select(f.add_months(df.d,1).alias("d")).collect()
[Row(d=datetime.date(2018, 1, 25))]
  1. pyspark.sql.functions.approx_count_distinct(col, rsd=None)
    返回col的近似不同計數(shù)的新列诈乒。
  2. pyspark.sql.functions.array(*cols)
    創(chuàng)建一個新的數(shù)組列。
  3. pyspark.sql.functions.array_contains(col, value)
    集合函數(shù):如果數(shù)組為null婆芦,則返回null;如果數(shù)組包含給定值怕磨,則返回true;否則返回false。
  4. pyspark.sql.functions.asc(col)
    基于給定列名稱的升序返回一個排序表達式消约。
  5. pyspark.sql.functions.ascii(col)
    計算字符串列的第一個字符的數(shù)值肠鲫。
  6. pyspark.sql.functions.asin(col)
    計算給定值的正弦倒數(shù); 返回的角度在- π/ 2到π/ 2的范圍內(nèi)。
  7. pyspark.sql.functions.atan(col)
    計算給定值的正切倒數(shù)或粮。
  8. pyspark.sql.functions.atan2(col1, col2)
    返回直角坐標(x导饲,y)到極坐標(r,theta)轉(zhuǎn)換的角度theta氯材。
  9. pyspark.sql.functions.avg(col)
    聚合函數(shù):返回組中的值的平均值渣锦。
  10. pyspark.sql.functions.base64(col)
    計算二進制列的BASE64編碼并將其作為字符串列返回。
  11. pyspark.sql.functions.bin(col)
    返回給定列的二進制值的字符串表示形式氢哮。
  12. pyspark.sql.functions.bitwiseNOT(col)
    不按位計算袋毙。
  13. pyspark.sql.functions.broadcast(df)
    將DataFrame標記為足夠小以用于廣播連接。
  14. pyspark.sql.functions.bround(col, scale=0)
    如果scale> = 0冗尤,則使用HALF_EVEN舍入模式對給定值進行四舍五入以縮放小數(shù)點;如果scale <0听盖,則將其舍入到整數(shù)部分。
  15. pyspark.sql.functions.cbrt(col)
    計算給定值的立方根裂七。
  16. pyspark.sql.functions.ceil(col)
    計算給定值的上限皆看。
  17. pyspark.sql.functions.coalesce(*cols)
    返回不為空的第一列。
  18. pyspark.sql.functions.col(col)
    根據(jù)給定的列名返回一個列背零。
  19. pyspark.sql.functions.collect_list(col)
    聚合函數(shù):返回重復對象的列表悬蔽。
  20. pyspark.sql.functions.collect_set(col)
    聚合函數(shù):返回一組消除重復元素的對象。
  21. pyspark.sql.functions.column(col)
    根據(jù)給定的列名返回一個列捉兴。
  22. pyspark.sql.functions.concat(*cols)
    將多個輸入字符串列連接成一個字符串列蝎困。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(f.concat(df.s, df.d).alias('s')).collect()
[Row(s=u'abcd123')]
  1. pyspark.sql.functions.concat_ws(sep, *cols)
    使用給定的分隔符將多個輸入字符串列連接到一個字符串列中录语。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]
  1. pyspark.sql.functions.conv(col, fromBase, toBase)
    將字符串列中的數(shù)字從一個進制轉(zhuǎn)換為另一個進制。
>>> df = spark.createDataFrame([("010101",)], ['n'])
>>> df.select(conv(df.n, 2, 16).alias('hex')).collect()
[Row(hex=u'15')]
  1. pyspark.sql.functions.corr(col1, col2)
    返回col1和col2的Pearson相關系數(shù)的新列禾乘。
  2. pyspark.sql.functions.cos(col)
    計算給定值的余弦澎埠。
  3. pyspark.sql.functions.cosh(col)
    計算給定值的雙曲余弦。
  4. pyspark.sql.functions.count(col)
    聚合函數(shù):返回組中的項目數(shù)量。
  5. pyspark.sql.functions.countDistinct(col, *cols)
    返回col或col的不同計數(shù)的新列。
  6. pyspark.sql.functions.covar_pop(col1, col2)
    返回col1和col2的總體協(xié)方差的新列盔粹。
  7. pyspark.sql.functions.covar_samp(col1, col2)
    返回col1和col2的樣本協(xié)方差的新列椰弊。
  8. pyspark.sql.functions.crc32(col)
    計算二進制列的循環(huán)冗余校驗值(CRC32),并將該值作為bigint返回萨咳。
  9. pyspark.sql.functions.create_map(*cols)
    創(chuàng)建一個新的地圖列。
  10. pyspark.sql.functions.cume_dist()
    窗口函數(shù):返回窗口分區(qū)內(nèi)值的累積分布,即在當前行下面的行的分數(shù)祥国。
  11. pyspark.sql.functions.current_date()
    以日期列的形式返回當前日期。
  12. pyspark.sql.functions.current_timestamp()
    將當前時間戳作為時間戳列返回晾腔。
  13. pyspark.sql.functions.date_add(start, days)
    返回開始后幾天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]
  1. pyspark.sql.functions.date_format(date, format)
    將日期/時間戳/字符串轉(zhuǎn)換為由第二個參數(shù)給定日期格式指定格式的字符串值舌稀。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
[Row(date=u'04/08/2015')]
  1. pyspark.sql.functions.date_sub(start, days)
    返回開始前幾天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_sub(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 7))]
  1. pyspark.sql.functions.datediff(end, start)
    返回從開始到結(jié)束的天數(shù)。
>>> df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]
  1. pyspark.sql.functions.dayofmonth(col)
    將給定日期的月份的日期解壓為整數(shù)灼擂。(一月中第幾天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofmonth('a').alias('day')).collect()
[Row(day=8)]
  1. pyspark.sql.functions.dayofyear(col)
    將給定日期的年份中的某一天提取為整數(shù)壁查。(一年中第幾天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofyear('a').alias('day')).collect()
[Row(day=98)]
  1. pyspark.sql.functions.decode(col, charset)
    Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
  2. pyspark.sql.functions.degrees(col)
    將以弧度度量的角度轉(zhuǎn)換為以度數(shù)度量的近似等效角度。
  3. pyspark.sql.functions.dense_rank()
    窗口函數(shù):返回窗口分區(qū)內(nèi)的行的等級剔应,沒有任何間隙
  4. pyspark.sql.functions.desc(col)
    基于給定列名稱的降序返回一個排序表達式睡腿。
  5. pyspark.sql.functions.encode(col, charset)
    Computes the first argument into a binary from a string using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
  6. pyspark.sql.functions.exp(col)
    計算給定值的指數(shù)。
  7. pyspark.sql.functions.explode(col)
    返回給定數(shù)組或映射中每個元素的新行峻贮。
  8. pyspark.sql.functions.expm1(col)
    計算給定值的指數(shù)減1嫉到。
  9. pyspark.sql.functions.expr(str)
    將表達式字符串解析到它表示的列中
  10. pyspark.sql.functions.factorial(col)
    計算給定值的階乘。
>>> df = spark.createDataFrame([(5,)], ['n'])
>>> df.select(factorial(df.n).alias('f')).collect()
[Row(f=120)]
  1. pyspark.sql.functions.first(col, ignorenulls=False)
    聚合函數(shù):返回組中的第一個值月洛。
  2. pyspark.sql.functions.floor(col)
    計算給定值的floor何恶。
  3. pyspark.sql.functions.format_number(col, d)
    將數(shù)字X格式化為像'#, - #嚼黔, - #.-'這樣的格式细层,用HALF_EVEN舍入模式四舍五入到小數(shù)點后的位置,然后以字符串形式返回結(jié)果唬涧。
  4. pyspark.sql.functions.format_string(format, *cols)
    以printf-style格式化參數(shù)疫赎,并將結(jié)果作為字符串列返回。
  5. pyspark.sql.functions.from_json(col, schema, options={})
    使用指定的模式將包含JSON字符串的列解析為[[StructType]]的[[StructType]]或[[ArrayType]]碎节。 在不可解析的字符串的情況下返回null捧搞。
  6. pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
    將來自unix時期(1970-01-01 00:00:00 UTC)的秒數(shù)轉(zhuǎn)換為以給定格式表示當前系統(tǒng)時區(qū)中該時刻的時間戳的字符串。
  7. pyspark.sql.functions.from_utc_timestamp(timestamp, tz)
    給定一個時間戳,對應于UTC中的某個特定時間胎撇,返回對應于給定時區(qū)中同一時間的另一個時間戳介粘。
  8. pyspark.sql.functions.get_json_object(col, path)
    從基于指定的json路徑的json字符串中提取json對象,并返回提取的json對象的json字符串晚树。 如果輸入的json字符串無效姻采,它將返回null。
  9. pyspark.sql.functions.greatest(*cols)
    返回列名稱列表的最大值爵憎,跳過空值慨亲。 該功能至少需要2個參數(shù)。 如果所有參數(shù)都為空宝鼓,它將返回null刑棵。
  10. pyspark.sql.functions.grouping(col)
    聚合函數(shù):指示GROUP BY列表中的指定列是否被聚合,在結(jié)果集中返回1表示聚合或0表示未聚合愚铡。
  11. pyspark.sql.functions.grouping_id(*cols)
    聚合函數(shù):返回分組的級別蛉签,等于(grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
  12. pyspark.sql.functions.hash(*cols)
    計算給定列的哈希碼,并將結(jié)果作為int列返回茂附。
>>> spark.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect()
[Row(hash=-757602832)]
  1. pyspark.sql.functions.hex(col)
    計算給定列的十六進制值正蛙,可能是pyspark.sql.types.StringType督弓,pyspark.sql.types.BinaryType营曼,pyspark.sql.types.IntegerType或pyspark.sql.types.LongType。
>>> spark.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect()
[Row(hex(a)=u'414243', hex(b)=u'3')]
  1. pyspark.sql.functions.hour(col)
    將給定日期的小時數(shù)提取為整數(shù)愚隧。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(hour('a').alias('hour')).collect()
[Row(hour=13)]
  1. pyspark.sql.functions.hypot(col1, col2)
    計算sqrt(a ^ 2 + b ^ 2)蒂阱,無中間上溢或下溢。
  2. pyspark.sql.functions.initcap(col)
    在句子中將每個單詞的第一個字母翻譯成大寫狂塘。
>>> spark.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v=u'Ab Cd')]
  1. pyspark.sql.functions.input_file_name()
    為當前Spark任務的文件名創(chuàng)建一個字符串列录煤。
  2. pyspark.sql.functions.instr(str, substr)
    找到給定字符串中第一次出現(xiàn)substr列的位置。 如果其中任一參數(shù)為null荞胡,則返回null妈踊。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(instr(df.s, 'b').alias('s')).collect()
[Row(s=2)]
  1. pyspark.sql.functions.isnan(col)
    如果列是NaN,則返回true的表達式泪漂。
  2. pyspark.sql.functions.isnull(col)
    如果列為空廊营,則返回true的表達式。
  3. pyspark.sql.functions.json_tuple(col, *fields)
    根據(jù)給定的字段名稱為json列創(chuàng)建一個新行萝勤。
    Parameters:
  • col - json格式的字符串列
  • fields - 要提取的字段列表
  1. pyspark.sql.functions.kurtosis(col)
    聚合函數(shù):返回組中值的峰度露筒。
  2. pyspark.sql.functions.lag(col, count=1, default=None)
    窗口函數(shù):返回當前行之前的偏移行值;如果當前行之前的行數(shù)小于偏移量,則返回defaultValue敌卓。例如慎式,若偏移量為1,將返回窗口分區(qū)中任何給定點的前一行。
    這相當于SQL中的LAG函數(shù)瘪吏。
    Parameters:
  • col - 列名或表達式的名稱
  • count - 要擴展的行數(shù)
  • default - 默認值
  1. pyspark.sql.functions.last(col, ignorenulls=False)
    聚合函數(shù):返回組中的最后一個值癣防。
    該函數(shù)默認返回它看到的最后一個值。 當ignoreNulls設置為true時肪虎,它將返回它看到的最后一個非null值劣砍。 如果所有值都為空,則返回null扇救。
  2. pyspark.sql.functions.last_day(date)
    返回給定日期所屬月份的最后一天刑枝。
>>> df = spark.createDataFrame([('1997-02-10',)], ['d'])
>>> df.select(last_day(df.d).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
  1. pyspark.sql.functions.lead(col, count=1, default=None)
    Window函數(shù):返回當前行之后的偏移行值;如果當前行之后的行數(shù)小于偏移行,則返回defaultValue迅腔。 例如装畅,偏移量為1,將返回窗口分區(qū)中任意給定點的下一行沧烈。
    這相當于SQL中的LEAD函數(shù)掠兄。
  2. pyspark.sql.functions.least(*cols)
    返回多列中的最小值,跳過空值锌雀。 該功能至少需要2個參數(shù)蚂夕,及最少需要兩個列名。 如果所有參數(shù)都為空腋逆,它將返回null婿牍。
>>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
[Row(least=1)]
  1. pyspark.sql.functions.length(col)
    計算字符串或二進制表達式的長度。
>>> spark.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect()
[Row(length=3)]
  1. pyspark.sql.functions.levenshtein(left, right)
    計算兩個給定字符串的Levenshtein距離惩歉。
    Levenshtein距離(編輯距離)等脂,是指兩個字串之間,由一個轉(zhuǎn)成另一個所需的最少編輯操作次數(shù)撑蚌。具體可自行百度上遥。
>>> df0 = spark.createDataFrame([('kitten', 'sitting',)], ['l', 'r'])
>>> df0.select(levenshtein('l', 'r').alias('d')).collect()
[Row(d=3)]
  1. pyspark.sql.functions.lit(col)
    創(chuàng)建一個字面值的列。
  2. pyspark.sql.functions.locate(substr, str, pos=1)
    在str字符串列中找到在pos位置后面第一個出現(xiàn)substr的位置争涌。
  • Note: 該位置不是從零開始的粉楚,而是從1開始的。 如果在str中找不到substr亮垫,則返回0模软。
    Parameters:
  • substr - 要查找的字符串
  • str - pyspark.sql.types.StringType的列
  • pos - 起始位置
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]
  1. pyspark.sql.functions.log(arg1, arg2=None)
    返回第二個參數(shù)的基于第一個參數(shù)的對數(shù)。
    如果只有一個參數(shù)包警,那么這個參數(shù)就是自然對數(shù)撵摆。
>>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
>>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
  1. pyspark.sql.functions.log10(col)
    計算給定一個數(shù)以10為底的對數(shù)。
  2. pyspark.sql.functions.log1p(col)
    Computes the natural logarithm of the given value plus one.
  3. pyspark.sql.functions.log2(col)
    返回參數(shù)的基數(shù)為2的對數(shù)害晦。
>>> spark.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
[Row(log2=2.0)]
  1. pyspark.sql.functions.lower(col)
    將字符串列轉(zhuǎn)換為小寫特铝。
  2. pyspark.sql.functions.lpad(col, len, pad)
    左填充到指定長度暑中。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(lpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'##abcd')]
  1. pyspark.sql.functions.ltrim(col)
    去掉字符串左邊的空格。
  2. pyspark.sql.functions.max(col)
    聚合函數(shù):返回組中表達式的最大值鲫剿。
  3. pyspark.sql.functions.md5(col)
    計算某給定值的MD5值鳄逾,將該值作為32個字符的十六進制字符串返回。
>>> spark.createDataFrame([('ABC',)], ['a']).select(md5('a').alias('hash')).collect()
[Row(hash=u'902fbdd2b1df0c4f70b4a5d23525e932')]
  1. pyspark.sql.functions.mean(col)
    聚合函數(shù):返回組中所有值的平均值灵莲。
  2. pyspark.sql.functions.min(col)
    聚合函數(shù):返回組中表達式的最小值雕凹。
  3. pyspark.sql.functions.minute(col)
    提取給定日期的分鐘數(shù)為整數(shù)。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(minute('a').alias('minute')).collect()
[Row(minute=8)]
  1. pyspark.sql.functions.monotonically_increasing_id()
    生成單調(diào)遞增的64位整數(shù)的列政冻。
    生成的ID保證是單調(diào)遞增和唯一的枚抵,但不是連續(xù)的。 當前的實現(xiàn)將分區(qū)ID放在高31位明场,并將每個分區(qū)內(nèi)的記錄號放在低33位汽摹。 假設數(shù)據(jù)框的分區(qū)少于10億個,每個分區(qū)少于80億條記錄苦锨。
    作為一個例子逼泣,考慮一個帶有兩個分區(qū)的DataFrame,每個分區(qū)有三個記錄舟舒。 該表達式將返回以下ID:0,1,2,8589934592(1L << 33)拉庶,8589934593,8589934594秃励。
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
>>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
  1. pyspark.sql.functions.month(col)
    將給定日期的月份提取為整數(shù)氏仗。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(month('a').alias('month')).collect()
[Row(month=4)]
  1. pyspark.sql.functions.months_between(date1, date2)
    返回date1和date2之間的月數(shù)。
  2. pyspark.sql.functions.nanvl(col1, col2)
    如果col1不是NaN莺治,則返回col1;如果col1是NaN廓鞠,則返回col2帚稠。
    兩個輸入都應該是浮點列(DoubleType或FloatType)谣旁。
>>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
  1. pyspark.sql.functions.next_day(date, dayOfWeek)
    返回晚于日期列值的第一個日期。
    星期幾參數(shù)不區(qū)分大小寫滋早,并接受:“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”.
>>> df = spark.createDataFrame([('2015-07-27',)], ['d'])
>>> df.select(next_day(df.d, 'Sun').alias('date')).collect()
[Row(date=datetime.date(2015, 8, 2))]
  1. pyspark.sql.functions.ntile(n)
    窗口函數(shù):在有序的窗口分區(qū)中返回ntile組ID(從1到n)榄审。 例如,如果n是4杆麸,則第一季度行將得到值1搁进,第二季度將得到2,第三季度將得到3昔头,并且最后一個季度將得到4饼问。
    這相當于SQL中的NTILE函數(shù)。
    Parameters: n – an integer
  2. pyspark.sql.functions.percent_rank()
    窗口函數(shù):返回窗口分區(qū)內(nèi)的行的相對等級(即百分比)揭斧。
  3. pyspark.sql.functions.posexplode(col)
    為給定數(shù)組或映射中的每個元素返回一個新行莱革。
>>> from pyspark.sql import Row
>>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>>> eDF.select(posexplode(eDF.intlist)).collect()
[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
>>> eDF.select(posexplode(eDF.mapfield)).show()
+---+---+-----+
|pos|key|value|
+---+---+-----+
|  0|  a|    b|
+---+---+-----+
  1. pyspark.sql.functions.pow(col1, col2)
    返回col1的col2次方的值。
  2. pyspark.sql.functions.quarter(col)
    提取給定日期所屬的季度值。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(quarter('a').alias('quarter')).collect()
[Row(quarter=2)]
  1. pyspark.sql.functions.radians(col)
    將以度數(shù)度量的角度轉(zhuǎn)換為以弧度測量的近似等效角度盅视。
  2. pyspark.sql.functions.rand(seed=None)
    從U [0.0捐名,1.0]生成一個具有獨立且分布相同(i.i.d.)樣本的隨機列。
  3. pyspark.sql.functions.randn(seed=None)
    從標準正態(tài)分布生成具有獨立且分布相同(i.i.d.)樣本的列闹击。
  4. pyspark.sql.functions.rank()
    窗口函數(shù):返回窗口分區(qū)內(nèi)的行的等級镶蹋。
    rank和dense_rank之間的區(qū)別在于,當有tie時赏半,dense_rank在排序順序上沒有差距贺归。也就是說,如果你使用dense_rank排名比賽断箫,并且有三個人排在第二位牧氮,那么你會說這三個都排在第二位,下一個排在第三位瑰枫。排名會給我連續(xù)的數(shù)字踱葛,使排在第三位(關系之后)的人將登記為第五名。
    這相當于SQL中的RANK函數(shù)光坝。
  5. pyspark.sql.functions.regexp_extract(str, pattern, idx)
    從指定的字符串列中提取由Java正則表達式匹配的特定組尸诽。 如果正則表達式不匹配,或者指定的組不匹配盯另,則返回空字符串性含。
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)-(\d+)', 1).alias('d')).collect()
[Row(d=u'100')]
>>> df = spark.createDataFrame([('foo',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)', 1).alias('d')).collect()
[Row(d=u'')]
>>> df = spark.createDataFrame([('aaaac',)], ['str'])
>>> df.select(regexp_extract('str', '(a+)(b)?(c)', 2).alias('d')).collect()
[Row(d=u'')]
  1. pyspark.sql.functions.regexp_replace(str, pattern, replacement)
    將與regexp匹配的指定字符串值的所有子字符串替換為rep。
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_replace('str', '(\d+)', '+').alias('d')).collect()
[Row(d=u'+-+')]
  1. pyspark.sql.functions.repeat(col, n)
    重復一個字符串列n次鸳惯,并將其作為新的字符串列返回商蕴。
>>> df = spark.createDataFrame([('ab',)], ['s',])
>>> df.select(repeat(df.s, 3).alias('s')).collect()
[Row(s=u'ababab')]
  1. pyspark.sql.functions.reverse(col)
    反轉(zhuǎn)字符串列并將其作為新的字符串列返回。
  2. pyspark.sql.functions.rint(col)
    返回值最接近參數(shù)的double值芝发,等于一個數(shù)學整數(shù)绪商。
  3. pyspark.sql.functions.round(col, scale=0)
    如果scale> = 0,則使用HALF_UP舍入模式對給定值進行四舍五入以縮放小數(shù)點;如果scale <0辅鲸,則將其舍入到整數(shù)部分格郁。
>>> spark.createDataFrame([(2.5,)], ['a']).select(round('a', 0).alias('r')).collect()
[Row(r=3.0)]
  1. pyspark.sql.functions.row_number()
    窗口函數(shù):返回窗口分區(qū)內(nèi)從1開始的連續(xù)編號。
  2. pyspark.sql.functions.rpad(col, len, pad)
    右填充到指定長度独悴。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(rpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'abcd##')]
  1. pyspark.sql.functions.rtrim(col)
    去除字符串右邊的空格例书。
  2. pyspark.sql.functions.second(col)
    提取給定日期的秒數(shù)為整數(shù)。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(second('a').alias('second')).collect()
[Row(second=15)]
  1. pyspark.sql.functions.sha1(col)
    返回SHA-1的十六進制字符串結(jié)果刻炒。
>>> spark.createDataFrame([('ABC',)], ['a']).select(sha1('a').alias('hash')).collect()
[Row(hash=u'3c01bdbb26f358bab27f267924aa2c9a03fcfdb8')]
  1. pyspark.sql.functions.sha2(col, numBits)
    返回SHA-2系列散列函數(shù)(SHA-224决采,SHA-256,SHA-384和SHA-512)的十六進制字符串結(jié)果坟奥。 numBits表示結(jié)果的所需位長度树瞭,其值必須為224,256,384,512或0(相當于256)暂幼。
>>> digests = df.select(sha2(df.name, 256).alias('s')).collect()
>>> digests[0]
Row(s=u'3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043')
>>> digests[1]
Row(s=u'cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961')
  1. pyspark.sql.functions.shiftLeft(col, numBits)
    將給定值col左移numBits位。
>>> spark.createDataFrame([(21,)], ['a']).select(shiftLeft('a', 1).alias('r')).collect()
[Row(r=42)]
  1. pyspark.sql.functions.shiftRight(col, numBits)
    將給定值col右移numBits位(Signed)移迫。
  2. pyspark.sql.functions.shiftRightUnsigned(col, numBits)
    將給定值col右移numBits位(Unsigned)旺嬉。
>>> df = spark.createDataFrame([(-42,)], ['a'])
>>> df.select(shiftRightUnsigned('a', 1).alias('r')).collect()
[Row(r=9223372036854775787)]
  1. pyspark.sql.functions.signum(col)
    計算給定值的正負號。
  2. pyspark.sql.functions.sin(col)
    計算給定值的正弦值厨埋。
  3. pyspark.sql.functions.sinh(col)
    計算給定值的雙曲正弦值邪媳。
  4. pyspark.sql.functions.size(col)
    集合函數(shù):返回存儲在列中的數(shù)組或映射的長度。
>>> df = spark.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
>>> df.select(size(df.data)).collect()
[Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]
  1. pyspark.sql.functions.skewness(col)
    聚合函數(shù):返回組中值的偏度荡陷。
  2. pyspark.sql.functions.sort_array(col, asc=True)
    Collection函數(shù):對輸入數(shù)組進行升序或降序排序雨效。
>>> df = spark.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
>>> df.select(sort_array(df.data).alias('r')).collect()
[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
>>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
  1. pyspark.sql.functions.soundex(col)
    返回字符串的SoundEx編碼。
>>> df = spark.createDataFrame([("Peters",),("Uhrbach",)], ['name'])
>>> df.select(soundex(df.name).alias("soundex")).collect()
[Row(soundex=u'P362'), Row(soundex=u'U612')]
  1. pyspark.sql.functions.spark_partition_id()
    列所在的分區(qū)id
  • Note: 這是不確定的废赞,因為它依賴于數(shù)據(jù)分區(qū)和任務調(diào)度徽龟。
>>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
  1. pyspark.sql.functions.split(str, pattern)
    切分字符串。
>>> df = spark.createDataFrame([('ab12cd',)], ['s',])
>>> df.select(split(df.s, '[0-9]+').alias('s')).collect()
[Row(s=[u'ab', u'cd'])]
  1. pyspark.sql.functions.sqrt(col)
    計算指定浮點值的平方根唉地。
  2. pyspark.sql.functions.stddev(col)
    聚合函數(shù):返回組中表達式的無偏樣本標準差据悔。
  3. pyspark.sql.functions.stddev_pop(col)
    聚合函數(shù):返回一個組中表達式的總體標準差。
  4. pyspark.sql.functions.stddev_samp(col)
    聚合函數(shù):返回組中表達式的無偏樣本標準差耘沼。
  5. pyspark.sql.functions.struct(*cols)
    創(chuàng)建一個新的結(jié)構列极颓。
>>> df.select(struct('age', 'name').alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
>>> df.select(struct([df.age, df.name]).alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
  1. pyspark.sql.functions.substring(str, pos, len)
    返回在str中從pos位置開始的長度為len值的substring。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s=u'ab')]
  1. pyspark.sql.functions.substring_index(str, delim, count)
    在計數(shù)定界符delimiter之前群嗤,返回字符串str的子串菠隆。 如果count是正數(shù),則返回最后一個分隔符左邊的數(shù)字(從左數(shù)起)狂秘。 如果計數(shù)為負數(shù)骇径,則返回最后一個分隔符右邊的數(shù)字(從右數(shù)起)。 substring_index搜索delim時執(zhí)行區(qū)分大小寫的匹配者春。
>>> df = spark.createDataFrame([('a.b.c.d',)], ['s'])
>>> df.select(substring_index(df.s, '.', 2).alias('s')).collect()
[Row(s=u'a.b')]
>>> df.select(substring_index(df.s, '.', -3).alias('s')).collect()
[Row(s=u'b.c.d')]
  1. pyspark.sql.functions.sum(col)
    聚合函數(shù):返回表達式中所有值的總和破衔。
  2. pyspark.sql.functions.sumDistinct(col)
    聚合函數(shù):返回表達式中不同值的總和。
  3. pyspark.sql.functions.tan(col)
    計算給定值的正切值碧查。
  4. pyspark.sql.functions.tanh(col)
    計算給定值的雙曲正切运敢。
  5. pyspark.sql.functions.to_date(col, format=None)
    使用可選的指定格式將pyspark.sql.types.StringType或pyspark.sql.types.TimestampType的列轉(zhuǎn)換為pyspark.sql.types.DateType校仑。默認格式是'yyyy-MM-dd'忠售。 根據(jù)SimpleDateFormats指定格式。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
  1. pyspark.sql.functions.to_json(col, options={})
    將包含[[StructType]]的[[StructType]]或[[ArrayType]]的列轉(zhuǎn)換為JSON字符串迄沫。 在不支持的類型的情況下會引發(fā)異常稻扬。
    Parameters:
  • col - 包含結(jié)構體或結(jié)構體數(shù)組的列的名稱
  • options - 控制轉(zhuǎn)換的選項。 接受與json數(shù)據(jù)源相同的選項
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"age":2,"name":"Alice"}')]
>>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
  1. pyspark.sql.functions.to_timestamp(col, format=None)
    使用可選的指定格式將pyspark.sql.types.StringType或pyspark.sql.types.TimestampType的列轉(zhuǎn)換為pyspark.sql.types.DateType羊瘩。 默認格式是'yyyy-MM-dd HH:mm:ss'泰佳。 根據(jù)SimpleDateFormats指定格式盼砍。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_timestamp(df.t).alias('dt')).collect()
[Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
[Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
  1. pyspark.sql.functions.to_utc_timestamp(timestamp, tz)
    給定一個時間戳,它對應于給定時區(qū)中的特定時間逝她,返回對應于UTC中同一時間的另一個時間戳浇坐。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_utc_timestamp(df.t, "PST").alias('t')).collect()
[Row(t=datetime.datetime(1997, 2, 28, 18, 30))]
  1. pyspark.sql.functions.translate(srcCol, matching, replace)
    一個函數(shù)通過匹配中的一個字符來轉(zhuǎn)換srcCol中的任何字符。 替換中的字符對應于匹配的字符黔宛。 當字符串中的任何字符與匹配中的字符匹配時近刘,翻譯將發(fā)生。
>>> spark.createDataFrame([('translate',)], ['a']).select(translate('a', "rnlt", "123").alias('r')).collect()
[Row(r=u'1a2s3ae')]
  1. pyspark.sql.functions.trim(col)
    去除字符串兩邊的空格臀晃。
  2. pyspark.sql.functions.trunc(date, format)
    返回截斷到格式指定單位的日期觉渴。
    Parameters: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
>>> df = spark.createDataFrame([('1997-02-28',)], ['d'])
>>> df.select(trunc(df.d, 'year').alias('year')).collect()
[Row(year=datetime.date(1997, 1, 1))]
>>> df.select(trunc(df.d, 'mon').alias('month')).collect()
[Row(month=datetime.date(1997, 2, 1))]
  1. pyspark.sql.functions.udf(f=None, returnType=StringType)
    創(chuàng)建一個表示用戶定義函數(shù)(UDF)的列表達式。
    *Note: 用戶定義的函數(shù)必須是確定性的徽惋。 由于優(yōu)化案淋,可能會消除重復的調(diào)用,甚至可能會調(diào)用該函數(shù)的次數(shù)超過查詢中的次數(shù)险绘。
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> :udf
... def to_upper(s):
...     if s is not None:
...         return s.upper()
...
>>> :udf(returnType=IntegerType())
... def add_one(x):
...     if x is not None:
...         return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+
  1. pyspark.sql.functions.unbase64(col)
    解碼BASE64編碼的字符串列并將其作為二進制列返回踢京。
  2. pyspark.sql.functions.unhex(col)
    十六進制的反轉(zhuǎn)。 將每對字符解釋為十六進制數(shù)字并轉(zhuǎn)換為數(shù)字的字節(jié)表示宦棺。
>>> spark.createDataFrame([('414243',)], ['a']).select(unhex('a')).collect()
[Row(unhex(a)=bytearray(b'ABC'))]
  1. pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')
    使用默認時區(qū)和默認語言環(huán)境漱挚,將具有給定模式的時間字符串(默認為'yyyy-MM-dd HH:mm:ss')轉(zhuǎn)換為Unix時間戳(以秒為單位),如果失敗則返回null渺氧。
    如果時間戳記為None旨涝,則返回當前時間戳。
  2. pyspark.sql.functions.upper(col)
    將字符串列轉(zhuǎn)換為大寫侣背。
  3. pyspark.sql.functions.var_pop(col)
    聚合函數(shù):返回組中值的總體方差白华。
  4. pyspark.sql.functions.var_samp(col)
    聚合函數(shù):返回組中值的無偏差。
  5. pyspark.sql.functions.variance(col)
    聚合函數(shù):返回組中值的總體方差贩耐。
  6. pyspark.sql.functions.weekofyear(col)
    返回指定時間是一年中的第幾周弧腥。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(weekofyear(df.a).alias('week')).collect()
[Row(week=15)]
  1. pyspark.sql.functions.when(condition, value)
    評估條件列表并返回多個可能的結(jié)果表達式之一。 如果不調(diào)用Column.otherwise()潮太,則不匹配條件返回None管搪。(條件判斷)
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]
>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]
  1. pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
>>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
>>> w.select(w.window.start.cast("string").alias("start"),
...          w.window.end.cast("string").alias("end"), "sum").collect()
[Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]
  1. pyspark.sql.functions.year(col)
    將給定日期的年份提取為整數(shù)。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(year('a').alias('year')).collect()
[Row(year=2015)]

pyspark.sql.streaming module

class pyspark.sql.streaming.StreamingQuery(jsq)

當新數(shù)據(jù)到達時铡买,在后臺執(zhí)行的查詢的句柄更鲁。 所有這些方法都是線程安全的。

  1. awaitTermination(timeout=None)
    通過query.stop()或異常來等待查詢的終止奇钞。如果由于異常而查詢終止澡为,則會拋出異常。 如果設置了超時景埃,則會在超時秒數(shù)內(nèi)返回查詢是否終止媒至。如果查詢已被終止顶别,則對該方法的所有后續(xù)調(diào)用將立即返回(如果查詢已由stop()終止),或者立即拋出異常(如果查詢已由異常終止)拒啰。如果由于一個異常使這個查詢已經(jīng)終止了驯绎,則會拋出StreamingQueryException。
  2. exception()
    Returns: StreamingQueryException(如果查詢由異常終止)或None谋旦。
  3. explain(extended=False)
    打犹跖瘛(邏輯和物理)計劃到控制臺進行調(diào)試。
    Parameters: 布爾值蛤织,默認為False赴叹。 如果為False,則僅打印物理計劃指蚜。
  4. id
    返回檢查點數(shù)據(jù)重新啟動時持續(xù)存在的此查詢的唯一標識乞巧。 也就是說,這個ID是在第一次啟動查詢時生成的摊鸡,每次從檢查點數(shù)據(jù)重新啟動時都會一樣绽媒。 Spark群集中只能有一個查詢具有相同的激活碼。 另請參閱runId免猾。
  5. isActive
    此流式查詢當前是否處于活動狀態(tài)是辕。
  6. lastProgress
    返回此流式查詢的最新StreamingQueryProgress更新;如果沒有進度更新,則返回None:return:一個映射猎提。
  7. name
    給此查詢起一個名字获三,如果未指定,則返回null锨苏。 這個名稱可以在org.apache.spark.sql.streaming.DataStreamWriter中指定為dataframe.writeStream.queryName(“query”).start()疙教。 該名稱(如果已設置)在所有活動查詢中必須唯一。
  8. processAllAvailable()
    阻塞直到源中的所有可用數(shù)據(jù)都被處理并提交到接收器伞租。 此方法用于測試贞谓。
  • Note: 在連續(xù)到達數(shù)據(jù)的情況下,這種方法可能永遠阻塞葵诈。 另外份帐,只有在調(diào)用之前辐啄,這個方法才被保證阻塞焕参,直到數(shù)據(jù)已經(jīng)同步地將數(shù)據(jù)附加到流源霞势。 (即getOffset必須立即反映添加)。
  1. recentProgress
    返回此查詢的最新[[StreamingQueryProgress]]更新數(shù)組徊都。 為每個流保留的進度更新數(shù)由Spark會話配置spark.sql.streaming.numRecentProgressUpdates進行配置沪斟。
  2. runId
    返回此查詢的唯一標識,該標識在重新啟動時不會保留暇矫。 也就是說主之,每個啟動(或從檢查點重新啟動)的查詢將具有不同的runId。
  3. status
    返回查詢的當前狀態(tài)李根。
  4. stop()
    停止此流式查詢槽奕。

class pyspark.sql.streaming.StreamingQueryManager(jsqm)

一個來管理所有的StreamingQuery StreamingQueries活動的類。

  1. active
    返回與此SQLContext關聯(lián)的活動查詢的列表房轿。
  2. awaitAnyTermination(timeout=None)
    等到相關SQLContext的任何查詢自上下文創(chuàng)建以來粤攒,或者自調(diào)用resetTerminated()以來已終止。 如果有任何查詢由于異常而終止囱持,那么異常將被拋出夯接。 如果設置了超時,則會在超時秒數(shù)內(nèi)返回查詢是否終止纷妆。
  3. get(id)
    返回來自此SQLContext的活動查詢盔几,或者如果具有此名稱的活動查詢不存在,則會拋出異常掩幢。
  4. resetTerminated()
    忘記過去已終止的查詢逊拍,以便awaitAnyTermination()可以再次用于等待新的終止。

class pyspark.sql.streaming.DataStreamReader(spark)

用于從外部存儲系統(tǒng)(例如文件系統(tǒng)际邻,鍵值存儲等)加載流式DataFrame的接口芯丧。 使用spark.readStream()來訪問它。

  1. csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
    加載CSV文件流并將結(jié)果作為DataFrame返回世曾。
  2. format(source)
    指定輸入數(shù)據(jù)源格式缨恒。
    Parameters: source - 字符串,數(shù)據(jù)源的名稱轮听,例如 'json'肿轨,'parquet'。
>>> s = spark.readStream.format("text")
  1. json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
    加載JSON文件流并將結(jié)果作為DataFrame返回蕊程。
  2. load(path=None, format=None, schema=None, **options)
    從數(shù)據(jù)源加載數(shù)據(jù)流并將其作為:class DataFrame 返回椒袍。
  3. option(key, value)
    為基礎數(shù)據(jù)源添加一個輸入選項。
  4. options(**options)
    為底層數(shù)據(jù)源添加多個輸入選項藻茂。
  5. parquet(path)
    加載Parquet文件流驹暑,將結(jié)果作為DataFrame返回。
  6. schema(schema)
    指定輸入模式辨赐。
    某些數(shù)據(jù)源(例如JSON)可以從數(shù)據(jù)自動推斷輸入模式优俘。 通過在這里指定模式,底層數(shù)據(jù)源可以跳過模式推斷步驟掀序,從而加速數(shù)據(jù)加載帆焕。
    Parameters: schema – 一個pyspark.sql.types.StructType 對象。
  7. text(path)
    加載一個文本文件流并返回一個DataFrame,其架構以一個名為“value”的字符串列開始叶雹,如果有的話财饥,后跟分區(qū)列。
    文本文件中的每一行都是生成的DataFrame中的新行折晦。

class pyspark.sql.streaming.DataStreamWriter(df)

用于將流式DataFrame寫入外部存儲系統(tǒng)(例如文件系統(tǒng)钥星,鍵值存儲等)的接口。 使用DataFrame.writeStream()來訪問這個满着。

  1. format(source)
    指定基礎輸出數(shù)據(jù)源谦炒。
  2. option(key, value)
    添加一個底層數(shù)據(jù)源的輸出選項。
  3. options(**options)
    為底層數(shù)據(jù)源添加多個輸出選項风喇。
  4. outputMode(outputMode)
    指定如何將DataFrame/Dataset流數(shù)據(jù)寫入流式接收器宁改。
    Options include:
  • append - 只有DataFrame / Dataset流數(shù)據(jù)中的新行才會寫入接收器
  • complete - DataFrame / Dataset流中的所有行將在每次更新時寫入接收器
  • update - 每當有更新時,只有在DataFrame / Dataset流數(shù)據(jù)中更新的行才會被寫入接收器魂莫。 如果查詢不包含聚合还蹲,它將相當于追加模式。
writer = sdf.writeStream.outputMode('append')
  1. partitionBy(*cols)
    按文件系統(tǒng)上的給定列對輸出進行分區(qū)豁鲤。
    如果指定秽誊,則輸出將在文件系統(tǒng)上進行布局,類似于Hive的分區(qū)方案琳骡。
  2. queryName(queryName)
    指定可以用start()啟動的StreamingQuery的名稱锅论。 該名稱在相關聯(lián)的SparkSession中的所有當前活動查詢中必須是唯一的。
  3. start(path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options)
    將DataFrame的內(nèi)容流式傳輸?shù)綌?shù)據(jù)源楣号。
    數(shù)據(jù)源由格式和一組選項指定最易。 如果未指定format,則將使用由spark.sql.sources.default配置的缺省數(shù)據(jù)源炫狱。
  4. trigger(*args, **kwargs)
    設置流查詢的觸發(fā)器藻懒。 如果沒有設置,它將盡可能快地運行查詢视译,這相當于將觸發(fā)器設置為processingTime ='0秒'嬉荆。
    Parameters: processingTime – a processing time interval as a string, e.g. ‘5 seconds’, ‘1 minute’.
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just once batch of data
>>> writer = sdf.writeStream.trigger(once=True)
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市酷含,隨后出現(xiàn)的幾起案子鄙早,更是在濱河造成了極大的恐慌,老刑警劉巖椅亚,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件限番,死亡現(xiàn)場離奇詭異,居然都是意外死亡呀舔,警方通過查閱死者的電腦和手機弥虐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人霜瘪,你說我怎么就攤上這事珠插。” “怎么了粥庄?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵丧失,是天一觀的道長豺妓。 經(jīng)常有香客問我惜互,道長,這世上最難降的妖魔是什么琳拭? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任训堆,我火速辦了婚禮,結(jié)果婚禮上白嘁,老公的妹妹穿的比我還像新娘坑鱼。我一直安慰自己,他們只是感情好絮缅,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布鲁沥。 她就那樣靜靜地躺著,像睡著了一般耕魄。 火紅的嫁衣襯著肌膚如雪画恰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天吸奴,我揣著相機與錄音允扇,去河邊找鬼。 笑死则奥,一個胖子當著我的面吹牛考润,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播读处,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼糊治,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了罚舱?” 一聲冷哼從身側(cè)響起井辜,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎馆匿,沒想到半個月后抑胎,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡渐北,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年阿逃,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡恃锉,死狀恐怖搀菩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情破托,我是刑警寧澤肪跋,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站土砂,受9級特大地震影響州既,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜萝映,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一吴叶、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧序臂,春花似錦蚌卤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至构订,卻和暖如春侮叮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背鲫咽。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工签赃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人分尸。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓锦聊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親箩绍。 傳聞我的和親對象是個殘疾皇子孔庭,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容