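Between the Lines
Many friends ask me whether constantly reading the source code of various frameworks gets tedious, and what keeps me going. My answer is simple: as a programmer, no matter how many years you have been working, regularly studying and borrowing from the design ideas and architecture of excellent frameworks, domestic and foreign, is the most direct way to improve.
I have finally started analyzing the Flume source code. I am learning as I share, so the content will inevitably have shortcomings; please bear with me.
Download flume-1.6.0 from http://flume.apache.org. After importing the source into the IDEA development tool, it looks like the figure below: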
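I. Main Modules
flume-ng-channels
Contains the channel implementations: filechannel, jdbcchannel, kafkachannel, and memorychannel.
flume-ng-clients
Implements several log4j Appenders so that log4j output can be sent directly to a flume-agent. One of them, LoadBalancingLog4jAppender, provides load balancing and HA across multiple flume-agents; if you use Flume for log collection, consider adding this appender to your own log4j setup.
flume-ng-configuration
Holds the classes related to Flume configuration, including loading and parsing the flume-config.properties configuration file. This covers Source, Sink, and Channel configuration. Before reading the rest of the source, I recommend sorting out the relationships in this module first.
flume-ng-core
The core framework of Flume, containing the interfaces of each module and the logic that ties them together. Within it, instrumentation is Flume's own metric mechanism for tracking and maintaining metrics; at its core, MonitoredCounterGroup keeps the counts in a Map<key, AtomicLong>. Most of the code under ng-core is still concentrated in the channel, sink, and source subdirectories; the other directories mostly provide utilities and supporting functions.
flume-ng-node
Implements the basic classes that start Flume, including the entry point, the main function in Application.java. Once you understand configuration, starting from the main function of Application is a quick way to get to know the whole Flume code base.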
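The Map<key, AtomicLong> idea mentioned for the instrumentation package can be sketched roughly as follows. This is a minimal illustration using only the JDK; the class name CounterGroupSketch and its methods are hypothetical and are not Flume's MonitoredCounterGroup API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a counter group backed by Map<String, AtomicLong>.
// It only illustrates the idea; it is not Flume's MonitoredCounterGroup.
class CounterGroupSketch {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Increment the named counter by one, creating it on first use.
    long increment(String name) {
        return counters.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
    }

    // Add an arbitrary delta to the named counter.
    long addAndGet(String name, long delta) {
        return counters.computeIfAbsent(name, k -> new AtomicLong()).addAndGet(delta);
    }

    // Read the current value; counters that were never touched read as 0.
    long get(String name) {
        AtomicLong counter = counters.get(name);
        return counter == null ? 0L : counter.get();
    }

    public static void main(String[] args) {
        CounterGroupSketch group = new CounterGroupSketch();
        group.increment("events.received");
        group.addAndGet("events.received", 4);
        System.out.println(group.get("events.received"));
    }
}
```

Keeping each metric in its own AtomicLong lets many threads update counters without a shared lock, which is presumably why this structure suits a metrics layer.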
II. Flume Logical Structure Diagram
III. The flume-ng Startup Script
################################
# constants
################################
# Constants: each one names the class that is run for a given command in order to start Flume
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
# The function that actually starts Flume
run_flume() {
local FLUME_APPLICATION_CLASS
if [ "$#" -gt 0 ]; then
FLUME_APPLICATION_CLASS=$1
shift
else
error "Must specify flume application class" 1
fi
if [ ${CLEAN_FLAG} -ne 0 ]; then
set -x
fi
# Run the matching startup class, e.g. org.apache.flume.node.Application
$EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
-Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
################################
# main
################################
# set default params
# Parameters used during startup
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
# Default maximum heap size; it can be overridden through the JVM options
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""
opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""
# Run a different startup class depending on the arguments; the class behind each constant is listed at the top of the script.
if [ -n "$opt_agent" ] ; then
run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
run_flume $FLUME_TOOLS_CLASS $args
else
error "This message should never appear" 1
fi
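This is the most important part of the flume-ng command-line script, excerpted by importance; interested readers can view the whole script in the bin directory.
IV. Starting with the Flume-NG Startup Process
The bin/flume-ng shell script shows that Flume starts from the org.apache.flume.node.Application class, which holds Flume's main function.
The main method first parses the shell command line, and throws an exception if the specified configuration file does not exist.
The code is as follows: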
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
option = new Option(null, "no-reload-conf", false,
"do not reload config file if changed");
options.addOption(option);
// Options for Zookeeper
option = new Option("z", "zkConnString", true,
"specify the ZooKeeper connection to use (required if -f missing)");
option.setRequired(false);
options.addOption(option);
option = new Option("p", "zkBasePath", true,
"specify the base path in ZooKeeper for agent configs");
option.setRequired(false);
options.addOption(option);
option = new Option("h", "help", false, "display help text");
options.addOption(option);
// Command-line parser
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
isZkConfigured = true;
}
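The code above is where the Application class validates the shell command line. For example, when Flume is started with the following command:
./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console
the -n, -f, and similar arguments are the ones checked by the code above.
Continuing further down in the main method: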
File configurationFile = new File(commandLine.getOptionValue('f'));
/*
* The following is to ensure that by default the agent will fail on
* startup if the file does not exist.
*/
if (!configurationFile.exists()) {
// If command line invocation, then need to fail fast
if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
null) {
String path = configurationFile.getPath();
try {
path = configurationFile.getCanonicalPath();
} catch (IOException ex) {
logger.error("Failed to read canonical path for file: " + path,
ex);
}
throw new ParseException(
"The specified configuration file does not exist: " + path);
}
}
List<LifecycleAware> components = Lists.newArrayList();
if (reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(
agentName, configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider
.getConfiguration());
}
}
application.start();
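Notes:
Whether the command line contains the "no-reload-conf" argument decides how the configuration file is loaded:
1. Without this argument, the configuration file is loaded dynamically. By default it is checked every 30 seconds, so it can be modified while the agent is running.
2. With this argument, the configuration file is loaded only once, at startup. The dynamic loading in case 1 uses the publish/subscribe pattern, implemented with Guava's EventBus.
3. PropertiesFileConfigurationProvider is the class that loads the configuration file.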
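To make the publish/subscribe idea concrete, here is a minimal JDK-only sketch. It is not Guava's EventBus and does not use its API; ConfigBusSketch, register, and post are hypothetical names chosen to mirror the roles in the code above, where the configuration provider posts a new configuration and the Application, registered as a subscriber, reacts to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified publish/subscribe bus (not Guava's EventBus).
class ConfigBusSketch<E> {
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();

    // A subscriber registers once and is called for every posted event.
    void register(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    // The publisher does not know who is listening; it just posts.
    void post(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    public static void main(String[] args) {
        ConfigBusSketch<String> bus = new ConfigBusSketch<>();
        List<String> applied = new ArrayList<>();
        // Stand-in for the Application handling a configuration event.
        bus.register(applied::add);
        // Stand-in for the polling provider posting reloaded configurations.
        bus.post("config-v1");
        bus.post("config-v2");
        System.out.println(applied);
    }
}
```

The point of the pattern is decoupling: the component that detects a changed file does not need a reference to whatever consumes the new configuration.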
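The class diagram is as follows:
The diagram shows that PollingPropertiesFileConfigurationProvider implements the LifecycleAware interface, the core interface governing the lifecycle of everything in Flume. LifecycleSupervisor implements this interface as well. The application.start() call in the code above hands each component to the supervisor, which in turn triggers LifecycleAware's start method. Below are the interface definition and the related code: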
public interface LifecycleAware {
/**
* <p>
* Starts a service or component.
* </p>
* @throws LifecycleException
* @throws InterruptedException
*/
public void start();
/**
* <p>
* Stops a service or component.
* </p>
* @throws LifecycleException
* @throws InterruptedException
*/
public void stop();
/**
* <p>
* Return the current state of the service or component.
* </p>
*/
public LifecycleState getLifecycleState();
}
The Application.start() method:
public synchronized void start() {
for(LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
}
The LifecycleSupervisor.supervise method is as follows:
public synchronized void supervise(LifecycleAware lifecycleAware,
SupervisorPolicy policy, LifecycleState desiredState) {
if(this.monitorService.isShutdown()
|| this.monitorService.isTerminated()
|| this.monitorService.isTerminating()){
throw new FlumeException("Supervise called on " + lifecycleAware + " " +
"after shutdown has been initiated. " + lifecycleAware + " will not" +
" be started");
}
Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
"Refusing to supervise " + lifecycleAware + " more than once");
if (logger.isDebugEnabled()) {
logger.debug("Supervising service:{} policy:{} desiredState:{}",
new Object[] { lifecycleAware, policy, desiredState });
}
Supervisoree process = new Supervisoree();
process.status = new Status();
process.policy = policy;
process.status.desiredState = desiredState;
process.status.error = false;
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
}
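The code above creates a MonitorRunnable object, a task that runs on a schedule. Its run method performs the action that matches the value of supervisoree.status.desiredState,
such as START or STOP. Pay attention to scheduleWithFixedDelay, a method of java.util.concurrent.ScheduledExecutorService: it waits 3 seconds after each run finishes before starting the next one, rather than firing every 3 seconds regardless of how long a run takes.
Some readers ask whether calling it over and over causes problems. It does not; the repetition is there to provide a retry mechanism, as the following code shows: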
if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState))
MonitorRunnable contains this check: the action is performed only when getLifecycleState() differs from supervisoree.status.desiredState, and the initial value returned by lifecycleAware.getLifecycleState() is IDLE.
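The retry behavior described above can be sketched with the JDK alone. This is an illustration under stated assumptions, not Flume's MonitorRunnable: SupervisorSketch, FlakyComponent, and superviseUntilStarted are hypothetical names, the component is made to fail a fixed number of times, and the delay is shortened from 3 seconds to a few milliseconds so the example finishes quickly. The real class also handles policies, error flags, and the STOP state.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "compare current state with desired state, retry".
class SupervisorSketch {
    enum State { IDLE, START, STOP }

    /** Hypothetical component whose start() fails a fixed number of times first. */
    static class FlakyComponent {
        private final int failuresBeforeSuccess;
        private final AtomicInteger attempts = new AtomicInteger();
        private volatile State state = State.IDLE;

        FlakyComponent(int failuresBeforeSuccess) {
            this.failuresBeforeSuccess = failuresBeforeSuccess;
        }

        void start() {
            // Only the attempt after the configured failures changes the state.
            if (attempts.incrementAndGet() > failuresBeforeSuccess) {
                state = State.START;
            }
        }

        State getState() { return state; }
        int attempts() { return attempts.get(); }
    }

    // Re-check the component after every delay until it reaches START.
    static int superviseUntilStarted(FlakyComponent component, long delayMillis) {
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch reached = new CountDownLatch(1);
        final State desired = State.START;
        monitor.scheduleWithFixedDelay(() -> {
            if (!component.getState().equals(desired)) {
                component.start();        // state differs: try (again)
            } else {
                reached.countDown();      // state matches: nothing to do
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            if (!reached.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("desired state not reached");
            }
            return component.attempts();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            monitor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        FlakyComponent component = new FlakyComponent(2);
        int attempts = superviseUntilStarted(component, 50);
        System.out.println(attempts + " attempts, state " + component.getState());
    }
}
```

With two configured failures, the third scheduled run succeeds, and every later run sees that the state already matches and does nothing. That is why repeating the task is harmless once the component is up.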
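The sequence diagram of the calls is shown below.
Note:
PollingPropertiesFileConfigurationProvider.start() starts a single-threaded executor that runs FileWatcherRunnable every 30 seconds to check the configuration file and, when it reloads it, calls:
eventBus.post(getConfiguration()).
getConfiguration() parses the configuration file and obtains all components and their configuration properties.
V. Configuration File Loading in Detail
First, look at the getConfiguration() method that FileWatcherRunnable calls: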
public MaterializedConfiguration getConfiguration() {
// Initialize the configuration maps for the three core components: source, channel, sink
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
FlumeConfiguration fconfig = getFlumeConfiguration();
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
loadChannels(agentConf, channelComponentMap);
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames =
new HashSet<String>(channelComponentMap.keySet());
for(String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.
get(channelName);
if(channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap = channelCache.
get(channelComponent.channel.getClass());
if(nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}
Notes:
1. Where is the configuration file loaded?
It happens here: FlumeConfiguration fconfig = getFlumeConfiguration();
getFlumeConfiguration() is an abstract method; its implementations can be located as shown in the figure below.
We pick PollingPropertiesFileConfigurationProvider, where we find:
@Override
public FlumeConfiguration getFlumeConfiguration() {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(file));
Properties properties = new Properties();
properties.load(reader);
return new FlumeConfiguration(toMap(properties));
} catch (IOException ex) {
LOGGER.error("Unable to load file:" + file
+ " (I/O failure) - Exception follows.", ex);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ex) {
LOGGER.warn(
"Unable to close file reader for file: " + file, ex);
}
}
}
return new FlumeConfiguration(new HashMap<String, String>());
}
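This is the method that loads the configuration file using plain Java I/O streams, that is, the hw.conf file I configured for Flume in the figure above. It reads the file, parses it into name/value pairs (name is the full property name, everything to the left of the equals sign; value is everything to the right), stores them in a Map, and returns a FlumeConfiguration object that wraps this Map.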
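The name/value split described above is what java.util.Properties does. The following small sketch shows the same steps on an in-memory string instead of a file so that it runs on its own; PropertiesToMapSketch and load are hypothetical names, and the copy loop stands in for the toMap helper seen in the Flume code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: parse properties text into a Map of name -> value.
class PropertiesToMapSketch {
    static Map<String, String> load(String text) {
        Properties properties = new Properties();
        try {
            // Properties.load splits each line at '=' and skips '#' comments.
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            result.put(name, properties.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) {
        String text = "agent.sources=s1\n"
                + "agent.sources.s1.type = exec\n"
                + "#agent.sinks.k1.custom.partition.key=kafkaPartition\n";
        Map<String, String> map = load(text);
        System.out.println(map.get("agent.sources.s1.type"));
        System.out.println(map.size());
    }
}
```

Note that whitespace around the equals sign is dropped and the commented-out line never reaches the Map, which matches how the sample hw.conf later in this article behaves.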
The FlumeConfiguration constructor iterates over every <name, value> pair in this Map and calls addRawProperty(String name, String value) on each one. addRawProperty first performs some validity checks. When an agent name is seen for the first time, at Flume startup, it constructs an AgentConfiguration object aconf and calls agentConfigMap.put(agentName, aconf); after that, AgentConfiguration aconf = agentConfigMap.get(agentName) is enough to retrieve it. It then calls aconf.addProperty(configKey, value) to process the pair.
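The get-or-create flow around agentConfigMap can be sketched as below. This is a simplified stand-in, not Flume's code: AgentConfigMapSketch is a hypothetical class, the per-agent configuration is reduced to a plain Map instead of an AgentConfiguration object, and the validity checks are only a guess at the kind of checks performed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of splitting "agentName.configKey" and grouping by agent.
class AgentConfigMapSketch {
    private final Map<String, Map<String, String>> agentConfigMap = new LinkedHashMap<>();

    // Returns false when the pair is rejected by the (assumed) validity checks.
    boolean addRawProperty(String rawName, String rawValue) {
        if (rawName == null || rawValue == null) {
            return false;
        }
        String name = rawName.trim();
        String value = rawValue.trim();
        int index = name.indexOf('.');
        // Require a non-empty agent name, a non-empty config key and a value.
        if (value.isEmpty() || index <= 0 || index == name.length() - 1) {
            return false;
        }
        String agentName = name.substring(0, index);
        String configKey = name.substring(index + 1);
        // First property of an agent creates its entry; later ones reuse it.
        agentConfigMap.computeIfAbsent(agentName, k -> new LinkedHashMap<>())
                .put(configKey, value);
        return true;
    }

    Map<String, String> configFor(String agentName) {
        return agentConfigMap.get(agentName);
    }

    public static void main(String[] args) {
        AgentConfigMapSketch config = new AgentConfigMapSketch();
        config.addRawProperty("agent.sources", "s1");
        config.addRawProperty("agent.sources.s1.type", "exec");
        System.out.println(config.configFor("agent"));
    }
}
```

The key handed on after this step no longer carries the agent name: agent.sources.s1.type becomes sources.s1.type.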
2. Let us focus on the parseConfigKey method used inside addProperty, which is where each line of the configuration file is parsed in depth.
Take this configuration file as an example:
agent.sources=s1
agent.channels=c1 c2
agent.sinks=k1 k2
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /Users/it-od-m-2687/Downloads/abc.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=127.0.0.1:9092
#agent.sinks.k1.custom.partition.key=kafkaPartition
agent.sinks.k1.topic=testKJ1
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
The file above is parsed with the parseConfigKey method below:
cnck = parseConfigKey(key, BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX);
private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
// key must start with prefix
if (!key.startsWith(prefix)) {
return null;
}
// key must have a component name part after the prefix of the format:
// <prefix><component-name>.<config-key>
int index = key.indexOf('.', prefix.length() + 1);
if (index == -1) {
return null;
}
String name = key.substring(prefix.length(), index);
String configKey = key.substring(prefix.length() + name.length() + 1);
// name and config key must be non-empty
if (name.length() == 0 || configKey.length() == 0) {
return null;
}
return new ComponentNameAndConfigKey(name, configKey);
}
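(1) Take agent.sources.s1.command as the example; by the time parseConfigKey runs, the agent name has been stripped, so the key is sources.s1.command.
The prefix variable refers to keywords such as sinks, sources, and channels,
as defined in the following code: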
public final class BasicConfigurationConstants {
public static final String CONFIG_SOURCES = "sources";
public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + ".";
public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector.";
public static final String CONFIG_SINKS = "sinks";
public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + ".";
public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor.";
public static final String CONFIG_SINKGROUPS = "sinkgroups";
public static final String CONFIG_SINKGROUPS_PREFIX = CONFIG_SINKGROUPS + ".";
public static final String CONFIG_CHANNEL = "channel";
public static final String CONFIG_CHANNELS = "channels";
public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + ".";
public static final String CONFIG_CONFIG = "config";
public static final String CONFIG_TYPE = "type";
private BasicConfigurationConstants() {
// disable explicit object creation
}
}
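(2) The parseConfigKey method above first works out what follows the prefix. For example, in sources.s1.command the text after the prefix is s1.command, 10 characters in total.
(3) It extracts the name variable, such as s1, which is defined by the user.
(4) It extracts configKey, a fixed keyword such as command, which is defined by the system.
(5) It wraps the two in new ComponentNameAndConfigKey(name, configKey) and returns.
(6) The source, channel, and sink settings are stored in three HashMaps, sourceContextMap, channelConfigMap, and sinkConfigMap, which are then wrapped together in the AgentConfiguration object. The AgentConfiguration is in turn stored in agentConfigMap, keyed by agentName. After all of this, many readers are probably dizzy; the structure of agentConfigMap is shown in the figure below: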
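The parsing steps can be tried out with a standalone copy of the parseConfigKey logic shown earlier. In this sketch the class name ParseConfigKeySketch is hypothetical, and the result is returned as a two-element String array rather than a ComponentNameAndConfigKey, so that the example runs without Flume on the classpath.

```java
import java.util.Arrays;

// Standalone copy of the parseConfigKey logic, returning {name, configKey}.
class ParseConfigKeySketch {
    static String[] parse(String key, String prefix) {
        // The key must start with the prefix, e.g. "sources.".
        if (!key.startsWith(prefix)) {
            return null;
        }
        // Find the '.' that ends the component name (name has at least 1 char).
        int index = key.indexOf('.', prefix.length() + 1);
        if (index == -1) {
            return null;
        }
        String name = key.substring(prefix.length(), index);
        String configKey = key.substring(prefix.length() + name.length() + 1);
        if (name.isEmpty() || configKey.isEmpty()) {
            return null;
        }
        return new String[] { name, configKey };
    }

    public static void main(String[] args) {
        // Keys as they look after the agent name has been stripped.
        System.out.println(Arrays.toString(parse("sources.s1.command", "sources.")));
        System.out.println(Arrays.toString(parse("sinks.k1.serializer.class", "sinks.")));
        System.out.println(Arrays.toString(parse("sources.s1", "sources.")));
    }
}
```

The first call yields the name s1 and the config key command. The second shows that only the first dot after the name is used as a separator, so the config key keeps its own dots (serializer.class). The third returns null because there is no config key after the component name.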
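Reading source code is a painful process: you have to analyze the architecture of the whole framework and also understand the author's intent and design thinking. But if you stick with it, you will find there is a great deal to learn.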