在spark中提供了兩個機(jī)器學(xué)習(xí)庫mllib和ml橡疼,mllib的操作是基于RDD的忿磅,而ml則是基于DataFrame莫秆,是主流機(jī)器學(xué)習(xí)庫崔拥。
1幸缕、ml包的概述
ml包包括三個主要的抽象類:轉(zhuǎn)換器(Transformer)群发、評估器(Estimator)和管道(Pipeline)晰韵。
1.1 轉(zhuǎn)換器
轉(zhuǎn)換器類通過將一個新列附加到DataFrame來轉(zhuǎn)換數(shù)據(jù)。
從高層次上看熟妓,當(dāng)從轉(zhuǎn)換器的抽象類派生時雪猪,每個新的轉(zhuǎn)換器類需要實(shí)現(xiàn).transform()
方法。該方法要求傳遞一個要被轉(zhuǎn)換的DataFrame起愈,該參數(shù)通常是第一個也是唯一的一個強(qiáng)制性參數(shù)只恨。這在ml包的不同方法中也不盡相同:其他常見的參數(shù)有inputCol和outputCol;然而抬虽,這些參數(shù)通常用一些預(yù)定義的值作為默認(rèn)值坤次,例如inputCol參數(shù)的默認(rèn)值為“features”。
spark.ml.feature中提供了許多轉(zhuǎn)換器斥赋,下面做個簡要介紹:
-
Binarizer
, 根據(jù)指定的閾值將連續(xù)變量轉(zhuǎn)換為對應(yīng)的二進(jìn)制值缰猴; -
Bucketizer
, 與Binarizer類似,該方法根據(jù)閾值列表(分割的參數(shù))疤剑,將連續(xù)變量轉(zhuǎn)換為多項值(即將連續(xù)變量離散到指定的范圍區(qū)間)滑绒; -
ChiSqSelector
, 對于分類目標(biāo)變量,此功能允許你選擇預(yù)定義數(shù)量的特征(由numTopFeatures參數(shù)指定)隘膘,以便更好地說明目標(biāo)的變化疑故。如名所示,該方法使用卡方檢驗(yàn)(Chi-Square)完成選擇弯菊。該方法需要兩步:首先纵势,需要fit()
數(shù)據(jù)。調(diào)用fit方法返回一個ChipSelectorModel對象管钳,然后使用該對象的transform方法來轉(zhuǎn)換DataFrame钦铁; -
countVectorizer
, 該方法對于標(biāo)記文本(如[['learning', 'pyspark', 'with', 'us'], ['us', 'us', 'us', 'us']]
)是有用的。這是一個需要兩步的方法:首先才漆,需要用fit從數(shù)據(jù)集中學(xué)習(xí)這些模式牛曹,然后才能使用fit方法返回的CountVectorizerModel對象的transform方法。對于如上所示的標(biāo)記文本醇滥,該轉(zhuǎn)換器的輸出類似于`[(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]), (4, [3], [3.0])]黎比。 -
DCT
, 離散余弦變換取實(shí)數(shù)值向量,并返回相同長度的向量鸳玩,但余弦函數(shù)之和在不同頻率下振蕩阅虫。這種轉(zhuǎn)換對于提取數(shù)據(jù)或數(shù)據(jù)壓縮中的一些基本頻率很有用。 -
ElementwiseProduct
, 該方法返回一個向量不跟,其中的元素是傳入該方法的向量和另一個傳入?yún)?shù)scalingVec向量的乘積颓帝。例如,如果傳入的向量是[10.0, 3.0, 15.0], 而傳入的scalingVec為[0.99, 3.30, 0.66], 那么將獲得如下所示的向量:[9.9, 9.9, 9.9, 9.9]; -
HashingTF
, 一個哈希轉(zhuǎn)換器躲履,輸入為標(biāo)記文本的列表,返回一個帶有技術(shù)的有預(yù)定長度的向量聊闯。摘自pyspark文檔:"由于使用簡單的模數(shù)將散列函數(shù)轉(zhuǎn)換為列索引工猜,建議使用2的冪作為numFeatures參數(shù);否則特征將不會均勻的映射到列"菱蔬; -
IDF
, 該方法計算文檔列表的逆向文件頻率篷帅。請注意,文檔需要提前用向量表示(例如拴泌,使用HashingTF或CountVectorizer)魏身; -
IndexToString
, 與StringIndexer方法對應(yīng)。它使用StringIndexerModel對象中的編碼將字符串索引反轉(zhuǎn)到原始值蚪腐。另外請注意箭昵,如果有時不起作用,你需要指定StringIndexer中的值回季; -
MaxAbsScaler
, 將數(shù)據(jù)調(diào)整到[-1,0,1]范圍內(nèi)(因此不會移動數(shù)據(jù)中心)家制; -
MinMaxScaler
, 這與MaxAbsScaler相似,區(qū)別在于它將數(shù)據(jù)縮放到[0.0, 1.0]范圍內(nèi)泡一; -
NGram
, 此方法的輸入為標(biāo)記文本的列表颤殴,返回結(jié)果包含一系列n-gram:以兩個詞、三個詞或更多的n個詞記為一個n-gram鼻忠。例如涵但,如果你有一個['good', 'morning', 'Robin', 'Williams'], 你會得到以下輸出:['good morning', 'morning Robin', 'Robin Williams']; -
Normlizer
, 該方法使用p范數(shù)將數(shù)據(jù)縮放為單位范數(shù)(默認(rèn)為L2); -
OneHotEncoder
, 該方法將分類列編碼為二進(jìn)制向量列; -
PCA
, 使用主成分分析執(zhí)行數(shù)據(jù)降維帖蔓; -
PolynomiaExpansion
, 執(zhí)行向量的多項式展開矮瘟。例如,加入你有一個如[x,y,z]的向量塑娇,則該方法將產(chǎn)生一下擴(kuò)展:[x, xx, y, xy, yy, z, xz, yz, zz]; -
QuantileDiscretizer
, 與Bucketizer方法類似芥永,但不是傳遞分隔參數(shù),而是傳遞一個numBuckets參數(shù)钝吮。然后埋涧,該方法通過計算數(shù)據(jù)的近似分位數(shù)來決定分隔應(yīng)該是什么; -
RegexTokenizer
, 這是一個使用正則表達(dá)式的字符串分詞器奇瘦; -
RFormula
, 對于狂熱的R用戶棘催,你可以傳遞一個公式,如vec ~ alpha*3 + beta (假設(shè)你的DataFrame具有alpha和beta列)耳标,它將產(chǎn)生給定表達(dá)式的vec列醇坝; -
SQLTransformer
, 與上面相似,但不是類似R的公式,你可以使用SQL語法(FROM 語句應(yīng)該從__THIS中選擇呼猪,表示你正在訪問DataFrame画畅,如SELECT alpha*3 + beta AS vec FROM THIS); -
StandardScaler
, 標(biāo)準(zhǔn)化列,使其擁有零均值和等于1的標(biāo)準(zhǔn)差宋距; -
StopWordsRemover
, 從標(biāo)記文本中刪除停用詞(如'the', 'a'); -
StringIndexer
, 假設(shè)包含所有單詞的列表都在一列轴踱,這將產(chǎn)生一個索引向量; -
Tokenizer
(分詞器):該默認(rèn)分詞器將字符串轉(zhuǎn)成小寫谚赎,然后以空格為分隔符分詞淫僻; -
VectorAssembler
, 這是一個非常有用的轉(zhuǎn)換器,他將多個數(shù)字(包括向量)列合并為一列向量壶唤; -
VectorIndexer
, 該方法為類別列生成索引向量雳灵。它以逐列方式工作,從列中選擇不同的值闸盔,排序并從映射中返回值的索引而不是原始值悯辙; -
VectorSlicer
, 用于特征向量,不管是密集的還是稀疏的:給定一個索引列表迎吵,它從特征向量中提取值笑撞; -
Word2Vec
, 該方法將一個句子(字符串)作為輸入 ,并將其轉(zhuǎn)換為{string, vector}格式的映射钓觉,這種表示在自然語言處理中非常有用茴肥。
1.2 評估器
評估器可以被視為需要評估的統(tǒng)計模型,對你的觀測對象做預(yù)測或分類荡灾。
如果從抽象的評估器類派生瓤狐,新模型必須實(shí)現(xiàn)fit方法,該方法給出的在DataFrame中找到的數(shù)據(jù)和某些默認(rèn)或自定義的參數(shù)來擬合模型批幌。在pyspark中有很多評估器可用础锐,下面簡要介紹下spark中提供的模型。
分類
ML包提供了七種分類(Classification)模型以供選擇荧缘,從最簡單的邏輯回歸到一些更復(fù)雜的模型皆警,下面作簡要的描述:
-
LogisticRegression
, 分類的基準(zhǔn)模型。邏輯回歸使用一個對數(shù)函數(shù)來計算屬于特定類別的觀察對象的概率截粗; -
DecisionTreeClassifier
, 該分類器構(gòu)建了一個決策樹來預(yù)測一個觀察對象的所屬類別信姓。指定maxDepth參數(shù)限制樹的深度,minInstancePerNode確定需要進(jìn)一步拆分的樹節(jié)點(diǎn)的觀察對象的最小數(shù)量绸罗,maxBins參數(shù)指定連續(xù)變量將被分割的Bin的最大數(shù)量意推,而impurity指定用于測量并計算來自分隔的信息的度量; -
GBTClassifier
, 用于分類的梯度提升決策樹模型珊蟀。該模型屬于集合模型家族:集合模型結(jié)合多個弱預(yù)測模型而形成一個強(qiáng)健的模型菊值; -
RandomForestClassifier
,該模型產(chǎn)生多個決策樹,并使用模式輸出的決策樹來對觀察對象進(jìn)行分類腻窒; -
NaiveBayes
, 基于貝葉斯定理昵宇,該模型使用條件概率理論對觀測進(jìn)行分類; -
MultilayerPerceptronClassfier
, 多層感知器分類器儿子。模仿人類大腦本質(zhì)的分類器瓦哎,深深植根于人造神經(jīng)網(wǎng)絡(luò)理論,該模型是一個黑盒模型典徊,內(nèi)部參數(shù)不易解釋。該模型至少包含三個完全相連的人造神經(jīng)元層:輸入層(需要和數(shù)據(jù)集中特征的數(shù)量一樣)恩够、多個隱藏層(至少一個)以及一個輸出層卒落,其神經(jīng)元數(shù)量等于標(biāo)簽中的類別數(shù)量。輸入層和隱藏層中的所有神經(jīng)元都有sigmoid激活函數(shù)蜂桶,而輸出神經(jīng)元的激活函數(shù)則為softmax儡毕。 -
OneVsRest
,將多分類問題簡化為二分類問題扑媚。例如腰湾,在多標(biāo)簽的情況下,模型可以訓(xùn)練成多個二元邏輯回歸模型疆股。如多標(biāo)簽情況下费坊,模型可以訓(xùn)練成多個二元邏輯回歸模型。所有模型分別計分旬痹,具有最高概率的模型獲勝附井。
回歸
pyspark ML軟件包中有七種可用于回歸(Regression)任務(wù)的模型。與分類一樣两残,范圍從一些基本的回歸(如強(qiáng)制線性回歸)到更復(fù)雜的回歸:
-
AFTSurvivalRegression
永毅,適合加速失效時間回歸模型。它是一個參數(shù)化模型人弓,假設(shè)其中一個特征的邊際效應(yīng)加速或減緩了預(yù)期壽命(或過程失斦铀馈)。它非常適用于具有明確階段的過程崔赌; -
DecisionTreeRegressor
意蛀, 類似于分類模型,明顯不同的是其標(biāo)簽是連續(xù)的而不是二元的健芭; -
GBTRegressor
, 與DecisionTressRegressor一樣浸间,區(qū)別在于標(biāo)簽的數(shù)據(jù)類型; -
GeneralizedLinearRegression
吟榴,廣義線性回歸是具有不同內(nèi)核功能(鏈接功能)的線性模型家族魁蒜。與假設(shè)誤差項的常態(tài)性的線性回歸相反,GLM允許標(biāo)簽具有不同的誤差分布項:pyspark ML包的generalizedRegression模型支持gaussian、binomial兜看、gamma锥咸、和possion家族的誤差分布,他們有許多不同的連接功能细移; -
IsotonicRegression
, 這種回歸擬合一個形式自由的搏予、非遞減的行到數(shù)據(jù)中。對于擬合有序的和遞增的觀測數(shù)據(jù)集是有用的弧轧; -
LinearRgression
, 最簡單的回歸模型雪侥,他架設(shè)了特征與連續(xù)標(biāo)簽以及誤差項的常態(tài)之間的線性關(guān)系; -
RandomForestRegressor
, 與DecisionTreeRegressor或GBTRegressor類似精绎,RandomForestRegressor適合連續(xù)的標(biāo)簽速缨,而不是離散標(biāo)簽。
聚類
聚類是一系列無監(jiān)督模型代乃,用于查找數(shù)據(jù)中的隱含模式旬牲。pyspark ML包提供了四種當(dāng)前最流行的模型:
-
BisectingKMeans
, 二分k均值算法,該算法結(jié)合了k均值聚類算法和層次聚類算法搁吓。最初該算法將所有觀察點(diǎn)作為一個簇原茅,然后將數(shù)據(jù)迭代的分解為k個簇; -
KMeans
, K均值算法堕仔,將數(shù)據(jù)分成k個簇擂橘,迭代地搜索那些使每個觀察點(diǎn)和它所屬簇的質(zhì)點(diǎn)之間距離平方和最小的那些質(zhì)點(diǎn); -
GaussianMixture
, 混合高斯模型摩骨。該方法使用具有未知參數(shù)的k個高斯分布來剖析數(shù)據(jù)集贝室。使用期望最大化算法,通過最大化對數(shù)似然函數(shù)找到高斯參數(shù)仿吞; -
LDA
, 該模型用于自然語言處理應(yīng)用程序中的主題生成滑频; - 除此之外,pyspark ML還提供了推薦模型唤冈。
1.3 管道
pyspark ML中管道的概念用來表示從轉(zhuǎn)換到評估(具有一系列不同階段)的端到端的過程峡迷,這個過程可以對輸入的一些原始數(shù)據(jù)(以DataFrame形式)執(zhí)行必要的數(shù)據(jù)加工(轉(zhuǎn)換),最后評估模型你虹。
一個管道可以被認(rèn)為是由一系列不同階段組成的绘搞。在Pipeline對象上執(zhí)行fit方法時,所有階段按照stage參數(shù)中指定的順序執(zhí)行傅物;stage參數(shù)是轉(zhuǎn)換器和評估器對象的列表夯辖。管道對象的fit方法執(zhí)行每個轉(zhuǎn)換器的transform方法和所有評估器的fit方法。
通常董饰,前一階段的輸出會成為下一階段的輸入:當(dāng)從轉(zhuǎn)換器或評估器抽象類型派生時蒿褂,需要實(shí)現(xiàn)getOutputCol()方法圆米,該方法返回創(chuàng)建對象時指定的outputCol參數(shù)的值。
下面通過一些例子來詳細(xì)介紹ml的用法啄栓。
2娄帖、例子:使用ML預(yù)測嬰兒生存幾率
在本節(jié)中我們將使用ml中的函數(shù)方法來預(yù)測嬰兒生存率,數(shù)據(jù)可從http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz下載昙楚。
2.1 加載數(shù)據(jù)
import pyspark.sql.types as typ
from pyspark.ml import Pipeline
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev
import pandas as pd
import numpy as np
import os
labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINE_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv(
'/Users/shexuan/Downloads/births_transformed.csv.gz', header=True, schema=schema)
births.show(3)
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINE_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
| 0| 1| 29| 99| 0| 0| 0| 0| 99| 999| 999| 99| 0| 0| 0| 0| 0|
| 0| 1| 22| 29| 0| 0| 0| 0| 65| 180| 198| 18| 0| 0| 0| 0| 0|
| 0| 1| 38| 40| 0| 0| 0| 0| 63| 155| 167| 12| 0| 0| 0| 0| 0|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
在這里我們指定DataFrame的schema近速,限制數(shù)據(jù)集只有17列。
2.2 創(chuàng)建轉(zhuǎn)換器
在使用模型對數(shù)據(jù)集進(jìn)行評估預(yù)測前堪旧,先要對數(shù)據(jù)做一些特征轉(zhuǎn)換削葱。
# 創(chuàng)建轉(zhuǎn)換器、評估器
import pyspark.ml.feature as ft
births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
.cast(typ.IntegerType()))
# birth place使用one-hot編碼
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')
# 創(chuàng)建單一的列將所有特征整合在一起
featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
outputCol='features'
)
# 創(chuàng)建一個評估器
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter=10,
regParam=0.01,
featuresCol=featuresCreator.getOutputCol(),
labelCol='INFANT_ALIVE_AT_REPORT')
2.3 創(chuàng)建一個管道淳梦、擬合模型
在前面我們已經(jīng)創(chuàng)建了數(shù)據(jù)轉(zhuǎn)換器和評估器析砸,現(xiàn)在我們可以通過管道將他們串聯(lián)起來并方便的進(jìn)行模型擬合了。
# 創(chuàng)建一個管道
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])
# 擬合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)
2.4 評估模型
在前面我們將數(shù)據(jù)分為了兩部分并通過管道方便的對訓(xùn)練集進(jìn)行了擬合以及對測試集進(jìn)行了測試√房纾現(xiàn)在我們可以通過測試集的結(jié)果來對模型擬合效果進(jìn)行評估了干厚。
# 評估模型性能
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT'
)
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))
0.7187355793173213
0.6819691176245866
2.5 保存模型
PySpark不僅允許保存訓(xùn)練好的模型李滴,還可以保存管道結(jié)構(gòu)及所有轉(zhuǎn)換器和評估器的定義螃宙。
# 保存模型pipeline
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
# 重載模型pipeline
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(birth_train).transform(birth_test).take(1)
[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', ...]
# 保存模型
from pyspark.ml import PipelineModel
modelPath = './infant_oneHotEncoder_LogisticPipelineModel'
model.write().overwrite().save(modelPath)
# 載入模型
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(birth_test)
test_reloadedModel.take(1)
[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, ...]
2.6 超參調(diào)優(yōu)
我們的第一個模型幾乎不可能是最好的模型。利用超參調(diào)優(yōu)能幫我們找到模型的最佳參數(shù)所坯,如邏輯回歸模型所需的最佳迭代次數(shù)或決策樹的最大深度谆扎。
在超參調(diào)優(yōu)時PySpark提供了兩種驗(yàn)證方法:K-Fold交叉驗(yàn)證和train-validation(相當(dāng)于1-Fold)交叉驗(yàn)證。
# 超參調(diào)優(yōu):grid search和train-validation splitting
# 網(wǎng)格搜索
import pyspark.ml.tuning as tune
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder()\
.addGrid(logistic.maxIter, [5,10,50])\
.addGrid(logistic.regParam, [0.01,0.05,0.3])\
.build()
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT'
)
# 使用K-Fold交叉驗(yàn)證評估各種參數(shù)的模型
cv = tune.CrossValidator(
estimator=logistic,
estimatorParamMaps=grid,
evaluator=evaluator,
numFolds=3
)
# 創(chuàng)建一個構(gòu)建特征的pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator])
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123) # 重新打開數(shù)據(jù)進(jìn)行處理
data_transformer = pipeline.fit(birth_train)
data_test = data_transformer.transform(birth_test)
# cvModel 返回估計的最佳模型
cvModel = cv.fit(data_transformer.transform(birth_train))
results = cvModel.transform(data_test)
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))
0.735848884034915
0.6959036715961695
使用下面的代碼可以查看模型最佳參數(shù):
# 查看最佳模型參數(shù)
param_maps = cvModel.getEstimatorParamMaps()
eval_metrics = cvModel.avgMetrics
param_res = []
for params, metric in zip(param_maps, eval_metrics):
param_metric = {}
for key, param_val in zip(params.keys(), params.values()):
param_metric[key.name]=param_val
param_res.append((param_metric, metric))
sorted(param_res, key=lambda x:x[1], reverse=True)
[({'maxIter': 50, 'regParam': 0.01}, 0.7406291618177623),
({'maxIter': 10, 'regParam': 0.01}, 0.735580969909943),
({'maxIter': 50, 'regParam': 0.05}, 0.7355100622938429),
({'maxIter': 10, 'regParam': 0.05}, 0.7351586303619441),
({'maxIter': 10, 'regParam': 0.3}, 0.7248698034708339),
({'maxIter': 50, 'regParam': 0.3}, 0.7214679272915997),
({'maxIter': 5, 'regParam': 0.3}, 0.7180255703028883),
({'maxIter': 5, 'regParam': 0.01}, 0.7179304617840288),
({'maxIter': 5, 'regParam': 0.05}, 0.7173397593133481)]
上面使用的使用K-Fold來進(jìn)行超參調(diào)優(yōu)芹助,K-Fold交叉驗(yàn)證往往非常耗時堂湖,使用1-Fold的交叉驗(yàn)證(即將數(shù)據(jù)集按比例分為訓(xùn)練集合驗(yàn)證集)能大大縮短時間。
# Train-validation劃分
# 使用卡方檢驗(yàn)選擇特征
selector = ft.ChiSqSelector(
numTopFeatures=5,
featuresCol=featuresCreator.getOutputCol(),
outputCol='selectedFeatures',
labelCol='INFANT_ALIVE_AT_REPORT'
)
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',
featuresCol='selectedFeatures')
pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(birth_train)
tvs = tune.TrainValidationSplit(estimator=logistic,
estimatorParamMaps=grid,
evaluator=evaluator,
trainRatio=0.75
)
tvsModel = tvs.fit(data_transformer.transform(birth_train))
data_test = data_transformer.transform(birth_test)
results = tvsModel.transform(data_test)
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))
0.6111344483529891
0.5735913338089571
3状土、使用PySpark ML的其他功能
在上面我們完整的介紹了利用pyspark ml庫來進(jìn)行建模的過程无蜂。下面我們介紹一些其他常用的功能。
3.1 特征提取
3.1.1 NLP相關(guān)特征提取
如第一部分所述蒙谓,NGram模型采用標(biāo)記文本的列表斥季,并生成單詞對(或n-gram)。
本例中累驮,我們從pyspark的文檔中摘錄一段酣倾,并介紹如何在將文本傳遞給NGram模型之前進(jìn)行清理。
# NLP相關(guān)特征提劝ā(NGram模型采用標(biāo)記文本的列表躁锡,并生成單詞對或n-gram)
text_data = spark.createDataFrame([
['''K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping
randomly partitioned folds which are used as separate training and test datasets e.g., with k=3 folds,
K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data
for training and 1/3 for testing. Each fold is used as the test set exactly once.'''],
['''CrossValidatorModel contains the model with the highest average cross-validation metric across folds and
uses this model to transform input data. CrossValidatorModel also tracks the metrics for each param map
evaluated.'''],
['''Creates a copy of this instance with a randomly generated uid and some extra params. This copies the
underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra
parameters over.''']
], ['input'])
# 將文本拆分成單詞
tokenizer = ft.RegexTokenizer(inputCol='input',
outputCol='input_arr',
pattern='\s+|[,.\"]')
# 刪掉停用詞
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(),
outputCol='input_stop')
# 生成ngram詞對
ngram = ft.NGram(n=2,
inputCol=stopwords.getOutputCol(),
outputCol='nGrams')
# 構(gòu)建特征pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])
data_ngram = pipeline\
.fit(text_data)\
.transform(text_data)
data_ngram.show()
+--------------------+--------------------+--------------------+--------------------+
| input| input_arr| input_stop| nGrams|
+--------------------+--------------------+--------------------+--------------------+
|K-fold cross vali...|[k-fold, cross, v...|[k-fold, cross, v...|[k-fold cross, cr...|
|CrossValidatorMod...|[crossvalidatormo...|[crossvalidatormo...|[crossvalidatormo...|
|Creates a copy of...|[creates, a, copy...|[creates, copy, i...|[creates copy, co...|
+--------------------+--------------------+--------------------+--------------------+
3.1.2 離散連續(xù)變量
我們常常需要處理高度非線性連續(xù)特征,很難只用一個系數(shù)來供給模型置侍。這種情況下映之,可能難以用一個系數(shù)來解釋這樣的特征與目標(biāo)之間的關(guān)系拦焚。有時候,將值劃分成分類級別是很有用的惕医。
# 離散連續(xù)變量
x = np.arange(0, 100)
x = (x/100.0)*np.pi*4
y = x*np.sin(x/1.764)+20.1234
schema = typ.StructType([typ.StructField('continuous_var', typ.DoubleType(), nullable=False)])
data = spark.createDataFrame([[float(e)] for e in y], schema=schema)
data.show(4)
+------------------+
| continuous_var|
+------------------+
| 20.1234|
|20.132344452369832|
|20.159087064491775|
|20.203356291885854|
+------------------+
# 使用QuantileDiscretizer模型將連續(xù)變量分為五個分類級別
discretizer = ft.QuantileDiscretizer(
numBuckets=5,
inputCol='continuous_var',
outputCol='discritized'
)
data_discretized = discretizer.fit(data).transform(data)
data_discretized.show(5,truncate=False)
+------------------+-----------+
|continuous_var |discritized|
+------------------+-----------+
|20.1234 |2.0 |
|20.132344452369832|2.0 |
|20.159087064491775|2.0 |
|20.203356291885854|2.0 |
|20.26470185735763 |2.0 |
+------------------+-----------+
3.1.3 標(biāo)準(zhǔn)化連續(xù)變量
標(biāo)準(zhǔn)化連續(xù)變量不僅有助于更好地理解特征之間的關(guān)系耕漱,而且還有助于計算效率,并防止運(yùn)行到某些數(shù)字陷阱抬伺。
# 標(biāo)準(zhǔn)化連續(xù)變量
# 首先螟够,要創(chuàng)建一個向量代表連續(xù)變量(因?yàn)樗皇且粋€float)
vectorizer = ft.VectorAssembler(inputCols=['continuous_var'],
outputCol='continuous_vec')
normlizer = ft.StandardScaler(inputCol=vectorizer.getOutputCol(),
outputCol='normlized',
withMean=True,
withStd=True)
pipeline = Pipeline(stages=[vectorizer, normlizer])
data_standardized = pipeline.fit(data).transform(data)
data_standardized.show(4)
+------------------+--------------------+--------------------+
| continuous_var| continuous_vec| normlized|
+------------------+--------------------+--------------------+
| 20.1234| [20.1234]|[0.23429139554502...|
|20.132344452369832|[20.132344452369832]|[0.23630959828688...|
|20.159087064491775|[20.159087064491775]|[0.24234373105179...|
|20.203356291885854|[20.203356291885854]|[0.25233252325644...|
+------------------+--------------------+--------------------+
3.2 聚類
在前面的例子中我們介紹了如何使用pyspark ml庫來擬合分類模型,在本節(jié)我們將簡單介紹pyspark ml庫中的聚類模型峡钓。
聚類是機(jī)器學(xué)習(xí)中的另一個重要組成部分:通常在現(xiàn)實(shí)世界中妓笙,我們沒有那么幸運(yùn)具有目標(biāo)特征,所以需要回到一個無監(jiān)督的學(xué)習(xí)范例能岩,來試圖從中發(fā)掘數(shù)據(jù)內(nèi)的模式寞宫。
# 聚類
# 使用Kmeans模型在出生數(shù)據(jù)中查找相似性
import pyspark.ml.clustering as clus
kmeans = clus.KMeans(k=5, featuresCol='features')
pipeline = Pipeline(stages=[encoder, featuresCreator, kmeans])
model = pipeline.fit(birth_train)
test = model.transform(birth_test)
test.groupby('prediction')\
.agg({'*':'count',
'MOTHER_HEIGHT_IN':'avg'})\
.collect()
[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=67.43708609271523, count(1)=453),
Row(prediction=3, avg(MOTHER_HEIGHT_IN)=67.65714285714286, count(1)=245),
Row(prediction=4, avg(MOTHER_HEIGHT_IN)=63.92423385728825, count(1)=8843),
Row(prediction=2, avg(MOTHER_HEIGHT_IN)=84.97315436241611, count(1)=447),
Row(prediction=0, avg(MOTHER_HEIGHT_IN)=65.45034965034965, count(1)=3575)]
3.3 回歸
上面已經(jīng)介紹過了分類和聚類模型,最后我們再來簡單介紹一下回歸模型拉鹃。
在本節(jié)中辈赋,我們將嘗試用給定的一些特征來預(yù)測MOTHER_WEIGHT_GAIN。
# 回歸
# 使用梯度提升決策樹來預(yù)測增加的體重
import pyspark.ml.regression as reg
features = ['MOTHER_AGE_YEARS', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
'CIG_3_TRI']
featuresCreator = ft.VectorAssembler(
inputCols=[col for col in features[1:]],
outputCol='features'
)
# 這里使用卡方檢驗(yàn)選擇前六個最重要的特征
selector = ft.ChiSqSelector(numTopFeatures=6,
outputCol='selectedFeatures',
labelCol='MOTHER_WEIGHT_GAIN')
regressor = reg.GBTRegressor(maxIter=15,
maxDepth=3,
labelCol='MOTHER_WEIGHT_GAIN')
pipeline = Pipeline(stages=[featuresCreator, selector, regressor])
weight_gain = pipeline.fit(birth_train)
# 測試集評估
evaluator = ev.RegressionEvaluator(predictionCol='prediction',labelCol='MOTHER_WEIGHT_GAIN')
print(evaluator.evaluate(weight_gain.transform(birth_test),
{evaluator.metricName:'r2'}))
0.49363823556949404
雖然模型結(jié)果不太好膏燕,但是我們的重點(diǎn)不在這里钥屈,我們只是為了了解pyspark ml庫中回歸模型的用法。
參考:
《pyspark 實(shí)戰(zhàn)指南:利用python和spark構(gòu)建數(shù)據(jù)密集型應(yīng)用并規(guī)陌颖瑁化部署》