Curator源碼閱讀 - ConnectionState的管理與監(jiān)聽

看看Curator框架 為實(shí)現(xiàn)對 連接狀態(tài)ConnectionState的監(jiān)聽割笙,都是怎么構(gòu)造框架的。后面我們也可以應(yīng)用到業(yè)務(wù)的各種監(jiān)聽中翠肘。

Curator2.13實(shí)現(xiàn)

接口 Listener

Listener接口檐束,給用戶實(shí)現(xiàn)stateChange()傳入新的狀態(tài),用戶實(shí)現(xiàn)對這新的狀態(tài)要做什么邏輯處理束倍。

public interface ConnectionStateListener
{
    /**
     * Called when there is a state change in the connection
     * @param client the client
     * @param newState the new state
     */
    public void stateChanged(CuratorFramework client, ConnectionState newState);
}

接口 Listenable

/**
 * Abstracts a listenable object
 */
public interface Listenable<T>
{
    /**
     * Add the given listener. The listener will be executed in the containing instance's thread.
     *
     * @param listener listener to add
     */
    public void     addListener(T listener);

    /**
     * Add the given listener. The listener will be executed using the given
     * executor
     *
     * @param listener listener to add
     * @param executor executor to run listener in
     */
    public void     addListener(T listener, Executor executor);

    public void     removeListener(T listener);
}

抽象類 ListenerContainer<T> implements Listenable<T>

/**
 * Abstracts an object that has listeners 裝Listener的容器
 * <T> Listener類型
 */
public class ListenerContainer<T> implements Listenable<T>
{
    private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();

    @Override
    public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

    @Override
    public void addListener(T listener, Executor executor)
    {
        listeners.put(listener, new ListenerEntry<T>(listener, executor));
    }
    
    /**
     * 對 Listener 列表的遍歷進(jìn)行封裝
     * Utility - apply the given function to each listener. 
     * @param function function to call for each listener
     */
    public void forEach(final Function<T, Void> function)
    {
        for ( final ListenerEntry<T> entry : listeners.values() )
        {
            entry.executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            function.apply(entry.listener);
                        }
                        catch ( Throwable e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
                        }
                    }
                }
            );
        }
    }
    
    
    public void clear()
    {
        listeners.clear();
    }

    public int size()
    {
        return listeners.size();
    }
    
}

ConnectionStateManager

// to manage connection state
public class ConnectionStateManager {
    // 又是隊(duì)列被丧? 玩消息什么的都是用隊(duì)列。現(xiàn)在是存放 ConnectionState
    BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    // 持有 ListenerContainer
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
    
    /**
     * Start the manager肌幽,起一個(gè)線程去執(zhí)行 processEvents()晚碾,要是這線程掛了怎么辦?異常怎么處理的喂急?框架怎么處理的格嘁。。
     */
    public void start()
    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }
    
    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            service.shutdownNow();
            listeners.clear();
        }
    }
    
    // 對不斷產(chǎn)生的 ConnectionState 進(jìn)行處理廊移,生產(chǎn)者糕簿?
     private void processEvents(){
        // 當(dāng) ConnectionStateManager 啟動(dòng)完成
        while ( state.get() == State.STARTED )
        {
            // 不斷從隊(duì)列拿 Conection 狀態(tài)
            final ConnectionState newState = eventQueue.take();
            // 對每個(gè) 狀態(tài)監(jiān)聽接口  應(yīng)用 Function探入, 狀態(tài)監(jiān)聽接口作為 主語
            // forEach 是 listeners封裝的 遍歷所有 listener 的方法而已。懂诗。蜂嗽。
            listeners.forEach(
                new Function<ConnectionStateListener, Void>() {
                    // ConnectionStateListener是我們自己要實(shí)現(xiàn)的接口,stateChanged是要實(shí)現(xiàn)的方法
                    @Override 
                    public Void apply(ConnectionStateListener listener)
                    {
                        listener.stateChanged(client, newState);
                        return null;
                    }
                }
            );
            /**
            上面這段
            如果沒有封裝 Listener 到 ListenerContainer 的話殃恒, 所有 Listener 就是個(gè) List列表植旧,就直接調(diào) Listener 的 stateChanged 方法了吧。
            for Listener {
                listener.stateChanged(client, newState);
            }
            
            因?yàn)?封裝 Listener 到 ListenerContainer了离唐, 上面的 forEach 方法內(nèi)部就可以有些內(nèi)部實(shí)現(xiàn)病附,比如 對每個(gè) Listener 都是用對應(yīng)的 executor 來執(zhí)行。
            **/
        }
    }
    
    
    // 上面的方法是處理 ConnectionState 的亥鬓,那 ConnectionState 是怎么傳進(jìn)來的呢完沪? 生產(chǎn)者?
    /**
     * Post a state change. If the manager is already in that state the change
     * is ignored. Otherwise the change is queued for listeners.
     *
     * @param newConnectionState new state
     * @return true if the state actually changed, false if it was already at that state
     */
    public synchronized boolean addStateChange(ConnectionState newConnectionState)
    {
        // 先判斷 ConnectionStateManager 是否已經(jīng)啟動(dòng)好嵌戈, state 是內(nèi)部 Enum
        if ( state.get() != State.STARTED )
        {
            return false;
        }

        ConnectionState previousState = currentConnectionState;
        if ( previousState == newConnectionState )
        {
            return false;
        }
        ConnectionState localState = newConnectionState;
        // !!!
        notifyAll();

        while ( !eventQueue.offer(state) )
        {
            eventQueue.poll();
            log.warn("ConnectionStateManager queue full - dropping events to make room");
        }
        return true;
    }
    
}
    
   

調(diào)用

啟動(dòng)

// 啟動(dòng) connectionStateManager覆积,不斷檢測 connectionState 變化
connectionStateManager.start(); // must be called before client.start()
// 來個(gè)匿名默認(rèn)的 ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
        {
            logAsErrorConnectionErrors.set(true);
        }
    }
};
this.getConnectionStateListenable().addListener(listener);

生產(chǎn) ConnectionState,把zk那里拿到的state轉(zhuǎn)一下熟呛,然后addStateChange

void validateConnection(Watcher.Event.KeeperState state)
{
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        suspendConnection();
    }
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

復(fù)用宽档?

還有其他各種Listener,都可以放到 ListenerContainer

private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;


/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener {
    /**
     * Called when a background task has completed or a watch has triggered
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

public interface UnhandledErrorListener
{
    /**
     * Called when an exception is caught in a background thread, handler, etc. Before this
     * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
     * event will have been queued for any {@link ConnectionStateListener}s.
     * @param message Source message
     * @param e exception
     */
    public void     unhandledError(String message, Throwable e);
}

總結(jié)一下源碼技巧

  1. ConnectionStateManager 就是個(gè) 生產(chǎn)者消費(fèi)者模式的代碼惰拱,特點(diǎn)就是: public addStateChange() 暴露給外部用戶生產(chǎn) ConnectionState雌贱,通過隊(duì)列eventQueue傳遞,private processEvents()在內(nèi)部對ConnectionState進(jìn)行消費(fèi)偿短。
  2. 直接new匿名類欣孤,對接口進(jìn)行默認(rèn)實(shí)現(xiàn)。
  3. Listener列表對象進(jìn)行Container封裝昔逗,然后 封裝foreach方法降传,傳入Function接口 就是foreach每個(gè)元素要執(zhí)行的業(yè)務(wù)邏輯,方法體就可以加一些其他福利勾怒。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末婆排,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子笔链,更是在濱河造成了極大的恐慌段只,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鉴扫,死亡現(xiàn)場離奇詭異赞枕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門炕婶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來姐赡,“玉大人,你說我怎么就攤上這事柠掂∠罨” “怎么了?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵涯贞,是天一觀的道長枪狂。 經(jīng)常有香客問我,道長宋渔,這世上最難降的妖魔是什么摘完? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮傻谁,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘列粪。我一直安慰自己审磁,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布岂座。 她就那樣靜靜地躺著态蒂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪费什。 梳的紋絲不亂的頭發(fā)上钾恢,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音鸳址,去河邊找鬼瘩蚪。 笑死,一個(gè)胖子當(dāng)著我的面吹牛稿黍,可吹牛的內(nèi)容都是我干的疹瘦。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼巡球,長吁一口氣:“原來是場噩夢啊……” “哼言沐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起酣栈,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤险胰,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后矿筝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體起便,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了缨睡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鸟悴。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖奖年,靈堂內(nèi)的尸體忽然破棺而出细诸,到底是詐尸還是另有隱情,我是刑警寧澤陋守,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布震贵,位于F島的核電站,受9級(jí)特大地震影響水评,放射性物質(zhì)發(fā)生泄漏猩系。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一中燥、第九天 我趴在偏房一處隱蔽的房頂上張望寇甸。 院中可真熱鬧,春花似錦疗涉、人聲如沸拿霉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽绽淘。三九已至,卻和暖如春闹伪,著一層夾襖步出監(jiān)牢的瞬間沪铭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來泰國打工偏瓤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留杀怠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓厅克,卻偏偏與公主長得像驮肉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子已骇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容