流式實時日志分析系統(tǒng)

我們都知道服務(wù)用戶訪問流量是不間斷的忽妒,基于網(wǎng)站的訪問日志,即 Web log 分析是典型的流式實時計算應(yīng)用場景兼贸。比如百度統(tǒng)計段直,它可以做流量分析、來源分析溶诞、網(wǎng)站分析鸯檬、轉(zhuǎn)化分析。另外還有特定場景分析很澄,比如安全分析京闰,用來識別 CC 攻擊、 SQL 注入分析甩苛、脫庫等蹂楣。在本次實踐中,我們將基于 Spark Streaming 流式計算框架讯蒲,簡單地實現(xiàn)一個類似于百度分析的系統(tǒng)痊土。

知識點簡述

  • Python 模擬生成 Nginx 日志
  • Spark Streaming 編程
  • 服務(wù)器訪問日志分析方法

原理簡述

百度統(tǒng)計是百度推出的一款免費的專業(yè)網(wǎng)站流量分析工具,能夠告訴用戶訪客是如何找到并瀏覽用戶的網(wǎng)站的墨林,以及在網(wǎng)站上瀏覽了哪些頁面赁酝。這些信息可以幫助用戶改善訪客在其網(wǎng)站上的使用體驗,不斷提升網(wǎng)站的投資回報率旭等。
百度統(tǒng)計提供了幾十種圖形化報告酌呆,包括:趨勢分析、來源分析搔耕、頁面分析隙袁、訪客分析、定制分析等多種統(tǒng)計分析服務(wù)。

這里我們參考百度統(tǒng)計的功能菩收,基于 Spark Streaming 簡單實現(xiàn)一個分析系統(tǒng)梨睁,使之包括以下分析功能。

  • 流量分析娜饵。一段時間內(nèi)用戶網(wǎng)站的流量變化趨勢坡贺,針對不同的 IP 對用戶網(wǎng)站的流量進行細分。常見指標是總 PV 和各 IP 的PV箱舞。
  • 來源分析遍坟。各種搜索引擎來源給用戶網(wǎng)站帶來的流量情況,需要精確到具體搜索引擎褐缠、具體關(guān)鍵詞政鼠。通過來源分析,用戶可以及時了解哪種類型的來源為其帶來了更多訪客队魏。常見指標是搜索引擎、關(guān)鍵詞和終端類型的 PV 万搔。
  • 網(wǎng)站分析胡桨。各個頁面的訪問情況,包括及時了解哪些頁面最吸引訪客以及哪些頁面最容易導(dǎo)致訪客流失瞬雹,從而幫助用戶更有針對性地改善網(wǎng)站質(zhì)量昧谊。常見指標是各頁面的 PV 。

日志實時采集

Web log 一般在 HTTP 服務(wù)器收集酗捌,比如 Nginx access 日志文件呢诬。一個典型的方案是 Nginx 日志文件 + Flume + Kafka + Spark Streaming,如下所述:

    1. 接收服務(wù)器用 Nginx 胖缤,根據(jù)負載可以部署多臺尚镰,數(shù)據(jù)落地至本地日志文件;
    1. 每個 Nginx 節(jié)點上部署 Flume 哪廓,使用 tail -f 實時讀取 Nginx 日志狗唉,發(fā)送至 KafKa 集群;
    1. 專用的 Kafka 集群用戶連接實時日志與 Spark 集群涡真,詳細配置可以參考 http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html 分俯;
    1. Spark Streaming 程序?qū)崟r消費 Kafka 集群上的數(shù)據(jù),實時分析哆料,輸出缸剪;

流式分析系統(tǒng)實現(xiàn)

我們簡單模擬一下數(shù)據(jù)收集和發(fā)送的環(huán)節(jié),用一個 Python 腳本隨機生成 Nginx 訪問日志东亦,并通過腳本的方式自動上傳至 HDFS 杏节,然后移動至指定目錄。 Spark Streaming 程序監(jiān)控 HDFS 目錄,自動處理新的文件拢锹。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import time


class WebLogGeneration(object):

    # 類屬性谣妻,由所有類的對象共享
    site_url_base = "http://www.xxx.com/"

    # 基本構(gòu)造函數(shù)
    def __init__(self):
        #  前面7條是IE,所以大概瀏覽器類型70%為IE ,接入類型上卒稳,20%為移動設(shè)備蹋半,分別是7和8條,5% 為空
        #  https://github.com/mssola/user_agent/blob/master/all_test.go
        self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
                                0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
                                0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
                                0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
                                0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
                                0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
                                1:" ",}
        self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
        self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
        self.http_refer = [ "http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}","http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]
        self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]


    def sample_ip(self):
        slice = random.sample(self.ip_slice_list, 4) #從ip_slice_list中隨機獲取4個元素,作為一個片斷返回
        return  ".".join([str(item) for item in slice])  #  todo


    def sample_url(self):
        return  random.sample(self.url_path_list,1)[0]


    def sample_user_agent(self):
        dist_uppon = random.uniform(0, 1)
        return self.user_agent_dist[float('%0.1f' % dist_uppon)]


    # 主要搜索引擎referrer參數(shù)
    def sample_refer(self):
        if random.uniform(0, 1) > 0.2:  # 只有20% 流量有refer
            return "-"

        refer_str=random.sample(self.http_refer,1)
        query_str=random.sample(self.search_keyword,1)
        return refer_str[0].format(query=query_str[0])

    def sample_one_log(self,count = 3):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        while count >1:
            query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"".format(ip=self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
            print query_log
            count = count -1

if __name__ == "__main__":
    web_log_gene = WebLogGeneration()

    #while True:
    #    time.sleep(random.uniform(0, 3))
    web_log_gene.sample_one_log(random.uniform(10, 100))

然后需要一個簡單的腳本來調(diào)用上面的腳本以隨機生成日志充坑,上傳至 HDFS 减江,然后移動到目標目錄:

#!/bin/bash 

# HDFS命令 
HDFS="/usr/local/myhadoop/hadoop-2.7.3/bin/hadoop fs"

# Streaming程序監(jiān)聽的目錄,注意跟后面Streaming程序的配置要保持一致 
streaming_dir=”/spark/streaming” 

# 清空舊數(shù)據(jù) 
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1 
$HDFS -rm "${streaming_dir}"'/*'     > /dev/null 2>&1 

# 一直運行 
while [ 1 ]; do 
    ./sample_web_log.py > test.log  

    # 給日志文件加上時間戳捻爷,避免重名 
    tmplog="access.`date +'%s'`.log" 

    # 先放在臨時目錄辈灼,再move至Streaming程序監(jiān)控的目錄下,確保原子性
    # 臨時目錄用的是監(jiān)控目錄的子目錄也榄,因為子目錄不會被監(jiān)控
    $HDFS -put test.log ${streaming_dir}/tmp/$tmplog 
    $HDFS -mv           ${streaming_dir}/tmp/$tmplog ${streaming_dir}/ 
    echo "`date +"%F %T"` put $tmplog to HDFS succeed"
    sleep 1
done 

Spark Streaming 程序代碼如下所示巡莹,可以在 bin/spark-shell 交互式環(huán)境下運行,如果要以 Spark 程序的方式運行甜紫,按注釋中的說明調(diào)整一下 StreamingContext 的生成方式即可降宅。啟動 bin/spark-shell 時,為了避免因 DEBUG 日志信息太多而影響觀察輸出囚霸,可以將 DEBUG 日志重定向至文件腰根,屏幕上只顯示主要輸出,方法是 ./bin/spark-shell 2>spark-shell-debug.log:

// 導(dǎo)入類
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 設(shè)計計算的周期拓型,單位秒
val batch = 10

/*
 * 這是bin/spark-shell交互式模式下創(chuàng)建StreamingContext的方法
 * 非交互式請使用下面的方法來創(chuàng)建
 */
val ssc = new StreamingContext(sc, Seconds(batch))

/*
// 非交互式下創(chuàng)建StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/


/*
 * 創(chuàng)建輸入DStream额嘿,是文本文件目錄類型
 * 本地模式下也可以使用本地文件系統(tǒng)的目錄,比如 file:///home/spark/streaming
 */
val lines = ssc.textFileStream("hdfs:///spark/streaming")


/*
 * 下面是統(tǒng)計各項指標劣挫,調(diào)試時可以只進行部分統(tǒng)計册养,方便觀察結(jié)果
 */


// 1. 總PV
lines.count().print()


// 2. 各IP的PV,按PV倒序
//   空格分隔的第一個字段就是IP
lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => {
  rdd.map(ip_pv => (ip_pv._2, ip_pv._1)).
  sortByKey(false).
  map(ip_pv => (ip_pv._2, ip_pv._1))
}).print()


// 3. 搜索引擎PV
val refer = lines.map(_.split("\"")(3))

// 先輸出搜索引擎和查詢關(guān)鍵詞揣云,避免統(tǒng)計搜索關(guān)鍵詞時重復(fù)計算
// 輸出(host, query_keys)
val searchEnginInfo = refer.map(r => {

    val f = r.split('/')

    val searchEngines = Map(
        "www.google.cn" -> "q",
        "www.yahoo.com" -> "p",
        "cn.bing.com" -> "q",
        "www.baidu.com" -> "wd",
        "www.sogou.com" -> "query"
    )

    if (f.length > 2) {
        val host = f(2)

        if (searchEngines.contains(host)) {
            val query = r.split('?')(1)
            if (query.length > 0) {
                val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                if (arr_search_q.length > 0)
                    (host, arr_search_q(0).split('=')(1))
                else
                    (host, "")
            } else {
                (host, "")
            }
        } else
            ("", "")
    } else
        ("", "")

})

// 輸出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print()


// 4. 關(guān)鍵詞PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print()


// 5. 終端類型PV
lines.map(_.split("\"")(5)).map(agent => {
    val types = Seq("iPhone", "Android")
    var r = "Default"
    for (t <- types) {
        if (agent.indexOf(t) != -1)
            r = t
    }
    (r, 1)
}).reduceByKey(_ + _).print()


// 6. 各頁面PV
lines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print()



// 啟動計算,等待執(zhí)行結(jié)束(出錯或Ctrl-C退出)
ssc.start()
ssc.awaitTermination()

參考 實驗樓 《流式實時日志分析系統(tǒng)》
若有疑問捕儒,歡迎留言交流

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市邓夕,隨后出現(xiàn)的幾起案子刘莹,更是在濱河造成了極大的恐慌,老刑警劉巖焚刚,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件点弯,死亡現(xiàn)場離奇詭異,居然都是意外死亡矿咕,警方通過查閱死者的電腦和手機抢肛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門狼钮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人捡絮,你說我怎么就攤上這事熬芜。” “怎么了福稳?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵涎拉,是天一觀的道長。 經(jīng)常有香客問我的圆,道長鼓拧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任越妈,我火速辦了婚禮季俩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘梅掠。我一直安慰自己酌住,他們只是感情好,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布阎抒。 她就那樣靜靜地躺著赂韵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪挠蛉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天肄满,我揣著相機與錄音谴古,去河邊找鬼。 笑死稠歉,一個胖子當著我的面吹牛掰担,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播怒炸,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼带饱,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了阅羹?” 一聲冷哼從身側(cè)響起勺疼,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎捏鱼,沒想到半個月后执庐,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡导梆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年轨淌,在試婚紗的時候發(fā)現(xiàn)自己被綠了迂烁。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡递鹉,死狀恐怖盟步,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情躏结,我是刑警寧澤却盘,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站窜觉,受9級特大地震影響谷炸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜禀挫,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一旬陡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧语婴,春花似錦描孟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至缠导,卻和暖如春廉羔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背僻造。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工憋他, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人髓削。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓竹挡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親立膛。 傳聞我的和親對象是個殘疾皇子揪罕,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 引言 隨著實時數(shù)據(jù)的日漸普及,企業(yè)需要流式計算系統(tǒng)滿足可擴展宝泵、易用以及易整合進業(yè)務(wù)系統(tǒng)好啰。Structured St...
    阿貓阿狗Hakuna閱讀 3,311評論 2 13
  • 目前為止,已經(jīng)討論了機器學(xué)習和批處理模式的數(shù)據(jù)挖掘÷承桑現(xiàn)在審視持續(xù)處理流數(shù)據(jù)坎怪,實時檢測其中的事實和模式,好像從湖泊來...
    abel_cao閱讀 9,006評論 1 20
  • 1.Linux基礎(chǔ)和分布式集群技術(shù) 學(xué)完此階段可掌握的核心能力: 熟練使用Linux廓握,熟練安裝Linux上的軟件搅窿,...
    大數(shù)據(jù)05閱讀 400評論 0 0
  • 第四十五回 紅豆生江東嘁酿,春來發(fā)幾只(下) 第四十六回神亭青嶺秀,混元自心成(上) 亂塵飄然上山男应,原是為尋得了那太史...
    死在水里的魚閱讀 838評論 0 1
  • 三人為眾闹司,我們已經(jīng)9個人,說一群不為過沐飘。 因為共同的社交電商游桩,走到一起,從來不曾放棄耐朴,一直在努力借卧,我們就稱呼自己是...
    景雪兒可可閱讀 580評論 0 5