一、多線程產(chǎn)生的問題與簡單優(yōu)化
public class ThreadTest1 {
public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
static class ProductObject {
public static String value = null;
}
static class Consumer extends Thread {
@Override
public void run() {
while (true) {
if (ProductObject.value != null) {
System.out.println("消費產(chǎn)品" + ProductObject.value);
ProductObject.value = null;
}
}
}
}
static class Producer extends Thread {
@Override
public void run() {
//不斷生產(chǎn)產(chǎn)品
while (true) {
if (ProductObject.value == null) {
//產(chǎn)品已經(jīng)消費完成,生產(chǎn)新的產(chǎn)品
ProductObject.value = "No:" + System.currentTimeMillis();
System.out.println("生產(chǎn)產(chǎn)品" + ProductObject.value);
}
}
}
}
}
結(jié)果輸出:
生產(chǎn)產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
消費產(chǎn)品No:1505980855609
生產(chǎn)產(chǎn)品No:1505980855609
我們發(fā)現(xiàn)該示例并沒有一直執(zhí)行钱慢,而是執(zhí)行一段時間后停止打印
1.原因
內(nèi)存機制中的 "副本"概念
多個線程訪問一個成員變量時 每個線程都會得到一個該變量的副本 在自己的線程的棧中保存、計算 以提高速度卿堂。 但是這樣就會有同步的問題了束莫。 當(dāng)一個線程修改了自己棧內(nèi)副本的值 還沒有立即將同步到主存中懒棉, 其他線程再來獲取主存中的該變量時 就會得到過期數(shù)據(jù)。
1.解決辦法
為了解決這種問題 可以使用synchronized對該變量的操作同步 览绿, 或使用volatile關(guān)鍵字聲明該變量為易變對象 這樣的話 每個線程就不會創(chuàng)建副本到自己的棧中 而是直接操作主存策严。
(1)volatile
在對象/變量前加上 volatile 。 Volatile修飾的 成員變量 在每次被 線程 訪問時饿敲,都強迫從 共享內(nèi)存 中重讀該成員變量的值妻导。而且,當(dāng) 成員變量 發(fā)生變化時怀各,強迫線程將變化值回寫到 共享內(nèi)存 倔韭。這樣在任何時刻,兩個不同的線程總是看到某個 成員變量 的同一個值瓢对。 Java語言 規(guī)范中指出:為了獲得最佳速度寿酌,允許線程保存共享 成員變量 的私有拷貝,而且只當(dāng)線程進入或者離開 同步代碼塊 時才與共享成員變量的原始值對比硕蛹。這樣當(dāng)多個線程同時與某個對象交互時醇疼,就必須要注意到要讓線程及時的得到共享 成員變量 的變化。而volatile 關(guān)鍵字 就是提示JVM:對于這個 成員變量 不能保存它的私有拷貝法焰,而應(yīng)直接與共享成員變量交互秧荆。使用建議:在兩個或者更多的線程訪問的 成員變量 上使用volatile。當(dāng)要訪問的 變量 已在synchronized代碼塊中壶栋,或者為 常量 時辰如,不必使用。由于使用volatile屏蔽掉了JVM中必要的 代碼優(yōu)化 贵试,所以在效率上比較低琉兜,因此一定在必要時才使用此 關(guān)鍵字 。
static class ProductObject {
public volatile static String value = null;
}
}
結(jié)果輸出:
消費產(chǎn)品No:1505982581204
生產(chǎn)產(chǎn)品No:1505982581204
消費產(chǎn)品No:1505982581204
生產(chǎn)產(chǎn)品No:1505982581204
消費產(chǎn)品No:1505982581204
生產(chǎn)產(chǎn)品No:1505982581204
消費產(chǎn)品No:1505982581204
生產(chǎn)產(chǎn)品No:1505982581204
消費產(chǎn)品No:1505982581204
生產(chǎn)產(chǎn)品No:1505982581204
(省略...)
程序一直輸出符合要求
(2)synchronized
由于是上例中 volatile while 一直執(zhí)行性能開銷比較大 毙玻,則需要加上鎖 synchronized避免大量性能開銷
將對象/變量加上鎖 synchronized 修飾豌蟋。在線程中,使用同步方法或者同步塊桑滩。
public class ThreadTest1 {
public static void main(String[] args) {
Object lock = new Object();
new Producer(lock).start();
new Consumer(lock).start();
}
static class ProductObject {
public static String value = null;
}
static class Consumer extends Thread {
Object lock;
public Consumer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
while (true) {
synchronized (lock) {//互斥鎖
if (ProductObject.value != null) {
System.out.println("消費產(chǎn)品" + ProductObject.value);
ProductObject.value = null;
}
}
}
}
}
static class Producer extends Thread {
Object lock;
public Producer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
//不斷生產(chǎn)產(chǎn)品
while (true) {
synchronized (lock) {//互斥鎖
if (ProductObject.value == null) {
//產(chǎn)品已經(jīng)消費完成梧疲,生產(chǎn)新的產(chǎn)品
ProductObject.value = "No:" + System.currentTimeMillis();
System.out.println("生產(chǎn)產(chǎn)品" + ProductObject.value);
}
}
}
}
}
}
程序一直輸出符合要求
但是,為了明確對象鎖的程序先后執(zhí)行順序(減少輪詢次數(shù))运准,所有要引入wait() notify()方法
Obj.wait()幌氮,與Obj.notify()必須要與synchronized(Obj)一起使用,也就是wait,與notify是針對已經(jīng)獲取了Obj鎖進行操作胁澳,從語法角度來說就是Obj.wait(),Obj.notify必須在synchronized(Obj){...}語句塊內(nèi)该互。從功能上來說wait就是說線程在獲取對象鎖后,主動釋放對象鎖韭畸,同時本線程休眠宇智。直到有其它線程調(diào)用對象的notify()喚醒該線程蔓搞,才能繼續(xù)獲取對象鎖,并繼續(xù)執(zhí)行随橘。相應(yīng)的notify()就是對對象鎖的喚醒操作喂分。但有一點需要注意的是notify()調(diào)用后,并不是馬上就釋放對象鎖的机蔗,而是在相應(yīng)的synchronized(){}語句塊執(zhí)行結(jié)束蒲祈,自動釋放鎖后,JVM會在wait()對象鎖的線程中隨機選取一線程萝嘁,賦予其對象鎖讳嘱,喚醒線程,繼續(xù)執(zhí)行酿愧。這樣就提供了在線程間同步沥潭、喚醒的操作。Thread.sleep()與Object.wait()二者都可以暫停當(dāng)前線程嬉挡,釋放CPU控制權(quán)钝鸽,主要的區(qū)別在于Object.wait()在釋放CPU同時,釋放了對象鎖的控制庞钢。
優(yōu)化后程序:
public class ThreadTest1 {
//產(chǎn)品
static class ProductObject{
//線程操作變量可見
public static String value;
}
//生產(chǎn)者線程
static class Producer extends Thread{
Object lock;
public Producer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
//不斷生產(chǎn)產(chǎn)品
while(true){
synchronized (lock) { //互斥鎖
//產(chǎn)品還沒有被消費拔恰,等待
if(ProductObject.value != null){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//產(chǎn)品已經(jīng)消費完成,生產(chǎn)新的產(chǎn)品
ProductObject.value = "NO:"+System.currentTimeMillis();
System.out.println("生產(chǎn)產(chǎn)品:"+ProductObject.value);
lock.notify(); //生產(chǎn)完成基括,通知消費者消費
}
}
}
}
//消費者線程
static class Consumer extends Thread{
Object lock;
public Consumer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
while(true){
synchronized (lock) {
//沒有產(chǎn)品可以消費
if(ProductObject.value == null){
//等待颜懊,阻塞
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消費產(chǎn)品:"+ProductObject.value);
ProductObject.value = null;
lock.notify(); //消費完成,通知生產(chǎn)者风皿,繼續(xù)生產(chǎn)
}
}
}
}
public static void main(String[] args) {
Object lock = new Object();
new Producer(lock).start();
new Consumer(lock).start();
}
}
(4)volatile與synchronized區(qū)別
1)volatile本質(zhì)是在告訴jvm當(dāng)前變量在寄存器中的值是不確定的,需要從主存中讀取,synchronized則是鎖定當(dāng)前變量,只有當(dāng)前線程可以訪問該變量,其他線程被阻塞住.
2)volatile僅能使用在變量級別,synchronized則可以使用在變量,方法.
3)volatile僅能實現(xiàn)變量的修改可見性,而synchronized則可以保證變量的修改可見性和原子性.
《Java編程思想》上說河爹,定義long或double變量時,如果使用volatile關(guān)鍵字桐款,就會獲得(簡單的賦值與返回操作)原子性
4)volatile不會造成線程的阻塞,而synchronized可能會造成線程的阻塞.
5)當(dāng)一個域的值依賴于它之前的值時咸这,volatile就無法工作了,如n=n+1,n++等魔眨。如果某個域的值受到其他域的值的限制媳维,那么volatile也無法工作,如Range類的lower和upper邊界遏暴,必須遵循lower<=upper的限制侄刽。
6)使用volatile而不是synchronized的唯一安全的情況是類中只有一個可變的域。
異步任務(wù)的執(zhí)行的結(jié)果朋凉,主線程是無法獲取
二州丹、Java中的FutureTask
FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。通過傳入Runnable或者Callable的任務(wù)給FutureTask侥啤,直接調(diào)用其run方法或者放入線程池執(zhí)行当叭,之后可以在外部通過FutureTask的get方法異步獲取執(zhí)行結(jié)果,因此盖灸,F(xiàn)utureTask非常適合用于耗時的計算蚁鳖,主線程可以在完成自己的任務(wù)后,再去獲取結(jié)果赁炎。另外醉箕,F(xiàn)utureTask還可以確保即使調(diào)用了多次run方法,它都只會執(zhí)行一次Runnable或者Callable任務(wù)徙垫,或者通過cancel取消FutureTask的執(zhí)行等讥裤。
- FutureTask執(zhí)行多任務(wù)計算的使用場景
利用FutureTask和ExecutorService,可以用多線程的方式提交計算任務(wù)姻报,主線程繼續(xù)執(zhí)行其他任務(wù)己英,當(dāng)主線程需要子線程的計算結(jié)果時,在異步獲取子線程的執(zhí)行結(jié)果吴旋。
public class FutureTaskForMultiCompute {
public static void main(String[] args) {
FutureTaskForMultiCompute inst=new FutureTaskForMultiCompute();
// 創(chuàng)建任務(wù)集合
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// 創(chuàng)建線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// 傳入Callable對象創(chuàng)建FutureTask對象
FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask( ""+i));
taskList.add(ft);
// 提交給線程池執(zhí)行任務(wù)损肛,也可以通過exec.invokeAll(taskList)一次性提交所有任務(wù);
executor.submit(ft);
}
System.out.println("所有計算任務(wù)提交完畢, 主線程接著干其他事情!");
// 開始統(tǒng)計各計算線程計算結(jié)果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
try {
System.out.println("子線程返回值:"+ ft.get());
//FutureTask的get方法會自動阻塞,直到獲取計算結(jié)果為止
totalResult = totalResult + ft.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 關(guān)閉線程池
executor.shutdown();
System.out.println("-----------多任務(wù)計算后的總結(jié)果是:" + totalResult);
}
private class ComputeTask implements Callable<Integer> {
private int result = 0;
private String taskName = "";
public ComputeTask( String taskName){
this.taskName = taskName;
System.out.println("生成子線程計算任務(wù): "+taskName);
}
public String getTaskName(){
return this.taskName;
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < 5; i++) {
result += i;
}
// 休眠5秒鐘荣瑟,觀察主線程行為治拿,預(yù)期的結(jié)果是主線程會繼續(xù)執(zhí)行,到要取得FutureTask的結(jié)果是等待直至完成笆焰。
Thread.sleep(5000);
System.out.println("該子線程名: "+Thread.currentThread().getName() );
System.out.println("子線程計算任務(wù): "+taskName+" 執(zhí)行完成!");
return result;
}
}
}
結(jié)果輸出:
生成子線程計算任務(wù): 0
生成子線程計算任務(wù): 1
生成子線程計算任務(wù): 2
生成子線程計算任務(wù): 3
生成子線程計算任務(wù): 4
生成子線程計算任務(wù): 5
生成子線程計算任務(wù): 6
生成子線程計算任務(wù): 7
生成子線程計算任務(wù): 8
生成子線程計算任務(wù): 9
所有計算任務(wù)提交完畢, 主線程接著干其他事情劫谅!
該子線程名: pool-1-thread-3
子線程計算任務(wù): 2 執(zhí)行完成!
該子線程名: pool-1-thread-1
子線程計算任務(wù): 0 執(zhí)行完成!
該子線程名: pool-1-thread-5
子線程計算任務(wù): 4 執(zhí)行完成!
該子線程名: pool-1-thread-2
子線程計算任務(wù): 1 執(zhí)行完成!
該子線程名: pool-1-thread-4
子線程計算任務(wù): 3 執(zhí)行完成!
子線程返回值:10
子線程返回值:10
子線程返回值:10
子線程返回值:10
子線程返回值:10
該子線程名: pool-1-thread-2
子線程計算任務(wù): 8 執(zhí)行完成!
該子線程名: pool-1-thread-5
子線程計算任務(wù): 7 執(zhí)行完成!
該子線程名: pool-1-thread-4
子線程計算任務(wù): 9 執(zhí)行完成!
該子線程名: pool-1-thread-3
子線程計算任務(wù): 5 執(zhí)行完成!
該子線程名: pool-1-thread-1
子線程計算任務(wù): 6 執(zhí)行完成!
子線程返回值:10
子線程返回值:10
子線程返回值:10
子線程返回值:10
子線程返回值:10
-----------多任務(wù)計算后的總結(jié)果是:100
可以看到,同一時刻能夠運行的線程數(shù)為5個嚷掠。也就是說當(dāng)我們啟動了10個任務(wù)時捏检,只有5個任務(wù)能夠立刻執(zhí)行,另外的5個任務(wù)則需要等待不皆,當(dāng)有一個任務(wù)執(zhí)行完畢后未檩,第6個任務(wù)才會啟動,以此類推
- FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次
在很多高并發(fā)的環(huán)境下粟焊,往往我們只需要某些任務(wù)只執(zhí)行一次冤狡。這種使用情景FutureTask的特性恰能勝任。舉一個例子项棠,假設(shè)有一個帶key的連接池悲雳,當(dāng)key存在時,即直接返回key對應(yīng)的對象香追;當(dāng)key不存在時合瓢,則創(chuàng)建連接。對于這樣的應(yīng)用場景透典,通常采用的方法為使用一個Map對象來存儲key和連接池對應(yīng)的對應(yīng)關(guān)系晴楔,典型的代碼如下面所示:
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock();
public Connection getConnection(String key){
try{
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}
else{
//創(chuàng)建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
}
finally{
lock.unlock();
}
}
//創(chuàng)建Connection
private Connection createConnection(){
return null;
}
在上面的例子中顿苇,我們通過加鎖確保高并發(fā)環(huán)境下的線程安全,也確保了connection只創(chuàng)建一次税弃,然而確犧牲了性能纪岁。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作则果,性能大大提高幔翰,但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。這時最需要解決的問題就是當(dāng)key不存在時西壮,創(chuàng)建Connection的動作能放在connectionPool之后執(zhí)行遗增,這正是FutureTask發(fā)揮作用的時機,基于ConcurrentHashMap和FutureTask的改造代碼如下:
private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
public Connection getConnection(String key) throws Exception{
FutureTask<Connection>connectionTask=connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}
else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
}
//創(chuàng)建Connection
private Connection createConnection(){
return null;
}
Java FutureTask 異步任務(wù)操作提供了便利性
1.獲取異步任務(wù)的返回值
2.監(jiān)聽異步任務(wù)的執(zhí)行完畢
3.取消異步任務(wù)
三款青、Android中的AsyncTask
AsyncTask源碼
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask”;
//cpu核心數(shù)
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心線程數(shù)的區(qū)間是[2,4]
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//線程池最大容量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//當(dāng)一個線程空閑30秒后就會被取消
private static final int KEEP_ALIVE_SECONDS = 30;
//線程工廠 通過工廠方法newThread來創(chuàng)建新的線程
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
//原子整數(shù) 可以在高并發(fā)下正常工作
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//靜態(tài)阻塞式隊列做修,用來存放待執(zhí)行的任務(wù),初始容量:128個
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
//靜態(tài)并發(fā)線程池抡草,可以用來并行執(zhí)行任務(wù)缓待,3.0開始,AsyncTask默認是串行執(zhí)行任務(wù)渠牲,我們可以構(gòu)造并行的AsyncTask
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//靜態(tài)串行的任務(wù)執(zhí)行器旋炒,內(nèi)部實現(xiàn)了線程控制,循環(huán)的一個個取出任務(wù)交給并發(fā)線程池去執(zhí)行
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
//消息類型 結(jié)果
private static final int MESSAGE_POST_RESULT = 0x1;
//消息類型 進度
private static final int MESSAGE_POST_PROGRESS = 0x2;
//默認的任務(wù)執(zhí)行器签杈,這里使用的是串行的任務(wù)執(zhí)行器瘫镇,所以AsyncTask是串行的
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
//靜態(tài)的Handler,AsyncTask必須在UI線程中執(zhí)行是因為Handler用的是UI線程的Looper答姥,子線程沒有Looper
private static InternalHandler sHandler;
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
//任務(wù)狀態(tài) 默認為掛起 標識為易變的volatile
private volatile Status mStatus = Status.PENDING;
//原子布爾型 高并發(fā)支持 任務(wù)是否被取消
private final AtomicBoolean mCancelled = new AtomicBoolean();
//任務(wù)是否貝執(zhí)行過
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
//串行的任務(wù)執(zhí)行器铣除,當(dāng)asyncstask執(zhí)行的時候會加入到任務(wù)隊列中一個個執(zhí)行
private static class SerialExecutor implements Executor {
//線性的雙向隊列 用來存儲所有的AsyncTask任務(wù)
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//當(dāng)前正在執(zhí)行的任務(wù)
Runnable mActive;
//將新的任務(wù)加入到雙向隊列中
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
//執(zhí)行任務(wù)
r.run();
} finally {
//如果還有任務(wù),則上一個任務(wù)執(zhí)行完畢后執(zhí)行下一個任務(wù)
scheduleNext();
}
}
});
//當(dāng)前任務(wù)為空 則進入下一個任務(wù)
if (mActive == null) {
scheduleNext();
}
}
//從任務(wù)棧的頭部取出任務(wù)鹦付,交給并發(fā)線程池執(zhí)行任務(wù)
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
//任務(wù)的狀態(tài) 等待執(zhí)行尚粘,正在執(zhí)行,執(zhí)行完成
public enum Status {
PENDING,
RUNNING,
FINISHED,
}
//同步鎖 初始化Handler
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
/** @hide */
//隱藏的類 設(shè)置默認線程執(zhí)行器
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
//AsyncTask的構(gòu)造函數(shù)
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//...
//result = doInBackground(mParams);
//...
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
//...
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
//執(zhí)行完畢發(fā)送消息
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//返回當(dāng)前任務(wù)狀態(tài)
public final Status getStatus() {
return mStatus;
}
//抽象類 在子線程中執(zhí)行
@WorkerThread
protected abstract Result doInBackground(Params... params);
//在Execute之前執(zhí)行
@MainThread
protected void onPreExecute() {
}
//任務(wù)完畢 返回結(jié)果
@MainThread
protected void onPostExecute(Result result) {
}
//更新任務(wù)進度
@MainThread
protected void onProgressUpdate(Progress... values) {
}
//Cancel被調(diào)用并且doInBackground執(zhí)行完畢敲长,onCancelled被調(diào)用郎嫁,表示任務(wù)取消,onPostExecute不會被調(diào)用
@MainThread
protected void onCancelled(Result result) {
onCancelled();
}
@MainThread
protected void onCancelled() {
}
public final boolean isCancelled() {
return mCancelled.get();
}
//取消正在執(zhí)行的任務(wù)
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}
//sDefaultExecutor默認串行執(zhí)行器 如果我們要改成并發(fā)的執(zhí)行方式直接使用executeOnExecutor這個方法
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
//可以指定執(zhí)行器
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
//更新任務(wù)進度 onProgressUpdate會被調(diào)用
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
//任務(wù)執(zhí)行完畢 如果沒有被取消執(zhí)行onPostExecute()方法
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
//AsyncTask內(nèi)部Handler
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
關(guān)鍵源碼:
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
/** @hide */
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*/
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
AsyncTask源碼我們發(fā)現(xiàn)祈噪,它其實是內(nèi)部封裝了Thead泽铛、FutureTask和Handler。
問題一:線程池容量不夠拋出異常
public class AsyncTaskTest {
public static void main(String[] args) {
int CPU_COUNT = Runtime.getRuntime().availableProcessors(); //可用的CPU個數(shù)
int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));//核心線程數(shù)
int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;//9 最大線程數(shù)量
int KEEP_ALIVE_SECONDS = 1;//閑置回收時間
final BlockingDeque<Runnable> sPoolWorkQueue = new LinkedBlockingDeque<Runnable>(128);//異步任務(wù)隊列
// sThreadFactory:線程工廠
final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
String name = "Thread #" + mCount.getAndIncrement();
System.out.println(name);
return new Thread(r, name);
}
};
//線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
//執(zhí)行異步任務(wù)
for(int i =0;i < 200;i++){
//相當(dāng)于new AsyncTask().execute();
threadPoolExecutor.execute(new MyTask());
}
}
static class MyTask implements Runnable{
@Override
public void run() {
while(true){
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
結(jié)果輸出:
Thread #1
Thread #2
Thread #3
Thread #2
Thread #1
Thread #3
Thread #4
Thread #5
Thread #6
Thread #4
Thread #5
Thread #7
Thread #6
Thread #8
Thread #7
Thread #9
Thread #8
Thread #9
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.haocai.app.multithread.test.AsyncTaskTest$MyTask@6d6f6e28 rejected from java.util.concurrent.ThreadPoolExecutor@135fbaa4[Running, pool size = 9, active threads = 9, queued tasks = 128, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.haocai.app.multithread.test.AsyncTaskTest.main(AsyncTaskTest.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Thread #4
Thread #1
......省略......
我們發(fā)現(xiàn)會出現(xiàn)異常java.util.concurrent.RejectedExecutionException
如果當(dāng)前線程池中的數(shù)量小于corePoolSize辑鲤,創(chuàng)建并添加的任務(wù)盔腔。
如果當(dāng)前線程池中的數(shù)量等于corePoolSize,緩沖隊列 workQueue未滿,那么任務(wù)被放入緩沖隊列弛随、等待任務(wù)調(diào)度執(zhí)行瓢喉。
如果當(dāng)前線程池中的數(shù)量大于corePoolSize,緩沖隊列workQueue已滿舀透,并且線程池中的數(shù)量小于maximumPoolSize栓票,新提交任務(wù)會創(chuàng)建新線程執(zhí)行任務(wù)。
如果當(dāng)前線程池中的數(shù)量大于corePoolSize盐杂,緩沖隊列workQueue已滿,并且線程池中的數(shù)量等于maximumPoolSize哆窿,新提交任務(wù)由Handler處理链烈。
當(dāng)線程池中的線程大于corePoolSize時,多余線程空閑時間超過keepAliveTime時挚躯,會關(guān)閉這部分線程强衡。
解決:線程池擴容
//自定義線程池
Executor executor = Executors.newScheduledThreadPool(25);//指定核心線程池數(shù)量
問題二:線程阻塞
AsyncTask里面維護著兩個線程池,THREAD_POOL_EXECUTOR和SERIAL_EXECUTOR码荔,其中SERIAL_EXECUTOR是默認的線程池
再來看api22 SerialExecutor 的源碼
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
通過上面的源碼可以發(fā)現(xiàn)漩勤,每次執(zhí)行完一個任務(wù)后,才會調(diào)用scheduleNext往線程池里面添加任務(wù)缩搅,所以即使線程池是并行的越败,但是我添加任務(wù)的時候是串行的,所以api22中的AsyncTask是串行的硼瓣,那么線程池其實再多的線程也沒用了究飞,反正每次都只有一個任務(wù)在里面疫赎。
而且由于SERIAL_EXECUTOR被聲明為static军洼,所以,同一個進程里的AsyncTask都會共享這個線程池淤年,這就意味著瘟栖,在同一個進程里葵擎,前面的線程不結(jié)束,后面的線程就會被掛起半哟。
解決:
所以酬滤,使用AsyncTask執(zhí)行任務(wù)的時候,請使用AsyncTask.executeOnExecutor(THREAD_POOL_EXECUTOR)來讓你的任務(wù)跑在并行的線程池上寓涨,避免出現(xiàn)并前面線程阻塞的情況敏晤。當(dāng)然,如果你的CPU核心數(shù)夠多缅茉,2到4個線程的并行度不滿足的話嘴脾,也可以自定義一個線程池來執(zhí)行AsyncTask,不過這樣的話,要注意自己維護這個線程池的初始化译打,釋放等等操作了耗拓。
new AsyncTask<Void, Void, Void>(){
@Override
protected Void doInBackground(Void... params) {
return null;
}
}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
注意:如果你確定自己做好了同步處理,或者你沒有在不同的AsyncTask里面訪問共享資源奏司,需要AsyncTask能夠并行處理任務(wù)的話乔询,你可以用帶有兩個參數(shù)的executeOnExecutor執(zhí)行任務(wù)
Android AsyncTask版本問題
1.5剛開始引入AsyncTask的時候,execute方法確實是串行執(zhí)行的韵洋,類定義里面只有SERIAL_EXECUTOR線程池竿刁;
1.6版本時,改用并行線程池THREAD_POOL_EXECUTOR搪缨,
3.0版本至今食拜,就成了上面說的模樣————定義兩個線程池,但是默認用串行池副编。
問題三:內(nèi)存泄露
public class MainActivity extends AppCompatActivity {
private MyTask task;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//使用默認線程池
task = new MyTask();
task.execute();
}
class MyTask extends AsyncTask<Void, Integer, Void> {
int i;
@Override
protected Void doInBackground(Void... params) {
Log.d("main", String.valueOf(i++));
SystemClock.sleep(1000);
return null;
}
}
}
當(dāng)Activity finish() 之后负甸,觀察到MyTask 還在執(zhí)行,這樣會造成內(nèi)存泄漏
解決:
public class MainActivity extends AppCompatActivity {
private MyTask task;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//使用默認線程池
task= new MyTask();
task.execute();
}
@Override
protected void onDestroy() {
super.onDestroy();
task.cancel(true);
}
class MyTask extends AsyncTask<Void, Integer, Void> {
int i;
@Override
protected Void doInBackground(Void... params) {
while(!isCancelled()){
Log.d("main", String.valueOf(i++));
SystemClock.sleep(1000);
}
return null;
}
}
}