ChannelGroupFuture接口實(shí)現(xiàn)
直接已經(jīng)介紹過ChannelGroupFuture接口的定義這里將不會(huì)再重復(fù)講述,第一個(gè)類是VoidChannelGroupFuture這個(gè)類并沒有什么好說(shuō)的疮茄,和之前的VoidChannelPromise一樣是無(wú)效的類型滥朱,此類中沒有什么特別可講因?yàn)榇蠖喽紱]有邏輯進(jìn)行執(zhí)行,所以這里將不會(huì)講述此類力试,直接開始講述DefaultChannelGroupFuture類徙邻。
在講述在之前需要講述下它內(nèi)部引用的一些定義。
//此接口是對(duì)Channel對(duì)象做的分組畸裳,分了組每一組都是一個(gè)ChannelGroup而他可以進(jìn)行批量管理
public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
//返回當(dāng)前分組的名字
String name();
//根據(jù)ChannelId獲取對(duì)于的Channel,Channel接口定義的時(shí)候就要一個(gè)id方法是用來(lái)返回當(dāng)前的Channel的id
Channel find(ChannelId id);
//給當(dāng)前組內(nèi)的Channel群發(fā)消息缰犁,如果消息是ByteBuf類型將會(huì)不會(huì)出現(xiàn)發(fā)送內(nèi)容錯(cuò)誤的情況,因?yàn)樗麜?huì)將內(nèi)容進(jìn)行一份保存每次返回的都是最原始的對(duì)象
//并且此方法是異步的怖糊,因?yàn)镃hannel中的write是異步的帅容。
ChannelGroupFuture write(Object message);
//是上方方法的重載,增加了匹配伍伤,用于匹配channel并徘,這樣就可以在組內(nèi)匹配指定channel進(jìn)行發(fā)送消息
ChannelGroupFuture write(Object message, ChannelMatcher matcher);
//上方方法的重載,添加了是否返回?zé)o效的應(yīng)答扰魂,此應(yīng)答就是VoidChannelGroupFuture麦乞,如果是false那么將返回DefaultChannelGroupFuture
ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise);
//刷新組內(nèi)全部的channel管道,說(shuō)白了就是循環(huán)channel集合然后調(diào)用每個(gè)channel的flush方法
ChannelGroup flush();
//上方方法的重載刷新匹配到的管道
ChannelGroup flush(ChannelMatcher matcher);
//結(jié)合了write與flush方法
ChannelGroupFuture writeAndFlush(Object message);
//此方法已經(jīng)廢棄了內(nèi)部實(shí)現(xiàn)調(diào)用的是writeAndFlush方法
@Deprecated
ChannelGroupFuture flushAndWrite(Object message);
//結(jié)合flush(ChannelMatcher matcher)與write(Object message, ChannelMatcher matcher)方法阅爽,可以過濾管道
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);
//結(jié)合flush(ChannelMatcher matcher);與write(Object message, ChannelMatcher matcher, boolean voidPromise);方法路幸。
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise);
//內(nèi)部調(diào)用writeAndFlush(Object message, ChannelMatcher matcher)方法,此方法已廢棄
@Deprecated
ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher);
//操作組內(nèi)所有的channel斷開連接
ChannelGroupFuture disconnect();
//上方方法的重構(gòu)只是添加了篩選channel的功能
ChannelGroupFuture disconnect(ChannelMatcher matcher);
//關(guān)閉channel管道
ChannelGroupFuture close();
//重構(gòu)上方方法添加了管道的過濾
ChannelGroupFuture close(ChannelMatcher matcher);
//用于注銷事件使用EventLoop中的
@Deprecated
ChannelGroupFuture deregister();
//重載上方代碼加入管道篩選已經(jīng)廢棄不建議使用
@Deprecated
ChannelGroupFuture deregister(ChannelMatcher matcher);
//獲取管道關(guān)閉時(shí)的Future付翁,可以使用它監(jiān)聽關(guān)閉
ChannelGroupFuture newCloseFuture();
//重載上方方法獲取指定管道的關(guān)閉Future
ChannelGroupFuture newCloseFuture(ChannelMatcher matcher);
}
//默認(rèn)管道組的操作管理简肴,此類是提供給ChannelGroup接口使用的,因?yàn)镃hannelGroup是批量管理自然也需要有個(gè)能夠批量管理的Future而此類就是這個(gè)Future
final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
//管理的操作集合是哪個(gè)ChannelGroup的百侧,此變量暫無(wú)意義砰识,因?yàn)楸菊虏⑽词褂? private final ChannelGroup group;
//當(dāng)前組中的結(jié)果管理的所有管道和管道處理
private final Map<Channel, ChannelFuture> futures;
//本組任務(wù)執(zhí)行成功個(gè)數(shù)和失敗個(gè)數(shù)
private int successCount;
private int failureCount;
//默認(rèn)監(jiān)聽器的實(shí)現(xiàn)能扒,用于記錄執(zhí)行結(jié)果,在futures中的每個(gè)任務(wù)執(zhí)行完成都會(huì)操作此監(jiān)聽器用于記錄操作結(jié)果辫狼,比如成功數(shù)和失敗數(shù)與失敗結(jié)果的封裝
private final ChannelFutureListener childListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//獲取當(dāng)前任務(wù)的執(zhí)行結(jié)果
boolean success = future.isSuccess();
//聲明是否是最后一個(gè)處理結(jié)果
boolean callSetDone;
//使用了DefaultChannelGroupFuture為鎖是為了確保上方的計(jì)數(shù)變量的準(zhǔn)確性
synchronized (DefaultChannelGroupFuture.this) {
//如果是成功則成功計(jì)數(shù)加一否則失敗計(jì)數(shù)加一
if (success) {
successCount ++;
} else {
failureCount ++;
}
//如果成功數(shù)和失敗數(shù)的和等于總處理數(shù)則返回true否則返回false
callSetDone = successCount + failureCount == futures.size();
//斷言他的結(jié)果必然是小于等于處理任務(wù)初斑,如果出現(xiàn)大于那么邏輯肯定是有問題的需要拋出斷言異常,當(dāng)然前提是運(yùn)行時(shí)開啟了斷言
assert successCount + failureCount <= futures.size();
}
//如果當(dāng)前處理是最后一個(gè)結(jié)果
if (callSetDone) {
//判斷當(dāng)前失敗的個(gè)數(shù)是否大于0
if (failureCount > 0) {
//創(chuàng)建一個(gè)集合用于存儲(chǔ)執(zhí)行結(jié)果失敗的管道與異常信息膨处,設(shè)置個(gè)數(shù)為失敗個(gè)數(shù)
List<Map.Entry<Channel, Throwable>> failed =
new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
//遍歷所有的任務(wù)
for (ChannelFuture f: futures.values()) {
//判斷當(dāng)前的任務(wù)是否失敗
if (!f.isSuccess()) {
//如果失敗則給集合添加錯(cuò)誤的管道對(duì)于的錯(cuò)誤信息
failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
}
}
//給當(dāng)前的任務(wù)設(shè)置為失敗并且傳入的異常是個(gè)自定義異常见秤,此異常用于存儲(chǔ)具體的錯(cuò)誤數(shù)據(jù)信息作為返回值
setFailure0(new ChannelGroupException(failed));
} else {
//如果小于0則代表沒有失敗那么設(shè)置當(dāng)前的管理是成功狀態(tài)
setSuccess0();
}
}
}
};
//構(gòu)造器,所屬的ChannelGroup真椿、futures代表當(dāng)前任務(wù)組需要處理的任務(wù)們鹃答,executor任務(wù)執(zhí)行器
DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures, EventExecutor executor) {
super(executor);
if (group == null) {
throw new NullPointerException("group");
}
if (futures == null) {
throw new NullPointerException("futures");
}
this.group = group;
//創(chuàng)建一個(gè)future集合 key是管道 value是對(duì)于的管道任務(wù)
Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
//將傳入的list任務(wù) 動(dòng)態(tài)添加到futureMap中
for (ChannelFuture f: futures) {
futureMap.put(f.channel(), f);
}
//因?yàn)榇巳蝿?wù)集合是不允許修改的所以此處轉(zhuǎn)換為了不允許修改的map,此map如果調(diào)用put remove等修改方法則會(huì)拋出異常
this.futures = Collections.unmodifiableMap(futureMap);
//給管理的所有任務(wù)添加任務(wù)完成的監(jiān)聽器突硝,傳入的任務(wù)只要完成就會(huì)進(jìn)入上方的完成監(jiān)聽器從而達(dá)到計(jì)數(shù)的效果
for (ChannelFuture f: this.futures.values()) {
f.addListener(childListener);
}
//如果傳入的任務(wù)是空則直接完成當(dāng)前任務(wù)测摔,這里可能有些繞 因?yàn)槭荊roup他是對(duì)多個(gè)任務(wù)的管理,但是當(dāng)前類也是一個(gè)任務(wù)只不過是用于管理其他任務(wù)集合的任務(wù)罷了
// Done on arrival?
if (this.futures.isEmpty()) {
setSuccess0();
}
}
//此構(gòu)造器參考上方構(gòu)造器并沒有特殊之處只是減少了將list轉(zhuǎn)map的操作
DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
super(executor);
this.group = group;
this.futures = Collections.unmodifiableMap(futures);
for (ChannelFuture f: this.futures.values()) {
f.addListener(childListener);
}
// Done on arrival?
if (this.futures.isEmpty()) {
setSuccess0();
}
}
@Override
public ChannelGroup group() {
return group;
}
//根據(jù)管道查找任務(wù)解恰,就是使用的上方futures的get方法
@Override
public ChannelFuture find(Channel channel) {
return futures.get(channel);
}
//迭代器則是獲取的futures的迭代器
@Override
public Iterator<ChannelFuture> iterator() {
return futures.values().iterator();
}
//是否部分成功
@Override
public synchronized boolean isPartialSuccess() {
//successCount != 0 代表總有一個(gè)是成功的
//successCount != futures.size() 不是所有的都成功的
return successCount != 0 && successCount != futures.size();
}
//是否部分失敗
//參考isPartialSuccess內(nèi)部解釋
@Override
public synchronized boolean isPartialFailure() {
return failureCount != 0 && failureCount != futures.size();
}
//下面的方法都是使用父級(jí)的方法此處不做講解锋八,有疑問可以看父級(jí)實(shí)現(xiàn)
@Override
public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public DefaultChannelGroupFuture removeListeners(
GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.removeListeners(listeners);
return this;
}
@Override
public DefaultChannelGroupFuture await() throws InterruptedException {
super.await();
return this;
}
@Override
public DefaultChannelGroupFuture awaitUninterruptibly() {
super.awaitUninterruptibly();
return this;
}
@Override
public DefaultChannelGroupFuture syncUninterruptibly() {
super.syncUninterruptibly();
return this;
}
@Override
public DefaultChannelGroupFuture sync() throws InterruptedException {
super.sync();
return this;
}
@Override
public ChannelGroupException cause() {
return (ChannelGroupException) super.cause();
}
//需要注意這兩個(gè)方法是私有方法代表著外部允許設(shè)置成功和失敗,從而也就看出外部不允許操作這種group組任務(wù)的操作
private void setSuccess0() {
super.setSuccess(null);
}
private void setFailure0(ChannelGroupException cause) {
super.setFailure(cause);
}
//上面的講解這里又得到了驗(yàn)證护盈,如果外部調(diào)用了組的設(shè)置方法那么將會(huì)拋出異常
@Override
public DefaultChannelGroupFuture setSuccess(Void result) {
throw new IllegalStateException();
}
@Override
public boolean trySuccess(Void result) {
throw new IllegalStateException();
}
@Override
public DefaultChannelGroupFuture setFailure(Throwable cause) {
throw new IllegalStateException();
}
@Override
public boolean tryFailure(Throwable cause) {
throw new IllegalStateException();
}
//檢查死鎖
@Override
protected void checkDeadLock() {
//獲取到當(dāng)前的執(zhí)行
EventExecutor e = executor();
//如果當(dāng)前執(zhí)行器是null自然不會(huì)死鎖挟纱,因?yàn)椴]有線程執(zhí)行
//ImmediateEventExecutor是立即執(zhí)行的執(zhí)行器還是當(dāng)前線程所以不會(huì)發(fā)生死鎖,后續(xù)再繼續(xù)線程池介紹的時(shí)候?qū)?huì)詳細(xì)講解
//e.inEventLoop() 如果當(dāng)前線程就是執(zhí)行線程那么就會(huì)發(fā)生死鎖
//這里需要清楚第一第二條件都是派出單線程執(zhí)行情況黄琼,如果是單線程那么無(wú)死鎖存在樊销,而最后一個(gè)條件是判斷兩個(gè)線程,到后面講解線程池實(shí)現(xiàn)時(shí)將會(huì)講述
if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
throw new BlockingOperationException();
}
}
//此類很簡(jiǎn)單用于存儲(chǔ)管道和任務(wù)
private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
private final K key;
private final V value;
DefaultEntry(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public V setValue(V value) {
throw new UnsupportedOperationException("read-only");
}
}
}
到此ChannelGroupFuture的實(shí)現(xiàn)基本完成了可能有人會(huì)覺得一開始的接口并沒有用到脏款,這里需要知道一個(gè)定義,就是ChannelGroup與ChannelGroupFuture的關(guān)系裤园,
ChannelGroup是用于管理管道的而內(nèi)部很多批量操作最終都是分組成為了ChannelGroupFuture作為統(tǒng)一返回撤师。