前言
Java
多線程也是面試中經(jīng)常會提起到的一個點卧秘。面試官會問:實現(xiàn)多線程的兩種方式以及區(qū)別,死鎖發(fā)生的4
個條件以及如何避免發(fā)生死鎖官扣,死鎖和活鎖的區(qū)別翅敌,常見的線程池以及區(qū)別,怎么理解有界隊列與無界隊列惕蹄,多線程生產(chǎn)者消費者模型蚯涮,怎么設(shè)計一個線程池治专,線程池的大致實現(xiàn),ReetrantLock
和Synchronized
和ReadWriteLock
的源碼和區(qū)別遭顶、具體業(yè)務場景分析等等张峰。
生產(chǎn)者消費者模型
其實生產(chǎn)者消費者模型挺像觀察者模式的,對于該模型我們應該明確以下4點:
- 當生產(chǎn)者生產(chǎn)出產(chǎn)品時棒旗,應該通知消費者去消費喘批。
- 當消費者消費完產(chǎn)品,應該通知生產(chǎn)者去生產(chǎn)铣揉。
- 當產(chǎn)品的庫存滿了的時候饶深,生產(chǎn)者不應該再去生產(chǎn),而是通知消費者去消費逛拱。
- 當產(chǎn)品的庫存為0的時候敌厘,消費者不應該去消費,而是通知生產(chǎn)者去生產(chǎn)橘券。
wait()和notify()實現(xiàn)
- 定義生產(chǎn)者额湘。
public class Producter implements Runnable {
private Storage resource;
public Producter(Storage resource) {
super();
this.resource = resource;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.produce();
}
}
}
- 定義消費者。
public class Consumer implements Runnable {
private Storage resource;
public Consumer(Storage resource) {
super();
this.resource = resource;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.cosume();
}
}
}
- 定義
Storage
倉庫旁舰。
public class Storage {
private int MAX_SIZE = 20;
private int count = 0;
public synchronized void cosume() {
while (count == 0) {
try {
System.out.println("【消費者】庫存已經(jīng)為空了锋华,暫時不能進行消費任務!");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count--;
System.out.println("【消費者】" + Thread.currentThread().getName() + "消費產(chǎn)品, 庫存:" + count);
this.notify();
}
public synchronized void produce() {
while (count >= MAX_SIZE) {
try {
System.out.println("【生產(chǎn)者】庫存已經(jīng)滿了,暫時不能進行生產(chǎn)任務箭窜!");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count++;
System.out.println("【生產(chǎn)者】" + Thread.currentThread().getName() + "生產(chǎn)產(chǎn)品, 庫存" + count);
this.notify();
}
}
- 測試demo
public class ProducterCosumerXDemo {
public static void main(String[] args) {
Storage storage = new Storage();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.submit(new Producter(storage));
}
for (int i = 0; i < 5; i++) {
service.submit(new Consumer(storage));
}
service.shutdown();
}
}
我們這里創(chuàng)建了5
個生產(chǎn)者毯焕,5
個消費者。生產(chǎn)者生產(chǎn)的速度比消費者消費的速度要快磺樱,從圖中很明顯看到生產(chǎn)者率先生產(chǎn)出20
個產(chǎn)品纳猫,已是庫存極大值,往后不能再去生產(chǎn)了竹捉,然后通知消費者去消費芜辕。
用BlockingQueue實現(xiàn)
BlockingQueue
是一個阻塞隊列击狮,也是面試官常喜歡問的一個點劣摇。BlockingQueue
是線程安全的,內(nèi)部可以自動調(diào)用wait()
喻鳄,notify()
方法憨闰。在多線程環(huán)境下状蜗,我們可以使用BlockingQueue
去完成數(shù)據(jù)的共享,同時可以兼顧到效率和線程安全鹉动。
如果生產(chǎn)者生產(chǎn)商品的速度遠大于消費者消費的速度轧坎,并且生產(chǎn)的商品累積到一定的數(shù)量,已經(jīng)超過了BlockingQueue
的最大容量泽示,那么生產(chǎn)者就會被阻塞缸血。那為什么時候撤銷生產(chǎn)者的阻塞呢蜜氨?只有消費者開始消費累積的商品,且累積的商品數(shù)量小于BlockingQueue
的最大容量属百,才能撤銷生產(chǎn)者的阻塞记劝。
如果庫存為0
的話,消費者自動被阻塞族扰。只有生產(chǎn)者生產(chǎn)出商品,才會撤銷消費者的阻塞定欧。
- 定義
Storage
倉庫
public class Storage {
private BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);
public void push(Product p) throws InterruptedException {
queues.put(p);
}
public Product pop() throws InterruptedException {
return queues.take();
}
}
- 定義
Product
商品
public class Product {
private int id;
public static int MAX = 20;
public Product(int id) {
super();
this.id = id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
- 定義生產(chǎn)者
public class Producer implements Runnable {
private String name;
private Storage storage = null;
public Producer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
int i = 0;
try {
while (true) {
System.out.println(name + "已經(jīng)生產(chǎn)一個: id為" + i + "的商品");
System.out.println("===========================");
Product product = new Product(i++);
storage.push(product);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 定義消費者
public class Consumer implements Runnable {
private String name;
private Storage storage = null;
public Consumer(String name, Storage s) {
this.name = name;
this.storage = s;
}
@Override
public void run() {
try {
while (true) {
Product product = storage.pop();
System.out.println(name + "已經(jīng)消費一個: id為" + product.getId() + "的商品");
System.out.println("===========================");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 測試demo
public class ProducterConsumerDemo {
public static void main(String[] args) {
Storage storage = new Storage();
ExecutorService service = Executors.newCachedThreadPool();
Producer p0 = new Producer("騰訊", storage);
Consumer c0 = new Consumer("cmazxiaoma", storage);
service.submit(p0);
service.submit(c0);
service.shutdown();
}
}
我們可以清晰看到在"騰訊已經(jīng)生產(chǎn)一個:id為13的商品"
之前渔呵,生產(chǎn)者是隨便的生產(chǎn),消費者是隨便的消費砍鸠,生產(chǎn)者的速度遠遠大于消費者的速度扩氢。然而在"騰訊已經(jīng)生產(chǎn)一個:id為13的商品"
之后,累積的商品數(shù)量已經(jīng)要到達BlockingQueue
的最大容量10
了爷辱。此時生產(chǎn)者已經(jīng)被阻塞了录豺,已經(jīng)不能再生產(chǎn)了,只有當消費者開始產(chǎn)品時饭弓,才能喚醒生產(chǎn)者双饥。所以之后的打印內(nèi)容就很有規(guī)律了:騰訊一生產(chǎn)商品,cmazxiaoma
就開始消費弟断。
從這個例子中咏花,我們可以看到BlockingQueue
通過平衡生產(chǎn)者和消費者的處理能力,因此提高了整體處理數(shù)據(jù)的速度阀趴。
死鎖和活鎖
死鎖:兩個或者多個線程相互等待對方釋放鎖昏翰,則會出現(xiàn)循環(huán)等待的現(xiàn)象,也就是死鎖刘急。
Java
虛擬機沒有有效的措施去解決死鎖情況棚菊,所以在多線程編程中應該采用措施去避免死鎖的出現(xiàn)。活鎖:任務或者執(zhí)行者沒有被阻塞叔汁,由于某些條件沒有滿足统求,導致一直重復嘗試 -> 失敗 -> 嘗試 -> 失敗」ツ活鎖和死鎖的區(qū)別在于球订,處于活鎖的實體在不斷的改變狀態(tài)。而處于死鎖的實體表現(xiàn)為等待瑰钮,活鎖有可能自行解開冒滩,死鎖則不能。
死鎖產(chǎn)生的原因以及四個必要條件
-
產(chǎn)生死鎖的原因:
- 系統(tǒng)資源不足浪谴。
- 資源分配不當开睡。
- 進程運行推進的順序不合適因苹。
-
發(fā)生死鎖的必要條件:
互斥條件: 一個資源只能被一個進程占用,直到被該進程釋放篇恒。
請求和保持條件:一個進程因請求被占用資源而發(fā)生阻塞時扶檐,對已獲得的資源保持不放。
不可剝奪條件:任何一個資源在沒被該進程釋放之前胁艰,任何其他進程都無法對它進行剝奪占用款筑。
循環(huán)等待條件:當發(fā)生死鎖時,所等待的進程必定會發(fā)生環(huán)路腾么,造成永久阻塞奈梳。
-
防止死鎖的一些方法:
破除互斥等待:一般無法做到。
破除請求和保持:一次性獲取所有的資源解虱。
破除循環(huán)等待:按順序獲取資源攘须。
破除無法剝奪的等待:加入超時機制。
手寫死鎖的demo
public class DeadLockDemo {
public static String obj1 = "obj1";
public static String obj2 = "obj2";
public static void main(String[] args) {
Thread a = new Thread(new LockThread1());
Thread b = new Thread(new LockThread2());
a.start();
b.start();
}
}
class LockThread1 implements Runnable {
@Override
public void run() {
System.out.println("lockThread1 running");
while (true) {
synchronized (DeadLockDemo.obj1) {
System.out.println("lockThrea1 lock obj1");
try {
Thread.sleep(3000);
synchronized (DeadLockDemo.obj2) {
System.out.println("lockThrea1 lock obj2");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
class LockThread2 implements Runnable {
@Override
public void run() {
System.out.println("lockThread2 running");
while (true) {
synchronized (DeadLockDemo.obj2) {
System.out.println("lockThrea2 lock obj2");
try {
Thread.sleep(3000);
synchronized (DeadLockDemo.obj1) {
System.out.println("lockThrea2 lock obj1");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
很明顯殴泰,發(fā)生了死鎖現(xiàn)象于宙,美滋滋。
線程池
-
首先來說線程的好處悍汛。
重用存在的線程捞魁,減少對象創(chuàng)建,消亡的開銷员凝,性能佳署驻。
可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率健霹,同時避免過多資源競爭旺上,避免阻塞。
提供定時定期執(zhí)行糖埋,單線程宣吱,并發(fā)數(shù)控制等功能。
我們來看看
ThreadPoolExecutor
的構(gòu)造器瞳别。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
通過閱讀注釋征候,我們可以知道參數(shù)的含義:
corePoolSize
:核心池的大小,默認情況下祟敛,在創(chuàng)建線程池后疤坝,線程池中的線程數(shù)為0
,當有任務來之后馆铁,就會創(chuàng)建一個線程來執(zhí)行任務跑揉,在線程池中的線程數(shù)目達到corePoolSize
后,就會把到達的任務放到緩存隊列中。maximumPoolSize
:線程池最大的線程數(shù)历谍。keepAliveTime
:表示線程沒有任務執(zhí)行時现拒,最多保持多久時間會終止。默認情況下望侈,只有當線程池中的線程數(shù)大于corePoolSize
時印蔬,keepAliveTime
才會起作用,直到線程池中的線程數(shù)不大于corePoolSize
脱衙。unit
:參數(shù)keepAliveTime
的時間單位侥猬。workQueue
:一個阻塞隊列,用來存儲等待執(zhí)行的任務岂丘。有ArrayBlockingQueue
陵究,LinkedBlockingQueue
,SynchronousQueue
奥帘。threadFactory
:線程工廠,主要用來創(chuàng)建線程仪召。-
handler
:表示當拒絕處理任務時的策略寨蹋。我們可以在ThreadPoolExecutor
類中,看到4種拒絕策略扔茅。-
AbortPolicy
已旧,拒絕處理任務時會拋出一個RejectedExecutionException
異常。
-
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
-
DiscardPolicy
召娜,拒絕處理任務時默認丟棄該任務运褪。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
-
DiscardOldestPolicy
,去處理拒絕的任務玖瘸,丟棄最舊的未處理的任務秸讹,然后重新執(zhí)行該任務。除非執(zhí)行者被關(guān)閉雅倒,在這種情況下璃诀,任務會被丟棄。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
-
CallerRunsPolicy
蔑匣,拒絕的任務由調(diào)用線程去處理劣欢,除非執(zhí)行者被關(guān)閉,那么拒絕的任務直接丟棄裁良。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- Java通過
Executors
提供了4
種線程池凿将,分別為:-
newCachedThreadPool
:創(chuàng)建一個可緩存線程池,里面有我們需要的線程价脾。但是當線程池里面的線程可用的時候牧抵,我們可以重用它們。當我們需要執(zhí)行生命周期很短的任務時彼棍,我們將重復使用之前構(gòu)造的線程處理任務灭忠,這樣線程池通常能提高性能膳算。如果沒有現(xiàn)成的線程可使用,會創(chuàng)建一個新的線程并添加到線程池中弛作。如果有線程在60s
中未使用涕蜂,我們會終結(jié)它并把它從緩存中刪除。因此一個閑置時間足夠長的線程池映琳,將不會消耗任何資源机隙。如果我們需要自定義超時參數(shù),我們可以通過ThreadPoolExecutor
進行構(gòu)建線程池萨西。
-
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
newFixedThreadPool
:定長線程池有鹿,可控制并發(fā)數(shù),超出指定數(shù)量的線程將在隊列中等待谎脯。創(chuàng)建一個線程池葱跋,重用固定的數(shù)量的線程池,使用的是共享的無界隊列源梭。在任何時候娱俺,至多nThreads
線程被激活的處理事務。當所有的線程處于繁忙的狀態(tài)下废麻,有其他的任務被提交過來荠卷,它們會在隊列中等待,直到有可用的線程為止烛愧。如果任何線程在執(zhí)行期間由于失敗而終止油宜,在shutdown
之前,如果有需要的話怜姿,將會有新的線程去取代它并執(zhí)行后續(xù)任務慎冤。線程池中的存在會一直存在除非明確的進行shutdown
。
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
newScheduledThreadPool
:創(chuàng)建一個線程池社牲,可以調(diào)度命令在給定的延遲后運行粪薛,或定時執(zhí)行。其corePoolSize
參數(shù)是保留在線程池中的線程數(shù)量搏恤,即使它們閑置违寿。
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
-
newSingleThreadExecutor
:創(chuàng)建一個使用單線程執(zhí)行無界隊列的Executor
。(如果這個單線程在關(guān)閉之前的執(zhí)行期由于失敗而終止熟空,如果需要執(zhí)行后續(xù)任務的話藤巢,那么新的線程會取代它)可以保證任務保持按順序進行,并且在任何給定時間不會超過一個任務處于活躍狀態(tài)息罗。它返回的executor
不同于newFixedThreadPool
返回的executor
掂咒。newFixedThreadPool
返回的executor
保證不需要重新配置,即可使用其他的線程。
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
怎么理解無界隊列和有界隊列
ArrayBlockingQueue
就是一個有界隊列绍刮,而LinkedBlockingQueue
温圆、SynchronousQueue
則是無界隊列。
SynchronousQueue
:它是無界的孩革,是一種無緩沖的等待隊列岁歉,但是由于該Queue
本身的特性,在某次添加元素后必須等待其他線程取走才能繼續(xù)添加膝蜈。有2
種模式锅移,一個是公平模式,采用的是公平鎖饱搏,并配合一個FIFO
隊列(Queue
)來管理多余的生產(chǎn)者和消費者非剃。另一個是非公平模式,采用的是非公平鎖推沸,并配合一個LIFO
(Stack
)來管理多余的生產(chǎn)者和消費者备绽,這也是SynchronousQueue
默認的模式。后一者模式鬓催,如果生產(chǎn)者和消費者的處理速度有差距的話疯坤,很容易出現(xiàn)饑渴情況進而導致有些數(shù)據(jù)得不到處理。(公平鎖:加鎖前檢查是否有排隊的線程深浮,優(yōu)先排隊等待的線程,先到先得眠冈。 非公平鎖:加鎖時不考慮排隊等待問題飞苇,直接嘗試獲取鎖,獲取不到自動到隊尾等待)LinkedBlockingQueue
:它是一個無界的蜗顽,是一個無界緩存的等待隊列布卡。LinkedBlockingQueue
之所以能高效的處理并發(fā)數(shù)據(jù),正因為消費者和生產(chǎn)者分別采用了獨立的鎖來控制數(shù)據(jù)的同步雇盖,這也意味著在高并發(fā)的情況下忿等,生產(chǎn)者和消費者可以并行的操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能崔挖。
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
-
ArrayBlockingQueue
:它是有界的贸街,是一個有界緩存的等待隊列。內(nèi)部維護著一個定長數(shù)據(jù)緩存隊列狸相,由數(shù)組構(gòu)成薛匪。takeIndex
標識著隊列的頭部,putIndex
標識著隊列的尾部脓鹃。ArrayBlockingQueue
只有一個主鎖逸尖,證明生產(chǎn)者和消費者無法并行運行。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;
無界隊列和有界隊列其實會給ThreadPoolExecutor
造成一定的影響。
當
corePoolSize = maximumPoolSize
下娇跟,有界隊列也滿了的話岩齿,那么線程池就會采取拒絕任務策略。與有界隊列相比苞俘,除非系統(tǒng)資源耗盡盹沈,否則無界的任務隊列不存在任務入隊失敗的情況。如果任務創(chuàng)建和處理的速度差異很大苗胀,無界隊列會快速增長襟诸,直到耗盡內(nèi)存。
ReetrantLock基协、Synchronized歌亲、ReadWriteLock
Lock
是jdk1.5
引進的,ReetrantLock
是jdk1.6
引進的澜驮。上一次二面機器人公司陷揪,就是掛在這里了。面試官讓我說說它們的源碼實現(xiàn)以及需求場景杂穷。
使用場景:
當讀寫頻率幾乎相等悍缠,而且不需要特殊需求的時候,優(yōu)先考慮synchronized
當我們需要定制我們自己的
Lock
時耐量,或者需要更多的功能(類似定時鎖飞蚓,可中斷鎖等
待),我們可以使用ReetrantLock
廊蜒。當我們很少的進行寫操作趴拧,更多的讀操作,并且讀操作是一個相對耗時的操作山叮,那么就可以使用
ReadWriteLock
著榴。-
Synchronized
是一種互斥鎖。在實際開發(fā)中屁倔,當某個變量需要在多個線程之間共享的話脑又,需要分析具體場景。如果多個線程對該共享變量的讀和寫沒有競爭關(guān)系锐借,則可以使用Concurrent
包下提供的并發(fā)數(shù)據(jù)結(jié)構(gòu)问麸。但是如果多個線程對共享變量之間的讀和寫之間有競爭關(guān)系的話,則需要鎖住整個變量了瞎饲。Synchronized
是Java
內(nèi)置鎖口叙,JVM
是通過monitorenter
和monitorexit
指令實現(xiàn)內(nèi)置鎖。每個對象都有一個
monitor
鎖嗅战,當monitor
被占用時妄田,該對象就處于鎖定狀態(tài)俺亮,其他試圖訪問該對象的線程將會被阻塞。對于同一個線程來說疟呐,
monitor
是可重入的脚曾,重入的時候會將占有數(shù)加1。當一個線程試圖訪問某一個變量時启具,如果發(fā)現(xiàn)該變量的
monitor
占有數(shù)為0
本讥,就可以美滋滋的占用該對象,如果大于等于1
的話鲁冯,那么苦滋滋的進入阻塞狀態(tài)拷沸。執(zhí)行
monitorexit
的線程必須是持有monitor
的某一個對象。當執(zhí)行完這個命令時薯演,如果占用數(shù)為0
撞芍,則當前線程釋放monitor
。
-
ReetrantLock
是基于AQS
(AbstractQueuedSynchronizer
)的鎖跨扮。有一個
state
變量序无,初始值為0
,假設(shè)當前線程為A
衡创,每當A
獲取一次鎖帝嗡,state++
。每當A
釋放一次鎖的時候璃氢,state--
哟玷。當
A
擁有鎖的時候,state
肯定大于0
一也。B
線程嘗試獲取鎖的時候碗降,會對這個state
有一個CAS(0,1)
的操作塘秦,嘗試幾次失敗后就掛起線程,進入等待隊列动看。如果
A
線程恰好釋放鎖尊剔,state
等于0
,就會去喚醒等待隊列中的B
菱皆。B
被喚醒之后回去檢查這個state
的值须误,嘗試CAS(0,1)
仇轻,如果這時恰好C
線程也嘗試爭搶這把鎖京痢。公平鎖的實現(xiàn),
C
發(fā)現(xiàn)線程B
在等待隊列篷店,直接將自己進入等待隊列并掛起祭椰,然后B
獲得鎖臭家。非公平鎖的實現(xiàn),
C
直接嘗試CAS(0,1)
操作方淤,并成功改變了state
的值钉赁,B
獲得鎖失敗,再次掛起携茂。B
在C
之前嘗試獲取鎖你踩,而最終C
搶到了鎖。
參考文章
尾言
書山有路勤為徑,學海無涯苦作舟卿堂。