Flume-ng源碼解析之Sink組件

如果你還沒(méi)看過(guò)Flume-ng源碼解析系列中的啟動(dòng)流程和Channel組件舵揭,可以點(diǎn)擊下面鏈接:
Flume-ng源碼解析之啟動(dòng)流程
Flume-ng源碼解析之Channel組件

作為啟動(dòng)流程中第二個(gè)啟動(dòng)的組件赃春,我們今天來(lái)看看Sink的細(xì)節(jié)

1 Sink

Sink在agent中扮演的角色是消費(fèi)者,將event輸送到特定的位置

首先依然是看代碼滞乙,由代碼我們可以看出Sink是一個(gè)接口,里面最主要的方法是process()垄提,用來(lái)處理從Channel中獲取的數(shù)據(jù)价捧。Sink的實(shí)例是由SinkFactory.create()生成的。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  /* 用來(lái)處理channel中取來(lái)的event*/
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
}

在啟動(dòng)流程中我們了解到Application中啟動(dòng)的不是Sink让蕾,而是SinkRunner浪规,由名字我們可以看出這是一個(gè)驅(qū)動(dòng)類或听。我們來(lái)看看代碼,主要看它的start()

public class SinkRunner implements LifecycleAware {

  ...

  @Override
  public void start() {
    SinkProcessor policy = getPolicy();

    policy.start();

    runner = new PollingRunner();

    runner.policy = policy;
    runner.counterGroup = counterGroup;
    runner.shouldStop = new AtomicBoolean();

    runnerThread = new Thread(runner);
    runnerThread.setName("SinkRunner-PollingRunner-" +
        policy.getClass().getSimpleName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }
  ...

}

我們知道啟動(dòng)SinkRunner實(shí)際上就是調(diào)用它的start()笋婿,而在start()中可以看到主要是啟動(dòng)了一個(gè)SinkProcessor誉裆,而這個(gè)SinkProcessor在創(chuàng)建SinkRunnner的時(shí)候已經(jīng)指定了,如果你想要了解配置文件是如何處理的缸濒,可以要去看看conf包里面的類足丢,可以看看org.apache.flume.node.AbstractConfigurationProvider中的getConfiguration()。

我們接著看看SinkProcessor

public interface SinkProcessor extends LifecycleAware, Configurable {
  Status process() throws EventDeliveryException;
  void setSinks(List<Sink> sinks);
}

SinkProcesor是一個(gè)接口庇配,他的實(shí)現(xiàn)類由SinkProcessorFactory的getProcessor()生成斩跌,在AbstractConfigurationProvider中的loadSinkGroup()調(diào)用SinkGroup中的configure()生成。

public class SinkGroup implements Configurable, ConfigurableComponent {
  List<Sink> sinks;
  SinkProcessor processor;
  SinkGroupConfiguration conf;

  public SinkGroup(List<Sink> groupSinks) {
    sinks = groupSinks;
  }
  
  public SinkProcessor getProcessor() {
    return processor;
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    this.conf = (SinkGroupConfiguration) conf;
    processor =
        SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
            sinks);
  }
}

那么我們以DefalutSinkProcessor為例子看看

public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
  private Sink sink;
  private LifecycleState lifecycleState;

  @Override
  public void start() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.start();
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.stop();
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public LifecycleState getLifecycleState() {
    return lifecycleState;
  }

  @Override
  public void configure(Context context) {
  }

  @Override
  public Status process() throws EventDeliveryException {
    return sink.process();
  }

  @Override
  public void setSinks(List<Sink> sinks) {
    Preconditions.checkNotNull(sinks);
    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
        + "only handle one sink, "
        + "try using a policy that supports multiple sinks");
    sink = sinks.get(0);
  }

  @Override
  public void configure(ComponentConfiguration conf) {

  }

}

從上面的代碼中我們可以看到SinkProcessor執(zhí)行的還是sink的start捞慌、stop和process方法耀鸦,那么SinkProcessor的作用是什么,F(xiàn)lume提供leFailoverSinkProcessor和LoadBalancingSinkProcessor啸澡,顧名思義袖订,一個(gè)是失效備援,一個(gè)是負(fù)載均衡锻霎,那么SinkProcessor不同子類的存在就是為了實(shí)現(xiàn)不同的分配操作和策略著角,而sink的start()通常是啟動(dòng)線程去執(zhí)行消費(fèi)操作。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末旋恼,一起剝皮案震驚了整個(gè)濱河市吏口,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌冰更,老刑警劉巖产徊,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蜀细,居然都是意外死亡舟铜,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)奠衔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)谆刨,“玉大人,你說(shuō)我怎么就攤上這事归斤∪玻” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵脏里,是天一觀的道長(zhǎng)她我。 經(jīng)常有香客問(wèn)我,道長(zhǎng),這世上最難降的妖魔是什么番舆? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任酝碳,我火速辦了婚禮,結(jié)果婚禮上恨狈,老公的妹妹穿的比我還像新娘疏哗。我一直安慰自己,他們只是感情好拴事,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布沃斤。 她就那樣靜靜地躺著,像睡著了一般刃宵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上徘公,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天牲证,我揣著相機(jī)與錄音,去河邊找鬼关面。 笑死坦袍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的等太。 我是一名探鬼主播捂齐,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼缩抡!你這毒婦竟也來(lái)了奠宜?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤瞻想,失蹤者是張志新(化名)和其女友劉穎压真,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蘑险,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滴肿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了佃迄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片泼差。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖呵俏,靈堂內(nèi)的尸體忽然破棺而出堆缘,到底是詐尸還是另有隱情,我是刑警寧澤柴信,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布套啤,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏潜沦。R本人自食惡果不足惜萄涯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望唆鸡。 院中可真熱鬧涝影,春花似錦、人聲如沸争占。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)臂痕。三九已至伯襟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間握童,已是汗流浹背姆怪。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留澡绩,地道東北人稽揭。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像肥卡,于是被迫代替她去往敵國(guó)和親溪掀。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 博客原文 翻譯作品步鉴,水平有限揪胃,如有錯(cuò)誤,煩請(qǐng)留言指正唠叛。原文請(qǐng)見(jiàn) 官網(wǎng)英文文檔 引言 概述 Apache Flume...
    rabbitGYK閱讀 11,469評(píng)論 13 34
  • 如果還沒(méi)看過(guò)Flume-ng源碼解析之啟動(dòng)流程只嚣,可以點(diǎn)擊 Flume-ng源碼解析之啟動(dòng)流程 查看 1 接口介紹 ...
    和心數(shù)據(jù)閱讀 661評(píng)論 0 1
  • 如果你還沒(méi)看過(guò)Flume-ng源碼解析系列中的啟動(dòng)流程、Channel組件和Sink組件艺沼,可以點(diǎn)擊下面鏈接:Flu...
    和心數(shù)據(jù)閱讀 956評(píng)論 0 2
  • 介紹 概述 Apache Flume是為有效收集聚合和移動(dòng)大量來(lái)自不同源到中心數(shù)據(jù)存儲(chǔ)而設(shè)計(jì)的可分布册舞,可靠的,可用...
    ximengchj閱讀 3,525評(píng)論 0 13
  • 在某個(gè)Logstash的場(chǎng)景下障般,我產(chǎn)生了為什么不能用Flume代替Logstash的疑問(wèn)调鲸,因此查閱了不少材料在這里...
    汝南魔法師閱讀 18,858評(píng)論 5 61