整體的event傳輸流程如下圖:
1. 配置文件的解析和相關組件的加載
通過如下命令即可啟動flume進程。
bin/flume-ng agent -n name -c conf -f jobs/a.conf
入口函數是flume-ng-node子項目中Application的main方法。先通過Commons-cli對命令行進行解析,獲取name和file,其中file就是配置文件。
本小節介紹如何根據配置文件生成MaterializedConfiguration對象。
這個過程大體上分為如下步驟:
- 讀取配置文件properties
- loadChannels(agentConf, channelComponentMap),利用反射根據name和type創建Channel實例
- loadSources(agentConf, channelComponentMap, sourceRunnerMap),利用反射根據name和type創建Source實例,并創建ChannelSelector和ChannelProcessor,將source關聯到channel
- loadSinks(agentConf, channelComponentMap, sinkRunnerMap),利用反射根據name和type創建Sink實例,并將sink關聯到channel,創建SinkProcessor和SinkRunner
配置文件的讀取有兩種方式:通過ZooKeeper和通過本地配置文件。
當讀取的是本地配置文件時,可以通過選項控制在配置文件變更后是否進行reload操作。
// Lifecycle-aware components handed to the Application; in this excerpt the
// list is only populated on the reload path.
List<LifecycleAware> components = Lists.newArrayList();
if (reload) {
// Reload path: the polling provider is added as a component, and the
// application is registered on the same event bus the provider was given.
EventBus eventBus = new EventBus(agentName + "-event-bus");
// NOTE(review): the trailing 30 is presumably the polling interval in seconds — confirm.
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
// Preparation for parsing the configuration file.
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
// configurationProvider.getConfiguration() is where the configuration file is
// actually parsed; flume is then started according to that configuration.
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
解析配置文件的前期準備
/**
 * Configuration provider that reads the agent configuration from a Java
 * properties file.
 */
public class PropertiesFileConfigurationProvider extends
    AbstractConfigurationProvider {

  private final File file;

  /**
   * @param agentName agent name, passed through to the superclass
   * @param file properties file the configuration is read from
   */
  public PropertiesFileConfigurationProvider(String agentName, File file) {
    super(agentName);
    this.file = file;
  }

  /**
   * Loads the file as {@link Properties} and wraps the entries in a
   * FlumeConfiguration.
   *
   * @return the parsed configuration, or an empty configuration when the file
   *     could not be read (the I/O error is logged, not rethrown)
   */
  @Override
  public FlumeConfiguration getFlumeConfiguration() {
    FlumeConfiguration loaded = null;
    BufferedReader in = null;
    try {
      in = new BufferedReader(new FileReader(file));
      Properties props = new Properties();
      props.load(in);
      loaded = new FlumeConfiguration(toMap(props));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } finally {
      closeReader(in);
    }
    if (loaded != null) {
      return loaded;
    }
    // Reading failed: fall back to a configuration with no entries.
    return new FlumeConfiguration(new HashMap<String, String>());
  }

  // Closes the reader if one was opened; a failure to close is only logged.
  private void closeReader(BufferedReader in) {
    if (in == null) {
      return;
    }
    try {
      in.close();
    } catch (IOException ex) {
      LOGGER.warn(
          "Unable to close file reader for file: " + file, ex);
    }
  }
}
// Base class for configuration providers. Subclasses supply the raw
// FlumeConfiguration through getFlumeConfiguration(); this class holds the
// factories used to create sources, sinks and channels.
public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
private final String agentName;
private final SourceFactory sourceFactory;
private final SinkFactory sinkFactory;
private final ChannelFactory channelFactory;
// Channel instances keyed first by channel class, then by channel name.
private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;
// Stores the agent name and installs the default source/sink/channel
// factories together with an empty channel cache.
public AbstractConfigurationProvider(String agentName) {
super();
this.agentName = agentName;
this.sourceFactory = new DefaultSourceFactory();
this.sinkFactory = new DefaultSinkFactory();
this.channelFactory = new DefaultChannelFactory();
channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
}
// Subclasses implement the actual reading of the configuration file.
protected abstract FlumeConfiguration getFlumeConfiguration();
// Builds a MaterializedConfiguration from the configuration file.
// Channels are loaded first, then sources, then sinks; the resulting channels
// and runners are then copied into the returned configuration. When no
// configuration exists for this agent, a warning is logged and the returned
// configuration is left as created.
public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
FlumeConfiguration fconfig = getFlumeConfiguration();// hook method: the subclass reads the configuration file
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
if (agentConf != null) {
// SourceRunner and SinkRunner are used to drive the Source and the Sink
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
// load the channel data described by the configuration
loadChannels(agentConf, channelComponentMap);
// load the sources, using the configuration and the channel data
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
// load the sinks, using the configuration and the channel data
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
// Iterate over a copy of the channel names rather than the live key set
// (presumably because the elided handling below modifies the map — confirm).
Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
// walk through the channels
for (String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(channelName);
if (channelComponent.components.isEmpty()) {
// NOTE: the next line is a placeholder from the write-up ("some exception
// handling omitted"); it is not Java and stands for elided code.
省略一些異常處理...
} else {
// register the channel under its name
conf.addChannel(channelName, channelComponent.channel);
}
}
for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
// register each source name with its SourceRunner
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
// register each sink name with its SinkRunner
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
// The failure is only logged; conf is still returned with whatever was
// added to it before the exception.
LOGGER.error("Failed to instantiate component", ex);
} finally {
// The working maps are cleared whether or not loading succeeded.
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}
1.1 根據配置文件創建Channel實例
Flume定義了幾種默認的Channel類型。無論type是已知的幾種默認類型之一,還是自定義Channel,都可以獲取到全限定類名,然后根據類名通過反射完成channel實例的創建。
/**
 * Loads the channels: creates (or reuses) and configures every channel named
 * in the agent configuration, and records each successfully configured one in
 * {@code channelComponentMap} under its component name.
 *
 * <p>A channel whose configuration step throws is logged and left out of the
 * map; channel names without a configuration entry are skipped.
 *
 * <p>NOTE(review): {@code channelsNotReused} is not declared anywhere in this
 * excerpt — presumably it is set up in code that was elided; confirm.
 */
private void loadChannels(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap)
    throws InstantiationException {
  Set<String> declaredChannels = agentConf.getChannelSet();
  Map<String, ComponentConfiguration> configsByName = agentConf.getChannelConfigMap();

  for (String chName : declaredChannels) {
    ComponentConfiguration chConfig = configsByName.get(chName);
    if (chConfig == null) {
      continue;
    }
    // create the Channel instance from the channel's name and type
    Channel channel = getOrCreateChannel(channelsNotReused,
        chConfig.getComponentName(), chConfig.getType());
    try {
      Configurables.configure(channel, chConfig);
      String registeredName = chConfig.getComponentName();
      channelComponentMap.put(registeredName, new ChannelComponent(channel));
      LOGGER.info("Created channel " + chName);
    } catch (Exception e) {
      LOGGER.error(String.format("Channel %s has been removed due to an " +
          "error during configuration", chName), e);
    }
  }
}
/**
 * Returns the channel for the given name and type.
 *
 * <p>Channel classes annotated with {@code Disposable} always get a fresh
 * instance and never touch the cache. For every other class the instance is
 * looked up in {@code channelCache} (by class, then by name), created and
 * cached on a miss, and its name is removed from {@code channelsNotReused}.
 */
private Channel getOrCreateChannel(
    ListMultimap<Class<? extends Channel>, String> channelsNotReused,
    String name, String type)
    throws FlumeException {
  // resolve the Class from the channel's type
  Class<? extends Channel> channelClass = channelFactory.getClass(type);

  if (channelClass.isAnnotationPresent(Disposable.class)) {
    return createNamedChannel(name, type);
  }

  Map<String, Channel> cachedByName = channelCache.get(channelClass);
  if (cachedByName == null) {
    cachedByName = new HashMap<String, Channel>();
    channelCache.put(channelClass, cachedByName);
  }

  Channel cached = cachedByName.get(name);
  if (cached == null) {
    cached = createNamedChannel(name, type);
    cachedByName.put(name, cached);
  }
  channelsNotReused.get(channelClass).remove(name);
  return cached;
}

// Creates a channel through the factory from its name and type, then names it.
private Channel createNamedChannel(String name, String type) {
  Channel created = channelFactory.create(name, type);
  created.setName(name);
  return created;
}
/**
 * ChannelFactory that resolves a channel type to a class and instantiates it
 * reflectively through its no-argument constructor.
 */
public class DefaultChannelFactory implements ChannelFactory {

  /**
   * Creates a channel instance for the given type.
   *
   * @param name channel name; must not be null (used in error messages only)
   * @param type a {@code ChannelType} alias or a class name; must not be null
   * @throws FlumeException if the class cannot be loaded or instantiated
   */
  @Override
  public Channel create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    // resolve the Channel class from the type
    Class<? extends Channel> resolved = getClass(type);
    try {
      // instantiate the Channel
      return resolved.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to create channel: " + name
          + ", type: " + type + ", class: " + resolved.getName(), ex);
    }
  }

  /**
   * Maps a type string to a channel class. The type is upper-cased and
   * matched against {@code ChannelType}; anything that does not name a
   * constant other than OTHER is used verbatim as the class name.
   *
   * @throws FlumeException if the class cannot be loaded
   */
  @SuppressWarnings("unchecked")
  @Override
  public Class<? extends Channel> getClass(String type) throws FlumeException {
    ChannelType builtIn;
    try {
      // upper-case the whole type before the enum lookup
      builtIn = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      logger.debug("Channel type {} is a custom type", type);
      builtIn = ChannelType.OTHER;
    }

    // built-in alias -> its registered class; otherwise the type itself is
    // treated as a fully-qualified class name
    String className =
        (builtIn == ChannelType.OTHER) ? type : builtIn.getChannelClassName();

    try {
      return (Class<? extends Channel>) Class.forName(className);
    } catch (Exception ex) {
      throw new FlumeException("Unable to load channel type: " + type
          + ", class: " + className, ex);
    }
  }
}
以下是幾種默認的Channel類型;type與它們都匹配不上的,就是自定義Channel。
/**
 * Built-in channel type aliases and the implementation class each one maps to.
 */
public enum ChannelType {
  /** Stands for any type that is not a built-in alias; carries no class name. */
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");

  private final String channelClassName;

  ChannelType(String className) {
    channelClassName = className;
  }

  /** @return the class name for this type, or {@code null} for {@link #OTHER} */
  public String getChannelClassName() {
    return channelClassName;
  }
}
1.2 根據配置文件完成Source加載
source的實例化部分與channel的相同。完成source實例化后,還需要將Source和Channel進行關聯,以及為Source設置ChannelProcessor(其中包含ChannelSelector)。
/**
 * Creates and wires every source named in the agent configuration.
 *
 * <p>For each source: instantiate it through the source factory, configure
 * it, collect the channels it references that exist in
 * {@code channelComponentMap}, build a ChannelSelector and ChannelProcessor
 * over them, register a SourceRunner in {@code sourceRunnerMap}, and record
 * the source name on each of those channels' components.
 *
 * <p>Any exception raised while wiring a source is logged and that source is
 * skipped. A source with no resolvable channel is treated as such a failure.
 */
private void loadSources(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap,
    Map<String, SourceRunner> sourceRunnerMap)
    throws InstantiationException {
  Set<String> declaredSources = agentConf.getSourceSet();
  Map<String, ComponentConfiguration> configsByName =
      agentConf.getSourceConfigMap();

  // walk through the sources
  for (String sourceName : declaredSources) {
    ComponentConfiguration comp = configsByName.get(sourceName);
    if (comp == null) {
      continue;
    }
    SourceConfiguration sourceConf = (SourceConfiguration) comp;
    // as with channels, the Source instance is created from its name and type
    Source source = sourceFactory.create(comp.getComponentName(),
        comp.getType());
    try {
      Configurables.configure(source, sourceConf);

      // gather the channels this source is associated with
      Set<String> wantedChannels = sourceConf.getChannels();
      List<Channel> attached = new ArrayList<Channel>();
      for (String chName : wantedChannels) {
        ChannelComponent known = channelComponentMap.get(chName);
        if (known != null) {
          attached.add(known.channel);
        }
      }

      // a source without any channel is not allowed
      if (attached.isEmpty()) {
        throw new IllegalStateException(String.format(
            "Source %s is not connected to a " + "channel", sourceName));
      }

      // instantiate the ChannelSelector, then the ChannelProcessor around it
      ChannelSelector selector = ChannelSelectorFactory.create(
          attached, sourceConf.getSelectorConfiguration());
      ChannelProcessor processor = new ChannelProcessor(selector);
      Configurables.configure(processor, sourceConf);
      source.setChannelProcessor(processor);

      // map the source name to its SourceRunner
      sourceRunnerMap.put(comp.getComponentName(),
          SourceRunner.forSource(source));

      for (Channel ch : attached) {
        ChannelComponent owner = channelComponentMap.get(ch.getName());
        Preconditions.checkNotNull(owner,
            String.format("Channel %s", ch.getName()));
        owner.components.add(sourceName);
      }
    } catch (Exception e) {
      LOGGER.error(String.format("Source %s has been removed due to an " +
          "error during configuration", sourceName), e);
    }
  }
}
Flume提供了兩種Source類型(PollableSource和EventDrivenSource),相應地也提供了兩種SourceRunner。
/**
 * Base type for source runners; {@link #forSource(Source)} selects the runner
 * implementation that matches the kind of source it is given.
 */
public abstract class SourceRunner implements LifecycleAware {

  private Source source;

  /**
   * Builds the runner for a source.
   *
   * @param source a PollableSource or an EventDrivenSource
   * @return a PollableSourceRunner or an EventDrivenSourceRunner with the
   *     source already set; PollableSource is checked first
   * @throws IllegalArgumentException if the source is neither kind
   */
  public static SourceRunner forSource(Source source) {
    if (source instanceof PollableSource) {
      PollableSourceRunner pollableRunner = new PollableSourceRunner();
      pollableRunner.setSource((PollableSource) source);
      return pollableRunner;
    }
    if (source instanceof EventDrivenSource) {
      EventDrivenSourceRunner eventRunner = new EventDrivenSourceRunner();
      eventRunner.setSource((EventDrivenSource) source);
      return eventRunner;
    }
    throw new IllegalArgumentException("No known runner type for source "
        + source);
  }
}
Flume提供了兩種ChannelSelector類型,一種是REPLICATING(複製),一種是MULTIPLEXING(多路分發)。
/**
 * Built-in channel selector aliases and the implementation class each one
 * maps to.
 */
public enum ChannelSelectorType {
  /** Stands for any selector that is not a built-in alias; carries no class name. */
  OTHER(null),
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");

  private final String channelSelectorClassName;

  ChannelSelectorType(String className) {
    channelSelectorClassName = className;
  }

  /** @return the class name for this selector, or {@code null} for {@link #OTHER} */
  public String getChannelSelectorClassName() {
    return channelSelectorClassName;
  }
}
1.3 根據配置文件加載sink
/**
 * Creates and configures every sink named in the agent configuration, attaches
 * each one to its channel, and then hands the configured sinks to
 * {@code loadSinkGroups} to build the runners.
 *
 * <p>Any exception raised while wiring a sink is logged and that sink is
 * skipped. A sink whose channel is not present in {@code channelComponentMap}
 * is treated as such a failure.
 */
private void loadSinks(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  Set<String> declaredSinks = agentConf.getSinkSet();
  Map<String, ComponentConfiguration> configsByName =
      agentConf.getSinkConfigMap();
  Map<String, Sink> configuredSinks = new HashMap<String, Sink>();

  // walk through the sinks
  for (String sinkName : declaredSinks) {
    ComponentConfiguration comp = configsByName.get(sinkName);
    if (comp == null) {
      continue;
    }
    SinkConfiguration sinkConf = (SinkConfiguration) comp;
    // instantiate the sink
    Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(sink, sinkConf);
      ChannelComponent target = channelComponentMap.get(sinkConf.getChannel());
      if (target == null) {
        throw new IllegalStateException(String.format(
            "Sink %s is not connected to a " + "channel", sinkName));
      }
      sink.setChannel(target.channel);
      configuredSinks.put(comp.getComponentName(), sink);
      target.components.add(sinkName);
    } catch (Exception e) {
      LOGGER.error(String.format("Sink %s has been removed due to an " +
          "error during configuration", sinkName), e);
    }
  }

  loadSinkGroups(agentConf, configuredSinks, sinkRunnerMap);
}
/**
 * Creates the SinkRunners: one per configured sink group, and one per sink
 * that no group has claimed.
 *
 * @param agentConf agent configuration supplying the sink group names and
 *     their configurations
 * @param sinks configured sinks by name; every sink claimed by a group is
 *     removed from this map, so what remains afterwards are standalone sinks
 * @param sinkRunnerMap receives one runner per group (keyed by the group's
 *     component name) and one per standalone sink (keyed by sink name)
 * @throws InstantiationException if a group references a sink that another
 *     group already claimed, or a sink that is not present in {@code sinks}
 */
private void loadSinkGroups(AgentConfiguration agentConf,
    Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
  Map<String, ComponentConfiguration> compMap =
      agentConf.getSinkGroupConfigMap();
  // sink name -> name of the group that claimed it
  Map<String, String> usedSinks = new HashMap<String, String>();
  for (String groupName: sinkGroupNames) {
    ComponentConfiguration comp = compMap.get(groupName);
    if (comp != null) {
      SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
      List<Sink> groupSinks = new ArrayList<Sink>();
      for (String sink : groupConf.getSinks()) {
        // Claiming a sink takes it out of the pool of standalone sinks.
        Sink s = sinks.remove(sink);
        if (s == null) {
          // Either another group took it first, or it never existed.
          String sinkUser = usedSinks.get(sink);
          if (sinkUser != null) {
            throw new InstantiationException(String.format(
                "Sink %s of group %s already " +
                "in use by group %s", sink, groupName, sinkUser));
          } else {
            throw new InstantiationException(String.format(
                "Sink %s of group %s does "
                + "not exist or is not properly configured", sink,
                groupName));
          }
        }
        groupSinks.add(s);
        usedSinks.put(sink, groupName);
      }
      try {
        SinkGroup group = new SinkGroup(groupSinks);
        Configurables.configure(group, groupConf);
        sinkRunnerMap.put(comp.getComponentName(),
            new SinkRunner(group.getProcessor()));
      } catch (Exception e) {
        String msg = String.format("SinkGroup %s has been removed due to " +
            "an error during configuration", groupName);
        LOGGER.error(msg, e);
      }
    }
  }
  // add any unassigned sinks to solo collectors
  for (Entry<String, Sink> entry : sinks.entrySet()) {
    // usedSinks is keyed by sink name, so membership is tested on its keys.
    // Testing its values (group names) would silently drop a standalone sink
    // that happens to share its name with a sink group.
    if (!usedSinks.containsKey(entry.getKey())) {
      try {
        // create the SinkProcessor for this single sink
        SinkProcessor pr = new DefaultSinkProcessor();
        List<Sink> soloSinks = new ArrayList<Sink>();
        soloSinks.add(entry.getValue());
        pr.setSinks(soloSinks);
        Configurables.configure(pr, new Context());
        // map the sink name to its SinkRunner
        sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
      } catch (Exception e) {
        // This branch handles a single sink, not a group.
        String msg = String.format("Sink %s has been removed due to " +
            "an error during configuration", entry.getKey());
        LOGGER.error(msg, e);
      }
    }
  }
}