無監(jiān)督領域有一個準度和效率雙佳的異常點檢測算法万搔,我在實踐中使用過幾次仰坦,效果奇好,就是最近幾年非常流行的isolation forest(孤立森林)良瞧。該算法在sklearn中有現(xiàn)成的包陪汽,但是如果大數(shù)據(jù)的集群上跑的話,目前沒有封裝好的接口褥蚯,給分布式任務的部署帶來了很多不便(話說spark mllib中集成的算法真心太少了)掩缓,本文用scala從頭進行該算法在spark上的分布式實現(xiàn),并演示任務在集群上的執(zhí)行全過程遵岩。
一你辣、算法簡介
先說一下算法的最少必要知識,細節(jié)部分會揉在代碼里進行講解尘执。
1舍哄、訓練過程:構建森林的樹木
iForest由iTree組成。構建每一顆iTree時誊锭,從訓練數(shù)據(jù)中抽取N個樣本表悬,然后在這些樣本中,隨機選擇一個特征丧靡,再隨機選擇該特征下的一個值蟆沫,對樣本進行二叉劃分籽暇,然后分別在左右兩邊的數(shù)據(jù)集上重復上面的過程,直接達到終止條件饭庞,一顆樹構建完成戒悠。
2、預測過程:計算樣本的異常得分
把測試數(shù)據(jù)在每棵樹上沿對應的條件分支往下走舟山,直到達到葉子節(jié)點绸狐,并記錄這過程中經過的路徑長度path length(用h(x)表示)。并由此得出異常分數(shù)累盗,當分數(shù)超過某一閾值寒矿,即可判定為異常樣本。
二若债、scala實現(xiàn)
代碼主體非原創(chuàng)符相,參考自國外的一位大神:https://github.com/hsperr/first_steps_in_scala,有部分修改
1蠢琳、首先主巍,import編寫spark程序所需的包,以及scala的Random模塊挪凑,用于隨機選取功能孕索。
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
2、定義單顆樹iTree躏碳,第二搞旭、三行意味著,每棵樹的左右分支ITreeBranch和葉子節(jié)點ITreeLeaf都屬于iTree的子類菇绵。
sealed trait ITree
case class ITreeBranch(left: ITree, right: ITree, split_column: Int, split_value: Double) extends ITree
case class ITreeLeaf(size: Long) extends ITree
3肄渗、定義孤立森林的類,完成算法的訓練部分咬最,即全部樹的構建翎嫡。
3.1、從樣本中抽樣永乌,用于構建單個iTree
object IsolationForest {
def getRandomSubsample(data: RDD[Array[Double]], sampleRatio: Double, seed: Long = Random.nextLong): RDD[Array[Double]] = {
data.sample(false, sampleRatio, seed=seed)
}
3.2 惑申、遞歸構建生成單顆iTree。
參數(shù):
data:上步抽出的樣本數(shù)據(jù)翅雏;
maxHeight:樹的最大高度即樹終止生長的條件圈驼;
numColumns:data的特征數(shù)量;
currentHeight:樹的當前高度望几。
返回:
一顆完整的ITree
def growTree(data: RDD[Array[Double]], maxHeight:Int, numColumns:Int, currentHeight:Int = 0): ITree = {
val numSamples = data.count()
//遞歸終止條件绩脆,當前樹高大于maxHeight或數(shù)據(jù)量不大于1
if(currentHeight>=maxHeight || numSamples <= 1){
return new ITreeLeaf(numSamples)
}
//隨機選擇特征列
val split_column = Random.nextInt(numColumns)
val column = data.map(s => s(split_column))
//隨機選擇該特征列中的值split_value,用于分割樣本
val col_min = column.min()
val col_max = column.max()
val split_value = col_min + Random.nextDouble()*(col_max-col_min)
//小于分割值的成為左子樹,反之右子樹
val X_left = data.filter(s => s(split_column) < split_value).cache()
val X_right = data.filter(s => s(split_column) >= split_value).cache()
//遞歸
new ITreeBranch(growTree(X_left, maxHeight, numColumns, currentHeight + 1),
growTree(X_right, maxHeight, numColumns, currentHeight + 1),
split_column,
split_value)
}
}
3.3靴迫、將多棵iTree組建成完整森林iforest
參數(shù):
data:全部訓練數(shù)據(jù)惕味;
numTrees:森林中樹的個數(shù);
subSampleSize:每棵樹采樣的大杏裥俊名挥;
seed:隨機種子。
返回:
孤立森林
def buildForest(data: RDD[Array[Double]], numTrees: Int = 2, subSampleSize: Int = 256, seed: Long = Random.nextLong) : IsolationForest = {
val numSamples = data.count()
val numColumns = data.take(1)(0).size
val maxHeight = math.ceil(math.log(subSampleSize)).toInt
val trees = Array.fill[ITree](numTrees)(ITreeLeaf(1))
val trainedTrees = trees.map(s=>growTree(getRandomSubsample(data, subSampleSize/numSamples.toDouble, seed), maxHeight, numColumns))
IsolationForest(numSamples, trainedTrees)
}
4芬沉、定義預測功能類
4.1 預測功能類定義為IsolationForest的樣例類,
參數(shù)
num_samples:單課iTree的樣本數(shù)目
trees:已經構建好的孤立森林iforest
主函數(shù)predict阁猜,
參數(shù)x:要預測的單條樣本數(shù)組丸逸,
返回:異常得分Anomaly Score
步驟:
在每一棵iTree上,計算樣本達到葉子節(jié)點走過的路徑長度剃袍,然后將得到的不同路徑長度按照如下公式進行計算黄刚,得到異常得分,走過的路徑越短民效,得分越高憔维,代表越異常。
公式中畏邢,h(x)代表路徑長度业扒,E(h(x))代表在不同的iTree上路徑長度的均值,即群體決策舒萎,分母是用來歸一化的程储。
case class IsolationForest(num_samples: Long, trees: Array[ITree]) {
def predict(x:Array[Double]): Double = {
val predictions = trees.map(s => pathLength(x, s, 0)).toList
println(predictions.mkString(","))
math.pow(2, -(predictions.sum/predictions.size)/cost(num_samples)) //Anomaly Score
}
上面代碼用到的cost 方法和pathLength方法定義如下,
cost方法參數(shù)為二叉樹中的樣本個數(shù)臂寝,范圍該二叉樹的平均路徑長度章鲤,公式為:
def cost(num_items:Long): Int =
//二叉搜索樹的平均路徑長度。0.5772156649:歐拉常數(shù)
(2*(math.log(num_items-1) + 0.5772156649)-(2*(num_items-1)/num_items)).toInt
pathLength方法是一個遞歸計算咆贬,因為每走一步败徊,接下來面對的仍然是一顆樹,分支樹或者葉子節(jié)點掏缎。
參數(shù):樣本x皱蹦,單顆樹tree,當前的路徑長度path_length眷蜈,初始值應傳入0根欧。
返回:最終的路徑長度
@scala.annotation.tailrec
final def pathLength(x:Array[Double], tree:ITree, path_length:Int): Double ={
tree match{ //match方法,讓tree進行如下兩種模式匹配
//如果ITree匹配到的類型是葉子節(jié)點端蛆,那么凤粗,查看該節(jié)點的樣本數(shù)size,如果size大于1,則加上該size對應的二叉搜索樹的平均路徑長度嫌拣,如果size等于1柔袁,則直接加1
case ITreeLeaf(size) =>
if (size > 1)
path_length + cost(size)
else
path_length + 1
//如果ITree匹配到的類型是一顆分支子樹,該子樹還會有l(wèi)eft分支异逐,right分支捶索,以及分類的依據(jù)特征列split_column,和該特征列的分割值split_value
case ITreeBranch(left, right, split_column, split_value) =>
val sample_value = x(split_column) //傳入的樣本x在該特征上的取值
if (sample_value < split_value) //如果小于分割值則在左子樹上進行遞歸計算灰瞻,如果大于分割值則在右子樹上進行遞歸計算
pathLength(x, left, path_length + 1)
else
pathLength(x, right, path_length + 1)
}
}
}
5腥例、讀取數(shù)據(jù)進行預測
本節(jié)定義最終要調用運行的main方法,我把樣例數(shù)據(jù)放在了本地酝润,也可以放到hdfs上燎竖,csv格式,已經做好了標準化要销,概覽如下
5.1构回、一些對spark的基本設置
object Runner{
def main(args:Array[String]): Unit ={
Random.setSeed(1337)
val conf = new SparkConf()
.setAppName("IsolationTree")
.setMaster("local")
val sc = new SparkContext(conf)
//禁止對輸出文件進行壓縮
sc.hadoopConfiguration.set("mapred.output.compress", "false")
5.2、讀入csv數(shù)據(jù)并預處理疏咐,lines為RDD格式纤掸,這是spark處理數(shù)據(jù)的基本單元
val lines = sc.textFile("file:///tmp/spark_data/spark_if_train.csv") //本地路徑
val data = //對每一行數(shù)據(jù)以逗號為分隔符進行拆分,從第二個數(shù)據(jù)開始取浑塞,因為第一個數(shù)字是索引
lines
.map(line => line.split(","))
.map(s => s.slice(1,s.length))
val header = data.first() // 取第一行的數(shù)據(jù)作為列名
// 去掉列名行并將數(shù)據(jù)轉化為double類型
val rows = data.filter(line => line(0) != header(0)).map(s => s.map(_.toDouble))
println("Loaded CSV File...")
println(header.mkString("\n")) // 看一下列名
println(rows.take(5).deep.mkString("\n")) // 看一下前5行數(shù)據(jù)
5.3借跪、進行iforest的構建和對樣本的預測
// 構建森林,訓練數(shù)據(jù)rows酌壕,森林里樹的棵樹垦梆,這里寫10,數(shù)據(jù)量大的話一般是100
val forest = IsolationForest.buildForest(rows, numTrees=10)
// 對每一行數(shù)據(jù)進行預測
val result_rdd = rows.map(row => row ++ Array(forest.predict(row)))
// 將結果存入本地文件
result_rdd.map(lines => lines.mkString(",")).repartition(1).saveAsTextFile("file:///tmp/predict_label")
// 看一下前10條數(shù)據(jù)的預測結果
val local_rows = rows.take(10)
for(row <- local_rows){
println("ForestScore", forest.predict(row))
}
println("Finished Isolation")
}
}
以上仅孩,isolation forest訓練部分和預測部分都做好了托猩。
三、部署到spark上并運行
(圖片給自己的機器打了碼辽慕,略丑??)
1京腥、基礎環(huán)境配置
前提1:配置好spark集群,能成功進入下圖所示的交互狀態(tài)溅蛉。此部分教程自行google~
$ spark-shell
前提2:配置好sbt 公浪,用于管理項目依賴,構建項目
參考教程:http://blog.csdn.net/zcf1002797280/article/details/49677881
sbt sbtVersion
2船侧、部署腳本
2.1欠气、將上節(jié)代碼文件命名為Runner.scala
2.2、創(chuàng)建目錄結構
cd ~
mkdir -p mysparkapp/iforest_model/src/main/scala
2.3镜撩、將Runner.scala移動到~/mysparkapp/iforest_model/src/main/scala文件夾下
mv Runner.scala ~/mysparkapp/iforest_model/src/main/scala/
2.4预柒、新建配置文件conf.sbt,聲明我們項目的名稱以及對相關版本的依賴信息
cd ~/mysparkapp/iforest_model
vim conf.sbt
在conf.sbt中,添加如下內容宜鸯,版本信息根據(jù)你配置的真實信息來寫哦:
name := "IsolationForest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.1"
現(xiàn)在看一下我們的項目結構是否如圖所示
find .
2.5憔古、將程序打包,仍然在~/mysparkapp/iforest_model下淋袖,執(zhí)行:
sbt package
注意黃色箭頭指向的文件地址鸿市,這是打包好的jar包,供我們稍后提交任務使用即碗。
2.6焰情、正式提交spark任務
在提交spark任務之前,要確保輸出目錄不存在:
rm -r /tmp/predict_label
然后用spark-submit命令提交任務剥懒,需要傳入剛剛打包好的jar包路徑:
spark-submit --class "Runner" ~/mysparkapp/iforest_model/target/scala-2.11/isolationforest_2.11-1.0.jar
開始運行~~???
我們打印出了數(shù)據(jù)的列名内舟、前5條數(shù)據(jù)、以及前10條數(shù)據(jù)的異常得分如圖所示:
任務執(zhí)行完畢蕊肥,看一下輸出文件谒获,圖示撈出了前五行蛤肌,最后一個字段即為預測得分壁却,接下來就可以設定一個閾值,原作論文推薦為0.6裸准,大于閾值的即判定為異常啦展东。
圖中的第二行數(shù)據(jù),得分0.69炒俱,其他數(shù)據(jù)得分均為0.5以下盐肃,觀察一下它前面的字段,比其他數(shù)據(jù)都要大出很多权悟,確實為一個異常點~
四砸王、小結
isolation forest由多棵樹構成,而樹的生長過程并不受其他樹影響峦阁,所以是一個非常完美的適合分布式并行的算法谦铃。樣例數(shù)據(jù)和代碼都放到了https://github.com/scarlettgin/isolation_spark