背景
我們前面一直在寫處理程序须床、sql去處理數(shù)據(jù)铐料,大家都知道我們要處理和分析的數(shù)據(jù)是存在hdfs分布式文件存儲系統(tǒng)當(dāng)中的
但這些數(shù)據(jù)并不是一開始就存儲在hdfs當(dāng)中的,有些數(shù)據(jù)在業(yè)務(wù)系統(tǒng)的機(jī)器上,有些數(shù)據(jù)在日志系統(tǒng)的機(jī)器上
這就要求我們能夠?qū)?shù)據(jù)從業(yè)務(wù)系統(tǒng)的機(jī)器上給收集過來钠惩,而且后面我們實(shí)踐后續(xù)項(xiàng)目時(shí)也要求能夠?qū)?shù)據(jù)進(jìn)行采集(不然數(shù)據(jù)從哪來柒凉?)
比如說我們需要分析用戶的行為習(xí)慣,通過分析了解用戶的喜好等篓跛,從而為公司的運(yùn)營指導(dǎo)方向膝捞,對用戶進(jìn)行精準(zhǔn)投放廣告或者推薦,提高公司產(chǎn)品的轉(zhuǎn)化率
再或者通過用戶的行為 來優(yōu)化公司內(nèi)部產(chǎn)品的研發(fā)
那需要分析用戶的行為習(xí)慣愧沟,用戶產(chǎn)生的行為蔬咬,肯定是某個(gè)動作,某個(gè)行為沐寺,觸發(fā)林艘,比如用戶在提交訂單這個(gè)步驟上停留的時(shí)間非常長
這些用戶的行為,肯定來源于業(yè)務(wù)系統(tǒng)芽丹,那數(shù)據(jù)肯定存儲在業(yè)務(wù)系統(tǒng)的服務(wù)器上
我們不可能直接自己在服務(wù)器上寫個(gè)java程序北启,請求hdfs客戶端卜朗,采集日志數(shù)據(jù)完之后拔第,再上傳到hdfs上,一來我們不能保證程序的健壯性
比如程序崩了怎么辦场钉?數(shù)據(jù)傳輸安全嗎蚊俺?數(shù)據(jù)丟失了怎么辦?二來逛万,我們自己寫太麻煩了
這就用到了分布式數(shù)據(jù)采集工具----------flume
1)高可用(級聯(lián)模式下泳猬,一臺下游agent崩掉之后,可以立即切換另外一臺agent使用)
2)分布式(可以在多臺服務(wù)器上采集數(shù)據(jù))宇植,可以采集文件得封,socket數(shù)據(jù)包(網(wǎng)絡(luò)端口)、文件夾指郁、kafka忙上、mysql數(shù)據(jù)庫等各種形式源數(shù)據(jù)
3)可存儲、匯聚到大數(shù)據(jù)生態(tài)的各種存儲系統(tǒng)中(hdfs闲坎、hbase疫粥、hive、kafka)可以將采集到的數(shù)據(jù)(下沉sink)輸出到HDFS腰懂、hbase梗逮、hive、kafka等眾多外部存儲系統(tǒng)中
4)配置簡單绣溜,開箱即用慷彤!一般的采集、傳輸需求,通過對flume的簡單配置即可實(shí)現(xiàn)底哗;不用開發(fā)一行代碼贷屎!
5)良好的擴(kuò)展功能,F(xiàn)lume針對特殊場景也具備良好的自定義擴(kuò)展能力艘虎,因此唉侄,flume可以適用于大部分的日常數(shù)據(jù)采集場景
業(yè)務(wù)系統(tǒng)-日志服務(wù)器集群-日志采集-示意圖如下:
小知識點(diǎn):埋點(diǎn)
我們知道,用戶行為的記錄野建,肯定是用戶自己做了某種行為之后属划,才會記錄下來。那么怎么記錄候生,又記錄在哪呢同眯?
舉個(gè)例子,web項(xiàng)目唯鸭,用戶通過淘寶購物须蜗,點(diǎn)擊購物車,觸發(fā)了網(wǎng)頁html中js設(shè)置的程序代碼目溉,這個(gè)就叫埋點(diǎn)代碼
觸發(fā)之后明肮,代碼將向服務(wù)器端進(jìn)行請求,服務(wù)器端將用戶行為缭付,通過打日志或者其他方式記錄到本地磁盤中
1)agent
Flume中最核心的角色是agent柿估,flume采集系統(tǒng)就是由一個(gè)個(gè)agent連接起來所形成的一個(gè)或簡單或復(fù)雜的數(shù)據(jù)傳輸通道。
對于每一個(gè)Agent來說,它就是一個(gè)獨(dú)立的守護(hù)進(jìn)程(JVM),它負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù)陷猫,并發(fā)往下一個(gè)目的地秫舌,如下圖所示:
Agent的3個(gè)組件的設(shè)計(jì)思想,主要考慮的是:
source和sink之間解耦合绣檬,以及異步操作足陨;
每一個(gè)agent相當(dāng)于一個(gè)數(shù)據(jù)(被封裝成Event對象)傳遞員,內(nèi)部有3個(gè)核心組件:
Source:采集組件娇未,用于跟數(shù)據(jù)源對接墨缘,以獲取數(shù)據(jù);它有各種各樣的內(nèi)置實(shí)現(xiàn)忘蟹;
Sink:下沉組件飒房,用于往下一級agent傳遞數(shù)據(jù)或者向最終存儲系統(tǒng)傳遞數(shù)據(jù)
Channel:傳輸通道組件,用于從source將數(shù)據(jù)傳遞到sink
2)Event
數(shù)據(jù)在channel中的封裝形式媚值;
Source組件在獲取到原始數(shù)據(jù)后狠毯,需要封裝成Event放入channel;
Sink組件從channel中取出Event后褥芒,需要根據(jù)目標(biāo)存儲的需求嚼松,轉(zhuǎn)成其他形式的數(shù)據(jù)輸出
Event封裝對象主要有兩部分組成:?Headers和 ?Body
header是一個(gè)集合 ?Map[String,String]嫡良,用于攜帶一些KV形式的元數(shù)據(jù)(標(biāo)志、描述等)
body: 就是一個(gè)字節(jié)數(shù)組byte[]献酗;裝載具體的數(shù)據(jù)內(nèi)容
3)interceptor 攔截器
攔截器寝受,就是為用戶提供添加數(shù)據(jù)處理邏輯的可能性
攔截器工作在source組件之后,source產(chǎn)生的event會被傳入攔截器根據(jù)需要進(jìn)行攔截處理
而且罕偎,攔截器可以組成攔截器鏈很澄!
攔截器在flume中有一些內(nèi)置的功能比較常用的攔截器
用戶也可以根據(jù)自己的數(shù)據(jù)處理需求,自己開發(fā)自定義攔截器颜及!
這也是flume的一個(gè)可以用來自定義擴(kuò)展的接口甩苛!
4)級聯(lián)串聯(lián)(一般下游agent會使用高可用模式,有一個(gè)處于待機(jī)或者未工作狀態(tài))
4)事務(wù)機(jī)制
數(shù)據(jù)傳輸?shù)娜齻€(gè)語義:
At least once? 至少傳輸數(shù)據(jù)完整一次(不會丟失數(shù)據(jù)俏站,但可能產(chǎn)生重復(fù)傳輸)
At most once? 至多傳輸數(shù)據(jù)完整一次(可能一次都不會成功讯蒲,可能會丟失數(shù)據(jù))
Exactly once? ?數(shù)據(jù)不丟失且不重復(fù) 實(shí)現(xiàn)完美傳輸
Flume并沒有實(shí)現(xiàn)Exactly once!但可以實(shí)現(xiàn)at least once!?因?yàn)镋xactly once確實(shí)比較難實(shí)現(xiàn)肄扎!
Flume使用兩個(gè)獨(dú)立的事務(wù):
put操作:source讀取數(shù)據(jù)源并寫入event到channel
take操作:sink從channel中獲取event并寫出到目標(biāo)存儲
事務(wù)實(shí)現(xiàn)的核心點(diǎn)是:記錄狀態(tài)墨林!
比如source,會記錄自己完成拉取成功數(shù)據(jù)的偏移量
另外還有些其他的概念犯祠,以后碰到了再說旭等,這里不再贅述。
flume的安裝賊簡單雷则,只需要導(dǎo)個(gè)包辆雾,配置些文件即可!
1)解壓
將flume的安裝包上傳到linux01上之后月劈,解壓到/opt/apps下面
2)寫配置文件
接下來我們通過案例,一邊配置一邊實(shí)踐
需求:
現(xiàn)在需要對服務(wù)器磁盤某個(gè)文件下的數(shù)據(jù)進(jìn)行采集藤乙,數(shù)據(jù)是用戶的行為數(shù)據(jù)猜揪,數(shù)據(jù)中含有時(shí)間,采集完成之后坛梁,我需要按照時(shí)間而姐,具體到天為單位文件夾存放在hdfs中
思考:
1)因?yàn)閒lume的agent從用戶觸發(fā)時(shí)間開始到收集----存入channel-----由sink讀出來下沉到hdfs中,這個(gè)過程肯定有時(shí)間延遲划咐,假如某個(gè)用戶某次行為觸發(fā)事件的時(shí)間為2021-1-8? 23:59:59? 拴念,而存入hdfs端的時(shí)間也是用的本機(jī)服務(wù)器的時(shí)間的話,很明顯褐缠,按照天單位文件夾存政鼠,它就存到1月9號去了,是不可行的队魏,所以我們得記錄用戶觸發(fā)事件的時(shí)間公般,最好是連帶著用戶的行為中,一起被收集到source中
2)既然時(shí)間是在用戶行為數(shù)據(jù)當(dāng)中,那么我們可以設(shè)置攔截器官帘,對數(shù)據(jù)進(jìn)行提取瞬雹,分析,再存入到event的header中(不能存入body中刽虹,因?yàn)槟繕?biāo)數(shù)據(jù)具體是什么格式的酗捌,我們并不知道,但是header是一個(gè)hashmap)涌哲,因?yàn)閒lume自己提供的攔截器 并沒有能完成我們這種需求的意敛,所以需要自定義攔截器
先來寫攔截器:
新建maven代碼,導(dǎo)入flume依賴包膛虫,自定義攔截器草姻,繼承flume的interceptor
代碼如下:
package cn.study.demo01;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
/**
* @author:tom
* @Date:Created in 16:15 2021/1/8
*/
public class EventStampInterceptor implements Interceptor {
? ? String split_by;
? ? Integer ts_index;
? ? public EventStampInterceptor(String split_by, Integer ts_index) {
? ? ? ? this.split_by = split_by;
? ? ? ? this.ts_index = ts_index;
? ? }
? ? /**
? ? * 初始化方法,在正式調(diào)用攔截邏輯之前稍刀,會先調(diào)用一次
? ? */
? ? public void initialize() {
? ? }
? ? /**
? ? * 攔截的處理邏輯所在方法
? ? * 假設(shè)撩独,我們要采集的數(shù)據(jù),格式如下:
? ? * id,name,timestamp,devicetype,event
? ? */
? ? public Event intercept(Event event) {
? ? ? ? byte[] body = event.getBody();
? ? ? ? String line = new String(body);
? ? ? ? String timeStamp = line.split(split_by)[ts_index];
? ? ? ? event.getHeaders().put("timestamp", timeStamp);
? ? ? ? return event;
? ? }
? ? public List<Event> intercept(List<Event> list) {
? ? ? ? for (Event event : list) {
? ? ? ? ? ? intercept(event);
? ? ? ? }
? ? ? ? return list;
? ? }
? ? /**
? ? * 關(guān)閉清理方法账月,在銷毀該攔截器實(shí)例之前综膀,會調(diào)用一次
? ? */
? ? public void close() {
? ? }
? ? //builder? 構(gòu)建自定義攔截器對象的
? ? public static class EventStampInterceptorBuilder implements Interceptor.Builder {
? ? ? ? String split_by;
? ? ? ? Integer ts_index;
? ? ? ? public Interceptor build() {
? ? ? ? ? ? return new EventStampInterceptor(split_by, ts_index);
? ? ? ? }
? ? ? ? //可以獲取到配置文件中的對象
? ? ? ? public void configure(Context context) {
? ? ? ? ? ? split_by = context.getString("split_by");
? ? ? ? ? ? ts_index = context.getInteger("ts_index", 2);
? ? ? ? }
? ? }
}
采用級聯(lián)模式,使用三臺linux作為agent局齿,其中l(wèi)inux03作為下游agent剧劝,linux01和linux02作為上游agent
下游linux03的配置文件:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/doit19_0108/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = DoitEduData
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = false
上游linux01的配置文件:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.batchSize = 100
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /home/a.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.study.demo01.EventStampInterceptor$EventStampInterceptorBuilder
a1.sources.r1.interceptors.i1.split_by = ,
a1.sources.r1.interceptors.i1.ts_index = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux03
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100
上游linux02的配置文件:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type =TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /home/a.log
a1.sources.r1.batchSize = 100
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.study.demo01.EventStampInterceptor$EventStampInterceptorBuilder
a1.sources.r1.interceptors.i1.split_by = ,
a1.sources.r1.interceptors.i1.ts_index = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux03
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100
為了制造新產(chǎn)生的數(shù)據(jù),我寫了個(gè)腳本抓歼,并輸出到/home/a.log 中
先執(zhí)行腳本讥此,使其不斷產(chǎn)生數(shù)據(jù)
啟動下游agent
linux03:bin/flume-ng agent -c conf/ -f myagentconf/exec-m-hdfs-xiayou.conf ?-n a1 -Dflume.root.logger=INFO,console
啟動上游兩個(gè)agent
linux02:
bin/flume-ng agent -c conf/ -f myagentconf/exec-m-shangyou.conf -n a1 -Dflume.root.logger=INFO,console
linux01:
?bin/flume-ng agent -c conf/ -f agent.conf/exec-m-shangyou.conf -n a1 -Dflume.root.logger=INFO,console
可以看到,成功采集并才hdfs上分好了文件夾谣妻,和存入了相應(yīng)的數(shù)據(jù)
注意點(diǎn):
1)上游兩個(gè)采集數(shù)據(jù)的agent的type使用的是TAILDIR? ? 而不是exec 萄喳,因?yàn)門AILDIR? ?會記錄已經(jīng)采集數(shù)據(jù)的偏移量,能保證數(shù)據(jù)不丟失蹋半,一般我們使用的也是這個(gè)TAILDIR? 他巨,但是記得要指定filegroups
2)?必須要先啟動下游的linux03,因?yàn)閘inux03作為下游服務(wù)器端agent减江,是接收上游客戶端nagent的數(shù)據(jù)請求染突,通過網(wǎng)絡(luò)將上游的數(shù)據(jù)拉取過來,必須先啟動下游辈灼,才能啟動上游的agent份企,下游的agent通過監(jiān)聽啟動服務(wù)的端口號,看是否有數(shù)據(jù)sink(上游agent的sink)傳輸過來茵休,進(jìn)而進(jìn)行工作薪棒,上游agent的sink需要指定端口號手蝎,即下游agent 啟動之后的resoure的端口號
3)上游的agent的sink端口號為下游agent的source的端口號;上游agent的sink的type和下游的source的type都為avro俐芯,這是一種跨平臺棵介、跨語言的序列化方式
4)上游agent的sink 主機(jī)名記得寫linux03,寫錯(cuò)了幾次
5)linux02和linux03要配置hadoop的環(huán)境變量
6)配置文件吧史,粘貼的時(shí)候邮辽,要提前按i或者o進(jìn)入插入模式,否則將消耗粘貼內(nèi)容中的關(guān)鍵字