This article was first published as "Implementing the Pandas melt function in Spark" | 磚瓦匠杜重
Please credit the source when reposting.
I have recently been cleaning data with Spark, and one of the steps required converting wide data into long data. I found that Spark had no native method for this, but later discovered that the explode method can be used to achieve the conversion indirectly. This article walks through the whole idea.
The explode method takes a collection column (ArrayType or MapType) in a DataFrame row and extracts each of its elements into a separate row of a new DataFrame. In other words, it turns one collection into many individual items, and one row into many rows.
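To make the one-row-to-many-rows behaviour concrete, here is a minimal plain-Python sketch that models what explode does to rows. It is not Spark code: the helper name `explode_rows` and the sample columns are invented for illustration. In PySpark the corresponding call would be along the lines of `df.select('name', F.explode('scores').alias('score'))`.

```python
def explode_rows(rows, array_col, out_col):
    """Model of explode: emit one output row per element of row[array_col]."""
    result = []
    for row in rows:
        # Columns other than the array are copied unchanged into every output row.
        rest = {k: v for k, v in row.items() if k != array_col}
        # A missing (None) or empty array yields no output rows, as with Spark's explode.
        for item in row[array_col] or []:
            result.append({**rest, out_col: item})
    return result


rows = [
    {"name": "Ann", "scores": [90, 85]},
    {"name": "Bob", "scores": [70]},
    {"name": "Cid", "scores": []},
]
exploded = explode_rows(rows, "scores", "score")
for row in exploded:
    print(row)
```

Note that the row with an empty array disappears from the result. In Spark, `explode_outer` is the variant that keeps such rows with a null value instead.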
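Back to implementing melt in Spark. We can first combine the columns to be melted into an ArrayType collection in which every element is a StructType of the form (variable_name, variable_value). We then use the explode method to expand each row into several rows, and finally split (variable_name, variable_value) into two separate columns.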
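The three steps above can be traced on ordinary Python data before looking at the Spark version. The following is only a sketch of the logic under simplifying assumptions: rows are plain dicts, and the name `melt_rows` and the sample data are hypothetical.

```python
def melt_rows(rows, id_vars, value_vars, var_name="variable", value_name="value"):
    """Model of the pack / explode / split steps on a list of dict rows."""
    melted = []
    for row in rows:
        # Step 1: pack the columns to melt into an array of (name, value) pairs.
        pairs = [(col, row[col]) for col in value_vars]
        # Step 2: "explode" the array, producing one output row per pair.
        for name, value in pairs:
            out = {col: row[col] for col in id_vars}
            # Step 3: split each pair into two separate columns.
            out[var_name] = name
            out[value_name] = value
            melted.append(out)
    return melted


people = [
    {"name": "Ann", "height": 165, "weight": 55},
    {"name": "Bob", "height": 180, "weight": 75},
]
long_rows = melt_rows(people, id_vars=["name"], value_vars=["height", "weight"])
for row in long_rows:
    print(row)
```

Each input row produces one output row per melted column, so two people with two measurements give four long-format rows.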
I have collected the final code below. The function uses parameter names similar to those in Pandas:
from pyspark.sql import functions as F


def sparkMelt(frame, id_vars=None, value_vars=None, var_name=None, value_name=None):
    """
    Pandas melt function implemented in Spark.

    Args:
        frame (Spark DataFrame): Spark dataframe to work on.
        id_vars (list, optional): Column(s) to use as identifier variables. Defaults to None.
        value_vars (list, optional): Column(s) to unpivot. If not specified, uses all columns that are not set as id_vars. Defaults to None.
        var_name (str, optional): Name to use for the 'variable' column. Defaults to None. If None, use 'variable'.
        value_name (str, optional): Name to use for the 'value' column. Defaults to None. If None, use 'value'.

    Returns:
        [Spark DataFrame]: Unpivoted Spark DataFrame.
    """
    # With no id_vars there are no identifier columns, as in Pandas
    id_vars = id_vars if id_vars else []
    value_vars = [col_name for col_name in frame.columns if col_name not in id_vars] \
        if not value_vars else value_vars
    # if value_vars is empty, no columns need to be melted
    if not value_vars:
        return frame
    var_name = 'variable' if not var_name else var_name
    value_name = 'value' if not value_name else value_name
    # Wrap each column to melt in a (var_name, var_value) struct
    for col_name in value_vars:
        frame = frame.withColumn(col_name,
                                 F.struct(F.lit(col_name).alias('var_name'), F.col(col_name).alias('var_value')))
    # Collect the structs into an array, explode it into rows,
    # then split each struct into the two output columns
    frame = frame.withColumn('_zip', F.array(*value_vars)) \
        .withColumn('_key_value', F.explode('_zip')) \
        .withColumn(var_name, F.col('_key_value')['var_name']) \
        .withColumn(value_name, F.col('_key_value')['var_value'])
    # Drop the melted source columns and the temporary helper columns
    df_col = [col_name for col_name in frame.columns if col_name not in (
        *value_vars, '_zip', '_key_value')]
    frame = frame.select(*df_col)
    return frame