The Three Components (Source/Channel/Sink)
-
Source: the component that receives data into Flume
-
1. Netcat: a source that receives data on a TCP port
# Declare the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.0.0.2
a1.sources.r1.port = 44444
# Configure the Sink
a1.sinks.k1.type = logger
# Configure the Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- log4j configuration file log4j.properties (console output + file output)
# Set the root logger level to INFO and attach two appenders, A1 and A2
log4j.rootLogger=INFO, A1, A2
# A1 writes to the console (ConsoleAppender)
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# Output format of A1
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# A2 writes to a file (FileAppender)
log4j.appender.A2=org.apache.log4j.FileAppender
# Log file name
log4j.appender.A2.File=./logs/log.out
# Output format of A2
log4j.appender.A2.layout=org.apache.log4j.PatternLayout
log4j.appender.A2.layout.ConversionPattern=%m%n
Start the agent
flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example1.conf --conf /root/trainging/flume-1.9.0/conf/
Send data
telnet 192.0.0.2 44444
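The telnet step can also be scripted. Below is a minimal sketch with a plain java.net.Socket, assuming the agent above listens on 192.0.0.2:44444 and keeps the netcat source's default ack-every-event = true, so it answers each line with OK. NetcatLineClient and frame() are hypothetical names used only for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class NetcatLineClient {
    /** The netcat source turns each newline-terminated line into one event. */
    static String frame(String line) {
        if (line.indexOf('\n') >= 0 || line.indexOf('\r') >= 0) {
            throw new IllegalArgumentException("one event per line: no embedded line breaks");
        }
        return line + "\n";
    }

    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "192.0.0.2";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 44444;
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            for (String line : new String[] {"hello", "flume"}) {
                out.write(frame(line));
                out.flush();
                // Assumes ack-every-event = true (the default): one reply line per event
                System.out.println(line + " -> " + in.readLine());
            }
        }
    }
}
```

Each line should then show up as one event in the logger sink output. Keep in mind that the netcat source caps each line at max-line-length (512 bytes by default).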
-
2. Exec: a source that produces events from the standard output of a command
# Declare the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the Source: every line tail -F prints becomes one event
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/data/flume/access.log
# Configure the Sink
a1.sinks.k1.type = logger
# Configure the Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
log4j configuration file log4j.properties (console output + file output): same as above
Start the agent
flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example2.conf --conf /root/trainging/flume-1.9.0/conf/
Send data
echo hello >> /root/data/flume/access.log
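Instead of running echo by hand, a test program can keep appending lines to the file that tail -F follows. A minimal sketch; AccessLogAppender is a hypothetical helper, and the default path simply reuses the one from the config above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

final class AccessLogAppender {
    /** Appends each line (plus a line separator), creating the file and its directory if needed. */
    static void append(Path log, List<String> lines) throws IOException {
        Path parent = log.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.write(log, lines, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path log = Paths.get(args.length > 0 ? args[0] : "/root/data/flume/access.log");
        // Equivalent to running: echo hello >> access.log (twice)
        append(log, Arrays.asList("hello", "hello again"));
    }
}
```

Appending (rather than rewriting the file) matters here: tail -F only emits lines added after the current end of the file.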
-
3. Avro: a highly scalable RPC source (the most commonly used)
# Declare the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the Source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.0.0.2
a1.sources.r1.port = 44444
# Configure the Sink
a1.sinks.k1.type = logger
# Configure the Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
log4j configuration file log4j.properties (console output + file output): same as above
Start the agent
flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example3.conf --conf /root/trainging/flume-1.9.0/conf/
Sender-side Java code
package pkg01;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class class1 {
    public static void main(String[] args) throws EventDeliveryException {
        // Must match the bind address and port of the avro source
        String ip = "192.0.0.2";
        int port = 44444;
        RpcClient client = RpcClientFactory.getDefaultInstance(ip, port); // Avro
        // RpcClient client = RpcClientFactory.getThriftInstance(ip, port); // Thrift
        try {
            Event event = EventBuilder.withBody("hello flume!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", StandardCharsets.UTF_8);
            client.append(event);
        } finally {
            // Release the connection even if append() throws
            client.close();
        }
    }
}
-
-
Channel: a buffer between the Source and the Sink that lets them run at different rates
-
1. MemoryChannel: an in-memory channel; the data is stored on the JVM heap
- If losing a small amount of data is acceptable, use the memory channel; if no data may be lost, use the file channel
-
The memory channel supports transactions: [figure]
# Declare the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.0.0.2
a1.sources.r1.port = 44444
# Configure the Sink
a1.sinks.k1.type = logger
# Configure the Channel
# Use a memory channel
a1.channels.c1.type = memory
# Holds at most 100,000 events
a1.channels.c1.capacity = 100000
# Each transaction puts or takes at most 100 events
a1.channels.c1.transactionCapacity = 100
# Memory budget of the channel: 500 MB
a1.channels.c1.byteCapacity = 500000000
# 10% of it (500 MB * 10%) is reserved as headroom for event headers
a1.channels.c1.byteCapacityBufferPercentage = 10
# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
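The two byte settings are easier to read as arithmetic. A rough sketch, assuming the channel sets aside the byteCapacityBufferPercentage share for headers and counts only event bodies against the rest (the Flume user guide states that byteCapacity counts only the event body); MemoryChannelBudget is a hypothetical name, and plain integer math stands in for the channel's internal accounting.

```java
final class MemoryChannelBudget {
    /**
     * Approximate bytes left for event bodies: byteCapacity minus the
     * byteCapacityBufferPercentage share kept as headroom for headers.
     */
    static long usableBodyBytes(long byteCapacity, int bufferPercentage) {
        if (byteCapacity < 0 || bufferPercentage < 0 || bufferPercentage > 100) {
            throw new IllegalArgumentException("invalid capacity or percentage");
        }
        return byteCapacity * (100 - bufferPercentage) / 100;
    }

    public static void main(String[] args) {
        long usable = usableBodyBytes(500_000_000L, 10);
        System.out.println("usable body bytes: " + usable);
        // capacity = 100000 events: average body size at which both limits are reached together
        System.out.println("average body bytes at full capacity: " + usable / 100_000);
    }
}
```

With the values above this gives 450,000,000 bytes, or about 4,500 bytes per event at 100,000 events, so if bodies average more than roughly 4.5 KB the byte limit is hit before the event-count limit. If byteCapacity is omitted, Flume defaults it to 80% of the JVM's maximum heap (-Xmx).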
-
-
Sink: takes data from the channel and either pushes it to the next Flume agent or stores it in a specified destination
-
Sink group concept: [figure]
-
Sink optimization: [figure]
-
Sink transactions: [figure]
-
General configuration (HDFS Sink):
# Sink type
a1.sinks.k1.type = hdfs
# HDFS directory
a1.sinks.k1.hdfs.path = /user/hduser/logs/data_%Y-%m-%d
# File prefix
a1.sinks.k1.hdfs.filePrefix = retail
# File suffix
a1.sinks.k1.hdfs.fileSuffix = .txt
# Close (roll) the current file after 60 seconds, 128 MB or 100 events, whichever comes first
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 128000000
a1.sinks.k1.hdfs.rollCount = 100
# Close a file if nothing has been written to it for 30 seconds
a1.sinks.k1.hdfs.idleTimeout = 30
# Keep at most 30 files open
a1.sinks.k1.hdfs.maxOpenFiles = 30
# Uncompressed output
# a1.sinks.k1.hdfs.fileType = DataStream
# Compressed output, snappy codec
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = snappy
# Use the agent's local time for the escape sequences in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Round the timestamp down to 10 minutes (only changes the directory if the path uses %H or %M)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.roundValue = 10
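The round/roundUnit/roundValue settings floor the event time before the path escapes are expanded. A sketch of that bucketing with java.time, assuming minute-of-hour rounding in the agent's time zone; java.time patterns stand in for Flume's escapes (%Y to yyyy, %m to MM, %d to dd, %H to HH, %M to mm), and HdfsBucketSketch is a hypothetical name, not Flume code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class HdfsBucketSketch {
    /** round = true, roundUnit = minute, roundValue = n: floor the minute-of-hour to a multiple of n. */
    static ZonedDateTime roundDownMinutes(Instant time, int roundValue, ZoneId zone) {
        if (roundValue <= 0 || roundValue > 60) {
            throw new IllegalArgumentException("roundValue must be in 1..60 for minutes");
        }
        ZonedDateTime t = time.atZone(zone).truncatedTo(ChronoUnit.MINUTES);
        return t.withMinute(t.getMinute() / roundValue * roundValue);
    }

    /** Formats the rounded time with a java.time pattern standing in for the Flume escapes. */
    static String bucket(String pattern, Instant time, int roundValue, ZoneId zone) {
        return DateTimeFormatter.ofPattern(pattern).format(roundDownMinutes(time, roundValue, zone));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        ZoneId local = ZoneId.systemDefault(); // useLocalTimeStamp = true: the agent's clock
        // Same shape as hdfs.path = /user/hduser/logs/data_%Y-%m-%d
        System.out.println(bucket("'/user/hduser/logs/data_'yyyy-MM-dd", now, 10, local));
        // A finer path such as .../data_%Y-%m-%d/%H%M would change every 10 minutes
        System.out.println(bucket("'/user/hduser/logs/data_'yyyy-MM-dd/HHmm", now, 10, local));
    }
}
```

This shows why the 10-minute rounding has no visible effect with the day-level path above: all events of a day land in one directory, and only a path containing hour/minute escapes produces 10-minute buckets.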
-
-
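Interceptors: work between the Source and the Channel. After the Source receives data, interceptors drop or transform events according to configured rules; if several interceptors are chained on one link, they run one after another in the configured order.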
-
Host interceptor: the agent adds the host's IP address or hostname to each event's headers; the header key is set with hostHeader and defaults to host.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
# Keep an existing header value instead of overwriting it
a1.sources.r1.interceptors.i1.preserveExisting = true
# The IP address is used by default; use the hostname instead
a1.sources.r1.interceptors.i1.useIP = false
# Header key that stores the hostname
a1.sources.r1.interceptors.i1.hostHeader = hostname
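The preserveExisting flag is the easiest setting to misread. The sketch below models only the header-stamping rule on a plain Map, assuming the hostname is looked up in a way similar to InetAddress.getCanonicalHostName() (Flume's exact lookup may differ); HostHeaderSketch is a hypothetical name, not Flume API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

final class HostHeaderSketch {
    /** preserveExisting = true keeps a value already present under headerName. */
    static void stamp(Map<String, String> headers, String headerName, String value, boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(headerName)) {
            return;
        }
        headers.put(headerName, value);
    }

    /** useIP = true gives the IP address, false gives the host name (the config above uses false). */
    static String hostValue(boolean useIP) throws UnknownHostException {
        InetAddress local = InetAddress.getLocalHost();
        return useIP ? local.getHostAddress() : local.getCanonicalHostName();
    }

    public static void main(String[] args) throws UnknownHostException {
        Map<String, String> headers = new HashMap<>();
        stamp(headers, "hostname", hostValue(false), true);
        System.out.println(headers);
    }
}
```

With preserveExisting = true, an event that already carries a hostname header (for example one stamped by an upstream agent) keeps its original value.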
-
Timestamp interceptor (commonly used): adds a timestamp key to the event headers, which makes it easy for the HDFS Sink to bucket events by time
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
-
Static interceptor: simply inserts a fixed key/value pair into the headers of every event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Zone
a1.sources.r1.interceptors.i1.value = NEW_YORK
-
UUID interceptor: adds a unique identifier (UUID) to each event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
-
Regex filtering interceptor: selectively keeps or drops events
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = .*info.*
# Keep events that match
a1.sources.r1.interceptors.i1.excludeEvents = false
a1.sources.r1.interceptors.i2.type = regex_filter
a1.sources.r1.interceptors.i2.regex = .*data3.*
# Drop events that match
a1.sources.r1.interceptors.i2.excludeEvents = true
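Chaining i1 and i2 means an event survives only if it contains info and does not contain data3. The sketch models each regex_filter stage on plain strings, assuming the match test behaves like java.util.regex.Matcher.find(); RegexFilterChain is a hypothetical name. Note that in a pattern such as `.*data3*` the trailing `*` applies only to the `3`, so it matches any body that contains `data`; `.*data3.*` expresses the intended exclusion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class RegexFilterChain {
    /** One regex_filter stage: keep matches (excludeEvents = false) or drop them (excludeEvents = true). */
    static List<String> filter(List<String> bodies, String regex, boolean excludeEvents) {
        Pattern pattern = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String body : bodies) {
            boolean matches = pattern.matcher(body).find();
            if (matches != excludeEvents) {
                kept.add(body);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("info data1", "info data3", "warn data1");
        // i1 runs first, then i2, in the order listed in the interceptors property
        List<String> kept = filter(filter(bodies, ".*info.*", false), ".*data3.*", true);
        System.out.println(kept);
    }
}
```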
-
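Custom interceptor: an interceptor implemented in Java. Test events carrying a custom header (ClientServer) can be sent from the client like this: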
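// Needs: java.nio.charset.Charset, java.util.*, org.apache.flume.Event,
// org.apache.flume.api.RpcClient, org.apache.flume.api.RpcClientFactory, org.apache.flume.event.EventBuilder
// Runs inside main(), with ip and port defined as in the Avro sender above
Map<String, String> headers = new HashMap<String, String>();
headers.put("ClientServer", "Client01srv");
List<Event> events = new ArrayList<Event>();
events.add(EventBuilder.withBody("info ", Charset.forName("UTF8"), headers));
RpcClient client = RpcClientFactory.getDefaultInstance(ip, port); // Avro
try {
    client.appendBatch(events); // send all events in one batch
} finally {
    client.close();
}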
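A real custom interceptor implements Flume's org.apache.flume.interceptor.Interceptor (initialize, intercept(Event), intercept(List&lt;Event&gt;), close) plus a nested Interceptor.Builder, and is configured as type = fully.qualified.ClassName$Builder, the same pattern as the UUID interceptor line above. To stay runnable without Flume on the classpath, the sketch below uses a hypothetical SimpleEvent stand-in; BodyLengthInterceptor and its bodyLength header are also hypothetical. It mirrors the contract: the single-event method returns null to drop an event, and the list method returns only the survivors.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for org.apache.flume.Event: headers plus a byte[] body. */
final class SimpleEvent {
    final Map<String, String> headers;
    final byte[] body;

    SimpleEvent(Map<String, String> headers, byte[] body) {
        this.headers = new HashMap<>(headers);
        this.body = body;
    }
}

/** Drops blank events and tags the rest with their body length in bytes. */
final class BodyLengthInterceptor {
    SimpleEvent intercept(SimpleEvent event) {
        String text = new String(event.body, StandardCharsets.UTF_8);
        if (text.trim().isEmpty()) {
            return null; // returning null drops the event
        }
        event.headers.put("bodyLength", String.valueOf(event.body.length));
        return event;
    }

    List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<>(events.size());
        for (SimpleEvent e : events) {
            SimpleEvent out = intercept(e);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }
}
```

Keeping the per-event logic in intercept(SimpleEvent) and having the batch method delegate to it is a common way to make sure both paths apply the same rule.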
-
-
-
Channel selectors: decide which events the Channel processor sends to which Channel
-
Replicating channel selector: copies the data coming from the Source into every Channel
# Declare the three components of the agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the Source (no selector.type is set, so the default replicating selector is used)
a1.sources.r1.type = avro
a1.sources.r1.bind = 10.0.1.213
a1.sources.r1.port = 44444
# Configure the Sinks
a1.sinks.k1.type = logger
a1.sinks.k2.type = logger
# Configure the Channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 100
# Bind the components: r1 feeds both channels, each sink drains one channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
-
Multiplexing channel selector: chooses the Channel for each event based on the value of a given header key
# Declare the three components of the agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the Source
a1.sources.r1.type = avro
a1.sources.r1.bind = 10.0.1.213
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = op
# op = 1: the event goes to channel c1
a1.sources.r1.selector.mapping.1 = c1
# op = 2: the event goes to channel c2
a1.sources.r1.selector.mapping.2 = c2
# Any other value (or no op header): the event goes to channel c2
a1.sources.r1.selector.default = c2
# Configure the Sinks
a1.sinks.k1.type = logger
a1.sinks.k2.type = logger
# Configure the Channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 100
# Bind the components together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
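Both selectors boil down to a small routing rule. The sketch models them on plain maps and lists: replicating returns every channel bound to the source, while multiplexing looks up the header value and falls back to the default for unmapped or missing values. ChannelSelectorSketch is a hypothetical name; on the sending side the op header would be attached with EventBuilder.withBody(body, charset, headers), as in the custom interceptor client code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChannelSelectorSketch {
    /** Replicating: every event goes to all channels bound to the source. */
    static List<String> replicating(List<String> sourceChannels) {
        return new ArrayList<>(sourceChannels);
    }

    /** Multiplexing: route by header value; unmapped or missing values use the default channels. */
    static List<String> multiplexing(Map<String, String> eventHeaders, String header,
                                     Map<String, List<String>> mapping, List<String> defaults) {
        String value = eventHeaders.get(header);
        List<String> mapped = (value == null) ? null : mapping.get(value);
        return mapped != null ? mapped : defaults;
    }

    public static void main(String[] args) {
        // Mirrors selector.header = op, mapping.1 = c1, mapping.2 = c2, default = c2
        Map<String, List<String>> mapping = new HashMap<>();
        mapping.put("1", Collections.singletonList("c1"));
        mapping.put("2", Collections.singletonList("c2"));
        List<String> defaults = Collections.singletonList("c2");
        for (String op : new String[] {"1", "2", "3"}) {
            Map<String, String> headers = Collections.singletonMap("op", op);
            System.out.println("op=" + op + " -> " + multiplexing(headers, "op", mapping, defaults));
        }
        System.out.println("replicating -> " + replicating(Arrays.asList("c1", "c2")));
    }
}
```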
-
-
Sink groups: several Sinks form one Sink group, which can be used for load balancing or failover
-
Load balancing: every link sends data, so data order is not guaranteed
# Configure the Sinks
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Temporarily skip a sink after it fails
a1.sinkgroups.g1.processor.backoff = true
#a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinks.k1.type = logger
a1.sinks.k2.type = logger
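With selector = round_robin the processor cycles through k1 and k2, and with backoff = true a sink that failed is skipped for a while (Flume grows that wait exponentially). A simplified sketch of just the selection step, assuming a fixed backoff passed in by the caller; RoundRobinSketch and its methods are hypothetical, not Flume API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinSketch {
    private final List<String> sinks;
    private final Map<String, Long> backedOffUntil = new HashMap<>();
    private int next;

    RoundRobinSketch(List<String> sinks) {
        this.sinks = new ArrayList<>(sinks);
    }

    /** Next sink in round-robin order that is not backing off at nowMillis, or null if all are. */
    String pick(long nowMillis) {
        for (int i = 0; i < sinks.size(); i++) {
            String sink = sinks.get(next);
            next = (next + 1) % sinks.size();
            if (backedOffUntil.getOrDefault(sink, 0L) <= nowMillis) {
                return sink;
            }
        }
        return null;
    }

    /** Hypothetical fixed backoff after a failure. */
    void markFailed(String sink, long nowMillis, long backoffMillis) {
        backedOffUntil.put(sink, nowMillis + backoffMillis);
    }
}
```

Because consecutive batches go to different sinks, events from one channel can reach the final destination out of order, which is the ordering caveat above.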
-
Failover: when one link fails, another link takes over according to priority; only one link sends at a time, so data order is preserved
# Configure the Sinks
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# A higher value means a higher priority: k2 is used first, k1 takes over if k2 fails
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Maximum backoff period (milliseconds) for a failed sink
a1.sinkgroups.g1.processor.maxpenalty = 10000
#a1.sinks.k1.type = logger
a1.sinks.k2.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 44444
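The failover processor always sends through the live sink with the highest priority value, so with the values above k2 (logger) is primary and k1 (avro) is the standby. A simplified sketch, assuming one penalty per failure capped at maxpenalty (Flume itself lengthens the penalty on repeated failures, up to that cap); FailoverSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class FailoverSketch {
    private final Map<String, Integer> priority;
    private final Map<String, Long> penalizedUntil = new HashMap<>();
    private final long maxPenaltyMillis;

    FailoverSketch(Map<String, Integer> priority, long maxPenaltyMillis) {
        this.priority = new LinkedHashMap<>(priority);
        this.maxPenaltyMillis = maxPenaltyMillis;
    }

    /** The non-penalized sink with the highest priority value, or null if every sink is penalized. */
    String activeSink(long nowMillis) {
        String best = null;
        for (Map.Entry<String, Integer> e : priority.entrySet()) {
            if (penalizedUntil.getOrDefault(e.getKey(), 0L) > nowMillis) {
                continue;
            }
            if (best == null || e.getValue() > priority.get(best)) {
                best = e.getKey();
            }
        }
        return best;
    }

    /** Penalize a failed sink; the wait never exceeds maxpenalty. */
    void markFailed(String sink, long nowMillis, long penaltyMillis) {
        penalizedUntil.put(sink, nowMillis + Math.min(penaltyMillis, maxPenaltyMillis));
    }
}
```

Once the penalty expires the higher-priority sink is tried again, so traffic moves back to k2 as soon as it recovers.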
-