Futuer的實(shí)現(xiàn)
//默認(rèn)的Promise實(shí)現(xiàn),并沒(méi)有好解釋的這個(gè)名字荒叼,只不過(guò)從名字可以看出他會(huì)有實(shí)現(xiàn)或者說(shuō)別的實(shí)現(xiàn)
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
//netty內(nèi)部自己對(duì)日志打印的實(shí)現(xiàn)轿偎,當(dāng)然這也算是一個(gè)模塊了后面會(huì)詳細(xì)介紹只用知道這是打印日志使用的
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
//打印異常的日志對(duì)象
private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
//獲取最大的棧的深度,暫時(shí)知道有這么個(gè)東西即可后面用的時(shí)候再解釋
//SystemPropertyUtil是netty的自帶的配置類可以再啟動(dòng)的時(shí)候進(jìn)行配置,他最終使用的是System.getProperty方法被廓。
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
//原子更新操作坏晦,這里可以理解為在多線程下操作是線程安全的,具體實(shí)現(xiàn)以后筆者會(huì)在其他文章中講解嫁乘。
//AtomicReferenceFieldUpdater.newUpdater使用了這個(gè)方法傳入了三個(gè)參數(shù)
//1昆婿、DefaultPromise.class需要原子操作的類型
//2、Object.class需要原子操作類中的字段類型
//3蜓斧、result 需要原子操作字段的字段名
//兩個(gè)泛型則是兩個(gè)對(duì)于的類型T仓蛆、V。具體感興趣的讀者可以去看看他的源碼
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
//操作成功挎春,用于result的設(shè)置
private static final Object SUCCESS = new Object();
//不可取消看疙,用于result的設(shè)置具體看接下來(lái)的使用
private static final Object UNCANCELLABLE = new Object();
//存儲(chǔ)取消的原因,用于result的設(shè)置
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
//前面講了三個(gè)設(shè)置對(duì)象終于到了result的直奋,此類是任務(wù)的執(zhí)行結(jié)果
private volatile Object result;
//前面說(shuō)了future是異步使用的來(lái)操作任務(wù)的所以需要執(zhí)行器能庆,因?yàn)閳?zhí)行器是多線程的。
private final EventExecutor executor;
//需要通知的監(jiān)聽(tīng)器如果為null則會(huì)有兩種情況1脚线、沒(méi)有監(jiān)聽(tīng)器2搁胆、監(jiān)聽(tīng)器已經(jīng)通知完畢
private Object listeners;
//計(jì)數(shù),在當(dāng)前類中有地方使用了Object的wait和notifyAll用來(lái)計(jì)數(shù)wait的次數(shù)
private short waiters;
//避免出現(xiàn)并發(fā)通知邮绿,true已經(jīng)有線程進(jìn)行通知了渠旁,false沒(méi)有線程發(fā)送通知具體查看實(shí)現(xiàn)代碼下方會(huì)介紹。
private boolean notifyingListeners;
//構(gòu)造器斯碌,傳入執(zhí)行器一死,并進(jìn)行了校驗(yàn)如果執(zhí)行器是null則會(huì)拋出nullpoint異常
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
//無(wú)參構(gòu)造,如果子類的實(shí)現(xiàn)沒(méi)有使用到執(zhí)行器那么可以調(diào)用無(wú)參構(gòu)造傻唾,因?yàn)閑xecutor是final的所以必須初始化這里默認(rèn)給了null
protected DefaultPromise() {
// only for subclasses
executor = null;
}
//前面說(shuō)過(guò)Primise是個(gè)特殊的Future投慈,可以進(jìn)行手動(dòng)設(shè)置執(zhí)行成功
@Override
public Promise<V> setSuccess(V result) {
//設(shè)置結(jié)果如果返回true代表設(shè)置成功則調(diào)用通知承耿,否則代表以及完成了并且拋出異常
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
//和上方方法并沒(méi)有不同,僅僅是如果設(shè)置成功失敗則返回false伪煤,而上方設(shè)置失敗則拋出異常加袋,具體的請(qǐng)查看上篇文章的定義
@Override
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}
//設(shè)置當(dāng)前的任務(wù)為失敗并且傳入一個(gè)異常信息,返回true則通知監(jiān)聽(tīng)器,否則拋出異常
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
//嘗試設(shè)置當(dāng)前任務(wù)為失敗并且傳入一個(gè)異常信息抱既,返回true則嘗試成功并且通知監(jiān)聽(tīng)器职烧,否則返回false
@Override
public boolean tryFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return true;
}
return false;
}
//設(shè)置當(dāng)前任務(wù)為不可取消
@Override
public boolean setUncancellable() {
//在上方說(shuō)原子操作的時(shí)候RESULT_UPDATER字段是用來(lái)設(shè)置結(jié)果的。
//這里便使用它來(lái)操作設(shè)置當(dāng)前的result為UNCANCELLABLE對(duì)象防泵,
//第一參數(shù)傳入需要操作的對(duì)象蚀之,第二參數(shù)傳入預(yù)計(jì)當(dāng)前的值,第三個(gè)參數(shù)傳入需要設(shè)置的對(duì)象捷泞。
//這里講述第二個(gè)對(duì)象足删,此字段的操作叫做CAS就是下面方法名的縮寫(xiě),翻譯則是比較和設(shè)置锁右,如果當(dāng)前的值是傳入的第二個(gè)參數(shù)那么就設(shè)置第三個(gè)參數(shù)為這個(gè)字段的值失受。
//這里并不是講述他所以大概講述下即可。
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
//設(shè)置成功則返回true說(shuō)明當(dāng)前的任務(wù)狀態(tài)已經(jīng)是不可取消狀態(tài)了
return true;
}
Object result = this.result;
//否則獲取當(dāng)前的結(jié)果并且判斷是成功了還是被取消了咏瑟,兩者一者滿足即可拂到。
//1、要么成功2码泞、要么被取消
return !isDone0(result) || !isCancelled0(result);
}
//當(dāng)前的任務(wù)是否執(zhí)行完成
@Override
public boolean isSuccess() {
Object result = this.result;
//result不等于null是必須的因?yàn)槌跏贾稻褪莕ull說(shuō)明并沒(méi)有進(jìn)行任何狀態(tài)的設(shè)置
//result不等于UNCANCELLABLE 代表是不可取消狀態(tài)但是他是未完成的因?yàn)樽罱K的result并不會(huì)是他兄旬,從而代表正在運(yùn)行并且在運(yùn)行途中還設(shè)置了不可取消狀態(tài)
//result 不是CauseHolder類型,之前在定義失敗異常的時(shí)候就是使用這個(gè)類的對(duì)象創(chuàng)建的標(biāo)記余寥,從而代表結(jié)束運(yùn)行但是是被取消的所以不能算是完成
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}
//是否取消
@Override
public boolean isCancellable() {
return result == null;
}
//獲取執(zhí)行異常
@Override
public Throwable cause() {
Object result = this.result;
//如果當(dāng)前result是CauseHolder則代表存在異常則將result轉(zhuǎn)為CauseHolder并且調(diào)用cause屬性返回否則返回null
return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
}
//添加監(jiān)聽(tīng)器
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//鎖住當(dāng)前對(duì)象
synchronized (this) {
//添加監(jiān)聽(tīng)器
addListener0(listener);
}
//是否完成了當(dāng)前的任務(wù)辖试,如果完成則進(jìn)行通知
if (isDone()) {
notifyListeners();
}
//最后返回當(dāng)前對(duì)象
return this;
}
//添加多個(gè)監(jiān)聽(tīng)器
@Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
checkNotNull(listeners, "listeners");
//鎖住當(dāng)前對(duì)象
synchronized (this) {
//遍歷當(dāng)前傳入的監(jiān)聽(tīng)器如果是null則跳出循環(huán)。
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
addListener0(listener);
}
}
//如果任務(wù)執(zhí)行成功則直接進(jìn)行通知
if (isDone()) {
notifyListeners();
}
return this;
}
//刪除監(jiān)聽(tīng)器
@Override
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//鎖住當(dāng)前對(duì)象
synchronized (this) {
//進(jìn)行監(jiān)聽(tīng)器的刪除
removeListener0(listener);
}
return this;
}
//同上 只不過(guò)監(jiān)聽(tīng)器是多個(gè)并且進(jìn)行的監(jiān)聽(tīng)器的遍歷去刪除
@Override
public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
checkNotNull(listeners, "listeners");
synchronized (this) {
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
removeListener0(listener);
}
}
return this;
}
//從此方法起將不再解釋前面定義的方法的含義劈狐,如果有疑問(wèn)的讀者可以去前面查看定義
@Override
public Promise<V> await() throws InterruptedException {
//如果當(dāng)前的任務(wù)已經(jīng)執(zhí)行完則返回this
if (isDone()) {
return this;
}
//定義的時(shí)候說(shuō)過(guò)await如果發(fā)生了中斷則會(huì)拋出異常,這里判斷當(dāng)前前程是否中斷呐馆,如果中斷則拋出異常
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
//檢查是否死鎖肥缔,下面將這個(gè)方法的時(shí)候在細(xì)說(shuō)
checkDeadLock();
//當(dāng)前線程鎖住當(dāng)前的代碼塊,其他線程不可訪問(wèn)
synchronized (this) {
//是否成功汹来,如果并沒(méi)有成功則進(jìn)入該while续膳,如果成功則返回this
while (!isDone()) {
//之前說(shuō)過(guò)waiters字段用來(lái)記錄等待的線程,此處是對(duì)waiters字段進(jìn)行+1操作
incWaiters();
try {
//當(dāng)前對(duì)象進(jìn)行等待
wait();
} finally {
//等待結(jié)束或者被喚醒則進(jìn)行-1操作
decWaiters();
}
}
}
return this;
}
//與上方方法解釋相同只不過(guò)如果被中斷了不會(huì)拋出異常收班,而是嘗試中斷當(dāng)前的線程坟岔。
@Override
public Promise<V> awaitUninterruptibly() {
if (isDone()) {
return this;
}
checkDeadLock();
boolean interrupted = false;
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} catch (InterruptedException e) {
// Interrupted while waiting.
interrupted = true;
} finally {
decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
//await加強(qiáng)版,支持設(shè)置等到時(shí)長(zhǎng)摔桦,這里講傳入的時(shí)長(zhǎng)轉(zhuǎn)換為了納秒
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
//傳入的試毫秒轉(zhuǎn)為納秒
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
//與上方方法相同只不過(guò)將拋出的中斷異常轉(zhuǎn)為了內(nèi)部錯(cuò)誤社付,在定義的時(shí)候就有說(shuō)過(guò)此方法不會(huì)拋出中斷異常
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}
//與上方方法相同
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}
//獲取當(dāng)前結(jié)果非阻塞承疲,如果當(dāng)前值是異常或者是SUCCESS或者UNCANCELLABLE則返回null否則返回當(dāng)前值
@SuppressWarnings("unchecked")
@Override
public V getNow() {
Object result = this.result;
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}
//取消當(dāng)前任務(wù)執(zhí)行鸥咖,并且嘗試中斷燕鸽,但是當(dāng)前方法并沒(méi)有嘗試中斷所以傳參則無(wú)用。
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
//設(shè)置當(dāng)前result的值為CANCELLATION_CAUSE_HOLDER啼辣,取消異常
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
//設(shè)置成功則檢查并喚醒之前wait中等待的線程
checkNotifyWaiters();
//通知所有的監(jiān)聽(tīng)器
notifyListeners();
return true;
}
//取消失敗則返回false說(shuō)明當(dāng)前result已經(jīng)被設(shè)置成其他的結(jié)果
return false;
}
//是否取消
@Override
public boolean isCancelled() {
return isCancelled0(result);
}
//是否成功
@Override
public boolean isDone() {
return isDone0(result);
}
//同步等待調(diào)用了之前wait方法啊研。如果失敗則嘗試拋出異常
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
//與上方方法一樣只不過(guò)這里不會(huì)拋出中斷異常
@Override
public Promise<V> syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
//打印當(dāng)前任務(wù)的狀態(tài)
@Override
public String toString() {
return toStringBuilder().toString();
}
//封裝當(dāng)前任務(wù)的狀態(tài)
protected StringBuilder toStringBuilder() {
StringBuilder buf = new StringBuilder(64)
.append(StringUtil.simpleClassName(this))
.append('@')
.append(Integer.toHexString(hashCode()));
Object result = this.result;
if (result == SUCCESS) {
buf.append("(success)");
} else if (result == UNCANCELLABLE) {
buf.append("(uncancellable)");
} else if (result instanceof CauseHolder) {
buf.append("(failure: ")
.append(((CauseHolder) result).cause)
.append(')');
} else if (result != null) {
buf.append("(success: ")
.append(result)
.append(')');
} else {
buf.append("(incomplete)");
}
return buf;
}
//獲取傳入的執(zhí)行器
protected EventExecutor executor() {
return executor;
}
//之前用到的檢查死鎖方法,就是檢查當(dāng)前調(diào)用方法的線程是不是執(zhí)行器的線程如果是則說(shuō)明發(fā)生了死鎖需要拋出異常停止死鎖操作
//獲取執(zhí)行器鸥拧,如果執(zhí)行器為null則不會(huì)發(fā)生死鎖党远,如果不是null則判斷當(dāng)前線程是否是執(zhí)行器線程,inEventLoop此方法的定義在之前有講解過(guò)富弦,遺忘的同學(xué)可以看看之前的文章沟娱。
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
//通知所有的監(jiān)聽(tīng)器
//eventExecutor 通知監(jiān)聽(tīng)器的執(zhí)行器
//future 需要通知的任務(wù)
//listener 需要通知的監(jiān)聽(tīng)器
protected static void notifyListener(
EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
//下面三個(gè)方法的調(diào)用用于判斷傳入的三個(gè)參數(shù)是否為null如果是則拋出nullpoint異常
checkNotNull(eventExecutor, "eventExecutor");
checkNotNull(future, "future");
checkNotNull(listener, "listener");
//調(diào)用其他通知的方法此方法是防止棧溢出的,因?yàn)橹拔覀冊(cè)诙x屬性中有個(gè)MAX_LISTENER_STACK_DEPTH的定義此方法就是用到了他舆声,下面在詳細(xì)介紹
notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
}
//通知監(jiān)聽(tīng)器
private void notifyListeners() {
//獲取當(dāng)前任務(wù)的的執(zhí)行器
EventExecutor executor = executor();
//如果調(diào)用這個(gè)方法的線程就是執(zhí)行器的線程則進(jìn)入該if
if (executor.inEventLoop()) {
//獲取當(dāng)前線程的InternalThreadLocalMap對(duì)象此對(duì)象是netty聲明的到時(shí)候說(shuō)線程的時(shí)候?qū)?huì)講解花沉,這里暫時(shí)知道有這么個(gè)東西就行。
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
//通過(guò)線程的數(shù)據(jù)對(duì)象或去到當(dāng)前的任務(wù)監(jiān)聽(tīng)器通知的層次媳握,如果是第一次通知?jiǎng)t為0
final int stackDepth = threadLocals.futureListenerStackDepth();
//當(dāng)前的線程數(shù)據(jù)中的層次與我們?cè)O(shè)置的最大層次相比碱屁,如果當(dāng)前層次小于設(shè)置的最大層則進(jìn)入if
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
//進(jìn)入后再層次中+1
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
//并且立即通知
notifyListenersNow();
} finally {
//如果通知完成則還原深度,可以理解為又進(jìn)行了減一
threadLocals.setFutureListenerStackDepth(stackDepth);
}
//結(jié)束
return;
}
}
//如果當(dāng)前線程不是執(zhí)行器或者當(dāng)前的線程深度已經(jīng)大于了設(shè)置的最大深度蛾找,則使用當(dāng)前的執(zhí)行器進(jìn)行通知
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
//此方法和上方的方法相同但是上方的通知是使用當(dāng)前任務(wù)的監(jiān)聽(tīng)器而此處使用的是傳入的監(jiān)聽(tīng)器娩脾,可能監(jiān)聽(tīng)器會(huì)發(fā)生改變所以沒(méi)有使用當(dāng)前任務(wù)的字段做緩存,因?yàn)樽隽司彺嫔戏酱a是可以復(fù)用的打毛。既然邏輯一樣那么這里就不進(jìn)行介紹了根據(jù)上方源碼進(jìn)行解讀即可
private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
final Future<?> future,
final GenericFutureListener<?> listener) {
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
//此處與上方有差異柿赊,上方調(diào)用notifyListenersNow,暫且知道下一個(gè)方法就是對(duì)他講解幻枉,稍安勿躁碰声。
notifyListener0(future, listener);
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListener0(future, listener);
}
});
}
//看到很多處都在調(diào)用這個(gè)通知方法,那么接下來(lái)就是對(duì)他的講解
private void notifyListenersNow() {
//創(chuàng)建了方法內(nèi)部的局部變量
Object listeners;
//使用this作為線程鎖熬甫,并且上鎖
synchronized (this) {
// 如果當(dāng)前任務(wù)并沒(méi)有通知并且是有監(jiān)聽(tīng)器的則進(jìn)行接下來(lái)的邏輯胰挑,否則return。
if (notifyingListeners || this.listeners == null) {
return;
}
//通知只能通知一次既然當(dāng)前線程已經(jīng)到這里了那么接下來(lái)的線程就在上一個(gè)if停止就是了(當(dāng)然代表當(dāng)前線程已經(jīng)釋放了這個(gè)this鎖)椿肩,因?yàn)檫@里設(shè)置了通知狀態(tài)為true瞻颂,代表正在通知
notifyingListeners = true;
//并且將當(dāng)前內(nèi)部屬性賦值給剛才的局部變量
listeners = this.listeners;
//然后將內(nèi)部屬性設(shè)置為null,因?yàn)檎谕ㄖ獱顟B(tài)如果通知完成將會(huì)修改回來(lái)所以這里置為null則為了保證第二個(gè)條件成立
this.listeners = null;
}
//循環(huán)調(diào)用進(jìn)行通知
for (;;) {
//這里對(duì)監(jiān)聽(tīng)器做了兩個(gè)處理第一個(gè)是當(dāng)前監(jiān)聽(tīng)器是一個(gè)列表代表多個(gè)監(jiān)聽(tīng)器第二個(gè)則代表當(dāng)前監(jiān)聽(tīng)器是一個(gè)監(jiān)聽(tīng)器郑象,不一樣的數(shù)據(jù)結(jié)構(gòu)對(duì)應(yīng)不一樣的處理贡这。
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
//通知完成后繼續(xù)上鎖
synchronized (this) {
//如果當(dāng)前的監(jiān)聽(tīng)器已經(jīng)重置為null則設(shè)置正在通知的狀態(tài)結(jié)束,否則設(shè)置當(dāng)前的局部變量為當(dāng)前的監(jiān)聽(tīng)器然后設(shè)置當(dāng)前監(jiān)聽(tīng)器為null
if (this.listeners == null) {
notifyingListeners = false;
return;
}
//可能會(huì)在通知的時(shí)候又有新的監(jiān)聽(tīng)器進(jìn)來(lái)所以這里再次設(shè)置了
listeners = this.listeners;
this.listeners = null;
}
}
//這里對(duì)此方法進(jìn)行一個(gè)小結(jié): 這里使用了兩個(gè)地方用鎖而且他們的鎖是一樣的所以會(huì)出現(xiàn)競(jìng)爭(zhēng)問(wèn)題厂榛,如果第一個(gè)線程進(jìn)來(lái)并且設(shè)置為正在發(fā)送通知那么剩下的線程都不會(huì)再繼續(xù)執(zhí)行并且當(dāng)前的監(jiān)聽(tīng)器是null的 如果通過(guò)別的途徑再次添加了監(jiān)聽(tīng)器并且當(dāng)前的通知還是正在通知的狀態(tài)那么其他的線程還是進(jìn)不來(lái)盖矫,但是當(dāng)前的線程執(zhí)行完通知會(huì)發(fā)現(xiàn)當(dāng)前的監(jiān)聽(tīng)器又發(fā)生了變化丽惭,那么這個(gè)for的死循環(huán)再次執(zhí)行,因?yàn)榘l(fā)現(xiàn)又有新的通知所以當(dāng)前還是正在發(fā)送通知狀態(tài)炼彪,所以其他線程還是進(jìn)不來(lái)吐根,最終還是由當(dāng)前線程進(jìn)行執(zhí)行。而在講述notifyListenerWithStackOverFlowProtection的時(shí)候說(shuō)過(guò)監(jiān)聽(tīng)器發(fā)生改變所以不能復(fù)用的問(wèn)題辐马,而這里就處理如果當(dāng)前的監(jiān)聽(tīng)器發(fā)送改變的處理拷橘。
}
//這里進(jìn)行通知數(shù)組類型的監(jiān)聽(tīng)器
private void notifyListeners0(DefaultFutureListeners listeners) {
//首先獲取到傳入監(jiān)聽(tīng)器內(nèi)部包含的數(shù)組
GenericFutureListener<?>[] a = listeners.listeners();
//然后進(jìn)行遍歷通知遍歷中的監(jiān)聽(tīng)器
//而且要注意此方法是私有的那么就代表除了使用它可以進(jìn)行遍歷以外其他的繼承只能一個(gè)一個(gè)發(fā)同通知,具體的看實(shí)現(xiàn)邏輯
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
//上方遍歷調(diào)用的也是他喜爷,而傳入多個(gè)參數(shù)的也是他冗疮,最終發(fā)送消息的也是他,此方法比較強(qiáng)檩帐,但是非常簡(jiǎn)單就是調(diào)用了監(jiān)聽(tīng)器的操作完成方法并且傳入對(duì)于的任務(wù)數(shù)據(jù)术幔。
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
//此處添加監(jiān)聽(tīng)器
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
//如果是null則說(shuō)明這是第一個(gè)監(jiān)聽(tīng)器那么直接將其賦值給當(dāng)前的全局變量
if (listeners == null) {
listeners = listener;
//否則說(shuō)明不是第一個(gè)監(jiān)聽(tīng)器那么就判斷是不是數(shù)組類型的監(jiān)聽(tīng)器如果是則add加進(jìn)去就行了
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
//如果當(dāng)監(jiān)聽(tīng)器不是數(shù)組類型并且當(dāng)前添加的不是第一次所以修改當(dāng)前局部變量為數(shù)組類型的監(jiān)聽(tīng)器并且傳入兩個(gè)已知的監(jiān)聽(tīng)器
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
//刪除監(jiān)聽(tīng)器,非常簡(jiǎn)單如果是數(shù)組類型那么直接從數(shù)組中移除如果不是數(shù)組類型那么就置為null
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
}
//設(shè)置當(dāng)前任務(wù)的結(jié)果為成功湃密,如果傳入的結(jié)果是是null則設(shè)置為SUCCESS否則設(shè)置為傳入的result
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
//設(shè)置當(dāng)前任務(wù)結(jié)果為失敗诅挑,傳入一個(gè)異常信息
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
//設(shè)置值得方法
private boolean setValue0(Object objResult) {
//如果當(dāng)前結(jié)果是null則修改為傳入的結(jié)果或者當(dāng)前結(jié)果是UNCANCELLABLE不可取消狀態(tài)則設(shè)置傳入的結(jié)果兩者有一個(gè)成功則進(jìn)行通知檢查,這里的通知不是監(jiān)聽(tīng)器的通知
//而是對(duì)前面wait等待線程的通知并且返回true
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}
//檢查并且通知喚醒泛源,如果等待的線程大于0則進(jìn)行全部喚醒
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
}
//當(dāng)有線程等待時(shí)進(jìn)行加一
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
//線程被喚醒的時(shí)候則減一
private void decWaiters() {
--waiters;
}
//拋出失敗異常拔妥,之前在同步等待結(jié)果的時(shí)候使用過(guò)當(dāng)?shù)牡浇Y(jié)果后調(diào)用此方法判斷是否有異常如果有則拋出否則什么都不做。
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
//之前調(diào)用過(guò)的等待方法达箍,傳入兩個(gè)參數(shù)第一個(gè)等待的納秒時(shí)間没龙,第二個(gè)是否中斷拋出異常
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
//如果執(zhí)行成功那么直接返回等待結(jié)果為true
if (isDone()) {
return true;
}
//否則判斷當(dāng)前傳入的時(shí)間是否小于等于0 如果是則返回當(dāng)前執(zhí)行結(jié)果是否為成功
if (timeoutNanos <= 0) {
return isDone();
}
//判斷是否允許拋出中斷異常,并且判斷當(dāng)前線程是否被中斷如果兩者都成立則拋出中斷異常
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
//檢查是否為死鎖缎玫,內(nèi)部實(shí)現(xiàn)請(qǐng)查看上方的方法解釋
checkDeadLock();
//獲取當(dāng)前的納秒時(shí)間
long startTime = System.nanoTime();
//用戶設(shè)置的等到時(shí)間
long waitTime = timeoutNanos;
//是否中斷
boolean interrupted = false;
try {
//死循環(huán)
for (;;) {
//上鎖
synchronized (this) {
//是否執(zhí)行成功
if (isDone()) {
return true;
}
//等待線程數(shù)+1
incWaiters();
try {
//使用wait進(jìn)行等待,因?yàn)閣ait傳入?yún)?shù)是毫秒而這里是納秒所以這里做了處理
//1硬纤、獲取納秒數(shù)中的毫秒傳入第一個(gè)參數(shù)
//2、獲取剩余的那納秒數(shù)作為第二個(gè)參數(shù)
//wait 第一個(gè)參數(shù)是毫秒數(shù) 第二個(gè)參數(shù)是納秒數(shù)赃磨,看起來(lái)比較精準(zhǔn)其實(shí)jdk只是發(fā)現(xiàn)有納秒數(shù)后對(duì)毫秒數(shù)進(jìn)行了+1 具體讀者可以去看wait源碼
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
//如果出現(xiàn)中斷異常那么判斷傳入的第二個(gè)參數(shù)是否拋出異常如果為true此處則拋出異常否則修改前面聲明的變量為true
if (interruptable) {
throw e;
} else {
interrupted = true;
}
} finally {
//不管最終如何都會(huì)對(duì)waiters進(jìn)行-1操作
decWaiters();
}
}
//能到這里說(shuō)明已經(jīng)被喚醒則判斷是否執(zhí)行成功筝家,執(zhí)行成功則返回true
if (isDone()) {
return true;
} else {
//否則判斷當(dāng)前睡眠時(shí)間是否超過(guò)設(shè)置時(shí)間如果超過(guò)則返回當(dāng)前的執(zhí)行結(jié)果,否則繼續(xù)循環(huán)
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
//當(dāng)跳出循環(huán)后判斷在等待過(guò)程中是否發(fā)生了中斷異常如果發(fā)生則將當(dāng)前線程進(jìn)行中斷
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
//通知進(jìn)度監(jiān)聽(tīng)器邻辉,就是監(jiān)聽(tīng)的進(jìn)度條可以這么理解肛鹏,第一個(gè)參數(shù)是當(dāng)前的進(jìn)度,第二個(gè)參數(shù)是總的進(jìn)度
@SuppressWarnings("unchecked")
void notifyProgressiveListeners(final long progress, final long total) {
//從當(dāng)前的監(jiān)聽(tīng)器中獲取到進(jìn)度監(jiān)聽(tīng)器如果沒(méi)有則return否則繼續(xù)執(zhí)行
final Object listeners = progressiveListeners();
if (listeners == null) {
return;
}
//對(duì)應(yīng)進(jìn)度監(jiān)聽(tīng)器的自然是進(jìn)度的任務(wù)管理所以會(huì)將當(dāng)前的this轉(zhuǎn)為進(jìn)度管理器self
final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
//獲取通知處理器并且判斷當(dāng)前的線程是否是內(nèi)部的處理器恩沛。
EventExecutor executor = executor();
if (executor.inEventLoop()) {
//如果是則判斷是否是數(shù)組監(jiān)聽(tīng)器如果是則調(diào)用notifyProgressiveListeners0進(jìn)行通知否則調(diào)用notifyProgressiveListener0進(jìn)行通知
//他倆的區(qū)別就在于監(jiān)聽(tīng)器是否是數(shù)組
if (listeners instanceof GenericProgressiveFutureListener[]) {
notifyProgressiveListeners0(
self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
} else {
notifyProgressiveListener0(
self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
}
} else {
//如果當(dāng)前的線程不是內(nèi)部的處理器線程那么走這里
//判斷當(dāng)前的監(jiān)聽(tīng)器是否是數(shù)組監(jiān)聽(tīng)器
//如果是則創(chuàng)建一個(gè)Runnable內(nèi)部還是調(diào)用的notifyProgressiveListeners0方法只不過(guò)這里將通知的方法當(dāng)做一個(gè)執(zhí)行器中的任務(wù)丟給他叫他去執(zhí)行
//這里和上方的區(qū)別就在于如果是是當(dāng)前線程則直接執(zhí)行否則使用執(zhí)行器執(zhí)行
if (listeners instanceof GenericProgressiveFutureListener[]) {
final GenericProgressiveFutureListener<?>[] array =
(GenericProgressiveFutureListener<?>[]) listeners;
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyProgressiveListeners0(self, array, progress, total);
}
});
} else {
//如果不是線程則直接提交一個(gè)任務(wù)給當(dāng)前的執(zhí)行器執(zhí)行調(diào)用方法是notifyProgressiveListener0
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyProgressiveListener0(self, l, progress, total);
}
});
}
}
}
//獲取進(jìn)度監(jiān)聽(tīng)器列表,因?yàn)槿蝿?wù)中只有一個(gè)字段存儲(chǔ)監(jiān)聽(tīng)器所以需要從該字段中進(jìn)行篩選缕减,此方法就是對(duì)這個(gè)字段進(jìn)行類的篩選
private synchronized Object progressiveListeners() {
//獲取當(dāng)前任務(wù)的監(jiān)聽(tīng)器,這里之所以使用一個(gè)臨時(shí)變量進(jìn)行接收是害怕其他線程如果修改了監(jiān)聽(tīng)器那么下面的處理會(huì)出現(xiàn)未知異常雷客,所以為了保證不出錯(cuò)此處將監(jiān)聽(tīng)器做了處理。
Object listeners = this.listeners;
if (listeners == null) {
//如果等null那就說(shuō)明沒(méi)有監(jiān)聽(tīng)器則退出方法
return null;
}
//判斷監(jiān)聽(tīng)器的類型是否為數(shù)組
if (listeners instanceof DefaultFutureListeners) {
//如果是數(shù)組類型則將其轉(zhuǎn)換為數(shù)組類型
DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
//并且獲取進(jìn)度監(jiān)聽(tīng)器在數(shù)組中的存在數(shù)量
int progressiveSize = dfl.progressiveSize();
//數(shù)組等于0則返回null如果等于1則遍歷它里面的監(jiān)聽(tīng)器是否是進(jìn)度監(jiān)聽(tīng)器桥狡,如果是則返回否則返回null
//這里算是一個(gè)優(yōu)化點(diǎn)但是case 1的時(shí)候并不是優(yōu)化因?yàn)闆](méi)有必要去遍歷了 直接下表取值就是了搅裙。
switch (progressiveSize) {
case 0:
return null;
case 1:
for (GenericFutureListener<?> l: dfl.listeners()) {
if (l instanceof GenericProgressiveFutureListener) {
return l;
}
}
return null;
}
//如果大于1那么獲取數(shù)組列表
GenericFutureListener<?>[] array = dfl.listeners();
//并且創(chuàng)建一個(gè)進(jìn)度監(jiān)聽(tīng)器數(shù)組列表
GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
//遍歷前面獲取監(jiān)聽(tīng)的個(gè)數(shù)動(dòng)態(tài)比較當(dāng)前下標(biāo)的監(jiān)聽(tīng)器是否是進(jìn)度監(jiān)聽(tīng)器并且給上面創(chuàng)建的數(shù)組賦值
for (int i = 0, j = 0; j < progressiveSize; i ++) {
GenericFutureListener<?> l = array[i];
if (l instanceof GenericProgressiveFutureListener) {
copy[j ++] = (GenericProgressiveFutureListener<?>) l;
}
}
//將遍歷結(jié)果返回
return copy;
} else if (listeners instanceof GenericProgressiveFutureListener) {
//如果不是數(shù)組類型并且類型是進(jìn)度監(jiān)聽(tīng)器則直接返回當(dāng)前任務(wù)的監(jiān)聽(tīng)器
return listeners;
} else {
//上面過(guò)濾的大多情況但是還有一個(gè)情況那就是如果只有一個(gè)監(jiān)聽(tīng)器并且不是進(jìn)度監(jiān)聽(tīng)器這種情況走這里
return null;
}
//此方法已經(jīng)講完了皱卓,感覺(jué)寫(xiě)的不盡人意,之前switch處的遍歷和獲取進(jìn)度監(jiān)聽(tīng)器的地方都是可以優(yōu)化下的部逮,獲取監(jiān)聽(tīng)器的地方直接放到GenericFutureListener類中這樣可以代碼復(fù)用或者說(shuō)傳入一個(gè)class然后獲取對(duì)應(yīng)的監(jiān)聽(tīng)器娜汁,以后不管擴(kuò)展多少的監(jiān)聽(tīng)器都可以直接獲取,這里算是一個(gè)小結(jié)兄朋。
}
//通知進(jìn)度監(jiān)聽(tīng)器參數(shù) 傳入監(jiān)聽(tīng)器數(shù)組 當(dāng)前的進(jìn)度 總進(jìn)度
private static void notifyProgressiveListeners0(
ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
for (GenericProgressiveFutureListener<?> l: listeners) {
if (l == null) {
break;
}
notifyProgressiveListener0(future, l, progress, total);
}
}
//具體進(jìn)度監(jiān)聽(tīng)器的調(diào)用就是個(gè)方法的調(diào)用看過(guò)前面講解的讀者應(yīng)該能看懂這里再不做解釋掐禁,后續(xù)會(huì)有專門(mén)一節(jié)介紹netty的監(jiān)聽(tīng)器
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyProgressiveListener0(
ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
try {
l.operationProgressed(future, progress, total);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
}
}
}
//此方法是私有的靜態(tài)方法所以獲取不到當(dāng)前任務(wù)的結(jié)果所以需要調(diào)用者傳入
private static boolean isCancelled0(Object result) {
//如果結(jié)果類型是CauseHolder并且結(jié)果還是取消異常那么則返回true
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
//也是靜態(tài)私有的判斷是否執(zhí)行完成
private static boolean isDone0(Object result) {
//傳入結(jié)果不等于null 并且 不是不能取消(因?yàn)椴荒苋∠麆t說(shuō)明正在運(yùn)行,而不管是SUCCESS 還 CANCELLATION_CAUSE_HOLDER 都是已經(jīng)有確切結(jié)果的)
return result != null && result != UNCANCELLABLE;
}
//前面一直使用的異常存儲(chǔ)的的類很簡(jiǎn)單就一個(gè)異常類存儲(chǔ)的字段颅和,而在之前也有很多比較都是根據(jù)這個(gè)字段進(jìn)行的
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
//使用傳入的執(zhí)行器進(jìn)行execute方法的調(diào)用
private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}
}
總結(jié):對(duì)于DefaultPromise的實(shí)現(xiàn)到此基本完成了傅事,接下來(lái)講解DefaultPromise的實(shí)現(xiàn),可能讀者到這里會(huì)有疑惑我在寫(xiě)講解的時(shí)候使用到任務(wù)這一詞峡扩,但是這個(gè)實(shí)現(xiàn)里面并沒(méi)有這個(gè)東西蹭越,因?yàn)镕uture的定義就是任務(wù)管理員這樣的存在,這個(gè)類中都是對(duì)任務(wù)的管理并沒(méi)有實(shí)質(zhì)性的操作教届,而且有future管理的任務(wù)都是針對(duì)異步來(lái)說(shuō)的响鹃,這也是future存在的意義就是管理異步任務(wù)使用的一般用法都是實(shí)現(xiàn)Runnable進(jìn)行管理他這樣就可以將一個(gè)future當(dāng)做一個(gè)線程的執(zhí)行任務(wù)去執(zhí)行但是并沒(méi)有返回值,所以一般又會(huì)使用Callable接口案训,具體使用看下一張對(duì)DefaultPromise的實(shí)現(xiàn)PromiseTask算是一個(gè)標(biāo)準(zhǔn)的Future的實(shí)現(xiàn)买置。