之前閱讀了Spark的ML API文檔侨赡,也看了里面介紹的example玩讳,正好之前自己寫過Logistic Regression的算法并預(yù)測(cè)了下Kaggle上的新手村任務(wù)之一:Titanic竭缝。所以這里也想用Spark MLlib自帶的LR算法也實(shí)踐下精耐,一是對(duì)比下各自的預(yù)測(cè)結(jié)果希太,二是實(shí)踐出真知克饶,讀API文檔不實(shí)踐乃憾事也。
第一步:特征工程
這個(gè)其實(shí)是解決一個(gè)實(shí)際大數(shù)據(jù)問題的重中之重誊辉,不過因?yàn)檫@個(gè)文檔重在實(shí)踐Spark彤路,所以特征工程我還是沿用之前自己寫的LR算法的時(shí)候的做法,簡(jiǎn)單的從Kaggle官網(wǎng)的訓(xùn)練集和測(cè)試集上生成能夠喂給算法的訓(xùn)練集以及對(duì)應(yīng)的測(cè)試集芥映。
下面是對(duì)數(shù)據(jù)的一個(gè)展示洲尊,方便接下來代碼展示時(shí)的理解:
數(shù)據(jù)集都是csv格式的远豺,首先是訓(xùn)練集(Survived是標(biāo)簽):
SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled,Survived
1,0,1,0,0,0,1,0,1,0,0,1,-0.56136323207,-0.502445171436,0
1,0,0,1,1,0,0,1,0,1,0,0,0.613181832266,0.786845293588,1
0,0,1,0,0,0,1,1,0,0,0,1,-0.267726965986,-0.488854257585,1
1,0,0,1,0,0,1,1,0,1,0,0,0.392954632703,0.420730236069,1
0,0,1,0,0,0,1,0,1,0,0,1,0.392954632703,-0.486337421687,0
0,0,1,0,0,1,0,0,1,0,0,1,-0.427101530014,-0.478116428909,0
然后是測(cè)試集(沒有標(biāo)簽Survived):
SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled
0,0,1,0,0,1,0,0,1,0,0,1,0.307534608854,-0.496637106488
1,0,1,0,0,0,1,1,0,0,0,1,1.25623006816,-0.511497104137
0,0,1,0,0,1,0,0,1,0,1,0,2.39466461933,-0.463334726327
0,0,1,0,0,0,1,0,1,0,0,1,-0.261682666729,-0.481703633213
1,1,1,0,0,0,1,1,0,0,0,1,-0.641160850452,-0.416740425935
第二步:用Spark MLlib自帶LR建模并預(yù)測(cè)
直接上代碼吧:
public class TitanicLogisticRegressionWithElasticNet {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaLogisticRegressionWithElasticNetExample")
.getOrCreate();
// $example on$
// Load training data
Dataset<Row> training = spark.read().format("csv").option("header", true).option("inferSchema", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
// System.out.println("\n------- Read csv data:");
// training.printSchema();
// training.show(5, false);
String origStr = "SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled";
String[] arrOrig = origStr.split(",");
VectorAssembler vectorAssem = new VectorAssembler()
.setInputCols(arrOrig).setOutputCol("features");
Dataset<Row> feaTrain = vectorAssem.transform(training);
// System.out.println("\n------- assembled out:");
// feaTrain.printSchema();
// feaTrain.show(5, false);
feaTrain = feaTrain.select("features", "Survived");
System.out.println("\n------- after selected:");
feaTrain.printSchema();
feaTrain.show(5, false);
LogisticRegression lr = new LogisticRegression()
.setLabelCol("Survived")
.setMaxIter(10000)
.setRegParam(0.0)
.setElasticNetParam(0.8);
// Fit the model
LogisticRegressionModel lrModel = lr.fit(feaTrain);
// Print the coefficients and intercept for logistic regression
System.out.println("\n+++++++++ Binomial logistic regression's Coefficients: "
+ lrModel.coefficients() + "\nBinomial Intercept: " + lrModel.intercept());
Dataset<Row> testData = spark.read().format("csv").option("header", true).option("inferSchema", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_test_data.csv");
Dataset<Row> feaTest = vectorAssem.transform(testData);
feaTest = feaTest.select("features");
Dataset<Row> result = lrModel.transform(feaTest);
// System.out.println("\n------- after predict:");
// result.printSchema();
// result.show(5, false);
//result = result.withColumn("PassengerId", result.col("prediction"));
result = result.withColumnRenamed("prediction", "Survived");
System.out.println("\n====== after add and rename:");
result.printSchema();
result.show(5, false);
result.select("Survived").write().mode("overwrite").option("header", true).csv("mllib_LR_TitanicResult");
spark.stop();
}
}
列舉寫代碼時(shí)遇到的問題
雖然看起來是小問題但是第一次遇到解決起來可不少花時(shí)間。
-
問題1:讀取csv格式文件
雖然之前閱讀DataFrame的API文檔的時(shí)候官網(wǎng)上有講DataFrame可以從csv格式的文件中生成DataFrame坞嘀,但是我在實(shí)際寫代碼中遇到了以下幾個(gè)問題:
最開始代碼是這么寫的
Dataset<Row> training = spark.read().format("csv")
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
這個(gè)會(huì)導(dǎo)致一個(gè)問題躯护,在DataFrame.show()的時(shí)候發(fā)現(xiàn)第一行的列名稱也出現(xiàn)在數(shù)據(jù)當(dāng)中,而且列的名稱變?yōu)?_C0 ... _C14"丽涩,通過google和閱讀源碼的注釋發(fā)現(xiàn)這里需要增加一個(gè)配置:
Dataset<Row> training = spark.read().format("csv").option("header", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
不過問題還沒有結(jié)束棺滞,再接下來想要使用VectorAssembler來產(chǎn)生features的列向量時(shí)遇到一個(gè)問題,剛才前面得到的training的schema每一列的數(shù)據(jù)類型都是string矢渊。這會(huì)引起VectorAssembler的一個(gè)異常(原因見API介紹:不接受string格式的列作為輸入)继准,于是還需要想辦法把string轉(zhuǎn)成int或者double。然后繼續(xù)google和看源碼注釋矮男,發(fā)現(xiàn)還需要增加一個(gè)配置:
Dataset<Row> training = spark.read().format("csv").option("header", true).option("inferSchema", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
至此移必,我們就得到了有列名稱以及有具體非string類型的schema了。
-
問題2: 怎么產(chǎn)生LogisticRegression所需要的特征列向量
在用LogisticRegression訓(xùn)練產(chǎn)生LogisticRegressionModel時(shí)毡鉴,只要簡(jiǎn)單調(diào)用fit()方法即可崔泵,不過如果直接對(duì)上面的training調(diào)用這個(gè)方法,會(huì)提示沒有列向量(默認(rèn)名為features)猪瞬。一開始我的思路是查找LogisticRegression憎瘸,看看是否有setXXXXXX這樣的方法可以讓LogisticRegression將多個(gè)列作為特征列向量來使用,最終我沒有找到這樣的設(shè)置方法(如果確實(shí)有陈瘦,麻煩在評(píng)論區(qū)回復(fù)下幌甘,非常感謝!)痊项;然后再換個(gè)思路锅风,怎么對(duì)training進(jìn)行Transformation產(chǎn)生一個(gè)新的DataFrame來滿足我們的需求。經(jīng)過google終于找到了一個(gè)特征轉(zhuǎn)換的方法:VectorAssembler线婚,這個(gè)API可以實(shí)現(xiàn)我們上面的需求:
String origStr = "SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled";
String[] arrOrig = origStr.split(",");
VectorAssembler vectorAssem = new VectorAssembler()
.setInputCols(arrOrig).setOutputCol("features");
Dataset<Row> feaTrain = vectorAssem.transform(training);
// System.out.println("\n------- assembled out:");
// feaTrain.printSchema();
// feaTrain.show(5, false);
feaTrain = feaTrain.select("features", "Survived");
這樣我們就得到了可以用來fit LogisticRegression的訓(xùn)練集遏弱,來產(chǎn)生LogisticRegressionModel盆均。然后同樣的方式處理測(cè)試集塞弊,最后通過剛剛訓(xùn)練得到的模型來預(yù)測(cè)測(cè)試集的結(jié)果。
第三步:對(duì)比Spark的MLlib LogisticRegression結(jié)果
具體訓(xùn)練LogisticRegression時(shí)設(shè)置的參數(shù)為:
LogisticRegression lr = new LogisticRegression()
.setLabelCol("Survived")
.setMaxIter(10000)
.setRegParam(0.0)
.setElasticNetParam(0.8);
最終很巧合的是Spark預(yù)測(cè)出的結(jié)果與我之前自己手寫的Java版的LogisticRegression的預(yù)測(cè)竟完全一致泪姨。最終得分都是:
雖然這個(gè)結(jié)果看起來很挫游沿,但是這個(gè)一致的結(jié)果讓我覺得我之前做的工作在正確性上還是可以的。而且我也知道這個(gè)得分比較挫的原因是特征工程太low了肮砾,只是目前還沒有細(xì)化下去诀黍,畢竟特征工程是預(yù)測(cè)結(jié)果的上限,在這個(gè)之上所有不同的機(jī)器學(xué)習(xí)方法只是在用各自的努力去接近這個(gè)上限而已仗处。
另外還需要注意的是眯勾,Spark的LogisticRegression在訓(xùn)練時(shí)明顯比我自己寫的代碼計(jì)算速度快枣宫,我本以為虛擬機(jī)環(huán)境加上Spark的各種雜七雜八的流程處理肯定會(huì)不如我自己的代碼運(yùn)行的快的,但實(shí)際結(jié)果卻Piapia打臉吃环。也颤。。
總結(jié):
- 感謝google郁轻、感謝Stack Overflow翅娶、感謝Spark官網(wǎng)的Programming Guide以及example
- 強(qiáng)烈鄙視http://spark.apache.org/docs/latest/api/java/index.html, 如果我打開的方式?jīng)]有錯(cuò)誤的話好唯,感覺它完全沒啥用(如果是我打開的方式不對(duì)也麻煩在評(píng)論區(qū)幫我指正竭沫,先感謝了!)骑篙。
- 源碼也很有幫助蜕提,就是scala的語(yǔ)法還要再學(xué)學(xué),不然看起來不順暢替蛉。
- 對(duì)于RDD或者DataFrame的Transformation玩的不夠溜贯溅,這個(gè)確實(shí)是基礎(chǔ),也接下來需要找機(jī)會(huì)多學(xué)習(xí)和實(shí)踐躲查。