異常點檢測算法isolation forest的分布式實現(xiàn)

無監(jiān)督領域有一個準度和效率雙佳的異常點檢測算法万搔,我在實踐中使用過幾次仰坦,效果奇好,就是最近幾年非常流行的isolation forest(孤立森林)良瞧。該算法在sklearn中有現(xiàn)成的包陪汽,但是如果大數(shù)據(jù)的集群上跑的話,目前沒有封裝好的接口褥蚯,給分布式任務的部署帶來了很多不便(話說spark mllib中集成的算法真心太少了)掩缓,本文用scala從頭進行該算法在spark上的分布式實現(xiàn),并演示任務在集群上的執(zhí)行全過程遵岩。

一你辣、算法簡介

先說一下算法的最少必要知識,細節(jié)部分會揉在代碼里進行講解尘执。

1舍哄、訓練過程:構建森林的樹木

iForest由iTree組成。構建每一顆iTree時誊锭,從訓練數(shù)據(jù)中抽取N個樣本表悬,然后在這些樣本中,隨機選擇一個特征丧靡,再隨機選擇該特征下的一個值蟆沫,對樣本進行二叉劃分籽暇,然后分別在左右兩邊的數(shù)據(jù)集上重復上面的過程,直接達到終止條件饭庞,一顆樹構建完成戒悠。

2、預測過程:計算樣本的異常得分

把測試數(shù)據(jù)在每棵樹上沿對應的條件分支往下走舟山,直到達到葉子節(jié)點绸狐,并記錄這過程中經過的路徑長度path length(用h(x)表示)。并由此得出異常分數(shù)累盗,當分數(shù)超過某一閾值寒矿,即可判定為異常樣本。

二若债、scala實現(xiàn)

代碼主體非原創(chuàng)符相,參考自國外的一位大神:https://github.com/hsperr/first_steps_in_scala,有部分修改

1蠢琳、首先主巍,import編寫spark程序所需的包,以及scala的Random模塊挪凑,用于隨機選取功能孕索。
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random
2、定義單顆樹iTree躏碳,第二搞旭、三行意味著,每棵樹的左右分支ITreeBranch和葉子節(jié)點ITreeLeaf都屬于iTree的子類菇绵。
sealed trait ITree
case class ITreeBranch(left: ITree, right: ITree, split_column: Int, split_value: Double) extends ITree
case class ITreeLeaf(size: Long) extends ITree
3肄渗、定義孤立森林的類,完成算法的訓練部分咬最,即全部樹的構建翎嫡。

3.1、從樣本中抽樣永乌,用于構建單個iTree

object IsolationForest {
    def getRandomSubsample(data: RDD[Array[Double]], sampleRatio: Double, seed: Long = Random.nextLong): RDD[Array[Double]] = {
        data.sample(false, sampleRatio, seed=seed)
    }

3.2 惑申、遞歸構建生成單顆iTree。
參數(shù):
data:上步抽出的樣本數(shù)據(jù)翅雏;
maxHeight:樹的最大高度即樹終止生長的條件圈驼;
numColumns:data的特征數(shù)量;
currentHeight:樹的當前高度望几。
返回:
一顆完整的ITree

    def growTree(data: RDD[Array[Double]], maxHeight:Int, numColumns:Int, currentHeight:Int = 0): ITree = {
        val numSamples = data.count()
        //遞歸終止條件绩脆,當前樹高大于maxHeight或數(shù)據(jù)量不大于1
        if(currentHeight>=maxHeight || numSamples <= 1){
            return new ITreeLeaf(numSamples)
        }
        //隨機選擇特征列
        val split_column = Random.nextInt(numColumns)
        val column = data.map(s => s(split_column))
        //隨機選擇該特征列中的值split_value,用于分割樣本
        val col_min = column.min()
        val col_max = column.max()
        val split_value = col_min + Random.nextDouble()*(col_max-col_min)
        //小于分割值的成為左子樹,反之右子樹
        val X_left = data.filter(s => s(split_column) < split_value).cache()
        val X_right = data.filter(s => s(split_column) >= split_value).cache()

        //遞歸
        new ITreeBranch(growTree(X_left, maxHeight, numColumns, currentHeight + 1),
            growTree(X_right, maxHeight, numColumns, currentHeight + 1),
            split_column,
            split_value)
    }
}

3.3靴迫、將多棵iTree組建成完整森林iforest
參數(shù):
data:全部訓練數(shù)據(jù)惕味;
numTrees:森林中樹的個數(shù);
subSampleSize:每棵樹采樣的大杏裥俊名挥;
seed:隨機種子。
返回:
孤立森林

def buildForest(data: RDD[Array[Double]], numTrees: Int = 2, subSampleSize: Int = 256, seed: Long = Random.nextLong) : IsolationForest = {
        val numSamples = data.count()
        val numColumns = data.take(1)(0).size
        val maxHeight = math.ceil(math.log(subSampleSize)).toInt
        val trees = Array.fill[ITree](numTrees)(ITreeLeaf(1))

        val trainedTrees = trees.map(s=>growTree(getRandomSubsample(data, subSampleSize/numSamples.toDouble, seed), maxHeight, numColumns))

        IsolationForest(numSamples, trainedTrees)
    }
4芬沉、定義預測功能類

4.1 預測功能類定義為IsolationForest的樣例類,
參數(shù)
num_samples:單課iTree的樣本數(shù)目
trees:已經構建好的孤立森林iforest

主函數(shù)predict阁猜,
參數(shù)x:要預測的單條樣本數(shù)組丸逸,
返回:異常得分Anomaly Score
步驟:
在每一棵iTree上,計算樣本達到葉子節(jié)點走過的路徑長度剃袍,然后將得到的不同路徑長度按照如下公式進行計算黄刚,得到異常得分,走過的路徑越短民效,得分越高憔维,代表越異常。


image.png

公式中畏邢,h(x)代表路徑長度业扒,E(h(x))代表在不同的iTree上路徑長度的均值,即群體決策舒萎,分母是用來歸一化的程储。

case class IsolationForest(num_samples: Long, trees: Array[ITree]) {
    def predict(x:Array[Double]): Double = {
        val predictions = trees.map(s => pathLength(x, s, 0)).toList
        println(predictions.mkString(","))
        math.pow(2, -(predictions.sum/predictions.size)/cost(num_samples)) //Anomaly Score
    }

上面代碼用到的cost 方法和pathLength方法定義如下,
cost方法參數(shù)為二叉樹中的樣本個數(shù)臂寝,范圍該二叉樹的平均路徑長度章鲤,公式為:


image.png
    def cost(num_items:Long): Int =
        //二叉搜索樹的平均路徑長度。0.5772156649:歐拉常數(shù)
        (2*(math.log(num_items-1) + 0.5772156649)-(2*(num_items-1)/num_items)).toInt

pathLength方法是一個遞歸計算咆贬,因為每走一步败徊,接下來面對的仍然是一顆樹,分支樹或者葉子節(jié)點掏缎。
參數(shù):樣本x皱蹦,單顆樹tree,當前的路徑長度path_length眷蜈,初始值應傳入0根欧。
返回:最終的路徑長度

    @scala.annotation.tailrec
    final def pathLength(x:Array[Double], tree:ITree, path_length:Int): Double ={
        tree match{ //match方法,讓tree進行如下兩種模式匹配
            //如果ITree匹配到的類型是葉子節(jié)點端蛆,那么凤粗,查看該節(jié)點的樣本數(shù)size,如果size大于1,則加上該size對應的二叉搜索樹的平均路徑長度嫌拣,如果size等于1柔袁,則直接加1
            case ITreeLeaf(size) => 
                if (size > 1)
                    path_length + cost(size)
                else 
                    path_length + 1

            //如果ITree匹配到的類型是一顆分支子樹,該子樹還會有l(wèi)eft分支异逐,right分支捶索,以及分類的依據(jù)特征列split_column,和該特征列的分割值split_value
            case ITreeBranch(left, right, split_column, split_value) => 
                val sample_value = x(split_column)  //傳入的樣本x在該特征上的取值

                if (sample_value < split_value)  //如果小于分割值則在左子樹上進行遞歸計算灰瞻,如果大于分割值則在右子樹上進行遞歸計算
                    pathLength(x, left, path_length + 1)
                else
                    pathLength(x, right, path_length + 1)
        }
    }
}

5腥例、讀取數(shù)據(jù)進行預測

本節(jié)定義最終要調用運行的main方法,我把樣例數(shù)據(jù)放在了本地酝润,也可以放到hdfs上燎竖,csv格式,已經做好了標準化要销,概覽如下


訓練數(shù)據(jù)概覽.png

5.1构回、一些對spark的基本設置

object Runner{
    def main(args:Array[String]): Unit ={
        Random.setSeed(1337)

        val conf = new SparkConf()
            .setAppName("IsolationTree")
            .setMaster("local")

        val sc = new SparkContext(conf)
        //禁止對輸出文件進行壓縮
        sc.hadoopConfiguration.set("mapred.output.compress", "false")  

5.2、讀入csv數(shù)據(jù)并預處理疏咐,lines為RDD格式纤掸,這是spark處理數(shù)據(jù)的基本單元

        val lines = sc.textFile("file:///tmp/spark_data/spark_if_train.csv") //本地路徑
        val data =  //對每一行數(shù)據(jù)以逗號為分隔符進行拆分,從第二個數(shù)據(jù)開始取浑塞,因為第一個數(shù)字是索引
            lines
                .map(line => line.split(","))
                .map(s => s.slice(1,s.length))
        val header = data.first() // 取第一行的數(shù)據(jù)作為列名

        // 去掉列名行并將數(shù)據(jù)轉化為double類型
        val rows = data.filter(line => line(0) != header(0)).map(s => s.map(_.toDouble)) 

        println("Loaded CSV File...")
        println(header.mkString("\n"))  // 看一下列名
        println(rows.take(5).deep.mkString("\n"))  // 看一下前5行數(shù)據(jù)

5.3借跪、進行iforest的構建和對樣本的預測

        // 構建森林,訓練數(shù)據(jù)rows酌壕,森林里樹的棵樹垦梆,這里寫10,數(shù)據(jù)量大的話一般是100
        val forest = IsolationForest.buildForest(rows, numTrees=10)

        // 對每一行數(shù)據(jù)進行預測
        val result_rdd = rows.map(row => row ++ Array(forest.predict(row)))

        // 將結果存入本地文件
        result_rdd.map(lines => lines.mkString(",")).repartition(1).saveAsTextFile("file:///tmp/predict_label")

        // 看一下前10條數(shù)據(jù)的預測結果
        val local_rows = rows.take(10)
        for(row <- local_rows){
            println("ForestScore", forest.predict(row))
        }
        println("Finished Isolation")
    }
}

以上仅孩,isolation forest訓練部分和預測部分都做好了托猩。

三、部署到spark上并運行

(圖片給自己的機器打了碼辽慕,略丑??)

1京腥、基礎環(huán)境配置

前提1:配置好spark集群,能成功進入下圖所示的交互狀態(tài)溅蛉。此部分教程自行google~

$ spark-shell
進入spark交互命令行查看是否正常運行.png

前提2:配置好sbt 公浪,用于管理項目依賴,構建項目
參考教程:http://blog.csdn.net/zcf1002797280/article/details/49677881

sbt sbtVersion
查看sbt版本信息確保sbt正確安裝.png
2船侧、部署腳本

2.1欠气、將上節(jié)代碼文件命名為Runner.scala
2.2、創(chuàng)建目錄結構

cd ~
mkdir -p mysparkapp/iforest_model/src/main/scala

2.3镜撩、將Runner.scala移動到~/mysparkapp/iforest_model/src/main/scala文件夾下

mv Runner.scala ~/mysparkapp/iforest_model/src/main/scala/

2.4预柒、新建配置文件conf.sbt,聲明我們項目的名稱以及對相關版本的依賴信息

cd ~/mysparkapp/iforest_model
vim conf.sbt

在conf.sbt中,添加如下內容宜鸯,版本信息根據(jù)你配置的真實信息來寫哦:

name := "IsolationForest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.1"

現(xiàn)在看一下我們的項目結構是否如圖所示

find .
查看項目目錄結構.png

2.5憔古、將程序打包,仍然在~/mysparkapp/iforest_model下淋袖,執(zhí)行:

sbt package
sbt打包.png

注意黃色箭頭指向的文件地址鸿市,這是打包好的jar包,供我們稍后提交任務使用即碗。

2.6焰情、正式提交spark任務
在提交spark任務之前,要確保輸出目錄不存在:

rm -r /tmp/predict_label

然后用spark-submit命令提交任務剥懒,需要傳入剛剛打包好的jar包路徑:

spark-submit --class "Runner" ~/mysparkapp/iforest_model/target/scala-2.11/isolationforest_2.11-1.0.jar

開始運行~~???
我們打印出了數(shù)據(jù)的列名内舟、前5條數(shù)據(jù)、以及前10條數(shù)據(jù)的異常得分如圖所示:


程序執(zhí)行.png

任務執(zhí)行完畢蕊肥,看一下輸出文件谒获,圖示撈出了前五行蛤肌,最后一個字段即為預測得分壁却,接下來就可以設定一個閾值,原作論文推薦為0.6裸准,大于閾值的即判定為異常啦展东。


輸出文件.png

圖中的第二行數(shù)據(jù),得分0.69炒俱,其他數(shù)據(jù)得分均為0.5以下盐肃,觀察一下它前面的字段,比其他數(shù)據(jù)都要大出很多权悟,確實為一個異常點~

四砸王、小結

isolation forest由多棵樹構成,而樹的生長過程并不受其他樹影響峦阁,所以是一個非常完美的適合分布式并行的算法谦铃。樣例數(shù)據(jù)和代碼都放到了https://github.com/scarlettgin/isolation_spark

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市榔昔,隨后出現(xiàn)的幾起案子驹闰,更是在濱河造成了極大的恐慌,老刑警劉巖撒会,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嘹朗,死亡現(xiàn)場離奇詭異,居然都是意外死亡诵肛,警方通過查閱死者的電腦和手機屹培,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惫谤,你說我怎么就攤上這事壁顶。” “怎么了溜歪?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵若专,是天一觀的道長。 經常有香客問我蝴猪,道長调衰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任自阱,我火速辦了婚禮嚎莉,結果婚禮上,老公的妹妹穿的比我還像新娘沛豌。我一直安慰自己趋箩,他們只是感情好,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布加派。 她就那樣靜靜地躺著叫确,像睡著了一般。 火紅的嫁衣襯著肌膚如雪芍锦。 梳的紋絲不亂的頭發(fā)上竹勉,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天,我揣著相機與錄音娄琉,去河邊找鬼次乓。 笑死,一個胖子當著我的面吹牛孽水,可吹牛的內容都是我干的票腰。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼女气,長吁一口氣:“原來是場噩夢啊……” “哼杏慰!你這毒婦竟也來了?” 一聲冷哼從身側響起主卫,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤逃默,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后簇搅,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體完域,經...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年瘩将,在試婚紗的時候發(fā)現(xiàn)自己被綠了吟税。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凹耙。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖肠仪,靈堂內的尸體忽然破棺而出肖抱,到底是詐尸還是另有隱情,我是刑警寧澤异旧,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布意述,位于F島的核電站,受9級特大地震影響吮蛹,放射性物質發(fā)生泄漏荤崇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一潮针、第九天 我趴在偏房一處隱蔽的房頂上張望术荤。 院中可真熱鬧,春花似錦每篷、人聲如沸瓣戚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽子库。三九已至,卻和暖如春吨灭,著一層夾襖步出監(jiān)牢的瞬間刚照,已是汗流浹背刑巧。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人闽寡。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓电谣,卻偏偏與公主長得像,于是被迫代替她去往敵國和親恭理。 傳聞我的和親對象是個殘疾皇子拯辙,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內容