借助于spark的分布式特性耕捞,機器學習與spark的結(jié)合可以解決數(shù)據(jù)規(guī)模大、復雜運算時間久的問題烫幕。
spark提供MLlib組件用于滿足機器學習的需求俺抽。
本文將從機器學習數(shù)據(jù)讀取、數(shù)據(jù)操作较曼、特征處理磷斧、模型訓練、結(jié)果評估、模型保存六個方面展開弛饭。
一冕末、基礎(chǔ)操作
1、sparksession
(1) 創(chuàng)建SparkSession
SparkSession是Spark 2.0引如的新概念侣颂。SparkSession為用戶提供了統(tǒng)一的切入點档桃,來讓用戶學習spark的各項功能[1]。
任何Spark程序的第一步都是先創(chuàng)建SparkSession憔晒。
From pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()
(2) SparkSession與SparkContext[1]
在spark的早期版本中藻肄,SparkContext是spark的主要切入點,由于RDD是主要的API拒担,我們通過sparkcontext來創(chuàng)建和操作RDD嘹屯。對于每個其他的API,我們需要使用不同的context(例如對于Streming从撼,我們需要使用StreamingContext抚垄;對于sql,使用sqlContext谋逻;對于Hive呆馁,使用hiveContext)。
但是隨著DataSet和DataFrame的API逐漸成為標準的API毁兆,就需要為他們建立接入點浙滤。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點气堕,SparkSession封裝了SparkConf纺腊、SparkContext和SQLContext
至于圖中的RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)是Spark中最基本的數(shù)據(jù)抽象茎芭,它代表一個不可變揖膜、可分區(qū)、里面的元素可并行計算的集合梅桩。
這些概念在編碼過程中偶爾會出現(xiàn)壹粟,對于spark分布式運行的詳細架構(gòu)本文就不深入了,只需對SparkSession宿百、SparkContext有個大致的了解趁仙。
2、數(shù)據(jù)加載
(1) 本地數(shù)據(jù)讀取
使用spark方式讀取本地csv垦页。
df = spark.read.csv('XXX.csv',inferSchema=True,header=True)
使用pandas方式讀取本地csv雀费,轉(zhuǎn)換pandas dataframe為spark dataframe。
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
pandas_df = pd.read_csv('/home/logsaas/pyspark/lalafile/movie_ratings_df.csv')
sc = SparkContext()
sqlContest = SQLContext(sc)
df = sqlContest.createDataFrame(pandas_df)
(2) 數(shù)據(jù)庫讀取
# hive數(shù)據(jù)庫讀取
spark.sql('select * from XX')
(3) Spark_DataFrame與Pandas_DataFrame區(qū)別[2]
Spark中DataFrame與Pandas中DataFrame的主要區(qū)別是工作方式的不同痊焊。
區(qū)別 | pandas | spark |
---|---|---|
工作方式 | 單機模式盏袄,沒有并行機制忿峻,不支持Hadoop,處理大量數(shù)據(jù)有瓶頸 | 分布式并行計算框架辕羽,所有的數(shù)據(jù)和操作自動并行分布在各個集群結(jié)點上炭菌,支持Hadoop,能處理大量數(shù)據(jù) |
DataFrame可變性 | 可變 | Spark中RDDs是不可變的逛漫,因此DataFrame也是不可變的 |
相互轉(zhuǎn)換 | 從spark_df轉(zhuǎn)換:pandas_df = spark_df.toPandas() | 從pandas_df轉(zhuǎn)換:spark_df = SQLContext.createDataFrame(pandas_df) |
3、數(shù)據(jù)操作
由于spark dataframe和python dataframe的區(qū)別赘艳,隨之而來的操作差別也比較大酌毡。
操作 | pandas | spark |
---|---|---|
取列信息 | df[“name”] | df.select(“name”) |
取滿足條件的信息 | df[df[‘a(chǎn)ge’]>21] | df.filter(df[‘a(chǎn)ge’]>21) |
groupby | df.groupby(“A”).avg(“B”) | from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() |
data合并 | append/concat等 | df.join() |
去重統(tǒng)計 | len(set(df['title'])) | df.select('title').distinct().count() |
新增一列 | df['newcol']=lists | df.withColumn('newcol',lists) |
其他 | ··· | ··· |
常見的spark dataframe操作如下:
# **基礎(chǔ)描述**
spark_df.count() # 行數(shù)
spark_df.columns # 列名稱
spark_df.printSchema() # 結(jié)構(gòu)及屬性顯示
spark_df.show(5,False) # truncate=False表示不壓縮顯示
spark_df.describe().show() # 均值/最值等描述
# **dataframe操作**
# 取'age','mobile'兩列
spark_df.select('age','mobile').show(5)
# 新增一列:age_after_10_yrs
spark_df.withColumn("age_after_10_yrs",(spark_df["age"]+10)).show(10,False)
# 新建一列age_double,將age轉(zhuǎn)換為double屬性
spark_df.withColumn('age_double',spark_df['age'].cast(DoubleType())).show(10,False)
# 篩選mobile==Vivo的信息
spark_df.filter(spark_df['mobile']=='Vivo').show()
spark_df.filter(spark_df['mobile']=='Vivo').select('age','ratings','mobile').show()
spark_df.filter(spark_df['mobile']=='Vivo').filter(spark_df['experience'] >10).show()
# 去重統(tǒng)計
spark_df.select('mobile').distinct().show()
# 行去重
spark_df=spark_df.dropDuplicates()
# 刪除列
df_new=spark_df.drop('mobile')
# groupby操作
spark_df.groupBy('mobile').count().show(5,False)
spark_df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
spark_df.groupBy('mobile').agg({'experience':'sum'}).show(5,False) # 根據(jù)mobile分區(qū)蕾管,計算experience的sum
# udf 自建sql函數(shù)
from pyspark.sql.functions import udf
def price_range(brand):
if brand in ['Samsung','Apple']:
return 'High Price'
elif brand =='MI':
return 'Mid Price'
else:
return 'Low Price'
brand_udf=udf(price_range,StringType()) # create udf using python function # 輸出為string格式
# 新建一列price_range
spark_df.withColumn('price_range',brand_udf(spark_df['mobile'])).show(10,False)
# 使用lamba創(chuàng)建udf
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType()) # using lambda function
# 新建一列age_group
spark_df.withColumn("age_group", age_udf(spark_df.age)).show(10,False)
在spark中很少會用for循環(huán)去處理一個個特征枷踏,一般使用函數(shù)/自建UDF,批量處理掉了掰曾。
比如計算Review列每個數(shù)據(jù)的長度旭蠕。
python模式
review_length = []
for info in text_df['Review']:
···· review_length.apend(length(info))
text_df['length'] = review_length
pyspark模式
from pyspark.sql.functions import length text_df=text_df.withColumn('length',length(text_df['Review']))
3、特征處理
以下為pyspark.ml.feature提供的特征處理功能旷坦,滿足了大部分機器學習的特征處理需求掏熬。
標準化與歸一化
函數(shù) | 備注 |
---|---|
NGram | 正則標準化,不需要fit 直接transform |
Normalizer | 歸一化函數(shù)秒梅,使它的范數(shù)或者數(shù)值范圍在一定的范圍內(nèi) |
MaxAbsScaler | 歸一化函數(shù)旗芬,將列標準化到[0,1]之間,每一個值都除以本列的絕對值最大的數(shù)捆蜀,先fit然后 transform() |
MinMaxScaler | 最大最小歸一化疮丛,先fit然后 transform() |
StandardScaler | 對列進行標準化,先fit再transform |
分箱處理
函數(shù) | 備注 |
---|---|
Binarizer | 將數(shù)值型特征的二值化辆它。將數(shù)據(jù)框中的某一列按照閾值劃分為只包含0誊薄,1的列 |
Bucketizer | 將連續(xù)特征按照splits值進行分箱 |
QuantileDiscretizer | 將連續(xù)列進行分箱操作,numBuckets 表示分箱數(shù)目 |
文本特征處理
函數(shù) | 備注 |
---|---|
CountVectorizer | 只考慮詞匯在文本中出現(xiàn)的頻率 |
HashingTF | 自然語言處理的場景中锰茉,hashingTF使用的比較多 |
IDF | TF-IDF 作為特征提取 |
StringIndexer | 針對單個類別型特征進行轉(zhuǎn)換呢蔫。把字符串的列按照出現(xiàn)頻率進行排序,出現(xiàn)次數(shù)最高的對應的Index為0飒筑,依次下排 |
IndexToString | 索引轉(zhuǎn)字符串 |
Tokenizer | 將字符串列轉(zhuǎn)換成小寫并按空格切分 |
RegexTokenizer | 基于正則的方式進行文檔切分成單詞組 |
Word2Vec | 將words轉(zhuǎn)換成一個vectorSize維的向量 |
OneHotEncoderEstimator | 獨熱編碼相關(guān) |
RFormula | 文本類特征處理咐刨,先 StringIndexer 再 OneHotEncoderEstimator |
特征操作
函數(shù) | 備注 |
---|---|
Imputer | 缺失值填補,默認使用均值或中值(“median”)填補,要計算均值所以要先 fit(),然后再transfrom() |
StopWordsRemover | 英文停用詞移除 |
SQLTransformer | 使用SQL語句創(chuàng)建新的列扬霜,直接transform() |
BucketedRandomProjectionLSH | 基于歐幾里德距離的空間度量 |
MinHashLSH | 基于Jaccard距離的空間度量 |
ElementwiseProduct | 計算inputCol與scaling內(nèi)積定鸟,不需要訓練,直接transform |
PolynomialExpansion | 特征變換著瓶,將特征拓展比如[x,y],如果degree=2,則拓展成[x,xy,y,xx,y*y]联予,所以直接transform即可 |
OneHotEncoder | 獨熱編碼,特征用一個二進制數(shù)字來表示。例如[0, 0, 3]處理后為[ 1., 0., 0., 1., 0., 0., 0., 0., 1.] |
ChiSqSelector | 依據(jù)卡方檢驗沸久,計算類別特征與分類標簽的關(guān)聯(lián)性季眷。該函數(shù)只有先訓練才能知道挑選哪些特征值,所以要先fit,應用的時候再transform |
vector 組合
函數(shù) | 備注 |
---|---|
VectorIndexer | 數(shù)據(jù)集中的類別特征轉(zhuǎn)換卷胯∽庸危可以自動識別哪些特征是類別型的,并且將原始值轉(zhuǎn)換為類別索引 |
VectorSizeHint | 允許用戶顯式指定列的向量大小 |
VectorAssembler | 用于將多個列合并為一個向量列窑睁,直接transform即可挺峡,經(jīng)常用的 |
VectorSlicer | 通過對這些索引的值進行篩選得到新的向量集 |
壓縮降維
函數(shù) | 備注 |
---|---|
PCA | 對特征進行PCA降維,先fit然后 transform() |
DCT | 離散余弦變換(Discrete Cosine Transform)担钮,用于將數(shù)據(jù)或圖像的壓縮 |
FeatureHasher | 特征哈希橱赠,相當于一種降維技巧 |
# 正則標準化,不需要fit 直接transform
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df)
# 歸一化函數(shù)箫津,使它的范數(shù)或者數(shù)值范圍在一定的范圍內(nèi)
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).head().features
# 歸一化函數(shù)狭姨,將列標準化到[0,1]之間
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()
# 最大最小歸一化,先fit然后 transform()
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
model.transform(df)
# 對列進行標準化苏遥,先fit再transform
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
model.transform(df)
# 按閾值1劃分values饼拍,結(jié)果輸出到features,結(jié)果為0/1
binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")
new_df = binarizer.transform(df)
# 將連續(xù)特征按照splits值進行分箱
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],inputCol="values", outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)
# 將連續(xù)列進行分箱操作田炭,numBuckets 表示分箱數(shù)目
qds = QuantileDiscretizer(numBuckets=2,inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df)
# 只考慮詞匯在文本中出現(xiàn)的頻率
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df) # model.vocabulary
new_df = model.transform(df)
# 詞頻統(tǒng)計
hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
hashingTF.transform(df)
# 文本tf-idf計算
idf = IDF(minDocFreq=3, inputCol="tf", outputCol="idf")
model = idf.fit(df) # model.idf
model.transform(df)
# 針對單個類別型特征進行轉(zhuǎn)換惕耕,把字符串的列按照出現(xiàn)頻率進行排序
stringIndexer = StringIndexer(inputCol="label",outputCol="indexed", handleInvalid="error",stringOrderType="frequencyDesc")
model = stringIndexer.fit(stringIndDf)
td = model.transform(stringIndDf)
# 將字符串列轉(zhuǎn)換成小寫并按空格切分
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df)
# 基于正則的方式進行文檔切分成單詞組
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df)
# Word2Vec
word2Vec = Word2Vec(vectorSize=5, seed=42,inputCol="sentence", outputCol="model")
model = word2Vec.fit(doc)
model.transform(doc)
# OneHotEncoderEstimator
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df)
# RFormula 文本類特征處理
rf = RFormula(formula="y ~ x + s")
model = rf.fit(df)
model.transform(df)
# 缺失值填補,要計算均值所以要先 fit(),然后再transfrom()
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
# 移除停頓詞
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df)
# 使用SQL語句創(chuàng)建新的列
sqlTrans = SQLTransformer(statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df)
# 歐幾里德距離的空間度量
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
model = brp.fit(df)
new_df = model.transform(df)
# 基于Jaccard距離的空間度量
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model = mh.fit(df)
model.transform(df)
# 計算inputCol與scaling內(nèi)積诫肠,不需要訓練司澎,直接transform
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]), inputCol="values", outputCol="eprod")
new_df = ep.transform(df)
# PolynomialExpansion特征變換
px = PolynomialExpansion(degree=2, inputCol="dense", outputCol="expanded")
px.transform(df)
# 獨熱編碼
encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
encoder.transform(td)
# 依據(jù)卡方檢驗特征處理
selector = ChiSqSelector(numTopFeatures=1,outputCol="selectedFeatures")
model = selector.fit(df)
new_df = model.transform(df)
# 數(shù)據(jù)集中的類別特征轉(zhuǎn)換
indexer = VectorIndexer(maxCategories=2, inputCol="a", outputCol="indexed")
model = indexer.fit(df)
model.transform(df)
# 允許用戶顯式指定列的向量大小
sizeHint = VectorSizeHint(inputCol="vector", size=3, handleInvalid="skip") # 先指定大小
vecAssembler = VectorAssembler(inputCols=["vector", "float"], outputCol="assembled") # 整合成一列
pipeline = Pipeline(stages=[sizeHint, vecAssembler])
pipelineModel = pipeline.fit(df) # 功能模塊合并
pipelineModel.transform(df)
# 用于將多個列合并為一個向量列,直接transform即可栋豫,經(jīng)常用的
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df)
# 通過對這些索引的值進行篩選得到新的向量集
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).head().sliced
# PCA
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df)
# 離散余弦變換壓縮數(shù)據(jù)
dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec")
df2 = dct.transform(df1)
# 特征哈希挤安,相當于一種降維技巧
hasher = FeatureHasher(inputCols=cols, outputCol="features")
hasher.transform(df).head().features
4、模型訓練
分類模塊
model | 備注 |
---|---|
LinearSVC | 線性分類支持向量機 |
LogisticRegression | 邏輯回歸 |
DecisionTreeClassifier | 決策樹分類 |
GBTClassifier | GBDT梯度提升決策樹 |
RandomForestClassifier | 隨機森林 |
NaiveBayes | 樸素貝葉斯 |
MultilayerPerceptronClassifier | 多層感知機分類器 |
OneVsRest | 將多分類問題簡化為二分類問題 |
# 線性分類支持向量機
svm = LinearSVC(maxIter=5, regParam=0.01) # maxIter最大迭代次數(shù)5次丧鸯,regParam正則化參數(shù)
model = svm.fit(df)
# 邏輯回歸
blor = LogisticRegression(regParam=0.01, weightCol="weight")
blorModel = blor.fit(bdf)
# 決策樹分類
dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed") # 限定決策樹的最大可能深度為2
model = dt.fit(td)
# GBDT梯度提升決策樹
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
# 隨機森林
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
# 樸素貝葉斯
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
model = nb.fit(df)
# 多層感知機
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[2, 2, 2], blockSize=1, seed=123)
model = mlp.fit(df)
# OneVsRest
lr = LogisticRegression(regParam=0.01)
ovr = OneVsRest(classifier=lr)
model = ovr.fit(df)
聚類模塊
model | 備注 |
---|---|
BisectingKMeans | 二分類KMeans |
KMeans | k均值聚類算法 |
GaussianMixture | 高斯混合模型 |
LDA | LDA主題聚類 |
PowerIterationClustering | 冪迭代聚類 |
# 二分類KMeans
bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
model = bkm.fit(df)
transformed = model.transform(df)
# k均值聚類算法
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(df)
transformed = model.transform(df)
# 高斯混合模型
gm = GaussianMixture(k=3, tol=0.0001,maxIter=10, seed=10)
model = gm.fit(df)
transformed = model.transform(df)
# LDA主題聚類
lda = LDA(k=2, seed=1, optimizer="em")
model = lda.fit(df)
# 冪迭代聚類
pic = PowerIterationClustering(k=2, maxIter=40,weightCol="weight")
assignments = pic.assignClusters(df)
回歸模塊
model | 備注 |
---|---|
AFTSurvivalRegression | 生存分析的對數(shù)線性模型 |
DecisionTreeRegressor | 決策樹回歸模型 |
GBTRegressor | 全稱梯度下降樹回歸模型 |
IsotonicRegression | 保序回歸 |
# 生存分析的對數(shù)線性模型
aftsr = AFTSurvivalRegression()
model = aftsr.fit(df)
# 決策樹回歸模型
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
# 全稱梯度下降樹回歸模型
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
model = gbt.fit(df)
# 保序回歸
ir = IsotonicRegression()
model = ir.fit(df)
推薦模塊
model | 備注 |
---|---|
ALS | (Alternatingleastsquares蛤铜。交替最小二乘法 |
# 推薦系統(tǒng)
als = ALS(rank=10, maxIter=5, seed=0)
model = als.fit(df)
5、結(jié)果評估
model | 備注 |
---|---|
BinaryClassificationEvaluator | 二分類評估 |
RegressionEvaluator | 回歸評估 |
MulticlassClassificationEvaluator | 多分類評估 |
ClusteringEvaluator | 聚類評估 |
evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw")
evaluator.evaluate(dataset)
evaluator = RegressionEvaluator(predictionCol="raw")
evaluator.evaluate(dataset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)
evaluator = ClusteringEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)
6丛肢、模型保存
直接使用save保存模型围肥,使用load加載訓練結(jié)果。
# rf_classifier為RandomForestClassificationModel訓練完的模型
rf_classifier.save("xxx/RF_model")
# 模型調(diào)用
rf=RandomForestClassificationModel.load("xxx/RF_model")
model_preditions=rf.transform(test_df)
二蜂怎、運行案例
對上面的各個過程的方法進行組裝穆刻,以隨機森林代碼為例:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler # 特征處理
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel
spark=SparkSession.builder.appName('random_forest').getOrCreate()
# 數(shù)據(jù)讀入
pandas_df = pd.read_csv('xxx/affairs.csv')
sc = SparkContext() # 初始化;SparkContext則是客戶端的核心
sqlContest = SQLContext(sc) # SQLContext是Spark SQL進行結(jié)構(gòu)化數(shù)據(jù)處理的入口
df = sqlContest.createDataFrame(pandas_df) # pandas df 轉(zhuǎn)換為 spark df 格式
# 數(shù)據(jù)屬性
print((df.count(),len(df.columns)))
df.printSchema()
df.describe().select('summary','rate_marriage','age','yrs_married','children','religious').show()
df.groupBy('affairs').count().show()
df.groupBy('rate_marriage').count().show()
df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()
df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()
df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()
df.groupBy('affairs').mean().show()
# 特征處理
df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features") # 把特征組裝成一個list
df = df_assembler.transform(df)
df.printSchema()
df.show(5,truncate=False)
# 數(shù)據(jù)集劃分
model_df=df.select(['features','affairs'])
train_df,test_df=model_df.randomSplit([0.75,0.25])
train_df.count()
train_df.groupBy('affairs').count().show()
test_df.groupBy('affairs').count().show()
# 模型構(gòu)建
rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)
rf_predictions=rf_classifier.transform(test_df)
rf_predictions.show()
# 結(jié)果查看
rf_classifier.featureImportances # 各個特征的權(quán)重
# 模型效果
rf_predictions.groupBy('prediction').count().show()
rf_predictions.select(['probability','affairs','prediction']).show(10,False)
# 多分類模型——準確率
rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)
print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))
print(rf_accuracy)
# 多分類模型——精確率
rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)
print('The precision rate on test data is {0:.0%}'.format(rf_precision))
# AUC
rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
print(rf_auc)
# 模型保存
rf_classifier.save("/home/logsaas/pyspark/lalafile/RF_model")
# 模型調(diào)用
rf=RandomForestClassificationModel.load("/home/logsaas/pyspark/lalafile/RF_model")
model_preditions=rf.transform(test_df)
model_preditions.show()
參考資料
[1] SparkSession與SparkContext:https://blog.csdn.net/qq_35495339/article/details/98119422
[2] Spark中DataFrame與Pandas中DataFrame的區(qū)別:https://blog.csdn.net/u013129944/article/details/80019546
[3] pyspark 官網(wǎng)解釋:http://spark.apache.org/docs/latest/api/python/pyspark.ml.html
[4] pyspark.ml.feature函數(shù)中文簡介:https://blog.csdn.net/yw_vine/article/details/80117759
[5] Spark特征工程:https://blog.csdn.net/u012050154/article/details/60766387
[6] http://dblab.xmu.edu.cn/blog/1709-2/ 子雨
[7] Machine Learning with PySpark by Pramod Singh
[8] https://github.com/Apress/machine-learning-with-pyspark