- 常用的source
1.1 nettcat
1.2 Avro Source
1.3 Exec Source
1.4 spool Source
1.5 HTTP source - 常用的sink
2.1 HDFS Sink
2.2 Avro Sink - Channel Selector
3.1 Replicating Channel Selector
3.2 Multiplexing Channel Selector - Sink Processor
4.1 Failover Sink Processor
4.2 Load balancing Sink Processor - Interceptor
5.1 Timestamp Interceptor
5.2 static Interceptor
常用的source
netcat
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.s1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
start
flume-ng agent -n a1 -c conf -f xxx.conf
test
telnet 127.0.0.1 44444
hello world!
Avro Source
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind =192.168.137.2
a1.sources.r1.port = 44444
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
start
flume-ng avro-client -c . -H localhost -p 44444 -F ~/bbb
test
avro-client發(fā)送文件:
echo "aaa" > ~/bbb
Exec Source
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail –f aaa
a1.sources.r1.channels = c1
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
test
echo "exec test" >> aaa
spool Source
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = aaa
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
HTTP source
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = http # org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
常用的sink
HDFS Sink
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.channel = c1
a1.sinks.s1.hdfs.path = hdfs://master:9000/testtttt
a1.sinks.s1.hdfs.filePrefix = Syslog
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
Avro Sink
config 第一個(gè)配置
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind =192.168.137.2
a1.sources.r1.port = 44445
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
config 第二個(gè)配置
a2.sources = r1
a2.sinks = s1
a2.channels = c1
a2.sources.r1.type = http
a2.sources.r1.port = 44444
a2.sources.r1.channels = c1
a2.sinks.s1.type = avro
a2.sinks.s1.channel = c1
a2.sinks.s1.hostname =192.168.137.2
a2.sinks.s1.port = 44445
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
Channel Selector
Replicating Channel Selector
config 第一個(gè)配置
a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445
a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
config 第二個(gè)配置
a2.sources = r1
a2.sinks = s1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445
a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
config 第三個(gè)配置
a3.sources = r1
a3.sinks = s1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446
a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
Multiplexing Channel Selector
config 第一個(gè)配置
a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = aaa
a1.sources.r1.selector.mapping.bbb = c1
a1.sources.r1.selector.mapping.bbb1 = c2
a1.sources.r1.selector.default = c1
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445
a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
config 第二個(gè)配置
a2.sources = r1
a2.sinks = s1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445
a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
config 第三個(gè)配置
a3.sources = r1
a3.sinks = s1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446
a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
curl -X POST -d '[{ "headers" :{"aaa" : "bbb1","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
curl -X POST -d '[{ "headers" :{"aaa" : "bbb2","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
Sink Processor
Failover Sink Processor
config 第一個(gè)配置
a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.s1 = 5
a1.sinkgroups.g1.processor.priority.s2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445
a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
config第二個(gè)配置
a2.sources = r1
a2.sinks = s1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445
a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
config 第三個(gè)配置
a3.sources = r1
a3.sinks = s1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446
a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
Load balancing Sink Processor
config 第一個(gè)配置
a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.channels = c1
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445
a1.sinks.s2.type = avro
a1.sinks.s2.channel = c1
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
config 第二個(gè)配置
a2.sources = r1
a2.sinks = s1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445
a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
config 第三個(gè)配置
a3.sources = r1
a3.sinks = s1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446
a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
Interceptor
Timestamp Interceptor
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444
static Interceptor
config
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = eee
a1.sources.r1.interceptors.i1.value = fff
a1.sinks.s1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
test
curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444