[TOC]
1. 定義任務 (run)
//LiftOff.java
//定義線程任務類
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();// yield:讓步沥阱,對線程調(diào)度器的一種建議:我已經(jīng)執(zhí)行完生命周期中最重要的部分蝌衔,此刻正是切換線程的好時機狠毯。
}
}
//BasicThreads.java
//執(zhí)行
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
/* Output: (90% match)
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
*///:~
2. 使用Executor
Java SE5 的 java.util.conrurrnt包中的執(zhí)行器(Executor)將為你管理Thread對象,從而簡化并發(fā)編程。
ExcutorService:具有服務生命周期的Executor
//LiftOff.java
//定義線程任務類
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();//yield:屈服,對線程調(diào)度器的一種建議:我已經(jīng)執(zhí)行完生命周期中最重要的部分,此刻正是切換線程的好時機阱高。
}
}
- CachedThreadPool: 為每個任務創(chuàng)建一個線程
// CachedThreadPool.java
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();//防止新任務被提交給這個Executor
}
} /* Output: (Sample)
#0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~
- FixedThreadPool: 可一次性預先執(zhí)行代價高昂的線程分配,從而可以限制線程數(shù)量茬缩。
// FixedThreadPool.java
public class FixedThreadPool {
public static void main(String[] args) {
// Constructor argument is number of threads:
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
} /* Output: (Sample)
#0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~
- SingleThreadExecutor: 線程數(shù)為1的FixedThreadPool赤惊。向SingleThreadExecutor提交多任務,這些任務會排隊凰锡。
// SingleThreadExecutor.java
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec =
Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
} /* Output:
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
*///:~
3. 從任務中返回值 (call)
Runnable是執(zhí)行工作的獨立任務未舟,它無法返回值。此時寡夹,可以實現(xiàn)Callable接口处面,實現(xiàn)call(),并且必須使用ExecutorService.submit()調(diào)用它。
// CallableDemo.java
import java.util.concurrent.*;
import java.util.*;
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results =
new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));
for(Future<String> fs : results)
try {
// get() blocks until completion:
System.out.println(fs.get());
} catch(InterruptedException e) {
System.out.println(e);
return;
} catch(ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
} /* Output:
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*///:~
submit() 會產(chǎn)生Future對象菩掏,它用Callable返回結(jié)果的特定類型進行參數(shù)化魂角。可以使用isDone()來查詢Future是否已完成智绸。完成后可調(diào)用get()獲取結(jié)果野揪,否則访忿,get()將阻塞,直到結(jié)果完成斯稳。
4. 休眠 (sleep)
//: SleepingTask.java
// Calling sleep() to pause for a while.
import java.util.concurrent.*;
public class SleepingTask extends LiftOff {
public void run() {
try {
while(countDown-- > 0) {
System.out.print(status());
// Old-style:
// Thread.sleep(100);
// Java SE5/6-style:
TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
System.err.println("Interrupted");
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new SleepingTask());
exec.shutdown();
}
} /* Output:
#0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~
調(diào)用sleep可拋出InterruptedException海铆。異常在run()中被捕獲,因為異常不能跨線程轉(zhuǎn)播到main()中挣惰,所以必須本地處理卧斟。但也可以使用Excutor的異常捕獲。
5. 優(yōu)先級 (priorities)
線程優(yōu)先級將該線程的重要性傳遞給了調(diào)度器憎茂,優(yōu)先級僅僅是執(zhí)行頻率的高低珍语。
通常任務都有默認的優(yōu)先級。
你可以使用getPriority()讀取優(yōu)先級竖幔,并用setPriority()來修改他板乙;
//: SimplePriorities.java
// Shows the use of thread priorities.
import java.util.concurrent.*;
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d; // No optimization
private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);
while(true) {
// An expensive, interruptable operation:
for(int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double)i;
if(i % 1000 == 0)
Thread.yield();
}
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(
new SimplePriorities(Thread.MIN_PRIORITY));
exec.execute(
new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
} /* Output: (70% match)
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-6,10,main]: 4
Thread[pool-1-thread-6,10,main]: 3
Thread[pool-1-thread-6,10,main]: 2
Thread[pool-1-thread-6,10,main]: 1
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
...
*///:~
盡管JDK有10個優(yōu)先級,但它與大多數(shù)操作系統(tǒng)都不能映射好拳氢。通常只使用MIN_PRIORITY 募逞、NORM_PRIORITY、 MAX_PRIORITY 三種級別馋评。
6. 讓步 (yield)
通過yield()方法暗示cpu放接,建議具有相同優(yōu)先級的其他線程可以運行。
7. 后臺線程 (daemon)
所謂后臺線程留特,就是指在程序運行的時候在后臺提供的一種通用的服務線程透乾,且這種線程不屬于程序中不可獲取的部分。因此磕秤,所以的非后臺線程結(jié)束時,程序就終結(jié)了捧韵,同時會殺死所以后臺線程市咆。反過來說,只要有任何非后臺線程在運行再来,程序就不會終止蒙兰。
//: SimpleDaemons.java
// Daemon threads don't prevent the program from ending.
import java.util.concurrent.*;
public class SimpleDaemons implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
System.out.println("sleep() interrupted");
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < 10; i++) {
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true); // Must call before start()
daemon.start();
}
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(175);
}
} /* Output: (Sample)
All daemons started
Thread[Thread-0,5,main] SimpleDaemons@530daa
Thread[Thread-1,5,main] SimpleDaemons@a62fc3
Thread[Thread-2,5,main] SimpleDaemons@89ae9e
Thread[Thread-3,5,main] SimpleDaemons@1270b73
Thread[Thread-4,5,main] SimpleDaemons@60aeb0
Thread[Thread-5,5,main] SimpleDaemons@16caf43
Thread[Thread-6,5,main] SimpleDaemons@66848c
Thread[Thread-7,5,main] SimpleDaemons@8813f2
Thread[Thread-8,5,main] SimpleDaemons@1d58aae
Thread[Thread-9,5,main] SimpleDaemons@83cc67
...
*///:~
必須在線程啟動之前調(diào)用setDaemon()方法,才能將它設置為后臺線程芒篷。
//: Daemons.java
// Daemon threads spawn other daemon threads.
import java.util.concurrent.*;
class Daemon implements Runnable {
private Thread[] t = new Thread[10];
public void run() {
for(int i = 0; i < t.length; i++) {
t[i] = new Thread(new DaemonSpawn());
t[i].start();
System.out.println("DaemonSpawn " + i + " started, ");
}
for(int i = 0; i < t.length; i++)
System.out.println("t[" + i + "].isDaemon() = " +
t[i].isDaemon() + ", ");
while(true)
Thread.yield();
}
}
class DaemonSpawn implements Runnable {
public void run() {
while(true)
Thread.yield();
}
}
public class Daemons {
public static void main(String[] args) throws Exception {
Thread d = new Thread(new Daemon());
d.setDaemon(true);
d.start();
System.out.println("d.isDaemon() = " + d.isDaemon() + ", ");
// Allow the daemon threads to
// finish their startup processes:
TimeUnit.SECONDS.sleep(1);
}
} /* Output: (Sample)
d.isDaemon() = true, DaemonSpawn 0 started, DaemonSpawn 1 started, DaemonSpawn 2 started, DaemonSpawn 3 started, DaemonSpawn 4 started, DaemonSpawn 5 started, DaemonSpawn 6 started, DaemonSpawn 7 started, DaemonSpawn 8 started, DaemonSpawn 9 started, t[0].isDaemon() = true, t[1].isDaemon() = true, t[2].isDaemon() = true, t[3].isDaemon() = true, t[4].isDaemon() = true, t[5].isDaemon() = true, t[6].isDaemon() = true, t[7].isDaemon() = true, t[8].isDaemon() = true, t[9].isDaemon() = true,
*///:~
后臺線程創(chuàng)建的線程都是后臺線程搜变。
后臺線程不會執(zhí)行finally子句。因為該線程是隨進程戛然而止的针炉。
8.隱式設置全局線程參數(shù)
通過編寫定制的ThreadFactory(實現(xiàn)接口ThreadFactory的newThread()方法)可以定制由Executor創(chuàng)建的線程的屬性(后臺挠他,優(yōu)先級,名稱……)篡帕。
//: DaemonFromFactory.java
// Using a Thread Factory to create daemons.
import java.util.concurrent.*;
public class DaemonFromFactory implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
System.out.println("Interrupted");
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(
new DaemonThreadFactory());
for(int i = 0; i < 10; i++)
exec.execute(new DaemonFromFactory());
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(500); // Run for a while
}
}
class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
/* (Execute to see output) *///:~
9. 編碼的變體 (extends Thread)
在簡單情況下殖侵,直接從Thread繼承
//: SimpleThread.java
// Inheriting directly from the Thread class.
public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
// Store the thread name:
super(Integer.toString(++threadCount));
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SimpleThread();
}
} /* Output:
#1(5), #1(4), #1(3), #1(2), #1(1), #2(5), #2(4), #2(3), #2(2), #2(1), #3(5), #3(4), #3(3), #3(2), #3(1), #4(5), #4(4), #4(3), #4(2), #4(1), #5(5), #5(4), #5(3), #5(2), #5(1),
*///:~
10. 加入一個線程 (join)
一個線程可以在其他線程上調(diào)用join()贸呢,效果是等待一段時間直到第二個線程結(jié)束才執(zhí)行。如果線程t2在線程t1上調(diào)用t1.join()拢军,t2將被掛起楞陷,直到目標線程t1結(jié)束才恢復。
也可以在join()中帶上一個超時參數(shù)茉唉,如果目標線程t1在這段時間內(nèi)未結(jié)束固蛾,join()總能返回。
在調(diào)用線程t1上調(diào)用t1.interrupt() 中斷 joint()度陆。
//: Joining.java
// Understanding join().
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch(InterruptedException e) {
System.out.println(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
System.out.println(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch(InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
} /* Output:
Grumpy was interrupted. isInterrupted(): false
Doc join completed
Sleepy has awakened
Dopey join completed
*///:~
11. 捕獲異常 (CaptureUncaughtException)
java SE5 后艾凯,可以用Executor捕獲異常。為了捕獲異常坚芜,我們修改Executor產(chǎn)生線程的方式览芳。Thread.UncaughtExceptionHandler 接口允許你在每個Thread對象上附著一個異常處理器。Thread.UncaughtExceptionHandler.uncaughtException()會在線程因未捕獲的異常而臨近死亡時被調(diào)用鸿竖。
//: CaptureUncaughtException.java
import java.util.concurrent.*;
class ExceptionThread2 implements Runnable {
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(
new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
} /* Output: (90% match)
HandlerThreadFactory@de6ced creating new Thread
created Thread[Thread-0,5,main]
eh = MyUncaughtExceptionHandler@1fb8ee3
run() by Thread[Thread-0,5,main]
eh = MyUncaughtExceptionHandler@1fb8ee3
caught java.lang.RuntimeException
*///:~
如果要在代碼中處處使用相同的異常處理器沧竟,那就在Thread類中設置一個靜態(tài)域,并把這個處理器設置為默認的未捕獲異常處理器:
//: SettingDefaultHandler.java
import java.util.concurrent.*;
public class SettingDefaultHandler {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
} /* Output:
caught java.lang.RuntimeException
*///:~