Flume Source Code Analysis: Starting Flume

The overall event transmission flow is shown in the figure below:

[Figure: overall Flume event transmission flow]

1. Parsing the configuration file and loading the related components

The Flume process is started with the following command:

bin/flume-ng agent -n name -c conf -f jobs/a.conf 

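The entry point is the main method of Application in the flume-ng-node subproject. It first parses the command line with Commons CLI to obtain the agent name (-n) and the file (-f), which is the configuration file.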
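Flume does this parsing with Apache Commons CLI. The dependency-free sketch below (the class AgentArgs is hypothetical, not Flume code) only shows what the parse has to yield: the agent name from -n/--name and the configuration file from -f/--conf-file. It assumes every option takes a value, which is not true of flags such as --no-reload-conf.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the Commons CLI parsing done in Application.main. */
final class AgentArgs {
  final String name;      // value of -n / --name
  final String confFile;  // value of -f / --conf-file

  private AgentArgs(String name, String confFile) {
    this.name = name;
    this.confFile = confFile;
  }

  static AgentArgs parse(String[] args) {
    Map<String, String> opts = new HashMap<>();
    for (int i = 0; i < args.length; i++) {
      String a = args[i];
      // Simplification: every "-x" token is assumed to take the next token as its value.
      // Tokens that are not options (such as "agent") are skipped.
      if (a.startsWith("-") && i + 1 < args.length) {
        opts.put(a, args[++i]);
      }
    }
    String name = opts.getOrDefault("-n", opts.get("--name"));
    String file = opts.getOrDefault("-f", opts.get("--conf-file"));
    if (name == null || file == null) {
      throw new IllegalArgumentException("both -n and -f are required");
    }
    return new AgentArgs(name, file);
  }
}
```

For `agent -n a1 -c conf -f jobs/a.conf`, parse() yields name `a1` and confFile `jobs/a.conf`; the `-c conf` pair is read but not used.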

This section covers how a MaterializedConfiguration object is built from the configuration file.

The process breaks down roughly into the following steps:

  • Read the configuration file as Properties
  • loadChannels(agentConf, channelComponentMap): create Channel instances via reflection from each channel's name and type
  • loadSources(agentConf, channelComponentMap, sourceRunnerMap): create Source instances via reflection from name and type, create the ChannelSelector and ChannelProcessor, and connect each source to its channels
  • loadSinks(agentConf, channelComponentMap, sinkRunnerMap): create Sink instances via reflection from name and type, connect each sink to its channel, and create the SinkProcessor and SinkRunner
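The three loading steps can be modelled with plain collections. The sketch below is a hypothetical simplification (MaterializeSketch is not a Flume class, and components are reduced to their names). It assumes the usual Flume property layout `<agent>.channels`, `<agent>.sources`, `<agent>.sinks`, `<agent>.sources.<source>.channels` and `<agent>.sinks.<sink>.channel`, and shows why channels are loaded first: sources and sinks can only be bound to channels that already exist.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/** Hypothetical, simplified model of the channels -> sources -> sinks loading order. */
final class MaterializeSketch {
  final Set<String> channels = new LinkedHashSet<>();
  final Map<String, List<String>> sourceToChannels = new LinkedHashMap<>();
  final Map<String, String> sinkToChannel = new LinkedHashMap<>();

  /** Splits a space-separated list property such as "a1.sources = r1 r2". */
  static List<String> names(Properties p, String key) {
    String v = p.getProperty(key, "").trim();
    if (v.isEmpty()) {
      return new ArrayList<>();
    }
    return Arrays.asList(v.split("\\s+"));
  }

  static MaterializeSketch load(Properties p, String agent) {
    MaterializeSketch m = new MaterializeSketch();
    // Step 1: channels first, because sources and sinks refer to them
    m.channels.addAll(names(p, agent + ".channels"));
    // Step 2: each source is bound to the declared channels it lists
    for (String src : names(p, agent + ".sources")) {
      List<String> bound = new ArrayList<>();
      for (String ch : names(p, agent + ".sources." + src + ".channels")) {
        if (m.channels.contains(ch)) {
          bound.add(ch);
        }
      }
      if (!bound.isEmpty()) {
        m.sourceToChannels.put(src, bound);  // a source with no channel is dropped
      }
    }
    // Step 3: each sink is bound to exactly one declared channel
    for (String sink : names(p, agent + ".sinks")) {
      String ch = p.getProperty(agent + ".sinks." + sink + ".channel");
      if (ch != null && m.channels.contains(ch.trim())) {
        m.sinkToChannel.put(sink, ch.trim());
      }
    }
    return m;
  }
}
```

With `a1.sources.r2.channels = c9` and no channel c9 declared, source r2 ends up bound to nothing and is dropped, much as loadSources refuses a source without a channel.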

The configuration can be obtained in two ways: from ZooKeeper or from a configuration file.

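When a configuration file is used, an option controls whether the configuration is reloaded after the file changes. Reloading is on by default and is turned off with --no-reload-conf; when it is on, the file is polled every 30 seconds (the last constructor argument below).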

        List<LifecycleAware> components = Lists.newArrayList();

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
              new PollingPropertiesFileConfigurationProvider(
                  agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          // Preparation for parsing the configuration file
          PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);
          application = new Application();

          // configurationProvider.getConfiguration() is where the file is actually parsed;
          // handleConfigurationEvent() then starts Flume with the result
          application.handleConfigurationEvent(configurationProvider.getConfiguration());
        }
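With reloading on, PollingPropertiesFileConfigurationProvider re-checks the file on a fixed interval and posts each new configuration to the Guava EventBus that Application is registered on. A minimal sketch of just the change check, assuming the file's modification time is the change signal and using a plain Consumer in place of EventBus; FileChangePoller is hypothetical, not a Flume class.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical poller: notifies the listener whenever the file's mtime moves forward. */
final class FileChangePoller {
  private final File file;
  private final Consumer<File> listener;
  private long lastSeen = 0L;  // 0 means "never loaded", so the first poll notifies

  FileChangePoller(File file, Consumer<File> listener) {
    this.file = file;
    this.listener = listener;
  }

  /** Performs one check; returns true if the listener was notified. */
  synchronized boolean pollOnce() {
    long modified = file.lastModified();  // 0 if the file does not exist
    if (modified > lastSeen) {
      lastSeen = modified;
      listener.accept(file);  // in Flume: re-parse and post the result to the EventBus
      return true;
    }
    return false;
  }

  /** Checks now and then every intervalSeconds; the caller must shut the executor down. */
  ScheduledExecutorService start(long intervalSeconds) {
    ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
    ses.scheduleWithFixedDelay(this::pollOnce, 0, intervalSeconds, TimeUnit.SECONDS);
    return ses;
  }
}
```

Keeping pollOnce() separate from the scheduling makes the change detection testable without waiting for a timer.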

Preparation for parsing the configuration file:

public class PropertiesFileConfigurationProvider extends
    AbstractConfigurationProvider {

  private final File file;

  public PropertiesFileConfigurationProvider(String agentName, File file) {
    super(agentName);
    this.file = file;
  }

  // Reads the configuration file as java.util.Properties
  @Override
  public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      Properties properties = new Properties();
      properties.load(reader);
      return new FlumeConfiguration(toMap(properties));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          LOGGER.warn(
              "Unable to close file reader for file: " + file, ex);
        }
      }
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }
}
public abstract class AbstractConfigurationProvider implements ConfigurationProvider {

  private final String agentName;
  private final SourceFactory sourceFactory;
  private final SinkFactory sinkFactory;
  private final ChannelFactory channelFactory;

  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;

  public AbstractConfigurationProvider(String agentName) {
    super();
    this.agentName = agentName;
    this.sourceFactory = new DefaultSourceFactory();
    this.sinkFactory = new DefaultSinkFactory();
    this.channelFactory = new DefaultChannelFactory();

    channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
  }

  // Subclasses implement how the configuration is actually read
  protected abstract FlumeConfiguration getFlumeConfiguration();

  // Builds a MaterializedConfiguration from the configuration
  public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration(); // hook method: the subclass reads the configuration
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      // SourceRunner and SinkRunner are what drive the Source and the Sink
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        // Load the channels described by the configuration
        loadChannels(agentConf, channelComponentMap);
        // Load the sources, connecting them to the loaded channels
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        // Load the sinks, connecting them to the loaded channels
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        // Iterate over the channels
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            // ... omitted: no source or sink is attached, so the channel is dropped instead of being added to conf ...
          } else {
            // Register the channel under its name
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          // Register each source's SourceRunner under the source name
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          // Register each SinkRunner under its sink or sink group name
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }
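getConfiguration() is a template method: the base class fixes the algorithm and getFlumeConfiguration() is the hook a subclass fills in. The hypothetical pair of classes below shows the same split in miniature. It assumes, as a simplification of fconfig.getConfigurationFor(agentName), that selecting an agent just means keeping the keys that start with `<agentName>.`, and it uses try-with-resources in place of the explicit finally/close in PropertiesFileConfigurationProvider.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical template-method base: subclasses only supply the raw key/value pairs. */
abstract class ProviderSketch {
  private final String agentName;

  ProviderSketch(String agentName) {
    this.agentName = agentName;
  }

  /** Hook method, playing the role of getFlumeConfiguration(). */
  protected abstract Map<String, String> readRaw();

  /** Fixed algorithm: keep only "<agentName>.*" keys and strip that prefix. */
  public final Map<String, String> getConfiguration() {
    String prefix = agentName + ".";
    Map<String, String> out = new TreeMap<>();
    for (Map.Entry<String, String> e : readRaw().entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        out.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return out;
  }
}

/** Hypothetical subclass that parses Properties text; the reader is closed automatically. */
final class ReaderProviderSketch extends ProviderSketch {
  private final String text;

  ReaderProviderSketch(String agentName, String text) {
    super(agentName);
    this.text = text;
  }

  @Override
  protected Map<String, String> readRaw() {
    Map<String, String> map = new TreeMap<>();
    try (Reader reader = new StringReader(text)) {  // a FileReader in the real provider
      Properties props = new Properties();
      props.load(reader);
      for (String key : props.stringPropertyNames()) {
        map.put(key, props.getProperty(key));
      }
    } catch (IOException ex) {
      // Like the Flume provider, fall back to an empty configuration on I/O failure
    }
    return map;
  }
}
```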

1.1 Creating Channel instances from the configuration file

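Flume predefines several built-in Channel types. Whether type names one of these or a custom Channel, it resolves to a fully qualified class name, and the channel instance is then created from that class name via reflection.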


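  // Load the channels
  private void loadChannels(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap)
          throws InstantiationException {
    // Channels may be reused across reloads. Start by assuming that no cached
    // channel is reused; getOrCreateChannel() removes each name it does reuse.
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
        ArrayListMultimap.create();
    for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
         channelCache.entrySet()) {
      channelsNotReused.get(entry.getKey()).addAll(entry.getValue().keySet());
    }

    Set<String> channelNames = agentConf.getChannelSet();
    Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
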
    for (String chName : channelNames) {
      ComponentConfiguration comp = compMap.get(chName);
      if (comp != null) {
        // Get or create the Channel instance from its name and type
        Channel channel = getOrCreateChannel(channelsNotReused,
            comp.getComponentName(), comp.getType());
        try {
          Configurables.configure(channel, comp);
          channelComponentMap.put(comp.getComponentName(),
              new ChannelComponent(channel));
          LOGGER.info("Created channel " + chName);
        } catch (Exception e) {
          String msg = String.format("Channel %s has been removed due to an " +
              "error during configuration", chName);
          LOGGER.error(msg, e);
        }
      }
    }
  }

  private Channel getOrCreateChannel(
      ListMultimap<Class<? extends Channel>, String> channelsNotReused,
      String name, String type)
      throws FlumeException {

    // Resolve the Channel class from its type
    Class<? extends Channel> channelClass = channelFactory.getClass(type);
    
    if (channelClass.isAnnotationPresent(Disposable.class)) {
      Channel channel = channelFactory.create(name, type);
      channel.setName(name);
      return channel;
    }
    Map<String, Channel> channelMap = channelCache.get(channelClass);
    if (channelMap == null) {
      channelMap = new HashMap<String, Channel>();
      channelCache.put(channelClass, channelMap);
    }
    Channel channel = channelMap.get(name);
    if (channel == null) {
      // Not cached yet: create the Channel from its name and type
      channel = channelFactory.create(name, type);
      channel.setName(name);
      channelMap.put(name, channel);
    }
    channelsNotReused.get(channelClass).remove(name);
    return channel;
  }
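getOrCreateChannel() caches instances in channelCache, keyed first by class and then by name, so a reload that still declares a channel with the same name and type gets the same object back; classes annotated with @Disposable skip the cache and are always created fresh. The hypothetical ComponentCache below sketches that two-level lookup, plus a retain() step standing in for the channelsNotReused bookkeeping that lets cached channels which are no longer declared be dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical two-level cache: class -> (name -> instance). */
final class ComponentCache {
  private final Map<Class<?>, Map<String, Object>> cache = new HashMap<>();

  <T> T getOrCreate(Class<T> type, String name, boolean disposable,
                    Supplier<? extends T> factory) {
    if (disposable) {
      return factory.get();  // like @Disposable: never cached, always new
    }
    Map<String, Object> byName = cache.computeIfAbsent(type, k -> new HashMap<>());
    Object existing = byName.get(name);
    if (existing == null) {
      existing = factory.get();
      byName.put(name, existing);
    }
    return type.cast(existing);
  }

  /** Drops cached instances of this class whose names are no longer in use. */
  void retain(Class<?> type, Set<String> stillUsed) {
    Map<String, Object> byName = cache.get(type);
    if (byName != null) {
      byName.keySet().retainAll(stillUsed);  // removing from keySet removes the entries
    }
  }
}
```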


public class DefaultChannelFactory implements ChannelFactory {
  @Override
  public Channel create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");

    // Resolve the Channel class from the type
    Class<? extends Channel> channelClass = getClass(type);
    try {
      // Instantiate the Channel through its no-arg constructor
      return channelClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to create channel: " + name
          + ", type: " + type + ", class: " + channelClass.getName(), ex);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public Class<? extends Channel> getClass(String type) throws FlumeException {
    String channelClassName = type;
    ChannelType channelType = ChannelType.OTHER;
    try {
      // Convert to upper case so that the type can match an enum constant
      channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      logger.debug("Channel type {} is a custom type", type);
    }
    if (!channelType.equals(ChannelType.OTHER)) {
      // A built-in type: use the class name registered in the enum
      channelClassName = channelType.getChannelClassName();
    }
    try {
      // Load the class; for a custom type, type itself must be the fully qualified class name
      return (Class<? extends Channel>) Class.forName(channelClassName);
    } catch (Exception ex) {
      throw new FlumeException("Unable to load channel type: " + type
          + ", class: " + channelClassName, ex);
    }
  }
}

The built-in Channel types are listed below; any type that matches none of them is treated as a custom Channel.

public enum ChannelType {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");

  private final String channelClassName;

  private ChannelType(String channelClassName) {
    this.channelClassName = channelClassName;
  }

  public String getChannelClassName() {
    return channelClassName;
  }
}
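The resolve-then-reflect pattern of DefaultChannelFactory can be reduced to a sketch that runs without Flume on the classpath: the hypothetical DemoType enum maps aliases to JDK collection classes instead of channel classes. One deliberate difference: Flume's code calls Class.newInstance(), which is deprecated since Java 9, so the sketch uses getDeclaredConstructor().newInstance().

```java
import java.util.Locale;

/** Hypothetical enum mirroring ChannelType, mapped to JDK classes so the sketch runs anywhere. */
enum DemoType {
  OTHER(null),
  ARRAYLIST("java.util.ArrayList"),
  LINKEDLIST("java.util.LinkedList");

  final String className;

  DemoType(String className) {
    this.className = className;
  }
}

final class DemoFactory {
  /** Built-in alias (case-insensitive) or, failing that, a fully qualified class name. */
  static Class<?> resolve(String type) throws ClassNotFoundException {
    String className = type;
    try {
      DemoType t = DemoType.valueOf(type.toUpperCase(Locale.ENGLISH));
      if (t != DemoType.OTHER) {
        className = t.className;
      }
    } catch (IllegalArgumentException ex) {
      // Not a built-in alias: treat type as a custom fully qualified class name
    }
    return Class.forName(className);
  }

  /** Instantiates through the public no-arg constructor, as the channel factory does. */
  static Object create(String type) throws ReflectiveOperationException {
    return resolve(type).getDeclaredConstructor().newInstance();
  }
}
```

`create("arraylist")` goes through the enum, while `create("java.util.HashSet")` falls through to the custom-class path; an unknown name surfaces as ClassNotFoundException, which Flume wraps in a FlumeException.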

1.2 Loading Sources from the configuration file

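Sources are instantiated the same way as channels. After a source is instantiated, it also has to be connected to its channels, and its ChannelSelector, ChannelProcessor and SourceRunner have to be set up.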

 private void loadSources(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap,
      Map<String, SourceRunner> sourceRunnerMap)
      throws InstantiationException {

    Set<String> sourceNames = agentConf.getSourceSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSourceConfigMap();
    
    // Iterate over the sources
    for (String sourceName : sourceNames) {
      ComponentConfiguration comp = compMap.get(sourceName);
      if (comp != null) {
        SourceConfiguration config = (SourceConfiguration) comp;

        // As with channels, create the Source instance via reflection from its name and type
        Source source = sourceFactory.create(comp.getComponentName(),
            comp.getType());
        try {

          // Configure the source, then collect the channels it is bound to
          Configurables.configure(source, config);
          Set<String> channelNames = config.getChannels();
          List<Channel> sourceChannels = new ArrayList<Channel>();
          for (String chName : channelNames) {
            ChannelComponent channelComponent = channelComponentMap.get(chName);
            if (channelComponent != null) {
              sourceChannels.add(channelComponent.channel);
            }
          }

          if (sourceChannels.isEmpty()) {
            // A source must be connected to at least one channel
            String msg = String.format("Source %s is not connected to a " +
                "channel",  sourceName);
            throw new IllegalStateException(msg);
          }

          ChannelSelectorConfiguration selectorConfig =
              config.getSelectorConfiguration();

          // Instantiate the ChannelSelector (replicating by default)
          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          // Create the ChannelProcessor that wraps the selector
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, config);
          source.setChannelProcessor(channelProcessor);
          sourceRunnerMap.put(comp.getComponentName(),
              SourceRunner.forSource(source)); // map the source name to its SourceRunner
              
          for (Channel channel : sourceChannels) {
            ChannelComponent channelComponent =
                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                                           String.format("Channel %s", channel.getName()));
            channelComponent.components.add(sourceName);
          }
        } catch (Exception e) {
          String msg = String.format("Source %s has been removed due to an " +
              "error during configuration", sourceName);
          LOGGER.error(msg, e);
        }
      }
    }
  }
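The channel bookkeeping in loadSources can be modelled on its own. In this hypothetical sketch, `attached` plays the role of ChannelComponent.components: binding a source keeps only the channel names that exist, throws when none do (as loadSources does), and records the source on each bound channel.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the channel bookkeeping in loadSources. */
final class SourceBinding {
  /** channel name -> names of the components attached to it */
  final Map<String, List<String>> attached = new LinkedHashMap<>();

  SourceBinding(Collection<String> channels) {
    for (String ch : channels) {
      attached.put(ch, new ArrayList<>());
    }
  }

  /** Binds a source to the configured channels that exist; unknown names are skipped. */
  List<String> bind(String source, List<String> configuredChannels) {
    List<String> bound = new ArrayList<>();
    for (String ch : configuredChannels) {
      if (attached.containsKey(ch)) {
        bound.add(ch);
      }
    }
    if (bound.isEmpty()) {
      throw new IllegalStateException("Source " + source + " is not connected to a channel");
    }
    for (String ch : bound) {
      attached.get(ch).add(source);  // record the back-reference on the channel
    }
    return bound;
  }
}
```

After binding r1 to c1 only, c2 has nothing attached. In getConfiguration(), a channel whose components list is still empty once sources and sinks have been loaded is not added to the MaterializedConfiguration.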

Flume provides two kinds of Source, PollableSource and EventDrivenSource, and correspondingly two kinds of SourceRunner.

public abstract class SourceRunner implements LifecycleAware {
  private Source source;

  public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }
    return runner;
  }
}
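forSource() picks the runner from the interface the source implements. The sketch below mirrors that instanceof dispatch with hypothetical minimal interfaces. In Flume, PollableSource.process() returns a Status (READY or BACKOFF) and PollableSourceRunner calls it in a loop on its own thread, backing off when asked; here process() returns nothing and the loop is bounded to three calls so the behaviour can be checked.

```java
/** Hypothetical minimal stand-ins for Flume's source interfaces. */
interface DemoSource {}

interface DemoPollableSource extends DemoSource {
  void process();  // the runner has to call this repeatedly
}

interface DemoEventDrivenSource extends DemoSource {
  void start();  // the source pushes events by itself once started
}

abstract class DemoRunner {
  /** Same instanceof dispatch as SourceRunner.forSource. */
  static DemoRunner forSource(DemoSource source) {
    if (source instanceof DemoPollableSource) {
      return new PollingRunner((DemoPollableSource) source);
    } else if (source instanceof DemoEventDrivenSource) {
      return new EventDrivenRunner((DemoEventDrivenSource) source);
    }
    throw new IllegalArgumentException("No known runner type for source " + source);
  }

  abstract void start();
}

/** Drives a pollable source by calling process() in a loop (bounded here for the demo). */
final class PollingRunner extends DemoRunner {
  private final DemoPollableSource source;
  int iterations = 3;

  PollingRunner(DemoPollableSource source) {
    this.source = source;
  }

  @Override
  void start() {
    for (int i = 0; i < iterations; i++) {
      source.process();
    }
  }
}

/** An event-driven source needs no loop: the runner only starts it. */
final class EventDrivenRunner extends DemoRunner {
  private final DemoEventDrivenSource source;

  EventDrivenRunner(DemoEventDrivenSource source) {
    this.source = source;
  }

  @Override
  void start() {
    source.start();
  }
}
```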

Two ChannelSelector types are provided: REPLICATING, which copies each event to every bound channel, and MULTIPLEXING, which routes each event to selected channels.

public enum ChannelSelectorType {
  OTHER(null),
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");
  private final String channelSelectorClassName;
  private ChannelSelectorType(String channelSelectorClassName) {
    this.channelSelectorClassName = channelSelectorClassName;
  }
  public String getChannelSelectorClassName() {
    return channelSelectorClassName;
  }
}
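The difference between the two selector types can be shown with a hypothetical sketch (DemoReplicating and DemoMultiplexing are not Flume classes). Replicating returns every bound channel for every event. Multiplexing looks up one event header in a value-to-channels mapping and falls back to a default list, which corresponds to the selector.header, selector.mapping.<value> and selector.default properties in a Flume configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical selector interface: which channels should receive an event with these headers? */
interface DemoSelector {
  List<String> select(Map<String, String> headers);
}

/** REPLICATING: every event goes to every channel the source is bound to. */
final class DemoReplicating implements DemoSelector {
  private final List<String> channels;

  DemoReplicating(List<String> channels) {
    this.channels = new ArrayList<>(channels);
  }

  @Override
  public List<String> select(Map<String, String> headers) {
    return channels;
  }
}

/** MULTIPLEXING: route on one header value, using the default list for unmapped values. */
final class DemoMultiplexing implements DemoSelector {
  private final String header;
  private final Map<String, List<String>> mapping;
  private final List<String> defaults;

  DemoMultiplexing(String header, Map<String, List<String>> mapping, List<String> defaults) {
    this.header = header;
    this.mapping = new HashMap<>(mapping);
    this.defaults = new ArrayList<>(defaults);
  }

  @Override
  public List<String> select(Map<String, String> headers) {
    String value = headers.get(header);
    List<String> routed = value == null ? null : mapping.get(value);
    return routed != null ? routed : defaults;
  }
}
```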

1.3 Loading Sinks from the configuration file

private void loadSinks(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
      throws InstantiationException {
    Set<String> sinkNames = agentConf.getSinkSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSinkConfigMap();
    Map<String, Sink> sinks = new HashMap<String, Sink>();

    // Iterate over the sinks
    for (String sinkName : sinkNames) {
      ComponentConfiguration comp = compMap.get(sinkName);
      if (comp != null) {
        SinkConfiguration config = (SinkConfiguration) comp;
        // Instantiate the sink
        Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
        try {
          Configurables.configure(sink, config);
          ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
          if (channelComponent == null) {
            String msg = String.format("Sink %s is not connected to a " +
                "channel",  sinkName);
            throw new IllegalStateException(msg);
          }
          sink.setChannel(channelComponent.channel);
          sinks.put(comp.getComponentName(), sink);
          channelComponent.components.add(sinkName);
        } catch (Exception e) {
          String msg = String.format("Sink %s has been removed due to an " +
              "error during configuration", sinkName);
          LOGGER.error(msg, e);
        }
      }
    }
       
    loadSinkGroups(agentConf, sinks, sinkRunnerMap);
  }

  private void loadSinkGroups(AgentConfiguration agentConf,
      Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
          throws InstantiationException {
    Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSinkGroupConfigMap();
    Map<String, String> usedSinks = new HashMap<String, String>();
    for (String groupName: sinkGroupNames) {
      ComponentConfiguration comp = compMap.get(groupName);
      if (comp != null) {
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
        List<Sink> groupSinks = new ArrayList<Sink>();
        for (String sink : groupConf.getSinks()) {
          Sink s = sinks.remove(sink);
          if (s == null) {
            String sinkUser = usedSinks.get(sink);
            if (sinkUser != null) {
              throw new InstantiationException(String.format(
                  "Sink %s of group %s already " +
                      "in use by group %s", sink, groupName, sinkUser));
            } else {
              throw new InstantiationException(String.format(
                  "Sink %s of group %s does "
                      + "not exist or is not properly configured", sink,
                      groupName));
            }
          }
          groupSinks.add(s);
          usedSinks.put(sink, groupName);
        }
        try {
          SinkGroup group = new SinkGroup(groupSinks);
          Configurables.configure(group, groupConf);
          sinkRunnerMap.put(comp.getComponentName(),
              new SinkRunner(group.getProcessor()));
        } catch (Exception e) {
          String msg = String.format("SinkGroup %s has been removed due to " +
              "an error during configuration", groupName);
          LOGGER.error(msg, e);
        }
      }
    }
    // add any unassigned sinks to solo collectors
    for (Entry<String, Sink> entry : sinks.entrySet()) {
      if (!usedSinks.containsValue(entry.getKey())) {
        try {

          // Create a DefaultSinkProcessor for this standalone sink
          SinkProcessor pr = new DefaultSinkProcessor();
          List<Sink> sinkMap = new ArrayList<Sink>();
          sinkMap.add(entry.getValue());
          pr.setSinks(sinkMap);
          Configurables.configure(pr, new Context());
          
          // Map the sink name to its SinkRunner
          sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
        } catch (Exception e) {
          String msg = String.format("SinkGroup %s has been removed due to " +
              "an error during configuration", entry.getKey());
          LOGGER.error(msg, e);
        }
      }
    }
  }
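loadSinkGroups() gives each sink group a single SinkRunner around the group's processor, rejects a sink that is missing or already claimed by another group, and wraps every leftover sink in its own DefaultSinkProcessor runner. The hypothetical plan() below reproduces only that assignment logic over sink names. It throws IllegalStateException where Flume throws InstantiationException; in Flume that exception ends the loading in getConfiguration() early, so no components are registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of how loadSinkGroups turns sinks into SinkRunners. */
final class SinkRunnerPlan {
  /** Returns runner name -> sinks driven by that runner. */
  static Map<String, List<String>> plan(Set<String> sinks, Map<String, List<String>> groups) {
    Set<String> pool = new LinkedHashSet<>(sinks);  // sinks not yet claimed by a group
    Map<String, String> usedBy = new HashMap<>();   // sink -> group that claimed it
    Map<String, List<String>> runners = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> g : groups.entrySet()) {
      List<String> members = new ArrayList<>();
      for (String sink : g.getValue()) {
        if (!pool.remove(sink)) {  // already claimed, or never configured
          String owner = usedBy.get(sink);
          throw new IllegalStateException(owner != null
              ? "Sink " + sink + " of group " + g.getKey() + " already in use by group " + owner
              : "Sink " + sink + " of group " + g.getKey() + " does not exist");
        }
        members.add(sink);
        usedBy.put(sink, g.getKey());
      }
      runners.put(g.getKey(), members);  // one runner for the whole group
    }
    for (String sink : pool) {
      List<String> solo = new ArrayList<>();
      solo.add(sink);
      runners.put(sink, solo);  // leftover sink: its own runner
    }
    return runners;
  }
}
```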
