1. Convert a string or numeric column to a vector/array
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
# Numeric columns can be converted to a vector; converting a string column to a vector raises an error
to_vec = udf(lambda x: DenseVector([x]), VectorUDT())
# Convert a string to a single-element array
to_array = udf(lambda x: [x], ArrayType(StringType()))
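A minimal usage sketch (the single-column DataFrame below is made up for illustration):
df = spark.createDataFrame([(1.0,), (2.0,)], ['x'])
df = df.withColumn('x_vec', to_vec('x')) \
       .withColumn('x_arr', to_array(col('x').cast('string')))
# x_vec is a vector column ([1.0], [2.0]); x_arr is an array<string> column (['1.0'], ['2.0'])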
2. Get the value at a given position of a vector or array column
df = spark.createDataFrame([(1, [1,2,3]), (2, [4,5,6])], ['label', 'data'])
df.show()
df.printSchema()
+-----+---------+
|label| data|
+-----+---------+
| 1|[1, 2, 3]|
| 2|[4, 5, 6]|
+-----+---------+
root
|-- label: long (nullable = true)
|-- data: array (nullable = true)
| |-- element: long (containsNull = true)
# The value of one column can be used as the index to pick a position in another column
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType
firstelement = udf(lambda k, v: float(v[int(k)]), FloatType())
df.withColumn('value', firstelement(col('label'), col('data'))).show(4, truncate=False)
+-----+---------+-----+
|label|data |value|
+-----+---------+-----+
|1 |[1, 2, 3]|2.0 |
|2 |[4, 5, 6]|6.0 |
+-----+---------+-----+
3. Split a single list column into multiple columns
Reference: https://stackoverflow.com/questions/45789489/how-to-split-a-list-to-multiple-columns-in-pyspark (a sketch follows below)
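A minimal sketch of the approach from the linked answer, reusing the fixed-length array column data from section 2 (the length 3 is assumed to be known up front):
from pyspark.sql.functions import col

n = 3  # assumed, known length of the array column
df.select('label', *[col('data').getItem(i).alias('data_{}'.format(i)) for i in range(n)]).show()
# +-----+------+------+------+
# |label|data_0|data_1|data_2|
# +-----+------+------+------+
# |    1|     1|     2|     3|
# |    2|     4|     5|     6|
# +-----+------+------+------+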
4. Get the top n rows of each group
Reference: https://stackoverflow.com/questions/38397796/retrieve-top-n-in-each-group-of-a-dataframe-in-pyspark
rdd = sc.parallelize([("user_1", "object_1", 3),
("user_1", "object_2", 2),
("user_2", "object_1", 5),
("user_2", "object_2", 2),
("user_2", "object_2", 6)])
df = sqlContext.createDataFrame(rdd, ["user_id", "object_id", "score"])
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
window = Window.partitionBy(df['user_id']).orderBy(df['score'].desc())
df.select('*', rank().over(window).alias('rank')) \
  .filter(col('rank') <= 2) \
  .show()
#+-------+---------+-----+----+
#|user_id|object_id|score|rank|
#+-------+---------+-----+----+
#| user_1| object_1| 3| 1|
#| user_1| object_2| 2| 2|
#| user_2| object_2| 6| 1|
#| user_2| object_1| 5| 2|
#+-------+---------+-----+----+
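Note that rank() can return more than n rows per group when scores are tied; if exactly n rows per group are required, row_number() is a drop-in replacement (a sketch using the same window):
from pyspark.sql.functions import row_number

df.select('*', row_number().over(window).alias('rn')) \
  .filter(col('rn') <= 2) \
  .show()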
5. Convert strings to dates
from pyspark.sql.functions import min, max
import pyspark.sql.functions as F
from pyspark.sql.types import DateType

df = spark.createDataFrame([
    "2017-01-01", "2018-02-08", "2019-01-03", "2019-01-01"], "string"
).selectExpr("CAST(value AS date) AS date")
# or
df = spark.createDataFrame([
    "2017-01-01", "2018-02-08", "2019-01-03", "2019-01-01"], "string"
).withColumn('date', F.col('value').cast(DateType()))
min_date, max_date = df.select(min("date"), max("date")).first()
min_date, max_date
# (datetime.date(2017, 1, 1), datetime.date(2019, 1, 3))
6. Detect missing values
from functools import reduce
import pyspark.sql.functions as F

d1 = spark.createDataFrame([(10, 'a', None), (20, 'b', 3), (30, 'c', 4),
                            (20, 'b', 5), (30, 'd', 6), (40, None, 7),
                            (None, 'e', 8)], ['value', 'key', 'v2'])
d1 = d1.select('key', 'value', 'v2')
d1.where(reduce(lambda x, y: x | y, (F.col(x).isNull() for x in d1.columns))).show()
# or
d1.where(F.col('key').isNull() | F.col('value').isNull() | F.col('v2').isNull()).show()
# +----+-----+----+
# | key|value| v2|
# +----+-----+----+
# | a| 10|null|
# |null| 40| 7|
# | e| null| 8|
# +----+-----+----+
Going a step further, we can also check for NaN and empty strings at the same time:
import numpy as np

d1 = spark.createDataFrame([(10, 'a', None), (20, 'b', 3.), (30, 'c', 4.),
                            (20, 'b', 5.), (30, 'd', np.nan), (40, None, 7.),
                            (None, 'e', 8.), (50, '', 8.)], ['value', 'key', 'v2'])
d1 = d1.select('key', 'value', 'v2')
d1.show()
# +----+-----+----+
# | key|value| v2|
# +----+-----+----+
# | a| 10|null|
# | b| 20| 3.0|
# | c| 30| 4.0|
# | b| 20| 5.0|
# | d| 30| NaN|
# |null| 40| 7.0|
# | e| null| 8.0|
# | | 50| 8.0|
# +----+-----+----+
d1.where((F.col('key').isNotNull()) & (F.col('key')!='') & (~F.isnan(F.col('v2')))).show()
# +---+-----+----+
# |key|value| v2|
# +---+-----+----+
# | a| 10|null|
# | b| 20| 3.0|
# | c| 30| 4.0|
# | b| 20| 5.0|
# | e| null| 8.0|
# +---+-----+----+
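To count missing values per column instead of listing the offending rows, a common idiom is the following (a sketch using the same d1; it counts only NULLs, not NaN or empty strings):
d1.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in d1.columns]).show()
# returns one row with the number of NULLs per column (here: key=1, value=1, v2=1)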
7. Fill gaps in a non-contiguous time series
References: https://walkenho.github.io/interpolating-time-series-p2-spark/
https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark
or https://stackoverflow.com/questions/39271374/pyspark-how-to-resample-frequencies
import random
import numpy as np
import pandas as pd

data = {'readtime': pd.date_range(start='1/15/2018', end='02/14/2018', freq='D')
                      .append(pd.date_range(start='1/15/2018', end='02/14/2018', freq='D')),
        'house': ['house1' for i in range(31)] + ['house2' for i in range(31)],
        'readvalue': [0.5 + 0.5*np.sin(2*np.pi/30*i) for i in range(31)]
                     + [0.5 + 0.5*np.cos(2*np.pi/30*i) for i in range(31)]}
df0 = pd.DataFrame(data, columns=['readtime', 'house', 'readvalue'])
random.seed(42)
df0 = df0.drop(random.sample(range(df0.shape[0]), k=int(df0.shape[0]/2)))
df0.head()
readtime house readvalue
2 2018-01-17 house1 0.703368
5 2018-01-20 house1 0.933013
7 2018-01-22 house1 0.997261
8 2018-01-23 house1 0.997261
14 2018-01-29 house1 0.603956
import pyspark.sql.functions as func
from pyspark.sql.functions import col
df = spark.createDataFrame(df0)
df = df.withColumn("readtime", col('readtime')/1e9)\
.withColumn("readtime_existent", col("readtime"))
df.show(10, False)
+-----------+------+--------------------+-----------------+
|readtime |house |readvalue |readtime_existent|
+-----------+------+--------------------+-----------------+
|1.5161472E9|house1|0.7033683215379001 |1.5161472E9 |
|1.5164064E9|house1|0.9330127018922193 |1.5164064E9 |
|1.5165792E9|house1|0.9972609476841366 |1.5165792E9 |
|1.5166656E9|house1|0.9972609476841368 |1.5166656E9 |
|1.517184E9 |house1|0.6039558454088798 |1.517184E9 |
|1.5172704E9|house1|0.5000000000000001 |1.5172704E9 |
|1.5174432E9|house1|0.2966316784621001 |1.5174432E9 |
|1.5175296E9|house1|0.2061073738537635 |1.5175296E9 |
|1.5177024E9|house1|0.06698729810778081 |1.5177024E9 |
|1.5177888E9|house1|0.024471741852423234|1.5177888E9 |
+-----------+------+--------------------+-----------------+
only showing top 10 rows
Filling the time gaps takes three steps:
In the first step, we group the data by ‘house’ and generate an array containing an equally spaced time grid for each house.
In the second step, we create one row for each element of the arrays by using the spark SQL function explode().
In the third step, the resulting structure is used as a basis to which the existing read value information is joined using an outer left join.
from pyspark.sql.types import *
# define function to create date range
def date_range(t1, t2, step=60*60*24):
"""Return a list of equally spaced points between t1 and t2 with stepsize step."""
return [t1 + step*x for x in range(int((t2-t1)/step)+1)]
# define udf
date_range_udf = func.udf(date_range, ArrayType(LongType()))
# obtain min and max of time period for each house
df_base = df.groupBy('house')\
.agg(func.min('readtime').cast('integer').alias('readtime_min'),
func.max('readtime').cast('integer').alias('readtime_max'))
# generate timegrid and explode
df_base = df_base.withColumn("readtime", func.explode(date_range_udf("readtime_min", "readtime_max")))\
.drop('readtime_min', 'readtime_max')
# left outer join existing read values
df_all_dates = df_base.join(df, ["house", "readtime"], "leftouter")
tmp = df_all_dates.withColumn('readtime', func.from_unixtime(col('readtime')))
tmp.orderBy('house','readtime').show(20, False)
+------+-------------------+--------------------+-----------------+
|house |readtime |readvalue |readtime_existent|
+------+-------------------+--------------------+-----------------+
|house1|2018-01-17 08:00:00|0.7033683215379001 |1.5161472E9 |
|house1|2018-01-18 08:00:00|null |null |
|house1|2018-01-19 08:00:00|null |null |
|house1|2018-01-20 08:00:00|0.9330127018922193 |1.5164064E9 |
|house1|2018-01-21 08:00:00|null |null |
|house1|2018-01-22 08:00:00|0.9972609476841366 |1.5165792E9 |
|house1|2018-01-23 08:00:00|0.9972609476841368 |1.5166656E9 |
|house1|2018-01-24 08:00:00|null |null |
|house1|2018-01-25 08:00:00|null |null |
|house1|2018-01-26 08:00:00|null |null |
|house1|2018-01-27 08:00:00|null |null |
|house1|2018-01-28 08:00:00|null |null |
|house1|2018-01-29 08:00:00|0.6039558454088798 |1.517184E9 |
|house1|2018-01-30 08:00:00|0.5000000000000001 |1.5172704E9 |
|house1|2018-01-31 08:00:00|null |null |
|house1|2018-02-01 08:00:00|0.2966316784621001 |1.5174432E9 |
|house1|2018-02-02 08:00:00|0.2061073738537635 |1.5175296E9 |
|house1|2018-02-03 08:00:00|null |null |
|house1|2018-02-04 08:00:00|0.06698729810778081 |1.5177024E9 |
|house1|2018-02-05 08:00:00|0.024471741852423234|1.5177888E9 |
+------+-------------------+--------------------+-----------------+
only showing top 20 rows
8. Difference between the current row and the previous row
Reference: https://www.arundhaj.com/blog/calculate-difference-with-previous-row-in-pyspark.html
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
sc = SparkContext(appName="PrevRowDiffApp")
sqlc = SQLContext(sc)
rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])
df = sqlc.createDataFrame(rdd, ["id", "value"])
my_window = Window.partitionBy().orderBy("id")
df = df.withColumn("prev_value", F.lag(df.value).over(my_window))
df = df.withColumn("diff", F.when(F.isnull(df.value - df.prev_value), 0)
.otherwise(df.value - df.prev_value))
df.show()
+---+-----+----------+----+
| id|value|prev_value|diff|
+---+-----+----------+----+
| 1| 65| null| 0|
| 2| 66| 65| 1|
| 3| 65| 66| -1|
| 4| 68| 65| 3|
| 5| 71| 68| 3|
+---+-----+----------+----+
9. How can a UDF take extra parameters, e.g. to compute different quantiles of a list column?
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

def quantile_udf(q=0.5):
    """Return a UDF that computes the q-th quantile of an array column."""
    def quantile_(arr):
        return float(np.quantile(arr, q))
    return F.udf(quantile_, DoubleType())
history_stat = history_stat.withColumn('quantile_025', quantile_udf(0.25)(F.col('date_diff_list')))\
.withColumn('quantile_05', quantile_udf(0.5)(F.col('date_diff_list')))\
.withColumn('quantile_075', quantile_udf(0.75)(F.col('date_diff_list')))
history_stat.show(2)
+---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
|cust_store_dkey|cust_product_dkey| first_day| last_day|sale_times|max_sales_interval|mean_sales_interval| date_diff_list|quantile_025|quantile_05|quantile_075|
+---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
| 560| 211|2017-12-10|2019-03-08| 180| 16| 2.5166666666666666|[0, 11, 4, 11, 1,...| 1.0| 1.0| 3.0|
| 560| 990|2016-12-30|2017-03-17| 20| 26| 3.85|[0, 1, 1, 1, 2, 3...| 1.0| 2.0| 4.0|
+---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
10. Replace values that meet a specific condition
import numpy as np
df = spark.createDataFrame(
[(1, 1, None),
(1, 2, float(5)),
(1, 3, np.nan),
(1, 4, None),
(0, 5, float(10)),
(1, 6, float('nan')),
(0, 6, float('nan'))],
('session', "timestamp1", "id2"))
+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
| 1| 1|null|
| 1| 2| 5.0|
| 1| 3| NaN|
| 1| 4|null|
| 0| 5|10.0|
| 1| 6| NaN|
| 0| 6| NaN|
+-------+----------+----+
from pyspark.sql.functions import when
targetDf = df.withColumn("timestamp1", \
when(df["session"] == 0, 999).otherwise(df["timestamp1"]))
11. How to convert a sparse vector to an array?
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

vector_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(FloatType()))
df = df.withColumn('col1', vector_udf('col2'))  # col2 is the vector column
Note that the tolist() inside the UDF is required, because Spark has no np.array type. Similarly, when a UDF returns a value of a NumPy dtype, it must be converted with float or int.
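For example, a UDF that returns the mean of a vector (a hypothetical helper, shown only to illustrate the cast):
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import numpy as np

# np.mean returns an np.float64, which Spark cannot map to FloatType,
# so it has to be converted with float() before leaving the UDF
mean_udf = udf(lambda vector: float(np.mean(vector.toArray())), FloatType())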
12. How to convert a PySpark sparse vector to a scipy sparse matrix and a PyTorch sparse tensor?
Reference: https://stackoverflow.com/questions/40557577/pyspark-sparse-vectors-to-scipy-sparse-matrix
Converting a Spark sparse vector to a scipy csr matrix:
import numpy as np
from scipy.sparse import csr_matrix, vstack
import torch

def as_matrix(vec):
    data, indices = vec.values, vec.indices
    shape = 1, vec.size
    return csr_matrix((data, indices, np.array([0, vec.values.size])), shape)

# train_pd is a pandas DataFrame (e.g. obtained via toPandas()); cv_cols is the list of
# sparse-vector columns produced by CountVectorizer in Spark, and csr_cols is the list of
# new column names that will hold the converted csr matrices
train_pd[csr_cols] = train_pd[cv_cols].applymap(lambda x: as_matrix(x))
# The code above converts each row's Spark sparse vector into a scipy csr matrix;
# the code below stacks all rows of one column into a single large csr matrix
csr_col1 = vstack(train_pd['csr_col1'])
The code above turns the sparse vectors into a scipy sparse matrix, specifically a scipy csr matrix. Next, we convert the scipy csr matrix into a PyTorch sparse tensor.
def sparse2tensor(tmpdf):
    """
    tmpdf is a scipy csr matrix (or an iterable of csr matrices), e.g. csr_col1 above.
    """
    tmpdf_coo = vstack(tmpdf).tocoo()
    # torch.Size below keeps the sparse tensor's shape identical to the original matrix
    sptensor = torch.sparse.FloatTensor(torch.LongTensor([tmpdf_coo.row.tolist(), tmpdf_coo.col.tolist()]),
                                        torch.FloatTensor(tmpdf_coo.data), torch.Size(tmpdf_coo.shape))
    return sptensor

spt = sparse2tensor(csr_col1)
When the data is very high-dimensional and sparse, using sparse matrices/tensors greatly reduces memory usage, which makes this a very practical technique.
12发魄、稀疏向量求和
def sum_vector(vector):
    return float(vector.values.sum())
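To apply it to a vector column, wrap it in a UDF (the column name features is assumed for illustration):
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

sum_vector_udf = udf(sum_vector, DoubleType())
df = df.withColumn('vector_sum', sum_vector_udf(col('features')))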
14. Convert a bigint to a timestamp
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# ttl is a unix timestamp in milliseconds; cast to bigint to avoid float-precision loss
df = df.withColumn('ttl2',
                   F.from_unixtime(col("ttl").cast('bigint') / 1000, 'yyyyMMdd'))
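A quick sanity check with a literal millisecond value (the resulting date string depends on the session time zone):
spark.createDataFrame([(1516147200000,)], ['ttl']) \
     .withColumn('ttl2', F.from_unixtime(col('ttl').cast('bigint') / 1000, 'yyyyMMdd')) \
     .show()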
15. Implementing string2index for a huge number of categories
For label encoding one normally uses StringIndexer. But sometimes the number of categories is huge, tens of millions or more, for example when encoding the IDs used in matrix factorization for recommendation. StringIndexer then easily runs out of memory, because serializing and broadcasting tens of millions of strings requires a lot of memory on each machine. In that case we need a different way to do the encoding.
One approach borrows window functions: use the column as the window key and assign each window a value:
# The original data:
id | col |
 1 | a   |
 2 | a   |
 3 | b   |
 4 | c   |
 5 | c   |
from pyspark.sql import window as W
import pyspark.sql.functions as f
df.select('id', f.dense_rank().over(W.Window.orderBy('col')).alias('group')).show(truncate=False)
# The result:
+---+-----+
|id |group|
+---+-----+
|1 |1 |
|2 |1 |
|3 |2 |
|4 |3 |
|5 |3 |
+---+-----+
Another approach is to use the window row_number function: first assign a code to each unique value, then join it back:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df2 = data.select("col").distinct() \
          .withColumn("group", row_number().over(Window.orderBy("col")))
result = data.join(df2, on="col", how="inner")
Reference: https://stackoverflow.com/questions/50233518/create-a-group-id-over-a-window-in-spark-dataframe