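If you have not yet read the articles on the startup flow and the Channel component in this Flume-ng source code analysis series, you can follow the links below:
Flume-ng Source Code Analysis: Startup Flow
Flume-ng Source Code Analysis: Channel Component
Sink is the second component started during the startup flow, so today we look at its details.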
1 Sink
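Within an agent, a Sink plays the role of the consumer: it delivers events to a specific destination.
As usual, we start with the code. Sink is an interface, and its most important method is process(), which handles the data taken from the Channel. Sink instances are created by SinkFactory.create().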
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
public Channel getChannel();
/* Processes the events taken from the channel */
public Status process() throws EventDeliveryException;
public static enum Status {
READY, BACKOFF
}
}
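To make the contract of process() more concrete, here is a minimal sketch of a sink that consumes one event per call. The types are hypothetical stand-ins and not Flume's classes: the channel is a plain in-memory queue, the destination is a list, and the transaction handling that a real Flume sink needs is left out. Only the READY/BACKOFF status values mirror the interface above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SinkProcessSketch {
    // Mirrors the two values of Sink.Status shown above.
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for a sink: consumes at most one event per process() call.
    static class ListSink {
        private final Queue<String> channel;               // stand-in for a Channel
        final List<String> delivered = new ArrayList<>();  // stand-in for the destination

        ListSink(Queue<String> channel) {
            this.channel = channel;
        }

        Status process() {
            String event = channel.poll();
            if (event == null) {
                // Nothing to consume: tell the caller to back off for a while.
                return Status.BACKOFF;
            }
            delivered.add(event);
            // An event was delivered: the caller may call process() again right away.
            return Status.READY;
        }
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("e1");
        ListSink sink = new ListSink(channel);
        System.out.println(sink.process()); // READY
        System.out.println(sink.process()); // BACKOFF
    }
}
```

The return value is the only signal the caller gets, which is why the status enum is part of the Sink interface itself.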
From the startup flow we learned that Application does not start the Sink directly; it starts a SinkRunner. As the name suggests, this is a driver class. Let's look at its code, focusing on start().
public class SinkRunner implements LifecycleAware {
...
@Override
public void start() {
SinkProcessor policy = getPolicy();
policy.start();
runner = new PollingRunner();
runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean();
runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start();
lifecycleState = LifecycleState.START;
}
...
}
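Starting a SinkRunner simply means calling its start(). As the code shows, start() mainly starts a SinkProcessor and then hands it to a PollingRunner that runs on its own thread. The SinkProcessor was already specified when the SinkRunner was created. If you want to know how the configuration file is handled, look at the classes in the conf package and at getConfiguration() in org.apache.flume.node.AbstractConfigurationProvider.
Next, let's look at SinkProcessor.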
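The body of PollingRunner is elided in the listing above, so the following is only a rough sketch of what a polling loop of this kind might look like, not Flume's actual implementation. It assumes a hypothetical Processor interface, a fixed back-off sleep (backoffMillis), and an in-memory queue standing in for the channel; the shouldStop flag and the thread naming follow the start() method shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingLoopSketch {
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for SinkProcessor, reduced to the process() call.
    interface Processor {
        Status process();
    }

    // Calls policy.process() in a loop until asked to stop.
    static class PollingRunner implements Runnable {
        final Processor policy;
        final long backoffMillis; // assumed fixed sleep; a real runner may scale it
        final AtomicBoolean shouldStop = new AtomicBoolean();

        PollingRunner(Processor policy, long backoffMillis) {
            this.policy = policy;
            this.backoffMillis = backoffMillis;
        }

        @Override
        public void run() {
            while (!shouldStop.get()) {
                if (policy.process() == Status.BACKOFF) {
                    try {
                        Thread.sleep(backoffMillis); // nothing to do, wait a bit
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    // Runs the loop on its own thread until every event has been consumed.
    static int runUntilDrained(List<String> events) {
        Queue<String> channel = new ConcurrentLinkedQueue<>(events);
        AtomicInteger consumed = new AtomicInteger();
        PollingRunner runner = new PollingRunner(() -> {
            if (channel.poll() == null) {
                return Status.BACKOFF;
            }
            consumed.incrementAndGet();
            return Status.READY;
        }, 10L);
        Thread thread = new Thread(runner, "SinkRunner-PollingRunner-Sketch");
        thread.start();
        try {
            while (consumed.get() < events.size()) {
                Thread.sleep(5L);
            }
            runner.shouldStop.set(true);
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntilDrained(Arrays.asList("e1", "e2", "e3"))); // 3
    }
}
```

The point of the sketch is the division of labor: the runner owns the thread and the stop flag, while the processor decides what a single process() call actually does.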
public interface SinkProcessor extends LifecycleAware, Configurable {
Status process() throws EventDeliveryException;
void setSinks(List<Sink> sinks);
}
SinkProcessor is an interface. Its implementations are created by getProcessor() in SinkProcessorFactory, which is invoked from configure() in SinkGroup, which in turn is called by loadSinkGroups() in AbstractConfigurationProvider.
public class SinkGroup implements Configurable, ConfigurableComponent {
List<Sink> sinks;
SinkProcessor processor;
SinkGroupConfiguration conf;
public SinkGroup(List<Sink> groupSinks) {
sinks = groupSinks;
}
public SinkProcessor getProcessor() {
return processor;
}
@Override
public void configure(ComponentConfiguration conf) {
this.conf = (SinkGroupConfiguration) conf;
processor =
SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
sinks);
}
}
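SinkProcessorFactory itself is not shown here, so the sketch below only illustrates the general idea of choosing a processor from the group's configuration. It assumes the processor context is a plain map with a "type" key whose values are "default", "failover", and "load_balance", and it returns class names as strings rather than building real processors. chooseProcessor is a hypothetical helper, not a Flume method.

```java
import java.util.HashMap;
import java.util.Map;

public class ProcessorFactorySketch {
    // Hypothetical helper: maps a configured type name to a processor class name.
    static String chooseProcessor(Map<String, String> context, int sinkCount) {
        String type = context.getOrDefault("type", "default");
        switch (type) {
            case "default":
                // Same restriction that DefaultSinkProcessor.setSinks() enforces.
                if (sinkCount != 1) {
                    throw new IllegalArgumentException(
                        "default processor can only handle one sink");
                }
                return "DefaultSinkProcessor";
            case "failover":
                return "FailoverSinkProcessor";
            case "load_balance":
                return "LoadBalancingSinkProcessor";
            default:
                throw new IllegalArgumentException("unknown processor type: " + type);
        }
    }

    public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("type", "failover");
        System.out.println(chooseProcessor(context, 2)); // FailoverSinkProcessor
    }
}
```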
Let's take DefaultSinkProcessor as an example.
public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
private Sink sink;
private LifecycleState lifecycleState;
@Override
public void start() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.start();
lifecycleState = LifecycleState.START;
}
@Override
public void stop() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.stop();
lifecycleState = LifecycleState.STOP;
}
@Override
public LifecycleState getLifecycleState() {
return lifecycleState;
}
@Override
public void configure(Context context) {
}
@Override
public Status process() throws EventDeliveryException {
return sink.process();
}
@Override
public void setSinks(List<Sink> sinks) {
Preconditions.checkNotNull(sinks);
Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+ "only handle one sink, "
+ "try using a policy that supports multiple sinks");
sink = sinks.get(0);
}
@Override
public void configure(ComponentConfiguration conf) {
}
}
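The code above shows that the SinkProcessor simply delegates to the sink's start(), stop(), and process() methods. So what is SinkProcessor for? Flume also provides FailoverSinkProcessor and LoadBalancingSinkProcessor. As the names suggest, one implements failover and the other load balancing. The different SinkProcessor implementations exist to provide different dispatch operations and strategies, while a sink's start() usually starts the threads that carry out the consuming work.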
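To illustrate what "a different strategy" means, here is a simplified failover sketch. It is not FailoverSinkProcessor's actual logic: it assumes each sink carries a numeric priority, tries the sinks from highest to lowest priority on every call, and skips any sink that throws. SimpleSink, PrioritizedSink, and FailoverProcessor are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FailoverSketch {
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for Sink, reduced to process().
    interface SimpleSink {
        Status process();
    }

    static class PrioritizedSink {
        final String name;
        final int priority;
        final SimpleSink sink;

        PrioritizedSink(String name, int priority, SimpleSink sink) {
            this.name = name;
            this.priority = priority;
            this.sink = sink;
        }
    }

    static class FailoverProcessor {
        private final List<PrioritizedSink> sinks;
        String lastUsed; // name of the sink that handled the last call

        FailoverProcessor(List<PrioritizedSink> sinks) {
            this.sinks = new ArrayList<>(sinks);
            // Highest priority first.
            this.sinks.sort(
                Comparator.comparingInt((PrioritizedSink s) -> s.priority).reversed());
        }

        Status process() {
            RuntimeException last = null;
            for (PrioritizedSink candidate : sinks) {
                try {
                    Status status = candidate.sink.process();
                    lastUsed = candidate.name;
                    return status;
                } catch (RuntimeException e) {
                    last = e; // this sink failed, fall through to the next one
                }
            }
            throw new IllegalStateException("all sinks failed", last);
        }
    }

    public static void main(String[] args) {
        List<PrioritizedSink> sinks = new ArrayList<>();
        sinks.add(new PrioritizedSink("primary", 10, () -> {
            throw new RuntimeException("primary is down");
        }));
        sinks.add(new PrioritizedSink("backup", 5, () -> Status.READY));
        FailoverProcessor processor = new FailoverProcessor(sinks);
        System.out.println(processor.process()); // READY
        System.out.println(processor.lastUsed);  // backup
    }
}
```

From the runner's point of view nothing changes: it still calls process() on a single processor, and the choice of sink stays hidden behind that call.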