Some simple, commonly used PySpark functions and methods

1. Convert a string or numeric column to a vector/array

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.linalg import VectorUDT, DenseVector

# A numeric column can be converted to a vector; converting a string to a vector raises an error
to_vec = udf(lambda x: DenseVector([x]), VectorUDT())

# Convert a string to an array
to_array = udf(lambda x: [x], ArrayType(StringType()))
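
A quick usage sketch on a toy DataFrame (the column names num and s are assumptions, not from the original):

df = spark.createDataFrame([(1.0, 'a'), (2.0, 'b')], ['num', 's'])
df = df.withColumn('num_vec', to_vec('num')).withColumn('s_arr', to_array('s'))
df.show()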

2. Get the value at a given position in a vector or array column

df = spark.createDataFrame([(1, [1,2,3]), (2, [4,5,6])], ['label', 'data'])
df.show()
df.printSchema()

+-----+---------+
|label|     data|
+-----+---------+
|    1|[1, 2, 3]|
|    2|[4, 5, 6]|
+-----+---------+

root
 |-- label: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: long (containsNull = true)

# The value of one column can be used as the index to pick the element at that position in another column
from pyspark.sql.functions import udf,col
from pyspark.sql.types import FloatType

firstelement=udf(lambda k,v:float(v[int(k)]),FloatType())
df.withColumn('value', firstelement(col('label'), col('data'))).show(4, truncate=False)
+-----+---------+-----+
|label|data     |value|
+-----+---------+-----+
|1    |[1, 2, 3]|2.0  |
|2    |[4, 5, 6]|6.0  |
+-----+---------+-----+ 
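
As an alternative to the UDF, on Spark 2.4+ the built-in element_at (1-based index) and Column.getItem (0-based index) can extract array elements directly; a minimal sketch with literal indices:

from pyspark.sql import functions as F

df.withColumn('first', F.element_at('data', 1))\
  .withColumn('second', F.col('data').getItem(1))\
  .show()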

3檐什、單個list列變多列

Reference: https://stackoverflow.com/questions/45789489/how-to-split-a-list-to-multiple-columns-in-pyspark
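
A minimal sketch of the approach from that answer, assuming the array column data of known length 3 from section 2:

from pyspark.sql import functions as F

df.select('label', *[F.col('data').getItem(i).alias('data_{}'.format(i)) for i in range(3)]).show()
# +-----+------+------+------+
# |label|data_0|data_1|data_2|
# +-----+------+------+------+
# |    1|     1|     2|     3|
# |    2|     4|     5|     6|
# +-----+------+------+------+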

4碴卧、獲取每個類別的前n個數(shù)據(jù)

Reference: https://stackoverflow.com/questions/38397796/retrieve-top-n-in-each-group-of-a-dataframe-in-pyspark

rdd = sc.parallelize([("user_1",  "object_1",  3), 
                      ("user_1",  "object_2",  2), 
                      ("user_2",  "object_1",  5), 
                      ("user_2",  "object_2",  2), 
                      ("user_2",  "object_2",  6)])
df = sqlContext.createDataFrame(rdd, ["user_id", "object_id", "score"])

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = Window.partitionBy(df['user_id']).orderBy(df['score'].desc())

df.select('*', rank().over(window).alias('rank'))\
  .filter(col('rank') <= 2)\
  .show()
#+-------+---------+-----+----+
#|user_id|object_id|score|rank|
#+-------+---------+-----+----+
#| user_1| object_1|    3|   1|
#| user_1| object_2|    2|   2|
#| user_2| object_2|    6|   1|
#| user_2| object_1|    5|   2|
#+-------+---------+-----+----+

5. Convert a string to a date
from pyspark.sql import functions as F
from pyspark.sql.functions import min, max
from pyspark.sql.types import DateType

df = spark.createDataFrame([
  "2017-01-01", "2018-02-08", "2019-01-03", "2019-01-01"], "string"
).selectExpr("CAST(value AS date) AS date")

# or
df = spark.createDataFrame([
  "2017-01-01", "2018-02-08", "2019-01-03", "2019-01-01"], "string"
).withColumn('date', F.col('value').cast(DateType())) 

min_date, max_date = df.select(min("date"), max("date")).first()
min_date, max_date

# (datetime.date(2017, 1, 1), datetime.date(2019, 1, 3))

6乃正、檢測缺失值

import numpy as np
from functools import reduce
from pyspark.sql import functions as F

d1 = spark.createDataFrame([(10,'a',None), (20, 'b',3), (30, 'c',4), 
                            (20, 'b',5), (30, 'd',6), (40, None,7), 
                            (None, 'e',8)], ['value', 'key','v2'])
d1 = d1.select('key', 'value', 'v2')

# Keep the rows where any column is null
d1.where(reduce(lambda x, y: x | y, (F.col(x).isNull() for x in d1.columns))).show()

# or
d1.where(F.col('key').isNull() | F.col('value').isNull() | F.col('v2').isNull()).show()

# +----+-----+----+
# | key|value|  v2|
# +----+-----+----+
# |   a|   10|null|
# |null|   40|   7|
# |   e| null|   8|
# +----+-----+----+

Going a step further, also detect NaN values and empty strings:

d1 = spark.createDataFrame([(10,'a', None), (20, 'b', 3.), (30, 'c',4.), 
                            (20, 'b',5.), (30, 'd', np.nan), (40, None,7.), 
                            (None, 'e',8.), (50, '', 8.)], ['value', 'key','v2'])
d1 = d1.select('key', 'value', 'v2')
d1.show()
# +----+-----+----+
# | key|value|  v2|
# +----+-----+----+
# |   a|   10|null|
# |   b|   20| 3.0|
# |   c|   30| 4.0|
# |   b|   20| 5.0|
# |   d|   30| NaN|
# |null|   40| 7.0|
# |   e| null| 8.0|
# |    |   50| 8.0|
# +----+-----+----+

# Keep rows where key is neither null nor empty and v2 is not NaN (isnan(null) is false, so the row with a null v2 is kept)
d1.where((F.col('key').isNotNull()) & (F.col('key')!='') & (~F.isnan(F.col('v2')))).show()
# +---+-----+----+
# |key|value|  v2|
# +---+-----+----+
# |  a|   10|null|
# |  b|   20| 3.0|
# |  c|   30| 4.0|
# |  b|   20| 5.0|
# |  e| null| 8.0|
# +---+-----+----+
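
To summarize instead of filter, a minimal sketch that counts the nulls per column (the NaN and the empty string are not counted here, since they are not null):

d1.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in d1.columns]).show()
# each of the three columns in d1 contains exactly one null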

7. Fill gaps in a non-continuous time series

References: https://walkenho.github.io/interpolating-time-series-p2-spark/
https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark

or https://stackoverflow.com/questions/39271374/pyspark-how-to-resample-frequencies

import random
import numpy as np
import pandas as pd

data = {'readtime' : pd.date_range(start='1/15/2018', end='02/14/2018', freq='D')\
                       .append(pd.date_range(start='1/15/2018', end='02/14/2018', freq='D')),
        'house' : ['house1' for i in range(31)] + ['house2' for i in range(31)],
        'readvalue': [0.5+0.5*np.sin(2*np.pi/30*i) for i in range(31)]\
                     + [0.5+0.5*np.cos(2*np.pi/30*i) for i in range(31)]}
df0 = pd.DataFrame(data, columns = ['readtime', 'house', 'readvalue'])
random.seed(42)
df0 = df0.drop(random.sample(range(df0.shape[0]), k=int(df0.shape[0]/2)))
df0.head()

    readtime    house   readvalue
2   2018-01-17  house1  0.703368
5   2018-01-20  house1  0.933013
7   2018-01-22  house1  0.997261
8   2018-01-23  house1  0.997261
14  2018-01-29  house1  0.603956

import pyspark.sql.functions as func
from pyspark.sql.functions import col
df = spark.createDataFrame(df0)
df = df.withColumn("readtime", col('readtime')/1e9)\
       .withColumn("readtime_existent", col("readtime"))
df.show(10, False)

+-----------+------+--------------------+-----------------+
|readtime   |house |readvalue           |readtime_existent|
+-----------+------+--------------------+-----------------+
|1.5161472E9|house1|0.7033683215379001  |1.5161472E9      |
|1.5164064E9|house1|0.9330127018922193  |1.5164064E9      |
|1.5165792E9|house1|0.9972609476841366  |1.5165792E9      |
|1.5166656E9|house1|0.9972609476841368  |1.5166656E9      |
|1.517184E9 |house1|0.6039558454088798  |1.517184E9       |
|1.5172704E9|house1|0.5000000000000001  |1.5172704E9      |
|1.5174432E9|house1|0.2966316784621001  |1.5174432E9      |
|1.5175296E9|house1|0.2061073738537635  |1.5175296E9      |
|1.5177024E9|house1|0.06698729810778081 |1.5177024E9      |
|1.5177888E9|house1|0.024471741852423234|1.5177888E9      |
+-----------+------+--------------------+-----------------+
only showing top 10 rows

Filling the time gaps takes three steps:

  • In the first step, we group the data by ‘house’ and generate an array containing an equally spaced time grid for each house.

  • In the second step, we create one row for each element of the arrays by using the spark SQL function explode().

  • In the third step, the resulting structure is used as a basis to which the existing read value information is joined using an outer left join.

from pyspark.sql.types import *

# define function to create date range
def date_range(t1, t2, step=60*60*24):
    """Return a list of equally spaced points between t1 and t2 with stepsize step."""
    return [t1 + step*x for x in range(int((t2-t1)/step)+1)]

# define udf
date_range_udf = func.udf(date_range, ArrayType(LongType()))

# obtain min and max of time period for each house
df_base = df.groupBy('house')\
            .agg(func.min('readtime').cast('integer').alias('readtime_min'), 
                 func.max('readtime').cast('integer').alias('readtime_max'))

# generate timegrid and explode
df_base = df_base.withColumn("readtime", func.explode(date_range_udf("readtime_min", "readtime_max")))\
             .drop('readtime_min', 'readtime_max')

# left outer join existing read values
df_all_dates = df_base.join(df, ["house", "readtime"], "leftouter")

tmp = df_all_dates.withColumn('readtime', func.from_unixtime(col('readtime')))
tmp.orderBy('house','readtime').show(20, False)

+------+-------------------+--------------------+-----------------+
|house |readtime           |readvalue           |readtime_existent|
+------+-------------------+--------------------+-----------------+
|house1|2018-01-17 08:00:00|0.7033683215379001  |1.5161472E9      |
|house1|2018-01-18 08:00:00|null                |null             |
|house1|2018-01-19 08:00:00|null                |null             |
|house1|2018-01-20 08:00:00|0.9330127018922193  |1.5164064E9      |
|house1|2018-01-21 08:00:00|null                |null             |
|house1|2018-01-22 08:00:00|0.9972609476841366  |1.5165792E9      |
|house1|2018-01-23 08:00:00|0.9972609476841368  |1.5166656E9      |
|house1|2018-01-24 08:00:00|null                |null             |
|house1|2018-01-25 08:00:00|null                |null             |
|house1|2018-01-26 08:00:00|null                |null             |
|house1|2018-01-27 08:00:00|null                |null             |
|house1|2018-01-28 08:00:00|null                |null             |
|house1|2018-01-29 08:00:00|0.6039558454088798  |1.517184E9       |
|house1|2018-01-30 08:00:00|0.5000000000000001  |1.5172704E9      |
|house1|2018-01-31 08:00:00|null                |null             |
|house1|2018-02-01 08:00:00|0.2966316784621001  |1.5174432E9      |
|house1|2018-02-02 08:00:00|0.2061073738537635  |1.5175296E9      |
|house1|2018-02-03 08:00:00|null                |null             |
|house1|2018-02-04 08:00:00|0.06698729810778081 |1.5177024E9      |
|house1|2018-02-05 08:00:00|0.024471741852423234|1.5177888E9      |
+------+-------------------+--------------------+-----------------+
only showing top 20 rows
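
On Spark 2.4+ the date_range UDF can likely be replaced by the built-in sequence() function, which generates the same equally spaced grid; a sketch under that version assumption:

# sequence(start, stop, step) builds the array natively; 86400 seconds = 1 day
df_base = df.groupBy('house')\
            .agg(func.min('readtime').cast('integer').alias('readtime_min'),
                 func.max('readtime').cast('integer').alias('readtime_max'))\
            .withColumn('readtime', func.explode(func.expr('sequence(readtime_min, readtime_max, 86400)')))\
            .drop('readtime_min', 'readtime_max')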

8瓮具、當(dāng)前行與上一行值得差

Reference: https://www.arundhaj.com/blog/calculate-difference-with-previous-row-in-pyspark.html

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

sc = SparkContext(appName="PrevRowDiffApp")
sqlc = SQLContext(sc)

rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])

df = sqlc.createDataFrame(rdd, ["id", "value"])

my_window = Window.partitionBy().orderBy("id")

df = df.withColumn("prev_value", F.lag(df.value).over(my_window))
df = df.withColumn("diff", F.when(F.isnull(df.value - df.prev_value), 0)
                              .otherwise(df.value - df.prev_value))

df.show()
+---+-----+----------+----+
| id|value|prev_value|diff|
+---+-----+----------+----+
|  1|   65|      null|   0|
|  2|   66|        65|   1|
|  3|   65|        66|  -1|
|  4|   68|        65|   3|
|  5|   71|        68|   3|
+---+-----+----------+----+

9荧飞、udf如何傳入多個參數(shù)來計算list column不同的quantile凡人?

Use a closure: https://stackoverflow.com/questions/52843485/pyspark-pass-multiple-columns-along-with-an-argument-in-udf

import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def quantile_udf(q=0.5):
    """Return a udf that computes the q-quantile of an array column."""
    def quantile_(arr):
        return float(np.quantile(arr, q))
    return F.udf(quantile_, DoubleType())

history_stat = history_stat.withColumn('quantile_025', quantile_udf(0.25)(F.col('date_diff_list')))\
                        .withColumn('quantile_05', quantile_udf(0.5)(F.col('date_diff_list')))\
                        .withColumn('quantile_075', quantile_udf(0.75)(F.col('date_diff_list')))


history_stat.show(2)
+---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
|cust_store_dkey|cust_product_dkey| first_day|  last_day|sale_times|max_sales_interval|mean_sales_interval|      date_diff_list|quantile_025|quantile_05|quantile_075|
+---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
|            560|              211|2017-12-10|2019-03-08|       180|                16| 2.5166666666666666|[0, 11, 4, 11, 1,...|         1.0|        1.0|         3.0|
|            560|              990|2016-12-30|2017-03-17|        20|                26|               3.85|[0, 1, 1, 1, 2, 3...|         1.0|        2.0|         4.0|
+---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+

10. Replace values based on a condition

Reference: https://stackoverflow.com/questions/44773758/how-to-conditionally-replace-value-in-a-column-based-on-evaluation-of-expression

import numpy as np

df = spark.createDataFrame(
    [(1, 1, None),
     (1, 2, float(5)),
     (1, 3, np.nan),
     (1, 4, None),
     (0, 5, float(10)),
     (1, 6, float('nan')),
     (0, 6, float('nan'))],
    ('session', "timestamp1", "id2"))

+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
|      1|         1|null|
|      1|         2| 5.0|
|      1|         3| NaN|
|      1|         4|null|
|      0|         5|10.0|
|      1|         6| NaN|
|      0|         6| NaN|
+-------+----------+----+

from pyspark.sql.functions import when

targetDf = df.withColumn("timestamp1", \
              when(df["session"] == 0, 999).otherwise(df["timestamp1"]))
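
A related sketch (not from the linked answer): to also replace null/NaN values in id2 with a default, combine when with isnull and isnan:

from pyspark.sql import functions as F

df.withColumn('id2', F.when(F.isnull('id2') | F.isnan('id2'), 0.0).otherwise(F.col('id2'))).show()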

11叹阔、如何將sparse vector轉(zhuǎn)化為array挠轴?

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

vector_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(FloatType()))

df = df.withColumn('col1', vector_udf('col2'))

Note that the tolist() inside the udf is required, because Spark has no np.array type. Similarly, when returning a value of an np.dtype, it must be converted with float/int as well.
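
On Spark 3.0+ the built-in pyspark.ml.functions.vector_to_array provides the same conversion without a udf (a sketch assuming that Spark version):

from pyspark.ml.functions import vector_to_array

df = df.withColumn('col1', vector_to_array('col2'))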

12委煤、如何將pyspark sparse vector轉(zhuǎn)化為 scipy sparse matrix以及pytorch sparse tensor?

Reference: https://stackoverflow.com/questions/40557577/pyspark-sparse-vectors-to-scipy-sparse-matrix
Converting a Spark sparse vector to a scipy csr matrix:

import numpy as np
from scipy.sparse import csr_matrix, vstack
import torch

def as_matrix(vec):
    """Convert one Spark sparse vector into a 1-row scipy csr matrix."""
    data, indices = vec.values, vec.indices
    shape = 1, vec.size
    return csr_matrix((data, indices, np.array([0, vec.values.size])), shape)

# cv_cols are the sparse-vector columns produced by Spark's CountVectorizer,
# csr_cols are the corresponding target column names in the pandas DataFrame train_pd
train_pd[csr_cols] = train_pd[cv_cols].applymap(lambda x: as_matrix(x))

# The code above converts the Spark sparse vector in every row into a scipy csr matrix;
# the code below stacks all row-wise csr matrices of one column into a single large csr matrix
csr_col1 = vstack(train_pd['csr_col1'])

The code above turns the sparse vectors into a scipy sparse matrix, specifically a scipy csr matrix.

Next, convert the scipy csr matrix into a PyTorch sparse tensor.

def sparse2tensor(tmpdf):
    """
    tmpdf is a scipy csr matrix, e.g. csr_col1 obtained above.
    """
    tmpdf_coo = vstack(tmpdf).tocoo()
    # torch.Size below makes sure the sparse tensor keeps the same shape after conversion
    sptensor = torch.sparse.FloatTensor(torch.LongTensor([tmpdf_coo.row.tolist(), tmpdf_coo.col.tolist()]),
                                     torch.FloatTensor(tmpdf_coo.data), torch.Size(tmpdf_coo.shape))
    return sptensor

spt = sparse2tensor(csr_col1)

When the data is very high-dimensional and sparse, using a sparse matrix/tensor greatly reduces memory usage, which makes this a very practical technique.

12发魄、稀疏向量求和

def sum_vector(vector):
    return float(vector.values.sum())
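
A minimal sketch of wrapping it as a udf and applying it to a hypothetical vector column features:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

sum_vector_udf = F.udf(sum_vector, DoubleType())
df = df.withColumn('vec_sum', sum_vector_udf('features'))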

14. Convert a bigint to a timestamp

from pyspark.sql import functions as Func
from pyspark.sql.functions import col
df = df.withColumn('ttl2', 
          Func.from_unixtime(col("ttl").cast('float') / 1000.0 , 'yyyyMMdd'))
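
A self-contained sketch on a toy DataFrame (column names are assumptions): ttl holds a millisecond epoch, so it is divided by 1000 to get seconds, and from_unixtime formats the result in the session time zone.

df = spark.createDataFrame([(1, 1551052800000), (2, 1551139200000)], ['id', 'ttl'])
df.withColumn('ttl2', Func.from_unixtime(col('ttl').cast('float') / 1000.0, 'yyyyMMdd')).show()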

13俩垃、大量category的string2index實現(xiàn)

When doing label encoding, StringIndexer is normally used. Sometimes, however, there are far too many categories, tens of millions or more, for example when encoding the IDs for matrix factorization in a recommender. StringIndexer then easily runs out of memory, because serializing and broadcasting tens of millions of strings needs a lot of machine memory. In that case we need another way to do the encoding.

One approach borrows window functions: use the column as the key, treat each distinct value as one window, and assign each window a value:

# Original data:
id | col |
1  |  a  |
2  |  a  |
3  |  b  |
4  |  c  |
5  |  c  |

from pyspark.sql import window as W
import pyspark.sql.functions as f
df.select('id', f.dense_rank().over(W.Window.orderBy('col')).alias('group')).show(truncate=False)
# Result:
+---+-----+
|id |group|
+---+-----+
|1  |1    |
|2  |1    |
|3  |2    |
|4  |3    |
|5  |3    |
+---+-----+

Another approach uses the window function row_number: first assign a code to each distinct value, then join it back:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df2 = data.select("col").distinct()\
  .withColumn("group", row_number().over(Window.orderBy("col")))

result = data.join(df2,  on="col",  how="inner")

Reference: https://stackoverflow.com/questions/50233518/create-a-group-id-over-a-window-in-spark-dataframe
