1.前言
作為一名Android開發(fā)镣陕,工作中少不了使用線程津坑。最近因?yàn)樵趦?yōu)化廣告請求結(jié)構(gòu)绽媒,重新查看一遍線程相關(guān)的知識,在此做一個總結(jié)柱宦。
2.線程使用
在Java中開啟子線程些椒,執(zhí)行異步任務(wù)最簡單的做法是:
new Thread(new Runnable() {
@Override
public void run() {
// TODO 相關(guān)業(yè)務(wù)處理
}
}).start();
每當(dāng)需要一個線程的時候就new
一個線程出來操作相關(guān)業(yè)務(wù),但這樣做的話會存在一些問題:
- 每次new Thread新建對象性能差掸刊。
- 線程缺乏統(tǒng)一管理,可能無限制新建線程赢乓,相互之間競爭忧侧,及可能占用過多系統(tǒng)資源導(dǎo)致死機(jī)或oom。
- 缺乏更多功能牌芋,如定時執(zhí)行蚓炬、定期執(zhí)行、線程中斷躺屁。
作為一個有想法的開發(fā)肯夏,是應(yīng)該拒絕該方式使用線程的,此時線程池的概念就應(yīng)運(yùn)而來犀暑。
3. 線程池 Executors
線程池的定義:一種線程使用模式驯击。線程過多會帶來調(diào)度開銷,進(jìn)而影響緩存局部性和整體性能耐亏。而線程池維護(hù)著多個線程徊都,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。
在Java中系統(tǒng)會提供一下自帶的幾種線程池:
- newFixedThreadPool:一個可以無限擴(kuò)大的線程池
- newCachedThreadPool:固定大小的線程池广辰;corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads暇矫;keepAliveTime為0,
- newScheduledThreadPool:一個主要接受定時任務(wù)的線程池择吊,有兩種提交任務(wù)的方式:scheduledAtFixedRate和scheduledWithFixedDelaySchduled
- newSingleThreadExecutor:一個只會創(chuàng)建一條工作線程處理任務(wù)線程池
查看Executors
源碼可以看到有如下幾種方法:
分別對應(yīng)上述幾種線程池李根。
關(guān)于線程池的設(shè)計思想其根本是
生產(chǎn)消費(fèi)者
,線程池會維護(hù)兩個隊列
- 各個線程的集合
- 各個任務(wù)的集合
前者負(fù)責(zé)生產(chǎn)維護(hù)線程几睛,后者則消費(fèi)線程房轿。
通過源碼跟蹤,上述幾種創(chuàng)建線程最終都會調(diào)用ThreadPoolExecutor
的構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
- int corePoolSize:核心線程池數(shù)量枉长,線程池創(chuàng)建后保持的數(shù)量
- int maximumPoolSize:線程池最大線程數(shù)量冀续,大于這個數(shù)則只需Rejected策略
- long keepAliveTime:空閑線程存活的最大時間,超過這個時間則回收
- TimeUnit unit: 空閑線程存活的最大時間單位設(shè)置
- BlockingQueue<Runnable> workQueue:core線程數(shù)滿后存放任務(wù)需要的阻塞隊列
- ThreadFactory threadFactory: 線程創(chuàng)建工廠必峰,可以指定自定義的線程池名稱等
- RejectedExecutionHandler handler:自定義拒絕策略洪唐,如果任務(wù)數(shù)量大于maximumPoolSize則執(zhí)行此策略。
corePoolSize 和maximumPoolSize 以及workQueue 的長度會有以下幾種關(guān)系: - 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize吼蚁,那么馬上創(chuàng)建線程運(yùn)行這個任務(wù)凭需;
- 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize问欠,那么將這個任務(wù)放入隊列。
- 如果這時候隊列滿了粒蜈,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize顺献,那么還是要創(chuàng)建線程運(yùn)行這個任務(wù);
- 如果隊列滿了枯怖,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize注整,那么線程池執(zhí)行拒絕策略
3.1 newFixedThreadPool
- 無界任務(wù)隊列,有界的線程隊列
查看源碼可以看到該線程的策略
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
只需要設(shè)置參數(shù)nThreads
度硝,參數(shù)workQueue
是一個空間無屆的隊列肿轨,基本可以認(rèn)為keepAliveTime
,maximumPoolSize
之類的參數(shù)無效蕊程,因?yàn)榇藭r核心線程是滿負(fù)荷運(yùn)行椒袍,沒有線程會處于空閑狀態(tài)。使用方式如下:
public class NewFixedThreadPoolExample {
public static void main(String[] args) {
int coreSize = Runtime.getRuntime().availableProcessors();
System.out.println("coreSize is :"+coreSize);
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
for (int i =0;i<10;i++){
int index =i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
3.2 newCachedThreadPool
查看源碼可以看到該線程的策略
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
corePoolSize為0藻茂,maximumPoolSize為無限大驹暑,意味著線程數(shù)量可以無限大,采用SynchronousQueue裝等待的任務(wù)辨赐,這個阻塞隊列沒有存儲空間优俘,這意味著只要有請求到來, 就必須要找到一條工作線程處理他肖油,如果當(dāng)前沒有空閑的線程兼吓,那么就會再創(chuàng)建一條新的線程。
- 無界任務(wù)隊列森枪,無界線程隊列视搏。
使用方式如下:
public class NewCachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i =0;i<10;i++){
int index = i;
try {
Thread.sleep(1000*index);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
}
});
}
}
}
3.3 newScheduledThreadPool
其源碼策略如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- 無界任務(wù)隊列
該類型的線程池存在兩種調(diào)用:
scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);
scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit);
調(diào)用如下:
public class NewScheduledThreadPoolExample {
public static void main(String[] args) {
int coreSize = Runtime.getRuntime().availableProcessors();
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
for (int i= 0 ;i<10;i++){
int index = i;
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
}
},1,3, TimeUnit.SECONDS);
// executorService.scheduleWithFixedDelay()
}
}
}
3.4 newSingleThreadExecutor
- 無界任務(wù)隊列,有界線程隊列
- corePoolSize :1 線程隊列只有一個县袱,和單例模式相差無幾浑娜。它只會創(chuàng)建一條工作線程處理任務(wù);采用的阻塞隊列為LinkedBlockingQueue式散。
使用方式如下:
public class NewSingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("currentTime is :" + System.currentTimeMillis() + "---index is :" + index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
}
}
以上是系統(tǒng)自帶的線程池創(chuàng)建方式筋遭,其根本設(shè)計思想生產(chǎn)者--消費(fèi)者
,其中任務(wù)隊列和線程隊列的長度如果采用無界的話暴拄,就會存在OOM
的風(fēng)險漓滔。阿里巴巴Java開發(fā)手冊中強(qiáng)制不允許
使用Executors創(chuàng)建線程池。
鑒于上述原因乖篷,我們在某些場景下需要自定義線程池响驴。
4.自定義線程池
- ThreadPoolExecutor 自定義線程池
public class CustomThreadPool {
//以下相關(guān)參數(shù)可以考慮根據(jù)不同設(shè)備不同環(huán)境計算得到
private static final int CORE_POOL_SIZE = 1;
private static final int MAX_POOL_SIZE = 1;
private static final int KEEP_ALIVE_TIME = 30;
private static final int CAPACITY = 2;
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(CAPACITY), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
//線程創(chuàng)建的次數(shù)同核心線程數(shù),最大線程數(shù)撕蔼,任務(wù)數(shù)有關(guān)系
Thread thread = new Thread(r);
System.out.println("newThread==="+thread.getName());
return thread;
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//線程拒絕之后的策略,可以延時將該任務(wù)再次加入到隊列中
System.out.println("rejectedExecution===="+((MyRunnable)r).Name);
}
});
MyRunnable t1 = new MyRunnable("T1");
MyRunnable t2 = new MyRunnable("T2");
MyRunnable t3 = new MyRunnable("T3");
MyRunnable t4 = new MyRunnable("T4");
MyRunnable t5 = new MyRunnable("T5");
MyRunnable t6 = new MyRunnable("T6");
MyRunnable t7 = new MyRunnable("T7");
// 將線程放入池中進(jìn)行執(zhí)行
threadPoolExecutor.execute(t1);
threadPoolExecutor.execute(t2);
threadPoolExecutor.execute(t3);
threadPoolExecutor.execute(t4);
threadPoolExecutor.execute(t5);
threadPoolExecutor.execute(t6);
threadPoolExecutor.execute(t7);
}
static class MyRunnable implements Runnable {
public MyRunnable(String name) {
Name = name;
}
public String Name ;
@Override
public void run() {
System.out.println(Name + "正在執(zhí)行豁鲤。秽誊。。");
}
}
}
拒絕了3個任務(wù)琳骡,因?yàn)?code>run方法耗時不同锅论,核心線程執(zhí)行效率不同所致,運(yùn)行結(jié)果回不一致楣号。
4.2 自定義ThreadPoolExecutor
1.自定義ThreadPoolExecutor 最易,可以自定義核心線程數(shù)等相關(guān)參數(shù)。
public class MyExecutorPools extends ThreadPoolExecutor {
public MyExecutorPools(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 線程執(zhí)行后進(jìn)行調(diào)用
*/
protected void afterExecute(Runnable r, Throwable t) {
MyRunnable myr = (MyRunnable) r;
System.out.println(myr.getMyname() + "..執(zhí)行完畢");
}
/**
* 重寫執(zhí)行方法,線程池執(zhí)行前進(jìn)行調(diào)用
*/
protected void beforeExecute(Thread t, Runnable r) {
MyRunnable myr = (MyRunnable) r;
System.out.println(myr.getMyname() + "..準(zhǔn)備執(zhí)行");
}
}
- ThreadFactory
public class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread td = new Thread(r);
td.setName("myfactory-" + td.getId());
td.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
//自定義異常捕獲機(jī)制..
System.out.println("thread execute error");
}
});
return td;
}
}
- Runnable
public class MyRunnable implements Runnable {
public MyRunnable(String name) {
this.myname = name;
}
private String myname;
public String getMyname() {
return myname;
}
public void setMyname(String myname) {
this.myname = myname;
}
@Override
public void run() {
System.out.println("myname is :" + myname);
try {
Thread.sleep(5 * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 拒絕策略 實(shí)現(xiàn)
/**
* 拒絕策略
*/
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
MyRunnable my = (MyRunnable)r;
System.out.println("拒絕執(zhí)行...:"+my.getMyname());
}
}
- 測試
public class MyExecuterPoolsTest {
public static void main(String[] args) {
MyExecutorPools ex = new MyExecutorPools(3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new MyThreadFactory(), new RejectedExecutionHandlerImpl());
for (int i = 0; i < 20; i++) {
ex.execute(new MyRunnable("n" + i));
}
}
}
以上是兩種自定義線程池的方式炫狱,根據(jù)不同場景選擇使用耘纱,但是究其根本還是維護(hù)兩個隊列
- 線程隊列
- 任務(wù)對列
5 拒絕策略
上述是自定義一個拒絕策略,當(dāng)該任務(wù)被拒絕之后毕荐,多久再次加入到任務(wù)隊列中。其中Java 自帶幾種策略:
- AbortPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時艳馒,它將拋出 RejectedExecutionException 異常憎亚。
- CallerRunsPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時,會在線程池當(dāng)前正在運(yùn)行的Thread線程池中處理被拒絕的任務(wù)弄慰。
- DiscardOldestPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時第美,線程池會放棄等待隊列中最舊的未處理任務(wù),然后將被拒絕的任務(wù)添加到等待隊列中陆爽。
- DiscardPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時什往,線程池將丟棄被拒絕的任務(wù)。
關(guān)于不同策略的使用可以自行搜索慌闭。
上面是Java相關(guān)的線程池的使用和分析别威,后面結(jié)合項目中廣告任務(wù)相關(guān)業(yè)務(wù),采用自定義線程池的方式進(jìn)行改造驴剔。