首先所有核心組件都會實現(xiàn)org.apache.flume.lifecycle.LifecycleAware接口:
public interface LifecycleAware {
public void start();
public void stop();
public LifecycleState getLifecycleState();
}
start方法在整個Flume啟動時或者初始化組件時都會調(diào)用start方法進行組件初始化妆够,F(xiàn)lume組件出現(xiàn)異常停止時會調(diào)用stop识啦,getLifecycleState返回組件的生命周期狀態(tài),有IDLE, START, STOP, ERROR四個狀態(tài)神妹。
如果開發(fā)的組件需要配置颓哮,如設(shè)置一些屬性;可以實現(xiàn)org.apache.flume.conf.Configurable接口:
public interface Configurable {
public void configure(Context context);
}
Flume在啟動組件之前會調(diào)用configure來初始化組件一些配置鸵荠。
1冕茅、Source
Source用于采集日志數(shù)據(jù),有兩種實現(xiàn)方式:輪詢拉取和事件驅(qū)動機制蛹找;Source接口如下:
public interface Source extends LifecycleAware, NamedComponent {
public void setChannelProcessor(ChannelProcessor channelProcessor);
public ChannelProcessor getChannelProcessor();
}
Source接口首先繼承了LifecycleAware接口姨伤,然后只提供了ChannelProcessor的setter和getter接口,也就是說它的的所有邏輯的實現(xiàn)應(yīng)該在LifecycleAware接口的start和stop中實現(xiàn)庸疾;ChannelProcessor之前介紹過用來進行日志流的過濾和Channel的選擇及調(diào)度乍楚。
而 Source 是通過 SourceFactory 工廠創(chuàng)建,默認提供了 DefaultSourceFactory 届慈,其首先通過 Enum 類型 org.apache.flume.conf.source.SourceType 查找默認實現(xiàn)徒溪,如 exec ,則找到 org.apache.flume.source.ExecSource 實現(xiàn)金顿,如果找不到直接 Class.forName(className) 創(chuàng)建臊泌。
Source 提供了兩種機制: PollableSource (輪詢拉取)和 EventDrivenSource(事件驅(qū)動):
PollableSource 默認提供了如下實現(xiàn):
比如 JMSSource 實現(xiàn)使用 javax.jms.MessageConsumer.receive(pollTimeout) 主動去拉取消息串绩。
EventDrivenSource 默認提供了如下實現(xiàn):
比如 NetcatSource 缺虐、 HttpSource 就是事件驅(qū)動,即被動等待礁凡;比如 HttpSource 就是內(nèi)部啟動了一個內(nèi)嵌的 Jetty 啟動了一個 Servlet 容器高氮,通過 FlumeHTTPServlet 去接收消息慧妄。
Flume 提供了 SourceRunner 用來啟動 Source 的流轉(zhuǎn):
public class EventDrivenSourceRunner extends SourceRunner {
private LifecycleState lifecycleState;
public EventDrivenSourceRunner() {
lifecycleState = LifecycleState.IDLE; //啟動之前是空閑狀態(tài)
}
@Override
public void start() {
Source source = getSource(); //獲取Source
ChannelProcessor cp = source.getChannelProcessor(); //Channel處理器
cp.initialize(); //初始化Channel處理器
source.start(); //啟動Source
lifecycleState = LifecycleState.START; //本組件狀態(tài)改成啟動狀態(tài)
}
@Override
public void stop() {
Source source = getSource(); //先停Source
source.stop();
ChannelProcessor cp = source.getChannelProcessor();
cp.close();//再停Channel處理器
lifecycleState = LifecycleState.STOP; //本組件狀態(tài)改成停止?fàn)顟B(tài)
}
}
從本組件也可以看出: 1 、首先要初始化 ChannelProcessor 剪芍,其實現(xiàn)時初始化過濾器鏈塞淹; 2 、接著啟動 Source 并更改本組件的狀態(tài)罪裹。
public class PollableSourceRunner extends SourceRunner {
@Override
public void start() {
PollableSource source = (PollableSource) getSource();
ChannelProcessor cp = source.getChannelProcessor();
cp.initialize();
source.start();
runner = new PollingRunner();
runner.source = source;
runner.counterGroup = counterGroup;
runner.shouldStop = shouldStop;
runnerThread = new Thread(runner);
runnerThread.setName(getClass().getSimpleName() + "-" +
source.getClass().getSimpleName() + "-" + source.getName());
runnerThread.start();
lifecycleState = LifecycleState.START;
}
}
而 PollingRunner 首先初始化組件饱普,但是又啟動了一個線程 PollingRunner ,其作用就是輪詢拉取數(shù)據(jù):
@Override
public void run() {
while (!shouldStop.get()) { //如果沒有停止状共,則一直在死循環(huán)運行
counterGroup.incrementAndGet("runner.polls");
try {
//調(diào)用PollableSource的process方法進行輪詢拉取,然后判斷是否遇到了失敗補償
if (source.process().equals(PollableSource.Status.BACKOFF)) {/
counterGroup.incrementAndGet("runner.backoffs");
//失敗補償時暫停線程處理峡继,等待超時時間之后重試
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
}
}
}
}
}
Flume 在啟動時會判斷 Source 是 PollableSource 還是 EventDrivenSource 來選擇使用 PollableSourceRunner 還是 EventDrivenSourceRunner 冯袍。
比如 HttpSource 實現(xiàn),其通過 FlumeHTTPServlet 接收消息然后:
List<Event> events = Collections.emptyList(); //create empty list
//首先從請求中獲取Event
events = handler.getEvents(request);
//然后交給ChannelProcessor進行處理
getChannelProcessor().processEventBatch(events);
到此基本的 Source 流程就介紹完了康愤,其作用就是監(jiān)聽日志征冷,采集检激,然后交給 ChannelProcessor 進行處理呵扛。
2 筐带、 Channel
Channel 用于連接 Source 和 Sink , Source 生產(chǎn)日志發(fā)送到 Channel 帖鸦, Sink 從 Channel 消費日志作儿;也就是說通過 Channel 實現(xiàn)了 Source 和 Sink 的解耦攻锰,可以實現(xiàn)多對多的關(guān)聯(lián)娶吞,和 Source 、 Sink 的異步化妒蛇。
之前 Source 采集到日志后會交給 ChannelProcessor 處理吏奸,那么接下來我們先從 ChannelProcessor 入手苦丁,其依賴三個組件:
private final ChannelSelector selector; //Channel選擇器
private final InterceptorChain interceptorChain; //攔截器鏈
private ExecutorService execService; //用于實現(xiàn)可選Channel的ExecutorService,默認是單線程實現(xiàn)
接下來看下其是如何處理 Event 的:
public void processEvent(Event event) {
event = interceptorChain.intercept(event); //首先進行攔截器鏈過濾
if (event == null) {
return;
}
List<Event> events = new ArrayList<Event>(1);
events.add(event);
//通過Channel選擇器獲取必須成功處理的Channel蛾狗,然后事務(wù)中執(zhí)行
List<Channel> requiredChannels = selector.getRequiredChannels(event);
for (Channel reqChannel : requiredChannels) {
executeChannelTransaction(reqChannel, events, false);
}
//通過Channel選擇器獲取可選的Channel留凭,這些Channel失敗是可以忽略蔼夜,不影響其他Channel的處理
List<Channel> optionalChannels = selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
}
}
另外內(nèi)部還提供了批處理實現(xiàn)方法 processEventBatch 匠题;對于內(nèi)部事務(wù)實現(xiàn)的話可以參考 executeChannelTransaction 方法钱磅,整體事務(wù)機制類似于 JDBC :
private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
//1、獲取Channel上的事務(wù)
Transaction tx = channel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
//2、開啟事務(wù)
tx.begin();
//3、在Channel上執(zhí)行批量put操作
for (Event event : batch) {
channel.put(event);
}
//4、成功后提交事務(wù)
tx.commit();
} catch (Throwable t) {
//5、異常后回滾事務(wù)
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to channel: " +
channel, t);
throw (Error) t;
} else if(!isOptional) {//如果是可選的Channel工坊,異常忽略
throw new ChannelException("Unable to put batch on required " +
"channel: " + channel, t);
}
} finally {
//最后關(guān)閉事務(wù)
tx.close();
}
}
Interceptor 用于過濾 Event 昭齐,即傳入一個 Event 然后進行過濾加工司浪,然后返回一個新的 Event 饮睬,接口如下:
public interface Interceptor {
public void initialize();
public Event intercept(Event event);
public List<Event> intercept(List<Event> events);
public void close();
}
可以看到其提供了 initialize 和 close 方法用于啟動和關(guān)閉割去; intercept 方法用于過濾或加工 Event 咖城。比如 HostInterceptor 攔截器用于獲取本機 IP 然后默認添加到 Event的字段為 host 的 Header 中辐董。
接下來就是 ChannelSelector 選擇器了苔严,其通過如下方式創(chuàng)建:
//獲取ChannelSelector配置悼沈,比如agent.sources.s1.selector.type = replicating
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
//使用Source關(guān)聯(lián)的Channel創(chuàng)建絮供,比如agent.sources.s1.channels = c1 c2
ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
ChannelSelector 默認提供了兩種實現(xiàn):復(fù)制和多路復(fù)用:
默認實現(xiàn)是復(fù)制選擇器 ReplicatingChannelSelector ,即把接收到的消息復(fù)制到每一個 Channel ;多路復(fù)用選擇器 MultiplexingChannelSelector 會根據(jù) Event Header中的參數(shù)進行選擇,以此來選擇使用哪個 Channel 晴圾。
而 Channel 是 Event 中轉(zhuǎn)的地方撒蟀, Source 發(fā)布 Event 到 Channel , Sink 消費 Channel 的 Event ; Channel 接口提供了如下接口用來實現(xiàn) Event 流轉(zhuǎn):
public interface Channel extends LifecycleAware, NamedComponent {
public void put(Event event) throws ChannelException;
public Event take() throws ChannelException;
public Transaction getTransaction();
}
put 用于發(fā)布 Event 驻右, take 用于消費 Event 森爽, getTransaction 用于事務(wù)支持雕旨。默認提供了如下 Channel 的實現(xiàn):
對于 Channel 的實現(xiàn)我們后續(xù)單獨章節(jié)介紹。
3 可款、 Sink
Sink 從 Channel 消費 Event ,然后進行轉(zhuǎn)移到收集 / 聚合層或存儲層。 Sink 接口如下所示:
public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
public Channel getChannel();
public Status process() throws EventDeliveryException;
public static enum Status {
READY, BACKOFF
}
}
類似于 Source 栈顷,其首先繼承了 LifecycleAware ,然后提供了 Channel 的 getter/setter 方法,并提供了 process 方法進行消費行嗤,此方法會返回消費的狀態(tài), READY 或 BACKOFF 。
Sink 也是通過 SinkFactory 工廠來創(chuàng)建鸵闪,其也提供了 DefaultSinkFactory 默認工廠,比如傳入 hdfs 苛让,會先查找 Enum org.apache.flume.conf.sink.SinkType ,然后找到相應(yīng)的默認處理類 org.apache.flume.sink.hdfs.HDFSEventSink 憔儿,如果沒找到默認處理類,直接通過 Class.forName(className) 進行反射創(chuàng)建叠艳。
我們知道 Sink 還提供了分組功能潦俺,用于把多個 Sink 聚合為一組進行使用劝堪,內(nèi)部提供了 SinkGroup 用來完成這個事情余境。此時問題來了绣张,如何去調(diào)度多個 Sink ,其內(nèi)部使用了 SinkProcessor 來完成這個事情,默認提供了故障轉(zhuǎn)移和負載均衡兩個策略。
首先 SinkGroup 就是聚合多個 Sink 為一組,然后將多個 Sink 傳給 SinkProcessorFactory 進行創(chuàng)建 SinkProcessor 狈惫,而策略是根據(jù)配置文件中配置的如 agent.sinkgroups.g1.processor.type = load_balance 來選擇的难菌。
SinkProcessor 提供了如下實現(xiàn):
DefaultSinkProcessor :默認實現(xiàn)舵匾,用于單個 Sink 的場景使用砍濒。
FailoverSinkProcessor :故障轉(zhuǎn)移實現(xiàn):
public Status process() throws EventDeliveryException {
Long now = System.currentTimeMillis();
//1绪囱、首先檢查失敗隊列的頭部的Sink是否已經(jīng)過了失敗補償?shù)却龝r間了
while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
//2琉挖、如果可以使用了羊始,則從失敗Sink隊列獲取隊列第一個Sink
FailedSink cur = failedSinks.poll();
Status s;
try {
s = cur.getSink().process(); //3、使用此Sink進行處理
if (s == Status.READY) { //4查描、如果處理成功
liveSinks.put(cur.getPriority(), cur.getSink()); //4.1突委、放回存活Sink隊列
activeSink = liveSinks.get(liveSinks.lastKey());
} else {
failedSinks.add(cur); //4.2、如果此時不是READY冬三,即BACKOFF期間匀油,再次放回失敗隊列
}
return s;
} catch (Exception e) {
cur.incFails(); //5、如果遇到異常了勾笆,則增加失敗次數(shù)敌蚜,并放回失敗隊列
failedSinks.add(cur);
}
}
Status ret = null;
while(activeSink != null) { //6、此時失敗隊列中沒有Sink能處理了匠襟,那么需要使用存活Sink隊列進行處理
try {
ret = activeSink.process();
return ret;
} catch (Exception e) { //7钝侠、處理失敗進行轉(zhuǎn)移到失敗隊列
activeSink = moveActiveToDeadAndGetNext();
}
}
throw new EventDeliveryException("All sinks failed to process, " +
"nothing left to failover to");
}
失敗隊列是一個優(yōu)先級隊列,使用 refresh 屬性排序酸舍,而 refresh 是通過如下機制計算的:
refresh = System.currentTimeMillis()
+ Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
其中 maxPenalty 是最大等待時間帅韧,默認 30s ,而 (1 << sequentialFailures) * FAILURE_PENALTY) 用于實現(xiàn)指數(shù)級等待時間遞增啃勉, FAILURE_PENALTY 是 1s 忽舟。
LoadBalanceSinkProcessor :用于實現(xiàn) Sink 的負載均衡,其通過 SinkSelector 進行實現(xiàn),類似于 ChannelSelector 叮阅。 LoadBalanceSinkProcessor 在啟動時會根據(jù)配置刁品,如 agent.sinkgroups.g1.processor.selector = random 進行選擇,默認提供了兩種選擇器:
LoadBalanceSinkProcessor 使用如下機制進行負載均衡:
public Status process() throws EventDeliveryException {
Status status = null;
//1浩姥、使用選擇器創(chuàng)建相應(yīng)的迭代器挑随,也就是用來選擇Sink的迭代器
Iterator<Sink> sinkIterator = selector.createSinkIterator();
while (sinkIterator.hasNext()) {
Sink sink = sinkIterator.next();
try {
//2、選擇器迭代Sink進行處理,如果成功直接break掉這次處理,此次負載均衡就算完成了
status = sink.process();
break;
} catch (Exception ex) {
//3握牧、失敗后會通知選擇器饼记,采取相應(yīng)的失敗退避補償算法進行處理
selector.informSinkFailed(sink);
LOGGER.warn("Sink failed to consume event. "
+ "Attempting next sink if available.", ex);
}
}
if (status == null) {
throw new EventDeliveryException("All configured sinks have failed");
}
return status;
}
如上的核心就是怎么創(chuàng)建迭代器颜价,如何進行失敗退避補償處理,首先我們看下 RoundRobinSinkSelector 實現(xiàn),其內(nèi)部是通過通用的 RoundRobinOrderSelector 選擇器實現(xiàn):
public Iterator<T> createIterator() {
//1、獲取存活的Sink索引噪舀,
List<Integer> activeIndices = getIndexList();
int size = activeIndices.size();
//2、如果上次記錄的下一個存活Sink的位置超過了size飘诗,那么從隊列頭重新開始計數(shù)
if (nextHead >= size) {
nextHead = 0;
}
//3与倡、獲取本次使用的起始位置
int begin = nextHead++;
if (nextHead == activeIndices.size()) {
nextHead = 0;
}
//4、從該位置開始迭代疚察,其實現(xiàn)類似于環(huán)形隊列蒸走,比如整個隊列是5,起始位置是3貌嫡,則按照 3、4该溯、0岛抄、1、2的順序進行輪詢狈茉,實現(xiàn)了輪詢算法
int[] indexOrder = new int[size];
for (int i = 0; i < size; i++) {
indexOrder[i] = activeIndices.get((begin + i) % size);
}
//indexOrder是迭代順序夫椭,getObjects返回相關(guān)的Sinks;
return new SpecificOrderIterator<T>(indexOrder, getObjects());
}
getIndexList 實現(xiàn)如下:
protected List<Integer> getIndexList() {
long now = System.currentTimeMillis();
List<Integer> indexList = new ArrayList<Integer>();
int i = 0;
for (T obj : stateMap.keySet()) {
if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
indexList.add(i);
}
i++;
}
return indexList;
}
isShouldBackOff() 表示是否開啟退避算法支持氯庆,如果不開啟蹭秋,則認為每個 Sink 都是存活的,每次都會重試堤撵,通過 agent.sinkgroups.g1.processor.backoff = true 配置開啟仁讨,默認 false ; restoreTime 和之前介紹的 refresh 一樣实昨,是退避補償?shù)却龝r間洞豁,算法類似,就不多介紹了。
那么什么時候調(diào)用 Sink 進行消費呢丈挟?其類似于 SourceRunner 刁卜, Sink 提供了 SinkRunner 進行輪詢拉取處理, SinkRunner 會輪詢調(diào)度 SinkProcessor 消費 Channel 的消息曙咽,然后調(diào)用 Sink 進行轉(zhuǎn)移蛔趴。 SinkProcessor 之前介紹過,其負責(zé)消息復(fù)制 / 路由例朱。
SinkRunner 實現(xiàn)如下:
public void start() {
SinkProcessor policy = getPolicy();
policy.start();
runner = new PollingRunner();
runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean();
runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start();
lifecycleState = LifecycleState.START;
}
即獲取 SinkProcessor 然后啟動它夺脾,接著啟動輪詢線程去處理。 PollingRunner 線程負責(zé)輪詢消息茉继,核心實現(xiàn)如下:
public void run() {
while (!shouldStop.get()) { //如果沒有停止
try {
if (policy.process().equals(Sink.Status.BACKOFF)) {//如果處理失敗了咧叭,進行退避補償處理
counterGroup.incrementAndGet("runner.backoffs");
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* backoffSleepIncrement, maxBackoffSleep)); //暫停退避補償設(shè)定的超時時間
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (Exception e) {
try {
Thread.sleep(maxBackoffSleep); //如果遇到異常則等待最大退避時間
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
整體實現(xiàn)類似于 PollableSourceRunner 實現(xiàn),整體處理都是交給 SinkProcessor 完成的烁竭。 SinkProcessor 會輪詢 Sink 的 process 方法進行處理菲茬;此處以 LoggerSink 為例:
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
//1、獲取事務(wù)
Transaction transaction = channel.getTransaction();
Event event = null;
try {
//2派撕、開啟事務(wù)
transaction.begin();
//3婉弹、從Channel獲取Event
event = channel.take();
if (event != null) {
if (logger.isInfoEnabled()) {
logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
}
} else {//4、如果Channel中沒有Event终吼,則默認進入故障補償機制镀赌,即防止死循環(huán)造成CPU負載高
result = Status.BACKOFF;
}
//5、成功后提交事務(wù)
transaction.commit();
} catch (Exception ex) {
//6际跪、失敗后回滾事務(wù)
transaction.rollback();
throw new EventDeliveryException("Failed to log event: " + event, ex);
} finally {
//7商佛、關(guān)閉事務(wù)
transaction.close();
}
return result;
}
Sink 中一些實現(xiàn)是支持批處理的,比如 RollingFileSink :
//1姆打、開啟事務(wù)
//2良姆、批處理
for (int i = 0; i < batchSize; i++) {
event = channel.take();
if (event != null) {
sinkCounter.incrementEventDrainAttemptCount();
eventAttemptCounter++;
serializer.write(event);
}
}
//3、提交/回滾事務(wù)幔戏、關(guān)閉事務(wù)
定義一個批處理大小然后在事務(wù)中執(zhí)行批處理玛追。