flume分布式數(shù)據(jù)采集工具概述及安裝配置、使用

背景

我們前面一直在寫處理程序须床、sql去處理數(shù)據(jù)铐料,大家都知道我們要處理和分析的數(shù)據(jù)是存在hdfs分布式文件存儲系統(tǒng)當(dāng)中的

但這些數(shù)據(jù)并不是一開始就存儲在hdfs當(dāng)中的,有些數(shù)據(jù)在業(yè)務(wù)系統(tǒng)的機(jī)器上,有些數(shù)據(jù)在日志系統(tǒng)的機(jī)器上

這就要求我們能夠?qū)?shù)據(jù)從業(yè)務(wù)系統(tǒng)的機(jī)器上給收集過來钠惩,而且后面我們實(shí)踐后續(xù)項(xiàng)目時(shí)也要求能夠?qū)?shù)據(jù)進(jìn)行采集(不然數(shù)據(jù)從哪來柒凉?)

比如說我們需要分析用戶的行為習(xí)慣,通過分析了解用戶的喜好等篓跛,從而為公司的運(yùn)營指導(dǎo)方向膝捞,對用戶進(jìn)行精準(zhǔn)投放廣告或者推薦,提高公司產(chǎn)品的轉(zhuǎn)化率

再或者通過用戶的行為 來優(yōu)化公司內(nèi)部產(chǎn)品的研發(fā)

那需要分析用戶的行為習(xí)慣愧沟,用戶產(chǎn)生的行為蔬咬,肯定是某個(gè)動作,某個(gè)行為沐寺,觸發(fā)林艘,比如用戶在提交訂單這個(gè)步驟上停留的時(shí)間非常長

這些用戶的行為,肯定來源于業(yè)務(wù)系統(tǒng)芽丹,那數(shù)據(jù)肯定存儲在業(yè)務(wù)系統(tǒng)的服務(wù)器上

我們不可能直接自己在服務(wù)器上寫個(gè)java程序北启,請求hdfs客戶端卜朗,采集日志數(shù)據(jù)完之后拔第,再上傳到hdfs上,一來我們不能保證程序的健壯性

比如程序崩了怎么辦场钉?數(shù)據(jù)傳輸安全嗎蚊俺?數(shù)據(jù)丟失了怎么辦?二來逛万,我們自己寫太麻煩了

這就用到了分布式數(shù)據(jù)采集工具----------flume

特點(diǎn)

1)高可用(級聯(lián)模式下泳猬,一臺下游agent崩掉之后,可以立即切換另外一臺agent使用)

2)分布式(可以在多臺服務(wù)器上采集數(shù)據(jù))宇植,可以采集文件得封,socket數(shù)據(jù)包(網(wǎng)絡(luò)端口)、文件夾指郁、kafka忙上、mysql數(shù)據(jù)庫等各種形式源數(shù)據(jù)

3)可存儲、匯聚到大數(shù)據(jù)生態(tài)的各種存儲系統(tǒng)中(hdfs闲坎、hbase疫粥、hive、kafka)可以將采集到的數(shù)據(jù)(下沉sink)輸出到HDFS腰懂、hbase梗逮、hive、kafka等眾多外部存儲系統(tǒng)中

4)配置簡單绣溜,開箱即用慷彤!一般的采集、傳輸需求,通過對flume的簡單配置即可實(shí)現(xiàn)底哗;不用開發(fā)一行代碼贷屎!

5)良好的擴(kuò)展功能,F(xiàn)lume針對特殊場景也具備良好的自定義擴(kuò)展能力艘虎,因此唉侄,flume可以適用于大部分的日常數(shù)據(jù)采集場景

業(yè)務(wù)系統(tǒng)-日志服務(wù)器集群-日志采集-示意圖如下

小知識點(diǎn):埋點(diǎn)

我們知道,用戶行為的記錄野建,肯定是用戶自己做了某種行為之后属划,才會記錄下來。那么怎么記錄候生,又記錄在哪呢同眯?

舉個(gè)例子,web項(xiàng)目唯鸭,用戶通過淘寶購物须蜗,點(diǎn)擊購物車,觸發(fā)了網(wǎng)頁html中js設(shè)置的程序代碼目溉,這個(gè)就叫埋點(diǎn)代碼

觸發(fā)之后明肮,代碼將向服務(wù)器端進(jìn)行請求,服務(wù)器端將用戶行為缭付,通過打日志或者其他方式記錄到本地磁盤中

核心概念

1)agent

Flume中最核心的角色是agent柿估,flume采集系統(tǒng)就是由一個(gè)個(gè)agent連接起來所形成的一個(gè)或簡單或復(fù)雜的數(shù)據(jù)傳輸通道。

對于每一個(gè)Agent來說,它就是一個(gè)獨(dú)立的守護(hù)進(jìn)程(JVM),它負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù)陷猫,并發(fā)往下一個(gè)目的地秫舌,如下圖所示:

Agent的3個(gè)組件的設(shè)計(jì)思想,主要考慮的是:

source和sink之間解耦合绣檬,以及異步操作足陨;

每一個(gè)agent相當(dāng)于一個(gè)數(shù)據(jù)(被封裝成Event對象)傳遞員,內(nèi)部有3個(gè)核心組件:

Source:采集組件娇未,用于跟數(shù)據(jù)源對接墨缘,以獲取數(shù)據(jù);它有各種各樣的內(nèi)置實(shí)現(xiàn)忘蟹;

Sink:下沉組件飒房,用于往下一級agent傳遞數(shù)據(jù)或者向最終存儲系統(tǒng)傳遞數(shù)據(jù)

Channel:傳輸通道組件,用于從source將數(shù)據(jù)傳遞到sink

2)Event

數(shù)據(jù)在channel中的封裝形式媚值;

Source組件在獲取到原始數(shù)據(jù)后狠毯,需要封裝成Event放入channel;

Sink組件從channel中取出Event后褥芒,需要根據(jù)目標(biāo)存儲的需求嚼松,轉(zhuǎn)成其他形式的數(shù)據(jù)輸出

Event封裝對象主要有兩部分組成:?Headers和 ?Body

header是一個(gè)集合 ?Map[String,String]嫡良,用于攜帶一些KV形式的元數(shù)據(jù)(標(biāo)志、描述等)

body: 就是一個(gè)字節(jié)數(shù)組byte[]献酗;裝載具體的數(shù)據(jù)內(nèi)容

3)interceptor 攔截器

攔截器寝受,就是為用戶提供添加數(shù)據(jù)處理邏輯的可能性

攔截器工作在source組件之后,source產(chǎn)生的event會被傳入攔截器根據(jù)需要進(jìn)行攔截處理

而且罕偎,攔截器可以組成攔截器鏈很澄!

攔截器在flume中有一些內(nèi)置的功能比較常用的攔截器

用戶也可以根據(jù)自己的數(shù)據(jù)處理需求,自己開發(fā)自定義攔截器颜及!

這也是flume的一個(gè)可以用來自定義擴(kuò)展的接口甩苛!

4)級聯(lián)串聯(lián)(一般下游agent會使用高可用模式,有一個(gè)處于待機(jī)或者未工作狀態(tài))

4)事務(wù)機(jī)制

數(shù)據(jù)傳輸?shù)娜齻€(gè)語義:

At least once? 至少傳輸數(shù)據(jù)完整一次(不會丟失數(shù)據(jù)俏站,但可能產(chǎn)生重復(fù)傳輸)

At most once? 至多傳輸數(shù)據(jù)完整一次(可能一次都不會成功讯蒲,可能會丟失數(shù)據(jù))

Exactly once? ?數(shù)據(jù)不丟失且不重復(fù) 實(shí)現(xiàn)完美傳輸

Flume并沒有實(shí)現(xiàn)Exactly once!但可以實(shí)現(xiàn)at least once!?因?yàn)镋xactly once確實(shí)比較難實(shí)現(xiàn)肄扎!

Flume使用兩個(gè)獨(dú)立的事務(wù)

put操作:source讀取數(shù)據(jù)源并寫入event到channel

take操作:sink從channel中獲取event并寫出到目標(biāo)存儲

事務(wù)實(shí)現(xiàn)的核心點(diǎn)是:記錄狀態(tài)墨林!

比如source,會記錄自己完成拉取成功數(shù)據(jù)的偏移量

另外還有些其他的概念犯祠,以后碰到了再說旭等,這里不再贅述。



安裝配置

flume的安裝賊簡單雷则,只需要導(dǎo)個(gè)包辆雾,配置些文件即可!

1)解壓

將flume的安裝包上傳到linux01上之后月劈,解壓到/opt/apps下面

2)寫配置文件

接下來我們通過案例,一邊配置一邊實(shí)踐

案例實(shí)踐

需求:

現(xiàn)在需要對服務(wù)器磁盤某個(gè)文件下的數(shù)據(jù)進(jìn)行采集藤乙,數(shù)據(jù)是用戶的行為數(shù)據(jù)猜揪,數(shù)據(jù)中含有時(shí)間,采集完成之后坛梁,我需要按照時(shí)間而姐,具體到天為單位文件夾存放在hdfs中

思考:

1)因?yàn)閒lume的agent從用戶觸發(fā)時(shí)間開始到收集----存入channel-----由sink讀出來下沉到hdfs中,這個(gè)過程肯定有時(shí)間延遲划咐,假如某個(gè)用戶某次行為觸發(fā)事件的時(shí)間為2021-1-8? 23:59:59? 拴念,而存入hdfs端的時(shí)間也是用的本機(jī)服務(wù)器的時(shí)間的話,很明顯褐缠,按照天單位文件夾存政鼠,它就存到1月9號去了,是不可行的队魏,所以我們得記錄用戶觸發(fā)事件的時(shí)間公般,最好是連帶著用戶的行為中,一起被收集到source中

2)既然時(shí)間是在用戶行為數(shù)據(jù)當(dāng)中,那么我們可以設(shè)置攔截器官帘,對數(shù)據(jù)進(jìn)行提取瞬雹,分析,再存入到event的header中(不能存入body中刽虹,因?yàn)槟繕?biāo)數(shù)據(jù)具體是什么格式的酗捌,我們并不知道,但是header是一個(gè)hashmap)涌哲,因?yàn)閒lume自己提供的攔截器 并沒有能完成我們這種需求的意敛,所以需要自定義攔截器

先來寫攔截器:

新建maven代碼,導(dǎo)入flume依賴包膛虫,自定義攔截器草姻,繼承flume的interceptor

代碼如下:

package cn.study.demo01;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.util.List;

/**

* @author:tom

* @Date:Created in 16:15 2021/1/8

*/

public class EventStampInterceptor implements Interceptor {

? ? String split_by;

? ? Integer ts_index;

? ? public EventStampInterceptor(String split_by, Integer ts_index) {

? ? ? ? this.split_by = split_by;

? ? ? ? this.ts_index = ts_index;

? ? }

? ? /**

? ? * 初始化方法,在正式調(diào)用攔截邏輯之前稍刀,會先調(diào)用一次

? ? */

? ? public void initialize() {

? ? }

? ? /**

? ? * 攔截的處理邏輯所在方法

? ? * 假設(shè)撩独,我們要采集的數(shù)據(jù),格式如下:

? ? * id,name,timestamp,devicetype,event

? ? */

? ? public Event intercept(Event event) {

? ? ? ? byte[] body = event.getBody();

? ? ? ? String line = new String(body);

? ? ? ? String timeStamp = line.split(split_by)[ts_index];

? ? ? ? event.getHeaders().put("timestamp", timeStamp);

? ? ? ? return event;

? ? }

? ? public List<Event> intercept(List<Event> list) {

? ? ? ? for (Event event : list) {

? ? ? ? ? ? intercept(event);

? ? ? ? }

? ? ? ? return list;

? ? }

? ? /**

? ? * 關(guān)閉清理方法账月,在銷毀該攔截器實(shí)例之前综膀,會調(diào)用一次

? ? */

? ? public void close() {

? ? }

? ? //builder? 構(gòu)建自定義攔截器對象的

? ? public static class EventStampInterceptorBuilder implements Interceptor.Builder {

? ? ? ? String split_by;

? ? ? ? Integer ts_index;

? ? ? ? public Interceptor build() {

? ? ? ? ? ? return new EventStampInterceptor(split_by, ts_index);

? ? ? ? }

? ? ? ? //可以獲取到配置文件中的對象

? ? ? ? public void configure(Context context) {

? ? ? ? ? ? split_by = context.getString("split_by");

? ? ? ? ? ? ts_index = context.getInteger("ts_index", 2);

? ? ? ? }

? ? }

}

采用級聯(lián)模式,使用三臺linux作為agent局齿,其中l(wèi)inux03作為下游agent剧劝,linux01和linux02作為上游agent

下游linux03的配置文件:

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 41414

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://linux01:8020/doit19_0108/%Y-%m-%d/

a1.sinks.k1.hdfs.filePrefix = DoitEduData

a1.sinks.k1.hdfs.fileSuffix = .log

a1.sinks.k1.hdfs.rollInterval = 60

a1.sinks.k1.hdfs.rollSize = 268435456

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.batchSize = 100

a1.sinks.k1.hdfs.useLocalTimeStamp = false

上游linux01的配置文件:

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = TAILDIR

a1.sources.r1.batchSize = 100

a1.sources.r1.filegroups = g1

a1.sources.r1.filegroups.g1 = /home/a.log

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = cn.study.demo01.EventStampInterceptor$EventStampInterceptorBuilder

a1.sources.r1.interceptors.i1.split_by = ,

a1.sources.r1.interceptors.i1.ts_index = 2

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = linux03

a1.sinks.k1.port = 41414

a1.sinks.k1.batch-size = 100

上游linux02的配置文件:

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type =TAILDIR

a1.sources.r1.filegroups = g1

a1.sources.r1.filegroups.g1 = /home/a.log

a1.sources.r1.batchSize = 100

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = cn.study.demo01.EventStampInterceptor$EventStampInterceptorBuilder

a1.sources.r1.interceptors.i1.split_by = ,

a1.sources.r1.interceptors.i1.ts_index = 2

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = linux03

a1.sinks.k1.port = 41414

a1.sinks.k1.batch-size = 100

為了制造新產(chǎn)生的數(shù)據(jù),我寫了個(gè)腳本抓歼,并輸出到/home/a.log 中

先執(zhí)行腳本讥此,使其不斷產(chǎn)生數(shù)據(jù)

啟動下游agent

linux03:bin/flume-ng agent -c conf/ -f myagentconf/exec-m-hdfs-xiayou.conf ?-n a1 -Dflume.root.logger=INFO,console

啟動上游兩個(gè)agent

linux02:

bin/flume-ng agent -c conf/ -f myagentconf/exec-m-shangyou.conf -n a1 -Dflume.root.logger=INFO,console

linux01:

?bin/flume-ng agent -c conf/ -f agent.conf/exec-m-shangyou.conf -n a1 -Dflume.root.logger=INFO,console

可以看到,成功采集并才hdfs上分好了文件夾谣妻,和存入了相應(yīng)的數(shù)據(jù)

注意點(diǎn):

1)上游兩個(gè)采集數(shù)據(jù)的agent的type使用的是TAILDIR? ? 而不是exec 萄喳,因?yàn)門AILDIR? ?會記錄已經(jīng)采集數(shù)據(jù)的偏移量,能保證數(shù)據(jù)不丟失蹋半,一般我們使用的也是這個(gè)TAILDIR? 他巨,但是記得要指定filegroups

2)?必須要先啟動下游的linux03,因?yàn)閘inux03作為下游服務(wù)器端agent减江,是接收上游客戶端nagent的數(shù)據(jù)請求染突,通過網(wǎng)絡(luò)將上游的數(shù)據(jù)拉取過來,必須先啟動下游辈灼,才能啟動上游的agent份企,下游的agent通過監(jiān)聽啟動服務(wù)的端口號,看是否有數(shù)據(jù)sink(上游agent的sink)傳輸過來茵休,進(jìn)而進(jìn)行工作薪棒,上游agent的sink需要指定端口號手蝎,即下游agent 啟動之后的resoure的端口號

3)上游的agent的sink端口號為下游agent的source的端口號;上游agent的sink的type和下游的source的type都為avro俐芯,這是一種跨平臺棵介、跨語言的序列化方式

4)上游agent的sink 主機(jī)名記得寫linux03,寫錯(cuò)了幾次

5)linux02和linux03要配置hadoop的環(huán)境變量

6)配置文件吧史,粘貼的時(shí)候邮辽,要提前按i或者o進(jìn)入插入模式,否則將消耗粘貼內(nèi)容中的關(guān)鍵字

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贸营,一起剝皮案震驚了整個(gè)濱河市吨述,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌钞脂,老刑警劉巖揣云,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異冰啃,居然都是意外死亡邓夕,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進(jìn)店門阎毅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來焚刚,“玉大人,你說我怎么就攤上這事扇调】蠊荆” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵狼钮,是天一觀的道長碳柱。 經(jīng)常有香客問我,道長燃领,這世上最難降的妖魔是什么士聪? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮猛蔽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘灵寺。我一直安慰自己曼库,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布略板。 她就那樣靜靜地躺著毁枯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪叮称。 梳的紋絲不亂的頭發(fā)上种玛,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天藐鹤,我揣著相機(jī)與錄音,去河邊找鬼赂韵。 笑死娱节,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的祭示。 我是一名探鬼主播肄满,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼质涛!你這毒婦竟也來了稠歉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤汇陆,失蹤者是張志新(化名)和其女友劉穎怒炸,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體毡代,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡阅羹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了月趟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灯蝴。...
    茶點(diǎn)故事閱讀 40,144評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖孝宗,靈堂內(nèi)的尸體忽然破棺而出穷躁,到底是詐尸還是另有隱情,我是刑警寧澤因妇,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布问潭,位于F島的核電站,受9級特大地震影響婚被,放射性物質(zhì)發(fā)生泄漏狡忙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一址芯、第九天 我趴在偏房一處隱蔽的房頂上張望灾茁。 院中可真熱鬧,春花似錦谷炸、人聲如沸北专。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拓颓。三九已至,卻和暖如春描孟,著一層夾襖步出監(jiān)牢的瞬間驶睦,已是汗流浹背砰左。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留场航,地道東北人缠导。 一個(gè)月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像旗闽,于是被迫代替她去往敵國和親酬核。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容