pyspark與機器學習

借助于spark的分布式特性耕捞,機器學習與spark的結(jié)合可以解決數(shù)據(jù)規(guī)模大、復雜運算時間久的問題烫幕。
spark提供MLlib組件用于滿足機器學習的需求俺抽。
本文將從機器學習數(shù)據(jù)讀取、數(shù)據(jù)操作较曼、特征處理磷斧、模型訓練、結(jié)果評估、模型保存六個方面展開弛饭。

一冕末、基礎(chǔ)操作

1、sparksession

(1) 創(chuàng)建SparkSession

SparkSession是Spark 2.0引如的新概念侣颂。SparkSession為用戶提供了統(tǒng)一的切入點档桃,來讓用戶學習spark的各項功能[1]
任何Spark程序的第一步都是先創(chuàng)建SparkSession憔晒。

From pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()
(2) SparkSession與SparkContext[1]

在spark的早期版本中藻肄,SparkContext是spark的主要切入點,由于RDD是主要的API拒担,我們通過sparkcontext來創(chuàng)建和操作RDD嘹屯。對于每個其他的API,我們需要使用不同的context(例如對于Streming从撼,我們需要使用StreamingContext抚垄;對于sql,使用sqlContext谋逻;對于Hive呆馁,使用hiveContext)。
但是隨著DataSet和DataFrame的API逐漸成為標準的API毁兆,就需要為他們建立接入點浙滤。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點气堕,SparkSession封裝了SparkConf纺腊、SparkContext和SQLContext

至于圖中的RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)是Spark中最基本的數(shù)據(jù)抽象茎芭,它代表一個不可變揖膜、可分區(qū)、里面的元素可并行計算的集合梅桩。

這些概念在編碼過程中偶爾會出現(xiàn)壹粟,對于spark分布式運行的詳細架構(gòu)本文就不深入了,只需對SparkSession宿百、SparkContext有個大致的了解趁仙。

2、數(shù)據(jù)加載

(1) 本地數(shù)據(jù)讀取

使用spark方式讀取本地csv垦页。

df = spark.read.csv('XXX.csv',inferSchema=True,header=True)

使用pandas方式讀取本地csv雀费,轉(zhuǎn)換pandas dataframe為spark dataframe。

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
pandas_df = pd.read_csv('/home/logsaas/pyspark/lalafile/movie_ratings_df.csv')
sc = SparkContext()
sqlContest = SQLContext(sc)
df = sqlContest.createDataFrame(pandas_df)
(2) 數(shù)據(jù)庫讀取
# hive數(shù)據(jù)庫讀取
spark.sql('select * from XX')
(3) Spark_DataFrame與Pandas_DataFrame區(qū)別[2]

Spark中DataFrame與Pandas中DataFrame的主要區(qū)別是工作方式的不同痊焊。

區(qū)別 pandas spark
工作方式 單機模式盏袄,沒有并行機制忿峻,不支持Hadoop,處理大量數(shù)據(jù)有瓶頸 分布式并行計算框架辕羽,所有的數(shù)據(jù)和操作自動并行分布在各個集群結(jié)點上炭菌,支持Hadoop,能處理大量數(shù)據(jù)
DataFrame可變性 可變 Spark中RDDs是不可變的逛漫,因此DataFrame也是不可變的
相互轉(zhuǎn)換 從spark_df轉(zhuǎn)換:pandas_df = spark_df.toPandas() 從pandas_df轉(zhuǎn)換:spark_df = SQLContext.createDataFrame(pandas_df)

3、數(shù)據(jù)操作

由于spark dataframe和python dataframe的區(qū)別赘艳,隨之而來的操作差別也比較大酌毡。

操作 pandas spark
取列信息 df[“name”] df.select(“name”)
取滿足條件的信息 df[df[‘a(chǎn)ge’]>21] df.filter(df[‘a(chǎn)ge’]>21)
groupby df.groupby(“A”).avg(“B”) from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()
data合并 append/concat等 df.join()
去重統(tǒng)計 len(set(df['title'])) df.select('title').distinct().count()
新增一列 df['newcol']=lists df.withColumn('newcol',lists)
其他 ··· ···

常見的spark dataframe操作如下:

# **基礎(chǔ)描述**
spark_df.count()  # 行數(shù)
spark_df.columns  # 列名稱
spark_df.printSchema()  # 結(jié)構(gòu)及屬性顯示
spark_df.show(5,False)  # truncate=False表示不壓縮顯示
spark_df.describe().show()  # 均值/最值等描述

# **dataframe操作**
# 取'age','mobile'兩列
spark_df.select('age','mobile').show(5) 
# 新增一列:age_after_10_yrs
spark_df.withColumn("age_after_10_yrs",(spark_df["age"]+10)).show(10,False)
# 新建一列age_double,將age轉(zhuǎn)換為double屬性
spark_df.withColumn('age_double',spark_df['age'].cast(DoubleType())).show(10,False)
# 篩選mobile==Vivo的信息
spark_df.filter(spark_df['mobile']=='Vivo').show()
spark_df.filter(spark_df['mobile']=='Vivo').select('age','ratings','mobile').show()
spark_df.filter(spark_df['mobile']=='Vivo').filter(spark_df['experience'] >10).show()
# 去重統(tǒng)計
spark_df.select('mobile').distinct().show() 
# 行去重
spark_df=spark_df.dropDuplicates()
# 刪除列
df_new=spark_df.drop('mobile')
# groupby操作
spark_df.groupBy('mobile').count().show(5,False)
spark_df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
spark_df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)  # 根據(jù)mobile分區(qū)蕾管,計算experience的sum
# udf 自建sql函數(shù)
from pyspark.sql.functions import udf
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return 'High Price'
    elif brand =='MI':
        return 'Mid Price'
    else:
        return 'Low Price'
brand_udf=udf(price_range,StringType())  # create udf using python function # 輸出為string格式
# 新建一列price_range
spark_df.withColumn('price_range',brand_udf(spark_df['mobile'])).show(10,False)
# 使用lamba創(chuàng)建udf
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())  # using lambda function
# 新建一列age_group
spark_df.withColumn("age_group", age_udf(spark_df.age)).show(10,False)

在spark中很少會用for循環(huán)去處理一個個特征枷踏,一般使用函數(shù)/自建UDF,批量處理掉了掰曾。

比如計算Review列每個數(shù)據(jù)的長度旭蠕。
python模式
review_length = []
for info in text_df['Review']:
···· review_length.apend(length(info))
text_df['length'] = review_length
pyspark模式
from pyspark.sql.functions import length text_df=text_df.withColumn('length',length(text_df['Review']))

3、特征處理

以下為pyspark.ml.feature提供的特征處理功能旷坦,滿足了大部分機器學習的特征處理需求掏熬。

標準化與歸一化

函數(shù) 備注
NGram 正則標準化,不需要fit 直接transform
Normalizer 歸一化函數(shù)秒梅,使它的范數(shù)或者數(shù)值范圍在一定的范圍內(nèi)
MaxAbsScaler 歸一化函數(shù)旗芬,將列標準化到[0,1]之間,每一個值都除以本列的絕對值最大的數(shù)捆蜀,先fit然后 transform()
MinMaxScaler 最大最小歸一化疮丛,先fit然后 transform()
StandardScaler 對列進行標準化,先fit再transform

分箱處理

函數(shù) 備注
Binarizer 將數(shù)值型特征的二值化辆它。將數(shù)據(jù)框中的某一列按照閾值劃分為只包含0誊薄,1的列
Bucketizer 將連續(xù)特征按照splits值進行分箱
QuantileDiscretizer 將連續(xù)列進行分箱操作,numBuckets 表示分箱數(shù)目

文本特征處理

函數(shù) 備注
CountVectorizer 只考慮詞匯在文本中出現(xiàn)的頻率
HashingTF 自然語言處理的場景中锰茉,hashingTF使用的比較多
IDF TF-IDF 作為特征提取
StringIndexer 針對單個類別型特征進行轉(zhuǎn)換呢蔫。把字符串的列按照出現(xiàn)頻率進行排序,出現(xiàn)次數(shù)最高的對應的Index為0飒筑,依次下排
IndexToString 索引轉(zhuǎn)字符串
Tokenizer 將字符串列轉(zhuǎn)換成小寫并按空格切分
RegexTokenizer 基于正則的方式進行文檔切分成單詞組
Word2Vec 將words轉(zhuǎn)換成一個vectorSize維的向量
OneHotEncoderEstimator 獨熱編碼相關(guān)
RFormula 文本類特征處理咐刨,先 StringIndexer 再 OneHotEncoderEstimator

特征操作

函數(shù) 備注
Imputer 缺失值填補,默認使用均值或中值(“median”)填補,要計算均值所以要先 fit(),然后再transfrom()
StopWordsRemover 英文停用詞移除
SQLTransformer 使用SQL語句創(chuàng)建新的列扬霜,直接transform()
BucketedRandomProjectionLSH 基于歐幾里德距離的空間度量
MinHashLSH 基于Jaccard距離的空間度量
ElementwiseProduct 計算inputCol與scaling內(nèi)積定鸟,不需要訓練,直接transform
PolynomialExpansion 特征變換著瓶,將特征拓展比如[x,y],如果degree=2,則拓展成[x,xy,y,xx,y*y]联予,所以直接transform即可
OneHotEncoder 獨熱編碼,特征用一個二進制數(shù)字來表示。例如[0, 0, 3]處理后為[ 1., 0., 0., 1., 0., 0., 0., 0., 1.]
ChiSqSelector 依據(jù)卡方檢驗沸久,計算類別特征與分類標簽的關(guān)聯(lián)性季眷。該函數(shù)只有先訓練才能知道挑選哪些特征值,所以要先fit,應用的時候再transform

vector 組合

函數(shù) 備注
VectorIndexer 數(shù)據(jù)集中的類別特征轉(zhuǎn)換卷胯∽庸危可以自動識別哪些特征是類別型的,并且將原始值轉(zhuǎn)換為類別索引
VectorSizeHint 允許用戶顯式指定列的向量大小
VectorAssembler 用于將多個列合并為一個向量列窑睁,直接transform即可挺峡,經(jīng)常用的
VectorSlicer 通過對這些索引的值進行篩選得到新的向量集

壓縮降維

函數(shù) 備注
PCA 對特征進行PCA降維,先fit然后 transform()
DCT 離散余弦變換(Discrete Cosine Transform)担钮,用于將數(shù)據(jù)或圖像的壓縮
FeatureHasher 特征哈希橱赠,相當于一種降維技巧
# 正則標準化,不需要fit 直接transform
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df)

# 歸一化函數(shù)箫津,使它的范數(shù)或者數(shù)值范圍在一定的范圍內(nèi)
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).head().features

# 歸一化函數(shù)狭姨,將列標準化到[0,1]之間
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()

# 最大最小歸一化,先fit然后 transform()
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
model.transform(df)

# 對列進行標準化苏遥,先fit再transform
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
model.transform(df)

# 按閾值1劃分values饼拍,結(jié)果輸出到features,結(jié)果為0/1
binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")
new_df = binarizer.transform(df)

# 將連續(xù)特征按照splits值進行分箱
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],inputCol="values", outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)

# 將連續(xù)列進行分箱操作田炭,numBuckets 表示分箱數(shù)目
qds = QuantileDiscretizer(numBuckets=2,inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df)

# 只考慮詞匯在文本中出現(xiàn)的頻率
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df)  # model.vocabulary
new_df = model.transform(df)

# 詞頻統(tǒng)計
hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
hashingTF.transform(df)

# 文本tf-idf計算
idf = IDF(minDocFreq=3, inputCol="tf", outputCol="idf")
model = idf.fit(df)  # model.idf
model.transform(df)

# 針對單個類別型特征進行轉(zhuǎn)換惕耕,把字符串的列按照出現(xiàn)頻率進行排序
stringIndexer = StringIndexer(inputCol="label",outputCol="indexed", handleInvalid="error",stringOrderType="frequencyDesc")
model = stringIndexer.fit(stringIndDf)
td = model.transform(stringIndDf)

# 將字符串列轉(zhuǎn)換成小寫并按空格切分
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df)

# 基于正則的方式進行文檔切分成單詞組
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df)

# Word2Vec
word2Vec = Word2Vec(vectorSize=5, seed=42,inputCol="sentence", outputCol="model")
model = word2Vec.fit(doc)
model.transform(doc)

# OneHotEncoderEstimator
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df)

# RFormula 文本類特征處理
rf = RFormula(formula="y ~ x + s")
model = rf.fit(df)
model.transform(df)

# 缺失值填補,要計算均值所以要先 fit(),然后再transfrom()
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()

# 移除停頓詞
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df)

# 使用SQL語句創(chuàng)建新的列
sqlTrans = SQLTransformer(statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df)

# 歐幾里德距離的空間度量
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
model = brp.fit(df)
new_df = model.transform(df)

# 基于Jaccard距離的空間度量
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model = mh.fit(df)
model.transform(df)

# 計算inputCol與scaling內(nèi)積诫肠,不需要訓練司澎,直接transform
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]), inputCol="values", outputCol="eprod")
new_df = ep.transform(df)

# PolynomialExpansion特征變換
px = PolynomialExpansion(degree=2, inputCol="dense", outputCol="expanded")
px.transform(df)

# 獨熱編碼
encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
encoder.transform(td)

# 依據(jù)卡方檢驗特征處理
selector = ChiSqSelector(numTopFeatures=1,outputCol="selectedFeatures")
model = selector.fit(df)
new_df = model.transform(df)

# 數(shù)據(jù)集中的類別特征轉(zhuǎn)換
indexer = VectorIndexer(maxCategories=2, inputCol="a", outputCol="indexed")
model = indexer.fit(df)
model.transform(df)

# 允許用戶顯式指定列的向量大小
sizeHint = VectorSizeHint(inputCol="vector", size=3, handleInvalid="skip")  # 先指定大小
vecAssembler = VectorAssembler(inputCols=["vector", "float"], outputCol="assembled")  # 整合成一列
pipeline = Pipeline(stages=[sizeHint, vecAssembler])
pipelineModel = pipeline.fit(df)  # 功能模塊合并
pipelineModel.transform(df)

# 用于將多個列合并為一個向量列,直接transform即可栋豫,經(jīng)常用的
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df)

# 通過對這些索引的值進行篩選得到新的向量集
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).head().sliced

# PCA
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df)

# 離散余弦變換壓縮數(shù)據(jù)
dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec")
df2 = dct.transform(df1)

# 特征哈希挤安,相當于一種降維技巧
hasher = FeatureHasher(inputCols=cols, outputCol="features")
hasher.transform(df).head().features

4、模型訓練

分類模塊

model 備注
LinearSVC 線性分類支持向量機
LogisticRegression 邏輯回歸
DecisionTreeClassifier 決策樹分類
GBTClassifier GBDT梯度提升決策樹
RandomForestClassifier 隨機森林
NaiveBayes 樸素貝葉斯
MultilayerPerceptronClassifier 多層感知機分類器
OneVsRest 將多分類問題簡化為二分類問題
# 線性分類支持向量機 
svm = LinearSVC(maxIter=5, regParam=0.01)  # maxIter最大迭代次數(shù)5次丧鸯,regParam正則化參數(shù)
model = svm.fit(df)
# 邏輯回歸
blor = LogisticRegression(regParam=0.01, weightCol="weight") 
blorModel = blor.fit(bdf)
# 決策樹分類
dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")  # 限定決策樹的最大可能深度為2
model = dt.fit(td)
# GBDT梯度提升決策樹
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
# 隨機森林
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
# 樸素貝葉斯
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
model = nb.fit(df)
# 多層感知機
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[2, 2, 2], blockSize=1, seed=123)
model = mlp.fit(df)
# OneVsRest
lr = LogisticRegression(regParam=0.01)
ovr = OneVsRest(classifier=lr)
model = ovr.fit(df)

聚類模塊

model 備注
BisectingKMeans 二分類KMeans
KMeans k均值聚類算法
GaussianMixture 高斯混合模型
LDA LDA主題聚類
PowerIterationClustering 冪迭代聚類
# 二分類KMeans
bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
model = bkm.fit(df)
transformed = model.transform(df)
# k均值聚類算法
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(df)
transformed = model.transform(df)
# 高斯混合模型
gm = GaussianMixture(k=3, tol=0.0001,maxIter=10, seed=10)
model = gm.fit(df)
transformed = model.transform(df)
# LDA主題聚類
lda = LDA(k=2, seed=1, optimizer="em")
model = lda.fit(df)
# 冪迭代聚類
pic = PowerIterationClustering(k=2, maxIter=40,weightCol="weight")
assignments = pic.assignClusters(df)

回歸模塊

model 備注
AFTSurvivalRegression 生存分析的對數(shù)線性模型
DecisionTreeRegressor 決策樹回歸模型
GBTRegressor 全稱梯度下降樹回歸模型
IsotonicRegression 保序回歸
# 生存分析的對數(shù)線性模型
aftsr = AFTSurvivalRegression()
model = aftsr.fit(df)
# 決策樹回歸模型
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
# 全稱梯度下降樹回歸模型
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
model = gbt.fit(df)
# 保序回歸
ir = IsotonicRegression()
model = ir.fit(df)

推薦模塊

model 備注
ALS (Alternatingleastsquares蛤铜。交替最小二乘法
# 推薦系統(tǒng)
als = ALS(rank=10, maxIter=5, seed=0)
model = als.fit(df)

5、結(jié)果評估

model 備注
BinaryClassificationEvaluator 二分類評估
RegressionEvaluator 回歸評估
MulticlassClassificationEvaluator 多分類評估
ClusteringEvaluator 聚類評估
evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw")
evaluator.evaluate(dataset)

evaluator = RegressionEvaluator(predictionCol="raw")
evaluator.evaluate(dataset)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)

evaluator = ClusteringEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)

6丛肢、模型保存

直接使用save保存模型围肥,使用load加載訓練結(jié)果。

# rf_classifier為RandomForestClassificationModel訓練完的模型
rf_classifier.save("xxx/RF_model")

# 模型調(diào)用
rf=RandomForestClassificationModel.load("xxx/RF_model")
model_preditions=rf.transform(test_df)

二蜂怎、運行案例

對上面的各個過程的方法進行組裝穆刻,以隨機森林代碼為例:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler  # 特征處理
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel


spark=SparkSession.builder.appName('random_forest').getOrCreate()


# 數(shù)據(jù)讀入
pandas_df = pd.read_csv('xxx/affairs.csv')
sc = SparkContext()  # 初始化;SparkContext則是客戶端的核心
sqlContest = SQLContext(sc)  # SQLContext是Spark SQL進行結(jié)構(gòu)化數(shù)據(jù)處理的入口
df = sqlContest.createDataFrame(pandas_df)  # pandas df 轉(zhuǎn)換為 spark df 格式


# 數(shù)據(jù)屬性
print((df.count(),len(df.columns)))
df.printSchema()
df.describe().select('summary','rate_marriage','age','yrs_married','children','religious').show()
df.groupBy('affairs').count().show()
df.groupBy('rate_marriage').count().show()
df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()
df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()
df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()
df.groupBy('affairs').mean().show()


# 特征處理
df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features")  # 把特征組裝成一個list
df = df_assembler.transform(df)
df.printSchema()
df.show(5,truncate=False)


# 數(shù)據(jù)集劃分
model_df=df.select(['features','affairs'])
train_df,test_df=model_df.randomSplit([0.75,0.25])
train_df.count()
train_df.groupBy('affairs').count().show()
test_df.groupBy('affairs').count().show()


# 模型構(gòu)建
rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)
rf_predictions=rf_classifier.transform(test_df)
rf_predictions.show()
# 結(jié)果查看
rf_classifier.featureImportances  # 各個特征的權(quán)重


# 模型效果
rf_predictions.groupBy('prediction').count().show()
rf_predictions.select(['probability','affairs','prediction']).show(10,False)
# 多分類模型——準確率
rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)
print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))
print(rf_accuracy)
# 多分類模型——精確率
rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)
print('The precision rate on test data is {0:.0%}'.format(rf_precision))
# AUC
rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
print(rf_auc)


# 模型保存
rf_classifier.save("/home/logsaas/pyspark/lalafile/RF_model")

# 模型調(diào)用
rf=RandomForestClassificationModel.load("/home/logsaas/pyspark/lalafile/RF_model")
model_preditions=rf.transform(test_df)
model_preditions.show()

參考資料

[1] SparkSession與SparkContext:https://blog.csdn.net/qq_35495339/article/details/98119422
[2] Spark中DataFrame與Pandas中DataFrame的區(qū)別:https://blog.csdn.net/u013129944/article/details/80019546
[3] pyspark 官網(wǎng)解釋:http://spark.apache.org/docs/latest/api/python/pyspark.ml.html
[4] pyspark.ml.feature函數(shù)中文簡介:https://blog.csdn.net/yw_vine/article/details/80117759
[5] Spark特征工程:https://blog.csdn.net/u012050154/article/details/60766387
[6] http://dblab.xmu.edu.cn/blog/1709-2/ 子雨
[7] Machine Learning with PySpark by Pramod Singh
[8] https://github.com/Apress/machine-learning-with-pyspark

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末杠步,一起剝皮案震驚了整個濱河市氢伟,隨后出現(xiàn)的幾起案子榜轿,更是在濱河造成了極大的恐慌,老刑警劉巖朵锣,帶你破解...
    沈念sama閱讀 211,561評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谬盐,死亡現(xiàn)場離奇詭異,居然都是意外死亡诚些,警方通過查閱死者的電腦和手機蔗喂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評論 3 385
  • 文/潘曉璐 我一進店門谚赎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來千扔,“玉大人命浴,你說我怎么就攤上這事∫文” “怎么了?”我有些...
    開封第一講書人閱讀 157,162評論 0 348
  • 文/不壞的土叔 我叫張陵寡键,是天一觀的道長掀泳。 經(jīng)常有香客問我,道長西轩,這世上最難降的妖魔是什么员舵? 我笑而不...
    開封第一講書人閱讀 56,470評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮藕畔,結(jié)果婚禮上马僻,老公的妹妹穿的比我還像新娘。我一直安慰自己注服,他們只是感情好韭邓,可當我...
    茶點故事閱讀 65,550評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著溶弟,像睡著了一般女淑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辜御,一...
    開封第一講書人閱讀 49,806評論 1 290
  • 那天鸭你,我揣著相機與錄音,去河邊找鬼擒权。 笑死袱巨,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的碳抄。 我是一名探鬼主播愉老,決...
    沈念sama閱讀 38,951評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼剖效!你這毒婦竟也來了俺夕?” 一聲冷哼從身側(cè)響起裳凸,我...
    開封第一講書人閱讀 37,712評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎劝贸,沒想到半個月后姨谷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,166評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡映九,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,510評論 2 327
  • 正文 我和宋清朗相戀三年梦湘,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片件甥。...
    茶點故事閱讀 38,643評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡捌议,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出引有,到底是詐尸還是另有隱情瓣颅,我是刑警寧澤,帶...
    沈念sama閱讀 34,306評論 4 330
  • 正文 年R本政府宣布譬正,位于F島的核電站宫补,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏曾我。R本人自食惡果不足惜粉怕,卻給世界環(huán)境...
    茶點故事閱讀 39,930評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望抒巢。 院中可真熱鬧贫贝,春花似錦、人聲如沸蛉谜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽型诚。三九已至蜈彼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間俺驶,已是汗流浹背幸逆。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留暮现,地道東北人还绘。 一個月前我還...
    沈念sama閱讀 46,351評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像栖袋,于是被迫代替她去往敵國和親拍顷。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,509評論 2 348

推薦閱讀更多精彩內(nèi)容