from pyspark import SparkConf, SparkContext,SQLContext
from pyspark.sql import Row
conf = SparkConf()
sc = SparkContext(conf=conf)
#設置錯誤級別
sc.setLogLevel("error")
sqlContext = SQLContext(sc)
def run(outfile):
origin_data = sc.textFile("filepath").map(lambda x: x.split("\t"))
first = origin_data.first()
# 過濾第一行
whole= origin_data.filter(lambda x: x != first)
course_order = whole.map(lambda x: (int(x[0]), int(x[1]), int(x[2]), int(x[3]))). \
filter(lambda x: int(x[3]) == 3). \
filter(lambda x: x[2] <= 100). \
filter(lambda x: x[0] != 0). \
map(lambda x: (int(x[0]), int(x[1]))). \
map(lambda x: Row(user_id=int(x[0]), num=int(x[1])))
out = sqlContext.createDataFrame(course_order).\
#分組
groupBy("user_id"). \
#聚合
agg({"num": "sum"}). \
#列重命名
withColumnRenamed("sum(num)", "num")
#csv file
out.repartition(1).write.format("csv").option("header", "false").mode("append").save(outfile)
#sql file
out.rdd.map(lambda x:sq % (x['user_id'] %10, x['num'], x['user_id'])).repartition(1).saveAsTextFile('sql.csv')
if __name__ == '__main__':
run("out")
spark常用操作
?著作權歸作者所有,轉載或內容合作請聯系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來频敛,“玉大人项郊,你說我怎么就攤上這事≌遄” “怎么了着降?”我有些...
- 文/花漫 我一把揭開白布熊尉。 她就那樣靜靜地躺著罐柳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪狰住。 梳的紋絲不亂的頭發(fā)上张吉,一...
- 文/蒼蘭香墨 我猛地睜開眼稿辙,長吁一口氣:“原來是場噩夢啊……” “哼昆码!你這毒婦竟也來了?” 一聲冷哼從身側響起邻储,我...
- 正文 年R本政府宣布,位于F島的核電站燎斩,受9級特大地震影響虱歪,放射性物質發(fā)生泄漏。R本人自食惡果不足惜栅表,卻給世界環(huán)境...
- 文/蒙蒙 一笋鄙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧怪瓶,春花似錦萧落、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽陨倡。三九已至,卻和暖如春许布,著一層夾襖步出監(jiān)牢的瞬間兴革,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內容
- 聚合函數combineByKey 將RDD[k,v]轉化為RDD[k,c],利用該函數可以實現reduceByKe...
- RDD 操作二 常用的 Transformations 與 Actions 方法 原文地址: http://spa...
- DataSet 的函數 詳細API常見此鏈接 Action 操作 1述召、collect() ,返回值是一個數組朱转,返回...
- Transformations map,filterspark最長用的兩個Transformations:map积暖,...