pySpark 機(jī)器學(xué)習(xí)庫ml入門

在spark中提供了兩個機(jī)器學(xué)習(xí)庫mllib和ml橡疼,mllib的操作是基于RDD的忿磅,而ml則是基于DataFrame莫秆,是主流機(jī)器學(xué)習(xí)庫崔拥。

1幸缕、ml包的概述

ml包包括三個主要的抽象類:轉(zhuǎn)換器(Transformer)群发、評估器(Estimator)和管道(Pipeline)晰韵。

1.1 轉(zhuǎn)換器

轉(zhuǎn)換器類通過將一個新列附加到DataFrame來轉(zhuǎn)換數(shù)據(jù)。

從高層次上看熟妓,當(dāng)從轉(zhuǎn)換器的抽象類派生時雪猪,每個新的轉(zhuǎn)換器類需要實(shí)現(xiàn).transform()方法。該方法要求傳遞一個要被轉(zhuǎn)換的DataFrame起愈,該參數(shù)通常是第一個也是唯一的一個強(qiáng)制性參數(shù)只恨。這在ml包的不同方法中也不盡相同:其他常見的參數(shù)有inputCol和outputCol;然而抬虽,這些參數(shù)通常用一些預(yù)定義的值作為默認(rèn)值坤次,例如inputCol參數(shù)的默認(rèn)值為“features”。

spark.ml.feature中提供了許多轉(zhuǎn)換器斥赋,下面做個簡要介紹:

  • Binarizer, 根據(jù)指定的閾值將連續(xù)變量轉(zhuǎn)換為對應(yīng)的二進(jìn)制值缰猴;
  • Bucketizer, 與Binarizer類似,該方法根據(jù)閾值列表(分割的參數(shù))疤剑,將連續(xù)變量轉(zhuǎn)換為多項值(即將連續(xù)變量離散到指定的范圍區(qū)間)滑绒;
  • ChiSqSelector, 對于分類目標(biāo)變量,此功能允許你選擇預(yù)定義數(shù)量的特征(由numTopFeatures參數(shù)指定)隘膘,以便更好地說明目標(biāo)的變化疑故。如名所示,該方法使用卡方檢驗(yàn)(Chi-Square)完成選擇弯菊。該方法需要兩步:首先纵势,需要fit()數(shù)據(jù)。調(diào)用fit方法返回一個ChipSelectorModel對象管钳,然后使用該對象的transform方法來轉(zhuǎn)換DataFrame钦铁;
  • countVectorizer, 該方法對于標(biāo)記文本(如[['learning', 'pyspark', 'with', 'us'], ['us', 'us', 'us', 'us']])是有用的。這是一個需要兩步的方法:首先才漆,需要用fit從數(shù)據(jù)集中學(xué)習(xí)這些模式牛曹,然后才能使用fit方法返回的CountVectorizerModel對象的transform方法。對于如上所示的標(biāo)記文本醇滥,該轉(zhuǎn)換器的輸出類似于`[(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]), (4, [3], [3.0])]黎比。
  • DCT, 離散余弦變換取實(shí)數(shù)值向量,并返回相同長度的向量鸳玩,但余弦函數(shù)之和在不同頻率下振蕩阅虫。這種轉(zhuǎn)換對于提取數(shù)據(jù)或數(shù)據(jù)壓縮中的一些基本頻率很有用。
  • ElementwiseProduct, 該方法返回一個向量不跟,其中的元素是傳入該方法的向量和另一個傳入?yún)?shù)scalingVec向量的乘積颓帝。例如,如果傳入的向量是[10.0, 3.0, 15.0], 而傳入的scalingVec為[0.99, 3.30, 0.66], 那么將獲得如下所示的向量:[9.9, 9.9, 9.9, 9.9];
  • HashingTF, 一個哈希轉(zhuǎn)換器躲履,輸入為標(biāo)記文本的列表,返回一個帶有技術(shù)的有預(yù)定長度的向量聊闯。摘自pyspark文檔:"由于使用簡單的模數(shù)將散列函數(shù)轉(zhuǎn)換為列索引工猜,建議使用2的冪作為numFeatures參數(shù);否則特征將不會均勻的映射到列"菱蔬;
  • IDF, 該方法計算文檔列表的逆向文件頻率篷帅。請注意,文檔需要提前用向量表示(例如拴泌,使用HashingTF或CountVectorizer)魏身;
  • IndexToString, 與StringIndexer方法對應(yīng)。它使用StringIndexerModel對象中的編碼將字符串索引反轉(zhuǎn)到原始值蚪腐。另外請注意箭昵,如果有時不起作用,你需要指定StringIndexer中的值回季;
  • MaxAbsScaler, 將數(shù)據(jù)調(diào)整到[-1,0,1]范圍內(nèi)(因此不會移動數(shù)據(jù)中心)家制;
  • MinMaxScaler, 這與MaxAbsScaler相似,區(qū)別在于它將數(shù)據(jù)縮放到[0.0, 1.0]范圍內(nèi)泡一;
  • NGram, 此方法的輸入為標(biāo)記文本的列表颤殴,返回結(jié)果包含一系列n-gram:以兩個詞、三個詞或更多的n個詞記為一個n-gram鼻忠。例如涵但,如果你有一個['good', 'morning', 'Robin', 'Williams'], 你會得到以下輸出:['good morning', 'morning Robin', 'Robin Williams'];
  • Normlizer, 該方法使用p范數(shù)將數(shù)據(jù)縮放為單位范數(shù)(默認(rèn)為L2);
  • OneHotEncoder, 該方法將分類列編碼為二進(jìn)制向量列;
  • PCA, 使用主成分分析執(zhí)行數(shù)據(jù)降維帖蔓;
  • PolynomiaExpansion, 執(zhí)行向量的多項式展開矮瘟。例如,加入你有一個如[x,y,z]的向量塑娇,則該方法將產(chǎn)生一下擴(kuò)展:[x, xx, y, xy, yy, z, xz, yz, zz];
  • QuantileDiscretizer, 與Bucketizer方法類似芥永,但不是傳遞分隔參數(shù),而是傳遞一個numBuckets參數(shù)钝吮。然后埋涧,該方法通過計算數(shù)據(jù)的近似分位數(shù)來決定分隔應(yīng)該是什么;
  • RegexTokenizer, 這是一個使用正則表達(dá)式的字符串分詞器奇瘦;
  • RFormula, 對于狂熱的R用戶棘催,你可以傳遞一個公式,如vec ~ alpha*3 + beta (假設(shè)你的DataFrame具有alpha和beta列)耳标,它將產(chǎn)生給定表達(dá)式的vec列醇坝;
  • SQLTransformer, 與上面相似,但不是類似R的公式,你可以使用SQL語法(FROM 語句應(yīng)該從__THIS中選擇呼猪,表示你正在訪問DataFrame画畅,如SELECT alpha*3 + beta AS vec FROM THIS);
  • StandardScaler, 標(biāo)準(zhǔn)化列,使其擁有零均值和等于1的標(biāo)準(zhǔn)差宋距;
  • StopWordsRemover, 從標(biāo)記文本中刪除停用詞(如'the', 'a');
  • StringIndexer, 假設(shè)包含所有單詞的列表都在一列轴踱,這將產(chǎn)生一個索引向量;
  • Tokenizer (分詞器):該默認(rèn)分詞器將字符串轉(zhuǎn)成小寫谚赎,然后以空格為分隔符分詞淫僻;
  • VectorAssembler, 這是一個非常有用的轉(zhuǎn)換器,他將多個數(shù)字(包括向量)列合并為一列向量壶唤;
  • VectorIndexer, 該方法為類別列生成索引向量雳灵。它以逐列方式工作,從列中選擇不同的值闸盔,排序并從映射中返回值的索引而不是原始值悯辙;
  • VectorSlicer, 用于特征向量,不管是密集的還是稀疏的:給定一個索引列表迎吵,它從特征向量中提取值笑撞;
  • Word2Vec, 該方法將一個句子(字符串)作為輸入 ,并將其轉(zhuǎn)換為{string, vector}格式的映射钓觉,這種表示在自然語言處理中非常有用茴肥。
1.2 評估器

評估器可以被視為需要評估的統(tǒng)計模型,對你的觀測對象做預(yù)測或分類荡灾。

如果從抽象的評估器類派生瓤狐,新模型必須實(shí)現(xiàn)fit方法,該方法給出的在DataFrame中找到的數(shù)據(jù)和某些默認(rèn)或自定義的參數(shù)來擬合模型批幌。在pyspark中有很多評估器可用础锐,下面簡要介紹下spark中提供的模型。

分類

ML包提供了七種分類(Classification)模型以供選擇荧缘,從最簡單的邏輯回歸到一些更復(fù)雜的模型皆警,下面作簡要的描述:

  • LogisticRegression, 分類的基準(zhǔn)模型。邏輯回歸使用一個對數(shù)函數(shù)來計算屬于特定類別的觀察對象的概率截粗;
  • DecisionTreeClassifier, 該分類器構(gòu)建了一個決策樹來預(yù)測一個觀察對象的所屬類別信姓。指定maxDepth參數(shù)限制樹的深度,minInstancePerNode確定需要進(jìn)一步拆分的樹節(jié)點(diǎn)的觀察對象的最小數(shù)量绸罗,maxBins參數(shù)指定連續(xù)變量將被分割的Bin的最大數(shù)量意推,而impurity指定用于測量并計算來自分隔的信息的度量;
  • GBTClassifier, 用于分類的梯度提升決策樹模型珊蟀。該模型屬于集合模型家族:集合模型結(jié)合多個弱預(yù)測模型而形成一個強(qiáng)健的模型菊值;
  • RandomForestClassifier,該模型產(chǎn)生多個決策樹,并使用模式輸出的決策樹來對觀察對象進(jìn)行分類腻窒;
  • NaiveBayes, 基于貝葉斯定理昵宇,該模型使用條件概率理論對觀測進(jìn)行分類;
  • MultilayerPerceptronClassfier, 多層感知器分類器儿子。模仿人類大腦本質(zhì)的分類器瓦哎,深深植根于人造神經(jīng)網(wǎng)絡(luò)理論,該模型是一個黑盒模型典徊,內(nèi)部參數(shù)不易解釋。該模型至少包含三個完全相連的人造神經(jīng)元層:輸入層(需要和數(shù)據(jù)集中特征的數(shù)量一樣)恩够、多個隱藏層(至少一個)以及一個輸出層卒落,其神經(jīng)元數(shù)量等于標(biāo)簽中的類別數(shù)量。輸入層和隱藏層中的所有神經(jīng)元都有sigmoid激活函數(shù)蜂桶,而輸出神經(jīng)元的激活函數(shù)則為softmax儡毕。
  • OneVsRest,將多分類問題簡化為二分類問題扑媚。例如腰湾,在多標(biāo)簽的情況下,模型可以訓(xùn)練成多個二元邏輯回歸模型疆股。如多標(biāo)簽情況下费坊,模型可以訓(xùn)練成多個二元邏輯回歸模型。所有模型分別計分旬痹,具有最高概率的模型獲勝附井。
回歸

pyspark ML軟件包中有七種可用于回歸(Regression)任務(wù)的模型。與分類一樣两残,范圍從一些基本的回歸(如強(qiáng)制線性回歸)到更復(fù)雜的回歸:

  • AFTSurvivalRegression永毅,適合加速失效時間回歸模型。它是一個參數(shù)化模型人弓,假設(shè)其中一個特征的邊際效應(yīng)加速或減緩了預(yù)期壽命(或過程失斦铀馈)。它非常適用于具有明確階段的過程崔赌;
  • DecisionTreeRegressor意蛀, 類似于分類模型,明顯不同的是其標(biāo)簽是連續(xù)的而不是二元的健芭;
  • GBTRegressor, 與DecisionTressRegressor一樣浸间,區(qū)別在于標(biāo)簽的數(shù)據(jù)類型;
  • GeneralizedLinearRegression吟榴,廣義線性回歸是具有不同內(nèi)核功能(鏈接功能)的線性模型家族魁蒜。與假設(shè)誤差項的常態(tài)性的線性回歸相反,GLM允許標(biāo)簽具有不同的誤差分布項:pyspark ML包的generalizedRegression模型支持gaussian、binomial兜看、gamma锥咸、和possion家族的誤差分布,他們有許多不同的連接功能细移;
  • IsotonicRegression, 這種回歸擬合一個形式自由的搏予、非遞減的行到數(shù)據(jù)中。對于擬合有序的和遞增的觀測數(shù)據(jù)集是有用的弧轧;
  • LinearRgression, 最簡單的回歸模型雪侥,他架設(shè)了特征與連續(xù)標(biāo)簽以及誤差項的常態(tài)之間的線性關(guān)系;
  • RandomForestRegressor, 與DecisionTreeRegressor或GBTRegressor類似精绎,RandomForestRegressor適合連續(xù)的標(biāo)簽速缨,而不是離散標(biāo)簽。
聚類

聚類是一系列無監(jiān)督模型代乃,用于查找數(shù)據(jù)中的隱含模式旬牲。pyspark ML包提供了四種當(dāng)前最流行的模型:

  • BisectingKMeans, 二分k均值算法,該算法結(jié)合了k均值聚類算法和層次聚類算法搁吓。最初該算法將所有觀察點(diǎn)作為一個簇原茅,然后將數(shù)據(jù)迭代的分解為k個簇;
  • KMeans, K均值算法堕仔,將數(shù)據(jù)分成k個簇擂橘,迭代地搜索那些使每個觀察點(diǎn)和它所屬簇的質(zhì)點(diǎn)之間距離平方和最小的那些質(zhì)點(diǎn);
  • GaussianMixture, 混合高斯模型摩骨。該方法使用具有未知參數(shù)的k個高斯分布來剖析數(shù)據(jù)集贝室。使用期望最大化算法,通過最大化對數(shù)似然函數(shù)找到高斯參數(shù)仿吞;
  • LDA, 該模型用于自然語言處理應(yīng)用程序中的主題生成滑频;
  • 除此之外,pyspark ML還提供了推薦模型唤冈。
1.3 管道

pyspark ML中管道的概念用來表示從轉(zhuǎn)換到評估(具有一系列不同階段)的端到端的過程峡迷,這個過程可以對輸入的一些原始數(shù)據(jù)(以DataFrame形式)執(zhí)行必要的數(shù)據(jù)加工(轉(zhuǎn)換),最后評估模型你虹。

一個管道可以被認(rèn)為是由一系列不同階段組成的绘搞。在Pipeline對象上執(zhí)行fit方法時,所有階段按照stage參數(shù)中指定的順序執(zhí)行傅物;stage參數(shù)是轉(zhuǎn)換器和評估器對象的列表夯辖。管道對象的fit方法執(zhí)行每個轉(zhuǎn)換器的transform方法和所有評估器的fit方法。

通常董饰,前一階段的輸出會成為下一階段的輸入:當(dāng)從轉(zhuǎn)換器或評估器抽象類型派生時蒿褂,需要實(shí)現(xiàn)getOutputCol()方法圆米,該方法返回創(chuàng)建對象時指定的outputCol參數(shù)的值。

下面通過一些例子來詳細(xì)介紹ml的用法啄栓。

2娄帖、例子:使用ML預(yù)測嬰兒生存幾率

在本節(jié)中我們將使用ml中的函數(shù)方法來預(yù)測嬰兒生存率,數(shù)據(jù)可從http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz下載昙楚。

2.1 加載數(shù)據(jù)
import pyspark.sql.types as typ
from pyspark.ml import Pipeline
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev
import pandas as pd
import numpy as np
import os

labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
          ('BIRTH_PLACE', typ.StringType()),
          ('MOTHER_AGE_YEARS', typ.IntegerType()),
          ('FATHER_COMBINE_AGE', typ.IntegerType()),
          ('CIG_BEFORE', typ.IntegerType()),
          ('CIG_1_TRI', typ.IntegerType()),
          ('CIG_2_TRI', typ.IntegerType()),
          ('CIG_3_TRI', typ.IntegerType()),
          ('MOTHER_HEIGHT_IN', typ.IntegerType()),
          ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
          ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
          ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
          ('DIABETES_PRE', typ.IntegerType()),
          ('DIABETES_GEST', typ.IntegerType()),
          ('HYP_TENS_PRE', typ.IntegerType()),
          ('HYP_TENS_GEST', typ.IntegerType()),
          ('PREV_BIRTH_PRETERM', typ.IntegerType())
          ]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv(
    '/Users/shexuan/Downloads/births_transformed.csv.gz', header=True, schema=schema)

births.show(3)
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINE_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                     0|          1|              29|                99|         0|        0|        0|        0|              99|              999|                   999|                99|           0|            0|           0|            0|                 0|
|                     0|          1|              22|                29|         0|        0|        0|        0|              65|              180|                   198|                18|           0|            0|           0|            0|                 0|
|                     0|          1|              38|                40|         0|        0|        0|        0|              63|              155|                   167|                12|           0|            0|           0|            0|                 0|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+

在這里我們指定DataFrame的schema近速,限制數(shù)據(jù)集只有17列。

2.2 創(chuàng)建轉(zhuǎn)換器

在使用模型對數(shù)據(jù)集進(jìn)行評估預(yù)測前堪旧,先要對數(shù)據(jù)做一些特征轉(zhuǎn)換削葱。

# 創(chuàng)建轉(zhuǎn)換器、評估器
import  pyspark.ml.feature as ft

births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
    .cast(typ.IntegerType()))

# birth place使用one-hot編碼
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
                           outputCol='BIRTH_PLACE_VEC')

# 創(chuàng)建單一的列將所有特征整合在一起
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features'
)

# 創(chuàng)建一個評估器
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(maxIter=10,
                                regParam=0.01,
                                featuresCol=featuresCreator.getOutputCol(),
                                labelCol='INFANT_ALIVE_AT_REPORT')
2.3 創(chuàng)建一個管道淳梦、擬合模型

在前面我們已經(jīng)創(chuàng)建了數(shù)據(jù)轉(zhuǎn)換器和評估器析砸,現(xiàn)在我們可以通過管道將他們串聯(lián)起來并方便的進(jìn)行模型擬合了。

# 創(chuàng)建一個管道
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

# 擬合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)

model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)
2.4 評估模型

在前面我們將數(shù)據(jù)分為了兩部分并通過管道方便的對訓(xùn)練集進(jìn)行了擬合以及對測試集進(jìn)行了測試√房纾現(xiàn)在我們可以通過測試集的結(jié)果來對模型擬合效果進(jìn)行評估了干厚。

# 評估模型性能
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

0.7187355793173213
0.6819691176245866
2.5 保存模型

PySpark不僅允許保存訓(xùn)練好的模型李滴,還可以保存管道結(jié)構(gòu)及所有轉(zhuǎn)換器和評估器的定義螃宙。

# 保存模型pipeline
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

# 重載模型pipeline
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(birth_train).transform(birth_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', ...]

# 保存模型
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_LogisticPipelineModel'
model.write().overwrite().save(modelPath)

# 載入模型
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(birth_test)
test_reloadedModel.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, ...]
2.6 超參調(diào)優(yōu)

我們的第一個模型幾乎不可能是最好的模型。利用超參調(diào)優(yōu)能幫我們找到模型的最佳參數(shù)所坯,如邏輯回歸模型所需的最佳迭代次數(shù)或決策樹的最大深度谆扎。

在超參調(diào)優(yōu)時PySpark提供了兩種驗(yàn)證方法:K-Fold交叉驗(yàn)證和train-validation(相當(dāng)于1-Fold)交叉驗(yàn)證。

# 超參調(diào)優(yōu):grid search和train-validation splitting 

# 網(wǎng)格搜索
import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder()\
    .addGrid(logistic.maxIter, [5,10,50])\
    .addGrid(logistic.regParam, [0.01,0.05,0.3])\
    .build()

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

# 使用K-Fold交叉驗(yàn)證評估各種參數(shù)的模型
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3
)

# 創(chuàng)建一個構(gòu)建特征的pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator])
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123) # 重新打開數(shù)據(jù)進(jìn)行處理
data_transformer = pipeline.fit(birth_train)
data_test = data_transformer.transform(birth_test)


# cvModel 返回估計的最佳模型
cvModel = cv.fit(data_transformer.transform(birth_train))
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.735848884034915
0.6959036715961695

使用下面的代碼可以查看模型最佳參數(shù):

# 查看最佳模型參數(shù)
param_maps = cvModel.getEstimatorParamMaps()
eval_metrics = cvModel.avgMetrics

param_res = []

for params, metric in zip(param_maps, eval_metrics):
    param_metric = {}
    for key, param_val in zip(params.keys(), params.values()):
        param_metric[key.name]=param_val
    param_res.append((param_metric, metric))

sorted(param_res, key=lambda x:x[1], reverse=True)

[({'maxIter': 50, 'regParam': 0.01}, 0.7406291618177623),
 ({'maxIter': 10, 'regParam': 0.01}, 0.735580969909943),
 ({'maxIter': 50, 'regParam': 0.05}, 0.7355100622938429),
 ({'maxIter': 10, 'regParam': 0.05}, 0.7351586303619441),
 ({'maxIter': 10, 'regParam': 0.3}, 0.7248698034708339),
 ({'maxIter': 50, 'regParam': 0.3}, 0.7214679272915997),
 ({'maxIter': 5, 'regParam': 0.3}, 0.7180255703028883),
 ({'maxIter': 5, 'regParam': 0.01}, 0.7179304617840288),
 ({'maxIter': 5, 'regParam': 0.05}, 0.7173397593133481)]

上面使用的使用K-Fold來進(jìn)行超參調(diào)優(yōu)芹助,K-Fold交叉驗(yàn)證往往非常耗時堂湖,使用1-Fold的交叉驗(yàn)證(即將數(shù)據(jù)集按比例分為訓(xùn)練集合驗(yàn)證集)能大大縮短時間。

# Train-validation劃分

# 使用卡方檢驗(yàn)選擇特征
selector = ft.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',
                                featuresCol='selectedFeatures')

pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(birth_train)

tvs = tune.TrainValidationSplit(estimator=logistic,
                               estimatorParamMaps=grid,
                               evaluator=evaluator,
                                trainRatio=0.75
                               )

tvsModel = tvs.fit(data_transformer.transform(birth_train))
data_test = data_transformer.transform(birth_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.6111344483529891
0.5735913338089571

3状土、使用PySpark ML的其他功能

在上面我們完整的介紹了利用pyspark ml庫來進(jìn)行建模的過程无蜂。下面我們介紹一些其他常用的功能。

3.1 特征提取
3.1.1 NLP相關(guān)特征提取

如第一部分所述蒙谓,NGram模型采用標(biāo)記文本的列表斥季,并生成單詞對(或n-gram)。
本例中累驮,我們從pyspark的文檔中摘錄一段酣倾,并介紹如何在將文本傳遞給NGram模型之前進(jìn)行清理。

# NLP相關(guān)特征提劝ā(NGram模型采用標(biāo)記文本的列表躁锡,并生成單詞對或n-gram)

text_data = spark.createDataFrame([
    ['''K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping 
    randomly partitioned folds which are used as separate training and test datasets e.g., with k=3 folds, 
    K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data 
    for training and 1/3 for testing. Each fold is used as the test set exactly once.'''],
    ['''CrossValidatorModel contains the model with the highest average cross-validation metric across folds and
    uses this model to transform input data. CrossValidatorModel also tracks the metrics for each param map 
    evaluated.'''],
    ['''Creates a copy of this instance with a randomly generated uid and some extra params. This copies the 
    underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra 
    parameters over.''']
], ['input'])

# 將文本拆分成單詞
tokenizer = ft.RegexTokenizer(inputCol='input',
                              outputCol='input_arr',
                              pattern='\s+|[,.\"]')

# 刪掉停用詞
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                               outputCol='input_stop')
# 生成ngram詞對
ngram = ft.NGram(n=2,
                inputCol=stopwords.getOutputCol(),
                outputCol='nGrams')

# 構(gòu)建特征pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

data_ngram = pipeline\
    .fit(text_data)\
    .transform(text_data)

data_ngram.show()

+--------------------+--------------------+--------------------+--------------------+
|               input|           input_arr|          input_stop|              nGrams|
+--------------------+--------------------+--------------------+--------------------+
|K-fold cross vali...|[k-fold, cross, v...|[k-fold, cross, v...|[k-fold cross, cr...|
|CrossValidatorMod...|[crossvalidatormo...|[crossvalidatormo...|[crossvalidatormo...|
|Creates a copy of...|[creates, a, copy...|[creates, copy, i...|[creates copy, co...|
+--------------------+--------------------+--------------------+--------------------+
3.1.2 離散連續(xù)變量

我們常常需要處理高度非線性連續(xù)特征,很難只用一個系數(shù)來供給模型置侍。這種情況下映之,可能難以用一個系數(shù)來解釋這樣的特征與目標(biāo)之間的關(guān)系拦焚。有時候,將值劃分成分類級別是很有用的惕医。

# 離散連續(xù)變量
x = np.arange(0, 100)
x = (x/100.0)*np.pi*4
y = x*np.sin(x/1.764)+20.1234

schema = typ.StructType([typ.StructField('continuous_var', typ.DoubleType(), nullable=False)])
data = spark.createDataFrame([[float(e)] for e in y], schema=schema)
data.show(4)
+------------------+
|    continuous_var|
+------------------+
|           20.1234|
|20.132344452369832|
|20.159087064491775|
|20.203356291885854|
+------------------+


# 使用QuantileDiscretizer模型將連續(xù)變量分為五個分類級別
discretizer = ft.QuantileDiscretizer(
    numBuckets=5, 
    inputCol='continuous_var',
    outputCol='discritized'
)

data_discretized = discretizer.fit(data).transform(data)
data_discretized.show(5,truncate=False)
+------------------+-----------+
|continuous_var    |discritized|
+------------------+-----------+
|20.1234           |2.0        |
|20.132344452369832|2.0        |
|20.159087064491775|2.0        |
|20.203356291885854|2.0        |
|20.26470185735763 |2.0        |
+------------------+-----------+
3.1.3 標(biāo)準(zhǔn)化連續(xù)變量

標(biāo)準(zhǔn)化連續(xù)變量不僅有助于更好地理解特征之間的關(guān)系耕漱,而且還有助于計算效率,并防止運(yùn)行到某些數(shù)字陷阱抬伺。

# 標(biāo)準(zhǔn)化連續(xù)變量
# 首先螟够,要創(chuàng)建一個向量代表連續(xù)變量(因?yàn)樗皇且粋€float)
vectorizer = ft.VectorAssembler(inputCols=['continuous_var'],
                               outputCol='continuous_vec')

normlizer = ft.StandardScaler(inputCol=vectorizer.getOutputCol(),
                             outputCol='normlized',
                             withMean=True,
                             withStd=True)

pipeline = Pipeline(stages=[vectorizer, normlizer])
data_standardized = pipeline.fit(data).transform(data)
data_standardized.show(4)
+------------------+--------------------+--------------------+
|    continuous_var|      continuous_vec|           normlized|
+------------------+--------------------+--------------------+
|           20.1234|           [20.1234]|[0.23429139554502...|
|20.132344452369832|[20.132344452369832]|[0.23630959828688...|
|20.159087064491775|[20.159087064491775]|[0.24234373105179...|
|20.203356291885854|[20.203356291885854]|[0.25233252325644...|
+------------------+--------------------+--------------------+
3.2 聚類

在前面的例子中我們介紹了如何使用pyspark ml庫來擬合分類模型,在本節(jié)我們將簡單介紹pyspark ml庫中的聚類模型峡钓。

聚類是機(jī)器學(xué)習(xí)中的另一個重要組成部分:通常在現(xiàn)實(shí)世界中妓笙,我們沒有那么幸運(yùn)具有目標(biāo)特征,所以需要回到一個無監(jiān)督的學(xué)習(xí)范例能岩,來試圖從中發(fā)掘數(shù)據(jù)內(nèi)的模式寞宫。

# 聚類
# 使用Kmeans模型在出生數(shù)據(jù)中查找相似性
import pyspark.ml.clustering as clus

kmeans = clus.KMeans(k=5, featuresCol='features')
pipeline = Pipeline(stages=[encoder, featuresCreator, kmeans])
model = pipeline.fit(birth_train)

test = model.transform(birth_test)
test.groupby('prediction')\
    .agg({'*':'count',
         'MOTHER_HEIGHT_IN':'avg'})\
    .collect()

[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=67.43708609271523, count(1)=453),
 Row(prediction=3, avg(MOTHER_HEIGHT_IN)=67.65714285714286, count(1)=245),
 Row(prediction=4, avg(MOTHER_HEIGHT_IN)=63.92423385728825, count(1)=8843),
 Row(prediction=2, avg(MOTHER_HEIGHT_IN)=84.97315436241611, count(1)=447),
 Row(prediction=0, avg(MOTHER_HEIGHT_IN)=65.45034965034965, count(1)=3575)]
3.3 回歸

上面已經(jīng)介紹過了分類和聚類模型,最后我們再來簡單介紹一下回歸模型拉鹃。
在本節(jié)中辈赋,我們將嘗試用給定的一些特征來預(yù)測MOTHER_WEIGHT_GAIN。

# 回歸
# 使用梯度提升決策樹來預(yù)測增加的體重
import pyspark.ml.regression as reg

features = ['MOTHER_AGE_YEARS', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
            'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST',
            'PREV_BIRTH_PRETERM', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
            'CIG_3_TRI']

featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in features[1:]],
    outputCol='features'
)

# 這里使用卡方檢驗(yàn)選擇前六個最重要的特征
selector = ft.ChiSqSelector(numTopFeatures=6,
                            outputCol='selectedFeatures',
                            labelCol='MOTHER_WEIGHT_GAIN')

regressor = reg.GBTRegressor(maxIter=15,
                            maxDepth=3,
                            labelCol='MOTHER_WEIGHT_GAIN')

pipeline = Pipeline(stages=[featuresCreator, selector, regressor])
weight_gain = pipeline.fit(birth_train)

# 測試集評估
evaluator = ev.RegressionEvaluator(predictionCol='prediction',labelCol='MOTHER_WEIGHT_GAIN')
print(evaluator.evaluate(weight_gain.transform(birth_test),
                        {evaluator.metricName:'r2'}))

0.49363823556949404

雖然模型結(jié)果不太好膏燕,但是我們的重點(diǎn)不在這里钥屈,我們只是為了了解pyspark ml庫中回歸模型的用法。

參考:

《pyspark 實(shí)戰(zhàn)指南:利用python和spark構(gòu)建數(shù)據(jù)密集型應(yīng)用并規(guī)陌颖瑁化部署》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末篷就,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子近忙,更是在濱河造成了極大的恐慌竭业,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件及舍,死亡現(xiàn)場離奇詭異未辆,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)锯玛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門咐柜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人更振,你說我怎么就攤上這事炕桨。” “怎么了肯腕?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵献宫,是天一觀的道長。 經(jīng)常有香客問我实撒,道長姊途,這世上最難降的妖魔是什么涉瘾? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮捷兰,結(jié)果婚禮上立叛,老公的妹妹穿的比我還像新娘。我一直安慰自己贡茅,他們只是感情好秘蛇,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著顶考,像睡著了一般赁还。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上驹沿,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天艘策,我揣著相機(jī)與錄音,去河邊找鬼渊季。 笑死朋蔫,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的却汉。 我是一名探鬼主播驯妄,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼病涨!你這毒婦竟也來了富玷?” 一聲冷哼從身側(cè)響起璧坟,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤既穆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后雀鹃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體幻工,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年黎茎,在試婚紗的時候發(fā)現(xiàn)自己被綠了囊颅。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡傅瞻,死狀恐怖踢代,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情嗅骄,我是刑警寧澤胳挎,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站溺森,受9級特大地震影響慕爬,放射性物質(zhì)發(fā)生泄漏窑眯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一医窿、第九天 我趴在偏房一處隱蔽的房頂上張望磅甩。 院中可真熱鬧,春花似錦姥卢、人聲如沸卷要。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽却妨。三九已至,卻和暖如春括眠,著一層夾襖步出監(jiān)牢的瞬間彪标,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工掷豺, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留捞烟,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓当船,卻偏偏與公主長得像题画,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子德频,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容