Flume-ng Source Code Analysis: The Channel Component

If you have not yet read "Flume-ng Source Code Analysis: The Startup Process", you may want to read that article first.

1 Interface Overview

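The components are analyzed in the startup order described in the previous article: first Channel, then Sink, and finally Source. Before reading the component source code, let's look at two important interfaces: LifecycleAware and NamedComponent.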

1.1 LifecycleAware

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {

  public void start();

  public void stop();

  public LifecycleState getLifecycleState();

}

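It is very simple, with just three methods: start(), stop(), and getLifecycleState(). Many Flume classes implement this interface, including PollingPropertiesFileConfigurationProvider, which was mentioned in "Flume-ng Source Code Analysis: The Startup Process". Anything that has a lifecycle implements it, and naturally the components do too.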
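As a minimal sketch of what implementing this interface involves, the following uses stand-in copies of LifecycleAware and LifecycleState so that it compiles without Flume on the classpath. DemoComponent is hypothetical, and the enum values are assumed to mirror Flume's.

```java
// Stand-ins for Flume's LifecycleAware / LifecycleState (assumed to mirror the real ones).
class LifecycleSketch {
    enum LifecycleState { IDLE, START, STOP, ERROR }

    interface LifecycleAware {
        void start();
        void stop();
        LifecycleState getLifecycleState();
    }

    // Hypothetical component: it only records which lifecycle phase it is in.
    static class DemoComponent implements LifecycleAware {
        private volatile LifecycleState state = LifecycleState.IDLE;

        @Override public void start() { state = LifecycleState.START; }
        @Override public void stop() { state = LifecycleState.STOP; }
        @Override public LifecycleState getLifecycleState() { return state; }
    }

    public static void main(String[] args) {
        DemoComponent component = new DemoComponent();
        System.out.println(component.getLifecycleState()); // state before start()
        component.start();
        System.out.println(component.getLifecycleState()); // state after start()
        component.stop();
        System.out.println(component.getLifecycleState()); // state after stop()
    }
}
```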

1.2 NamedComponent

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NamedComponent {

  public void setName(String name);

  public String getName();

}

There is not much to say about this one: it simply sets and returns the component's name.

2 Channel

Channel is one of Flume's three core components, so it is worth looking at how it is built:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {

  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

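From the interface above we can see that a Channel's main operations are put() and take(), so let's look at a concrete implementation. We will use MemoryChannel as the example, but since MemoryChannel is long, we only show a short excerpt: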

public class MemoryChannel extends BasicChannelSemantics {
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    private static final Integer defaultCapacity = Integer.valueOf(100);
    private static final Integer defaultTransCapacity = Integer.valueOf(100);
    
    public MemoryChannel() {
    }

    ...
}

We see that it extends BasicChannelSemantics, which, as the name suggests, provides the basic channel behavior. Let's look at its implementation:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BasicChannelSemantics extends AbstractChannel {

  private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();

  private boolean initialized = false;

  protected void initialize() {}

  protected abstract BasicTransactionSemantics createTransaction();

  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

  @Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
}
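The per-thread lookup in getTransaction() above can be illustrated with a small self-contained sketch. Tx and DemoChannel are hypothetical stand-ins, not Flume classes; the sketch only shows that each thread gets its own transaction and that a closed transaction is replaced on the next call.

```java
class ThreadLocalTxSketch {
    enum State { NEW, OPEN, CLOSED }

    // Hypothetical, stripped-down transaction that only tracks its state.
    static class Tx {
        State state = State.NEW;
        void begin() { state = State.OPEN; }
        void close() { state = State.CLOSED; }
    }

    static class DemoChannel {
        private final ThreadLocal<Tx> current = new ThreadLocal<>();

        // Reuse this thread's transaction unless it is missing or already closed.
        Tx getTransaction() {
            Tx tx = current.get();
            if (tx == null || tx.state == State.CLOSED) {
                tx = new Tx();
                current.set(tx);
            }
            return tx;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DemoChannel channel = new DemoChannel();
        Tx first = channel.getTransaction();
        // Same thread, transaction not closed: the same object is returned.
        System.out.println(first == channel.getTransaction());
        first.close();
        // The closed transaction is replaced by a fresh one.
        System.out.println(first == channel.getTransaction());

        // Another thread never sees the main thread's transaction.
        Tx[] other = new Tx[1];
        Thread worker = new Thread(() -> other[0] = channel.getTransaction());
        worker.start();
        worker.join();
        System.out.println(other[0] == channel.getTransaction());
    }
}
```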

After some searching we finally find put() and take(). On closer inspection, though, they simply delegate to the put() and take() of BasicTransactionSemantics, which is a little disappointing. Let's move on to BasicTransactionSemantics:

public abstract class BasicTransactionSemantics implements Transaction {

  private State state;
  private long initialThreadId;

  protected void doBegin() throws InterruptedException {}
  protected abstract void doPut(Event event) throws InterruptedException;
  protected abstract Event doTake() throws InterruptedException;
  protected abstract void doCommit() throws InterruptedException;
  protected abstract void doRollback() throws InterruptedException;
  protected void doClose() {}

  protected BasicTransactionSemantics() {
    state = State.NEW;
    initialThreadId = Thread.currentThread().getId();
  }

  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");

    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "take() called when transaction is %s!", state);

    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  protected State getState() {
    return state;
  }

  ...// only put and take are discussed here, so methods that are not relevant yet are omitted; interested readers can find them in the source

  protected static enum State {
    NEW, OPEN, COMPLETED, CLOSED
  }
}

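This is yet another abstract class, and its put() and take() in turn call the abstract methods doPut() and doTake(). Less patient readers may be losing heart by now, but we are only one step away. Since it is an abstract class, the Channel must ultimately use a concrete subclass. Going back to MemoryChannel to look for clues, we find an inner class tucked away inside it: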

private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }

    @Override
    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

    @Override
    protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();
      if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized (queueLock) {
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      takeList.put(event);

      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }

   //...methods not needed for now are again omitted

  }

In this class we can see the implementations of doPut() and doTake(), and it becomes clear that MemoryChannel's put() and take() ultimately call doPut() and doTake() of MemoryTransaction.
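Note that doPut() only appends to the transaction's own putList. The commit and rollback methods are omitted from the excerpt, so the following is a simplified model under the assumption that commit moves the staged events into the shared queue and rollback discards them. All names here are hypothetical, and events are plain strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class StagedPutSketch {
    static class DemoChannel {
        // Shared queue that takers would read from.
        private final Deque<String> queue = new ArrayDeque<>();

        Tx newTransaction(int capacity) { return new Tx(capacity); }

        int size() {
            synchronized (queue) { return queue.size(); }
        }

        // Per-transaction staging area, loosely modelled on putList above.
        class Tx {
            private final Deque<String> putList = new ArrayDeque<>();
            private final int capacity;

            Tx(int capacity) { this.capacity = capacity; }

            void put(String event) {
                if (putList.size() >= capacity) {
                    throw new IllegalStateException("put list full, commit more often");
                }
                putList.addLast(event);
            }

            // Assumed behavior: staged events become visible only on commit.
            void commit() {
                synchronized (queue) { queue.addAll(putList); }
                putList.clear();
            }

            // Assumed behavior: rollback drops whatever was staged.
            void rollback() { putList.clear(); }
        }
    }

    public static void main(String[] args) {
        DemoChannel channel = new DemoChannel();
        DemoChannel.Tx tx = channel.newTransaction(2);
        tx.put("a");
        tx.put("b");
        System.out.println(channel.size()); // events are staged, not yet in the channel
        tx.commit();
        System.out.println(channel.size()); // both events are now in the channel
    }
}
```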

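You might think the analysis ends here, but the best part is still to come: two more important classes are associated with Channel, ChannelProcessor and ChannelSelector. Let's go through them in turn.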

3 ChannelProcessor

ChannelProcessor performs the put operation, placing data into channels. Each ChannelProcessor instance has a ChannelSelector that decides which channel or channels an event is put into:

public class ChannelProcessor implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
    private final ChannelSelector selector;
    private final InterceptorChain interceptorChain;

    public ChannelProcessor(ChannelSelector selector) {
        this.selector = selector;
        this.interceptorChain = new InterceptorChain();
    }

    public void initialize() {
        this.interceptorChain.initialize();
    }

    public void close() {
        this.interceptorChain.close();
    }

    public void configure(Context context) {
        this.configureInterceptors(context);
    }

    private void configureInterceptors(Context context) {
        // configure the interceptors
    }

    public ChannelSelector getSelector() {
        return this.selector;
    }

    public void processEventBatch(List<Event> events) {
        ...
        for (Event event : events) {
            List<Channel> reqChannels = this.selector.getRequiredChannels(event);

            ...// add the event to the queue of each required channel

            List<Channel> optChannels = this.selector.getOptionalChannels(event);

            ...// add the event to the queue of each optional channel

        }

        ...// put each queued batch of events into its channel

    }

    public void processEvent(Event event) {
        event = this.interceptorChain.intercept(event);
        if (event != null) {
            List<Channel> requiredChannels = this.selector.getRequiredChannels(event);

            ...// put the event into each required channel

            List<Channel> optionalChannels = this.selector.getOptionalChannels(event);

            ...// put the event into each optional channel
        }
    }
}

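To keep the code short I removed parts of it and kept only what needs explaining. In short, both write methods of ChannelProcessor, processEventBatch() and processEvent(), ask the selector that was passed to the constructor for the matching channels and then put the event into them. ChannelSelector is covered in the next section.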
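The dispatch just described can be sketched as follows. This is a simplified model, not the Flume implementation: DemoChannel is a hypothetical one-method stand-in, events are strings, and the transaction around each put is left out. It assumes that a failure on a required channel reaches the caller while a failure on an optional channel is only reported.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FanOutSketch {
    // Hypothetical stand-in for a channel: only put() is modelled.
    interface DemoChannel { void put(String event); }

    static void process(String event, List<DemoChannel> required, List<DemoChannel> optional) {
        for (DemoChannel channel : required) {
            channel.put(event); // an exception here propagates to the caller
        }
        for (DemoChannel channel : optional) {
            try {
                channel.put(event);
            } catch (RuntimeException e) {
                // Best effort: the failure is reported and the event is not retried.
                System.err.println("optional channel failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        DemoChannel healthy = stored::add;
        DemoChannel broken = e -> { throw new IllegalStateException("channel full"); };

        // The broken optional channel does not prevent delivery to the required one.
        process("event-1", Arrays.asList(healthy), Arrays.asList(broken));
        System.out.println(stored);
    }
}
```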

4 ChannelSelector

ChannelSelector is an interface, and its implementations are created through ChannelSelectorFactory. Flume provides two implementations: MultiplexingChannelSelector and ReplicatingChannelSelector.

public interface ChannelSelector extends NamedComponent, Configurable {
    void setChannels(List<Channel> channels);

    List<Channel> getRequiredChannels(Event event);

    List<Channel> getOptionalChannels(Event event);

    List<Channel> getAllChannels();
}

Selectors are created through ChannelSelectorFactory.create(), which calls getSelectorForType() to obtain a selector; the implementation class is chosen by the type given in the configuration file:

public class ChannelSelectorFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(
      ChannelSelectorFactory.class);

  public static ChannelSelector create(List<Channel> channels,
      Map<String, String> config) {

      ...
  }

  public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null) {
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

  private static ChannelSelector getSelectorForType(String type) {
    if (type == null || type.trim().length() == 0) {
      return new ReplicatingChannelSelector();
    }

    String selectorClassName = type;
    ChannelSelectorType  selectorType = ChannelSelectorType.OTHER;

    try {
      selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      LOGGER.debug("Selector type {} is a custom type", type);
    }

    if (!selectorType.equals(ChannelSelectorType.OTHER)) {
      selectorClassName = selectorType.getChannelSelectorClassName();
    }

    ChannelSelector selector = null;

    try {
      @SuppressWarnings("unchecked")
      Class<? extends ChannelSelector> selectorClass =
          (Class<? extends ChannelSelector>) Class.forName(selectorClassName);
      selector = selectorClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to load selector type: " + type
          + ", class: " + selectorClassName, ex);
    }

    return selector;
  }

}
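The lookup in getSelectorForType() follows a common pattern: try the type string as a built-in alias first, and otherwise treat it as a fully qualified class name and load it reflectively. A self-contained sketch of that pattern, with hypothetical stand-in selector classes and a hypothetical DemoSelectorType enum in place of ChannelSelectorType, might look like this:

```java
import java.util.Locale;

class SelectorLookupSketch {
    interface DemoSelector { String describe(); }

    public static class ReplicatingDemo implements DemoSelector {
        public String describe() { return "replicating"; }
    }

    public static class MultiplexingDemo implements DemoSelector {
        public String describe() { return "multiplexing"; }
    }

    // Plays the role of a user-supplied selector class.
    public static class CustomDemo implements DemoSelector {
        public String describe() { return "custom"; }
    }

    enum DemoSelectorType {
        OTHER(null),
        REPLICATING(ReplicatingDemo.class.getName()),
        MULTIPLEXING(MultiplexingDemo.class.getName());

        final String className;
        DemoSelectorType(String className) { this.className = className; }
    }

    static DemoSelector forType(String type) {
        if (type == null || type.trim().isEmpty()) {
            return new ReplicatingDemo(); // default when no type is configured
        }
        String className = type;
        DemoSelectorType selectorType = DemoSelectorType.OTHER;
        try {
            selectorType = DemoSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException ex) {
            // Not a built-in alias: keep treating the string as a class name.
        }
        if (selectorType != DemoSelectorType.OTHER) {
            className = selectorType.className;
        }
        try {
            return (DemoSelector) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Unable to load selector type: " + type, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(forType(null).describe());
        System.out.println(forType("multiplexing").describe());
        System.out.println(forType(CustomDemo.class.getName()).describe());
    }
}
```

The sketch uses getDeclaredConstructor().newInstance() rather than Class.newInstance(), which is deprecated in newer Java versions; that is a substitution made here, not something taken from the Flume source.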

A brief word on each of the two selectors:

1) MultiplexingChannelSelector
Below is a sample channel selector configuration:

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

MultiplexingChannelSelector defines three fields that hold the different kinds of channels:

    private Map<String, List<Channel>> channelMapping;
    private Map<String, List<Channel>> optionalChannels;
    private List<Channel> defaultChannels;

The routing rules are as follows:

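  • If a mapping is configured, the event is always sent to the mapped channels; if optional is also configured, the event is sent to the optional channels as well.
  • If no mapping is configured but default is, the event is sent to the default channels; if optional is also configured, the event is sent to the optional channels as well.
  • If neither mapping nor default is configured but optional is, the event is sent to the optional channels. Writes to optional channels are not retried on failure.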
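The rules above can be sketched as two lookups keyed by the value of the configured header. This is a simplified model, not the Flume class: channels are represented by their names, the maps are filled by hand from the sample configuration instead of being parsed, and the optional entry for CA lists only file-channel-2, on the assumption that a channel already required for a value is not also treated as optional.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiplexingSketch {
    final Map<String, List<String>> channelMapping = new HashMap<>();
    final Map<String, List<String>> optionalChannels = new HashMap<>();
    List<String> defaultChannels = Collections.emptyList();

    // Mapped channels if the header value has a mapping, otherwise the defaults.
    List<String> required(String headerValue) {
        if (headerValue == null || headerValue.trim().isEmpty()) {
            return defaultChannels;
        }
        List<String> channels = channelMapping.get(headerValue);
        return channels != null ? channels : defaultChannels;
    }

    // Optional channels for the header value, or an empty list if none are set.
    List<String> optional(String headerValue) {
        List<String> channels = optionalChannels.get(headerValue);
        return channels != null ? channels : Collections.<String>emptyList();
    }

    // Hand-built equivalent of the sample configuration (header "State").
    static MultiplexingSketch fromArticleConfig() {
        MultiplexingSketch s = new MultiplexingSketch();
        s.channelMapping.put("CA", Arrays.asList("mem-channel-1"));
        s.channelMapping.put("AZ", Arrays.asList("file-channel-2"));
        s.channelMapping.put("NY", Arrays.asList("mem-channel-1", "file-channel-2"));
        s.optionalChannels.put("CA", Arrays.asList("file-channel-2"));
        s.defaultChannels = Arrays.asList("mem-channel-1");
        return s;
    }

    public static void main(String[] args) {
        MultiplexingSketch s = fromArticleConfig();
        System.out.println("CA required: " + s.required("CA") + ", optional: " + s.optional("CA"));
        System.out.println("TX required: " + s.required("TX") + ", optional: " + s.optional("TX"));
    }
}
```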

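2) ReplicatingChannelSelector

The routing rule is fairly simple:

  • With replicating, if no channel is marked optional, every channel receives the event. If a channel is marked optional, it is removed from the required channels and receives the event only as an optional channel.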
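As a small illustration of this rule, the following sketch splits a list of channel names into required and optional sets. The method and channel names are hypothetical, and the real selector works on Channel objects rather than strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicatingSketch {
    // Every channel is required unless it was marked optional.
    static List<String> required(List<String> all, Set<String> markedOptional) {
        List<String> result = new ArrayList<>(all);
        result.removeAll(markedOptional);
        return result;
    }

    // Channels marked optional still receive the event, but only on a best-effort basis.
    static List<String> optional(List<String> all, Set<String> markedOptional) {
        List<String> result = new ArrayList<>(all);
        result.retainAll(markedOptional);
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("c1", "c2", "c3");
        Set<String> markedOptional = new HashSet<>(Arrays.asList("c3"));
        System.out.println("required: " + required(all, markedOptional));
        System.out.println("optional: " + optional(all, markedOptional));
    }
}
```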

5 Summary

As the component in the middle, a Channel carries data from the source to the sink. ChannelProcessor puts each event into the channels chosen for it, and the selection rule is determined by the selector; Flume ships with two selectors, multiplexing and replicating. ChannelProcessor is therefore normally invoked from a Source, while a Channel's take() is called from a Sink.
