AQS(三) 多線程可重入

有道面試題

有一家生產(chǎn)奶酪的廠家旁仿,每天需要生產(chǎn)100000份奶酪賣給超市.
通過(guò)一輛送貨車發(fā)貨踏枣,送貨車輛每次送100份。
廠家有一個(gè)容量為1000份的冷庫(kù)丸凭,用于奶酪保鮮福扬,生產(chǎn)的奶酪需要先存放在冷庫(kù)。
運(yùn)輸車輛從冷庫(kù)取貨惜犀。
廠家有三條生產(chǎn)線铛碑,分別是 牛奶供應(yīng)生產(chǎn)線、發(fā)酵劑制作生產(chǎn)線虽界、奶酪生產(chǎn)線汽烦。生產(chǎn)每份奶酪需要2份牛奶和1份發(fā)酵劑。
請(qǐng)?jiān)O(shè)計(jì)生產(chǎn)系統(tǒng)莉御。


1.分析

  1. 三條生產(chǎn)線加一個(gè)送貨車就是四個(gè)線程
  2. 奶酪生產(chǎn)需要等待牛奶與發(fā)酵劑
  3. 送貨車需要等待奶酪數(shù)量滿足一次運(yùn)送的量才開(kāi)始運(yùn)送
  4. 奶酪兩種情況下需要停止生產(chǎn)
  • 達(dá)到冷藏庫(kù)數(shù)量
  • 達(dá)到當(dāng)天奶酪生產(chǎn)目標(biāo)

其實(shí)這就是一個(gè)變種的生產(chǎn)者消費(fèi)者模式撇吞,下面開(kāi)始實(shí)現(xiàn)。

2.實(shí)現(xiàn)

傳統(tǒng)的 消費(fèi)者生產(chǎn)者模型 是用Thread的wait+notify實(shí)現(xiàn)礁叔,這里我想用ReentrantLock來(lái)玩梢夯,用 Condition 來(lái)細(xì)化鎖,具體代碼如下

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MilkOne {

    // 運(yùn)送容量
    final static int ALL_SIZE = 100;
    // 當(dāng)前容量
    final static int CURR_CAPACITY = 1000;
    // 總生產(chǎn)量
    final static int ALL_CAPACITY = 10000;

    // 牛奶
    final static AtomicInteger milkInt = new AtomicInteger(0);
    // 發(fā)酵劑 - 10000
    final static AtomicInteger starterInt = new AtomicInteger(0);
    // 奶酪 - 10000
    final static AtomicInteger cheeseInt = new AtomicInteger(0);

    static class LockCondition {

        Lock lock = new ReentrantLock();

        // 取貨車
        Condition carCondition = lock.newCondition();
        // 牛奶
        Condition milkCondition = lock.newCondition();
        // 發(fā)酵劑 - 10000
        Condition starterCondition = lock.newCondition();
        // 奶酪 - 10000
        Condition cheeseCondition = lock.newCondition();

        void lock() { lock.lock(); }

        void unlock() { lock.unlock(); }

        void signalCar() { carCondition.signal(); }

        void awaitCar() throws InterruptedException{ carCondition.await(); }

        void signalMilk() { milkCondition.signal(); }

        void awaitMilk() throws InterruptedException{ milkCondition.await(); }

        void signalStarter() { starterCondition.signal(); }

        void awaitStarter() throws InterruptedException{ starterCondition.await(); }

        void signalCheese() { cheeseCondition.signal(); }

        void awaitCheese() throws InterruptedException{ cheeseCondition.await(); }

    }

    /**
     * 三個(gè)生產(chǎn)線程(總共生產(chǎn)10000)
     *
     * 沒(méi)說(shuō)牛奶跟發(fā)酵劑需要保存,這里不設(shè)等待
     *
     * 車取貨的線程(每滿一百取一次)
     *
     */
    public static void main(String[] args) throws InterruptedException{

        LockCondition lock = new LockCondition();

        // 1.生產(chǎn)牛奶
        Thread milkTh = new Thread(new MilkTh(lock), "milk");
        milkTh.start();

        // 2.生產(chǎn)發(fā)酵劑
        Thread starterTh = new Thread(new StarterTh(lock), "starter");
        starterTh.start();

        // 3.生產(chǎn)奶酪
        Thread cheeseTh = new Thread(new CheeseTh(lock), "cheese");
        cheeseTh.start();

        // 4.car
        Thread carTh = new Thread(new CarTh(lock), "car");
        carTh.start();
    }

    // 4.運(yùn)輸車
    static class CarTh implements Runnable {
        LockCondition lock;
        CarTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int p = 0;
            for (;;) {
                lock.lock();
                System.out.println(p + " car lock...");
                try {
                    if (p >= ALL_CAPACITY / ALL_SIZE) {
                        break;
                    }
                    // 1.沒(méi)滿運(yùn)貨車數(shù)量
                    if (cheeseInt.get() < ALL_SIZE) {
                        // 1.喚醒生產(chǎn)奶酪
                        lock.signalCheese();
                        // 2.運(yùn)貨車等待
                        lock.awaitCar();
                        continue;
                    }
                    // 可以運(yùn)送
                    if (cheeseInt.compareAndSet(cheeseInt.get(), cheeseInt.get() - ALL_SIZE)) {
                        System.out.println(p++ + " 運(yùn)送 cheeseInt: " + cheeseInt.get());
                        // 可以繼續(xù)生產(chǎn)奶酪
                        lock.signalCheese();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(p + " car unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("car end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get() + ", p:" + p);
        }
    }

    // 3.奶酪
    static class CheeseTh implements Runnable {
        LockCondition lock;
        CheeseTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){

            int z = 0;
            for (;;) {
                lock.lock();
                System.out.println(z + " cheese lock...");
                try {
                    if (z >= ALL_CAPACITY) {
                        break;
                    }
                    if (starterInt.get() < 1) lock.awaitStarter();
                    if (milkInt.get() < 2) lock.awaitMilk();

                    /* 1奶酪 = 2牛奶 + 1發(fā)酵劑 */
                    // 2牛奶
                    milkInt.compareAndSet(milkInt.get(), milkInt.get() - 2);
                    // 1發(fā)酵劑
                    starterInt.decrementAndGet();
                    // 1奶酪
                    int num = cheeseInt.incrementAndGet();
                    System.out.println(z++ + " 生產(chǎn)了一份 cheeseInt: " + cheeseInt.get());
                    // 喚醒運(yùn)貨車
                    lock.signalCar();
                    // 超過(guò)了 1000 份奶酪
                    if (CURR_CAPACITY <= num) {
                        // 停止生產(chǎn)
                        lock.awaitCheese();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(z + " cheese unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("cheese end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }

    // 2.發(fā)酵劑
    static class StarterTh implements Runnable {
        LockCondition lock;
        StarterTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int y = 0;
            for (;;) {
                lock.lock();
                System.out.println(y + " starter lock...");
                try {
                    if (y >= ALL_CAPACITY) {
                        break;
                    }
                    // 一次生產(chǎn)一份
                    starterInt.incrementAndGet();
                    System.out.println(y++ + " 生產(chǎn)了一份 starterInt: " + starterInt.get());
                    lock.signalStarter();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(y + " starter unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("starter end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }

    // 1.牛奶
    static class MilkTh implements Runnable {

        LockCondition lock;
        MilkTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int x = 0;
            for (;;) {
                lock.lock();
                System.out.println(x + " milk lock...");
                try {
                    if (x >= ALL_CAPACITY) {
                        break;
                    }
                    // 一次生產(chǎn)兩份
                    milkInt.addAndGet(2);
                    System.out.println(x++ + " 生產(chǎn)了兩份 milkInt: " + milkInt.get());
                    lock.signalMilk();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(x + " milk unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("milk end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }
}

這么寫功能看上去是實(shí)現(xiàn)了晴圾,但其實(shí)有一個(gè)很嚴(yán)重的問(wèn)題颂砸,那就是四個(gè)線程共用了同一把可重入鎖,而可重入鎖是基于 AQS 的獨(dú)占模式死姚,也就是同一時(shí)間只有一個(gè)線程能加鎖成功人乓,其他線程只能等待。

而此處都毒,明顯四個(gè)線程應(yīng)該是可以同時(shí)執(zhí)行的色罚,比如同時(shí)生產(chǎn)牛奶和發(fā)酵劑,一邊生產(chǎn)奶酪一邊運(yùn)送奶酪等账劲。

因此這里的 ReentrantLock 是不符合要求的戳护。

3. 優(yōu)化

這里就有兩種思路,ReentrantLock 行不通那就換成 AQS 共享模式的實(shí)現(xiàn)瀑焦,不過(guò)這里我先用了 ReentrantLock 就不想換了腌且。另一個(gè)思路就是改造 ReentrantLock。

獨(dú)占模式的 ReentrantLock 行不通榛瓮,我改成共享的 ReentrantLock 可以不铺董?

試試看。

import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

public class HyLock implements Serializable {

    private Sync sync;

    HyLock(int permits) {
        sync = new Sync(permits);
    }

    public void lock() { sync.lock(); }

    public void unlock() { sync.unlock(); }

    public Condition newCondition() { return sync.newCondition(); }

    static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        Sync(int state) {
            setState(state);
        }

        final void lock() {
            acquireShared(1);
        }

        final void unlock() {
            releaseShared(1);
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        protected final int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        protected boolean tryRelease(int arg) {
            return tryReleaseShared(arg);
        }

        protected boolean tryAcquire(int arg) {
            return tryAcquireShared(arg) >= 0;
        }

        protected boolean isHeldExclusively() {
            return true;
        }
    }
}

然后替換下 LockCondition 中的 ReentrantLock

static class LockCondition {

    HyLock lock;
        
    LockCondition(int permits) {
      lock = new HyLock(permits);
    }
    ...
}

main 方法也改下

LockCondition lock = new LockCondition(4);

然后運(yùn)行會(huì)發(fā)現(xiàn)報(bào) lock 空指針禀晓,看下 LockCondition 的 class 文件

static class LockCondition {
    HyLock lock;
    Condition carCondition;
    Condition milkCondition;
    Condition starterCondition;
    Condition cheeseCondition;

    LockCondition(int permits) {
        this.carCondition = this.lock.newCondition();
        this.milkCondition = this.lock.newCondition();
        this.starterCondition = this.lock.newCondition();
        this.cheeseCondition = this.lock.newCondition();
        this.lock = new HyLock(permits);
    }

對(duì)比下 java 文件

static class LockCondition {

    HyLock lock;

    LockCondition(int permits) {
        lock = new HyLock(permits);
    }

    Condition carCondition = lock.newCondition();
    Condition milkCondition = lock.newCondition();
    Condition starterCondition = lock.newCondition();
    Condition cheeseCondition = lock.newCondition();

JIT 會(huì)將下面四個(gè)賦值操作放到唯一構(gòu)造方法里面精续,且放在最上面執(zhí)行坝锰,而此時(shí) lock 還沒(méi)有初始化所以會(huì)報(bào) NPE,所以需要調(diào)整一下 LockCondition 類

static class LockCondition {
    HyLock lock;
    // 取貨車
    Condition carCondition;
    // 牛奶
    Condition milkCondition;
    // 發(fā)酵劑 - 10000
    Condition starterCondition;
    // 奶酪 - 10000
    Condition cheeseCondition;

    LockCondition(int permits) {
        lock = new HyLock(permits);
        carCondition = lock.newCondition();
        milkCondition = lock.newCondition();
        starterCondition = lock.newCondition();
        cheeseCondition = lock.newCondition();
    }
    ...
}

這樣就行了重付。整體上來(lái)看顷级,ReentrantLock 從單線程變成了多線程

  1. 相關(guān)的數(shù)據(jù)修改(AtomicInteger)都是原子的所以不會(huì)造成數(shù)據(jù)問(wèn)題
  2. AQS 的共享模式參照 Semaphore 實(shí)現(xiàn)

但遺憾的是,這種改造是有問(wèn)題的确垫。

如果你看過(guò) ConditionObject 的實(shí)現(xiàn)弓颈,會(huì)發(fā)現(xiàn)其內(nèi)部實(shí)現(xiàn)完全沒(méi)有考慮并發(fā)風(fēng)險(xiǎn),因?yàn)槠鋵?shí)際場(chǎng)景都是先獲取到了鎖再執(zhí)行條件隊(duì)列(condition queue)的操作(入隊(duì)森爽、出隊(duì)),若想真正實(shí)現(xiàn)一個(gè)多線程版 ReentrantLock嚣镜,我們還需要重新寫一個(gè)并發(fā)安全的 Condition 實(shí)現(xiàn)來(lái)替代 AQS 自身的 ConditionObject爬迟。

有個(gè)佐證就是,JDK 提供的共享鎖實(shí)現(xiàn)中 newCondition 方法(繼承自 java.util.concurrent.locks.Lock 接口)均為不支持菊匿,比如 ReentrantReadWriteLock 的 read 鎖

public Condition newCondition() {
  throw new UnsupportedOperationException();
}

另外考慮

  1. 是否只能通過(guò)等待時(shí)間來(lái)控制生產(chǎn)速度 or 運(yùn)送速度付呕?如何設(shè)計(jì)?
  2. 有些地方是否可以換 CountdownLatch / Semaphore / CyclicBarrier 實(shí)現(xiàn)跌捆?
  3. 冷庫(kù)這塊是否充分實(shí)現(xiàn)了題意徽职?若沒(méi)有,如何改佩厚?

劍仙三尺劍姆钉,杯中二兩酒。齊活抄瓦。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末潮瓶,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子钙姊,更是在濱河造成了極大的恐慌毯辅,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件煞额,死亡現(xiàn)場(chǎng)離奇詭異思恐,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)膊毁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門胀莹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人婚温,你說(shuō)我怎么就攤上這事嗜逻。” “怎么了缭召?”我有些...
    開(kāi)封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵栈顷,是天一觀的道長(zhǎng)逆日。 經(jīng)常有香客問(wèn)我,道長(zhǎng)萄凤,這世上最難降的妖魔是什么室抽? 我笑而不...
    開(kāi)封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮靡努,結(jié)果婚禮上坪圾,老公的妹妹穿的比我還像新娘。我一直安慰自己惑朦,他們只是感情好兽泄,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著漾月,像睡著了一般病梢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上梁肿,一...
    開(kāi)封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天蜓陌,我揣著相機(jī)與錄音,去河邊找鬼吩蔑。 笑死钮热,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烛芬。 我是一名探鬼主播隧期,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼赘娄!你這毒婦竟也來(lái)了厌秒?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤擅憔,失蹤者是張志新(化名)和其女友劉穎鸵闪,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體暑诸,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蚌讼,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了个榕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片篡石。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖西采,靈堂內(nèi)的尸體忽然破棺而出凰萨,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布胖眷,位于F島的核電站武通,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏珊搀。R本人自食惡果不足惜冶忱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望境析。 院中可真熱鬧囚枪,春花似錦、人聲如沸劳淆。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)沛鸵。三九已至括勺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間谒臼,已是汗流浹背朝刊。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工耀里, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蜈缤,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓冯挎,卻偏偏與公主長(zhǎng)得像底哥,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子房官,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354