離線數(shù)據(jù)分析平臺實戰(zhàn)——150Flume介紹
Nginx介紹
Nginx是一款輕量級的Web 服務(wù)器/反向代理服務(wù)器及電子郵件(IMAP/POP3)代理服務(wù)器嘿歌。
其特點是占有內(nèi)存少,并發(fā)能力強诡壁,事實上nginx的并發(fā)能力確實在同類型的網(wǎng)頁服務(wù)器中表現(xiàn)較好债沮。
一般情況下,我們會將nginx服務(wù)器作為一個靜態(tài)資源的訪問容器峻仇。
Nginx安裝步驟
Nginx安裝步驟如下:(使用yum命令安裝)
- 使用root用戶登錄俄讹。
- 查看nginx信息哆致,命令:yum info nginx.
- 如果查看nginx信息提示nginx找不到,那么可以通過修改rpm源來進(jìn)行后續(xù)步驟颅悉,執(zhí)行命令:rpm -ivh http://nginx.org/packages/centos/6/noarch/RPMS/nginx-release-centos-6-0.el6.ngx.noarch.rpm
- 在查看nginx信息沽瞭。
- 安裝,命令: yum install nginx剩瓶。在安裝過程中直接輸入y驹溃。
- 啟動nginx,命令:service nginx start
- 訪問http://192.168.0.120查看nginx的web頁面延曙。
Flume介紹
Flume是Apache基金會組織的一個提供的高可用的豌鹤,高可靠的,分布式的海量日志采集枝缔、聚合和傳輸?shù)南到y(tǒng)布疙,
Flume支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方蚊惯,用于收集數(shù)據(jù);
同時灵临,F(xiàn)lume提供對數(shù)據(jù)進(jìn)行簡單處理截型,并寫到各種數(shù)據(jù)接受方(可定制)的能力。
當(dāng)前Flume有兩個版本儒溉,
Flume0.9x版本之前的統(tǒng)稱為Flume-og宦焦,
Flume1.X版本被統(tǒng)稱為Flume-ng。
參考文檔:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html
Flume-og和Flume-ng的區(qū)別
主要區(qū)別如下:
- Flume-og中采用master結(jié)構(gòu)顿涣,為了保證數(shù)據(jù)的一致性波闹,引入zookeeper進(jìn)行管理。Flume-ng中取消了集中master機(jī)制和zookeeper管理機(jī)制涛碑,變成了一個純粹的傳輸工具精堕。
- Flume-ng中采用不同的線程進(jìn)行數(shù)據(jù)的讀寫操作;在Flume-og中蒲障,讀數(shù)據(jù)和寫數(shù)據(jù)是由同一個線程操作的歹篓,如果寫出比較慢的話,可能會阻塞flume的接收數(shù)據(jù)的能力揉阎。
Flume結(jié)構(gòu)
Flume中以Agent為基本單位滋捶,一個agent可以包括source、channel余黎、sink,三種組件都可以有多個载萌。
其中source組件主要功能是接收外部數(shù)據(jù)惧财,并將數(shù)據(jù)傳遞到channel中;
sink組件主要功能是發(fā)送flume接收到的數(shù)據(jù)到目的地扭仁;
channel的主要作用就是數(shù)據(jù)傳輸和保存的一個作用垮衷。
Flume主要分為三類結(jié)構(gòu):單agent結(jié)構(gòu)、多agent鏈?zhǔn)浇Y(jié)構(gòu)和多路復(fù)用agent結(jié)構(gòu)乖坠。
單agent結(jié)構(gòu)
多agent鏈?zhǔn)浇Y(jié)構(gòu)
多路復(fù)用agent結(jié)構(gòu)
Source介紹
Source的主要作用是接收客戶端發(fā)送的數(shù)據(jù)搀突,并將數(shù)據(jù)發(fā)送到channel中,source和channel之間的關(guān)系是多對多關(guān)系熊泵,不過一般情況下使用一個source對應(yīng)多個channel仰迁。
通過名稱區(qū)分不同的source。
Flume常用source有:Avro Source顽分、Thrift Source徐许、Exec Source、Kafka Source卒蘸、Netcat Source等雌隅。
設(shè)置格式如下:
<agent-name>.sources.<source_name>.type=指定類型
<agent-name>.sources.<source_name>.channels=channels
.... 其他對應(yīng)source類型需要的參數(shù)
Channel介紹
Channel的主要作用是提供一個數(shù)據(jù)傳輸通道,提供數(shù)據(jù)傳輸和數(shù)據(jù)存儲(可選)等功能。
source將數(shù)據(jù)放到channel中恰起,sink從channel中拿數(shù)據(jù)修械。
通過不同的名稱來區(qū)分channel。
Flume常用channel有:Memory Channel检盼、JDBC Channel肯污、Kafka Channel、File Channel等梯皿。設(shè)置格式如下:
<agent-name>.channels.<channel_name>.type=指定類型
.... 其他對應(yīng)channel類型需要的參數(shù)
Sink介紹
Sink的主要作用是定義數(shù)據(jù)寫出方式仇箱,一般情況下sink從channel中獲取數(shù)據(jù),然后將數(shù)據(jù)寫出到file东羹、hdfs或者網(wǎng)絡(luò)上剂桥。
channel和sink之間的關(guān)系是一對多的關(guān)系。
通過不同的名稱來區(qū)分sink属提。
Flume常用sink有:Hdfs Sink权逗、Hive Sink、File Sink冤议、HBase Sink斟薇、Avro Sink、Thrift Sink恕酸、Logger Sink等堪滨。
設(shè)置格式如下:
<agent-name>.sinks.<sink_name1>.type=指定類型
<agent-name>.sinks.<sink_name1>.channel=<channe_name>
.... 其他對應(yīng)sink類型需要的參數(shù)
Flume安裝
安裝步驟如下:
- 下載flume:wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
- 解壓flume。
- 修改conf/flume-env.sh文件蕊温,如果沒有就新建一個袱箱。
- 添加flume的bin目錄到環(huán)境變量中去。
- 驗證是否安裝成功, flume-ng version
Flume案例1
使用netcat source監(jiān)聽客戶端的請求义矛,使用memory channel作為數(shù)據(jù)的傳輸通道发笔,使用logger sink打印監(jiān)聽到的信息。
步驟:
- 在conf文件夾中建立test1.conf,里面是agent的配置凉翻。
- 啟動flume-ng agent --conf ./conf/ --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console了讨。
- 使用telenet命令連接機(jī)器,命令:telnet 192.168.100.120 4444
- 輸入信息查看是否成功制轰。
test1.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4444
a1.sources.r1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
Flume案例2
Nginx作為日志服務(wù)器前计,通過exec source監(jiān)聽nginx的日志文件,使用memory channel作為數(shù)據(jù)傳輸通道艇挨,使用hdfs sink將數(shù)據(jù)存儲到hdfs上残炮。
項目用到的配置文件
nginx.conf
user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
log_format log_format '$remote_addr^A$msec^A$http_host^A$request_uri';
sendfile on;
keepalive_timeout 65;
#include /etc/nginx/conf.d/*.conf;
server {
listen 80;
server_name hh 0.0.0.0;
location ~ .*(BfImg)\.(gif)$ {
default_type image/gif;
access_log /home/hadoop/access.log log_format;
root /etc/nginx/www/source;
}
}
}
test2.conf
agent.sources = r1
agent.sinks = k1
agent.channels = c1
## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
## sources config
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 1000000
agent.channels.c1.keep-alive = 60
#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://hh:8020/logs/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=10
agent.sinks.k1.hdfs.batchSize = 1
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true
復(fù)雜的配置方法
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1
## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
agent.sinks.k2.channel = c1
## sources config
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
agent.sources.r1.selector.type= replicating
## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000000
agent.channels.c1.transactionCapacity = 10000000
agent.channels.c1.byteCapacityBufferPercentage = 60
agent.channels.c1.byteCapacity = 12800000000
agent.channels.c1.keep-alive = 60
#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://server30-244:8020/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=600
agent.sinks.k1.hdfs.batchSize = 100
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c1
agent.sinks.k2.hdfs.path = hdfs://server30-99:8020/agentLog/%Y/%m/%d
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = ub-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k2.hdfs.minBlockReplicas=1
agent.sinks.k2.hdfs.rollInterval=3600
agent.sinks.k2.hdfs.rollSize=132692539
agent.sinks.k2.hdfs.idleTimeout=600
agent.sinks.k2.hdfs.batchSize = 100
agent.sinks.k2.hdfs.rollCount=0
agent.sinks.k2.hdfs.round = true
agent.sinks.k2.hdfs.roundValue = 2
agent.sinks.k2.hdfs.roundUnit = minute
agent.sinks.k2.hdfs.useLocalTimeStamp = true
#####sink groups
agent.sinkgroups=g1
agent.sinkgroups.g1.sinks=k1 k2
agent.sinkgroups.g1.processor.type= failover
agent.sinkgroups.g1.processor.priority.k1=10
agent.sinkgroups.g1.processor.priority.k2=5
agent.sinkgroups.g1.processor.maxpenalty=10000