Spark ML Pipelines

ML Pipelines 提供了一組統(tǒng)一的構(gòu)建在DataFrame上的高級API用于幫助用戶創(chuàng)建和調(diào)優(yōu)機器學(xué)習(xí)管道

ML Pipelines中的一些概念

MLlib標(biāo)準(zhǔn)化了機器學(xué)習(xí)算法的api晌姚,使多個算法更容易組合到一個單一的Pipeline或工作流中注服。

  • DataFrameML API使用 Spark SQL中的DataFrame作為ML的數(shù)據(jù)集
  • Transformer: Transformer是一種將DataFrame轉(zhuǎn)為另一個DataFrame的算法。比如一個ML 模型是一個將特征DataFrame轉(zhuǎn)為預(yù)測DataFrameTransformer
  • EstimatorEstimator是一個能適用于DataFrame并產(chǎn)生Transformer的算法臼隔。比如 學(xué)習(xí)算法是一種訓(xùn)練DataFrame并且產(chǎn)生一個模型的Estimator
  • PipelinePipeline用于鏈接多個EstimatorTransformer以形成一個完整的工作流
  • ParameterEstimatorTransformer的通用Parameter API

DataFrame

機器學(xué)習(xí)可以被用于各式各樣的數(shù)據(jù)類型,比如向量聚唐,文本沪么,圖片和結(jié)構(gòu)化數(shù)據(jù)。這些都可以使用DataFrame表示

Pipeline components

Transformers

Transformers是對特征轉(zhuǎn)化學(xué)習(xí)模型的抽象怒见。一般一個Transformer實現(xiàn)了 transform()方法用于將一個DataFrame轉(zhuǎn)化另一個DataFrame(一般是在原DataFrame上添加一些列實現(xiàn))俗慈。

  • 一個 feature transformer 接收一個DataFame,讀取一列(eg:text)遣耍,將其map為一個新的列(eg.,feature vectors)然后將新的列添加到DataFrame上作為輸出
  • 一個learning model接收一個DataFrame作為輸入闺阱,讀取包含feature vectors的列,為每個特征向量預(yù)測label舵变,讓后將預(yù)測的label作為新的列添加到輸出DataFrame
Estimators

Estimator是對學(xué)習(xí)算法和數(shù)據(jù)訓(xùn)練算法的抽象酣溃,一般一個Estimator實現(xiàn)了fit()方法瘦穆,它接收一個DataFrame并產(chǎn)生一個Model(Transformer)。比如LogisticRegression是一個Estimator,通過調(diào)用fit()訓(xùn)練出一個LogisticRegressionModel,這個Model是一個Transformer

Properties of pipeline components

目前
Transformer.transform()Estimator.fit()都是無狀態(tài)的
每個TransformerEstimator都有一個唯一的ID赊豌,方便調(diào)參

Pipeline

在機器學(xué)習(xí)對數(shù)據(jù)進(jìn)行處理和學(xué)習(xí)一般需要一系列的算法扛或,比如一個簡單的文本處理工作流可能包含如下幾個階段:

  • 將文本拆分為單詞
  • 將單詞轉(zhuǎn)為特征向量
  • 使用特征向量和標(biāo)簽進(jìn)行預(yù)測模型的學(xué)習(xí)

MLlib使用Pipeline表示這種工作流,它包含了一系列 以一定順序運行的PipelineStages(TransformerEstimator)

How it works

Pipeline 的每個階段由 TransformerEstimator構(gòu)成碘饼。這些階段按一定的順序運行熙兔,并且在每個階段都對輸入的DataFrame做轉(zhuǎn)化。對于Transformer階段艾恼,在DataFrame上調(diào)用transform() 方法住涉。對于Estimator階段,fit()方法被調(diào)用用于產(chǎn)生一個Transformer(which becomes part of the PipelineModel, or fitted Pipeline)

簡單的文本處理工作流在training timePipeline

image.png

簡單的文本處理工作流在test timePipeline
image.png

Code examples

Example: Estimator, Transformer, and Param

// $example on$
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object EstimatorTransformerParamExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("EstimatorTransformerParamExample")
      .master("local[16]")
      .getOrCreate()

    // $example on$
    // Prepare training data from a list of (label, features) tuples.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Create a LogisticRegression instance. This instance is an Estimator.
    val lr = new LogisticRegression()
    // Print out the parameters, documentation, and any default values.
    println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

    // We may set parameters using setter methods.
    lr.setMaxIter(10)
      .setRegParam(0.01)

    // Learn a LogisticRegression model. This uses the parameters stored in lr.
    val model1 = lr.fit(training)
    // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
    // we can view the parameters it used during fit().
    // This prints the parameter (name: value) pairs, where names are unique IDs for this
    // LogisticRegression instance.
    println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

    // We may alternatively specify parameters using a ParamMap,
    // which supports several methods for specifying parameters.
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

    // One can also combine ParamMaps.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
    val paramMapCombined = paramMap ++ paramMap2

    // Now learn a new model using the paramMapCombined parameters.
    // paramMapCombined overrides all parameters set earlier via lr.set* methods.
    val model2 = lr.fit(training, paramMapCombined)
    println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

    // Prepare test data.
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    // Make predictions on test data using the Transformer.transform() method.
    // LogisticRegression.transform will only use the 'features' column.
    // Note that model2.transform() outputs a 'myProbability' column instead of the usual
    // 'probability' column since we renamed the lr.probabilityCol parameter previously.
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}
Example: Pipeline
// $example on$
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object PipelineExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("PipelineExample")
      .master("local[16]")
      .getOrCreate()

    // $example on$
    // Prepare training documents from a list of (id, text, label) tuples.
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Now we can optionally save the fitted pipeline to disk
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // We can also save this unfit pipeline to disk
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")

    // And load it back in during production
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Make predictions on test documents.
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末钠绍,一起剝皮案震驚了整個濱河市湃交,隨后出現(xiàn)的幾起案子验残,更是在濱河造成了極大的恐慌镜遣,老刑警劉巖淘讥,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異泻拦,居然都是意外死亡毙芜,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進(jìn)店門争拐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腋粥,“玉大人,你說我怎么就攤上這事架曹“澹” “怎么了?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵绑雄,是天一觀的道長展辞。 經(jīng)常有香客問我,道長万牺,這世上最難降的妖魔是什么罗珍? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮脚粟,結(jié)果婚禮上覆旱,老公的妹妹穿的比我還像新娘。我一直安慰自己核无,他們只是感情好扣唱,可當(dāng)我...
    茶點故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般噪沙。 火紅的嫁衣襯著肌膚如雪炼彪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天曲聂,我揣著相機與錄音霹购,去河邊找鬼佑惠。 笑死朋腋,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的膜楷。 我是一名探鬼主播旭咽,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼赌厅!你這毒婦竟也來了穷绵?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤特愿,失蹤者是張志新(化名)和其女友劉穎仲墨,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體揍障,經(jīng)...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡目养,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了毒嫡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片癌蚁。...
    茶點故事閱讀 39,764評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖兜畸,靈堂內(nèi)的尸體忽然破棺而出努释,到底是詐尸還是另有隱情,我是刑警寧澤咬摇,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布伐蒂,位于F島的核電站,受9級特大地震影響肛鹏,放射性物質(zhì)發(fā)生泄漏逸邦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一龄坪、第九天 我趴在偏房一處隱蔽的房頂上張望昭雌。 院中可真熱鬧,春花似錦健田、人聲如沸烛卧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽总放。三九已至呈宇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間局雄,已是汗流浹背甥啄。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留炬搭,地道東北人蜈漓。 一個月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像宫盔,于是被迫代替她去往敵國和親融虽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,665評論 2 354

推薦閱讀更多精彩內(nèi)容