Netty的線程結構圖
? ? ?可以從上圖看出netty的線程實現(xiàn)由很多種但是他們都是來自于jdk自帶的Executor和ExecutorService震叮。
? ? ?這里稍微提一下Executor他很簡單只有一個execut方法而這個方法只有一個參數(shù)是Runnable,對于開發(fā)者這個類型很熟悉所有線程的執(zhí)行都需要有他鳍鸵,并且該接口也只有一個方法就是run()苇瓣。
? ? ?上文大概說了下Executor的結構,這里說下他的定義偿乖,他是一個執(zhí)行器的定義击罪,因為從圖中可以看出他是一個接口哲嘲。
ExecutorService是一個服務可以當做管理工具,他管理了執(zhí)行器的創(chuàng)建和停止媳禁,既然是執(zhí)行器服務那么就代表著他是可以管理多個執(zhí)行器的否則也就沒有意義眠副,但是也是一個接口對于他的定義如下。
//停止執(zhí)行器竣稽,雖然停止但是會將在停止前已經(jīng)存在的任務將會嘗試停止如果失敗則繼續(xù)執(zhí)行完成為止囱怕。
void shutdown();
//停止執(zhí)行器,相比上方的停止這個就暴力一些毫别,立馬停止并且取消停止前已經(jīng)存在的任務并且嘗試關閉正在執(zhí)行的任務娃弓。
List<Runnable> shutdownNow();
//執(zhí)行器服務是否停止服務(由子類實現(xiàn)決定)
boolean isShutdown();
//該方法是對shutdown或shutdownNow的狀態(tài)獲取是否終止完成。如果沒有調用這兩個方法那么此方法永遠是false(具體情況有子類決定)
boolean isTerminated();
//等待終止服務岛宦,此方法不會有任何的操作僅僅是等待任務的執(zhí)行忘闻,會有兩個結果1、要么執(zhí)行結束2恋博、要么執(zhí)行時間超過操作時間
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//提交一個可以帶返回值的任務(下方會詳細介紹Future這個類)
//Callable只有一個方法 V call(),F(xiàn)uture會默認將他的返回存到自己的成員變量中返回私恬,調用get方法獲取
<T> Future<T> submit(Callable<T> task);
//提交一個Runnable任務會默認的將傳入的result存入到一個Future中在進行返回
<T> Future<T> submit(Runnable task, T result);
//提交一個Runnable任務會默認將null存入Future的成員變量中通過get獲得
Future<?> submit(Runnable task);
//傳入一個Callable類型集合并且等到集合處理完成一起返回债沮,最終返回一個對應結果的Future集合。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
//相比上方此方法設置了超時時間本鸣,等待任務的超時疫衩,如果超時則進行任務的取消處理,繼續(xù)會放回對應的結果列表但是如果是被取消的結果則會拋出取消異常荣德,這代表著如果在不缺點是否成功的時候需要調用isCancelled方法此方法校驗當前的Future是否取消如果取消了則不要調用get因為會出異常闷煤。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
//傳入指定的任務集合只要其中任何一個任務完成則取消其他任務并且返回完成的任務的返回值,這里并不是Future而是Callable的返回類型
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
//傳入指定的任務集合只要啟動任何一個任務完成則取消其他的任務執(zhí)行并且返回第一執(zhí)行完成的任務結果涮瞻,如果連第一個任務執(zhí)行都超時了那么不會返回結果會直接拋出TimeOut的超時異常鲤拿。
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
ExecutorService的定義到此結束了。接下來講述他依賴的類型Future和Callable署咽。
//這個接口非常簡單定義一個call方法此方法只有兩個結果要么拋出業(yè)務異常近顷,要么返回計算的結果,而結果類型則是創(chuàng)建時設置的泛型V宁否。
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
//嘗試取消任務執(zhí)行窒升,如果取消失敗則可能是任務已完成或者已經(jīng)取消或其他原因導致失敗,如果成功分為兩種情況1慕匠、任務未運行則永遠不會執(zhí)行2饱须、任務已經(jīng)執(zhí)行mayInterruptIfRunning需要判斷他是否為true如果是則嘗試中斷執(zhí)行。
boolean cancel(boolean mayInterruptIfRunning);
//如果是主動取消的則返回true,否則任何形式的結束此方法都是false
boolean isCancelled();
//在任何情況下都會返回true台谊,除非是新new的調用此方法是false具體看子類狀態(tài)的實現(xiàn)蓉媳。
boolean isDone();
//等待計算完成獲取結果譬挚,有等待就代表是阻塞的。如果執(zhí)行異常則會拋出
V get() throws InterruptedException, ExecutionException;
//等待指定的時間獲取結果如果超時則拋出TimeoutException督怜,如果執(zhí)行異常則會拋出
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException,TimeoutException;
}
這里介紹完了ExecutorService的結構和他依賴的結構殴瘦,因為都是定義所以就只用知道他們是干嘛的使用者又是怎么組合這些定義最終完成什么功能的,所以接下來還是定義号杠。蚪腋。。姨蟋,不過是關于ScheduledExecutorService的定義屉凯,細心的讀者會發(fā)現(xiàn)筆者的講解流程是根據(jù)線程池的結構圖一層一層講解的。
//可以看出他繼承與ExecutorService眼溶,并且對ExecutorService的結構進行了擴展
public interface ScheduledExecutorService extends ExecutorService {
//創(chuàng)建一個延遲執(zhí)行的任務悠砚,此任務延遲時間有delay與unit控制,返回值Future的get方法返回null,因為run方法并沒有返回值所以返回默認值
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//是上方方法的重構,將Runnable改為Callable堂飞,因為傳入的試Callable所以在Future的get方法返回call方法的返回值灌旧,看不明白的讀者可以根據(jù)上方的Callable的講解進行理解。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//之前創(chuàng)建的延遲啟動的任務都只執(zhí)行一次绰筛,此方法是根據(jù)initialDelay為運行起點等待設置的啟動時長枢泰,然后以period為周期的循環(huán)執(zhí)行任務。除非取消任務或者中斷調用者線程否則次任務永久運行,因為是永久運行所以無返回值铝噩,默認都為NULL衡蚂。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//scheduleAtFixedRate的啟動一樣都是指定initialDelay的時長后運行,也是在指定的周期中重復運行骏庸,除非遇到異趁祝或線程中斷和取消執(zhí)行否則將永久運行。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
//那么scheduleAtFixedRate和scheduleWithFixedDelay他們的區(qū)別是什么具被?
//scheduleWithFixedDelay:在計算周期的時候會將任務的完成時間算進去玻募,意思就是如果任務執(zhí)行了1s而周期是2s那么下次執(zhí)行的時間會是啟動時間+3s。
//對應公式:下一次執(zhí)行開始時間=上一次執(zhí)行結束時間+周期+上一次執(zhí)行的時長
//scheduleAtFixedRate:恰恰相反一姿,計算周期的時候不會管你的執(zhí)行時間會繼續(xù)按照設置的周期計算补箍,比如任務執(zhí)行了1s而周期是2s那么就會根據(jù)你的第一次啟動時間+2s。
//對應公式:下一次執(zhí)行開始時間=上次執(zhí)行結束時間+周期
//這里會有一個疑問啸蜜,那就是如果我周期設置為2s而任務執(zhí)行了3s,scheduleAtFixedRate方法不是不計操作時長的嗎坑雅?那么會不會出現(xiàn)第一次執(zhí)行還未結束第二次已經(jīng)開始呢?這個問題是不存在的衬横,因為在前面講解的時候就說了結束周期任務中有一條就是當線程中斷就會結束周期裹粤,意思就是一個線程維護一個周期任務,一旦線程中斷那么任務也就結束了,如果多個線程對應一個任務的時候只有一個線程在運行任務其他線程都在等待狀態(tài)遥诉。
//這里這樣說可能讀者會納悶這個結論怎么來的拇泣,等介紹完結構后會講解他的實現(xiàn),到時候各位再結合今天的講解看那就一目了然了矮锈。
}
//在ScheduledExecutorService中依賴了ScheduledFuture霉翔,而從下方可以看出ScheduledFuture是一個空的接口他繼承與Delayed和Future
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
//Delayed接口只定義了getDelay方法,此方法用于獲取剩余的時長:getDelay=下次執(zhí)行開始時間-當前時間苞笨。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
//可以看出Delayed類繼承了Comparable债朵,此接口是用來比較兩個對象的具體看子類實現(xiàn),這里說下他的返回值瀑凝,當前的數(shù)比較結果:-1小于序芦、0等于、1大于粤咪。
public interface Comparable<T> {
public int compareTo(T o);
}
頂層的基本說完了谚中,剩下零零散散的會在使用的地方再提及,接下來就是對netty定義的講解寥枝。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
//是否正在關閉宪塔,只有調用shutdownGracefully兄弟方法或者其他方法,這個是根據(jù)實現(xiàn)走的所以再說實現(xiàn)的時候在細講囊拜。
boolean isShuttingDown();
//優(yōu)美關閉線程池蝌麸,怎么優(yōu)美了看他實現(xiàn)就是了。
Future<?> shutdownGracefully();
//上方的重載艾疟,具體實現(xiàn)按照代碼講解,這里總結為之過早敢辩。
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
//返回終止時執(zhí)行的Future操作結果蔽莱。實現(xiàn)時細講。
Future<?> terminationFuture();
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
@Deprecated
void shutdown();
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
@Deprecated
List<Runnable> shutdownNow();
//返回一個由EventExecutorGroup管理的EventExecutor
EventExecutor next();
//此方法重寫與Iterable接口下方單獨介紹
@Override
Iterator<EventExecutor> iterator();
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
Future<?> submit(Runnable task);
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
<T> Future<T> submit(Runnable task, T result);
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
<T> Future<T> submit(Callable<T> task);
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//此方法來自于ScheduledExecutorService具體講解查看上文
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
//此接口用來標記當前類可進行迭代并且支持for語句進行遍歷
public interface Iterable<T> {
//獲取當前類的迭代器戚长,迭代器不會進行深入講解盗冷,因為這個涉及到數(shù)據(jù)的存儲方式內存等。如果講又是一大堆同廉,這里讀者只用知道Iterator是獲取元素的仪糖,常用的兩個方法hasNext()是否還有下一個數(shù)據(jù)和next()返回當前數(shù)據(jù)并且做好獲取下一個的準備,這里光說可能不理解迫肖,舉個例子數(shù)組[1,2]锅劝,調用next則從下標0獲取并且下標+1,再次next則獲取下標為1的那么自然返回2,再次next會發(fā)現(xiàn)沒有了那么迭代器報錯蟆湖,所以就需要使用hasNext方法判斷是否有下一個故爵,而他的判斷方法根據(jù)現(xiàn)實數(shù)據(jù)結構走,如果按上面的例子那么就是判斷當前的下標是否小于數(shù)組的length隅津。
Iterator<T> iterator();
//java8的特性接口的default方法和接受一個函數(shù)進行調用诬垂。
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
//分割迭代器劲室,支持java8的stream操作,這里暫不進行介紹结窘。
default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}
//此事件執(zhí)行器繼承與EventExecutorGroup事件執(zhí)行組很洋,這里暫不討論他的數(shù)據(jù)結構后面實現(xiàn)的時候自然會出現(xiàn)他是怎么個結構。
//在EventExecutorGroup接口中可以看到有對EventExecutor的依賴隧枫,EventExecutorGroup的迭代就是EventExecutor喉磁。
public interface EventExecutor extends EventExecutorGroup {
//如果是EventExecutor的實現(xiàn)則此方法代表返回當前類this。
@Override
EventExecutor next();
//返回EventExecutor的管理者悠垛,這里可能有些迷糊在后面會在實現(xiàn)中細講他們的關系线定。
EventExecutorGroup parent();
//判斷當前線程是否是執(zhí)行器的執(zhí)行線程,一般是調用下面的方法傳入當前的線程
boolean inEventLoop();
//傳入一個線程判斷是否是當前執(zhí)行器的執(zhí)行線程
boolean inEventLoop(Thread thread);
//返回一個新的應答确买,這里的Promise是Future的實現(xiàn)斤讥,下面會介紹
<V> Promise<V> newPromise();
//返回一個新的應答
<V> ProgressivePromise<V> newProgressivePromise();
//返回一個SucceededFuture對象并且他的isSuccess方法為true,使用get獲取結果是不會阻塞會返回你傳入的result湾趾。
//這里的Future并不是前面介紹的這個Future是netty自己定義的芭商,下面會詳細介紹
<V> Future<V> newSucceededFuture(V result);
//傳入一個拋出的異常描述,返回FailedFuture類的一個實例搀缠,此實例與上方恰好相反铛楣,isSuccess為false,并且get的時候拋出異常艺普。
<V> Future<V> newFailedFuture(Throwable cause);
}
//netty定義的Future可以看出他繼承與jdk的Future
public interface Future<V> extends java.util.concurrent.Future<V> {
//當操作完成時返回true
boolean isSuccess();
//操作取消的時候返回true
boolean isCancellable();
//獲取執(zhí)行異常簸州,執(zhí)行成功則返回null
Throwable cause();
//添加執(zhí)行監(jiān)聽器,當執(zhí)行操作完成時會調用此監(jiān)聽器也就是isDone為true的時候調用
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
//添加一個集合的監(jiān)聽器
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//刪除監(jiān)聽器歧譬,只會刪除第一次出現(xiàn)的岸浑,如果一個監(jiān)聽器添加了多次那么只會刪除第一次出現(xiàn)的
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
//與上方相同只是支持傳入多個監(jiān)聽器進行刪除
Future<V> mremoveListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//同步等待結果,要么返回結果要么拋出異常瑰步,具體按照實現(xiàn)類講解矢洲。
Future<V> sync() throws InterruptedException;
//此方法不會拋出中斷異常,因為內部做了處理缩焦,這是筆者看實現(xiàn)總結的读虏,當然不止一種實現(xiàn)所以暫時理解即可后續(xù)會詳細介紹
Future<V> syncUninterruptibly();
//含義與sync方法相同
Future<V> await() throws InterruptedException;
//含義與syncUninterruptibly方法相同
Future<V> awaitUninterruptibly();
//指定任務執(zhí)行的等待時長,等待結果結束后根據(jù)執(zhí)行結果isDone方法結果返回袁滥,具體以實現(xiàn)為主
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
//調用上方的方法盖桥,是指傳入的timeoutMillis是毫秒
boolean await(long timeoutMillis) throws InterruptedException;
//上面的await方法如果出現(xiàn)中斷會拋出中斷異常,此方法不會拋出異常,傳參和第一個await一樣
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
//不會拋出中斷異常傳參和第二個await一樣
boolean awaitUninterruptibly(long timeoutMillis);
//非阻塞獲取結果如果futrue執(zhí)行成功那么返回call的返回值题翻,如果失敗或者還正在處理則返回null
V getNow();
//暫時與父類的含義一樣葱轩,具體根據(jù)實現(xiàn)子類理解。
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
結構到這里差不多結束了剩下的結構需要根據(jù)具體的實現(xiàn)查看所以這里暫不介紹,下一篇文章將會講解netty中的future靴拱,從本文可以看出有很多關于future的使用垃喊,可以看出netty非常依賴于他那么下節(jié)將會進行future的講解。