默認(rèn)命名如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2 c3 c4
agent_name:a1 ? ? source_name:r1 ? ? channel_name:c1 ? ? sink_name:k1
1刁卜、Interceptors(sources)
攔截器的作用范圍是數(shù)據(jù)源到source之間,主要是為了給數(shù)據(jù)添加headers曙咽,最常用的是timestamp蛔趴、host、static例朱。timestamp類型可以配合hdfs sink的文件輸出的日期格式(hdfs sink也可以用hdfs.useLocalTimeStamp來當(dāng)時間戳孝情,但是不能代表數(shù)據(jù)的產(chǎn)生時間)∪鬣停可以用host和static來做數(shù)據(jù)來源界定箫荡,既可以用來標(biāo)記數(shù)據(jù),也可以用來做數(shù)據(jù)分發(fā)的時候使用(配合Channel Selectors)烁竭。但是默認(rèn)的Interceptor只能針對web打標(biāo)簽菲茬,不能針對event打標(biāo)簽吉挣。如果需要對數(shù)據(jù)進(jìn)行標(biāo)志派撕,需要修改代碼才能實現(xiàn),比如avro? source可以修改org.apache.flume.clients.log4jappender.Log4jAppender的實現(xiàn)來增加一些數(shù)據(jù)上的處理睬魂。
2终吼、Channel Selectors
選擇器的作用范圍是source到channel之間,主要是為了確定每個Event到哪個channel氯哮。最常用的類型是replicating和multiplexing际跪,replicating是默認(rèn)的,能夠把Event分別復(fù)制到每個channel喉钢。multiplexing可以根據(jù)Event的headers里的key的不同的值把數(shù)據(jù)分發(fā)到不同的channel姆打。
3、Sink Processors
處理器的作用范圍是channel到sink之間肠虽,主要是為了確定Events到哪個sink幔戏。默認(rèn)的是default,最常用的是failover(故障轉(zhuǎn)移)和load_balance(負(fù)載均衡)税课。
4闲延、Flume sources
avro(監(jiān)聽端口):
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.ipFilter = true #開啟ip黑白名單認(rèn)證,多個驗證順序執(zhí)行,或關(guān)系韩玩,只要滿足一個即驗證結(jié)束
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*#這個意思是阻止任何ip訪問垒玲,僅允許ip127開頭和localhost通過
#攔截器部分,其他類型source組件一樣
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = test_key
a1.sources.r1.interceptors.i3.value = test_value
#選擇器部分找颓,其他類型source組件一樣
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=state
a1.sources.r1.selector.mapping.CZ=c1
a1.sources.r1.selector.mapping.US=c2 c3
a1.sources.r1.selector.default=c4
exec(常用量監(jiān)控文件):
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /var/log/secure
a1.sources.r1.batchSize=20 #批處理條數(shù)合愈,每次上傳數(shù)據(jù)的條數(shù),不到不傳
a1.sources.r1.batchTimeout=3000 #等待數(shù)據(jù)時間(單位毫秒),如果在時間內(nèi)沒有收集到20條數(shù)據(jù)也需要傳輸數(shù)據(jù)
spooldir(監(jiān)控目錄):
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp/
a1.sources.r1.fileSuffix = .org
a1.sources.r1.deletePolicy = never #讀完刪除與否佛析,默認(rèn)不刪除(never)妇汗,immediate立即刪除
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = ruoze_file #headers里添加ruoze_file=文件全路徑
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = ruoze_base #headers里添加ruoze_base=文件名
a1.sources.r1.trackerDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool
TAILDIR(多目錄的文件監(jiān)控):
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/positions/taildir_position.json
a1.sources.r1.filegroups.f1 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp
a1.sources.r1.headers.f1.ruoze = TAILDIR_f1
a1.sources.r1.byteOffsetHeader = true #字節(jié)偏移量放到headers
a1.sources.r1.writePosInterval = 3000 #偏移量記錄時間間隔
a1.sources.r1.filegroups.f2 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool
a1.sources.r1.headers.f2.ruoze2 = TAILDIR_f2
netcat(監(jiān)聽端口):
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.max-line-length = 1024 #一行的最大長度,超出長度會報錯并斷掉連接
a1.sources.r1.ack-every-event = true #應(yīng)答OK給每個Event
5说莫、Flume channels
memory(內(nèi)存管道):
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000#緩存最大Event條數(shù)
a1.channels.c1.transactionCapacity = 100#事物包括的Event最大條數(shù)杨箭,source和sink的batchSize需要小于該值
file(文件管道):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = #監(jiān)測點文件路徑
a1.channels.c1.useDualCheckpoints = true #監(jiān)測點文件是否備份
a1.channels.c1.backupCheckpointDir = #監(jiān)測點文件備份路徑
a1.channels.c1.dataDirs = #數(shù)據(jù)存儲目錄
6、Flume? sinks
logger(控制臺打印信息):
a1.sinks.k1.type = logger
flume-ng啟動時需要加上:-Dflume.root.logger=INFO,console
avro(往avro? source推送數(shù)據(jù)):
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost #目標(biāo)ip
a1.sinks.k1.port = 44444 #目標(biāo)端口
HDFS(寫數(shù)據(jù)到HDFS):
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.205.131:9000/data/%Y%m%d%H%M? #這個時間的格式需要時間戳來進(jìn)行轉(zhuǎn)換储狭,需要headers里有timestamp
a1.sinks.k1.hdfs.filePrefix = app_name #文件前綴
a1.sinks.k1.hdfs.fileSuffix = .log #文件后綴
a1.sinks.k1.hdfs.inUseSuffix = .tmp #正在寫入文件后綴
a1.sinks.k1.hdfs.rollInterval = 30 #臨時文件轉(zhuǎn)正式文件的時間互婿,單位秒,如果為0辽狈,則不限制慈参,和rollSize、rollCount為或關(guān)系
a1.sinks.k1.hdfs.rollSize = 10485760 #臨時文件轉(zhuǎn)正式文件的大小刮萌,單位byte驮配,如果為0,則不限制
a1.sinks.k1.hdfs.rollCount = 100000 #臨時文件轉(zhuǎn)正式文件記錄條數(shù)着茸,如果為0壮锻,則不限制
a1.sinks.k1.hdfs.round = true #是否啟用時間上的”舍棄”。如果啟用涮阔,則會影響除了%t的其他所有時間表達(dá)式
a1.sinks.k1.hdfs.roundValue = 10#時間上進(jìn)行“舍棄”的值猜绣,這里是舍棄10分鐘以內(nèi)的值,意味著每10分鐘新建一個目錄
a1.sinks.k1.hdfs.roundUnit = minute #時間上進(jìn)行”舍棄”的單位敬特,包含:second,minute,hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true #如果source里沒有timestamp時間戳掰邢,則需要該參數(shù)為true