Spark 學習筆記

dataframe

create

創(chuàng)建dataframe

?? val training = ss.createDataFrame(Seq(

? ?? (1.0, Vectors.dense(0.0, 1.1, 0.1)),

? ?? (0.0, Vectors.dense(2.0, 1.0, -1.0)),

? ?? (0.0, Vectors.dense(2.0, 1.3, 1.0)),

? ?? (1.0, Vectors.dense(0.0, 1.2, -0.5))

?? )).toDF("label", "features")

read

讀csv,json格式存儲的文件

sc = spark.sparkContext

df = spark.read.json("examples/src/main/resources/people.json")

df = spark.read.option("delimiter", "\t").option("header", "true").csv(path) [讀取以\t分隔的文件到dateframe]

df.write.option("header",true)

? .csv("/tmp/spark_output/datacsv")

people = spark.read.parquet("...")

column get

列相關操作

features get: ageCol = people.age

添加現有列:

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))

df_with_x7 = df_with_x6.withColumn("x7", Rand())

func.col("id")

row: row = Row(name="Alice", age=11)

'wrong_key' in Person

define function [自定義函數]

create function:

from pyspark.sql.functions import udf

@udf

def to_upper(s):

?? if s is not None:

? ? ?? return s.upper()??

df_new2 = df_new.withColumn("topnum", get_front3_value("Country"))

傳入固定值:

使用lit

from pyspark.sql.functions import lit

show the content of Data Frame [展示Data Frame的內容]

df.show()

自定義函數并取列

df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age"))

去重

dataFrame.dropDuplicates("id","label")

dataFrame.distinct()

vector

向量操作

dense vector: a = Vectors.dense([1.0, 2.0])

sparse vector: SparseVector(2, [0, 1], [2., 1.])

vector: toArray()

mLlib

import

from pyspark.ml.clustering import Kmeans

initial parameter 初始化模型超參數

kmeans = KMeans(k=100, maxIter=200)

train model 模型訓練

model0 = kmeans.fit(input0.limit(200000))

kmeans.clusterCenters()

data split

訓練數據和測試數據切分

(trainingData, testData) = data.randomSplit([0.7, 0.3])

Feature scaled

特征級別化(可分桶先嬉,標準化)

max min scaler:

featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data) [max min scale]

scalerModel = scaler.fit(dealfeature)

bucket scaler:

from pyspark.ml.feature import Bucketizer

splits = [float("-inf"), 0,100, 200, 300, 400, 500, 600, 700,

? ? ? ? ?? 800, 900, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, float("inf")]

dealinput = dealinput.withColumn("ctr", dealinput["ctr"].cast("float")).withColumn("ctr", dealinput["ctrv3"].cast("float"))

bucketizer = Bucketizer(splits=splits, inputCol="ctr", outputCol="bucketedctr")

dealout = bucketizer.transform(dealinput)

Data Type view and Data Type transformer

數據類型查看: df.dtypes

數據類型轉換: dealoutput = dealinput.select(dealinput["Clicks"].cast("float")

train test split

(deal_train, deal_test) = dealfeature.randomSplit([0.9,0.1])

data filter

數據過濾[條件篩選]

outanaly = predres.filter(predres.prediction ==1).select("InClickBinary", "prediction")

metrics

指標評估

from pyspark.mllib.evaluation import MulticlassMetrics

predictionAndLabels = predres.rdd.map(lambda x:(float(x.prediction), float(x.InClickBinary)))

metrics = MulticlassMetrics(predictionAndLabels)

print("Area under accauracy = %s" % metrics.precision(1.0))

print("Area under recall = %s" % metrics.recall(1.0))

column assemble to vector

每列特征集成到一個dense vector

#get the vector feature

assembler = VectorAssembler(

? ?? inputCols = ["Clicks", "BinaryFeature","isMarket", "ctrv2", "ctrv3", "Feature1", "Feature2", "Feature3", "Feature4", "Feature5", "Feature6", "Feature7", "Feature8"],

? ?? outputCol = "features")

dealfeature = assembler.transform(dealoutput)

(deal_train, deal_test) = dealfeature.randomSplit([0.9,0.1])

column to sparse vector

N列特征到sparse vector

featureids = [i for i in range(len(featurekeys))]

featuredict = dict(zip(featurekeys, featureids))

speckeys = ["ctrv2", "ctrv3", "isMarket", "BinaryFeature"]

from pyspark.sql.functions import struct

def get_sparse_vector(x):

?? valuedict = {}

?? for index, item in enumerate(speckeys):

? ? ?? valuedict[featuredict[item]] = x[index]

?? for index in range(8):

? ? ?? nindex = 4 + index

? ? ?? valuedict[featuredict["prediction%s_%s"%(index, x[nindex])]] = 1.0

?? return Vectors.sparse(len(featuredict), valuedict)

sparsecreate = udf(get_sparse_vector, VectorUDT())

dealfeature = dealout.withColumn("features", sparsecreate(struct("A", "B", "C", "D", "E", "F", "G", "H","I", "J", "K", "L", "M")))

model train

模型訓練

以LogisticRegression為例:

lr = LogisticRegression(featuresCol="features", labelCol="BinaryFeature", maxIter = 200)

lrmodel = lr.fit(deal_train)

predreslr = lrmodel.transform(deal_test)

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末轧苫,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子,更是在濱河造成了極大的恐慌含懊,老刑警劉巖身冬,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異岔乔,居然都是意外死亡酥筝,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門雏门,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嘿歌,“玉大人,你說我怎么就攤上這事茁影≈娴郏” “怎么了?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵募闲,是天一觀的道長步脓。 經常有香客問我,道長蝇更,這世上最難降的妖魔是什么沪编? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮年扩,結果婚禮上蚁廓,老公的妹妹穿的比我還像新娘。我一直安慰自己厨幻,他們只是感情好相嵌,可當我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著况脆,像睡著了一般饭宾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上格了,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天看铆,我揣著相機與錄音,去河邊找鬼盛末。 笑死弹惦,一個胖子當著我的面吹牛,可吹牛的內容都是我干的悄但。 我是一名探鬼主播棠隐,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼檐嚣!你這毒婦竟也來了助泽?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎嗡贺,沒想到半個月后隐解,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡诫睬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年厢漩,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岩臣。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖宵膨,靈堂內的尸體忽然破棺而出架谎,到底是詐尸還是另有隱情,我是刑警寧澤辟躏,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布谷扣,位于F島的核電站,受9級特大地震影響捎琐,放射性物質發(fā)生泄漏会涎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一瑞凑、第九天 我趴在偏房一處隱蔽的房頂上張望末秃。 院中可真熱鬧,春花似錦籽御、人聲如沸练慕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽铃将。三九已至,卻和暖如春哑梳,著一層夾襖步出監(jiān)牢的瞬間劲阎,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工鸠真, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留悯仙,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓弧哎,卻偏偏與公主長得像雁比,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子撤嫩,可洞房花燭夜當晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內容