Flume配置案例:常用配置

  1. 常用的source
    1.1 nettcat
    1.2 Avro Source
    1.3 Exec Source
    1.4 spool Source
    1.5 HTTP source
  2. 常用的sink
    2.1 HDFS Sink
    2.2 Avro Sink
  3. Channel Selector
    3.1 Replicating Channel Selector
    3.2 Multiplexing Channel Selector
  4. Sink Processor
    4.1 Failover Sink Processor
    4.2 Load balancing Sink Processor
  5. Interceptor
    5.1 Timestamp Interceptor
    5.2 static Interceptor

常用的source

netcat

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.s1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1

start

flume-ng agent -n a1 -c conf -f xxx.conf

test

telnet 127.0.0.1 44444
hello world!

Avro Source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind =192.168.137.2
a1.sources.r1.port = 44444

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

start

flume-ng avro-client -c . -H localhost -p 44444 -F ~/bbb

test

avro-client發(fā)送文件:
echo "aaa" > ~/bbb

Exec Source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail –f aaa

a1.sources.r1.channels = c1

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

echo "exec test" >> aaa

spool Source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = aaa
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

HTTP source

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = http # org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

常用的sink

HDFS Sink

config

a1.sources = r1
a1.sinks = s1

a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.port = 44444

a1.sources.r1.channels = c1

a1.sinks.s1.type = hdfs
a1.sinks.s1.channel = c1
a1.sinks.s1.hdfs.path = hdfs://master:9000/testtttt
a1.sinks.s1.hdfs.filePrefix = Syslog
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

Avro Sink

config 第一個(gè)配置

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind =192.168.137.2
a1.sources.r1.port = 44445

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

config 第二個(gè)配置

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = http
a2.sources.r1.port = 44444
a2.sources.r1.channels = c1

a2.sinks.s1.type = avro
a2.sinks.s1.channel = c1
a2.sinks.s1.hostname =192.168.137.2
a2.sinks.s1.port = 44445

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

Channel Selector

Replicating Channel Selector

config 第一個(gè)配置

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

config 第二個(gè)配置

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config 第三個(gè)配置

a3.sources = r1
a3.sinks = s1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

Multiplexing Channel Selector

config 第一個(gè)配置

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.header = aaa
a1.sources.r1.selector.mapping.bbb = c1
a1.sources.r1.selector.mapping.bbb1 = c2
a1.sources.r1.selector.default = c1

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

config 第二個(gè)配置

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config 第三個(gè)配置

a3.sources = r1
a3.sinks = s1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

curl -X POST -d '[{ "headers" :{"aaa" : "bbb1","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

curl -X POST -d '[{ "headers" :{"aaa" : "bbb2","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

Sink Processor

Failover Sink Processor

config 第一個(gè)配置

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1 c2

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.s1 = 5
a1.sinkgroups.g1.processor.priority.s2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c2
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

config第二個(gè)配置

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config 第三個(gè)配置

a3.sources = r1
a3.sinks = s1

a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

Load balancing Sink Processor

config 第一個(gè)配置

a1.sources = r1
a1.sinks = s1 s2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.channels = c1

a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 192.168.137.2
a1.sinks.s1.port = 44445

a1.sinks.s2.type = avro
a1.sinks.s2.channel = c1
a1.sinks.s2.hostname = 192.168.137.2
a1.sinks.s2.port = 44446

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

config 第二個(gè)配置

a2.sources = r1
a2.sinks = s1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.137.2
a2.sources.r1.port = 44445

a2.sinks.s1.type = logger
a2.sinks.s1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

config 第三個(gè)配置

a3.sources = r1
a3.sinks = s1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.137.2
a3.sources.r1.port = 44446

a3.sinks.s1.type = logger
a3.sinks.s1.channel = c1

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

Interceptor

Timestamp Interceptor

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

static Interceptor

config

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.host =192.168.137.2
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = eee
a1.sources.r1.interceptors.i1.value = fff

a1.sinks.s1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1

test

curl -X POST -d '[{ "headers" :{"aaa" : "bbb","ccc" : "ddd"},"body" : "xxx"}]' http://localhost: 44444

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缕棵,一起剝皮案震驚了整個(gè)濱河市筋岛,隨后出現(xiàn)的幾起案子椎眯,更是在濱河造成了極大的恐慌佣耐,老刑警劉巖幻捏,帶你破解...
    沈念sama閱讀 206,602評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件潮剪,死亡現(xiàn)場(chǎng)離奇詭異裁良,居然都是意外死亡衷恭,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門堤撵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)仁讨,“玉大人,你說(shuō)我怎么就攤上這事粒督∨愀停” “怎么了禽翼?”我有些...
    開封第一講書人閱讀 152,878評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵屠橄,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我闰挡,道長(zhǎng)锐墙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,306評(píng)論 1 279
  • 正文 為了忘掉前任长酗,我火速辦了婚禮溪北,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己之拨,他們只是感情好茉继,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評(píng)論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蚀乔,像睡著了一般烁竭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上吉挣,一...
    開封第一講書人閱讀 49,071評(píng)論 1 285
  • 那天派撕,我揣著相機(jī)與錄音,去河邊找鬼睬魂。 笑死终吼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的氯哮。 我是一名探鬼主播际跪,決...
    沈念sama閱讀 38,382評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼蛙粘!你這毒婦竟也來(lái)了垫卤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,006評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤出牧,失蹤者是張志新(化名)和其女友劉穎穴肘,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體舔痕,經(jīng)...
    沈念sama閱讀 43,512評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡评抚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伯复。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片慨代。...
    茶點(diǎn)故事閱讀 38,094評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖啸如,靈堂內(nèi)的尸體忽然破棺而出侍匙,到底是詐尸還是另有隱情,我是刑警寧澤叮雳,帶...
    沈念sama閱讀 33,732評(píng)論 4 323
  • 正文 年R本政府宣布想暗,位于F島的核電站,受9級(jí)特大地震影響帘不,放射性物質(zhì)發(fā)生泄漏说莫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評(píng)論 3 307
  • 文/蒙蒙 一寞焙、第九天 我趴在偏房一處隱蔽的房頂上張望储狭。 院中可真熱鬧互婿,春花似錦、人聲如沸辽狈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)刮萌。三九已至懂牧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間尊勿,已是汗流浹背僧凤。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留元扔,地道東北人躯保。 一個(gè)月前我還...
    沈念sama閱讀 45,536評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像澎语,于是被迫代替她去往敵國(guó)和親途事。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容