A source writes events to one or more channels.
A sink receives events from exactly one channel.
An agent may contain multiple sources, channels, and sinks.
flume-ng-channels
Contains the channel implementations: filechannel, jdbcchannel, kafkachannel, and memorychannel.
flume-ng-clients
Implements several log4j Appenders so that log4j output can be sent directly to a flume-agent. One of them, LoadBalancingLog4jAppender, provides load balancing and HA across multiple flume-agents; if you use Flume for log collection, consider adding this appender to your internal log4j setup.
flume-ng-configuration
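This module holds the classes related to Flume configuration, including loading and parsing the flume-config.properties file. It covers the Source, Sink, and Channel configuration. Before reading the rest of the source code, it is worth sorting out these relationships first.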
flume-ng-core
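The core framework of Flume, including the interfaces of each module and the logic that ties them together. The instrumentation package is Flume's internal metric mechanism, responsible for updating and maintaining metrics; at its core, MonitoredCounterGroup keeps the metric counters in a Map. Most of the code under ng-core is still concentrated in the channel, sink, and source subdirectories; the other directories mostly provide utility and helper functionality.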
flume-ng-node
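Implements the basic classes that start Flume, including the entry point of the main function (in Application.java). Once you understand the configuration module, starting from the main function of Application is a quick way to get an overview of the whole Flume code base.
Introduction to the flume-ng startup script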
################################
# constants
################################
# Constants: each one names the class that is run for a given command-line mode to start the Flume environment
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
# The function that actually starts the Flume environment
run_flume() {
  local FLUME_APPLICATION_CLASS
  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  # Run the matching startup class, for example org.apache.flume.node.Application
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
################################
# main
################################
# set default params
# Parameters used during startup
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
# Default maximum heap size; it can be overridden through the JVM options
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""
opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""
# Run a different startup class depending on the arguments; the class behind each constant is listed at the top of the script.
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi
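The important parts are the run_flume call and the startup class passed to it.
The bin/flume-ng shell script shows that Flume starts from the org.apache.flume.node.Application class, which contains Flume's main function. The main method first validates and parses the command line, for example
./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console
and then loads the configuration file. If the specified configuration file does not exist, it throws an exception.
Whether the command contains the "no-reload-conf" option decides how the configuration file is loaded:
1. Without this option, the configuration file is reloaded dynamically, by default every 30 seconds, so it can be changed while the agent is running.
2. With this option, the configuration file is loaded only once at startup. The dynamic reloading itself uses the publish-subscribe pattern, implemented with the EventBus from Guava.
3. PropertiesFileConfigurationProvider is the class that loads the configuration file. PollingPropertiesFileConfigurationProvider implements the LifecycleAware interface, a core interface that governs the whole Flume lifecycle and defines the start and stop methods. LifecycleSupervisor implements this interface as well, and the application.start method triggers the start method of LifecycleAware.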
[Figure: call sequence diagram]
PollingPropertiesFileConfigurationProvider.start() starts a single FileWatcherRunnable thread that polls the configuration file every 30s and reloads it.
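The polling idea can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not Flume's code: the class name PollingFileWatcherSketch, the checkOnce method, and the Runnable callback (standing in for posting to the EventBus) are all hypothetical, and the sketch assumes a reload is only needed when the file's modification time has advanced.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a polling reloader; not Flume's actual class. */
final class PollingFileWatcherSketch {
    private final LongSupplier lastModified; // e.g. file::lastModified
    private final Runnable onChange;         // stands in for posting to an EventBus
    private long lastSeen = 0L;
    private ScheduledExecutorService executor;

    PollingFileWatcherSketch(LongSupplier lastModified, Runnable onChange) {
        this.lastModified = lastModified;
        this.onChange = onChange;
    }

    /** One polling step: notify the listener if the timestamp has advanced. */
    synchronized boolean checkOnce() {
        long modified = lastModified.getAsLong();
        if (modified > lastSeen) {
            lastSeen = modified;
            onChange.run();
            return true;
        }
        return false;
    }

    /** Starts a single background thread that polls at a fixed interval. */
    synchronized void start(long intervalSeconds) {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(this::checkOnce, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        File conf = new File(args.length > 0 ? args[0] : "conf/hw.conf");
        PollingFileWatcherSketch watcher = new PollingFileWatcherSketch(
                conf::lastModified, () -> System.out.println("reloading " + conf));
        watcher.checkOnce(); // a long-running agent would call watcher.start(30) instead
    }
}
```

Passing the timestamp as a LongSupplier keeps the polling step independent of the file system, which makes it easy to exercise without waiting for a real 30-second interval.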
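How is the file parsed?
1. Take agent.sources.s1.command=s1 as an example: the variable prefix refers to a keyword such as sink, source, or channel.
2. The parseConfigKey method first uses prefix to work out how many characters follow it. For example, in sources.s1.command, the part after sources, s1.command, has 10 characters.
3. It parses out the name variable, such as s1, which is chosen by the user.
4. It parses out the fixed keyword configKey, such as command, which is defined by the system.
5. It wraps both in new ComponentNameAndConfigKey(name, configKey) and returns the result.
6. The source, channel, and sink configuration is stored in three HashMaps, sourceContextMap, channelConfigMap, and sinkConfigMap. These are wrapped in an AgentConfiguration object, which is then stored in agentConfigMap with agentName as the key.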
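Steps 2 to 5 above can be sketched as a small string-splitting routine. This is an illustration only, assuming the prefix is passed with its trailing dot; ConfigKeySketch and NameAndKey are hypothetical names that mirror the roles of parseConfigKey and ComponentNameAndConfigKey, and the real method may differ in detail.

```java
/** Hypothetical sketch of splitting a configuration key; not Flume's actual code. */
final class ConfigKeySketch {

    /** Holds the user-chosen component name and the system-defined key. */
    static final class NameAndKey {
        final String name;
        final String configKey;

        NameAndKey(String name, String configKey) {
            this.name = name;
            this.configKey = configKey;
        }
    }

    /**
     * Splits a key such as "sources.s1.command" with prefix "sources."
     * into name "s1" and configKey "command".
     * Returns null when the key does not have that shape.
     */
    static NameAndKey parseConfigKey(String key, String prefix) {
        if (!key.startsWith(prefix)) {
            return null;
        }
        String rest = key.substring(prefix.length()); // e.g. "s1.command"
        int dot = rest.indexOf('.');
        if (dot <= 0 || dot == rest.length() - 1) {
            return null; // no name, or nothing after the name
        }
        return new NameAndKey(rest.substring(0, dot), rest.substring(dot + 1));
    }

    public static void main(String[] args) {
        NameAndKey parsed = parseConfigKey("sources.s1.command", "sources.");
        System.out.println(parsed.name + " / " + parsed.configKey);
    }
}
```

Splitting at the first dot after the prefix means that a key with further dots keeps everything after the component name as its configKey.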
source - channelProcessor - channel - sink - sinkgroup
Source {
  ChannelProcessor {
    Channel ch1
    Channel ch2
    …
  }
}
Sink {
  Channel ch;
}
SinkGroup {
  Channel ch;
  Sink s1;
  Sink s2;
  …
}
1. The Source component
Source is the general term for a data source. Once a source is configured, data is continuously pulled or pushed. Common sources include ExecSource, KafkaSource, HttpSource, NetcatSource, JmsSource, AvroSource, and so on. All sources implement one common interface, shown below:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {
  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();
}
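Source offers two mechanisms: PollableSource (polling) and EventDrivenSource (event-driven):
[Figure: Source inheritance class diagram]
The class diagram shows that NetcatSource, ExecSource, and HttpSource follow the event-driven model, while KafkaSource, SequenceGeneratorSource, and JmsSource follow the polling model. The Source interface extends LifecycleAware, and all of its logic is carried out in the start and stop methods of that interface.
[Figure: class relationship and method diagram]
The Source interface defines the final behavior, such as collecting logs; the actual collection process lives in the corresponding Source implementation, for example ExecSource. So what drives these Source implementations? This is where the SourceRunner class comes in.
[Figure: SourceRunner class inheritance diagram]
Here are the concrete implementations in PollableSourceRunner and EventDrivenSourceRunner: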
@Override // PollableSourceRunner
public void start() {
  PollableSource source = (PollableSource) getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();
  runner = new PollingRunner();
  runner.source = source;
  runner.counterGroup = counterGroup;
  runner.shouldStop = shouldStop;
  runnerThread = new Thread(runner);
  runnerThread.setName(getClass().getSimpleName() + "-" +
      source.getClass().getSimpleName() + "-" + source.getName());
  runnerThread.start();
  lifecycleState = LifecycleState.START;
}
@Override // EventDrivenSourceRunner
public void start() {
  Source source = getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();
  lifecycleState = LifecycleState.START;
}
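Note: event-driven Source implementations generally maintain their own threads internally, so calling source.start() starts those threads. For a PollableSource, the polling thread is the runnerThread created by PollableSourceRunner in the code above.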
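The difference between the two models can be illustrated with a stripped-down sketch. Everything here is hypothetical (SourceModelsSketch, PollableSketch, EventDrivenSketch, runPolling) and the channel is reduced to a Consumer of strings; the real PollingRunner loops until it is told to stop and sleeps after a backoff, which this sketch replaces with a fixed number of rounds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch contrasting polling and event-driven sources. */
final class SourceModelsSketch {
    enum Status { READY, BACKOFF }

    /** Polling model: a runner thread calls process() again and again. */
    interface PollableSketch {
        Status process(Consumer<String> channel);
    }

    /** Event-driven model: once started, the source pushes events on its own. */
    interface EventDrivenSketch {
        void start(Consumer<String> channel);
    }

    /**
     * Simplified runner loop. Returns how many rounds produced no data.
     * A real runner would sleep after a BACKOFF before polling again.
     */
    static int runPolling(PollableSketch source, Consumer<String> channel, int rounds) {
        int backoffs = 0;
        for (int i = 0; i < rounds; i++) {
            if (source.process(channel) == Status.BACKOFF) {
                backoffs++;
            }
        }
        return backoffs;
    }

    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();
        // Event-driven: the source decides when to emit.
        EventDrivenSketch pushy = ch -> ch.accept("pushed event");
        pushy.start(channel::add);
        // Polling: the runner decides when to ask.
        int[] counter = {0};
        PollableSketch polled = ch -> {
            ch.accept("polled event " + counter[0]++);
            return Status.READY;
        };
        runPolling(polled, channel::add, 2);
        System.out.println(channel);
    }
}
```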
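2. The Channel component
A Channel connects a Source and a Sink: the Source sends log data to the Channel, and the Sink consumes log data from the Channel. The Channel is temporary storage that relays log data, holding what the Source passes in. First look at the following code, from loadSources() in AbstractConfigurationProvider: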
ChannelSelectorConfiguration selectorConfig =
    config.getSelectorConfiguration();
ChannelSelector selector = ChannelSelectorFactory.create(
    sourceChannels, selectorConfig);
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);
source.setChannelProcessor(channelProcessor);
The ChannelSelectorFactory.create method is implemented as follows:
public static ChannelSelector create(List<Channel> channels,
    ChannelSelectorConfiguration conf) {
  String type = ChannelSelectorType.REPLICATING.toString();
  if (conf != null) {
    type = conf.getType();
  }
  ChannelSelector selector = getSelectorForType(type);
  selector.setChannels(channels);
  Configurables.configure(selector, conf);
  return selector;
}
Next, look at the ChannelSelectorType enum, which lists the available types:
public enum ChannelSelectorType {
  /**
   * Place holder for custom channel selectors not part of this enumeration.
   */
  OTHER(null),
  /**
   * Replicating channel selector.
   */
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),
  /**
   * Multiplexing channel selector.
   */
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");
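[Figure: ChannelSelector class structure diagram]
Note: ReplicatingChannelSelector and MultiplexingChannelSelector are the two channel selectors. The first is the replicating channel selector, which is the default; it sends every received event to each of the configured channels. The second is the multiplexing channel selector, which chooses the channels based on a value in the event header.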
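The two selection strategies in the note above can be sketched as plain functions over channel names. This is only an illustration: ChannelSelectorSketch and its method signatures are hypothetical, channels are reduced to strings, and the fallback to a default channel list when the header is missing or unmapped is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of replicating versus multiplexing selection. */
final class ChannelSelectorSketch {

    /** Replicating: every event goes to all configured channels. */
    static List<String> replicating(List<String> allChannels) {
        return allChannels;
    }

    /**
     * Multiplexing: look up one header value in a mapping and return the
     * channels registered for it, or the defaults when nothing matches.
     */
    static List<String> multiplexing(Map<String, String> headers, String headerName,
                                     Map<String, List<String>> mapping,
                                     List<String> defaultChannels) {
        String value = headers.get(headerName);
        if (value == null) {
            return defaultChannels;
        }
        return mapping.getOrDefault(value, defaultChannels);
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("ch1", "ch2");
        Map<String, List<String>> mapping =
                Collections.singletonMap("error", Collections.singletonList("ch2"));
        Map<String, String> headers = Collections.singletonMap("level", "error");
        System.out.println(replicating(all));
        System.out.println(multiplexing(headers, "level", mapping,
                Collections.singletonList("ch1")));
    }
}
```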
What is a Channel?
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {
  /**
   * <p>Puts the given event into the channel.</p>
   * <p><strong>Note</strong>: This method must be invoked within an active
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
   * results.</p>
   * @param event the event to transport.
   * @throws ChannelException in case this operation fails.
   * @see org.apache.flume.Transaction#begin()
   */
  public void put(Event event) throws ChannelException;
  /**
   * <p>Returns the next event from the channel if available. If the channel
   * does not have any events available, this method must return {@code null}.
   * </p>
   * <p><strong>Note</strong>: This method must be invoked within an active
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
   * results.</p>
   * @return the next available event or {@code null} if no events are
   * available.
   * @throws ChannelException in case this operation fails.
   * @see org.apache.flume.Transaction#begin()
   */
  public Event take() throws ChannelException;
  /**
   * @return the transaction instance associated with this channel.
   */
  public Transaction getTransaction();
}
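Note: the put method sends an event into the channel, the take method retrieves an event from it, and the transaction is used for transactional operations.
[Figure: Channel class structure diagram]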
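To make the put, take, and transaction contract concrete, here is a minimal in-memory sketch. It is not a Flume channel: TxChannelSketch is a hypothetical class that stores strings, supports one transaction at a time, is not thread-safe, and folds the transaction methods into the channel itself for brevity. It assumes that puts become visible only on commit and that events taken in a rolled-back transaction return to the queue in their original order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-transaction in-memory channel; not Flume's API. */
final class TxChannelSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> putList = new ArrayList<>();    // puts staged until commit
    private final Deque<String> takeList = new ArrayDeque<>(); // takes undone on rollback
    private boolean open = false;

    void begin() {
        open = true;
    }

    void put(String event) {
        requireOpen();
        putList.add(event);
    }

    /** Returns the next event, or null when the channel is empty. */
    String take() {
        requireOpen();
        String event = queue.pollFirst();
        if (event != null) {
            takeList.addLast(event);
        }
        return event;
    }

    void commit() {
        requireOpen();
        queue.addAll(putList);
        putList.clear();
        takeList.clear();
        open = false;
    }

    void rollback() {
        requireOpen();
        putList.clear();
        // Push taken events back to the head, last taken first, to restore order.
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.pollLast());
        }
        open = false;
    }

    int size() {
        return queue.size();
    }

    private void requireOpen() {
        if (!open) {
            throw new IllegalStateException("no active transaction");
        }
    }

    public static void main(String[] args) {
        TxChannelSketch channel = new TxChannelSketch();
        channel.begin();
        channel.put("event-1");
        channel.commit();
        channel.begin();
        System.out.println(channel.take());
        channel.rollback(); // delivery failed, so the event stays in the channel
        System.out.println(channel.size());
    }
}
```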
3. The Sink component
A Sink takes event data out of the Channel and either stores it in a file system or database or submits it to a remote server. A Sink can write to a file system, a database, or Hadoop. When the log volume is small, the data can be stored in the file system and saved at a configured time interval. When the log volume is large, the data can be stored in Hadoop, which makes later analysis easier.
The Sink interface is as follows:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
  /**
   * <p>Sets the channel the sink will consume from</p>
   * @param channel The channel to be polled
   */
  public void setChannel(Channel channel);
  /**
   * @return the channel associated with this sink
   */
  public Channel getChannel();
  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
}
A Sink is created with the following code:
Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
The DefaultSinkFactory.create method is as follows:
@Override
public Sink create(String name, String type) throws FlumeException {
  Preconditions.checkNotNull(name, "name");
  Preconditions.checkNotNull(type, "type");
  logger.info("Creating instance of sink: {}, type: {}", name, type);
  Class<? extends Sink> sinkClass = getClass(type);
  try {
    Sink sink = sinkClass.newInstance();
    sink.setName(name);
    return sink;
  } catch (Exception ex) {
    throw new FlumeException("Unable to create sink: " + name
        + ", type: " + type + ", class: " + sinkClass.getName(), ex);
  }
}
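**Note: Sinks are created through the SinkFactory factory, with DefaultSinkFactory as the default implementation. The program looks up the org.apache.flume.conf.sink.SinkType enum to find the matching Sink class, for example org.apache.flume.sink.LoggerSink. If no match is found, it treats the type as a class name and loads the implementation class directly with Class.forName(className) before instantiating it.**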
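The lookup described in the note, a known alias first and a fully qualified class name as the fallback, can be sketched with the JDK alone. TypeAliasFactorySketch and its alias table are hypothetical, and JDK collection classes stand in for sink classes so that the example runs without Flume on the classpath.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of alias-or-class-name instantiation. */
final class TypeAliasFactorySketch {
    // Illustrative alias table; for sinks, the SinkType enum plays this role.
    private static final Map<String, String> ALIASES = new HashMap<>();

    static {
        ALIASES.put("LIST", "java.util.ArrayList");
        ALIASES.put("MAP", "java.util.HashMap");
    }

    /** Resolves a short alias or a fully qualified class name and instantiates it. */
    static Object create(String type) {
        String className = ALIASES.getOrDefault(type.toUpperCase(Locale.ROOT), type);
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Unable to create type: " + type, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("list").getClass().getName());
        System.out.println(create("java.util.LinkedList").getClass().getName());
    }
}
```

The sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated in recent JDKs; the behavior for a public no-argument constructor is the same.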
[Figure: Sink class structure diagram]
The counterpart of ChannelProcessor is SinkProcessor, which is created by the SinkProcessorFactory factory class. The available SinkProcessor types are listed in an enum:
public enum SinkProcessorType {
  /**
   * Place holder for custom sinks not part of this enumeration.
   */
  OTHER(null),
  /**
   * Failover processor
   *
   * @see org.apache.flume.sink.FailoverSinkProcessor
   */
  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),
  /**
   * Standard processor
   *
   * @see org.apache.flume.sink.DefaultSinkProcessor
   */
  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),
  /**
   * Load balancing processor
   *
   * @see org.apache.flume.sink.LoadBalancingSinkProcessor
   */
  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");
[Figure: SinkProcessor class structure diagram]
Notes: 1. FailoverSinkProcessor is the failover processor. It handles the case where a sink fails while processing data taken from the channel. The code is as follows:
@Override
public Status process() throws EventDeliveryException {
  // Retry any failed sinks that have gone through their "cooldown" period
  Long now = System.currentTimeMillis();
  while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
    FailedSink cur = failedSinks.poll();
    Status s;
    try {
      s = cur.getSink().process();
      if (s == Status.READY) {
        liveSinks.put(cur.getPriority(), cur.getSink());
        activeSink = liveSinks.get(liveSinks.lastKey());
        logger.debug("Sink {} was recovered from the fail list",
            cur.getSink().getName());
      } else {
        // if it's a backoff it needn't be penalized.
        failedSinks.add(cur);
      }
      return s;
    } catch (Exception e) {
      cur.incFails();
      failedSinks.add(cur);
    }
  }
  Status ret = null;
  while (activeSink != null) {
    try {
      ret = activeSink.process();
      return ret;
    } catch (Exception e) {
      logger.warn("Sink {} failed and has been sent to failover list",
          activeSink.getName(), e);
      activeSink = moveActiveToDeadAndGetNext();
    }
  }
  throw new EventDeliveryException("All sinks failed to process, " +
      "nothing left to failover to");
}
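The second loop of the method above, which tries the active sink and falls back to the next one on failure, can be reduced to a small sketch. FailoverSketch is hypothetical: sinks are modeled as Supplier<String>, the highest priority key wins as with liveSinks.lastKey(), and the cooldown-based retry of failed sinks is deliberately left out.

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;

/** Hypothetical sketch of priority-based failover without the cooldown retry. */
final class FailoverSketch {
    private final SortedMap<Integer, Supplier<String>> live = new TreeMap<>();

    void add(int priority, Supplier<String> sink) {
        live.put(priority, sink);
    }

    /** Tries the highest-priority sink; on failure drops it and tries the next. */
    String process() {
        while (!live.isEmpty()) {
            Integer top = live.lastKey();
            try {
                return live.get(top).get();
            } catch (RuntimeException e) {
                // The real processor would also queue this sink for a later retry.
                live.remove(top);
            }
        }
        throw new IllegalStateException("All sinks failed to process");
    }

    public static void main(String[] args) {
        FailoverSketch processor = new FailoverSketch();
        processor.add(10, () -> { throw new RuntimeException("primary down"); });
        processor.add(5, () -> "delivered by backup");
        System.out.println(processor.process());
    }
}
```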
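2. LoadBalancingSinkProcessor is the load-balancing sink processor. As with ChannelProcessor, the selector deserves a closer look, in this case SinkSelector. Below are the SinkSelector interface and part of the configure method of LoadBalancingSinkProcessor, which extends AbstractSinkProcessor: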
// The SinkSelector interface
public interface SinkSelector extends Configurable, LifecycleAware {
  void setSinks(List<Sink> sinks);
  Iterator<Sink> createSinkIterator();
  void informSinkFailed(Sink failedSink);
}
// Excerpt from LoadBalancingSinkProcessor.configure()
if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
  selector = new RoundRobinSinkSelector(shouldBackOff);
} else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
  selector = new RandomOrderSinkSelector(shouldBackOff);
} else {
  try {
    @SuppressWarnings("unchecked")
    Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
        Class.forName(selectorTypeName);
    selector = klass.newInstance();
  } catch (Exception ex) {
    throw new FlumeException("Unable to instantiate sink selector: "
        + selectorTypeName, ex);
  }
}
[Figure: SinkSelector class structure diagram, to be read together with the code above]
Note: RoundRobinSinkSelector is the round-robin selector, and RandomOrderSinkSelector is the random-order selector.
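A round-robin selector in the spirit of createSinkIterator() can be sketched as follows. RoundRobinSketch is a hypothetical generic class, not Flume's RoundRobinSinkSelector; it assumes that each call returns an iterator over all items, starting one position later than the previous call, and it ignores the backoff handling that shouldBackOff controls in the real selectors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical round-robin ordering; backoff handling is omitted. */
final class RoundRobinSketch<T> {
    private final List<T> items;
    private int next = 0;

    RoundRobinSketch(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    /** Returns all items in order, rotated one step further on every call. */
    Iterator<T> createIterator() {
        int size = items.size();
        List<T> order = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            order.add(items.get((next + i) % size));
        }
        if (size > 0) {
            next = (next + 1) % size;
        }
        return order.iterator();
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> selector =
                new RoundRobinSketch<>(Arrays.asList("k1", "k2", "k3"));
        for (int call = 0; call < 4; call++) {
            System.out.println("first choice: " + selector.createIterator().next());
        }
    }
}
```

Returning a full iterator rather than a single item lets the caller move on to the next candidate when the first choice fails, which is how a load-balancing processor can combine spreading load with failover.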
Taking KafkaSink as an example, here is the concrete implementation inside a Sink:
public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = null;
  Event event = null;
  String eventTopic = null;
  String eventKey = null;
  try {
    long processedEvents = 0;
    transaction = channel.getTransaction();
    transaction.begin();
    messageList.clear();
    for (; processedEvents < batchSize; processedEvents += 1) {
      event = channel.take();
      if (event == null) {
        // no events available in channel
        break;
      }
      byte[] eventBody = event.getBody();
      Map<String, String> headers = event.getHeaders();
      if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
        eventTopic = topic;
      }
      eventKey = headers.get(KEY_HDR);
      if (logger.isDebugEnabled()) {
        logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
            + new String(eventBody, "UTF-8"));
        logger.debug("event #{}", processedEvents);
      }
      // create a message and add to buffer
      KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
      messageList.add(data);
    }
    // publish batch and commit.
    if (processedEvents > 0) {
      long startTime = System.nanoTime();
      producer.send(messageList);
      long endTime = System.nanoTime();
      counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
      counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
    }
    transaction.commit();
  } catch (Exception ex) {
    String errorMsg = "Failed to publish events";
    logger.error("Failed to publish events", ex);
    result = Status.BACKOFF;
    if (transaction != null) {
      try {
        transaction.rollback();
        counter.incrementRollbackCount();
      } catch (Exception e) {
        logger.error("Transaction rollback failed", e);
        throw Throwables.propagate(e);
      }
    }
    throw new EventDeliveryException(errorMsg, ex);
  } finally {
    if (transaction != null) {
      transaction.close();
    }
  }
  return result;
}
Note: on each call, the method takes up to batchSize events from the channel inside a transaction and then sends them to Kafka through the Kafka producer.