ML Pipelines
ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune machine learning pipelines.
Main concepts in ML Pipelines
MLlib standardizes APIs for machine learning algorithms, making it easier to combine multiple algorithms into a single Pipeline, or workflow.
- DataFrame: the ML API uses DataFrame from Spark SQL as the ML dataset.
- Transformer: a Transformer is an algorithm that transforms one DataFrame into another DataFrame. For example, an ML model is a Transformer that transforms a DataFrame with features into a DataFrame with predictions.
- Estimator: an Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. For example, a learning algorithm is an Estimator that trains on a DataFrame and produces a model.
- Pipeline: a Pipeline chains multiple Estimators and Transformers together to specify a complete ML workflow.
- Parameter: all Estimators and Transformers share a common API for specifying parameters.
DataFrame
Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data. All of these can be represented with a DataFrame.
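As a minimal sketch (the column names and values here are made up for illustration), a single DataFrame can hold a numeric id, raw text, and a vector-valued feature column side by side:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("DataFrameSketch")
  .master("local[*]")
  .getOrCreate()

// One DataFrame mixing column types: a numeric id, raw text, and a vector column.
val df = spark.createDataFrame(Seq(
  (0L, "spark ml pipelines", Vectors.dense(0.0, 1.1, 0.1)),
  (1L, "dataframes as ml datasets", Vectors.dense(2.0, 1.0, -1.0))
)).toDF("id", "text", "features")

df.printSchema()  // id: long, text: string, features: vector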
Pipeline components
Transformers
A Transformer is an abstraction that covers both feature transformers and learned models. Technically, a Transformer implements a transform() method, which converts one DataFrame into another, generally by appending one or more columns to the original DataFrame.
- A feature transformer takes a DataFrame, reads a column (e.g., text), maps it into a new column (e.g., feature vectors), and outputs a new DataFrame with the mapped column appended.
- A learning model takes a DataFrame as input, reads the column containing feature vectors, predicts a label for each feature vector, and outputs a new DataFrame with the predicted labels appended as a column.
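For instance, Tokenizer (used again in the pipeline example at the end of this article) is a feature transformer. A minimal sketch, reusing the df with a "text" column from the DataFrame sketch above:
import org.apache.spark.ml.feature.Tokenizer

// A feature transformer: transform() reads the "text" column and returns a new
// DataFrame with an extra "words" column appended; the input DataFrame is unchanged.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val tokenized = tokenizer.transform(df)  // df is assumed to have a "text" column
tokenized.select("text", "words").show(truncate = false)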
Estimators
An Estimator abstracts the concept of a learning algorithm, or any algorithm that fits or trains on data. Technically, an Estimator implements a fit() method, which accepts a DataFrame and produces a Model, which is a Transformer. For example, LogisticRegression is an Estimator: calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
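A minimal sketch of this fit()/transform() hand-off, assuming a DataFrame training with "label" and "features" columns like the one built in the first full code example below:
import org.apache.spark.ml.classification.LogisticRegression

// LogisticRegression is an Estimator.
val lr = new LogisticRegression().setMaxIter(10)

// fit() consumes the training DataFrame and produces a LogisticRegressionModel,
// which is a Model and therefore also a Transformer.
val lrModel = lr.fit(training)  // training is assumed to have "label" and "features" columns

// The fitted model can then be used as a Transformer to append prediction columns.
val predictions = lrModel.transform(training)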
Properties of pipeline components
Currently, Transformer.transform() and Estimator.fit() are both stateless.
Each instance of a Transformer or Estimator has a unique ID, which is useful when specifying parameters.
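For example, two separately constructed instances carry distinct IDs even with identical settings, which is what lets parameters be addressed per instance (a minimal sketch):
import org.apache.spark.ml.classification.LogisticRegression

// Each Transformer/Estimator instance has a generated unique ID (uid).
val lrA = new LogisticRegression()
val lrB = new LogisticRegression()
println(lrA.uid)  // e.g. "logreg_..." with a generated suffix
println(lrB.uid)  // a different generated ID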
Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. For example, a simple text document processing workflow might include the following stages:
- Split each document's text into words.
- Convert each document's words into a numerical feature vector.
- Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) run in a specific order.
How it works
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer, which becomes part of the PipelineModel, or fitted Pipeline.
Figure: the Pipeline for the simple text document workflow at training time.
Figure: the Pipeline for the simple text document workflow at test time.
Code examples
Example: Estimator, Transformer, and Param
// $example on$
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object EstimatorTransformerParamExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("EstimatorTransformerParamExample")
      .master("local[16]")
      .getOrCreate()

    // $example on$
    // Prepare training data from a list of (label, features) tuples.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Create a LogisticRegression instance. This instance is an Estimator.
    val lr = new LogisticRegression()
    // Print out the parameters, documentation, and any default values.
    println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

    // We may set parameters using setter methods.
    lr.setMaxIter(10)
      .setRegParam(0.01)

    // Learn a LogisticRegression model. This uses the parameters stored in lr.
    val model1 = lr.fit(training)
    // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
    // we can view the parameters it used during fit().
    // This prints the parameter (name: value) pairs, where names are unique IDs for this
    // LogisticRegression instance.
    println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

    // We may alternatively specify parameters using a ParamMap,
    // which supports several methods for specifying parameters.
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

    // One can also combine ParamMaps.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
    val paramMapCombined = paramMap ++ paramMap2

    // Now learn a new model using the paramMapCombined parameters.
    // paramMapCombined overrides all parameters set earlier via lr.set* methods.
    val model2 = lr.fit(training, paramMapCombined)
    println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

    // Prepare test data.
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    // Make predictions on test data using the Transformer.transform() method.
    // LogisticRegression.transform will only use the 'features' column.
    // Note that model2.transform() outputs a 'myProbability' column instead of the usual
    // 'probability' column since we renamed the lr.probabilityCol parameter previously.
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}
Example: Pipeline
// $example on$
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object PipelineExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("PipelineExample")
      .master("local[16]")
      .getOrCreate()

    // $example on$
    // Prepare training documents from a list of (id, text, label) tuples.
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Now we can optionally save the fitted pipeline to disk.
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // We can also save this unfit pipeline to disk.
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")

    // And load it back in during production.
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Make predictions on test documents.
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}