lag, lead: shifting rows within a window
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, lead
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('my_first_app').getOrCreate()
df = spark.createDataFrame([(1, 2, 3), (4, 2, 6), (7, 2, 9)],
                           ["a", "b", "c"])
df.show()
# lag('a') pulls the previous row's value of a within each window partition;
# the first row of each partition gets null
df.withColumn('aa', lag('a').over(
    Window.partitionBy('b').orderBy('a')
)).show()
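The heading also mentions lead, which looks one row ahead instead of one behind; a minimal companion sketch on the same df:

# lead('a') is the mirror image of lag: it pulls the next row's value,
# so the last row of each partition gets null
df.withColumn('bb', lead('a').over(
    Window.partitionBy('b').orderBy('a')
)).show()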
Scala: loading an xxx.scala file
// the leading colon cannot be omitted; :load is a spark-shell (Scala REPL) command
:load xxx.scala
join
// when both sides share a column name, qualify it with the owning DataFrame
// (note: column equality in the Scala API is ===, not ==)
df1.join(df2, df1("a") === df2("a")).select(df1("f")).show(2)
// set table aliases (the $"..." column syntax requires import spark.implicits._)
df1.alias("df1").join(df2.alias("df2"), "a").select($"a", $"df1.f" + $"df2.f").show(5)
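Both join snippets above are Scala. A rough PySpark equivalent of the alias version, assuming df1 and df2 each have columns a and f, might look like:

df1.alias("df1").join(df2.alias("df2"), "a") \
    .select(F.col("a"), (F.col("df1.f") + F.col("df2.f")).alias("f_sum")) \
    .show(5)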
Computing the fraction of missing values per column
# F.count(c) counts only non-null values while F.count('*') counts all rows,
# so 1 - count(c)/count(*) is the missing ratio of column c
test.agg(*[(1 - F.count(c) / F.count('*')).alias(c + '_missing') for c in test.columns]).show()
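A minimal self-contained check of the idea (the demo DataFrame and its values are made up for illustration):

# one null out of four rows in a -> a_missing = 0.25; two in b -> b_missing = 0.5
demo = spark.createDataFrame([(1, None), (2, 'x'), (None, 'y'), (4, None)], ["a", "b"])
demo.agg(*[(1 - F.count(c) / F.count('*')).alias(c + '_missing') for c in demo.columns]).show()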
User-defined functions: F.udf
from pyspark.sql.types import IntegerType  # needed for the return type below

def get_week(time):
    return time.weekday()

toWeekUDF = F.udf(get_week, IntegerType())  # arg 1: the Python function, arg 2: its return type
test.select(toWeekUDF(test.operTime)).show(5)
# the built-in F.dayofweek avoids UDF overhead, but numbers days 1=Sunday..7,
# whereas Python's weekday() uses 0=Monday..6
test.select(F.dayofweek('operTime')).show(5)
# passing multiple arguments: every extra argument must be a Column,
# so wrap literals with F.lit; feature_dict_sc is a broadcast dict (sketch below)
def map_dict(x, col):
    return feature_dict_sc.value[col][x]

mapDictUDF = F.udf(map_dict, IntegerType())
test.select(mapDictUDF(test.siteId, F.lit('siteId')).alias('ss')).show(5)

# alternative: a closure factory that bakes the column name into the UDF
# (no return type given, so it defaults to StringType)
def mapDict(col):
    return F.udf(lambda x: feature_dict_sc.value[col][x])

test.withColumn('ss', mapDict('siteId')(test.siteId)).show(5)
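feature_dict_sc is never defined in these notes; presumably it is a broadcast variable holding a per-column lookup dict. A minimal sketch of how it could be created (the mapping contents here are hypothetical):

# hypothetical lookup table: column name -> {raw value -> encoded id}
feature_dict = {'siteId': {'s001': 0, 's002': 1}}
feature_dict_sc = spark.sparkContext.broadcast(feature_dict)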
The F function library
agg takes whole-column (aggregate) functions, e.g. min, max;
select takes per-row functions, e.g. hour, dayofweek (see the sketch below).
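A quick illustration of the split, reusing the test DataFrame and its operTime column from above:

# aggregate functions collapse a whole column into a single row
test.agg(F.min('operTime'), F.max('operTime')).show()
# per-row functions produce one output value per input row
test.select(F.hour('operTime'), F.dayofweek('operTime')).show(5)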