前言: 繼上一篇從source到channel到sink實現了一整套的流程之后颂跨,我們這次學習一下Channel Selector與Sink Processors恒削。我個人為了方便理解把這兩個組件想象成在Source和Channel之間以及Channel和Sink之間蔓同。(注:這兩個組件不是必須要設置的蹲诀。)
一弃揽、Channel Selector
????flume channel selectors允許給一個source可以配置多個channel的能力则北。這種模式有兩種方式尚揣,一種是用來復制(Replication)掖举,這也是默認配置,另一種是用來分流(Multiplexing)方篮。
????1) Replication方式:可以將數據源復制多份励负,分別傳遞到多個channel中,每個channel接收到的數據都是相同的巾表。如下圖所示:
這種方式的配置主要有兩個key:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
#這意味著c3是可選的集币,向c3寫入失敗會被忽略惠猿。但是向c1负间,c2寫入失敗會出錯
a1.sources.r1.selector.optional = c3
????2)Multiplexing方式:selector可以根據header的值來確定數據傳遞到哪一個channel,其中header的值可以通過interceptor去設置趾访。如果現在不明白interceptor沒關系董虱,就把它當作能在header中添加一個key-value對的玩意就可以愤诱。
????假設我們通過攔截器向header中添加了key為state的一個屬性淫半,他的值根據具體需求可以為CZ和US等。那我們想把值為CZ的數據流通過c1處理科吭,把值為US的數據流通過c2,c3處理猴鲫,其他情況用c4處理拂共。則flume.conf 配置如下:
a1.sources = r1
a1.channels = c1 c2 c3 c4
#設置selector類型
a1.sources.r1.selector.type = multiplexing
#設置根據header中的什么key去分流
a1.sources.r1.selector.header = state
#設置根據key的具體值選擇哪個channel
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
#設置默認channel
a1.sources.r1.selector.default = c4
二宜狐、Sink Processors
????Sink Processors蛇捌,顧名思義,就是沉槽處理器柑爸,也就是數據向哪里流盒音,怎么流由處理器控制。以sinkgroup的形式出現譬圣。簡單的說就是一個source 對應一個Sinkgroups雄坪,即多個sink, 其實與selector情況差不多,只是processor考慮更多的是可靠性和性能绳姨,即故障轉移與負載均衡的設置阔挠。
????SinkGroup允許組織多個sink到一個實體上。SinkProcessors 能夠提供在組內所有sink之間實現負載均衡的能力(配置load_balance)跪削。而且在失敗的情況下能夠進行故障轉移碾盐,從一個Sink到另一個Sink(配置failover )揩局。
#設置組名
a1.sinkgroups = g1
#設置組內的sink
a1.sinkgroups.g1.sinks = k1 k2
#設置processor的類別,這里是負載均衡
a1.sinkgroups.g1.processor.type = load_balance
2.1 負載均衡(load_balance)
????過程:source里的event流經channel孕豹,進入sink組励背,在sink組內部根據負載算法(我們在配置文件中配的round_robin砸西、random)選擇sink,后續(xù)可以選擇不同機器上的agent實現負載均衡衅疙。
借圖:https://blog.csdn.net/silentwolfyh/article/details/51165804
配置如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
#exec源從執(zhí)行Unix命令的標準輸出獲取數據
a1.sources.r1.type = exec
a1.sources.r1.channels=c1
#要執(zhí)行的Unix命令是tail饱溢,tail命令默認在屏幕上顯示指定文件的末尾10行
a1.sources.r1.command=tail -F /home/flume/xx.log
#define sinkgroups
#這里定義的就是Processor
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
#類型為負載均衡
a1.sinkgroups.g1.processor.type=load_balance
#是否指數增長超時恢復時間
a1.sinkgroups.g1.processor.backoff=true
#選擇下一個sink的算法
a1.sinkgroups.g1.processor.selector=round_robin
#define the sink 1
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.1.112
a1.sinks.k1.port=9876
#define the sink 2
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.1.113
a1.sinks.k2.port=9876
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
2.2 故障轉移(Failover)
????配置一組sink绩郎,這組sink組成一個Failover Sink Processor翁逞,當有一個sink處理失敗,Flume將這個sink放到一個地方状植,等待冷卻時間怨喘,可以正常處理event時再拿回來。
????event通過一個channel流向一個sink組洼畅,在sink組內部根據優(yōu)先級選擇具體的sink棚赔,一個失敗后再轉向另一個sink,流程圖如下:
相應配置如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels=c1
a1.sources.r1.command=tail -F /home/flume/xx.log
#define sinkgroups
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
#在Sink中的兩個數據為優(yōu)先級設置默認為5丧肴,數字越大越優(yōu)先
a1.sinkgroups.g1.processor.priority.k1=10
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000
#define the sink 1
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.1.112
a1.sinks.k1.port=9876
#define the sink 2
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.1.113
a1.sinks.k2.port=9876
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
三芋浮、總結
channel selector和sink processor都不是必須配置的纸巷,他們有自己的默認值。channel selector注重處理數據的流向瘤旨,sink processor注重處理可靠性和性能。
參考資料: