Spark機器學(xué)習(xí)庫(MLlib)

概觀

sparklyr為Spark的分布式機器學(xué)習(xí)庫提供綁定鸣剪。特別是零蓉,sparklyr允許訪問spark.ml包提供的機器學(xué)習(xí)例程卿嘲。與sparklyr的dplyr界面一起, 可以輕松地在Spark上創(chuàng)建和調(diào)整機器學(xué)習(xí)工作流程,完全在R中編排丹壕。
sparklyr提供了三個功能系列庆械,可以與Spark機器學(xué)習(xí)一起使用:

  • 用于分析數(shù)據(jù)的機器學(xué)習(xí)算法(ml_*)
  • 用于處理各個特征的特征變換器(ft_*)
  • 用于操作Spark DataFrames(sdf_*)的函數(shù)

使用sparklyr的分析工作流程可能包含以下幾個階段。有關(guān)示例菌赖,請參閱示例工作流程缭乘。

  1. 通過sparklyr dplyr接口執(zhí)行SQL查詢,
  2. 使用函數(shù)sdf_和ft_函數(shù)系列來生成新列或?qū)?shù)據(jù)集進行分區(qū)琉用,
  3. 從ml_*函數(shù)族中選擇合適的機器學(xué)習(xí)算法來建模數(shù)據(jù)堕绩,
  4. 檢查模型擬合的質(zhì)量,并使用它來預(yù)測新數(shù)據(jù)邑时。
  5. 收集結(jié)果以便在R中進行可視化和進一步分析

算法

可以通過一ml_*組函數(shù)從sparklyr訪問Spark的機器學(xué)習(xí)庫:

  • ml_kmeans K-Means聚類
  • ml_linear_regression 線性回歸
  • ml_logistic_regression Logistic回歸
  • ml_survival_regression 生存回歸
  • ml_generalized_linear_regression 廣義線性回歸
  • ml_decision_tree 決策樹
  • ml_random_forest 隨機森林
  • ml_gradient_boosted_trees 漸變 - 樹木
  • ml_pca 主成分分析
  • ml_naive_bayes 樸素貝葉斯
  • ml_multilayer_perceptron 多層感知器
  • ml_lda 潛在的Dirichlet分配
  • ml_one_vs_rest 一對陣休息

函數(shù)公式

該ml_*函數(shù)接受的參數(shù)response和features奴紧。但features也可以是具有主效應(yīng)的公式(它目前不接受交互術(shù)語)。截取項可以通過使用省略-1晶丘。
就是這兩種公式展現(xiàn)的方式

ml_linear_regression(z ~ -1 + x + y)
ml_linear_regression(intercept = FALSE, response = "z", features = c("x", "y"))

選項

可以使用ml_options函數(shù)中的參數(shù)修改Spark模型輸出ml_*黍氮。這ml_options是專家調(diào)整模型輸出的唯一界面。例如浅浮,model.transform可以在執(zhí)行擬合之前用于改變Spark模型對象沫浆。

轉(zhuǎn)換

Spark提供了特征變換器,促進了Spark DataFrame中數(shù)據(jù)的許多常見轉(zhuǎn)換滚秩,并且Sparklyr在ft_*函數(shù)族中公開了這些變換专执。這些例程通常采用一個或多個輸入列,并生成一個新的輸出列郁油,形成為這些列的轉(zhuǎn)換本股。

  1. ft_binarizer 閾值數(shù)字特征為二進制(0/1)特征
  2. ft_bucketizer Bucketizer將一列連續(xù)特征轉(zhuǎn)換為一列特征桶
  3. ft_discrete_cosine_transform 將時域中的長度NN實值序列變換為頻域中的另一長度NN實值序列
  4. ft_elementwise_product 使用逐元素乘法將每個輸入向量乘以提供的權(quán)重向量。
  5. ft_index_to_string 將一列標簽索引映射回包含原始標簽作為字符串的列
  6. ft_quantile_discretizer 采用具有連續(xù)特征的列已艰,并輸出具有分箱分類特征的列
  7. sql_transformer 實現(xiàn)由SQL語句定義的轉(zhuǎn)換
  8. ft_string_indexer 將標簽的字符串列編碼為標簽索引列
  9. ft_vector_assembler 將給定的列列表合并到單個矢量列中

例子

采用數(shù)據(jù)集iris

library(sparklyr)
library(ggplot2)
library(dplyr)
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, "iris", overwrite = TRUE)
iris_tbt

K-MEANS聚類

使用Spark的K-means聚類將數(shù)據(jù)集分組。K均值聚類將分區(qū)指向成k組蚕苇,使得從點到指定的聚類中心的平方和最小化哩掺。

使用k-means聚類

kmeans_model <- iris_tbl %>%
  ml_kmeans(formula = Species~.,centers = 3)

這里需要注意的是,非監(jiān)督的模型如何寫公式

kmeans_model <- iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  ml_kmeans(formula = ~.,centers = 3)
kmeans_model
K-means clustering with 3 clusters

Cluster centers:
  Petal_Width Petal_Length
1    1.359259     4.292593
2    0.246000     1.462000
3    2.047826     5.626087

Within Set Sum of Squared Errors =  31.41289

進行預(yù)測

predicted <- sdf_predict(kmeans_model, iris_tbl) %>%
  collect

查看聚類的標簽和真實的標簽之間的關(guān)系

table(predicted$Species, predicted$prediction)
         
              0  1  2
  setosa      0 50  0
  versicolor 48  0  2
  virginica   6  0 44

sdf_predict(kmeans_model) %>%
  collect() %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
             size = 2, alpha = 0.5) + 
  geom_point(data = kmeans_model$centers, aes(Petal_Width, Petal_Length),
             col = scales::muted(c("red", "green", "blue")),
             pch = 'x', size = 12) +
  scale_color_discrete(name = "Predicted Cluster",
                       labels = paste("Cluster", 1:3)) +
  labs(
    x = "Petal Length",
    y = "Petal Width",
    title = "K-Means Clustering",
    subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
  )

image.png

線性回歸

建立線性回歸模型

lm_model <- iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)

lm_model
Formula: Petal_Length ~ Petal_Width

Coefficients:
(Intercept) Petal_Width 
   1.083558    2.229940 
iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  collect %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length), size = 2, alpha = 0.5) +
  geom_abline(aes(slope = coef(lm_model)[["Petal_Width"]],
                  intercept = coef(lm_model)[["(Intercept)"]]),
              color = "red") +
  labs(
    x = "Petal Width",
    y = "Petal Length",
    title = "Linear Regression: Petal Length ~ Petal Width",
    subtitle = "Use Spark.ML linear regression to predict petal length as a function of petal width."
  )

image.png

邏輯回歸

使用Spark的邏輯回歸來執(zhí)行邏輯回歸涩笤,將二元結(jié)果建模為一個或多個解釋變量的函數(shù)嚼吞。

數(shù)據(jù)準備

beaver <- beaver2
beaver$activ <- factor(beaver$activ, labels = c("Non-Active", "Active"))
copy_to(sc, beaver, "beaver")


beaver_tbl <- tbl(sc, "beaver")

建立回歸模型

glm_model <- beaver_tbl %>%
  mutate(binary_response = as.numeric(activ == "Active")) %>%
  ml_logistic_regression(binary_response ~ temp)


glm_model <- beaver_tbl %>%
  ml_logistic_regression(activ ~ temp)

> glm_model
Formula: activ ~ temp

Coefficients:
(Intercept)        temp 
  550.52331   -14.69184 

pre <- sdf_predict(glm_model) %>% collect()
pre
# A tibble: 100 x 12
     day  time  temp activ      features  label rawPrediction probability
   <dbl> <dbl> <dbl> <chr>      <list>    <dbl> <list>        <list>     
 1   307   930  36.6 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 2   307   940  36.7 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 3   307   950  36.9 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 4   307  1000  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 5   307  1010  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 6   307  1020  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 7   307  1030  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 8   307  1040  36.9 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 9   307  1050  37.0 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
10   307  1100  36.9 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
# ... with 90 more rows, and 4 more variables: prediction <dbl>,
#   predicted_label <chr>, probability_0 <dbl>, probability_1 <dbl>
> 

PCA

使用Spark的主成分分析(PCA)來降低維數(shù)。PCA是一種統(tǒng)計方法蹬碧,用于查找旋轉(zhuǎn)舱禽,使得第一個坐標具有可能的最大方差,并且每個后續(xù)坐標又具有可能的最大方差
建立PCA模型

pca_model <- tbl(sc, "iris") %>%
  select(-Species) %>%
  ml_pca()


print(pca_model)
Explained variance:

        PC1         PC2         PC3         PC4 
0.924618723 0.053066483 0.017102610 0.005212184 

Rotation:
                     PC1         PC2         PC3        PC4
Sepal_Length -0.36138659 -0.65658877  0.58202985  0.3154872
Sepal_Width   0.08452251 -0.73016143 -0.59791083 -0.3197231
Petal_Length -0.85667061  0.17337266 -0.07623608 -0.4798390
Petal_Width  -0.35828920  0.07548102 -0.54583143  0.7536574

隨機森林

使用隨機森林進行二分類或者多分類

rf_model <- iris_tbl %>%
  ml_random_forest(Species ~ Petal_Length + Petal_Width, type = "classification",num_trees = 500)



rf_predict <- sdf_predict(rf_model, iris_tbl) %>%
  ft_string_indexer("Species", "Species_idx") %>%
  collect

table(rf_predict$Species_idx, rf_predict$prediction)
     0  1  2
  0 49  1  0
  1  0 50  0
  2  0  0 50

數(shù)據(jù)集合劃分

將Spark DataFrame拆分為訓(xùn)練恩沽,測試數(shù)據(jù)集誊稚。

劃分數(shù)據(jù)集合

partitions <- tbl(sc, "iris") %>%
  sdf_partition(training = 0.75, test = 0.25, seed = 1099)

構(gòu)建線性回歸模型

fit <- partitions$training %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)

評價模型的結(jié)果

estimate_mse <- function(df){
  sdf_predict(fit, df) %>%
    mutate(resid = Petal_Length - prediction) %>%
    summarize(mse = mean(resid ^ 2)) %>%
    collect
}
sapply(partitions, estimate_mse)

字符串索引

使用ft_string_indexer和ft_index_to_string將字符列轉(zhuǎn)換為數(shù)字列,然后再將其轉(zhuǎn)換回來。

ft_string2idx <- iris_tbl %>%
  ft_string_indexer("Species", "Species_idx") %>%
  ft_index_to_string("Species_idx", "Species_remap") %>%
  collect

table(ft_string2idx$Species, ft_string2idx$Species_remap)
        setosa versicolor virginica
  setosa         50          0         0
  versicolor      0         50         0
  virginica       0          0        50

SDF轉(zhuǎn)換

ft_string2idx <- iris_tbl %>%
  sdf_mutate(Species_idx = ft_string_indexer(Species)) %>%
  sdf_mutate(Species_remap = ft_index_to_string(Species_idx)) %>%
  collect

ft_string2idx %>%
  select(Species, Species_idx, Species_remap) %>%
  distinct

簡單的例子

讓我們通過一個簡單的例子來演示在R中使用Spark的機器學(xué)習(xí)算法里伯。我們將使用ml_linear_regression來擬合線性回歸模型城瞎。使用內(nèi)置mtcars數(shù)據(jù)集,我們將嘗試根據(jù)車輛的mpg重量(wt)和發(fā)動機所包含的氣缸數(shù)()來預(yù)測汽車的油耗(cyl)疾瓮。

首先脖镀,我們將mtcars數(shù)據(jù)集復(fù)制到Spark中。

mtcars_tbl <- copy_to(sc, mtcars, "mtcars")

使用Spark SQL狼电,功能轉(zhuǎn)換器和DataFrame函數(shù)轉(zhuǎn)換數(shù)據(jù)蜒灰。

使用Spark SQL刪除馬力小于100的所有汽車
使用Spark功能變換器將汽車分成兩組,基于汽缸
使用Spark DataFrame函數(shù)將數(shù)據(jù)分區(qū)為測試和培訓(xùn)
然后使用spark ML擬合線性模型肩碟。將MPG作為重量和氣缸的函數(shù)强窖。

partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  sdf_mutate(cyl8 = ft_bucketizer(cyl, c(0,8,12))) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 888)



fit <- partitions$training %>%
  ml_linear_regression(mpg ~ wt + cyl)


summary(fit)
 
Deviance Residuals:
    Min      1Q  Median      3Q     Max 
-2.0947 -1.2747 -0.1129  1.0876  2.2185 

Coefficients:
(Intercept)          wt         cyl 
  33.795576   -1.596247   -1.580360 

R-Squared: 0.8267
Root Mean Squared Error: 1.437

這summary()表明我們的模型非常合適,并且汽車重量以及發(fā)動機中的汽缸數(shù)量都將成為其平均油耗的強大預(yù)測因素腾务。(該模型表明毕骡,平均而言,較重的汽車消耗的燃料更多岩瘦。)

讓我們使用我們的Spark模型擬合來預(yù)測我們的測試數(shù)據(jù)集的平均油耗未巫,并將預(yù)測的響應(yīng)與真實的測量燃料消耗進行比較。我們將構(gòu)建一個簡單的ggplot2圖启昧,使我們能夠檢查預(yù)測的質(zhì)量叙凡。

# Score the data
pred <- sdf_predict(fit, partitions$test) %>%
  collect

# Plot the predicted versus actual mpg
ggplot(pred, aes(x = mpg, y = prediction)) +
  geom_abline(lty = "dashed", col = "red") +
  geom_point() +
  theme(plot.title = element_text(hjust = 0.5)) +
  coord_fixed(ratio = 1) +
  labs(
    x = "Actual Fuel Consumption",
    y = "Predicted Fuel Consumption",
    title = "Predicted vs. Actual Fuel Consumption"
  )
image.png

雖然簡單,但我們的模型似乎在預(yù)測汽車的平均油耗方面做得相當(dāng)不錯密末。

我們可以輕松有效地將功能變換器握爷,機器學(xué)習(xí)算法和Spark DataFrame功能組合到Spark和R的完整分析中。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末严里,一起剝皮案震驚了整個濱河市新啼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌刹碾,老刑警劉巖燥撞,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異迷帜,居然都是意外死亡物舒,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門戏锹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來冠胯,“玉大人,你說我怎么就攤上這事锦针≤欤” “怎么了置蜀?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長割粮。 經(jīng)常有香客問我盾碗,道長,這世上最難降的妖魔是什么舀瓢? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任廷雅,我火速辦了婚禮,結(jié)果婚禮上京髓,老公的妹妹穿的比我還像新娘航缀。我一直安慰自己,他們只是感情好堰怨,可當(dāng)我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布芥玉。 她就那樣靜靜地躺著,像睡著了一般备图。 火紅的嫁衣襯著肌膚如雪灿巧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天揽涮,我揣著相機與錄音抠藕,去河邊找鬼。 笑死蒋困,一個胖子當(dāng)著我的面吹牛盾似,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播雪标,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼村刨!你這毒婦竟也來了告抄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤嵌牺,失蹤者是張志新(化名)和其女友劉穎打洼,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體髓梅,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡拟蜻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年绎签,在試婚紗的時候發(fā)現(xiàn)自己被綠了枯饿。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡诡必,死狀恐怖奢方,靈堂內(nèi)的尸體忽然破棺而出搔扁,到底是詐尸還是另有隱情,我是刑警寧澤蟋字,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布稿蹲,位于F島的核電站,受9級特大地震影響鹊奖,放射性物質(zhì)發(fā)生泄漏苛聘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一忠聚、第九天 我趴在偏房一處隱蔽的房頂上張望设哗。 院中可真熱鬧,春花似錦两蟀、人聲如沸网梢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽战虏。三九已至,卻和暖如春党涕,著一層夾襖步出監(jiān)牢的瞬間烦感,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工遣鼓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留啸盏,地道東北人。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓骑祟,卻偏偏與公主長得像回懦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子次企,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容