概觀
sparklyr為Spark的分布式機器學(xué)習(xí)庫提供綁定鸣剪。特別是零蓉,sparklyr允許訪問spark.ml包提供的機器學(xué)習(xí)例程卿嘲。與sparklyr的dplyr界面一起, 可以輕松地在Spark上創(chuàng)建和調(diào)整機器學(xué)習(xí)工作流程,完全在R中編排丹壕。
sparklyr提供了三個功能系列庆械,可以與Spark機器學(xué)習(xí)一起使用:
- 用于分析數(shù)據(jù)的機器學(xué)習(xí)算法(ml_*)
- 用于處理各個特征的特征變換器(ft_*)
- 用于操作Spark DataFrames(sdf_*)的函數(shù)
使用sparklyr的分析工作流程可能包含以下幾個階段。有關(guān)示例菌赖,請參閱示例工作流程缭乘。
- 通過sparklyr dplyr接口執(zhí)行SQL查詢,
- 使用函數(shù)sdf_和ft_函數(shù)系列來生成新列或?qū)?shù)據(jù)集進行分區(qū)琉用,
- 從ml_*函數(shù)族中選擇合適的機器學(xué)習(xí)算法來建模數(shù)據(jù)堕绩,
- 檢查模型擬合的質(zhì)量,并使用它來預(yù)測新數(shù)據(jù)邑时。
- 收集結(jié)果以便在R中進行可視化和進一步分析
算法
可以通過一ml_*組函數(shù)從sparklyr訪問Spark的機器學(xué)習(xí)庫:
- ml_kmeans K-Means聚類
- ml_linear_regression 線性回歸
- ml_logistic_regression Logistic回歸
- ml_survival_regression 生存回歸
- ml_generalized_linear_regression 廣義線性回歸
- ml_decision_tree 決策樹
- ml_random_forest 隨機森林
- ml_gradient_boosted_trees 漸變 - 樹木
- ml_pca 主成分分析
- ml_naive_bayes 樸素貝葉斯
- ml_multilayer_perceptron 多層感知器
- ml_lda 潛在的Dirichlet分配
- ml_one_vs_rest 一對陣休息
函數(shù)公式
該ml_*函數(shù)接受的參數(shù)response和features奴紧。但features也可以是具有主效應(yīng)的公式(它目前不接受交互術(shù)語)。截取項可以通過使用省略-1晶丘。
就是這兩種公式展現(xiàn)的方式
ml_linear_regression(z ~ -1 + x + y)
ml_linear_regression(intercept = FALSE, response = "z", features = c("x", "y"))
選項
可以使用ml_options函數(shù)中的參數(shù)修改Spark模型輸出ml_*黍氮。這ml_options是專家調(diào)整模型輸出的唯一界面。例如浅浮,model.transform可以在執(zhí)行擬合之前用于改變Spark模型對象沫浆。
轉(zhuǎn)換
Spark提供了特征變換器,促進了Spark DataFrame中數(shù)據(jù)的許多常見轉(zhuǎn)換滚秩,并且Sparklyr在ft_*函數(shù)族中公開了這些變換专执。這些例程通常采用一個或多個輸入列,并生成一個新的輸出列郁油,形成為這些列的轉(zhuǎn)換本股。
- ft_binarizer 閾值數(shù)字特征為二進制(0/1)特征
- ft_bucketizer Bucketizer將一列連續(xù)特征轉(zhuǎn)換為一列特征桶
- ft_discrete_cosine_transform 將時域中的長度NN實值序列變換為頻域中的另一長度NN實值序列
- ft_elementwise_product 使用逐元素乘法將每個輸入向量乘以提供的權(quán)重向量。
- ft_index_to_string 將一列標簽索引映射回包含原始標簽作為字符串的列
- ft_quantile_discretizer 采用具有連續(xù)特征的列已艰,并輸出具有分箱分類特征的列
- sql_transformer 實現(xiàn)由SQL語句定義的轉(zhuǎn)換
- ft_string_indexer 將標簽的字符串列編碼為標簽索引列
- ft_vector_assembler 將給定的列列表合并到單個矢量列中
例子
采用數(shù)據(jù)集iris
library(sparklyr)
library(ggplot2)
library(dplyr)
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, "iris", overwrite = TRUE)
iris_tbt
K-MEANS聚類
使用Spark的K-means聚類將數(shù)據(jù)集分組。K均值聚類將分區(qū)指向成k組蚕苇,使得從點到指定的聚類中心的平方和最小化哩掺。
使用k-means聚類
kmeans_model <- iris_tbl %>%
ml_kmeans(formula = Species~.,centers = 3)
這里需要注意的是,非監(jiān)督的模型如何寫公式
kmeans_model <- iris_tbl %>%
select(Petal_Width, Petal_Length) %>%
ml_kmeans(formula = ~.,centers = 3)
kmeans_model
K-means clustering with 3 clusters
Cluster centers:
Petal_Width Petal_Length
1 1.359259 4.292593
2 0.246000 1.462000
3 2.047826 5.626087
Within Set Sum of Squared Errors = 31.41289
進行預(yù)測
predicted <- sdf_predict(kmeans_model, iris_tbl) %>%
collect
查看聚類的標簽和真實的標簽之間的關(guān)系
table(predicted$Species, predicted$prediction)
0 1 2
setosa 0 50 0
versicolor 48 0 2
virginica 6 0 44
sdf_predict(kmeans_model) %>%
collect() %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
size = 2, alpha = 0.5) +
geom_point(data = kmeans_model$centers, aes(Petal_Width, Petal_Length),
col = scales::muted(c("red", "green", "blue")),
pch = 'x', size = 12) +
scale_color_discrete(name = "Predicted Cluster",
labels = paste("Cluster", 1:3)) +
labs(
x = "Petal Length",
y = "Petal Width",
title = "K-Means Clustering",
subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
)
線性回歸
建立線性回歸模型
lm_model <- iris_tbl %>%
select(Petal_Width, Petal_Length) %>%
ml_linear_regression(Petal_Length ~ Petal_Width)
lm_model
Formula: Petal_Length ~ Petal_Width
Coefficients:
(Intercept) Petal_Width
1.083558 2.229940
iris_tbl %>%
select(Petal_Width, Petal_Length) %>%
collect %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length), size = 2, alpha = 0.5) +
geom_abline(aes(slope = coef(lm_model)[["Petal_Width"]],
intercept = coef(lm_model)[["(Intercept)"]]),
color = "red") +
labs(
x = "Petal Width",
y = "Petal Length",
title = "Linear Regression: Petal Length ~ Petal Width",
subtitle = "Use Spark.ML linear regression to predict petal length as a function of petal width."
)
邏輯回歸
使用Spark的邏輯回歸來執(zhí)行邏輯回歸涩笤,將二元結(jié)果建模為一個或多個解釋變量的函數(shù)嚼吞。
數(shù)據(jù)準備
beaver <- beaver2
beaver$activ <- factor(beaver$activ, labels = c("Non-Active", "Active"))
copy_to(sc, beaver, "beaver")
beaver_tbl <- tbl(sc, "beaver")
建立回歸模型
glm_model <- beaver_tbl %>%
mutate(binary_response = as.numeric(activ == "Active")) %>%
ml_logistic_regression(binary_response ~ temp)
glm_model <- beaver_tbl %>%
ml_logistic_regression(activ ~ temp)
> glm_model
Formula: activ ~ temp
Coefficients:
(Intercept) temp
550.52331 -14.69184
pre <- sdf_predict(glm_model) %>% collect()
pre
# A tibble: 100 x 12
day time temp activ features label rawPrediction probability
<dbl> <dbl> <dbl> <chr> <list> <dbl> <list> <list>
1 307 930 36.6 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
2 307 940 36.7 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
3 307 950 36.9 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
4 307 1000 37.2 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
5 307 1010 37.2 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
6 307 1020 37.2 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
7 307 1030 37.2 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
8 307 1040 36.9 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
9 307 1050 37.0 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
10 307 1100 36.9 Non-Active <dbl [1]> 1.00 <dbl [2]> <dbl [2]>
# ... with 90 more rows, and 4 more variables: prediction <dbl>,
# predicted_label <chr>, probability_0 <dbl>, probability_1 <dbl>
>
PCA
使用Spark的主成分分析(PCA)來降低維數(shù)。PCA是一種統(tǒng)計方法蹬碧,用于查找旋轉(zhuǎn)舱禽,使得第一個坐標具有可能的最大方差,并且每個后續(xù)坐標又具有可能的最大方差
建立PCA模型
pca_model <- tbl(sc, "iris") %>%
select(-Species) %>%
ml_pca()
print(pca_model)
Explained variance:
PC1 PC2 PC3 PC4
0.924618723 0.053066483 0.017102610 0.005212184
Rotation:
PC1 PC2 PC3 PC4
Sepal_Length -0.36138659 -0.65658877 0.58202985 0.3154872
Sepal_Width 0.08452251 -0.73016143 -0.59791083 -0.3197231
Petal_Length -0.85667061 0.17337266 -0.07623608 -0.4798390
Petal_Width -0.35828920 0.07548102 -0.54583143 0.7536574
隨機森林
使用隨機森林進行二分類或者多分類
rf_model <- iris_tbl %>%
ml_random_forest(Species ~ Petal_Length + Petal_Width, type = "classification",num_trees = 500)
rf_predict <- sdf_predict(rf_model, iris_tbl) %>%
ft_string_indexer("Species", "Species_idx") %>%
collect
table(rf_predict$Species_idx, rf_predict$prediction)
0 1 2
0 49 1 0
1 0 50 0
2 0 0 50
數(shù)據(jù)集合劃分
將Spark DataFrame拆分為訓(xùn)練恩沽,測試數(shù)據(jù)集誊稚。
劃分數(shù)據(jù)集合
partitions <- tbl(sc, "iris") %>%
sdf_partition(training = 0.75, test = 0.25, seed = 1099)
構(gòu)建線性回歸模型
fit <- partitions$training %>%
ml_linear_regression(Petal_Length ~ Petal_Width)
評價模型的結(jié)果
estimate_mse <- function(df){
sdf_predict(fit, df) %>%
mutate(resid = Petal_Length - prediction) %>%
summarize(mse = mean(resid ^ 2)) %>%
collect
}
sapply(partitions, estimate_mse)
字符串索引
使用ft_string_indexer和ft_index_to_string將字符列轉(zhuǎn)換為數(shù)字列,然后再將其轉(zhuǎn)換回來。
ft_string2idx <- iris_tbl %>%
ft_string_indexer("Species", "Species_idx") %>%
ft_index_to_string("Species_idx", "Species_remap") %>%
collect
table(ft_string2idx$Species, ft_string2idx$Species_remap)
setosa versicolor virginica
setosa 50 0 0
versicolor 0 50 0
virginica 0 0 50
SDF轉(zhuǎn)換
ft_string2idx <- iris_tbl %>%
sdf_mutate(Species_idx = ft_string_indexer(Species)) %>%
sdf_mutate(Species_remap = ft_index_to_string(Species_idx)) %>%
collect
ft_string2idx %>%
select(Species, Species_idx, Species_remap) %>%
distinct
簡單的例子
讓我們通過一個簡單的例子來演示在R中使用Spark的機器學(xué)習(xí)算法里伯。我們將使用ml_linear_regression來擬合線性回歸模型城瞎。使用內(nèi)置mtcars數(shù)據(jù)集,我們將嘗試根據(jù)車輛的mpg重量(wt)和發(fā)動機所包含的氣缸數(shù)()來預(yù)測汽車的油耗(cyl)疾瓮。
首先脖镀,我們將mtcars數(shù)據(jù)集復(fù)制到Spark中。
mtcars_tbl <- copy_to(sc, mtcars, "mtcars")
使用Spark SQL狼电,功能轉(zhuǎn)換器和DataFrame函數(shù)轉(zhuǎn)換數(shù)據(jù)蜒灰。
使用Spark SQL刪除馬力小于100的所有汽車
使用Spark功能變換器將汽車分成兩組,基于汽缸
使用Spark DataFrame函數(shù)將數(shù)據(jù)分區(qū)為測試和培訓(xùn)
然后使用spark ML擬合線性模型肩碟。將MPG作為重量和氣缸的函數(shù)强窖。
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
sdf_mutate(cyl8 = ft_bucketizer(cyl, c(0,8,12))) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 888)
fit <- partitions$training %>%
ml_linear_regression(mpg ~ wt + cyl)
summary(fit)
Deviance Residuals:
Min 1Q Median 3Q Max
-2.0947 -1.2747 -0.1129 1.0876 2.2185
Coefficients:
(Intercept) wt cyl
33.795576 -1.596247 -1.580360
R-Squared: 0.8267
Root Mean Squared Error: 1.437
這summary()表明我們的模型非常合適,并且汽車重量以及發(fā)動機中的汽缸數(shù)量都將成為其平均油耗的強大預(yù)測因素腾务。(該模型表明毕骡,平均而言,較重的汽車消耗的燃料更多岩瘦。)
讓我們使用我們的Spark模型擬合來預(yù)測我們的測試數(shù)據(jù)集的平均油耗未巫,并將預(yù)測的響應(yīng)與真實的測量燃料消耗進行比較。我們將構(gòu)建一個簡單的ggplot2圖启昧,使我們能夠檢查預(yù)測的質(zhì)量叙凡。
# Score the data
pred <- sdf_predict(fit, partitions$test) %>%
collect
# Plot the predicted versus actual mpg
ggplot(pred, aes(x = mpg, y = prediction)) +
geom_abline(lty = "dashed", col = "red") +
geom_point() +
theme(plot.title = element_text(hjust = 0.5)) +
coord_fixed(ratio = 1) +
labs(
x = "Actual Fuel Consumption",
y = "Predicted Fuel Consumption",
title = "Predicted vs. Actual Fuel Consumption"
)
雖然簡單,但我們的模型似乎在預(yù)測汽車的平均油耗方面做得相當(dāng)不錯密末。
我們可以輕松有效地將功能變換器握爷,機器學(xué)習(xí)算法和Spark DataFrame功能組合到Spark和R的完整分析中。