問題引入:多線程并發(fā)安全引起的思考
首先我們通過引入一段示例進入我們今天的主題楼入。先來看下面一段生產(chǎn)者消費者多線程并發(fā)的代碼示例
public class ProducerAndConsumerV1 {
static class DataBuffer<T> {
private Queue<T> queue = new LinkedList();
private static final Integer QUEUE_MAX_LENGTH = 10000;
private Integer length = 0;
private final Integer MAX_LENGTH = 10;
/**
* 消息生產(chǎn)
* @param message
*/
public void produceMessage(T message) {
if(length < QUEUE_MAX_LENGTH){
queue.add(message);
length++;
System.out.println("生產(chǎn)消息,隊列長度:"+length);
}
}
/**
* 消費消息
*/
public void consumerMessage() {
System.out.println("消費消息哥捕,隊列長度:"+length);
if (length > 0) {
queue.poll();
length -- ;
}
}
}
public static void main(String[] args) {
ProducerAndConsumerV1.DataBuffer dataBuffer = new ProducerAndConsumerV1.DataBuffer();
Runnable produceAction = () -> {
while (true) {
try {
dataBuffer.produceMessage("cs");
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
}
};
Runnable consumerAction = () -> {
while (true) {
try {
dataBuffer.consumerMessage();
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 同時并發(fā)執(zhí)行的線程數(shù)
final int THREAD_TOTAL = 200;
//線程池,用于多線程模擬測試
ExecutorService threadPool =
Executors.newFixedThreadPool(THREAD_TOTAL);
//假定共 200 條線程嘉熊,其中有 100 個消費者和 100 生產(chǎn)者
//100個生產(chǎn)者遥赚,每隔200ms生產(chǎn)消息
for (int i = 0; i < 100; I++)
{
threadPool.submit(produceAction);
}
//100個消費者,每隔100ms消費消息
for (int i = 0; i < 100; I++)
{
threadPool.submit(consumerAction);
}
}
}
在上面的代碼中阐肤,我們做了一個簡單的消息隊列凫佛,100個生產(chǎn)者每隔200ms像消息隊列中生產(chǎn)消息,100個消費者每隔100ms消費消息孕惜。如果對并發(fā)安全稍微有一點了解的同學應(yīng)該能看出來毫炉,上述的實例代碼是存在多線程安全問題的进陡。多個線程間同時去操作隊列和長度盗蟆,是會產(chǎn)生線程不安全的情況的仆邓。那么如何解決呢,大部分人可能會直接上sychroized java的內(nèi)置鎖,或者用java的顯示鎖Lock去保證線程安全。那么問題來了,如果讓你自己實現(xiàn)一把鎖解決這個問題,你該如何設(shè)計呢苞氮?或者換個問法赞厕,如果不讓你使用java的內(nèi)置鎖或者顯示鎖镀虐,你該如果解決這個線程安全的問題呢绽慈?
AQS原理
思考:我們先拋開解決這個問題的各種技術(shù)手段不談仪芒,單純想想如何解決這個問題铆隘。其實上述示例代碼中的核心問題是要保證臨界資源的訪問安全問題,說通俗點,就是同一時刻就只能有一個生產(chǎn)者或者一個消費者去操作隊列捣作。我們知道,在一個進程中,多個線程是共享同一個進程的內(nèi)存、cpu等資源的,也正是因為這樣狭魂,所以才會有多個線程并發(fā)訪問的問題炫掐。但是也正是因為這個特性,我們想到睬涧,如果想讓多個線程順序執(zhí)行募胃,我們是不是可以先定義一個中間變量 state 痹束,初始化值為0(軟件工程學中的一個重要思想,往往在解決一個比較復(fù)雜的問題時郑现,我們只要多引入一層或者一個中間變量脱盲,就可以解決問題)尚卫,讓多個線程同時去修改這個中間變量state = 1归榕,如果修改成功,就表示這個線程獲得了生產(chǎn)消息或者消費消息的權(quán)利吱涉。而修改失敗的線程則繼續(xù)去修改這個變量刹泄,直到修改成功的一刻,則獲取了生產(chǎn)消息和消費消息的權(quán)利邑飒。我們姑且把這個state 叫做鎖循签,state =1 時就表示加鎖成功,就可以操作消息隊列疙咸。上面我們說的這種方法县匠,要保證的前提時,修改state = 1 這個操作要是原子性的操作撒轮,也就是同一時刻只有一個線程可以修改成功乞旦,cpu其實支持這樣的指令:cmpxchg,而在java應(yīng)該層面也封裝好了這樣的操作题山,在UnSafe類中兰粉。這樣的操作有一個專業(yè)名詞,叫做CAS(Compare and Swap)
操作系統(tǒng)層面的 CAS 是一條 CPU 的原子指令(cmpxchg 指令)顶瞳,正是由于該指令具備了原子性玖姑,所以使用 CAS 操作數(shù)據(jù)時不會造成數(shù)據(jù)不一致問題愕秫,Unsafe 提供的 CAS 方法,直接通過native 方式(封裝 C++代碼)調(diào)用了底層的 CPU 指令 cmpxchg焰络。
利用上面說到的CAS戴甩,我們可以寫出以下代碼:
public class MyLock {
//使用volatile關(guān)鍵字,保證有序性和可見性
private volatile int state;
//不安全類
private static final Unsafe unsafe = getUnsafe();;
//value 的內(nèi)存偏移(相對與對象頭部的偏移闪彼,不是絕對偏移)
private static final long valueOffset;
static {
try {
//取得 value 屬性的內(nèi)存偏移
valueOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
} catch (Exception ex) {
throw new Error(ex);
}
}
/**
* 加鎖
* @return
*/
public boolean lock() {
int oldValue = state;
//通過 CAS 原子操作甜孤,如果操作失敗,則自旋畏腕,一直到操作成功
do {
oldValue = state;
} while (!unSafeCompareAndSet(oldValue, oldValue + 1));
return true;
}
/**
* 解鎖
* @return
*/
public boolean unlock() {
int oldValue = state;
//通過 CAS 原子操作缴川,如果操作失敗,則自旋描馅,一直到操作成功
do {
oldValue = state;
} while (!unSafeCompareAndSet(oldValue, oldValue - 1));
return true;
}
public final boolean unSafeCompareAndSet(int oldValue, int
newValue) {
//原子操作:使用 unsafe 的“比較并交換方法”把夸,進行 value 屬性的交換
return unsafe.compareAndSwapInt(
this, valueOffset,oldValue ,newValue );
}
public static Unsafe getUnsafe() {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
} catch (Exception e) {
throw new AssertionError(e);
}
}
public static void main(String[] args) {
MyLock myLock = new MyLock();
System.out.println(myLock.lock());
}
}
lock方法大概解釋一下:CAS 是一種無鎖算法,該算法關(guān)鍵依賴兩個值——期望值(就值)和新值铭污,底層 CPU 利用原子操作扎即,判斷內(nèi)存原值與期望值是否相等,如果相等則給內(nèi)存地址賦新值况凉,否則不做任何操作谚鄙。那么以上代碼的含義就是:
(1)獲得字段的期望值(oldValue)。
(2)計算出需要替換的新值(newValue)刁绒。
(3)通過 CAS 將新值(newValue)放在字段的內(nèi)存地址上闷营,如果 CAS 失敗則重復(fù)第 1 步到第 2 步,一直到 CAS 成功知市,這種重復(fù)俗稱 CAS 自旋傻盟。
以上cas的方法保證了原子性,volatile 關(guān)鍵字保證了有序性和可見性(基于篇幅關(guān)系嫂丙,本文不再講解volatile的原理娘赴,有興趣的小伙伴可以去了解一下)。
通過以上lock方法跟啤,可以多個線程去同時修改state字段的屬性值诽表,修改成功,則返回true隅肥,表示獲取到鎖竿奏,修改失敗,則一直在自旋嘗試去修改腥放。獲取到鎖的線程泛啸,生產(chǎn)或者消費消息結(jié)束后,調(diào)用unLock方法秃症,修改state狀態(tài)為0候址,釋放鎖吕粹。此時在一直不斷自旋的線程則獲取到鎖繼續(xù)執(zhí)行。
以上是我們基于cas自旋加volitale關(guān)鍵字實現(xiàn)的一把鎖岗仑,當然這把鎖還有很不足的地方昂芜,比如搶鎖過程中修改state值失敗的線程,一直自旋去修改赔蒲,這種在競爭激烈的情況下,是非常消耗cpu性能的良漱,我們可以考慮加一個隊列舞虱,這個隊列可以用來保存搶鎖失敗的線程,入隊列成功之后母市,暫停搶鎖失敗的線程矾兜,從而讓出cpu。等到持有鎖的線程釋放鎖時患久,喚醒隊列中頭結(jié)點中的線程椅寺,被喚醒的線程加鎖成功之后,重復(fù)以上流程蒋失,釋放鎖后喚醒隊列中的下一個節(jié)點返帕。那么基于以上想法,MyLock 中幾個重要的屬性就出來了
MyLock{
private volitale int state = 0;
private volitale Queue queue = new LinkedList();
}
說到這里篙挽,其實在java中荆萤,我們以上所說的邏輯就是 AQS的邏輯,只不過在AQS中铣卡,我們定義的隊列链韭,是用一個FIFO先進先出的雙向鏈表來實現(xiàn)的,除此之外煮落,它還支持很多的功能敞峭,讓我們對鎖的使用更為優(yōu)雅,比如還支持共享鎖蝉仇,可中斷旋讹,條件鎖等等。下面我們講的線程間的通信就是基于AQS的條件等待轿衔。因此骗村,如果我們想要使用鎖,可以直接用java中的ReentrantLock,它就是基于AQS實現(xiàn)的一個排它鎖呀枢。
線程間的通信
文章開頭給出的示例胚股,其實除了線程安全的問題,還有一個問題就是:生產(chǎn)消息時即使隊列已經(jīng)滿了裙秋,還是會一直無效的循環(huán)判斷隊列是否可以加入消息琅拌,同樣消費者也是這樣的問題缨伊,即使消息隊列已經(jīng)空了,還是會無效循環(huán)隊列是否有消息可以消費进宝。我們可以想到的處理辦法就是刻坊,是否可以在多線程間通信,比如生產(chǎn)者的線程在消息隊列已經(jīng)滿了的情況下党晋,直接休眠谭胚,等到消費者的線程消費了消息,隊列消息不滿的情況下再來喚醒生產(chǎn)者線程生產(chǎn)消息未玻≡侄或者從消費者角度來看,隊列為空的情況扳剿,線程暫停旁趟,等到隊列有消息時,生產(chǎn)者線程來喚醒消費者線程進行消費消息庇绽。這就涉及到了線程間的通信锡搜,目前java中線程通信有兩種機制,一個是通過Object.wait,Object.notify 來實現(xiàn)(要配合sychroized來使用)瞧掺,另一個就是通過Lock的條件等待耕餐。下面我們就這兩種方式來闡述其使用方式和原理。
Object.wait,notify方式
底層實現(xiàn)原理的數(shù)據(jù)結(jié)構(gòu)
{
entryList
Ower
WaitSet
}
entryList:有資格成為競爭候選的線程
Ower: 擁有monitor的線程
waitSet:處于等待狀態(tài)的線程
下面以此代碼示例講解
Object full = new Object()
A線程:
synchronized (full) {
full.wait();
}
B線程:
synchronized (full) {
full.notify();
}
我們知道sychroized 鎖的原理是通過對象的mointor監(jiān)視器實現(xiàn)的辟狈。
對象的 wait 方法的核心原理蛾方,大致如下:
(1)當線程調(diào)用了 locko(某個同步鎖對象)的 wait 方法后,JVM 會將當前線程加入 locko
監(jiān)視器的 WaitSet(等待集)上陕,等待被其他線程喚醒桩砰。
(2)當前線程會釋放 locko 對象監(jiān)視器的 Owner 權(quán)利,讓其他線程可以搶奪 locko 對象的監(jiān)
視器释簿。
(3)讓當前線程等待亚隅,其狀態(tài)變成 WAITING。
對象的 notify(或者 notifyAll)方法的核心原理庶溶,大致如下:
(1)當線程調(diào)用了 locko(某個同步鎖對象)的 notify 方法后煮纵,JVM 會喚醒 locko 監(jiān)視器
WaitSet 中的第一條等待線程。
(2)當線程調(diào)用了 locko 的 notifyAll 方法后偏螺,JVM 會喚醒 locko 監(jiān)視器 WaitSet 中的所有等
待線程行疏。
(3)等待線程被喚醒后,會從監(jiān)視器的 WaitSet 移動到 EntryList套像,線程具備了排隊搶奪監(jiān)視
器 Owner 權(quán)利的資格酿联,其狀態(tài)從 WAITING 變成 BLOCKED。 (4)EntryList 中的線程搶奪到監(jiān)視器 Owner 權(quán)利之后,線程的其狀態(tài)從 BLOCKED 變成贞让,
Runnable周崭,具備重新執(zhí)行的資格。
下面我們給出用sychroized 和 wait喳张,notify方式的一版代碼:
public class ProducerAndConsumer<T> {
static class DataBuffer<T> {
private Queue<T> queue = new LinkedList();
private Integer length = 0;
private final Integer MAX_LENGTH = 10;
private Object syncObject = new Object();
/**
* 當隊列不滿的時候续镇,向隊列發(fā)送消息
*/
private Object NOT_FULL = new Object();
/**
* 當隊列不為空時,向隊列發(fā)送消息
*/
private Object NOT_EMPTY = new Object();
/**
* 消息生產(chǎn)
* @param message
*/
public void produceMessage(T message) throws InterruptedException {
while (length > MAX_LENGTH) {
synchronized (NOT_FULL) {
NOT_FULL.wait();
System.out.println("-------隊列已滿-------");
}
}
synchronized (syncObject) {
queue.add(message);
length++;
}
synchronized (NOT_EMPTY) {
System.out.println("-------隊列不為空-------");
NOT_EMPTY.notify();
}
}
/**
* 消費消息
*/
public void consumerMessage() throws InterruptedException {
while (length <= 0) {
synchronized (NOT_EMPTY) {
System.out.println("消費者-------隊列已空-------");
NOT_EMPTY.wait();
}
}
synchronized (syncObject) {
queue.poll();
length -- ;
}
synchronized (NOT_FULL) {
NOT_FULL.notify();
}
}
}
public static void main(String[] args) throws Exception{
DataBuffer dataBuffer = new DataBuffer();
Callable produceAction = () -> {
try {
dataBuffer.produceMessage(1001L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
};
Callable consumerAction = () -> {
try {
dataBuffer.consumerMessage();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
};
// 同時并發(fā)執(zhí)行的線程數(shù)
final int THREAD_TOTAL = 20;
//線程池销部,用于多線程模擬測試
ExecutorService threadPool =
Executors.newFixedThreadPool(THREAD_TOTAL);
//假定共 11 條線程摸航,其中有 10 個消費者,但是只有 1 個生產(chǎn)者舅桩;
final int CONSUMER_TOTAL = 3;
final int PRODUCE_TOTAL = 1;
for (int i = 0; i < PRODUCE_TOTAL; i++)
{
//生產(chǎn)者線程每生產(chǎn)一個商品酱虎,間隔 50ms
threadPool.submit(new Producer(produceAction, 30000));
}
for (int i = 0; i < CONSUMER_TOTAL; i++)
{
//消費者線程每消費一個商品,間隔 100ms
threadPool.submit(new Consumer(consumerAction, 15000));
}
}
}
Condition的 await江咳,signal方式
Condition 與 Object 的 wait()/notify()作用是相似的:都是使得一個線程等待某個條件
(Condition),只有當該條件具備(signal 或者 signalAll 方法被調(diào)用)時等待線程才會被喚醒哥放,
從而重新爭奪鎖歼指。不同的是:Object 的 wait()/notify()由 JVM 底層的實現(xiàn),而 Condition 接口與實
現(xiàn)類完全使用Java代碼實現(xiàn)甥雕。當需要進行線程間的通信時踩身,建議結(jié)合使用 ReetrantLock與Condition,
通過 Condition 的 await()和 signal()方法進行線程間的阻塞與喚醒社露。
ConditionObject 類是實現(xiàn)條件隊列的關(guān)鍵挟阻,每個 ConditionObject 對象都維護一個單獨的條件
等待對列。每個 ConditionObject 對應(yīng)一個條件隊列峭弟,它記錄該隊列的頭節(jié)點和尾節(jié)點附鸽。
public class ConditionObject implements Condition, java.io.Serializable {
//記錄該隊列的頭節(jié)點
private transient Node firstWaiter;
//記錄該隊列的尾節(jié)點
private transient Node lastWaiter;
}
await()等待方法原理:
signal()喚醒方法原理
最后給出我們用java顯示鎖和條件等待的一版代碼:
public class ProducerAndConsumerV3 {
static class DataBuffer<T> {
private Queue<T> queue = new LinkedList();
private static final Integer QUEUE_MAX_LENGTH = 1;
private Integer length = 0;
private final Integer MAX_LENGTH = 10;
private Lock syncObject = new ReentrantLock(true);
private Condition NOT_FULL = syncObject.newCondition();
private Condition NOT_EMPTY = syncObject.newCondition();
/**
* 消息生產(chǎn)
* @param message
*/
public void produceMessage(T message) throws InterruptedException {
syncObject.lock();
if (length < QUEUE_MAX_LENGTH) {
queue.add(message);
length++;
System.out.println("生產(chǎn)消息,隊列長度:" + length);
NOT_EMPTY.signal();
} else {
NOT_FULL.await();
}
syncObject.unlock();
}
/**
* 消費消息
*/
public void consumerMessage() throws InterruptedException {
syncObject.lock();
System.out.println("消費消息,隊列長度:"+length);
if (length > 0) {
queue.poll();
length--;
NOT_FULL.signal();
} else {
NOT_EMPTY.await();
}
syncObject.unlock();
}
}
public static void main(String[] args) {
ProducerAndConsumerV3.DataBuffer dataBuffer = new ProducerAndConsumerV3.DataBuffer();
for (int i=0;i<3;i++) {
Runnable produceAction = () -> {
try {
dataBuffer.produceMessage("cs");
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(produceAction).start();
}
for (int j=0;j<3;j++) {
Runnable consumerAction = () -> {
try {
dataBuffer.consumerMessage();
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(consumerAction).start();
}
}
}
我們可以再擴展一下瞒瘸,我們這里考慮的是單機情況下坷备,可以通過線程間的通信去完成,如果是放在分布式的環(huán)境下情臭,比如最常見的消息隊列中間件的實現(xiàn)省撑,消費者是怎么監(jiān)聽的broker中有消息的呢,和單機情況下一樣俯在,不可能是所有消息者循環(huán)自旋請求broker是否有消息產(chǎn)生竟秫,實現(xiàn)方式也是消費者注冊了消息監(jiān)聽的接口,當broker有消息時跷乐,然后從眾多消費者中肥败,利用調(diào)度算法選出一臺消費節(jié)點,然后回調(diào)推送消息給消費者。不同的只是單機情況下拙吉,是多線程間的通信潮孽,而在分布式的環(huán)境下,是多個進程間通過http請求互相通信罷了筷黔。