蜻蜓點(diǎn)水
Flume——數(shù)據(jù)采集
如果說监署,爬蟲是采集外部數(shù)據(jù)的常用手段的話藏研,那么孽锥,F(xiàn)lume就是采集內(nèi)部數(shù)據(jù)的常用手段之一(logstash也是這方面的佼佼者)哼拔。
下面介紹一下Flume的基本構(gòu)造引有。
- Agent:包含Source、Channel和Sink的主體倦逐,它是這3個組件的載體譬正,是組成Flume的數(shù)據(jù)節(jié)點(diǎn)。
- Event:Flume 數(shù)據(jù)傳輸?shù)幕締卧?/li>
- Source: 用來接收Event檬姥,并將Event批量傳給Channel曾我。
- Channel:Source和Sink之間的Event緩沖通道,它有個type屬性健民,一般為memory抒巢,可以提高傳輸速度。
- Sink:負(fù)責(zé)將數(shù)據(jù)沉淀到最終存儲區(qū)秉犹,或沉淀給下一個source虐秦,形成數(shù)據(jù)流平酿。
在大致了解了以上要素之后,通過上圖悦陋,我們就可以有一個大概的認(rèn)識蜈彼。一句話講,Source接收數(shù)據(jù)俺驶,并轉(zhuǎn)成Event單元幸逆,然后導(dǎo)入Channel緩沖通道,最后暮现,經(jīng)由Sink進(jìn)行數(shù)據(jù)沉淀还绘。當(dāng)然這里的沉淀,有多種選擇栖袋,除了上圖中的HDFS外拍顷,還包括HBase、File塘幅,或者作為另一個Source的源昔案。在一系列過程,一條有序的數(shù)據(jù)流就誕生了电媳。
Kafka——數(shù)據(jù)的發(fā)布/訂閱
Kafka踏揣,作為基于發(fā)布/訂閱的消息系統(tǒng),以其分布式性而受到大家的喜愛匾乓。
下面介紹一下Kafka的基本構(gòu)造捞稿。
- Broker(代理): Kafka集群可由一個或多個服務(wù)器組成,其中的每個服務(wù)節(jié)點(diǎn)稱作這個集群的一個Broker拼缝。
- Topic(主題): 一個Topic對應(yīng)一類消息娱局,Topic用作為消息劃分類別。
- Partition(分區(qū)): 一個Topic一般含有多個分區(qū)咧七。
- Producer(生產(chǎn)者):消息生產(chǎn)者衰齐,負(fù)責(zé)生產(chǎn)Topic消息。
-
Consumer(消費(fèi)者): 消息消費(fèi)者猪叙,負(fù)責(zé)消費(fèi)Topic消息娇斩。
Zookeeper——服務(wù)器間協(xié)調(diào)
這里需要提一下Zookeeper仁卷,對于Kafka這樣的分布式服務(wù)穴翩,大多需要多臺服務(wù)器相互協(xié)調(diào)工作,且保持一致性锦积。任意一臺服務(wù)器出現(xiàn)問題芒帕,如果不及時處理,都有可能導(dǎo)致整個服務(wù)的崩潰丰介,其后果是不堪設(shè)想的背蟆。ZooKeeper的分布式設(shè)計(jì)鉴分,可用于領(lǐng)導(dǎo)人選舉、群組協(xié)同工作和配置服務(wù)等带膀,保證了服務(wù)的一致性和可用性志珍。
Spark Streaming——Spark核心API
Spark Streaming屬于Spark的核心api,它支持高吞吐量垛叨、支持容錯的實(shí)時流數(shù)據(jù)處理伦糯。它可以通過Kafka、HDFS嗽元、Flume等多種渠道獲取數(shù)據(jù)敛纲,轉(zhuǎn)換數(shù)據(jù)后利用Spark Engine進(jìn)行數(shù)據(jù)處理。現(xiàn)在剂癌,包括Python淤翔、Java等多種高級語言都對Spark進(jìn)行支持。本文使用pyspark進(jìn)行編程佩谷。
實(shí)踐出真知
要做什么
nginx日志分析旁壮,簡單統(tǒng)計(jì)了下PV和UV,并做了H5圖表實(shí)時展示琳要。使用的是我開發(fā)的基于ace-admin和react的管理端LogAdmin對數(shù)據(jù)進(jìn)行展示寡具。這里提供github,感興趣的朋友可以看一下稚补。
下面是我的主要步驟童叠。
①Flume實(shí)時讀入nginx日志,并將數(shù)據(jù)導(dǎo)入Kafka中课幕。
②pyspark從Kafka讀入數(shù)據(jù)厦坛,做實(shí)時處理,并將處理后的數(shù)據(jù)導(dǎo)出到redis隊(duì)列中乍惊。
③編寫腳本從redis中取出數(shù)據(jù)杜秸,存入mysql。
④H5展示润绎。
【版本:Logstash1.7.0撬碟,Kafka 2.11(該版本中已集成了Zookeeper),Spark(2.0.2)】
①Flume實(shí)時讀入nginx日志莉撇,并將數(shù)據(jù)導(dǎo)入Kafka中呢蛤。
這一步中,只需配置flume.conf棍郎,并依次啟動flume其障、zookeeper、kafka即可
flume.conf(配置中命名已較為明確涂佃,hdfs部分被注釋了)
# agent-my80
# Finally, now that we've defined all of our components, tell
# agent-my80 which ones we want to activate.
#agent-my80.channels = ch1
#agent-my80.sources = avro-source1
#agent-my80.sinks = hdfs-sink1
agent-my80.channels = ch2
agent-my80.sources = exec-source1
agent-my80.sinks = kafka-sink1
# Define a memory channel called ch1 on agent-my80
#agent-my80.channels.ch1.type = memory
agent-my80.channels.ch2.type = memory
# Define an Avro source called avro-source1 on agent-my80 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
#agent-my80.sources.avro-source1.channels = ch1
#agent-my80.sources.avro-source1.type = avro
#agent-my80.sources.avro-source1.bind = 0.0.0.0
#agent-my80.sources.avro-source1.port = 44444
#agent-my80.sources.avro-source1.basenameHeader = true
agent-my80.sources.exec-source1.channels = ch2
agent-my80.sources.exec-source1.type = exec
agent-my80.sources.exec-source1.command = tail -f /home/www/logs/access.log
# # Define a logger sink that simply logs all events it receives
# # and connect it to the other end of the same channel.
#agent-my80.sinks.hdfs-sink1.channel = ch1
#agent-my80.sinks.hdfs-sink1.type = hdfs
#agent-my80.sinks.hdfs-sink1.hdfs.path = hdfs://my80:9000/flume-test
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = event-
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
#agent-my80.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
#agent-my80.sinks.hdfs-sink1.hdfs.round = true
#agent-my80.sinks.hdfs-sink1.hdfs.roundValue = 10
#agent-my80.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent-my80.sinks.kafka-sink1.channel = ch2
agent-my80.sinks.kafka-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent-my80.sinks.kafka-sink1.topic = my80-log
agent-my80.sinks.kafka-sink1.brokerList = localhost:9092
agent-my80.sinks.kafka-sink1.batchSize = 20
flume啟動命令
flume-ng agent --conf /usr/local/apache-flume-1.7.0-bin/conf --conf-file /usr/local/apache-flume-1.7.0-bin/conf/flume.conf --name agent-my80 -Dflume.root.logger=INFO,console
zookeeper啟動命令
/usr/local/kafka_2.11-0.10.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/zookeeper.properties
kafka啟動命令
/usr/local/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/server.properties
注意:有些朋友励翼,是用自己的個人服務(wù)器做相關(guān)實(shí)踐蜈敢,那么會遇到內(nèi)存不足的問題,這時候一般通過汽抚,修改Java堆大小來解決抓狭。比如我是修改的kafka的kafka-server-start.sh和zookeeper-server-start.sh來解決這個問題。
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
②pyspark從Kafka讀入數(shù)據(jù)造烁,做實(shí)時處理辐宾,并將處理后的數(shù)據(jù)導(dǎo)出到redis隊(duì)列中。
這部分為方便站點(diǎn)解析膨蛮,我對nginx日志格式做了修改叠纹。
該步驟主要是做正則解析+MapReduce+數(shù)據(jù)導(dǎo)入redis,并分別將請求內(nèi)容和請求ip放入redis的list和set敞葛,這樣主要是方便我統(tǒng)計(jì)每天的PV和UV誉察。
還要注意一點(diǎn),nginx日志中包括靜態(tài)文件惹谐,顯然這個不能算UV和PV持偏,所以要過濾。
calculate.py
#coding=utf8
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re
import redis
import datetime
# 解析日志
def parse(logstring):
#使用正則解析日志
# regex = '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*ip=\/(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*tbl=([a-zA-Z0-9_]+)'
regex = 'ip:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*?time:\[(.*?)\].*?request:\"(.*?)\".*?status_code:(\d{1,3}).*?agent:\"(.*?)\"'
pattern = re.compile(regex)
m1 = pattern.search(str(logstring))
if m1 is not None:
m = m1.groups()
# print(m
if len(m)!=5 or not m[2]:
m= None
else:
hd_list=[u".js",u".css",u".jpg",u".png",u".jpeg",u".gif",u".bmp",u".woff"];
if doStrContainAnyWords(m[2],hd_list):
m= None
else:
m = None
return m
def doStrContainAnyWords(str,words=[]):
for word in words:
if word in str:
return True;
return False;
class RedisClient:
pool = None
def __init__(self):
self.getRedisPool()
def getRedisPool(self):
redisIp='localhost'
redisPort=6379
redisDB=0
self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
return self.pool
def addToHashSet(self, key, value):
if self.pool is None:
self.pool = self.getRedisPool()
r = redis.Redis(connection_pool=self.pool)
hashSetName="my80-log-iphash-"+datetime.datetime.now().strftime("%Y-%m-%d");
flag=False;
if r.exists(hashSetName) is False:
flag=True
if r.hexists(hashSetName,str(key)):
r.hincrby(hashSetName, str(key), value)
else:
r.hset(hashSetName, str(key), value)
if flag is True:
r.expire(hashSetName,3600*24+300);
def addToList(self,value):
if self.pool is None:
self.pool = self.getRedisPool()
r = redis.Redis(connection_pool=self.pool)
r.lpush('my80-log-list', value)
if __name__ == '__main__':
zkQuorum = 'localhost:2181'
topic = 'my80-log'
sc = SparkContext("local[2]", appName="kafka_pyspark_redis")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createStream(ssc, zkQuorum, "kafka-streaming-redis", {topic: 1})
#kafka讀取返回的數(shù)據(jù)為tuple,長度為2氨肌,tuple[1]為實(shí)際的數(shù)據(jù)鸿秆,tuple[0]的編碼為Unicode
res = kvs.map(lambda x: x[1]).map(lambda x:parse(x)).filter(lambda x:True if x is not None else False)
items = res.map(lambda item:{"ip":item[0],"time":item[1],"request":item[2],"status_code":item[3],"agent":item[4] } )
# items = res.map(lambda item:{"ip":item[0],"time":item[1] } )
# ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b).map(lambda x:{ x[0]:str(x[1]) } )
ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b)
r = RedisClient()
def handleItem(time,rdd):
if rdd.isEmpty() is False:
for element in rdd.collect():
r.addToList(element)
items.foreachRDD(handleItem)
def ipHandle(time,rdd):
if rdd.isEmpty() is False:
# rddstr = "{"+','.join(rdd.collect())+"}"
for element in rdd.collect():
r.addToHashSet(element[0], element[1] )
ipcount.foreachRDD(ipHandle)
ssc.start()
ssc.awaitTermination()
安裝好spark-2.0.2-bin-hadoop2.7,腳本測試ok怎囚,最后就需要通過spark streaming提交任務(wù)(即提交calculate.py)卿叽。任務(wù)正常執(zhí)行的話,數(shù)據(jù)就會從Kafka導(dǎo)出恳守,經(jīng)處理后考婴,導(dǎo)入redis。
/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --jars /usr/local/spark-2.0.2-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /home/dcb/python/pyspark/calculate.py
③編寫腳本從redis中取出數(shù)據(jù)催烘,存入mysql沥阱。
這一步相信大家沒問題。
④H5圖表展示
H5圖表實(shí)時展示+github伊群,感興趣的朋友可以看一下考杉。
小結(jié)
Flume+Kafka+Spark,是一個相對比較流行且可行的實(shí)時計(jì)算組合舰始,可定制性較高崇棠,如果項(xiàng)目需求比較復(fù)雜,建議深入了解后進(jìn)行定制開發(fā)蔽午。