class pyspark.sql.DataFrame(jdf, sql_ctx)
分布式的收集數(shù)據(jù)分組到命名列中。
一個DataFrame相當(dāng)于在Spark SQL中一個相關(guān)的表,可在SQLContext使用各種方法創(chuàng)建,
2.1 agg(*exprs)
沒有組的情況下聚集整個DataFrame (df.groupBy.agg()的簡寫)蝇完。
>>>l=[('cassie',5),('beiwang',4),('xs',2)]
>>>df = sqlContext.createDataFrame(l,['name','age'])
>>>df.agg({"age": "max"}).collect()[Row(max(age)=5)]
>>>from pyspark.sql importfunctions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]
2.2?alias(alias)
In [57]: l = [('cassie',2), ('beiwang',3)]
In [58]: df = sqlContext.createDataFrame(l,['name', 'age'])
In [59]: from pyspark.sql.functions import *
In [60]: df1 = df.alias('df1')
In [61]: df2 = df.alias('df2')
In [62]: join_df = df1.join(df2, col("df1.name")==col("df2.name"), 'inner')
In [63]: join_df.select("df1.name")
Out[63]: DataFrame[name: string]
In [64]: join_df.select(col("df1.name"))
Out[64]: DataFrame[name: string]
In [65]: join_df.select(col("df1.name")).collect()
Out[65]: [Row(name=u'beiwang'), Row(name=u'cassie')]
In [66]: join_df.select("df1.name").collect()
Out[66]: [Row(name=u'beiwang'), Row(name=u'cassie')]
2.3 cache()
用默認(rèn)的存儲級別緩存數(shù)據(jù)(MEMORY_ONLY_SER).
2.4 coalesce(numPartitions)
返回一個有確切的分區(qū)數(shù)的分區(qū)的新的DataFrame坠宴。
與在一個RDD上定義的合并類似, 這個操作產(chǎn)生一個窄依賴摧阅。 如果從1000個分區(qū)到100個分區(qū),不會有shuffle過程, 而是每100個新分區(qū)會需要當(dāng)前分區(qū)的10個侧甫。
2.5 collect()
返回所有的記錄數(shù)為行的列表。
>>> df.collect()
[Row(name=u'cassie', age=2), Row(name=u'beiwang', age=3)]
2.6 columns
返回所有列名的列表康铭。
>>> df.columns
['age','name']
2.7 corr(col1, col2, method=None)
計算一個DataFrame相關(guān)的兩列為double值惯退。通常只支持皮爾森相關(guān)系數(shù)。DataFrame.corr()和DataFrameStatFunctions.corr()類似从藤。
參數(shù):● ?col1?– 第一列的名稱
? ??● ?col2?– 第二列的名稱
? ? ● ?method?– 相關(guān)方法.當(dāng)前只支持皮爾森相關(guān)系數(shù)
df.stat.corr('age','hobby')
2.8?count()
返回DataFrame的行數(shù)催跪。
>>> df.count()2
2.9 cov(col1, col2)
計算由列名指定列的樣本協(xié)方差為double值。DataFrame.cov()和DataFrameStatFunctions.cov()類似夷野。
參數(shù):● ?col1?– 第一列的名稱
● ?col2?– 第二列的名稱
df.stat.cov('hobby','age')
2.10 crosstab(col1, col2)
計算給定列的分組頻數(shù)表,也稱為相關(guān)表懊蒸。每一列的去重值的個數(shù)應(yīng)該小于1e4.最多返回1e6個非零對.每一行的第一列會是col1的去重值,列名稱是col2的去重值悯搔。第一列的名稱是$col1_$col2. 沒有出現(xiàn)的配對將以零作為計數(shù)骑丸。DataFrame.crosstab() and DataFrameStatFunctions.crosstab()類似。
參數(shù):● ?col1?– 第一列的名稱. 去重項作為每行的第一項鳖孤。
● ?col2?– 第二列的名稱. 去重項作為DataFrame的列名稱者娱。
df.stat.crosstab("hobby", "age").show()
+---------+---+---+
|hobby_age|? 2|? 3|
+---------+---+---+
|? ? ? ? 5|? 0|? 1|
|? ? ? 10|? 1|? 0|
+---------+---+---+
2.11 cube(*cols)
創(chuàng)建使用指定列的當(dāng)前DataFrame的多維立方體抡笼,這樣可以聚合這些數(shù)據(jù)苏揣。
?df.cube('hobby', df.age).count().show()
+-----+----+-----+
|hobby| age|count|
+-----+----+-----+
|? ? 5|null|? ? 1|
| null|null|? ? 2|
|? 10|null|? ? 1|
|? 10|? 2|? ? 1|
| null|? 2|? ? 1|
|? ? 5|? 3|? ? 1|
| null|? 3|? ? 1|
+-----+----+-----+
2.12 describe(*cols)
計算數(shù)值列的統(tǒng)計信息。
包括計數(shù)推姻,平均平匈,標(biāo)準(zhǔn)差,最小和最大藏古。如果沒有指定任何列增炭,這個函數(shù)計算統(tǒng)計所有數(shù)值列。
2.13 distinct()
返回行去重的新的DataFrame拧晕。
df.distinct().count()
2.14 drop(col)
返回刪除指定列的新的DataFrame
df.drop('hobby').collect()
2.15 dropDuplicates(subset=None)
返回去掉重復(fù)行的一個新的DataFrame,通常只考慮某幾列隙姿。
drop_duplicates()和dropDuplicates()類似。
df.dropDuplicates().show()
>>>df.dropDuplicates(['name','height']).show()
2.16 drop_duplicates(subset=None)
與以上相同厂捞。
2.17 dropna(how='any', thresh=None, subset=None)
返回一個刪除null值行的新的DataFrame输玷。dropna()和dataframenafunctions.drop()類似。
參數(shù):● ?how?– 'any'或者'all'靡馁。如果'any',刪除包含任何空值的行欲鹏。如果'all',刪除所有值為null的行。
●?thresh?– int,默認(rèn)為None,如果指定這個值臭墨,刪除小于閾值的非空值的行赔嚎。這個會重寫'how'參數(shù)。
● ?subset?– 選擇的列名稱列表。
df.na.drop().show()
dfnew.na.drop(how='all',thresh=2).show()
2.18 dtypes
返回所有列名及類型的列表尤误。
>>> df.dtypes
[('age','int'), ('name','string')]
2.19 explain(extended=False)
將(邏輯和物理)計劃打印到控制臺以進行調(diào)試侠畔。
參數(shù):● ?extended?– boolean類型,默認(rèn)為False袄膏。如果為False,只打印物理計劃践图。
df.explain(True)
2.20 fillna(value, subset=None)
替換空值,和na.fill()類似,DataFrame.fillna()和dataframenafunctions.fill()類似沉馆。
參數(shù):● ?value?-?要代替空值的值有int,long,float,string或dict.如果值是字典,subset參數(shù)將被忽略码党。值必須是要替換的列的映射,替換值必須是int,long,float或者string.
● ?subset?- 要替換的列名列表。在subset指定的列,沒有對應(yīng)數(shù)據(jù)類型的會被忽略斥黑。例如,如果值是字符串,subset包含一個非字符串的列,這個非字符串的值會被忽略揖盘。
dfnew.na.fill(50).show()
dfnew.na.fill({'age': 50, 'name': 'unknown'}).show()
2.21 filter(condition)
用給定的條件過濾行。
where()和filter()類似锌奴。
參數(shù):● ?條件?-?一個列的bool類型或字符串的SQL表達式兽狭。
df.where(df.age == 2).collect()
df.filter(df.age == 2).collect()
2.22 first()
返回第一行。
>>> df.first()
Row(age=2, name=u'Alice')
2.23 flatMap(f)
返回在每行應(yīng)用F函數(shù)后的新的RDD,然后將結(jié)果壓扁鹿蜀。
是df.rdd.flatMap()的簡寫箕慧。
>>>df.rdd.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
2.24 foreach(f)
應(yīng)用f函數(shù)到DataFrame的所有行。
是df.rdd.foreach()的簡寫茴恰。
def f(person):
?????print(person.name)
>>> df.foreach(f)
Alice
2.25 foreachPartition(f)
應(yīng)用f函數(shù)到DataFrame的每一個分區(qū)颠焦。
是 df.rdd.foreachPartition()的縮寫。
>>>def f(people):
...? ? forpersonin people:
...? ? ? ? print(person.name)>>> df.foreachPartition(f)
2.26??freqItems(cols, support=None)
參數(shù):● ?cols?– 要計算重復(fù)項的列名往枣,為字符串類型的列表或者元祖伐庭。
● ?support?– 要計算頻率項的頻率值。默認(rèn)是1%分冈。參數(shù)必須大于1e-4.
df.stat.freqItems(['name']).collect()
2.27 groupBy(*cols)
使用指定的列分組DataFrame,這樣可以聚合計算圾另。可以從GroupedData查看所有可用的聚合方法雕沉。
groupby()和groupBy()類似集乔。
參數(shù):● ?cols?– 分組依據(jù)的列。每一項應(yīng)該是一個字符串的列名或者列的表達式坡椒。
df.groupBy(['name', df.age]).count().collect()
2.28 groupby(*cols)
和以上一致
2.29 head(n=None)
返回前n行
參數(shù):● ?n?– int類型扰路,默認(rèn)為1,要返回的行數(shù)肠牲。
返回值: 如果n大于1,返回行列表幼衰,如果n為1,返回單獨的一行缀雳。
2.30 insertInto(tableName, overwrite=False)
插入DataFrame內(nèi)容到指定表渡嚣。
注:在1.4中已過時,使用DataFrameWriter.insertInto()代替。
2.31 intersect(other)
返回新的DataFrame,包含僅同時在當(dāng)前框和另一個框的行。
相當(dāng)于SQL中的交集识椰。
df.intersect(df8)
如果collect()和take()方法可以運行在本地(不需要Spark executors)那么返回True
2.32 join(other, on=None, how=None)
使用給定的關(guān)聯(lián)表達式绝葡,關(guān)聯(lián)另一個DataFrame。
以下執(zhí)行df1和df2之間完整的外連接腹鹉。
參數(shù):●?other?– 連接的右側(cè)
●?on?– 一個連接的列名稱字符串, 列名稱列表,一個連接表達式(列)或者列的列表藏畅。如果on參數(shù)是一個字符串或者字符串列表,表示連接列的名稱,這些名稱必須同時存在join的兩個表中, 這樣執(zhí)行的是一個等價連接。
●?how?– 字符串,默認(rèn)'inner'功咒。inner,outer,left_outer,right_outer,leftsemi之一愉阎。
df.join(df8,on='name',how='inner').show()
2.33 limit(num)
將結(jié)果計數(shù)限制為指定的數(shù)字。
df.limit(1).collect()
2.34 map(f)
通過每行應(yīng)用f函數(shù)返回新的RDD力奋。
是 df.rdd.map()的縮寫榜旦。
>>>df.rdd.map(lambda p: p.name).collect()
2.35 mapPartitions(f, preservesPartitioning=False)
通過每個分區(qū)應(yīng)用f函數(shù)返回新的RDD
是df.rdd.mapPartitions()的縮寫。
>>>rdd = sc.parallelize([1, 2, 3, 4], 4)
>>>def f(iterator):yield 1
>>> rdd.mapPartitions(f).sum() 4
2.36 na
返回DataFrameNaFunctions用于處理缺失值
df.na.drop(how='all').show()
2.37 orderBy(*cols, **kwargs)
返回按照指定列排序的新的DataFrame景殷。
參數(shù):●?cols?– 用來排序的列或列名稱的列表溅呢。
●?ascending?– 布爾值或布爾值列表(默認(rèn) True). 升序排序與降序排序。指定多個排序順序的列表猿挚。如果指定列表, 列表的長度必須等于列的長度咐旧。
df.orderBy('age',ascending=False).show()
df.orderBy(['age','height'],ascending=[1,0]).show()
2.38 persist(storageLevel=StorageLevel(False, True, False, False, 1))
設(shè)置存儲級別以在第一次操作運行完成后保存其值。這只能用來分配新的存儲級別绩蜻,如果RDD沒有設(shè)置存儲級別的話铣墨。如果沒有指定存儲級別,默認(rèn)為(memory_only_ser)辜羊。
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
print StorageLevel.MEMORY_ONLY
2.39 printSchema()
打印schema以樹的格式
df.printSchema()
splits = rdd.randomSplit([0.7, 0.3],42)
?splits[0].collect(): [0, 1, 2, 3, 4, 5, 7, 9, 10, 12, 13, 14, 16, 18, 19]
splits[1].collect(): [6, 8, 11, 15, 17]
2.40 rdd:返回內(nèi)容為行的RDD踏兜。
2.41 registerAsTable(name): 在1.4中已過時,使用registerTempTable()代替词顾。
2.42 registerTempTable(name)
使用給定的名字注冊該RDD為臨時表
這個臨時表的有效期與用來創(chuàng)建這個DataFrame的SQLContext相關(guān)
df.registerTempTable("people")
?df2 = sqlContext.sql("select * from people")
?df2.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|? 5|? ? 80|Alice|
|? 5|? ? 80|Alice|
| 10|? ? 80|Alice|
+---+------+-----+
2.43 repartition(numPartitions, *cols)
按照給定的分區(qū)表達式分區(qū),返回新的DataFrame八秃。產(chǎn)生的DataFrame是哈希分區(qū)。
numPartitions參數(shù)可以是一個整數(shù)來指定分區(qū)數(shù)肉盹,或者是一個列昔驱。如果是一個列,這個列會作為第一個分區(qū)列。如果沒有指定,將使用默認(rèn)的分區(qū)數(shù)上忍。
1.6版本修改: 添加可選參數(shù)可以指定分區(qū)列骤肛。如果分區(qū)列指定的話,numPartitions也是可選的。
2.44 replace(to_replace, value, subset=None)
返回用另外一個值替換了一個值的新的DataFrame窍蓝。DataFrame.replace() 和 DataFrameNaFunctions.replace() 類似腋颠。
參數(shù):●?to_replace?– 整形,長整形,浮點型,字符串,或者列表。要替換的值吓笙。如果值是字典,那么值會被忽略淑玫,to_replace必須是一個從列名(字符串)到要替換的值的映射。要替換的值必須是一個整形,長整形,浮點型,或者字符串。
●?value?– 整形,長整形,浮點型,字符串或者列表絮蒿。要替換為的值尊搬。要替換為的值必須是一個整形,長整形,浮點型,或者字符串。如果值是列表或者元組,值應(yīng)該和to_replace有相同的長度土涝。
● subset?– 要考慮替換的列名的可選列表佛寿。在subset指定的列如果沒有匹配的數(shù)據(jù)類型那么將被忽略。例如,如果值是字符串,并且subset參數(shù)包含一個非字符串的列,那么非字符串的列被忽略但壮。
l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
df4 = sqlContext.createDataFrame(l4,['name','age','height'])
?df4.replace(10, 20).show() #把 10替換20 加不加na都可以
df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()+----+----+------+
2.45 rollup(*cols)
使用指定的列為當(dāng)前的DataFrame創(chuàng)建一個多維匯總, 這樣可以聚合這些數(shù)據(jù)冀泻。
In [240]: l=[('Alice',2,80),('Bob',5,None)]
In [241]: df = sqlContext.createDataFrame(l,['name','age','height'])
In [242]: df.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|? 2|? ? 80|
|? Bob|? 5|? null|
+-----+---+------+
In [243]:? df.rollup('name', df.age).count().show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|? ? 2|
|? Bob|? 5|? ? 1|
|Alice|? 2|? ? 1|
|? Bob|null|? ? 1|
|Alice|null|? ? 1|
+-----+----+-----+
2.46 sample(withReplacement, fraction, seed=None)
sample(是否放回, fraction, seed)
withReplacement:true抽取放回,false抽取不放回蜡饵。
fraction:1)false抽取不放回的情況下腔长,抽取的概率(0-1)。0-全不抽1-全抽2)true抽取放回的情況下验残,抽取的次數(shù)捞附。seed:隨機數(shù)種子。
返回DataFrame的子集采樣您没。
>>>df.sample(False, 0.5, 42).count()
2.47 sampleBy(col, fractions, seed=None)
根據(jù)每個層次上給出的分?jǐn)?shù)鸟召,返回沒有替換的分層樣本。
返回沒有替換的分層抽樣 基于每層給定的一小部分 在給定的每層的片段
參數(shù):●?col?– 定義層的列
●?fractions?– 每層的抽樣數(shù)氨鹏。如果沒有指定層, 將其數(shù)目視為0.
●?seed?– 隨機數(shù)
返回值: 返回代表分層樣本的新的DataFrame
sampled = dataset.sampleBy("key", fractions={0: 1, 1: 0}, seed=0) 表示key取的比例
2.48 save(path=None, source=None, mode='error', **options)
保存DataFrame的數(shù)據(jù)到數(shù)據(jù)源欧募。
注:在1.4中已過時,使用DataFrameWriter.save()代替。
2.49 saveAsParquetFile(path)
保存內(nèi)容為一個Parquet文件,代表這個schema仆抵。
注:在1.4中已過時,使用DataFrameWriter.parquet() 代替跟继。
2.50 saveAsTable(tableName, source=None, mode='error', **options)
將此DataFrame的內(nèi)容作為表保存到數(shù)據(jù)源。
注:在1.4中已過時,使用DataFrameWriter.saveAsTable() 代替镣丑。
2.51? schema
返回DataFrame的schema為types.StructType舔糖。
>>>l=[('Alice',2),('Bob',5)]>>>df = sqlContext.createDataFrame(l,['name','age'])>>> df.schema
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
2.52? select(*cols)
提供一組表達式并返回一個新的DataFrame。
參數(shù):●cols?– 列名(字符串)或表達式(列)列表莺匠。 如果其中一列的名稱為“*”金吗,那么該列將被擴展為包括當(dāng)前DataFrame中的所有列。
In [275]: df.select('name','age').show()
+-----+---+
| name|age|
+-----+---+
|Alice|? 2|
|? Bob|? 5|
+-----+---+
2.53 show(n=20, truncate=True)?
將前n行打印到控制臺趣竣。
參數(shù):●n?– 要顯示的行數(shù)摇庙。
●?truncate?– 是否截斷長字符串并對齊單元格。
2.54 sort(*cols, **kwargs)
返回按指定列排序的新DataFrame遥缕。
參數(shù):●cols?– 要排序的列或列名稱列表卫袒。
●?ascending?– 布爾值或布爾值列表(默認(rèn)為True)。 排序升序降序单匣。 指定多個排序順序的列表夕凝。 如果指定了列表烤蜕,列表的長度必須等于列的長度。
df.sort(df.age.desc()).collect()
2.55 sortWithinPartitions(*cols, **kwargs)
返回一個新的DataFrame迹冤,每個分區(qū)按照指定的列排序讽营。
參數(shù):●cols?– 要排序的列或列名稱列表渤刃。
●ascending?– 布爾值或布爾值列表(默認(rèn)為True)偷线。 排序升序降序骨坑。 指定多個排序順序的列表秩铆。 如果指定了列表忿等,列表的長度必須等于列的長度旭咽。
df.sortWithinPartitions("age", ascending=False).show()
2.56 stat?
返回統(tǒng)計功能的DataFrameStatFunctions聊训。
2.57 subtract(other)
返回一個新的DataFrame倔喂,這個DataFrame中包含的行不在另一個DataFrame中礁竞。
這相當(dāng)于SQL中的EXCEPT糖荒。
In [278]: df.subtract(df4).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|? 2|? ? 80|
+-----+---+------+
2.58 take(num)
返回前num行的行列表
>>>df.take(2)
2.59 toDF(*cols)
返回一個新類:具有新的指定列名稱的DataFrame。
參數(shù):●?cols?– 新列名列表(字符串)模捂。
In [280]: df.toDF('a','b','c').show()
+-----+---+----+
|? ? a|? b|? c|
+-----+---+----+
|Alice|? 2|? 80|
|? Bob|? 5|null|
+-----+---+----+
In [281]: df.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|? 2|? ? 80|
|? Bob|? 5|? null|
+-----+---+------+
2.60 toJSON(use_unicode=True)
將DataFrame轉(zhuǎn)換為字符串的RDD捶朵。
每行都將轉(zhuǎn)換為JSON格式作為返回的RDD中的一個元素。
In [284]: df.toJSON().take(2)
Out[284]: [u'{"name":"Alice","age":2,"height":80}', u'{"name":"Bob","age":5}']
2.61 toPandas()
將此DataFrame的內(nèi)容返回為Pandas pandas.DataFrame狂男。
這只有在pandas安裝和可用的情況下才可用综看。
表:In [286]: df.toPandas()
Out[286]:
? ? name? age? height
0? Alice? ? 2? ? 80.0
1? ? Bob? ? 5? ? NaN
2.62 unionAll(other)
返回包含在這個frame和另一個frame的行的聯(lián)合的新DataFrame。
這相當(dāng)于SQL中的UNION ALL岖食。
In [297]: df.unionAll(df4).show()
+-----+----+------+
| name| age|height|
+-----+----+------+
|Alice|? 2|? ? 80|
|? Bob|? 5|? null|
|Alice|? 10|? ? 80|
|? Bob|? 5|? null|
|? Tom|null|? null|
| null|null|? null|
+-----+----+------+
2.63 unpersist(blocking=True)
將DataFrame標(biāo)記為非持久性红碑,并從內(nèi)存和磁盤中刪除所有的塊。
2.64 where(condition)
使用給定表達式過濾行泡垃。
where()是filter()的別名析珊。
2.65 withColumn(colName, col)
通過添加列或替換具有相同名稱的現(xiàn)有列來返回新的DataFrame。
參數(shù):●?colName?– 字符串蔑穴,新列的名稱
●?col?– 新列的列表達式
>>>df.withColumn('age2', df.age + 2).collect()
[Row(name=u'Alice', age=2, age2=4), Row(name=u'Bob', age=5, age2=7)]
2.66 withColumnRenamed(existing, new)
通過重命名現(xiàn)有列來返回新的DataFrame忠寻。
參數(shù):●?existing?– 字符串,要重命名的現(xiàn)有列的名稱
●?col?– 字符串澎剥,列的新名稱
>>>df.withColumnRenamed('age', 'age2').collect()
[Row(name=u'Alice', age2=2), Row(name=u'Bob', age2=5)]
2.67 write
用于將DataFrame的內(nèi)容保存到外部存儲的接口锡溯。
返回:DataFrameWriter