前言:flume通過使用Interceptors(攔截器)實現(xiàn)修改和過濾事件的功能棋傍。舉個栗子救拉,一個網站每天產生海量數(shù)據(jù),但是可能會有很多數(shù)據(jù)是不完整的(缺少重要字段)舍沙,或冗余的近上,如果不對這些數(shù)據(jù)進行特殊處理,那么會降低系統(tǒng)的效率拂铡。這時候攔截器就派上用場了壹无。
一、flume內置的攔截器
先列個flume內置攔截器的表:
????由于攔截器一般針對Event的Header進行處理感帅,那我先介紹一Event吧斗锭。Event結構如下圖:
- event是flume中處理消息的基本單元,由零個或者多個header和正文body組成失球。
- Header 是 key/value 形式的岖是,可以用來制造路由決策或攜帶其他結構化信息(如事件的時間戳或事件來源的服務器主機名)帮毁。你可以把它想象成和 HTTP 頭一樣提供相同的功能——通過該方法來傳輸正文之外的額外信息。
- Body是一個字節(jié)數(shù)組,包含了實際的內容。
- flume提供的不同source會給其生成的event添加不同的header韵丑。
????這些攔截器實際都是往Event的header里插數(shù)據(jù)背捌,比如Timestamp Interceptor攔截器就是可以往event的header中插入關鍵詞為timestamp的時間戳伯铣。
1.1 timestamp攔截器
下面是官網的timestamp的配置:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
測試例子:
????在flume的conf目錄下新建timestamp.conf文件,輸入以下代碼:
#配置文件:timestamp.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
????打開一個終端,進入flume的bin目錄,輸入./flume-ng agent -c ../conf -f ../conf/timestamp.conf -Dflume.root.logger=INFO,console -n a1
????啟動成功后灯抛,flume開始監(jiān)控本主機上的所有IP地址,再開一個終端音瓷,向50000端口發(fā)TCP數(shù)據(jù)对嚼,命令如下:echo "TimestampInterceptor" | nc 127.0.0.1 50000
。
????此時绳慎,flume的終端會接收到這個消息纵竖,如下:
我們可以清楚地看到header中有timestamp的影子。成功了杏愤。
1.2 host攔截器
????再說一個host interceptor磨确,該攔截器可以往event的header中插入關鍵詞默認為host的主機名或者ip地址(注意是agent運行的機器的主機名或者ip地址)。官網配置如下:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
測試例子
#配置文件:host.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i2
# a1.sources.r1.interceptors.i1.preserveExisting= false
# a1.sources.r1.interceptors.i1.type =timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader =hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
????進入flume的bin目錄声邦,執(zhí)行./flume-ng agent -c ../conf -f ../conf/host.conf -Dflume.root.logger=INFO,console -n a1
然后在新的終端輸入echo "HostInterceptor" | nc 127.0.0.1 50000
????結果如下:
????可以發(fā)現(xiàn)header中增加了一個hostname的值:192.168.1.105。使用ifconfig命令摆舟,正是我電腦的IP地址亥曹,因為flume的agent運行在這個IP上嘛。
1.3 Regex Filtering Interceptor攔截器
????還有比較重要的Regex Filtering Interceptor恨诱,Regex Filtering Interceptor攔截器用于過濾事件媳瞪,篩選出與配置的正則表達式相匹配的事件≌毡Γ可以用于包含事件和排除事件蛇受。常用于數(shù)據(jù)清洗,通過正則表達式把數(shù)據(jù)過濾出來厕鹃。
測試例子:
#配置文件:regex_filter.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
#全部是數(shù)字的數(shù)據(jù)
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
#排除符合正則表達式的數(shù)據(jù)
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
????這樣的配置將會過濾掉所有純數(shù)字的數(shù)據(jù)兢仰。
????進入bin目錄啟動flume,執(zhí)行./flume-ng agent -c ../conf -f ../conf/regex_filter.conf -Dflume.root.logger=INFO,console -n a1
????新開終端輸入
echo "1234" | nc 127.0.0.1 50000
echo "1234" | nc 127.0.0.1 50000
echo "1234" | nc 127.0.0.1 50000
????在flume的終端會輸出如下內容剂碴,但是沒有數(shù)據(jù)把将,因為被我們過濾掉了。
二忆矛、總結
????本文介紹了flume中的interceptor攔截器察蹲,并針對具體的三個攔截器進行了測試,驗證其功能。后面我們還會編寫符合自己需求的自定義攔截器洽议。敬請期待宗收。