5续语、看垂谢!源碼之netty中future之ChannelGroupFuture接口實(shí)現(xiàn)

ChannelGroupFuture接口實(shí)現(xiàn)

直接已經(jīng)介紹過ChannelGroupFuture接口的定義這里將不會(huì)再重復(fù)講述,第一個(gè)類是VoidChannelGroupFuture這個(gè)類并沒有什么好說(shuō)的疮茄,和之前的VoidChannelPromise一樣是無(wú)效的類型滥朱,此類中沒有什么特別可講因?yàn)榇蠖喽紱]有邏輯進(jìn)行執(zhí)行,所以這里將不會(huì)講述此類力试,直接開始講述DefaultChannelGroupFuture類徙邻。

在講述在之前需要講述下它內(nèi)部引用的一些定義。

//此接口是對(duì)Channel對(duì)象做的分組畸裳,分了組每一組都是一個(gè)ChannelGroup而他可以進(jìn)行批量管理
public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
    //返回當(dāng)前分組的名字
    String name();
    //根據(jù)ChannelId獲取對(duì)于的Channel,Channel接口定義的時(shí)候就要一個(gè)id方法是用來(lái)返回當(dāng)前的Channel的id
    Channel find(ChannelId id);
    //給當(dāng)前組內(nèi)的Channel群發(fā)消息缰犁,如果消息是ByteBuf類型將會(huì)不會(huì)出現(xiàn)發(fā)送內(nèi)容錯(cuò)誤的情況,因?yàn)樗麜?huì)將內(nèi)容進(jìn)行一份保存每次返回的都是最原始的對(duì)象
    //并且此方法是異步的怖糊,因?yàn)镃hannel中的write是異步的帅容。
    ChannelGroupFuture write(Object message);
    //是上方方法的重載,增加了匹配伍伤,用于匹配channel并徘,這樣就可以在組內(nèi)匹配指定channel進(jìn)行發(fā)送消息
    ChannelGroupFuture write(Object message, ChannelMatcher matcher);
    //上方方法的重載,添加了是否返回?zé)o效的應(yīng)答扰魂,此應(yīng)答就是VoidChannelGroupFuture麦乞,如果是false那么將返回DefaultChannelGroupFuture
    ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise);

    //刷新組內(nèi)全部的channel管道,說(shuō)白了就是循環(huán)channel集合然后調(diào)用每個(gè)channel的flush方法
    ChannelGroup flush();
    //上方方法的重載刷新匹配到的管道
    ChannelGroup flush(ChannelMatcher matcher);
    //結(jié)合了write與flush方法
    ChannelGroupFuture writeAndFlush(Object message);
    //此方法已經(jīng)廢棄了內(nèi)部實(shí)現(xiàn)調(diào)用的是writeAndFlush方法
    @Deprecated
    ChannelGroupFuture flushAndWrite(Object message);
    //結(jié)合flush(ChannelMatcher matcher)與write(Object message, ChannelMatcher matcher)方法阅爽,可以過濾管道
    ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);
    //結(jié)合flush(ChannelMatcher matcher);與write(Object message, ChannelMatcher matcher, boolean voidPromise);方法路幸。
    ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise);
    //內(nèi)部調(diào)用writeAndFlush(Object message, ChannelMatcher matcher)方法,此方法已廢棄
    @Deprecated
    ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher);
    //操作組內(nèi)所有的channel斷開連接
    ChannelGroupFuture disconnect();
    //上方方法的重構(gòu)只是添加了篩選channel的功能
    ChannelGroupFuture disconnect(ChannelMatcher matcher);
    //關(guān)閉channel管道
    ChannelGroupFuture close();
    //重構(gòu)上方方法添加了管道的過濾
    ChannelGroupFuture close(ChannelMatcher matcher);
    //用于注銷事件使用EventLoop中的
    @Deprecated
    ChannelGroupFuture deregister();
    //重載上方代碼加入管道篩選已經(jīng)廢棄不建議使用
    @Deprecated
    ChannelGroupFuture deregister(ChannelMatcher matcher);
    //獲取管道關(guān)閉時(shí)的Future付翁,可以使用它監(jiān)聽關(guān)閉
    ChannelGroupFuture newCloseFuture();
    //重載上方方法獲取指定管道的關(guān)閉Future
    ChannelGroupFuture newCloseFuture(ChannelMatcher matcher);
}
//默認(rèn)管道組的操作管理简肴,此類是提供給ChannelGroup接口使用的,因?yàn)镃hannelGroup是批量管理自然也需要有個(gè)能夠批量管理的Future而此類就是這個(gè)Future
final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
    //管理的操作集合是哪個(gè)ChannelGroup的百侧,此變量暫無(wú)意義砰识,因?yàn)楸菊虏⑽词褂?    private final ChannelGroup group;
    //當(dāng)前組中的結(jié)果管理的所有管道和管道處理
    private final Map<Channel, ChannelFuture> futures;
    //本組任務(wù)執(zhí)行成功個(gè)數(shù)和失敗個(gè)數(shù)
    private int successCount;
    private int failureCount;
    //默認(rèn)監(jiān)聽器的實(shí)現(xiàn)能扒,用于記錄執(zhí)行結(jié)果,在futures中的每個(gè)任務(wù)執(zhí)行完成都會(huì)操作此監(jiān)聽器用于記錄操作結(jié)果辫狼,比如成功數(shù)和失敗數(shù)與失敗結(jié)果的封裝
    private final ChannelFutureListener childListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            //獲取當(dāng)前任務(wù)的執(zhí)行結(jié)果
            boolean success = future.isSuccess();
            //聲明是否是最后一個(gè)處理結(jié)果
            boolean callSetDone;
            //使用了DefaultChannelGroupFuture為鎖是為了確保上方的計(jì)數(shù)變量的準(zhǔn)確性
            synchronized (DefaultChannelGroupFuture.this) {
                //如果是成功則成功計(jì)數(shù)加一否則失敗計(jì)數(shù)加一
                if (success) {
                    successCount ++;
                } else {
                    failureCount ++;
                }
                //如果成功數(shù)和失敗數(shù)的和等于總處理數(shù)則返回true否則返回false
                callSetDone = successCount + failureCount == futures.size();
                //斷言他的結(jié)果必然是小于等于處理任務(wù)初斑,如果出現(xiàn)大于那么邏輯肯定是有問題的需要拋出斷言異常,當(dāng)然前提是運(yùn)行時(shí)開啟了斷言
                assert successCount + failureCount <= futures.size();
            }
            //如果當(dāng)前處理是最后一個(gè)結(jié)果
            if (callSetDone) {
                //判斷當(dāng)前失敗的個(gè)數(shù)是否大于0
                if (failureCount > 0) {
                    //創(chuàng)建一個(gè)集合用于存儲(chǔ)執(zhí)行結(jié)果失敗的管道與異常信息膨处,設(shè)置個(gè)數(shù)為失敗個(gè)數(shù)
                    List<Map.Entry<Channel, Throwable>> failed =
                            new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
                    //遍歷所有的任務(wù)
                    for (ChannelFuture f: futures.values()) {
                        //判斷當(dāng)前的任務(wù)是否失敗
                        if (!f.isSuccess()) {
                            //如果失敗則給集合添加錯(cuò)誤的管道對(duì)于的錯(cuò)誤信息
                            failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
                        }
                    }
                    //給當(dāng)前的任務(wù)設(shè)置為失敗并且傳入的異常是個(gè)自定義異常见秤,此異常用于存儲(chǔ)具體的錯(cuò)誤數(shù)據(jù)信息作為返回值
                    setFailure0(new ChannelGroupException(failed));
                } else {
                    //如果小于0則代表沒有失敗那么設(shè)置當(dāng)前的管理是成功狀態(tài)
                    setSuccess0();
                }
            }
        }
    };

    //構(gòu)造器,所屬的ChannelGroup真椿、futures代表當(dāng)前任務(wù)組需要處理的任務(wù)們鹃答,executor任務(wù)執(zhí)行器
    DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures,  EventExecutor executor) {
        super(executor);
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (futures == null) {
            throw new NullPointerException("futures");
        }

        this.group = group;
        //創(chuàng)建一個(gè)future集合 key是管道 value是對(duì)于的管道任務(wù)
        Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
        //將傳入的list任務(wù) 動(dòng)態(tài)添加到futureMap中
        for (ChannelFuture f: futures) {
            futureMap.put(f.channel(), f);
        }
        //因?yàn)榇巳蝿?wù)集合是不允許修改的所以此處轉(zhuǎn)換為了不允許修改的map,此map如果調(diào)用put remove等修改方法則會(huì)拋出異常
        this.futures = Collections.unmodifiableMap(futureMap);
        //給管理的所有任務(wù)添加任務(wù)完成的監(jiān)聽器突硝,傳入的任務(wù)只要完成就會(huì)進(jìn)入上方的完成監(jiān)聽器從而達(dá)到計(jì)數(shù)的效果
        for (ChannelFuture f: this.futures.values()) {
            f.addListener(childListener);
        }
        //如果傳入的任務(wù)是空則直接完成當(dāng)前任務(wù)测摔,這里可能有些繞 因?yàn)槭荊roup他是對(duì)多個(gè)任務(wù)的管理,但是當(dāng)前類也是一個(gè)任務(wù)只不過是用于管理其他任務(wù)集合的任務(wù)罷了
        // Done on arrival?
        if (this.futures.isEmpty()) {
            setSuccess0();
        }
    }
    //此構(gòu)造器參考上方構(gòu)造器并沒有特殊之處只是減少了將list轉(zhuǎn)map的操作
    DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
        super(executor);
        this.group = group;
        this.futures = Collections.unmodifiableMap(futures);
        for (ChannelFuture f: this.futures.values()) {
            f.addListener(childListener);
        }

        // Done on arrival?
        if (this.futures.isEmpty()) {
            setSuccess0();
        }
    }

    @Override
    public ChannelGroup group() {
        return group;
    }
    //根據(jù)管道查找任務(wù)解恰,就是使用的上方futures的get方法
    @Override
    public ChannelFuture find(Channel channel) {
        return futures.get(channel);
    }
    //迭代器則是獲取的futures的迭代器
    @Override
    public Iterator<ChannelFuture> iterator() {
        return futures.values().iterator();
    }
    //是否部分成功
    @Override
    public synchronized boolean isPartialSuccess() {
        //successCount != 0 代表總有一個(gè)是成功的
        //successCount != futures.size() 不是所有的都成功的
        return successCount != 0 && successCount != futures.size();
    }
    //是否部分失敗
    //參考isPartialSuccess內(nèi)部解釋
    @Override
    public synchronized boolean isPartialFailure() {
        return failureCount != 0 && failureCount != futures.size();
    }
    //下面的方法都是使用父級(jí)的方法此處不做講解锋八,有疑問可以看父級(jí)實(shí)現(xiàn)
    @Override
    public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        super.addListener(listener);
        return this;
    }

    @Override
    public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
        super.addListeners(listeners);
        return this;
    }

    @Override
    public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        super.removeListener(listener);
        return this;
    }

    @Override
    public DefaultChannelGroupFuture removeListeners(
            GenericFutureListener<? extends Future<? super Void>>... listeners) {
        super.removeListeners(listeners);
        return this;
    }

    @Override
    public DefaultChannelGroupFuture await() throws InterruptedException {
        super.await();
        return this;
    }

    @Override
    public DefaultChannelGroupFuture awaitUninterruptibly() {
        super.awaitUninterruptibly();
        return this;
    }

    @Override
    public DefaultChannelGroupFuture syncUninterruptibly() {
        super.syncUninterruptibly();
        return this;
    }

    @Override
    public DefaultChannelGroupFuture sync() throws InterruptedException {
        super.sync();
        return this;
    }

    @Override
    public ChannelGroupException cause() {
        return (ChannelGroupException) super.cause();
    }
    //需要注意這兩個(gè)方法是私有方法代表著外部允許設(shè)置成功和失敗,從而也就看出外部不允許操作這種group組任務(wù)的操作
    private void setSuccess0() {
        super.setSuccess(null);
    }

    private void setFailure0(ChannelGroupException cause) {
        super.setFailure(cause);
    }
    //上面的講解這里又得到了驗(yàn)證护盈,如果外部調(diào)用了組的設(shè)置方法那么將會(huì)拋出異常
    @Override
    public DefaultChannelGroupFuture setSuccess(Void result) {
        throw new IllegalStateException();
    }

    @Override
    public boolean trySuccess(Void result) {
        throw new IllegalStateException();
    }

    @Override
    public DefaultChannelGroupFuture setFailure(Throwable cause) {
        throw new IllegalStateException();
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        throw new IllegalStateException();
    }
    //檢查死鎖
    @Override
    protected void checkDeadLock() {
        //獲取到當(dāng)前的執(zhí)行
        EventExecutor e = executor();
        //如果當(dāng)前執(zhí)行器是null自然不會(huì)死鎖挟纱,因?yàn)椴]有線程執(zhí)行 
        //ImmediateEventExecutor是立即執(zhí)行的執(zhí)行器還是當(dāng)前線程所以不會(huì)發(fā)生死鎖,后續(xù)再繼續(xù)線程池介紹的時(shí)候?qū)?huì)詳細(xì)講解
        //e.inEventLoop() 如果當(dāng)前線程就是執(zhí)行線程那么就會(huì)發(fā)生死鎖
        //這里需要清楚第一第二條件都是派出單線程執(zhí)行情況黄琼,如果是單線程那么無(wú)死鎖存在樊销,而最后一個(gè)條件是判斷兩個(gè)線程,到后面講解線程池實(shí)現(xiàn)時(shí)將會(huì)講述
        if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
            throw new BlockingOperationException();
        }
    }
    //此類很簡(jiǎn)單用于存儲(chǔ)管道和任務(wù)
    private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
        private final K key;
        private final V value;

        DefaultEntry(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public K getKey() {
            return key;
        }

        @Override
        public V getValue() {
            return value;
        }

        @Override
        public V setValue(V value) {
            throw new UnsupportedOperationException("read-only");
        }
    }
}

到此ChannelGroupFuture的實(shí)現(xiàn)基本完成了可能有人會(huì)覺得一開始的接口并沒有用到脏款,這里需要知道一個(gè)定義,就是ChannelGroup與ChannelGroupFuture的關(guān)系裤园,
ChannelGroup是用于管理管道的而內(nèi)部很多批量操作最終都是分組成為了ChannelGroupFuture作為統(tǒng)一返回撤师。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市拧揽,隨后出現(xiàn)的幾起案子剃盾,更是在濱河造成了極大的恐慌,老刑警劉巖淤袜,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痒谴,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡铡羡,警方通過查閱死者的電腦和手機(jī)积蔚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)烦周,“玉大人尽爆,你說(shuō)我怎么就攤上這事怎顾。” “怎么了漱贱?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵槐雾,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我幅狮,道長(zhǎng)募强,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任崇摄,我火速辦了婚禮钻注,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘配猫。我一直安慰自己幅恋,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布泵肄。 她就那樣靜靜地躺著捆交,像睡著了一般。 火紅的嫁衣襯著肌膚如雪腐巢。 梳的紋絲不亂的頭發(fā)上品追,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音冯丙,去河邊找鬼肉瓦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛胃惜,可吹牛的內(nèi)容都是我干的泞莉。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼船殉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼鲫趁!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起利虫,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤挨厚,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后糠惫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疫剃,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年硼讽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了巢价。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蹄溉,靈堂內(nèi)的尸體忽然破棺而出咨油,到底是詐尸還是另有隱情,我是刑警寧澤柒爵,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布役电,位于F島的核電站,受9級(jí)特大地震影響棉胀,放射性物質(zhì)發(fā)生泄漏法瑟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一唁奢、第九天 我趴在偏房一處隱蔽的房頂上張望霎挟。 院中可真熱鬧,春花似錦麻掸、人聲如沸酥夭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)熬北。三九已至,卻和暖如春诚隙,著一層夾襖步出監(jiān)牢的瞬間讶隐,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工久又, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留巫延,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓地消,卻偏偏與公主長(zhǎng)得像炉峰,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子犯建,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容