看看Curator
框架 為實(shí)現(xiàn)對 連接狀態(tài)ConnectionState
的監(jiān)聽割笙,都是怎么構(gòu)造框架的。后面我們也可以應(yīng)用到業(yè)務(wù)的各種監(jiān)聽中翠肘。
Curator2.13實(shí)現(xiàn)
接口 Listener
Listener
接口檐束,給用戶實(shí)現(xiàn)stateChange()
傳入新的狀態(tài),用戶實(shí)現(xiàn)對這新的狀態(tài)要做什么邏輯處理束倍。
public interface ConnectionStateListener
{
/**
* Called when there is a state change in the connection
* @param client the client
* @param newState the new state
*/
public void stateChanged(CuratorFramework client, ConnectionState newState);
}
接口 Listenable
/**
* Abstracts a listenable object
*/
public interface Listenable<T>
{
/**
* Add the given listener. The listener will be executed in the containing instance's thread.
*
* @param listener listener to add
*/
public void addListener(T listener);
/**
* Add the given listener. The listener will be executed using the given
* executor
*
* @param listener listener to add
* @param executor executor to run listener in
*/
public void addListener(T listener, Executor executor);
public void removeListener(T listener);
}
抽象類 ListenerContainer<T> implements Listenable<T>
/**
* Abstracts an object that has listeners 裝Listener的容器
* <T> Listener類型
*/
public class ListenerContainer<T> implements Listenable<T>
{
private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();
@Override
public void addListener(T listener)
{
addListener(listener, MoreExecutors.sameThreadExecutor());
}
@Override
public void addListener(T listener, Executor executor)
{
listeners.put(listener, new ListenerEntry<T>(listener, executor));
}
/**
* 對 Listener 列表的遍歷進(jìn)行封裝
* Utility - apply the given function to each listener.
* @param function function to call for each listener
*/
public void forEach(final Function<T, Void> function)
{
for ( final ListenerEntry<T> entry : listeners.values() )
{
entry.executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
function.apply(entry.listener);
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
}
}
}
);
}
}
public void clear()
{
listeners.clear();
}
public int size()
{
return listeners.size();
}
}
ConnectionStateManager
// to manage connection state
public class ConnectionStateManager {
// 又是隊(duì)列被丧? 玩消息什么的都是用隊(duì)列。現(xiàn)在是存放 ConnectionState
BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
// 持有 ListenerContainer
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
/**
* Start the manager肌幽,起一個(gè)線程去執(zhí)行 processEvents()晚碾,要是這線程掛了怎么辦?異常怎么處理的喂急?框架怎么處理的格嘁。。
*/
public void start()
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
// 對不斷產(chǎn)生的 ConnectionState 進(jìn)行處理廊移,生產(chǎn)者糕簿?
private void processEvents(){
// 當(dāng) ConnectionStateManager 啟動(dòng)完成
while ( state.get() == State.STARTED )
{
// 不斷從隊(duì)列拿 Conection 狀態(tài)
final ConnectionState newState = eventQueue.take();
// 對每個(gè) 狀態(tài)監(jiān)聽接口 應(yīng)用 Function探入, 狀態(tài)監(jiān)聽接口作為 主語
// forEach 是 listeners封裝的 遍歷所有 listener 的方法而已。懂诗。蜂嗽。
listeners.forEach(
new Function<ConnectionStateListener, Void>() {
// ConnectionStateListener是我們自己要實(shí)現(xiàn)的接口,stateChanged是要實(shí)現(xiàn)的方法
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
/**
上面這段
如果沒有封裝 Listener 到 ListenerContainer 的話殃恒, 所有 Listener 就是個(gè) List列表植旧,就直接調(diào) Listener 的 stateChanged 方法了吧。
for Listener {
listener.stateChanged(client, newState);
}
因?yàn)?封裝 Listener 到 ListenerContainer了离唐, 上面的 forEach 方法內(nèi)部就可以有些內(nèi)部實(shí)現(xiàn)病附,比如 對每個(gè) Listener 都是用對應(yīng)的 executor 來執(zhí)行。
**/
}
}
// 上面的方法是處理 ConnectionState 的亥鬓,那 ConnectionState 是怎么傳進(jìn)來的呢完沪? 生產(chǎn)者?
/**
* Post a state change. If the manager is already in that state the change
* is ignored. Otherwise the change is queued for listeners.
*
* @param newConnectionState new state
* @return true if the state actually changed, false if it was already at that state
*/
public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
// 先判斷 ConnectionStateManager 是否已經(jīng)啟動(dòng)好嵌戈, state 是內(nèi)部 Enum
if ( state.get() != State.STARTED )
{
return false;
}
ConnectionState previousState = currentConnectionState;
if ( previousState == newConnectionState )
{
return false;
}
ConnectionState localState = newConnectionState;
// !!!
notifyAll();
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
log.warn("ConnectionStateManager queue full - dropping events to make room");
}
return true;
}
}
調(diào)用
啟動(dòng)
// 啟動(dòng) connectionStateManager覆积,不斷檢測 connectionState 變化
connectionStateManager.start(); // must be called before client.start()
// 來個(gè)匿名默認(rèn)的 ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
生產(chǎn) ConnectionState
,把zk
那里拿到的state
轉(zhuǎn)一下熟呛,然后addStateChange
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
suspendConnection();
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
復(fù)用宽档?
還有其他各種Listener
,都可以放到 ListenerContainer
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener {
/**
* Called when a background task has completed or a watch has triggered
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
public interface UnhandledErrorListener
{
/**
* Called when an exception is caught in a background thread, handler, etc. Before this
* listener is called, the error will have been logged and a {@link ConnectionState#LOST}
* event will have been queued for any {@link ConnectionStateListener}s.
* @param message Source message
* @param e exception
*/
public void unhandledError(String message, Throwable e);
}
總結(jié)一下源碼技巧
-
ConnectionStateManager
就是個(gè) 生產(chǎn)者消費(fèi)者模式的代碼惰拱,特點(diǎn)就是:public addStateChange()
暴露給外部用戶生產(chǎn)ConnectionState
雌贱,通過隊(duì)列eventQueue
傳遞,private processEvents()
在內(nèi)部對ConnectionState
進(jìn)行消費(fèi)偿短。 - 直接
new
匿名類欣孤,對接口進(jìn)行默認(rèn)實(shí)現(xiàn)。 - 對
Listener
列表對象進(jìn)行Container
封裝昔逗,然后 封裝foreach
方法降传,傳入Function
接口 就是foreach
每個(gè)元素要執(zhí)行的業(yè)務(wù)邏輯,方法體就可以加一些其他福利勾怒。