Spark 實戰(zhàn),第 4 部分: 使用 Spark MLlib 做 K-means 聚類分析
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice4/
MLlib 是 Spark 生態(tài)系統(tǒng)里用來解決大數(shù)據(jù)機器學(xué)習(xí)問題的模塊皆辽。本文將以聚類分析這個典型的機器學(xué)習(xí)問題為基礎(chǔ),向讀者介紹如何使用 MLlib 提供的 K-means 算法對數(shù)據(jù)做聚類分析,我們還將通過分析源碼,進一步加深讀者對 MLlib K-means 算法的實現(xiàn)原理和使用方法的理解驳癌。
查看本系列更多內(nèi)容 | 15
王 龍, 軟件開發(fā)工程師, IBM
2015 年 9 月 24 日
開始您的試用
引言
提起機器學(xué)習(xí) (Machine Learning)役听,相信很多計算機從業(yè)者都會對這個技術(shù)方向感到興奮颓鲜。然而學(xué)習(xí)并使用機器學(xué)習(xí)算法來處理數(shù)據(jù)卻是一項復(fù)雜的工作,需要充足的知識儲備典予,如概率論甜滨,數(shù)理統(tǒng)計,數(shù)值逼近熙参,最優(yōu)化理論等艳吠。機器學(xué)習(xí)旨在使計算機具有人類一樣的學(xué)習(xí)能力和模仿能力,這也是實現(xiàn)人工智能的核心思想和方法孽椰。傳統(tǒng)的機器學(xué)習(xí)算法昭娩,由于技術(shù)和單機存儲的限制,只能在少量數(shù)據(jù)上使用黍匾,隨著 HDFS(Hadoop Distributed File System) 等分布式文件系統(tǒng)出現(xiàn)栏渺,存儲海量數(shù)據(jù)已經(jīng)成為可能。然而由于 MapReduce 自身的限制锐涯,使得使用 MapReduce 來實現(xiàn)分布式機器學(xué)習(xí)算法非常耗時和消耗磁盤容量磕诊。因為通常情況下機器學(xué)習(xí)算法參數(shù)學(xué)習(xí)的過程都是迭代計算的,即本次計算的結(jié)果要作為下一次迭代的輸入纹腌,這個過程中霎终,如果使用 MapReduce,我們只能把中間結(jié)果存儲磁盤升薯,然后在下一次計算的時候從新讀取莱褒,這對于迭代 頻發(fā)的算法顯然是致命的性能瓶頸。Spark 立足于內(nèi)存計算涎劈,天然的適應(yīng)于迭代式計算广凸,相信對于這點阅茶,讀者通過前面幾篇文章已經(jīng)有了較為深入的了解。然而即便這樣谅海,對于普通開發(fā)者來說脸哀,實現(xiàn)一個分布式機器學(xué)習(xí)算法仍然是一件極具挑戰(zhàn)的事情。MLlib 正是為了讓基于海量數(shù)據(jù)的機器學(xué)習(xí)變得更加簡單扭吁,它提供了常用機器學(xué)習(xí)算法的分布式實現(xiàn)撞蜂,開發(fā)者只需要有 Spark 基礎(chǔ)并且了解機器學(xué)習(xí)算法的原理,以及方法相關(guān)參數(shù)的含義智末,就可以輕松的通過調(diào)用相應(yīng)的 API 來實現(xiàn)基于海量數(shù)據(jù)的機器學(xué)習(xí)過程谅摄。當(dāng)然,原始數(shù)據(jù) ETL系馆,特征指標(biāo)提取送漠,調(diào)節(jié)參數(shù)并優(yōu)化學(xué)習(xí)過程,這依然需要有足夠的行業(yè)知識和數(shù)據(jù)敏感度由蘑,這往往也是經(jīng)驗的體現(xiàn)闽寡。本文的重點在于向讀者介紹如何使用 MLlib 機器學(xué)習(xí)庫提供的 K-means 算法做聚類分析,這是一個有意義的過程尼酿,相信會對讀者特別是初學(xué)者有啟發(fā)意義爷狈。
回頁首
Spark 機器學(xué)習(xí)庫簡介
Spark 機器學(xué)習(xí)庫提供了常用機器學(xué)習(xí)算法的實現(xiàn),包括聚類裳擎,分類涎永,回歸,協(xié)同過濾鹿响,維度縮減等羡微。使用 Spark 機器學(xué)習(xí)庫來做機器學(xué)習(xí)工作,可以說是非常的簡單惶我,通常只需要在對原始數(shù)據(jù)進行處理后妈倔,然后直接調(diào)用相應(yīng)的 API 就可以實現(xiàn)。但是要想選擇合適的算法绸贡,高效準(zhǔn)確地對數(shù)據(jù)進行分析盯蝴,您可能還需要深入了解下算法原理,以及相應(yīng) Spark MLlib API 實現(xiàn)的參數(shù)的意義听怕。
需要提及的是捧挺,Spark 機器學(xué)習(xí)庫從 1.2 版本以后被分為兩個包,分別是:
spark.mllib
Spark MLlib 歷史比較長了尿瞭,1.0 以前的版本中已經(jīng)包含了松忍,提供的算法實現(xiàn)都是基于原始的 RDD,從學(xué)習(xí)角度上來講筷厘,其實比較容易上手鸣峭。如果您已經(jīng)有機器學(xué)習(xí)方面的經(jīng)驗,那么您只需要熟悉下 MLlib 的 API 就可以開始數(shù)據(jù)分析工作了酥艳。想要基于這個包提供的工具構(gòu)建完整并且復(fù)雜的機器學(xué)習(xí)流水線是比較困難的摊溶。
spark.ml
Spark ML Pipeline 從 Spark1.2 版本開始,目前已經(jīng)從 Alpha 階段畢業(yè)充石,成為可用并且較為穩(wěn)定的新的機器學(xué)習(xí)庫莫换。ML Pipeline 彌補了原始 MLlib 庫的不足,向用戶提供了一個基于 DataFrame 的機器學(xué)習(xí)工作流式 API 套件骤铃,使用 ML Pipeline API拉岁,我們可以很方便的把數(shù)據(jù)處理,特征轉(zhuǎn)換惰爬,正則化喊暖,以及多個機器學(xué)習(xí)算法聯(lián)合起來,構(gòu)建一個單一完整的機器學(xué)習(xí)流水線撕瞧。顯然陵叽,這種新的方式給我們提供了更靈活的方法,而且這也更符合機器學(xué)習(xí)過程的特點丛版。
從官方文檔來看巩掺,Spark ML Pipeline 雖然是被推薦的機器學(xué)習(xí)方式,但是并不會在短期內(nèi)替代原始的 MLlib 庫页畦,因為 MLlib 已經(jīng)包含了豐富穩(wěn)定的算法實現(xiàn)胖替,并且部分 ML Pipeline 實現(xiàn)基于 MLlib。而且就筆者看來豫缨,并不是所有的機器學(xué)習(xí)過程都需要被構(gòu)建成一個流水線独令,有時候原始數(shù)據(jù)格式整齊且完整,而且使用單一的算法就能實現(xiàn)目標(biāo)州胳,我們就沒有必要把事情復(fù)雜化记焊,采用最簡單且容易理解的方式才是正確的選擇。
本文基于 Spark 1.5栓撞,向讀者展示使用 MLlib API 進行聚類分析的過程遍膜。讀者將會發(fā)現(xiàn),使用 MLlib API 開發(fā)機器學(xué)習(xí)應(yīng)用方式是比較簡單的瓤湘,相信本文可以使讀者建立起信心并掌握基本方法瓢颅,以便在后續(xù)的學(xué)習(xí)和工作中事半功倍。
回頁首
K-means 聚類算法原理
聚類分析是一個無監(jiān)督學(xué)習(xí) (Unsupervised Learning) 過程, 一般是用來對數(shù)據(jù)對象按照其特征屬性進行分組弛说,經(jīng)常被應(yīng)用在客戶分群挽懦,欺詐檢測,圖像分析等領(lǐng)域木人。K-means 應(yīng)該是最有名并且最經(jīng)常使用的聚類算法了信柿,其原理比較容易理解冀偶,并且聚類效果良好,有著廣泛的使用渔嚷。
和諸多機器學(xué)習(xí)算法一樣进鸠,K-means 算法也是一個迭代式的算法,其主要步驟如下:
第一步形病,選擇 K 個點作為初始聚類中心客年。
第二步,計算其余所有點到聚類中心的距離漠吻,并把每個點劃分到離它最近的聚類中心所在的聚類中去量瓜。在這里,衡量距離一般有多個函數(shù)可以選擇途乃,最常用的是歐幾里得距離 (Euclidean Distance), 也叫歐式距離绍傲。公式如下:
其中 C 代表中心點,X 代表任意一個非中心點欺劳。
第三步唧取,重新計算每個聚類中所有點的平均值,并將其作為新的聚類中心點划提。
最后枫弟,重復(fù) (二),(三) 步的過程鹏往,直至聚類中心不再發(fā)生改變淡诗,或者算法達(dá)到預(yù)定的迭代次數(shù),又或聚類中心的改變小于預(yù)先設(shè)定的閥值伊履。
在實際應(yīng)用中韩容,K-means 算法有兩個不得不面對并且克服的問題。
聚類個數(shù) K 的選擇唐瀑。K 的選擇是一個比較有學(xué)問和講究的步驟群凶,我們會在后文專門描述如何使用 Spark 提供的工具選擇 K。
初始聚類中心點的選擇哄辣。選擇不同的聚類中心可能導(dǎo)致聚類結(jié)果的差異请梢。
Spark MLlib K-means 算法的實現(xiàn)在初始聚類點的選擇上,借鑒了一個叫 K-means||的類 K-means++ 實現(xiàn)力穗。K-means++ 算法在初始點選擇上遵循一個基本原則: 初始聚類中心點相互之間的距離應(yīng)該盡可能的遠(yuǎn)毅弧。基本步驟如下:
第一步当窗,從數(shù)據(jù)集 X 中隨機選擇一個點作為第一個初始點够坐。
第二步,計算數(shù)據(jù)集中所有點與最新選擇的中心點的距離 D(x)。
第四部蛾坯,重復(fù) (二),(三) 步過程光酣,直到 K 個初始點選擇完成。
回頁首
MLlib 的 K-means 實現(xiàn)
Spark MLlib 中 K-means 算法的實現(xiàn)類 (KMeans.scala) 具有以下參數(shù)脉课,具體如下。
圖 1. MLlib K-means 算法實現(xiàn)類預(yù)覽
圖 2. MLlib K-means 算法參數(shù)初始值
k 表示期望的聚類的個數(shù)。
*maxInterations *表示方法單次運行最大的迭代次數(shù)戳寸。
*runs *表示算法被運行的次數(shù)呈驶。K-means 算法不保證能返回全局最優(yōu)的聚類結(jié)果,所以在目標(biāo)數(shù)據(jù)集上多次跑 K-means 算法疫鹊,有助于返回最佳聚類結(jié)果袖瞻。
*initializationMode *表示初始聚類中心點的選擇方式, 目前支持隨機選擇或者 K-means||方式。默認(rèn)是 K-means||拆吆。
initializationSteps表示 K-means||方法中的部數(shù)聋迎。
*epsilon *表示 K-means 算法迭代收斂的閥值。
*seed *表示集群初始化時的隨機種子枣耀。
通常應(yīng)用時霉晕,我們都會先調(diào)用 KMeans.train 方法對數(shù)據(jù)集進行聚類訓(xùn)練,這個方法會返回 KMeansModel 類實例捞奕,然后我們也可以使用 KMeansModel.predict 方法對新的數(shù)據(jù)點進行所屬聚類的預(yù)測牺堰,這是非常實用的功能。
KMeans.train 方法有很多重載方法颅围,這里我們選擇參數(shù)最全的一個展示伟葫。
圖 3. KMeans.train 方法預(yù)覽
回頁首
聚類測試數(shù)據(jù)集簡介
在本文中,我們所用到目標(biāo)數(shù)據(jù)集是來自 UCI Machine Learning Repository 的 Wholesale customer Data Set一疯。UCI 是一個關(guān)于機器學(xué)習(xí)測試數(shù)據(jù)的下載中心站點撼玄,里面包含了適用于做聚類,分群墩邀,回歸等各種機器學(xué)習(xí)問題的數(shù)據(jù)集掌猛。
Wholesale customer Data Set 是引用某批發(fā)經(jīng)銷商的客戶在各種類別產(chǎn)品上的年消費數(shù)。為了方便處理,本文把原始的 CSV 格式轉(zhuǎn)化成了兩個文本文件荔茬,分別是訓(xùn)練用數(shù)據(jù)和測試用數(shù)據(jù)废膘。
圖 5. 客戶消費數(shù)據(jù)格式預(yù)覽
回頁首
案例分析和編碼實現(xiàn)
本例中灌闺,我們將根據(jù)目標(biāo)客戶的消費數(shù)據(jù),將每一列視為一個特征指標(biāo)坏瞄,對數(shù)據(jù)集進行聚類分析桂对。代碼實現(xiàn)步驟如下
清單 1. 聚類分析實現(xiàn)類源碼
import org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}import org.apache.spark.mllib.linalg.Vectorsobject KMeansClustering {def main (args: Array[String]) {if (args.length < 5) {println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes")sys.exit(1)}val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering")val sc = new SparkContext(conf)/Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen 2 3 12669 9656 7561 214 2674 1338 2 3 7057 9810 9568 1762 3293 1776 2 3 6353 8808 7684 2405 3516 7844*/val rawTrainingData = sc.textFile(args(0))val parsedTrainingData = rawTrainingData.filter(!isColumnNameLine()).map(line => {Vectors.dense(line.split("\t").map(.trim).filter(!"".equals()).map(.toDouble))}).cache()// Cluster the data into two classes using KMeansval numClusters = args(2).toIntval numIterations = args(3).toIntval runTimes = args(4).toIntvar clusterIndex:Int = 0val clusters:KMeansModel = KMeans.train(parsedTrainingData, numClusters, numIterations,runTimes)println("Cluster Number:" + clusters.clusterCenters.length)println("Cluster Centers Information Overview:")clusters.clusterCenters.foreach( x => {println("Center Point of Cluster " + clusterIndex + ":")println(x)clusterIndex += 1})//begin to check which cluster each test data belongs to based on the clustering resultval rawTestData = sc.textFile(args(1))val parsedTestData = rawTestData.map(line => {Vectors.dense(line.split("\t").map(.trim).filter(!"".equals()).map(_.toDouble))})parsedTestData.collect().foreach(testDataLine => {val predictedClusterIndex: Int = clusters.predict(testDataLine)println("The data " + testDataLine.toString + " belongs to cluster " + predictedClusterIndex)})println("Spark MLlib K-means clustering test finished.")}private def isColumnNameLine(line:String):Boolean = {if (line != null && line.contains("Channel")) trueelse false}
該示例程序接受五個入?yún)ⅲ謩e是
訓(xùn)練數(shù)據(jù)集文件路徑
測試數(shù)據(jù)集文件路徑
聚類的個數(shù)
K-means 算法的迭代次數(shù)
K-means 算法 run 的次數(shù)
回頁首
運行示例程序
和本系列其他文章一樣鸠匀,我們依然選擇使用 HDFS 存儲數(shù)據(jù)文件蕉斜。運行程序之前,我們需要將前文提到的訓(xùn)練和測試數(shù)據(jù)集上傳到 HDFS缀棍。
圖 6. 測試數(shù)據(jù)的 HDFS 目錄
./spark-submit --class com.ibm.spark.exercise.mllib.KMeansClustering \ --master spark://<spark_master_node_ip>:7077 \ --num-executors 6 --driver-memory 3g --executor-memory 512m --total-executor-cores 6 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_training.txt \ hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_test.txt \ 8 30 3
圖 7. K-means 聚類示例程序運行結(jié)果
回頁首
如何選擇 K
前面提到 K 的選擇是 K-means 算法的關(guān)鍵宅此,Spark MLlib 在 KMeansModel 類里提供了 computeCost 方法,該方法通過計算所有數(shù)據(jù)點到其最近的中心點的平方和來評估聚類的效果爬范。一般來說父腕,同樣的迭代次數(shù)和算法跑的次數(shù),這個值越小代表聚類的效果越好坦敌。但是在實際情況下侣诵,我們還要考慮到聚類結(jié)果的可解釋性,不能一味的選擇使 computeCost 結(jié)果值最小的那個 K狱窘。
清單 3. K 選擇示例代碼片段
val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)ks.foreach(cluster => { val model:KMeansModel = KMeans.train(parsedTrainingData, cluster,30,1) val ssd = model.computeCost(parsedTrainingData) println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)})
圖 8. K 選擇示例程序運行結(jié)果
從上圖的運行結(jié)果可以看到杜顺,當(dāng) K=9 時,cost 值有波動蘸炸,但是后面又逐漸減小了躬络,所以我們選擇 8 這個臨界點作為 K 的個數(shù)。當(dāng)然可以多跑幾次搭儒,找一個穩(wěn)定的 K 值穷当。理論上 K 的值越大,聚類的 cost 越小淹禾,極限情況下馁菜,每個點都是一個聚類,這時候 cost 是 0铃岔,但是顯然這不是一個具有實際意義的聚類結(jié)果汪疮。
回頁首
結(jié)束語
通過本文的學(xué)習(xí),讀者已經(jīng)初步了解了 Spark 的機器學(xué)習(xí)庫,并且掌握了 K-means 算法的基本原理智嚷,以及如何基于 Spark MLlib 構(gòu)建自己的機器學(xué)習(xí)應(yīng)用卖丸。機器學(xué)習(xí)應(yīng)用的構(gòu)建是一個復(fù)雜的過程,我們通常還需要對數(shù)據(jù)進行預(yù)處理盏道,然后特征提取以及數(shù)據(jù)清洗等稍浆,然后才能利用算法來分析數(shù)據(jù)。Spark MLlib 區(qū)別于傳統(tǒng)的機器學(xué)習(xí)工具猜嘱,不僅是因為它提供了簡單易用的 API衅枫,更重要的是 Spark 在處理大數(shù)據(jù)上的高效以及在迭代計算時的獨特優(yōu)勢。雖然本文所采用的測試數(shù)據(jù)集很小朗伶,并不能反映大數(shù)據(jù)的應(yīng)用場景为鳄,但是對于掌握基本原理已經(jīng)足夠,并且如果讀者擁有更大的數(shù)據(jù)集就可以輕松的將本文的測試程序推廣到大數(shù)據(jù)聚類的場景下腕让,因為 Spark MLlib 的編程模型都是一致的,無非是數(shù)據(jù)讀取和處理的方式略有不同歧斟。希望讀者可以在本文中找到自己感興趣的知識纯丸,相信這對讀者今后深入學(xué)習(xí)是有幫助的。另外静袖,讀者在閱讀本文的過程中觉鼻,如果遇到問題或者發(fā)現(xiàn)不足之處,請不吝賜教队橙,在文末留言坠陈,共同交流學(xué)習(xí),謝謝捐康。