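If you have not yet read "Flume-ng Source Code Analysis: Startup Flow", please read that post first.
1 Interface Overview
The components are analyzed in the startup order described in the previous post: first Channel, then Sink, and finally Source. Before reading the component source code, let us look at two important interfaces: LifecycleAware and NamedComponent.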
1.1 LifecycleAware
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {
public void start();
public void stop();
public LifecycleState getLifecycleState();
}
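It is very simple, with just three methods: start(), stop(), and getLifecycleState(). Many Flume classes implement this interface, including PollingPropertiesFileConfigurationProvider, which was mentioned in "Flume-ng Source Code Analysis: Startup Flow". Anything that has a lifecycle implements it, and the components of course do so as well.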
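To make the contract concrete, here is a minimal sketch of a component that tracks its own lifecycle state. The `Lifecycle` interface and `State` enum are simplified stand-ins declared locally so that the example compiles on its own; they are not Flume's actual `LifecycleAware` and `LifecycleState` types, and `DemoComponent` is a hypothetical name.

```java
class LifecycleDemo {
    public static void main(String[] args) {
        DemoComponent component = new DemoComponent();
        System.out.println(component.getLifecycleState());
        component.start();
        System.out.println(component.getLifecycleState());
        component.stop();
        System.out.println(component.getLifecycleState());
    }
}

// Simplified stand-ins for illustration only; not Flume's real types.
enum State { IDLE, START, STOP }

interface Lifecycle {
    void start();
    void stop();
    State getLifecycleState();
}

// Hypothetical component that remembers the last lifecycle call it received.
class DemoComponent implements Lifecycle {
    private volatile State state = State.IDLE;

    @Override
    public void start() { state = State.START; }

    @Override
    public void stop() { state = State.STOP; }

    @Override
    public State getLifecycleState() { return state; }
}
```

A supervisor can then drive any such component through the same three calls without knowing what the component actually does.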
1.2 NamedComponent
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NamedComponent {
public void setName(String name);
public String getName();
}
There is not much to say about this one: it sets and returns the component's name.
2 Channel
Channel is one of Flume's three core components, so it is worth looking at how it is put together:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {
public void put(Event event) throws ChannelException;
public Event take() throws ChannelException;
public Transaction getTransaction();
}
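From the interface above we can see that Channel's main operations are put() and take(), so let us look at a concrete implementation. We use MemoryChannel as the example. MemoryChannel is long, so only a short excerpt is shown: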
public class MemoryChannel extends BasicChannelSemantics {
private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
private static final Integer defaultCapacity = Integer.valueOf(100);
private static final Integer defaultTransCapacity = Integer.valueOf(100);
public MemoryChannel() {
}
...
}
It extends BasicChannelSemantics, which, as the name suggests, is a base Channel implementation. Let us look at how it is implemented:
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BasicChannelSemantics extends AbstractChannel {
private ThreadLocal<BasicTransactionSemantics> currentTransaction
= new ThreadLocal<BasicTransactionSemantics>();
private boolean initialized = false;
protected void initialize() {}
protected abstract BasicTransactionSemantics createTransaction();
@Override
public void put(Event event) throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
transaction.put(event);
}
@Override
public Event take() throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
return transaction.take();
}
@Override
public Transaction getTransaction() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
}
After some searching we finally find put() and take(), but on closer inspection they simply delegate to the put() and take() of BasicTransactionSemantics. That is slightly disappointing, so let us continue with BasicTransactionSemantics:
public abstract class BasicTransactionSemantics implements Transaction {
private State state;
private long initialThreadId;
protected void doBegin() throws InterruptedException {}
protected abstract void doPut(Event event) throws InterruptedException;
protected abstract Event doTake() throws InterruptedException;
protected abstract void doCommit() throws InterruptedException;
protected abstract void doRollback() throws InterruptedException;
protected void doClose() {}
protected BasicTransactionSemantics() {
state = State.NEW;
initialThreadId = Thread.currentThread().getId();
}
protected void put(Event event) {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"put() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"put() called when transaction is %s!", state);
Preconditions.checkArgument(event != null,
"put() called with null event!");
try {
doPut(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
}
protected Event take() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"take() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"take() called when transaction is %s!", state);
try {
return doTake();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
protected State getState() {
return state;
}
... // Only put and take are discussed here, so methods that are not relevant yet have been removed; interested readers can read them on their own
protected static enum State {
NEW, OPEN, COMPLETED, CLOSED
}
}
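This is yet another abstract class: put() and take() in turn call the abstract methods doPut() and doTake(). Impatient readers may be ready to give up at this point, but only one step remains. Since this is an abstract class, the Channel must ultimately use a concrete subclass of it. Going back to MemoryChannel to look for clues, we find an inner class hidden inside: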
private class MemoryTransaction extends BasicTransactionSemantics {
private LinkedBlockingDeque<Event> takeList;
private LinkedBlockingDeque<Event> putList;
private final ChannelCounter channelCounter;
private int putByteCounter = 0;
private int takeByteCounter = 0;
public MemoryTransaction(int transCapacity, ChannelCounter counter) {
putList = new LinkedBlockingDeque<Event>(transCapacity);
takeList = new LinkedBlockingDeque<Event>(transCapacity);
channelCounter = counter;
}
@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}
@Override
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
if (takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}
if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
return null;
}
Event event;
synchronized (queueLock) {
event = queue.poll();
}
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
"signalling existence of entry");
takeList.put(event);
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
takeByteCounter += eventByteSize;
return event;
}
//... methods that are not needed yet are again omitted
}
This class contains the implementations of doPut() and doTake(), which makes it clear that MemoryChannel's put() and take() ultimately call the doPut() and doTake() of MemoryTransaction.
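The call chain above only works inside a transaction that is bound to the calling thread. The following is a minimal sketch of that pattern and of how a caller would typically use it. `MiniChannel` and `MiniTx` are hypothetical, heavily simplified stand-ins (events are plain strings), and since commit and rollback do not appear in the excerpts above, their behaviour here is an assumption made for illustration rather than a copy of MemoryChannel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TxDemo {
    public static void main(String[] args) {
        MiniChannel channel = new MiniChannel();
        MiniTx tx = channel.getTransaction();
        tx.begin();
        try {
            tx.put("event-1");
            tx.put("event-2");
            tx.commit();   // buffered events reach the channel queue only here
        } catch (RuntimeException e) {
            tx.rollback(); // drop buffered puts, give taken events back
            throw e;
        } finally {
            tx.close();
        }
        System.out.println("events in channel: " + channel.size());
    }
}

// Hypothetical channel: one shared queue plus a per-thread transaction.
class MiniChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final ThreadLocal<MiniTx> current = new ThreadLocal<>();

    // Reuse this thread's transaction unless it is missing or already closed.
    MiniTx getTransaction() {
        MiniTx tx = current.get();
        if (tx == null || tx.isClosed()) {
            tx = new MiniTx(queue);
            current.set(tx);
        }
        return tx;
    }

    int size() {
        synchronized (queue) { return queue.size(); }
    }
}

class MiniTx {
    private final Deque<String> queue;                       // shared channel queue
    private final List<String> putList = new ArrayList<>();  // puts buffered until commit
    private final List<String> takeList = new ArrayList<>(); // takes remembered for rollback
    private boolean open;
    private boolean closed;

    MiniTx(Deque<String> queue) { this.queue = queue; }

    void begin() { open = true; }

    boolean isClosed() { return closed; }

    void put(String event) {
        requireOpen();
        putList.add(event);
    }

    String take() {
        requireOpen();
        String event;
        synchronized (queue) { event = queue.poll(); }
        if (event != null) takeList.add(event);
        return event;
    }

    void commit() {
        requireOpen();
        synchronized (queue) { queue.addAll(putList); }
        putList.clear();
        takeList.clear();
    }

    void rollback() {
        requireOpen();
        synchronized (queue) {
            // Push taken events back to the head, preserving their original order.
            for (int i = takeList.size() - 1; i >= 0; i--) queue.addFirst(takeList.get(i));
        }
        putList.clear();
        takeList.clear();
    }

    void close() { open = false; closed = true; }

    private void requireOpen() {
        if (!open) throw new IllegalStateException("transaction is not open");
    }
}
```

Buffering puts in a per-transaction list is what lets a failed batch be discarded without ever touching the shared queue.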
You might think the analysis ends here, but there is more to come: two other important classes work alongside Channel, namely ChannelProcessor and ChannelSelector. Let us go through them in turn.
3 ChannelProcessor
ChannelProcessor performs the put operation, placing data into channels. Each ChannelProcessor instance holds a ChannelSelector that decides which channels an event is put into.
public class ChannelProcessor implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
public ChannelProcessor(ChannelSelector selector) {
this.selector = selector;
this.interceptorChain = new InterceptorChain();
}
public void initialize() {
this.interceptorChain.initialize();
}
public void close() {
this.interceptorChain.close();
}
public void configure(Context context) {
this.configureInterceptors(context);
}
private void configureInterceptors(Context context) {
// configure the interceptors
}
public ChannelSelector getSelector() {
return this.selector;
}
public void processEventBatch(List<Event> events) {
...
while(i$.hasNext()) {
Event optChannel = (Event)i$.next();
List tx = this.selector.getRequiredChannels(optChannel);
...// add the event to the queue for the required channels
t1 = this.selector.getOptionalChannels(optChannel);
Object eventQueue;
...// add the event to the queue for the optional channels
}
...// dispatch the events
}
public void processEvent(Event event) {
event = this.interceptorChain.intercept(event);
if(event != null) {
List requiredChannels = this.selector.getRequiredChannels(event);
Iterator optionalChannels = requiredChannels.iterator();
...// dispatch the event
List optionalChannels1 = this.selector.getOptionalChannels(event);
Iterator i$1 = optionalChannels1.iterator();
...// dispatch the event
}
}
}
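To keep the code short I removed some parts and kept only what needs explaining. In short, both write methods of ChannelProcessor obtain the target channels from the selector that was passed to the constructor and then put the event into those channels. Next, let us look at ChannelSelector.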
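As a rough sketch of the dispatch step elided above: required channels are written first and a failure there reaches the caller, while a failure on an optional channel is only reported. `SimpleChannel` and `Dispatcher` are hypothetical stand-ins, events are plain strings, and transactions and interceptors are left out, so this shows only the required/optional distinction and not Flume's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DispatchDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        SimpleChannel healthy = event -> received.add(event);
        SimpleChannel broken = event -> { throw new IllegalStateException("channel full"); };

        // The broken channel is optional, so the call still completes.
        Dispatcher.dispatch("event-1", Arrays.asList(healthy), Arrays.asList(broken));
        System.out.println("required channel received: " + received);
    }
}

// Hypothetical stand-in for a channel: accepts an event or throws.
interface SimpleChannel {
    void put(String event);
}

class Dispatcher {
    static void dispatch(String event, List<SimpleChannel> required, List<SimpleChannel> optional) {
        for (SimpleChannel channel : required) {
            channel.put(event); // a failure here propagates to the caller
        }
        for (SimpleChannel channel : optional) {
            try {
                channel.put(event);
            } catch (RuntimeException e) {
                // Best effort only: report the failure and carry on, with no retry.
                System.err.println("optional channel failed: " + e.getMessage());
            }
        }
    }
}
```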
4 ChannelSelector
ChannelSelector is an interface whose implementations are created through ChannelSelectorFactory. Flume provides two implementations: MultiplexingChannelSelector and ReplicatingChannelSelector.
public interface ChannelSelector extends NamedComponent, Configurable {
void setChannels(List<Channel> var1);
List<Channel> getRequiredChannels(Event var1);
List<Channel> getOptionalChannels(Event var1);
List<Channel> getAllChannels();
}
Selectors are created by the create() method of ChannelSelectorFactory. create() calls getSelectorForType() to obtain a selector, instantiating the implementation that matches the type given in the configuration file:
public class ChannelSelectorFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(
ChannelSelectorFactory.class);
public static ChannelSelector create(List<Channel> channels,
Map<String, String> config) {
...
}
public static ChannelSelector create(List<Channel> channels,
ChannelSelectorConfiguration conf) {
String type = ChannelSelectorType.REPLICATING.toString();
if (conf != null) {
type = conf.getType();
}
ChannelSelector selector = getSelectorForType(type);
selector.setChannels(channels);
Configurables.configure(selector, conf);
return selector;
}
private static ChannelSelector getSelectorForType(String type) {
if (type == null || type.trim().length() == 0) {
return new ReplicatingChannelSelector();
}
String selectorClassName = type;
ChannelSelectorType selectorType = ChannelSelectorType.OTHER;
try {
selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException ex) {
LOGGER.debug("Selector type {} is a custom type", type);
}
if (!selectorType.equals(ChannelSelectorType.OTHER)) {
selectorClassName = selectorType.getChannelSelectorClassName();
}
ChannelSelector selector = null;
try {
@SuppressWarnings("unchecked")
Class<? extends ChannelSelector> selectorClass =
(Class<? extends ChannelSelector>) Class.forName(selectorClassName);
selector = selectorClass.newInstance();
} catch (Exception ex) {
throw new FlumeException("Unable to load selector type: " + type
+ ", class: " + selectorClassName, ex);
}
return selector;
}
}
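The lookup in getSelectorForType() follows a common pattern: try the type string as a built-in alias and otherwise treat it as a fully qualified class name. Below is a minimal sketch of that pattern. The `BuiltIn` enum and its `ARRAY` and `LINKED` aliases are hypothetical and point at JDK collection classes purely so that the example runs without Flume on the classpath. It also uses `getDeclaredConstructor().newInstance()`, because `Class.newInstance()` is deprecated since Java 9.

```java
import java.util.Locale;

class SelectorTypeDemo {
    // Hypothetical aliases; OTHER marks "not a built-in type".
    enum BuiltIn {
        OTHER(null),
        ARRAY("java.util.ArrayList"),
        LINKED("java.util.LinkedList");

        final String className;

        BuiltIn(String className) { this.className = className; }
    }

    static Object create(String type) {
        // Fall back to a default when no type is configured.
        String name = (type == null || type.trim().isEmpty()) ? BuiltIn.ARRAY.name() : type.trim();
        String className = name;
        try {
            BuiltIn builtIn = BuiltIn.valueOf(name.toUpperCase(Locale.ENGLISH));
            if (builtIn != BuiltIn.OTHER) {
                className = builtIn.className;
            }
        } catch (IllegalArgumentException ex) {
            // Not an alias: keep treating the name as a custom class name.
        }
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException(
                    "Unable to load type: " + name + ", class: " + className, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("linked").getClass().getName());
        System.out.println(create(null).getClass().getName());
        System.out.println(create("java.util.TreeMap").getClass().getName());
    }
}
```

Accepting a class name as the fallback is what allows a custom selector to be plugged in from configuration alone.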
A brief note on each of the two selectors:
1) MultiplexingChannelSelector
Below is a sample channel selector configuration:
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
MultiplexingChannelSelector defines three fields that hold the different kinds of channels:
private Map<String, List<Channel>> channelMapping;
private Map<String, List<Channel>> optionalChannels;
private List<Channel> defaultChannels;
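The distribution rules are as follows:
- If a mapping is configured, the event is always sent to the mapped channels; if optional is configured as well, the event is also sent to the optional channels.
- If no mapping is configured but default is, the event is sent to the default channels; if optional is configured as well, the event is also sent to the optional channels.
- If neither mapping nor default is specified but optional is, the event is sent to the optional channels. Writes to optional channels are not retried on failure.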
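The rules above can be sketched as two lookups keyed by the header value. `MiniMultiplexSelector` is a hypothetical stand-in that uses channel names instead of Channel objects, and its sample values only loosely follow the configuration shown earlier. How Flume treats a channel listed as both required and optional for the same header value is not modelled here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiplexDemo {
    public static void main(String[] args) {
        MiniMultiplexSelector selector = MiniMultiplexSelector.sample();
        for (String state : new String[] {"CA", "AZ", "NY", "TX", null}) {
            System.out.println(state + " -> required=" + selector.required(state)
                    + ", optional=" + selector.optionalFor(state));
        }
    }
}

// Hypothetical selector working on channel names rather than Channel objects.
class MiniMultiplexSelector {
    final Map<String, List<String>> mapping = new HashMap<>();
    final Map<String, List<String>> optional = new HashMap<>();
    List<String> defaults = Collections.<String>emptyList();

    // Assumed sample values, loosely based on the configuration in the text.
    static MiniMultiplexSelector sample() {
        MiniMultiplexSelector s = new MiniMultiplexSelector();
        s.mapping.put("CA", Arrays.asList("mem-channel-1"));
        s.mapping.put("AZ", Arrays.asList("file-channel-2"));
        s.mapping.put("NY", Arrays.asList("mem-channel-1", "file-channel-2"));
        s.optional.put("CA", Arrays.asList("file-channel-2"));
        s.defaults = Arrays.asList("mem-channel-1");
        return s;
    }

    // Mapped channels if the header value has a mapping, otherwise the defaults.
    List<String> required(String headerValue) {
        if (headerValue == null || headerValue.trim().isEmpty()) {
            return defaults;
        }
        List<String> channels = mapping.get(headerValue);
        return channels != null ? channels : defaults;
    }

    // Optional channels for the header value, or an empty list if none are set.
    List<String> optionalFor(String headerValue) {
        List<String> channels = optional.get(headerValue);
        return channels != null ? channels : Collections.<String>emptyList();
    }
}
```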
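2) ReplicatingChannelSelector
The distribution rule is simpler:
- With replicating, if no optional channels are specified, every channel receives the event. If a channel is marked as optional, it is removed from the required channels and is written to only as an optional channel.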
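A minimal sketch of this rule, again using channel names instead of Channel objects: every channel starts out as required, and each name in a whitespace-separated optional list is moved to the optional list. `MiniReplicatingSelector` is a hypothetical name, and silently ignoring unknown names is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReplicatingDemo {
    public static void main(String[] args) {
        MiniReplicatingSelector selector =
                new MiniReplicatingSelector(Arrays.asList("c1", "c2", "c3"), "c3");
        System.out.println("required=" + selector.required + ", optional=" + selector.optional);
    }
}

// Hypothetical selector working on channel names rather than Channel objects.
class MiniReplicatingSelector {
    final List<String> required;
    final List<String> optional = new ArrayList<>();

    MiniReplicatingSelector(List<String> allChannels, String optionalNames) {
        // Every configured channel is required until it is marked optional.
        required = new ArrayList<>(allChannels);
        if (optionalNames != null && !optionalNames.trim().isEmpty()) {
            for (String name : optionalNames.trim().split("\\s+")) {
                // Move the channel over; names not in the channel list are ignored.
                if (required.remove(name) && !optional.contains(name)) {
                    optional.add(name);
                }
            }
        }
    }
}
```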
5 Summary
Channel is the connecting component: data coming from the source flows through it to the sink. ChannelProcessor puts each event into the channels assigned to it, and the assignment rule is determined by the selector. Flume provides two selectors, multiplexing and replicating. ChannelProcessor is therefore generally invoked from a Source, while Channel's take() is invoked from a Sink.