使用spark的dataframe進(jìn)行計(jì)算時(shí)有時(shí)需要添加新的列。本文介紹兩種添加新列的方法曼月,比較常見的一種方法是調(diào)用dataframe的withColumn方法,但是該方法存在一定的限制,即新添加的列只能根據(jù)現(xiàn)有列轉(zhuǎn)換得到枫匾;另一種方法是利用UDF(user defined function)模塊。下面結(jié)合例子進(jìn)行說明拟淮,現(xiàn)有預(yù)測得到的pm2.5數(shù)據(jù)干茉,需要添加其他污染項(xiàng)目的預(yù)測數(shù)據(jù)及預(yù)測時(shí)間。
1很泊、withColumn
dataframe的withColumn方法可以用于添加新的列角虫,但是新的列僅能根據(jù)現(xiàn)有列計(jì)算得到。
yHat = yHat.withColumn("pm25", yHat["pm25"]*(maxValue - minValue) + minValue)
yHat = yHat.withColumn("pm10", yHat["pm25"] + 10)
yHat = yHat.withColumn("CO", yHat["pm25"] + 20)
yHat = yHat.withColumn("NO2", yHat["pm25"] + 30)
yHat = yHat.withColumn("NO", yHat["pm25"] + 40)
yHat = yHat.withColumn("SO2", yHat["pm25"] + 50)
2委造、udf
除了withColumn方法戳鹅,還可以利用spark的udf模塊添加新的列。在本例中昏兆,還需要添加相應(yīng)的時(shí)間列枫虏,此時(shí)withColumn方法并不適用,需要導(dǎo)入udf方法爬虱,該方法有兩個(gè)參數(shù)隶债,分別為自定義的函數(shù)名及返回值類型。
global idx
idx = 0
date = gettime()
def set_date(x):
global idx # 將idx設(shè)置為全局變量
if x is not None:
idx += 1
return date[idx - 1]
index = udf(set_date, StringType())
yHat = yHat.withColumn("date", index(yHat["pm25"]))