45专缠、線程池ThreadPoolExecutor
45.1、創(chuàng)建線程池
??Java通過(guò)Executors提供四個(gè)靜態(tài)方法創(chuàng)建四種線程池乃秀,分別為:
??newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池肛著,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程跺讯,若無(wú)可回收枢贿,則新建線程。
??newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池刀脏,可控制線程最大并發(fā)數(shù)局荚,超出的線程會(huì)在隊(duì)列中等待。
??newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池愈污,支持定時(shí)及周期性任務(wù)執(zhí)行耀态。
??newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù)暂雹,保證所有任務(wù)按照指定順序(FIFO首装, LIFO,優(yōu)先級(jí))執(zhí)行杭跪。
45.2仙逻、關(guān)鍵參數(shù)解析
??corepoolsize:核心池的大小,默認(rèn)情況下涧尿,在創(chuàng)建了線程池之后系奉,線程池中線程數(shù)為 0,當(dāng)有任務(wù)來(lái)之后姑廉,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)缺亮,當(dāng)線程池中線程數(shù)達(dá)到 corepoolsize 后,就把任務(wù)放在任務(wù)緩存隊(duì)列中桥言。
??Maximumpoolsize:線程池中最多創(chuàng)建多少個(gè)線程萌踱。
??Keeplivetime:線程沒(méi)有任務(wù)執(zhí)行時(shí),最多保存多久的時(shí)間會(huì)終止号阿,默認(rèn)情況下虫蝶,當(dāng)線程池中線程數(shù) > corepoolsize時(shí),Keeplivetime才起作用倦西,直到線程數(shù)不大于 corepoolsize能真。
??workQueue:阻塞隊(duì)列,用來(lái)存放等待被執(zhí)行的任務(wù)。
??threadFactory:線程工廠粉铐,用來(lái)創(chuàng)建線程疼约。
45.3、線程池的狀態(tài)
??1.當(dāng)線程池創(chuàng)建后蝙泼,初始為 running 狀態(tài)程剥;
??2.調(diào)用 shutdown 方法后,處 shutdown 狀態(tài)汤踏,此時(shí)不再接受新的任務(wù)织鲸,等待已有的任務(wù)執(zhí)行完畢;
??3.調(diào)用 shutdownnow 方法后溪胶,進(jìn)入 stop 狀態(tài)搂擦,不再接受新的任務(wù),并且會(huì)嘗試終止正在執(zhí)行的任務(wù)哗脖。
??4.當(dāng)處于 shotdown 或 stop 狀態(tài)瀑踢,并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已清空才避,線程池被設(shè)為 terminated 狀態(tài)橱夭。
45.4、當(dāng)有任務(wù)提交到線程池之后的一些操作:
??1.若當(dāng)前線程池中線程數(shù) < corepoolsize桑逝,則每來(lái)一個(gè)任務(wù)就創(chuàng)建一個(gè)線程去執(zhí)行棘劣。
??2.若當(dāng)前線程池中線程數(shù) >= corepoolsize,會(huì)嘗試將任務(wù)添加到任務(wù)緩存隊(duì)列中去楞遏,若添加成功茬暇,則任務(wù)會(huì)等待空閑線程將其取出執(zhí)行,若添加失敗橱健,則嘗試創(chuàng)建線程去執(zhí)行這個(gè)任務(wù)。
??3.若當(dāng)前線程池中線程數(shù) >= Maximumpoolsize沙廉,則采取拒絕策略拘荡。
??1)、abortpolicy:丟棄任務(wù)撬陵,拋出 RejectedExecutionException珊皿;
??2)、discardpolicy:拒絕執(zhí)行巨税,不拋異常蟋定;
??3)、discardoldestpolicy:丟棄任務(wù)緩存隊(duì)列中最老的任務(wù)草添,并且嘗試重新提交新的任務(wù)驶兜;
??4)、callerrunspolicy:由調(diào)用線程處理該任務(wù),有反饋機(jī)制抄淑,使任務(wù)提交的速度變慢屠凶。
46、生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)
1肆资、阻塞隊(duì)列BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
/**
* @Description: 阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
*/
public class ProducerConsumerPattern {
public static void main(String[] args) {
BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String>(2);
// ArrayBlockingQueue:需要設(shè)置隊(duì)列大小矗愧,LinkedBlockingQueue不設(shè)置的話默認(rèn)大小為Integer.MAX_VALUE
// BlockingQueue<String> sharedQueue2 = new LinkedBlockingQueue<String>(2);
Producer producer = new Producer(sharedQueue);
Consumer consumer = new Consumer(sharedQueue);
for(int i = 0; i < 5; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
new Thread(consumer, "Consumer" + (i + 1)).start();
}
}
}
// 生產(chǎn)者
class Producer implements Runnable {
private BlockingQueue<String> sharedQueue;
public Producer(BlockingQueue<String> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
try {
String prod = "產(chǎn)品:" + Thread.currentThread().getName();
// 如果隊(duì)列是滿的話,會(huì)阻塞當(dāng)前線程
sharedQueue.put(prod);
System.out.println("我是生產(chǎn)線程郑原,生產(chǎn)了一個(gè)產(chǎn)品:" + prod);
} catch (Exception e) {
System.out.println(e);
}
}
}
// 消費(fèi)者
class Consumer implements Runnable {
private BlockingQueue<String> sharedQueue;
public Consumer(BlockingQueue<String> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
try {
// 如果隊(duì)列為空唉韭,會(huì)阻塞當(dāng)前線程
String prod = sharedQueue.take();
System.out.println("我是消費(fèi)者,消費(fèi)產(chǎn)品: " + prod);
} catch (Exception e) {
System.out.println(e);
}
}
}
2犯犁、使用Condition實(shí)現(xiàn)
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description: Condition實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
*/
public class ProducerConsumerPattern2 {
private PriorityQueue<String> queue = new PriorityQueue<String>(3);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ProducerConsumerPattern2 test = new ProducerConsumerPattern2();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
for(int i = 0; i < 5; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
new Thread(consumer, "Consumer" + (i + 1)).start();
}
}
class Producer implements Runnable {
@Override
public void run() {
lock.lock();
try {
while(queue.size() == 3) {
try {
notFull.await();
} catch (Exception e) {
e.printStackTrace();
}
}
// 每次插入一個(gè)元素
String prod = "" + Thread.currentThread().getName();
queue.offer(prod);
System.out.println("我是生產(chǎn)線程属愤,生產(chǎn)了一個(gè)產(chǎn)品:" + prod);
// 通知隊(duì)列不空
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
lock.lock();
try {
while(queue.size() == 0) {
try {
notEmpty.await();
} catch (Exception e) {
e.printStackTrace();
}
}
String prod = queue.poll();
System.out.println("我是消費(fèi)者,消費(fèi)產(chǎn)品: " + prod);
// 通知隊(duì)列未滿
notFull.signal();
} finally {
lock.unlock();
}
}
}
}