from pyspark.sql import functions as F
from pyspark.sql.functions import lower, col # 小寫
from pyspark.sql.functions import upper, col? # 大寫
from pyspark.sql.functions import lit # 增加列
from pyspark.sql.functions import when # ifelse
from pyspark.sql.functions import split, explode, concat, concat_ws? # split(列數(shù)據(jù)的分割), explode(一行分成多行) concat,concat_ws(列數(shù)據(jù)合并)
from pyspark.sql.types import StringType # 導(dǎo)入數(shù)據(jù)類型
from pyspark.sql.functions import UserDefinedFunction # 定義函數(shù)
from pyspark.sql.functions import desc #降序排列
from pyspark.sql.functions import trim # 去空格
a.createOrReplaceTempView("a")
a = spark.sql("select * from a").cache()?# 生成pyspark的dataframe
a.show(10)?# 查看數(shù)據(jù)head
df = df.dropDuplicates()? ?/? df.select('A_field').distinct().count()??# 去重
a.count()??# 行數(shù)
a.columns?# 查看列名
a.dtypes?# 查看字段類型
a.printSchema()??# 查看數(shù)據(jù)結(jié)構(gòu)
a.withColumnRenamed("CUST_ID",'ShipToNumber').withColumnRenamed("SKU",'SKUNumber')??# 修改列名
a.select('col').describe().show()??# 選擇某列summariy
b1 = b.drop("col").show()??# 刪除某列
a.filter(a.col== 504943)??# 篩選滿足條件的行數(shù)
a.filter(col.UPDT_DT >= '2020-01-05') \
.filter(col.INSTIT_NM == 'Unknown').show()??# 多條件篩選(and 必須換行)
a1= a.filter(lower(a.current_pagename).like('products:%')?# 篩選以a開頭的記錄
方法一:# 時間戳轉(zhuǎn)換成日期格式
a= a.withColumn('UPDT_DT',F.to_date(a.UPDT_DT))
a= a.withColumn('CRT_DT',F.to_date(a.CRT_DT))
方法二:# 時間戳轉(zhuǎn)換成日期格式
a.select('UPDT_DT').withColumn("UPDT_DT_1",col("UPDT_DT").cast("date"))? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
from pyspark.sql.functions import lower, col?# 字段轉(zhuǎn)換成小寫
WEB_USER = spark.table('WEB_USER').withColumn('CONTACT_ID_1', lower(col('WEB_USER.CONTACT_ID')))??# 小寫去空格
a.withColumn("USER_NM", upper(trim(col("USER_NM")))).show()? # 操作在dataframe上??
? # 去除開頭和結(jié)尾的空格
def single_space(col):
? ? return F.trim(F.regexp_replace(col, " +", " "))
? # 去除中間的空格
def remove_all_whitespace(col):
? ? return F.regexp_replace(col, "\\s+", "")
spark.table('a').withColumn('a1', lower(remove_all_whitespace(single_space(col("USER_NM"))))).show()? # 操作Table上
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
a1= a.withColumn('new_column', to_none(a['login'])
a.sort('CONTACT_ID_1','USER_NM_1',ascending = False).show() #降序排列? 默認(rèn)為升序 (同升同降)
a.sort(WEB_USER_3.CONTACT_ID_1.desc(),WEB_USER_3.USER_NM_1.asc()).show() # 自定義升降
a.groupBy('CONTACT_ID_1').agg(f.count('CONTACT_ID_1').alias('count')).sort(desc('count')).show()?#分組
a.groupBy("login").count().sort(desc("count")).show()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
df.groupBy('level').agg(sf.concat_ws(',', sf.collect_list(df.name))).show()
from pyspark.sql.functions import when?# ifelse
df = df.withColumn("profile", when(df.age >= 40,"Senior") .otherwise("Executive"))?# ifelse
frame3_1 = WEB_USER_3.withColumn("name_length", f.length(WEB_USER_3.USER_NM_1))?# 新生成一列 (查看每個字段的字符長度)
ST_SKU_1.withColumn('Input',F.lit('Viewed')).show()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
from pyspark.sql.functions import lit
new_df = df1.withColumn('newCol', lit(0)).show() # 新列為0
new_df = fy_cx_sessions_2.withColumn('new_column_1', lit(None).cast(StringType()))? #新列為NULL
df = df1.join(df2, ta.name == tb.name, how='inner'/'outer'/'left'/'right')??# 表連接
df.show()
from pyspark.sql.functions import split, explode, concat, concat_ws?# 列數(shù)據(jù)的分割
df_split = df.withColumn("s", split(df['score'], " ")) #切分字段score恋拍,生成為s
df_split.show()? ? ? ? ? ? ? ? ? ? ? ? ? ?
ST_SKU_2.withColumn('STSKU',concat(ST_SKU_2['ShipToNumber'],ST_SKU_2['SKUNumber'])) #列數(shù)據(jù)合并 (沒有分隔符)?
a.withColumn('STSKU',concat_ws("",a['ShipToNumber'],a['SKUNumber']))?#列數(shù)據(jù)合并 (指定分隔符)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
from pyspark.sql.functions import split, explode, concat, concat_ws?# 把數(shù)據(jù)拉豎(melt)
a1= a.withColumn("SKU", explode(split(a['prod_list'], ",")))?# 把數(shù)據(jù)拉豎(R:melt)
from pyspark.sql.functions import pandas_udf,pandasUDFType
@pandas_udf("user string,PL string,Order_Number integer",pandasUDFType.GROUPED_MAP)
def data_partiotion(df):
? V=df.select('Order_Number')
? return spark.createDataFrame()
df.withColumn("datetime", col("datetime").cast("timestamp"))
? ? .groupBy("userId", "memberId")
? ? .agg(max_("datetime"))? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
#注意事項(xiàng)
1 filter (命名)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
test = a.groupBy('USER_NM').agg(F.count('USER_NM').alias('count')).sort(desc('count'))
test.filter(test.count > 1).show()? 會報(bào)錯:'>' not supported between instances of 'method' and 'int'
修改成:test.filter(test['count'] > 1).show()
報(bào)錯原因:'count'為默認(rèn)方法再菊,名字沖突? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?