concurrent主要的類
- Executor :具體Runnable任務(wù)的執(zhí)行者履磨。
- ExecutorService :一個(gè)線程池管理者,其實(shí)現(xiàn)類有多種,我會(huì)介紹一部分汤徽。我們能把- - - Runnable,Callable提交到池中讓其調(diào)度弄痹。
- Semaphore :一個(gè)計(jì)數(shù)信號量
- ReentrantLock :一個(gè)可重入的互斥鎖定 Lock园细,功能類似synchronized耗式,但要強(qiáng)大的多。
- Future :是與Runnable,Callable進(jìn)行交互的接口五芝,比如一個(gè)線程執(zhí)行結(jié)束后取返回的結(jié)果等等痘儡,還提供了cancel終止線程。
- BlockingQueue :阻塞隊(duì)列枢步。
- CompletionService : ExecutorService的擴(kuò)展沉删,可以獲得線程執(zhí)行結(jié)果的
- CountDownLatch :一個(gè)同步輔助類蓄坏,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待丑念。
- CyclicBarrier :一個(gè)同步輔助類,它允許一組線程互相等待结蟋,直到到達(dá)某個(gè)公共屏障點(diǎn)
- Future :Future 表示異步計(jì)算的結(jié)果脯倚。
- ScheduledExecutorService :一個(gè) ExecutorService,可安排在給定的延遲后運(yùn)行或定期執(zhí)行的命令嵌屎。
接下來逐一介紹
Executors主要方法說明
newFixedThreadPool(固定大小線程池)
創(chuàng)建一個(gè)可重用固定線程集合的線程池推正,以共享的無界隊(duì)列方式來運(yùn)行這些線程(只有要請求的過來,就會(huì)在一個(gè)隊(duì)列里等待執(zhí)行)宝惰。如果在關(guān)閉前的執(zhí)行期間由于失敗而導(dǎo)致任何線程終止植榕,那么一個(gè)新線程將代替它執(zhí)行后續(xù)的任務(wù)(如果需要)。
newCachedThreadPool(無界線程池尼夺,可以進(jìn)行自動(dòng)線程回收)
創(chuàng)建一個(gè)可根據(jù)需要?jiǎng)?chuàng)建新線程的線程池尊残,但是在以前構(gòu)造的線程可用時(shí)將重用它們。對于執(zhí)行很多短期異步任務(wù)的程序而言淤堵,這些線程池通城奚溃可提高程序性能。調(diào)用 execute 將重用以前構(gòu)造的線程(如果線程可用)拐邪。如果現(xiàn)有線程沒有可用的慰毅,則創(chuàng)建一個(gè)新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程扎阶。因此汹胃,長時(shí)間保持空閑的線程池不會(huì)使用任何資源。注意东臀,可以使用 ThreadPoolExecutor 構(gòu)造方法創(chuàng)建具有類似屬性但細(xì)節(jié)不同(例如超時(shí)參數(shù))的線程池着饥。
newSingleThreadExecutor(單個(gè)后臺(tái)線程)
創(chuàng)建一個(gè)使用單個(gè) worker 線程的 Executor,以無界隊(duì)列方式來運(yùn)行該線程啡邑。(注意贱勃,如果因?yàn)樵陉P(guān)閉前的執(zhí)行期間出現(xiàn)失敗而終止了此單個(gè)線程,那么如果需要谤逼,一個(gè)新線程將代替它執(zhí)行后續(xù)的任務(wù))贵扰。可保證順序地執(zhí)行各個(gè)任務(wù)流部,并且在任意給定的時(shí)間不會(huì)有多個(gè)線程是活動(dòng)的戚绕。與其他等效的 newFixedThreadPool(1) 不同,可保證無需重新配置此方法所返回的執(zhí)行程序即可使用其他的線程枝冀。
這些方法返回的都是ExecutorService對象舞丛,這個(gè)對象可以理解為就是一個(gè)線程池耘子。
這個(gè)線程池的功能還是比較完善的∏蚯校可以提交任務(wù)submit()可以結(jié)束線程池shutdown()谷誓。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyExecutor extends Thread {
private int index;
public MyExecutor(int i){
this.index=i;
}
public void run(){
try{
System.out.println("["+this.index+"] start....");
Thread.sleep((int)(Math.random()*1000));
System.out.println("["+this.index+"] end.");
}
catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ExecutorService service=Executors.newFixedThreadPool(4);
for(int i=0;i<10;i++){
service.execute(new MyExecutor(i));
//service.submit(new MyExecutor(i));
}
System.out.println("submit finish");
service.shutdown();
}
}
雖然打印了一些信息,但是看的不是非常清晰吨凑,這個(gè)線程池是如何工作的捍歪,我們來將休眠的時(shí)間調(diào)長10倍。
Thread.sleep((int)(Math.random()*10000));
再來看鸵钝,會(huì)清楚看到只能執(zhí)行4個(gè)線程糙臼。當(dāng)執(zhí)行完一個(gè)線程后,才會(huì)又執(zhí)行一個(gè)新的線程恩商,也就是說变逃,我們將所有的線程提交后,線程池會(huì)等待執(zhí)行完最后shutdown怠堪。我們也會(huì)發(fā)現(xiàn)揽乱,提交的線程被放到一個(gè)“無界隊(duì)列里”。這是一個(gè)有序隊(duì)列(BlockingQueue研叫,這個(gè)下面會(huì)說到)锤窑。
另外它使用了Executors的靜態(tài)函數(shù)生成一個(gè)固定的線程池,顧名思義嚷炉,線程池的線程是不會(huì)釋放的渊啰,即使它是Idle。
這就會(huì)產(chǎn)生性能問題申屹,比如如果線程池的大小為200绘证,當(dāng)全部使用完畢后,所有的線程會(huì)繼續(xù)留在池中哗讥,相應(yīng)的內(nèi)存和線程切換(while(true)+sleep循環(huán))都會(huì)增加嚷那。
如果要避免這個(gè)問題,就必須直接使用ThreadPoolExecutor()來構(gòu)造杆煞∥嚎恚可以像通用的線程池一樣設(shè)置“最大線程數(shù)”、“最小線程數(shù)”和“空閑線程keepAlive的時(shí)間”决乎。
這個(gè)就是線程池基本用法队询。
Semaphore
一個(gè)計(jì)數(shù)信號量。從概念上講构诚,信號量維護(hù)了一個(gè)許可集合蚌斩。如有必要,在許可可用前會(huì)阻塞每一個(gè) acquire()范嘱,然后再獲取該許可送膳。每個(gè) release() 添加一個(gè)許可员魏,從而可能釋放一個(gè)正在阻塞的獲取者。但是叠聋,不使用實(shí)際的許可對象撕阎,Semaphore 只對可用許可的號碼進(jìn)行計(jì)數(shù),并采取相應(yīng)的行動(dòng)碌补。
Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目闻书。例如,下面的類使用信號量控制對內(nèi)容池的訪問:
這里是一個(gè)實(shí)際的情況脑慧,大家排隊(duì)上廁所,廁所只有兩個(gè)位置砰盐,來了10個(gè)人需要排隊(duì)闷袒。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class MySemaphore extends Thread {
Semaphore position;
private int id;
public MySemaphore(int i,Semaphore s){
this.id=i;
this.position=s;
}
public void run(){
try{
if(position.availablePermits()>0){
System.out.println("顧客["+this.id+"]進(jìn)入廁所,有空位");
}
else{
System.out.println("顧客["+this.id+"]進(jìn)入廁所岩梳,沒空位囊骤,排隊(duì)");
}
position.acquire();
System.out.println("顧客["+this.id+"]獲得坑位");
Thread.sleep((int)(Math.random()*1000));
System.out.println("顧客["+this.id+"]使用完畢");
position.release();
}
catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ExecutorService list=Executors.newCachedThreadPool();
Semaphore position=new Semaphore(2);
for(int i=0;i<10;i++){
list.submit(new MySemaphore(i+1,position));
}
list.shutdown();
position.acquireUninterruptibly(2);
System.out.println("使用完畢,需要清掃了");
position.release(2);
}
}
ReentrantLock
一個(gè)可重入的互斥鎖定 Lock冀值,它具有與使用 synchronized 方法和語句所訪問的隱式監(jiān)視器鎖定相同的一些基本行為和語義也物,但功能更強(qiáng)大。
ReentrantLock 將由最近成功獲得鎖定列疗,并且還沒有釋放該鎖定的線程所擁有滑蚯。當(dāng)鎖定沒有被另一個(gè)線程所擁有時(shí),調(diào)用 lock 的線程將成功獲取該鎖定并返回抵栈。如果當(dāng)前線程已經(jīng)擁有該鎖定告材,此方法將立即返回」啪ⅲ可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此情況是否發(fā)生斥赋。
此類的構(gòu)造方法接受一個(gè)可選的公平參數(shù)。
當(dāng)設(shè)置為 true時(shí)产艾,在多個(gè)線程的爭用下疤剑,這些鎖定傾向于將訪問權(quán)授予等待時(shí)間最長的線程。否則此鎖定將無法保證任何特定訪問順序闷堡。
與采用默認(rèn)設(shè)置(使用不公平鎖定)相比隘膘,使用公平鎖定的程序在許多線程訪問時(shí)表現(xiàn)為很低的總體吞吐量(即速度很慢,常常極其慢)缚窿,但是在獲得鎖定和保證鎖定分配的均衡性時(shí)差異較小棘幸。不過要注意的是,公平鎖定不能保證線程調(diào)度的公平性倦零。因此误续,使用公平鎖定的眾多線程中的一員可能獲得多倍的成功機(jī)會(huì)吨悍,這種情況發(fā)生在其他活動(dòng)線程沒有被處理并且目前并未持有鎖定時(shí)。還要注意的是蹋嵌,未定時(shí)的 tryLock 方法并沒有使用公平設(shè)置育瓜。因?yàn)榧词蛊渌€程正在等待,只要該鎖定是可用的栽烂,此方法就可以獲得成功躏仇。
建議總是 立即實(shí)踐,使用 try 塊來調(diào)用 lock腺办,在之前/之后的構(gòu)造中焰手,最典型的代碼如下:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
我的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
public class MyReentrantLock extends Thread{
TestReentrantLock lock;
private int id;
public MyReentrantLock(int i,TestReentrantLock test){
this.id=i;
this.lock=test;
}
public void run(){
lock.print(id);
}
public static void main(String args[]){
ExecutorService service=Executors.newCachedThreadPool();
TestReentrantLock lock=new TestReentrantLock();
for(int i=0;i<10;i++){
service.submit(new MyReentrantLock(i,lock));
}
service.shutdown();
}
}
class TestReentrantLock{
private ReentrantLock lock=new ReentrantLock();
public void print(int str){
try{
lock.lock();
System.out.println(str+"獲得");
Thread.sleep((int)(Math.random()*1000));
}
catch(Exception e){
e.printStackTrace();
}
finally{
System.out.println(str+"釋放");
lock.unlock();
}
}
}
BlockingQueue
支持兩個(gè)附加操作的 Queue,這兩個(gè)操作是:檢索元素時(shí)等待隊(duì)列變?yōu)榉强栈澈恚约按鎯?chǔ)元素時(shí)等待空間變得可用书妻。
BlockingQueue 不接受 null 元素聊闯。試圖 add、put 或 offer 一個(gè) null 元素時(shí),某些實(shí)現(xiàn)會(huì)拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
BlockingQueue 可以是限定容量的。它在任意給定時(shí)間都可以有一個(gè) remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。
沒有任何內(nèi)部容量約束的 BlockingQueue 總是報(bào)告 Integer.MAX_VALUE 的剩余容量棘催。
BlockingQueue 實(shí)現(xiàn)主要用于生產(chǎn)者-使用者隊(duì)列纲仍,但它另外還支持 Collection 接口。因此沸版,舉例來說,使用 remove(x) 從隊(duì)列中移除任意一個(gè)元素是有可能的兴蒸。
然而视粮,這種操作通常不 會(huì)有效執(zhí)行,只能有計(jì)劃地偶爾使用橙凳,比如在取消排隊(duì)信息時(shí)蕾殴。
BlockingQueue 實(shí)現(xiàn)是線程安全的。所有排隊(duì)方法都可以使用內(nèi)部鎖定或其他形式的并發(fā)控制來自動(dòng)達(dá)到它們的目的岛啸。
然而钓觉,大量的 Collection 操作(addAll、containsAll坚踩、retainAll 和 removeAll)沒有 必要自動(dòng)執(zhí)行荡灾,除非在實(shí)現(xiàn)中特別說明。
因此,舉例來說批幌,在只添加了 c 中的一些元素后础锐,addAll(c) 有可能失敗(拋出一個(gè)異常)逼裆。
BlockingQueue 實(shí)質(zhì)上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項(xiàng)郁稍。
這種功能的需求和使用有依賴于實(shí)現(xiàn)的傾向。例如胜宇,一種常用的策略是:對于生產(chǎn)者耀怜,插入特殊的 end-of-stream 或 poison 對象,并根據(jù)使用者獲取這些對象的時(shí)間來對它們進(jìn)行解釋桐愉。
下面的例子演示了這個(gè)阻塞隊(duì)列的基本功能财破。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class MyBlockingQueue extends Thread {
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
private int index;
public MyBlockingQueue(int i) {
this.index = i;
}
public void run() {
try {
queue.put(String.valueOf(this.index));
System.out.println("{" + this.index + "} in queue!");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.submit(new MyBlockingQueue(i));
}
Thread thread = new Thread() {
public void run() {
try {
while (true) {
Thread.sleep((int) (Math.random() * 1000));
if(MyBlockingQueue.queue.isEmpty())
break;
String str = MyBlockingQueue.queue.take();
System.out.println(str + " has take!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.submit(thread);
service.shutdown();
}
}
---------------------執(zhí)行結(jié)果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!
CompletionService
將生產(chǎn)新的異步任務(wù)與使用已完成任務(wù)的結(jié)果分離開來的服務(wù)。生產(chǎn)者 submit 執(zhí)行的任務(wù)从诲。使用者 take 已完成的任務(wù)左痢,
并按照完成這些任務(wù)的順序處理它們的結(jié)果。例如系洛,CompletionService 可以用來管理異步 IO 俊性,執(zhí)行讀操作的任務(wù)作為程序或系統(tǒng)的一部分提交,
然后描扯,當(dāng)完成讀操作時(shí)定页,會(huì)在程序的不同部分執(zhí)行其他操作,執(zhí)行操作的順序可能與所請求的順序不同绽诚。
通常典徊,CompletionService 依賴于一個(gè)單獨(dú)的 Executor 來實(shí)際執(zhí)行任務(wù),在這種情況下恩够,
CompletionService 只管理一個(gè)內(nèi)部完成隊(duì)列卒落。ExecutorCompletionService 類提供了此方法的一個(gè)實(shí)現(xiàn)蜂桶。
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyCompletionService implements Callable<String> {
private int id;
public MyCompletionService(int i){
this.id=i;
}
public static void main(String[] args) throws Exception{
ExecutorService service=Executors.newCachedThreadPool();
CompletionService<String> completion=new ExecutorCompletionService<String>(service);
for(int i=0;i<10;i++){
completion.submit(new MyCompletionService(i));
}
for(int i=0;i<10;i++){
System.out.println(completion.take().get());
}
service.shutdown();
}
public String call() throws Exception {
Integer time=(int)(Math.random()*1000);
try{
System.out.println(this.id+" start");
Thread.sleep(time);
System.out.println(this.id+" end");
}
catch(Exception e){
e.printStackTrace();
}
return this.id+":"+time;
}
}
CountDownLatch
一個(gè)同步輔助類钦购,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待谎痢。
用給定的計(jì)數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當(dāng)前計(jì)數(shù)到達(dá)零之前魁蒜,await 方法會(huì)一直受阻塞。
之后,會(huì)釋放所有等待的線程劣针,await 的所有后續(xù)調(diào)用都將立即返回襟己。這種現(xiàn)象只出現(xiàn)一次——計(jì)數(shù)無法被重置贮预。如果需要重置計(jì)數(shù),請考慮使用 CyclicBarrier。
CountDownLatch 是一個(gè)通用同步工具仿吞,它有很多用途滑频。將計(jì)數(shù) 1 初始化的 CountDownLatch 用作一個(gè)簡單的開/關(guān)鎖存器,
或入口:在通過調(diào)用 countDown() 的線程打開入口前唤冈,所有調(diào)用 await 的線程都一直在入口處等待峡迷。
用 N 初始化的 CountDownLatch 可以使一個(gè)線程在 N 個(gè)線程完成某項(xiàng)操作之前一直等待,或者使其在某項(xiàng)操作完成 N 次之前一直等待你虹。
CountDownLatch 的一個(gè)有用特性是绘搞,它不要求調(diào)用 countDown 方法的線程等到計(jì)數(shù)到達(dá)零時(shí)才繼續(xù),
而在所有線程都能通過之前售葡,它只是阻止任何線程繼續(xù)通過一個(gè) await看杭。
一下的例子是別人寫的,非常形象挟伙。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 開始的倒數(shù)鎖
final CountDownLatch begin = new CountDownLatch(1);
// 結(jié)束的倒數(shù)鎖
final CountDownLatch end = new CountDownLatch(10);
// 十名選手
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int index = 0; index < 10; index++) {
final int NO = index + 1;
Runnable run = new Runnable() {
public void run() {
try {
begin.await();//一直阻塞
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + NO + " arrived");
} catch (InterruptedException e) {
} finally {
end.countDown();
}
}
};
exec.submit(run);
}
System.out.println("Game Start");
begin.countDown();
end.await();
System.out.println("Game Over");
exec.shutdown();
}
}
CountDownLatch最重要的方法是countDown()和await()楼雹,前者主要是倒數(shù)一次,后者是等待倒數(shù)到0尖阔,如果沒有到達(dá)0贮缅,就只有阻塞等待了。
CyclicBarrier
一個(gè)同步輔助類介却,它允許一組線程互相等待谴供,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)。
在涉及一組固定大小的線程的程序中齿坷,這些線程必須不時(shí)地互相等待桂肌,此時(shí) CyclicBarrier 很有用。因?yàn)樵?barrier 在釋放等待線程后可以重用永淌,所以稱它為循環(huán) 的 barrier崎场。
CyclicBarrier 支持一個(gè)可選的 Runnable 命令,在一組線程中的最后一個(gè)線程到達(dá)之后(但在釋放所有線程之前)遂蛀,
該命令只在每個(gè)屏障點(diǎn)運(yùn)行一次谭跨。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用李滴。
示例用法:下面是一個(gè)在并行分解設(shè)計(jì)中使用 barrier 的例子螃宙,很經(jīng)典的旅行團(tuán)例子:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrier {
// 徒步需要的時(shí)間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
private static int[] timeWalk = { 5, 8, 15, 15, 10 };
// 自駕游
private static int[] timeSelf = { 1, 3, 4, 4, 5 };
// 旅游大巴
private static int[] timeBus = { 2, 4, 6, 6, 7 };
static String now() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
return sdf.format(new Date()) + ": ";
}
static class Tour implements Runnable {
private int[] times;
private CyclicBarrier barrier;
private String tourName;
public Tour(CyclicBarrier barrier, String tourName, int[] times) {
this.times = times;
this.tourName = tourName;
this.barrier = barrier;
}
public void run() {
try {
Thread.sleep(times[0] * 1000);
System.out.println(now() + tourName + " Reached Shenzhen");
barrier.await();
Thread.sleep(times[1] * 1000);
System.out.println(now() + tourName + " Reached Guangzhou");
barrier.await();
Thread.sleep(times[2] * 1000);
System.out.println(now() + tourName + " Reached Shaoguan");
barrier.await();
Thread.sleep(times[3] * 1000);
System.out.println(now() + tourName + " Reached Changsha");
barrier.await();
Thread.sleep(times[4] * 1000);
System.out.println(now() + tourName + " Reached Wuhan");
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
public static void main(String[] args) {
// 三個(gè)旅行團(tuán)
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
exec.submit(new Tour(barrier, "WalkTour", timeWalk));
exec.submit(new Tour(barrier, "SelfTour", timeSelf));
//當(dāng)我們把下面的這段代碼注釋后,會(huì)發(fā)現(xiàn)所坯,程序阻塞了谆扎,無法繼續(xù)運(yùn)行下去。
exec.submit(new Tour(barrier, "BusTour", timeBus));
exec.shutdown();
}
}
CyclicBarrier最重要的屬性就是參與者個(gè)數(shù)芹助,另外最要方法是await()燕酷。當(dāng)所有線程都調(diào)用了await()后籍凝,就表示這些線程都可以繼續(xù)執(zhí)行,否則就會(huì)等待苗缩。
Future
Future 表示異步計(jì)算的結(jié)果饵蒂。它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成酱讶,并檢索計(jì)算的結(jié)果退盯。
計(jì)算完成后只能使用 get 方法來檢索結(jié)果,如有必要泻肯,計(jì)算完成前可以阻塞此方法渊迁。取消則由 cancel 方法來執(zhí)行。
還提供了其他方法灶挟,以確定任務(wù)是正常完成還是被取消了琉朽。一旦計(jì)算完成,就不能再取消計(jì)算稚铣。
如果為了可取消性而使用 Future但又不提供可用的結(jié)果箱叁,則可以聲明 Future<?> 形式類型、并返回 null 作為基礎(chǔ)任務(wù)的結(jié)果惕医。
這個(gè)我們在前面CompletionService已經(jīng)看到了耕漱,這個(gè)Future的功能,而且這個(gè)可以在提交線程的時(shí)候被指定為一個(gè)返回對象的抬伺。
ScheduledExecutorService
一個(gè) ExecutorService螟够,可安排在給定的延遲后運(yùn)行或定期執(zhí)行的命令。
schedule 方法使用各種延遲創(chuàng)建任務(wù)峡钓,并返回一個(gè)可用于取消或檢查執(zhí)行的任務(wù)對象妓笙。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法創(chuàng)建并執(zhí)行某些在取消前一直定期運(yùn)行的任務(wù)。
用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令能岩,通過所請求的 0 延遲進(jìn)行安排寞宫。
schedule 方法中允許出現(xiàn) 0 和負(fù)數(shù)延遲(但不是周期),并將這些視為一種立即執(zhí)行的請求捧灰。
所有的 schedule 方法都接受相對 延遲和周期作為參數(shù)淆九,而不是絕對的時(shí)間或日期统锤。將以 Date 所表示的絕對時(shí)間轉(zhuǎn)換成要求的形式很容易毛俏。
例如,要安排在某個(gè)以后的日期運(yùn)行饲窿,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)煌寇。
但是要注意,由于網(wǎng)絡(luò)時(shí)間同步協(xié)議逾雄、時(shí)鐘漂移或其他因素的存在阀溶,因此相對延遲的期滿日期不必與啟用任務(wù)的當(dāng)前 Date 相符腻脏。
Executors 類為此包中所提供的 ScheduledExecutorService 實(shí)現(xiàn)提供了便捷的工廠方法。
一下的例子也是網(wǎng)上比較流行的银锻。
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
public class TestScheduledThread {
public static void main(String[] args) {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
final Runnable beeper = new Runnable() {
int count = 0;
public void run() {
System.out.println(new Date() + " beep " + (++count));
}
};
// 1秒鐘后運(yùn)行永品,并每隔2秒運(yùn)行一次
final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
// 2秒鐘后運(yùn)行,并每次在上次任務(wù)運(yùn)行完后等待5秒后重新運(yùn)行
final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
// 30秒后結(jié)束關(guān)閉任務(wù)击纬,并且關(guān)閉Scheduler
scheduler.schedule(new Runnable() {
public void run() {
beeperHandle.cancel(true);
beeperHandle2.cancel(true);
scheduler.shutdown();
}
}, 30, SECONDS);
}
}