pyspark工具機(jī)器學(xué)習(xí)(自然語言處理和推薦系統(tǒng))2數(shù)據(jù)處理2

圖片.png

用戶定義函數(shù)(UDF:User-Defined Functions)

UDF廣泛用于數(shù)據(jù)處理李命,以轉(zhuǎn)換數(shù)據(jù)幀。 PySpark中有兩種類型的UDF:常規(guī)UDF和Pandas UDF盟蚣。 Pandas UDF在速度和處理時間方面更加強(qiáng)大悦屏。

  • 傳統(tǒng)的Python函數(shù)

>>> from pyspark.sql.functions import udf
>>> def price_range(brand):
...     prices = {"Samsung":'High Price', "Apple":'High Price', "MI":'Mid Price'}
...     return prices.get('test',"Low Price")
... 
>>> brand_udf=udf(price_range,StringType())
>>> df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
+-------+---+----------+------+-------+-----------+                             
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |Low Price  |
|4      |22 |2.5       |0     |Samsung|Low Price  |
|4      |37 |16.5      |4     |Apple  |Low Price  |
|5      |27 |9.0       |1     |MI     |Low Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|Low Price  |
|3      |22 |2.5       |0     |Apple  |Low Price  |
|3      |27 |6.0       |0     |MI     |Low Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows

>>> 
  • Lambda函數(shù)

>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10,False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows
圖片.png
  • PandasUDF(矢量化UDF)

有兩種類型的Pandas UDF:Scalar和GroupedMap节沦。

Pandas UDF與使用基本UDf非常相似。我們必須首先從PySpark導(dǎo)入pandas_udf并將其應(yīng)用于要轉(zhuǎn)換的任何特定列础爬。


>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     return (100-age)
... 
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+                                
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows

  • PandasUDF(多列)

>>> def prod(rating,exp):
...     return rating*exp
... 
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product",prod_udf(df['ratings'], df['experience'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows

刪除重復(fù)值


>>> df.count()
33
>>> df=df.dropDuplicates()
>>> df.count()
26

刪除列


>>> df_new=df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows

參考資料

寫數(shù)據(jù)

  • CSV

如果我們想以原始csv格式將其保存為單個文件放可,我們可以在spark中使用coalesce函數(shù)。


>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
  • Parquet

如果數(shù)據(jù)集很大且涉及很多列,我們可以選擇對其進(jìn)行壓縮并將其轉(zhuǎn)換為Parquet文件格式蚤氏。它減少了數(shù)據(jù)的整體大小并在處理數(shù)據(jù)時優(yōu)化了性能吐葵,因?yàn)樗梢蕴幚硭枇械淖蛹皇钦麄€數(shù)據(jù)逢慌。
我們可以輕松地將數(shù)據(jù)幀轉(zhuǎn)換并保存為Parquet格式娩贷。

注意完整的數(shù)據(jù)集以及代碼可以在本書的GitHub存儲庫中進(jìn)行參考,并在onSpark 2.3及更高版本上執(zhí)行最佳碱茁。

圖片.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末裸卫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子纽竣,更是在濱河造成了極大的恐慌墓贿,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蜓氨,死亡現(xiàn)場離奇詭異聋袋,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)穴吹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門幽勒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人港令,你說我怎么就攤上這事啥容。” “怎么了顷霹?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵咪惠,是天一觀的道長。 經(jīng)常有香客問我淋淀,道長遥昧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮炭臭,結(jié)果婚禮上永脓,老公的妹妹穿的比我還像新娘。我一直安慰自己鞋仍,他們只是感情好常摧,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著凿试,像睡著了一般排宰。 火紅的嫁衣襯著肌膚如雪似芝。 梳的紋絲不亂的頭發(fā)上那婉,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天,我揣著相機(jī)與錄音党瓮,去河邊找鬼详炬。 笑死,一個胖子當(dāng)著我的面吹牛寞奸,可吹牛的內(nèi)容都是我干的呛谜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼枪萄,長吁一口氣:“原來是場噩夢啊……” “哼隐岛!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起瓷翻,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤聚凹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后齐帚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妒牙,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年对妄,在試婚紗的時候發(fā)現(xiàn)自己被綠了湘今。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡剪菱,死狀恐怖摩瞎,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情孝常,我是刑警寧澤愉豺,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站茫因,受9級特大地震影響蚪拦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一驰贷、第九天 我趴在偏房一處隱蔽的房頂上張望盛嘿。 院中可真熱鬧,春花似錦括袒、人聲如沸次兆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽芥炭。三九已至,卻和暖如春恃慧,著一層夾襖步出監(jiān)牢的瞬間园蝠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工痢士, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留彪薛,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓怠蹂,卻偏偏與公主長得像善延,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子城侧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評論 2 355