DataFrame的去重,none值填充及異常值處理2018-05-23

spark 數(shù)據(jù)建模準(zhǔn)備

去重

#初始化spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("shuangyu").getOrCreate()
df = spark.createDataFrame([(1,144.5,5.9,33,'M'),
                           (2,167.2,5.4,45,'M'),
                           (3,124.1,5.2,23,'F'),
                           (4,144.5,5.9,33,'M'),
                           (5,133.2,5.7,54,'F'),
                           (3,124.1,5.2,23,'F'),
                           (5,129.2,5.3,42,'M')],["id","weight","height","age","gender"])
#分別打印dataframe未去重和去重后的行數(shù)
print("count of rows: {}".format(df.count()))
print("count of distinct rows: {}".format(df.distinct().count()))

count of rows: 7
count of distinct rows: 6

#去掉重復(fù)的行
df = df.dropDuplicates()
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 5| 133.2| 5.7| 54| F|
| 5| 129.2| 5.3| 42| M|
| 1| 144.5| 5.9| 33| M|
| 4| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
+---+------+------+---+------+

#計(jì)算排除id后是否有重復(fù)的數(shù)據(jù)
print("counts of ids: {}".format(df.count()))
print("counts of distinct ids: {}".format(df.select([c for c in df.columns if c != "id"]).distinct().count()))

counts of ids: 6
counts of distinct ids: 5

#發(fā)現(xiàn)有2行出去ID外其它都是重復(fù)的,現(xiàn)在要去掉其中的一行
df = df.dropDuplicates(subset = [c for c in df.columns if c != "id"])
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 5| 133.2| 5.7| 54| F|
| 1| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+

#ok.現(xiàn)在來(lái)計(jì)算下是否有重復(fù)的ID
import pyspark.sql.functions as fn #導(dǎo)入spark sql的一些函數(shù)

df.agg(fn.count("id").alias("count"),
       fn.countDistinct("id").alias("distinct")).show()

+-----+--------+
|count|distinct|
+-----+--------+
| 5| 4|
+-----+--------+

#發(fā)現(xiàn)有重復(fù)的ID,我們可能需要重新給每行數(shù)據(jù)分分配唯一的新的ID來(lái)標(biāo)示它們
df.withColumn("newId",fn.monotonically_increasing_id()).show()
#withColums 新增一列
#monotonically_increasing_id 生成唯一自增ID

+---+------+------+---+------+-------------+
| id|weight|height|age|gender| newId|
+---+------+------+---+------+-------------+
| 5| 133.2| 5.7| 54| F| 25769803776|
| 1| 144.5| 5.9| 33| M| 171798691840|
| 2| 167.2| 5.4| 45| M| 592705486848|
| 3| 124.1| 5.2| 23| F|1236950581248|
| 5| 129.2| 5.3| 42| M|1365799600128|
+---+------+------+---+------+-------------+

數(shù)據(jù)缺失

df_miss = spark.createDataFrame([(1,143.5,5.6,28,'M',10000),
                                (2,167.2,5.4,45,'M',None),
                                (3,None,5.2,None,None,None),
                                (4,144.5,5.9,33,'M',None),
                                (5,133.2,5.7,54,'F',None),
                                (6,124.1,5.2,None,'F',None),
                                (7,129.2,5.3,42,'M',76000)],
                               ['id','weight','height','age','gender','income'])
#統(tǒng)計(jì)每一行缺失的數(shù)據(jù)量
df_miss.rdd.map(lambda row: (row['id'],sum([c == None for c in row]))).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

#第三行數(shù)據(jù)缺失有點(diǎn)多,來(lái)看一下第三行數(shù)據(jù)
df_miss.where('id == 3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
| 3| null| 5.2|null| null| null|
+---+------+------+----+------+------+

#統(tǒng)計(jì)每列數(shù)據(jù)缺失情況
df_miss.agg(*[(1-(fn.count(c)/fn.count('*'))).alias(c + "_miss") for c in df_miss.columns]).show()

+-------+------------------+-----------+------------------+------------------+------------------+
|id_miss| weight_miss|height_miss| age_miss| gender_miss| income_miss|
+-------+------------------+-----------+------------------+------------------+------------------+
| 0.0|0.1428571428571429| 0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+-------+------------------+-----------+------------------+------------------+------------------+

#income列缺失太多莫湘,基本無(wú)用了,現(xiàn)在要去掉這一列數(shù)據(jù)
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != "income"])
df_miss_no_income.show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 3| null| 5.2|null| null|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 6| 124.1| 5.2|null| F|
| 7| 129.2| 5.3| 42| M|
+---+------+------+----+------+

#某些行缺失的數(shù)據(jù)也比較多郑气,現(xiàn)在去除掉這些行
#thresh=3 表示一行中非NONE的數(shù)據(jù)少于3個(gè)則去除該行
df_miss_no_income.dropna(thresh=3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 6| 124.1| 5.2|null| F|
| 7| 129.2| 5.3| 42| M|
+---+------+------+----+------+

#只要含有NONE則去除該行
df_miss_no_income.dropna().show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 7| 129.2| 5.3| 42| M|
+---+------+------+---+------+

#為none值填充新值
means = df_miss_no_income.agg(*[fn.mean(c).alias(c) 
                                for c in df_miss_no_income.columns if c != 'gender'])\
                                .toPandas().to_dict('records')[0]
means['gender'] = "missing"
print(means)
#df.fillna(dict) 填充df中的none值,dict中以各個(gè)col字段作為key腰池,要填充的值作為value 
df_miss_no_income.fillna(means).show()

{'age': 40.4, 'height': 5.471428571428571, 'gender': 'missing', 'weight': 140.28333333333333, 'id': 4.0}
+---+------------------+------+---+-------+
| id| weight|height|age| gender|
+---+------------------+------+---+-------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 3|140.28333333333333| 5.2| 40|missing|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 6| 124.1| 5.2| 40| F|
| 7| 129.2| 5.3| 42| M|
+---+------------------+------+---+-------+

異常值

df_outliers = spark.createDataFrame([(1,143.5,5.3,28),
                                    (2,154.2,5.5,45),
                                    (3,342.3,5.1,99),
                                    (4,144.5,5.5,33),
                                    (5,133.2,5.4,54),
                                    (6,124.1,5.1,21),
                                    (7,129.2,5.3,42)],["id","weight","height","age"])
cols = ["weight","height","age"]
#bounds,用來(lái)存儲(chǔ)后面生成的各個(gè)字段值的邊界
bounds = {}
for col in cols:
    #涉及統(tǒng)計(jì)中的4分位尾组。計(jì)算Q1和Q3
    quantiles = df_outliers.approxQuantile(col, [0.25,0.75], 0.05)
    #計(jì)算4分位距
    IQR = quantiles[1] - quantiles[0]
    #計(jì)算內(nèi)限
    bounds[col] = [quantiles[0] - 1.5*IQR, quantiles[1] + 1.5*IQR]
    
print("bounds: ",bounds)
#判斷是否為異常值,在內(nèi)限之外的值為異常值
outliers = df_outliers.select(*['id'] + \
                              [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) )\
                               .alias(c +"_o") for c in cols])
outliers.show()

bounds: {'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}
+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
| 1| false| false|false|
| 2| false| false|false|
| 3| true| false| true|
| 4| false| false|false|
| 5| false| false|false|
| 6| false| false|false|
| 7| false| false|false|
+---+--------+--------+-----+

#查詢出異常值
df_outliers = df_outliers.join(outliers,on = 'id')
#上面的join語(yǔ)句不要寫(xiě)成 df_outliers.join(outliers, df_outliers.id == outliers.id) 否則在
#新生成的 df_outliers中會(huì)有2列id示弓,后面在select時(shí)會(huì)報(bào)錯(cuò)AnalysisException: "Reference 'id' is ambiguous
df_outliers.show()

+---+------+------+---+--------+--------+-----+
| id|weight|height|age|weight_o|height_o|age_o|
+---+------+------+---+--------+--------+-----+
| 7| 129.2| 5.3| 42| false| false|false|
| 6| 124.1| 5.1| 21| false| false|false|
| 5| 133.2| 5.4| 54| false| false|false|
| 1| 143.5| 5.3| 28| false| false|false|
| 3| 342.3| 5.1| 99| true| false| true|
| 2| 154.2| 5.5| 45| false| false|false|
| 4| 144.5| 5.5| 33| false| false|false|
+---+------+------+---+--------+--------+-----+

df_outliers.filter('weight_o').select('id','weight').show()

+---+------+
| id|weight|
+---+------+
| 3| 342.3|
+---+------+

df_outliers.filter("age_o").select("id","age").show()

+---+---+
| id|age|
+---+---+
| 3| 99|
+---+---+

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末讳侨,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奏属,更是在濱河造成了極大的恐慌跨跨,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件囱皿,死亡現(xiàn)場(chǎng)離奇詭異勇婴,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)嘱腥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門耕渴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人齿兔,你說(shuō)我怎么就攤上這事橱脸〈∶祝” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵添诉,是天一觀的道長(zhǎng)屁桑。 經(jīng)常有香客問(wèn)我,道長(zhǎng)栏赴,這世上最難降的妖魔是什么掏颊? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮艾帐,結(jié)果婚禮上乌叶,老公的妹妹穿的比我還像新娘。我一直安慰自己柒爸,他們只是感情好准浴,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著捎稚,像睡著了一般乐横。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上今野,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天葡公,我揣著相機(jī)與錄音,去河邊找鬼条霜。 笑死催什,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的宰睡。 我是一名探鬼主播蒲凶,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼拆内!你這毒婦竟也來(lái)了旋圆?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤麸恍,失蹤者是張志新(化名)和其女友劉穎灵巧,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體抹沪,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡刻肄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了采够。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肄方。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蹬癌,靈堂內(nèi)的尸體忽然破棺而出权她,到底是詐尸還是另有隱情虹茶,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布隅要,位于F島的核電站蝴罪,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏步清。R本人自食惡果不足惜要门,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望廓啊。 院中可真熱鬧欢搜,春花似錦、人聲如沸谴轮。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)第步。三九已至疮装,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間粘都,已是汗流浹背廓推。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留翩隧,地道東北人樊展。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像鸽心,于是被迫代替她去往敵國(guó)和親滚局。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容