User-Defined Functions (UDF)
UDFs are widely used in data processing to transform dataframes. There are two types of UDF in PySpark: conventional UDFs and Pandas UDFs. Pandas UDFs are much more powerful in terms of speed and processing time.
- Traditional Python functions
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> def price_range(brand):
...     prices = {"Samsung": 'High Price', "Apple": 'High Price', "MI": 'Mid Price'}
...     return prices.get(brand, "Low Price")
...
>>> brand_udf = udf(price_range, StringType())
>>> df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|High Price |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|High Price |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows
- Lambda functions
>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10,False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows
- Pandas UDF (vectorized UDF)
There are two types of Pandas UDF: Scalar and GroupedMap (a grouped-map sketch follows the multi-column example below).
Using a Pandas UDF is very similar to using a basic UDF. We first have to import pandas_udf from PySpark and apply it to whichever column is to be transformed.
>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     # age arrives as a pandas Series, so the arithmetic is vectorized
...     return 100 - age
...
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows
- Pandas UDF (multiple columns)
>>> def prod(rating, exp):
...     # both arguments are pandas Series; the product is computed element-wise
...     return rating * exp
...
>>> from pyspark.sql.types import DoubleType
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product",prod_udf(df['ratings'], df['experience'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows
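The examples above are scalar Pandas UDFs. The grouped-map flavor mentioned earlier receives one pandas DataFrame per group and returns a transformed DataFrame. Here is a minimal sketch on Spark 2.3+, where the center_exp function and the grouping column are purely illustrative:
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
... def center_exp(pdf):
...     # pdf is a pandas DataFrame containing every row of one 'mobile' group;
...     # return it with experience centered around the group mean
...     return pdf.assign(experience=pdf.experience - pdf.experience.mean())
...
>>> centered_df = df.groupby('mobile').apply(center_exp)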
Dropping duplicate values
>>> df.count()
33
>>> df=df.dropDuplicates()
>>> df.count()
26
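dropDuplicates can also deduplicate on a subset of columns instead of complete rows; a minimal sketch, where the chosen columns are only illustrative:
>>> df_dedup_subset = df.dropDuplicates(subset=['age', 'experience'])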
Dropping columns
>>> df_new=df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows
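drop also accepts several column names in one call; a minimal sketch, with an illustrative choice of columns:
>>> df_slim = df.drop('mobile', 'ratings')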
Writing data
- CSV
If we want to save it as a single file in its original csv format, we can use the coalesce function in Spark.
>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
- Parquet
If the dataset is huge and involves a lot of columns, we can choose to compress it and convert it into a Parquet file format. Parquet reduces the overall size of the data and optimizes performance when processing it, because it reads only the required columns instead of the entire data.
We can easily convert and save the dataframe in Parquet format.
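A minimal sketch of the Parquet write, mirroring the CSV example above (the output path is just an assumed location):
>>> parquet_uri = '/home/andrew/df_parquet'
>>> df.write.format('parquet').save(parquet_uri)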
Note: the complete dataset, along with the code, is available for reference in the book's GitHub repository, and it runs best on Spark 2.3 and above.