1.背景介紹
Flume提供對數(shù)據(jù)進(jìn)行簡單處理蜈出,并寫到各種數(shù)據(jù)接受方(可定制)的能力。Flume有各種自帶的攔截器延欠,比如:TimestampInterceptor陌兑、HostInterceptor等,通過使用不同的攔截器由捎,實現(xiàn)不同的功能兔综。但是以上的這些攔截器,不能改變原有日志數(shù)據(jù)的內(nèi)容或者對日志信息添加一定的處理邏輯,當(dāng)一條日志信息有幾十個甚至上百個字段的時候软驰,在傳統(tǒng)的Flume處理下涧窒,收集到的日志還是會有對應(yīng)這么多的字段,也不能對你想要的字段進(jìn)行對應(yīng)的處理碌宴。
2.自定義攔截器
根據(jù)實際業(yè)務(wù)的需求杀狡,為了更好的滿足數(shù)據(jù)在應(yīng)用層的處理,通過自定義Flume攔截器贰镣,過濾掉不需要的字段呜象,并對指定字段加密處理,將源數(shù)據(jù)進(jìn)行預(yù)處理碑隆。減少了數(shù)據(jù)的傳輸量恭陡,降低了存儲的開銷。
3.功能實現(xiàn)
1.定義一個類CustomParameterInterceptor實現(xiàn)Interceptor接口上煤。
2.在CustomParameterInterceptor類中定義變量休玩,這些變量是需要到 Flume的配置文件中進(jìn)行配置使用的。每一行字段間的分隔符(fields_separator)劫狠、通過分隔符分隔后拴疤,所需要列字段的下標(biāo)(indexs)、多個下標(biāo)使用的分隔符(indexs_separator)独泞、多個下標(biāo)使用的分隔符(indexs_separator)呐矾。
3.添加CustomParameterInterceptor的有參構(gòu)造方法。并對相應(yīng)的變量進(jìn)行處理懦砂。將配置文件中傳過來的unicode編碼進(jìn)行轉(zhuǎn)換為字符串蜒犯。
4.寫具體的要處理的邏輯intercept()方法,一個是單個處理的荞膘,一個是批量處理罚随。
5.接口中定義了一個內(nèi)部接口Builder,在configure方法中羽资,進(jìn)行一些參數(shù)配置淘菩。并給出,在flume的conf中沒配置一些參數(shù)時屠升,給出其默認(rèn)值瞄勾。通過其builder方法,返回一個CustomParameterInterceptor對象弥激。
6.定義一個靜態(tài)類,類中封裝MD5加密方法
7.自定義攔截器的代碼開發(fā)已完成愿阐,然后打包成jar微服, 放到Flume的根目錄下的lib中
8.修改Flume的配置信息
新增配置文件spool-interceptor-hdfs.conf,內(nèi)容為:
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
9啟動:
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console