數(shù)據(jù)平臺實(shí)踐①——Flume+Kafka+SparkStreaming(pyspark)

蜻蜓點(diǎn)水


Flume——數(shù)據(jù)采集

如果說监署,爬蟲是采集外部數(shù)據(jù)的常用手段的話藏研,那么孽锥,F(xiàn)lume就是采集內(nèi)部數(shù)據(jù)的常用手段之一(logstash也是這方面的佼佼者)哼拔。
下面介紹一下Flume的基本構(gòu)造引有。

  • Agent:包含Source、Channel和Sink的主體倦逐,它是這3個組件的載體譬正,是組成Flume的數(shù)據(jù)節(jié)點(diǎn)。
  • Event:Flume 數(shù)據(jù)傳輸?shù)幕締卧?/li>
  • Source: 用來接收Event檬姥,并將Event批量傳給Channel曾我。
  • Channel:Source和Sink之間的Event緩沖通道,它有個type屬性健民,一般為memory抒巢,可以提高傳輸速度。
  • Sink:負(fù)責(zé)將數(shù)據(jù)沉淀到最終存儲區(qū)秉犹,或沉淀給下一個source虐秦,形成數(shù)據(jù)流平酿。
Flume

在大致了解了以上要素之后,通過上圖悦陋,我們就可以有一個大概的認(rèn)識蜈彼。一句話講,Source接收數(shù)據(jù)俺驶,并轉(zhuǎn)成Event單元幸逆,然后導(dǎo)入Channel緩沖通道,最后暮现,經(jīng)由Sink進(jìn)行數(shù)據(jù)沉淀还绘。當(dāng)然這里的沉淀,有多種選擇栖袋,除了上圖中的HDFS外拍顷,還包括HBase、File塘幅,或者作為另一個Source的源昔案。在一系列過程,一條有序的數(shù)據(jù)流就誕生了电媳。

Kafka——數(shù)據(jù)的發(fā)布/訂閱

Kafka踏揣,作為基于發(fā)布/訂閱的消息系統(tǒng),以其分布式性而受到大家的喜愛匾乓。
下面介紹一下Kafka的基本構(gòu)造捞稿。

  • Broker(代理): Kafka集群可由一個或多個服務(wù)器組成,其中的每個服務(wù)節(jié)點(diǎn)稱作這個集群的一個Broker拼缝。
  • Topic(主題): 一個Topic對應(yīng)一類消息娱局,Topic用作為消息劃分類別。
  • Partition(分區(qū)): 一個Topic一般含有多個分區(qū)咧七。
  • Producer(生產(chǎn)者):消息生產(chǎn)者衰齐,負(fù)責(zé)生產(chǎn)Topic消息。
  • Consumer(消費(fèi)者): 消息消費(fèi)者猪叙,負(fù)責(zé)消費(fèi)Topic消息娇斩。


    Kafka

Zookeeper——服務(wù)器間協(xié)調(diào)

這里需要提一下Zookeeper仁卷,對于Kafka這樣的分布式服務(wù)穴翩,大多需要多臺服務(wù)器相互協(xié)調(diào)工作,且保持一致性锦积。任意一臺服務(wù)器出現(xiàn)問題芒帕,如果不及時處理,都有可能導(dǎo)致整個服務(wù)的崩潰丰介,其后果是不堪設(shè)想的背蟆。ZooKeeper的分布式設(shè)計(jì)鉴分,可用于領(lǐng)導(dǎo)人選舉、群組協(xié)同工作和配置服務(wù)等带膀,保證了服務(wù)的一致性和可用性志珍。

Zookeeper

Spark Streaming——Spark核心API

Spark Streaming屬于Spark的核心api,它支持高吞吐量垛叨、支持容錯的實(shí)時流數(shù)據(jù)處理伦糯。它可以通過Kafka、HDFS嗽元、Flume等多種渠道獲取數(shù)據(jù)敛纲,轉(zhuǎn)換數(shù)據(jù)后利用Spark Engine進(jìn)行數(shù)據(jù)處理。現(xiàn)在剂癌,包括Python淤翔、Java等多種高級語言都對Spark進(jìn)行支持。本文使用pyspark進(jìn)行編程佩谷。

Spark Streaming

實(shí)踐出真知


要做什么

nginx日志分析旁壮,簡單統(tǒng)計(jì)了下PV和UV,并做了H5圖表實(shí)時展示琳要。使用的是我開發(fā)的基于ace-admin和react的管理端LogAdmin對數(shù)據(jù)進(jìn)行展示寡具。這里提供github,感興趣的朋友可以看一下稚补。

LogAdmin

下面是我的主要步驟童叠。
①Flume實(shí)時讀入nginx日志,并將數(shù)據(jù)導(dǎo)入Kafka中课幕。
②pyspark從Kafka讀入數(shù)據(jù)厦坛,做實(shí)時處理,并將處理后的數(shù)據(jù)導(dǎo)出到redis隊(duì)列中乍惊。
③編寫腳本從redis中取出數(shù)據(jù)杜秸,存入mysql。
④H5展示润绎。
【版本:Logstash1.7.0撬碟,Kafka 2.11(該版本中已集成了Zookeeper),Spark(2.0.2)】

①Flume實(shí)時讀入nginx日志莉撇,并將數(shù)據(jù)導(dǎo)入Kafka中呢蛤。

這一步中,只需配置flume.conf棍郎,并依次啟動flume其障、zookeeper、kafka即可
flume.conf(配置中命名已較為明確涂佃,hdfs部分被注釋了)

# agent-my80

# Finally, now that we've defined all of our components, tell
# agent-my80 which ones we want to activate.
#agent-my80.channels = ch1
#agent-my80.sources = avro-source1
#agent-my80.sinks = hdfs-sink1

agent-my80.channels = ch2
agent-my80.sources = exec-source1
agent-my80.sinks = kafka-sink1

# Define a memory channel called ch1 on agent-my80
#agent-my80.channels.ch1.type = memory
agent-my80.channels.ch2.type = memory


# Define an Avro source called avro-source1 on agent-my80 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
#agent-my80.sources.avro-source1.channels = ch1
#agent-my80.sources.avro-source1.type = avro
#agent-my80.sources.avro-source1.bind = 0.0.0.0
#agent-my80.sources.avro-source1.port = 44444
#agent-my80.sources.avro-source1.basenameHeader = true


agent-my80.sources.exec-source1.channels = ch2
agent-my80.sources.exec-source1.type = exec
agent-my80.sources.exec-source1.command = tail -f /home/www/logs/access.log

# # Define a logger sink that simply logs all events it receives
# # and connect it to the other end of the same channel.
#agent-my80.sinks.hdfs-sink1.channel = ch1
#agent-my80.sinks.hdfs-sink1.type = hdfs
#agent-my80.sinks.hdfs-sink1.hdfs.path = hdfs://my80:9000/flume-test
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = event-
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
#agent-my80.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
#agent-my80.sinks.hdfs-sink1.hdfs.round = true
#agent-my80.sinks.hdfs-sink1.hdfs.roundValue = 10
#agent-my80.sinks.hdfs-sink1.hdfs.fileType = DataStream

agent-my80.sinks.kafka-sink1.channel = ch2
agent-my80.sinks.kafka-sink1.type = org.apache.flume.sink.kafka.KafkaSink 
agent-my80.sinks.kafka-sink1.topic = my80-log
agent-my80.sinks.kafka-sink1.brokerList = localhost:9092
agent-my80.sinks.kafka-sink1.batchSize = 20

flume啟動命令

flume-ng agent --conf /usr/local/apache-flume-1.7.0-bin/conf --conf-file /usr/local/apache-flume-1.7.0-bin/conf/flume.conf --name agent-my80 -Dflume.root.logger=INFO,console

zookeeper啟動命令

/usr/local/kafka_2.11-0.10.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/zookeeper.properties

kafka啟動命令

/usr/local/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/server.properties

注意:有些朋友励翼,是用自己的個人服務(wù)器做相關(guān)實(shí)踐蜈敢,那么會遇到內(nèi)存不足的問題,這時候一般通過汽抚,修改Java堆大小來解決抓狭。比如我是修改的kafka的kafka-server-start.sh和zookeeper-server-start.sh來解決這個問題。

#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

②pyspark從Kafka讀入數(shù)據(jù)造烁,做實(shí)時處理辐宾,并將處理后的數(shù)據(jù)導(dǎo)出到redis隊(duì)列中。

這部分為方便站點(diǎn)解析膨蛮,我對nginx日志格式做了修改叠纹。
該步驟主要是做正則解析+MapReduce+數(shù)據(jù)導(dǎo)入redis,并分別將請求內(nèi)容和請求ip放入redis的list和set敞葛,這樣主要是方便我統(tǒng)計(jì)每天的PV和UV誉察。
還要注意一點(diǎn),nginx日志中包括靜態(tài)文件惹谐,顯然這個不能算UV和PV持偏,所以要過濾。

calculate.py

#coding=utf8
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re
import redis
import datetime

# 解析日志
def parse(logstring):
    #使用正則解析日志
    # regex = '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*ip=\/(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*tbl=([a-zA-Z0-9_]+)'
    regex = 'ip:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*?time:\[(.*?)\].*?request:\"(.*?)\".*?status_code:(\d{1,3}).*?agent:\"(.*?)\"'
    pattern = re.compile(regex)
    m1 = pattern.search(str(logstring))
    if m1 is not None:
        m = m1.groups()
        # print(m
        if len(m)!=5 or not m[2]:
            m= None
        else:
            hd_list=[u".js",u".css",u".jpg",u".png",u".jpeg",u".gif",u".bmp",u".woff"];
            if doStrContainAnyWords(m[2],hd_list):
                m= None
    else:
        m = None
    return m

def doStrContainAnyWords(str,words=[]):
    for word in words:
        if word in str:
            return True;
    return False;

class RedisClient:
    pool = None
    def __init__(self):
        self.getRedisPool()

    def getRedisPool(self):
        redisIp='localhost'
        redisPort=6379
        redisDB=0
        self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
        return self.pool

    def addToHashSet(self, key, value):
        if self.pool is None:
            self.pool = self.getRedisPool()
        r = redis.Redis(connection_pool=self.pool)
        hashSetName="my80-log-iphash-"+datetime.datetime.now().strftime("%Y-%m-%d");

        flag=False;
        if r.exists(hashSetName) is False:
            flag=True

        if r.hexists(hashSetName,str(key)):
            r.hincrby(hashSetName, str(key), value)
        else:
            r.hset(hashSetName, str(key), value)

        if flag is True:
            r.expire(hashSetName,3600*24+300);


    def addToList(self,value):
        if self.pool is None:
            self.pool = self.getRedisPool()
        r = redis.Redis(connection_pool=self.pool)
        r.lpush('my80-log-list', value)

if __name__ == '__main__':
    zkQuorum = 'localhost:2181'
    topic = 'my80-log'
    sc = SparkContext("local[2]", appName="kafka_pyspark_redis")
    ssc = StreamingContext(sc, 10)
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "kafka-streaming-redis", {topic: 1})
    #kafka讀取返回的數(shù)據(jù)為tuple,長度為2氨肌,tuple[1]為實(shí)際的數(shù)據(jù)鸿秆,tuple[0]的編碼為Unicode
    

    res = kvs.map(lambda x: x[1]).map(lambda x:parse(x)).filter(lambda x:True if x is not None else False)
    items = res.map(lambda item:{"ip":item[0],"time":item[1],"request":item[2],"status_code":item[3],"agent":item[4] } )
    # items = res.map(lambda item:{"ip":item[0],"time":item[1] } )
    # ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b).map(lambda x:{ x[0]:str(x[1]) } )
    ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b)

    r = RedisClient()

    def handleItem(time,rdd):
        if rdd.isEmpty() is False:
            for element in rdd.collect():
                r.addToList(element)
    items.foreachRDD(handleItem)
    

    def ipHandle(time,rdd):
        if rdd.isEmpty() is False:
            # rddstr = "{"+','.join(rdd.collect())+"}"
            for element in rdd.collect():
                r.addToHashSet(element[0], element[1] )
    ipcount.foreachRDD(ipHandle)
  

    ssc.start()
    ssc.awaitTermination()

安裝好spark-2.0.2-bin-hadoop2.7,腳本測試ok怎囚,最后就需要通過spark streaming提交任務(wù)(即提交calculate.py)卿叽。任務(wù)正常執(zhí)行的話,數(shù)據(jù)就會從Kafka導(dǎo)出恳守,經(jīng)處理后考婴,導(dǎo)入redis。

/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --jars /usr/local/spark-2.0.2-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /home/dcb/python/pyspark/calculate.py

③編寫腳本從redis中取出數(shù)據(jù)催烘,存入mysql沥阱。

這一步相信大家沒問題。

④H5圖表展示

H5圖表實(shí)時展示+github伊群,感興趣的朋友可以看一下考杉。

小結(jié)

Flume+Kafka+Spark,是一個相對比較流行且可行的實(shí)時計(jì)算組合舰始,可定制性較高崇棠,如果項(xiàng)目需求比較復(fù)雜,建議深入了解后進(jìn)行定制開發(fā)蔽午。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末易茬,一起剝皮案震驚了整個濱河市酬蹋,隨后出現(xiàn)的幾起案子及老,更是在濱河造成了極大的恐慌抽莱,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件骄恶,死亡現(xiàn)場離奇詭異食铐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)僧鲁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門虐呻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人寞秃,你說我怎么就攤上這事斟叼。” “怎么了春寿?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我顷窒,道長性含,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任厘线,我火速辦了婚禮识腿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘造壮。我一直安慰自己渡讼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布耳璧。 她就那樣靜靜地躺著硝全,像睡著了一般。 火紅的嫁衣襯著肌膚如雪楞抡。 梳的紋絲不亂的頭發(fā)上伟众,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機(jī)與錄音召廷,去河邊找鬼凳厢。 笑死,一個胖子當(dāng)著我的面吹牛竞慢,可吹牛的內(nèi)容都是我干的先紫。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼筹煮,長吁一口氣:“原來是場噩夢啊……” “哼遮精!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤本冲,失蹤者是張志新(化名)和其女友劉穎准脂,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體檬洞,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡狸膏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了添怔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片湾戳。...
    茶點(diǎn)故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖广料,靈堂內(nèi)的尸體忽然破棺而出砾脑,到底是詐尸還是另有隱情,我是刑警寧澤艾杏,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布拦止,位于F島的核電站,受9級特大地震影響糜颠,放射性物質(zhì)發(fā)生泄漏汹族。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一其兴、第九天 我趴在偏房一處隱蔽的房頂上張望顶瞒。 院中可真熱鬧,春花似錦元旬、人聲如沸榴徐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坑资。三九已至,卻和暖如春穆端,著一層夾襖步出監(jiān)牢的瞬間袱贮,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工体啰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留攒巍,地道東北人。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓荒勇,卻偏偏與公主長得像柒莉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子沽翔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理兢孝,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 134,629評論 18 139
  • ** 今天看了一下kafka官網(wǎng)跨蟹,嘗試著在自己電腦上安裝和配置雳殊,然后學(xué)一下官方document。** Introd...
    RainChang閱讀 4,993評論 1 30
  • kafka的定義:是一個分布式消息系統(tǒng)喷市,由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,309評論 1 15
  • 目前為止威恼,已經(jīng)討論了機(jī)器學(xué)習(xí)和批處理模式的數(shù)據(jù)挖掘∑沸眨現(xiàn)在審視持續(xù)處理流數(shù)據(jù),實(shí)時檢測其中的事實(shí)和模式箫措,好像從湖泊來...
    abel_cao閱讀 8,989評論 1 20
  • 小時候父母總以表姐作我的范本斤蔓。而表姐也確實(shí)是不容易植酥。 當(dāng)時舅舅因?yàn)樵诠さ厣洗蚬哪_手架摔下來,雖然保住了一條命弦牡,但...
    蒹葭蒼蒼123閱讀 380評論 2 1