Thinking in java 之并發(fā)其五:強(qiáng)大的 JUC 包

Thinking in java 之并發(fā)其五:強(qiáng)大的 JUC 包

一、前言

java 的 java.util.concurrent 是 java 用于提供一些并發(fā)程序所需功能的類(lèi)包窄锅。它的功能全面且強(qiáng)大坯汤,在前面幕庐,我們已經(jīng)使用過(guò)原子基本變量熔吗,BlockingQueue 等類(lèi)∧范ぃ現(xiàn)在吆倦,我們需要更加深入的去了解 JUC 的強(qiáng)大功能听诸。

二、CountDownLatch

該類(lèi)用來(lái)同步一個(gè)或多個(gè)任務(wù)蚕泽,強(qiáng)制它們等待由其他任務(wù)執(zhí)行的一組操作完成晌梨。

在 CountDownLatch 對(duì)象中設(shè)置一個(gè)初始的計(jì)數(shù)值,任何在這個(gè)對(duì)象上調(diào)用 wait() 的方法都講阻塞须妻,直至這個(gè)計(jì)數(shù)值到達(dá)0仔蝌。其他任務(wù)在結(jié)束工作時(shí),可以在該對(duì)象上調(diào)用 countDown() 來(lái)減小這個(gè)數(shù)值荒吏。同事敛惊,CountDownLatch 只能出發(fā)一次,計(jì)數(shù)值不能被重置绰更。如果有重置的需要瞧挤,可以使用 CyclicBarrier。

先來(lái)看一個(gè)使用 CountDownLatch 的簡(jiǎn)單示例:

package JUCTest;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TaskPortion implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private static Random rand = new Random(47);
    private final CountDownLatch countDownLatch;
    public TaskPortion(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        try {
            doWork();
            countDownLatch.countDown();
        }catch(InterruptedException e) {
            System.out.println("Exit");
        }
    }

    public void doWork() throws InterruptedException{
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(20000));
        System.out.println(this + " complete");
    }

    public String toString() {
        return "TaskPorition : " + id;
    }

}

class WaitingTask implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch countDownLatch;
    public WaitingTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            countDownLatch.await();
            System.out.println("latch barrier pass for " + this);
        }catch(InterruptedException e) {
            System.out.println(this + "interrupted");
        }
    }

    public String toString() {
        return "TaskPorition : " + id;
    }
}
public class CountDownLatchDemo {
    static final int SIZE = 10;
    public static void main(String args[]) {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(SIZE);
        for(int i=0;i<10;i++) {
            exec.execute(new WaitingTask(latch));
        }
        for(int i=0;i<SIZE;i++) {
            exec.execute(new TaskPortion(latch));
        }
        System.out.println("Launched all tasls");
        exec.shutdown();
    }
}
//output
/*Launched all tasls
TaskPorition : 5 complete
TaskPorition : 1 complete
TaskPorition : 4 complete
TaskPorition : 3 complete
TaskPorition : 9 complete
TaskPorition : 0 complete
TaskPorition : 7 complete
TaskPorition : 8 complete
TaskPorition : 6 complete
TaskPorition : 2 complete
latch barrier pass for TaskPorition : 1
latch barrier pass for TaskPorition : 2
latch barrier pass for TaskPorition : 0
latch barrier pass for TaskPorition : 6
latch barrier pass for TaskPorition : 7
latch barrier pass for TaskPorition : 3
latch barrier pass for TaskPorition : 4
latch barrier pass for TaskPorition : 5
latch barrier pass for TaskPorition : 8
latch barrier pass for TaskPorition : 9
*/

通過(guò)前面章節(jié)的內(nèi)容儡湾,我們可以很容一個(gè)實(shí)現(xiàn) “A 任務(wù) 等到 B 任務(wù)完成之后再去執(zhí)行” 的功能特恬,而在上述例子中,B 任務(wù)是由 10 個(gè)子任務(wù)構(gòu)成的徐钠。通過(guò) CountDownLatch 我們沒(méi)完成一個(gè)子任務(wù)癌刽,就會(huì)是 countDownLatch 減1。等待所有子任務(wù)完成,countDownLatch 變?yōu)?后显拜,啟動(dòng) A 任務(wù)衡奥。

二、CyclicBarrier

countDownLatch 可以使某個(gè)任務(wù)完成之后進(jìn)入阻塞狀態(tài)讼油,阻塞狀態(tài)持續(xù)到其他相關(guān)任務(wù)全部完成之后(countDownLatch 變?yōu)?)杰赛。CyclicBarrier 類(lèi)似于countDownLatch ,和 countDownLatch 的區(qū)別在于矮台。在所有任務(wù)完成之后,CyclicBarrier 的計(jì)數(shù)器會(huì)重置根时。

先看一個(gè)簡(jiǎn)單的示例:

package JUCTest;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Horse implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b) {
        barrier = b;
    }
    public synchronized int getStrides() {
        return strides;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        }catch(InterruptedException e) {
            e.printStackTrace();
        }catch(BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    public String toString() {
        return "Horse " + id +" ";
    }

    public String tracks(){
        StringBuilder s = new StringBuilder();
        for(int i=0;i<getStrides();i++) {
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}
public class HorseRace {

    static final int FINISH_LINE = 75;
    private List<Horse> horses = new ArrayList<Horse>();
    private ExecutorService exec =  Executors.newCachedThreadPool();
    private CyclicBarrier barrier;
    public HorseRace(int nHorses,final int pause) {
        barrier = new CyclicBarrier(nHorses,new Runnable() {
            public void run() {
                StringBuilder s = new StringBuilder();
                for(int i=0;i<FINISH_LINE;i++) {
                    s.append("=");
                }
                System.out.println(s);
                for(Horse horse : horses)
                    System.out.println(horse.tracks());
                for(Horse horse : horses) {
                    if(horse.getStrides() >= FINISH_LINE) {
                        System.out.println(horse + "Won!");
                        exec.shutdownNow();
                        return;
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                }catch(InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });
        for(int i=0;i<nHorses;i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }
    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        new HorseRace(nHorses,pause);
    }

}

上述程序是一個(gè)模擬賽馬的操作瘦赫,一共有75個(gè)柵欄,每個(gè)馬的速度都不一樣的蛤迎,所以每次打印每只馬跨越了多少柵欄時(shí)确虱,會(huì)出現(xiàn)你追我趕的情況。但是程序內(nèi)在邏輯是怎么樣呢替裆?

我們可以把馬對(duì)應(yīng)成一個(gè)任務(wù)校辩,馬跨域柵欄是一次 run() 方法內(nèi)部走完了一次循環(huán)。

CyclicBarrier 就相當(dāng)于一堵墻辆童,它橫在所有馬的前方宜咒,當(dāng)馬完成一次操作(隨機(jī)跨越1~3個(gè)柵欄),它來(lái)到了墻面前把鉴,被墻擋住(代碼是通過(guò) await() 實(shí)現(xiàn)的)故黑。等所有的馬(具體幾只是在 CyclicBarrier 的構(gòu)造函數(shù)里確定的)都來(lái)到墻面前的時(shí)候,墻打開(kāi)庭砍,所有馬進(jìn)行下一次操作场晶。

可以推測(cè)出來(lái),CyclicBarrier 內(nèi)部一定有一個(gè)計(jì)數(shù)器(通過(guò)查看源碼可以知道 在構(gòu)造函數(shù)里是把值賦給 final int parties 和 int count 的怠缸,前者是 final 無(wú)法改變用于重置計(jì)數(shù)器使用诗轻,后者用于計(jì)數(shù)),我們沒(méi)調(diào)用一次 await() 方法揭北,這個(gè)計(jì)數(shù)器就會(huì)減1扳炬。直到我們調(diào)用了 parties 次 await() 計(jì)數(shù)器變?yōu)?0 。然后所有任務(wù)可以進(jìn)行一下步罐呼,同時(shí)鞠柄,計(jì)數(shù)器變?yōu)?parties ,繼續(xù)阻塞任務(wù)進(jìn)入再下一步的操作嫉柴,直到它再次為0厌杜;

ps: 通過(guò)源碼可以肯定我們的推測(cè),事實(shí)上 每次我們調(diào)用 await(), count 就會(huì)遞減夯尽,而當(dāng) count 為 0 時(shí)瞧壮,就會(huì)調(diào)用 nextGeneration 方法。nextGeneration 會(huì)把計(jì)數(shù)器重置匙握,同時(shí)會(huì)喚醒阻塞的任務(wù)咆槽。順便一提的事,CyclicBarrier 實(shí)現(xiàn)阻塞和喚醒的方式是使用 Condition (前面有具體內(nèi)容)圈纺。

在 CyclicBarrier 的構(gòu)造函數(shù)里還有一個(gè) Runnable秦忿,它會(huì)在計(jì)數(shù)器為 0 的時(shí)候啟動(dòng)。

三蛾娶、DelayQueue

在 JUC 中灯谣,除了之前提到的,LinkedBlockingQueue蛔琅、ArrayBlockingQueue 和 SynchronousQueue 之外胎许,還有其他幾種 Queue, DelayQueue 就是其中之一。

DelayQueue 是一個(gè)無(wú)界的 BlockingQueue罗售,用于放置實(shí)現(xiàn)了 Delayed 接口的對(duì)象辜窑,其中對(duì)象只能在其到期才能從隊(duì)列中取走。并且該隊(duì)列是有序的寨躁,我們需要實(shí)現(xiàn) compareTo 方法用來(lái)作為排序的標(biāo)準(zhǔn)穆碎。當(dāng)從 DelayQueue 獲取對(duì)象時(shí),只會(huì)獲取延遲到期的對(duì)象朽缎。

package JUCTest;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Runnable,Delayed{

    private static int counter = 0;
    private final int id = counter++;
    private final int delta;
    private final long trigger;
    protected static List<DelayedTask> squence = new ArrayList<DelayedTask>();
    public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
        squence.add(this);
    }

    @Override
    public void run() {
        System.out.println(this);
    }
    @Override
    public int compareTo(Delayed o) {
        DelayedTask that = (DelayedTask) o;
        if(trigger < that.trigger) return 1;
        if(trigger > that.trigger) return 1;
        return 0;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(trigger - System.nanoTime(),TimeUnit.NANOSECONDS);
    }
    public String toString() {
        return String.format("[%1$-4d]", delta) + " Task " + id;
    }
    public String summary() {
        return "("+id+":"+delta+")";
    }

    public static class EndSentinel extends DelayedTask{
        private ExecutorService exec;
        public EndSentinel(int delay,ExecutorService e) {
            super(delay);
            exec=e;
        }
        public void run() {
            for(DelayedTask pt : squence) {
                System.out.print(pt.summary() + " ");
            }
            System.out.println(" ");
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class DelayedTaskConsumer implements Runnable{
    private DelayQueue<DelayedTask> q;
    public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                q.take().run();
            }
        }catch(InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished DelayedTaskConsumer");
    }
}
public class DelayQueueDemo {

    public static void main(String[] args) {
        Random rand = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        for(int i=0;i<20;i++) {
            queue.put(new DelayedTask(rand.nextInt(5000)));
        }
        queue.add(new DelayedTask.EndSentinel(5000, exec));
        exec.execute(new DelayedTaskConsumer(queue));
    }

}

在上面這個(gè)例子中惨远,我們讓 DelayedTask 實(shí)現(xiàn)了 Runnable 和 Delayed 接口。除了 run() 方法之外话肖,我們同時(shí)實(shí)現(xiàn)了 compareTo() 和 getDelay() 方法北秽。然后輸出的結(jié)果表明,任務(wù)從隊(duì)列總出來(lái)的順序是按照 getDelay() 所獲得的值來(lái)確定的最筒。

我們使用變量 delta 來(lái)作為延遲時(shí)間的贺氓,System.nanoTime() 會(huì)獲得一個(gè)納秒為單位的數(shù)字,這個(gè)數(shù)字單獨(dú)使用沒(méi)有任何意義床蜘,但是辙培,在程序的兩個(gè)位置都使用 System.nanoTime() 并且把這兩個(gè)值相減,就能得到一個(gè)精準(zhǔn)的時(shí)間差邢锯。在構(gòu)造函數(shù)里 trigger 被賦值為 System.nanoTime() + delta扬蕊,而在 getDelay() 中返回的值是 trigger - System.nanoTime()(第二次使用,后面用 System.nanoTime()2 做區(qū)別)丹擎,那么返回的值其實(shí)是尾抑,System.nanoTime() + delta - System.nanoTime()2歇父,System.nanoTime()2 - System.nanoTime() 可以認(rèn)為使我們給 trigger 賦值和程序調(diào)用 getDelay() 之間的時(shí)間差,當(dāng)時(shí)間差再愈,也就是經(jīng)過(guò)的時(shí)間 > delta (設(shè)定的延遲時(shí)間) 時(shí)榜苫,對(duì)象才能出列。換句話(huà)說(shuō) getDelay() 返回的值 < 0 才能出列翎冲。

但是對(duì)象出列除了延遲時(shí)間到達(dá)之外這個(gè)條件之外垂睬,還得滿(mǎn)足它在對(duì)列的首位,所以我們必須使用 compareTo() 來(lái)規(guī)定一個(gè)排列的順序抗悍,使得延遲時(shí)間到達(dá)最短的放在隊(duì)首位置驹饺。所以我們用 trigger 來(lái)盡行比較。注意缴渊,這里的排隊(duì)?wèi)?yīng)該是最塊走完延遲時(shí)間的排前面逻淌,而不是延遲時(shí)間最短的排前面。比如疟暖,A的延遲時(shí)間為 1s 他是在第 10s 中的時(shí)候放進(jìn)去的,B的延遲時(shí)間為 2 s 它是在第 4s 的時(shí)候放進(jìn)去的田柔,那么B應(yīng)該排在A(yíng)前面俐巴。

那么,如果我們使用錯(cuò)誤的方式來(lái)排隊(duì)硬爆,比如把延遲時(shí)間到達(dá)最晚的放在前面欣舵。就會(huì)導(dǎo)致效率低下,程序會(huì)等到最長(zhǎng)的延遲時(shí)間到達(dá)才會(huì)有出列操作缀磕。

四缘圈、PriorityBlockingQueue

顧名思義,他是以?xún)?yōu)先級(jí)作為排序順序來(lái)給隊(duì)列中的對(duì)象排序的袜蚕。而排序的方法糟把,依舊是通過(guò) compareTo 方法實(shí)現(xiàn),其實(shí)牲剃,DelayQueue 可以看做是一種特殊的優(yōu)先級(jí)排序遣疯,除了排序之外,他還有延遲的附加條件凿傅。所以對(duì)于 PriorityBlockingQueue 我們不做過(guò)多的說(shuō)明缠犀。

五、SheduledExecutor

SheduledExecutor 可以使任務(wù)按照設(shè)定的計(jì)劃去執(zhí)行聪舒,通常辨液,我們需要在指定的時(shí)間執(zhí)行某項(xiàng)任務(wù),或者在一定的周期內(nèi)循環(huán)的執(zhí)行某項(xiàng)目箱残,就會(huì)使用到 SheduledExecutor滔迈。

package JUCTest;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GreenhouseScheduler {

    private volatile boolean light = false;
    private volatile boolean water = false;
    private String thermostat = "Day";

    public synchronized String getThermostat() {
        return thermostat;
    }

    public synchronized void setThermostat(String thermostat) {
        this.thermostat = thermostat;
    }

    ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);

    public void schedule(Runnable event,long delay) {
        scheduler.schedule(event, delay, TimeUnit.MILLISECONDS);
    }

    public void repeat(Runnable event,long initialDelay,long period) {
        scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
    }

    class LightOn implements Runnable{
        public void run() {
            System.out.println("Turn on lights");
            light = true;
        }
    }

    class LightOff implements Runnable{
        public void run() {
            System.out.println("Turn off lights");
            light = false;
        }
    }

    class WaterOn implements Runnable{
        public void run() {
            System.out.println("Turning greenhoulse water on");
            water = true;
        }
    }

    class WaterOff implements Runnable{
        public void run() {
            System.out.println("Turning greenhoulse water off");
            water = false;
        }
    }

    class ThermostatNight implements Runnable{
        public void run() {
            System.out.println("Thermostat to night setting");
            setThermostat("Night");
        }
    }

    class ThermostatDay implements Runnable{
        public void run() {
            System.out.println("Thermostat to day setting");
        }
    }

    class Bell implements Runnable{

        public void run() {
            System.out.println("Bing!");
        }

    }

    class Terminate implements Runnable{
        public void run() {
            System.out.println("Terminating!");
            scheduler.shutdown();
            new Thread() {
                public void run() {
                    for(DataPoint p : data) {
                        System.out.println(p);
                    }
                }
            };
        }

    }

    static class DataPoint{
        final Calendar time;
        final float temperature;
        final float humidity;
        public DataPoint(Calendar d,float temp,float hum) {
            time = d;
            temperature = temp;
            humidity = hum;
        }
        public String toString() {
            return time.getTime() + String.format(" temperature:, %1s$.1f humidity: %2$.2f",temperature);
        }
    }

    private Calendar lastTime = Calendar.getInstance();
    {
        lastTime.set(Calendar.MINUTE, 30);
        lastTime.set(Calendar.SECOND, 00);
    }

    private float lastTemp = 65.0f;
    private int tempDirection = 1;
    private float lastHumidity = 50.0f;
    private int humidityDirection = 1;
    private Random rand = new Random(47);
    List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());

    class CollectData implements Runnable{
        public void run() {
            System.out.println("Collecting date");
            synchronized(GreenhouseScheduler.this) {
                lastTime.set(Calendar.MINUTE,lastTime.get(Calendar.MINUTE) + 30);
            }
            if(rand.nextInt(5) == 4) {
                tempDirection = -tempDirection;
            }
            lastTemp = lastTemp + tempDirection*(1.0f + rand.nextFloat());
            if(rand.nextInt(5) == 4) {
                humidityDirection = -humidityDirection;
            }
            lastHumidity = lastHumidity + humidityDirection * rand.nextFloat();
            data.add(new DataPoint((Calendar)lastTime.clone(),lastTemp,lastHumidity));
        }
    }
    public static void main(String[] args) {
        GreenhouseScheduler gh =  new GreenhouseScheduler();
        gh.schedule(gh.new ThermostatNight(),5000);
        gh.repeat(gh.new Bell(), 0, 1000);
        gh.repeat(gh.newThermostatNight(), 0, 2000);
        gh.repeat(gh.new LightOn(), 0, 200);
        gh.repeat(gh.new LightOff(), 0, 400);
        gh.repeat(gh.new WaterOn(), 0, 600);
        gh.repeat(gh.new WaterOff(), 0, 800);
        gh.repeat(gh.new ThermostatDay(), 0, 1400);
        gh.repeat(gh.new CollectData(), 500, 500);

 }

}

這里我們引入了一個(gè)新的線(xiàn)程池—— ScheduledThreadPoolExecutor,他添加和執(zhí)行任務(wù)的方法不在是 Executor,而是 schedule 和 schedule亡鼠。schedule 除了需要提供一個(gè) Runnable 作為參數(shù)以外赏殃,還要提供一個(gè)延遲時(shí)間,和時(shí)間單位间涵。延遲時(shí)間和時(shí)間單位共同決定了任務(wù)在什么時(shí)候被啟動(dòng)仁热。scheduleAtFixedRate 還需額外提供一個(gè)周期時(shí)間,在到達(dá)延遲時(shí)間之后勾哩,每過(guò)一個(gè)周期抗蠢,任務(wù)就會(huì)執(zhí)行一次。

在上面的示例中思劳,我們創(chuàng)建了一個(gè)溫室迅矛,溫室需要進(jìn)行開(kāi)關(guān)燈、防水潜叛、收集數(shù)據(jù)等操作秽褒。

我們一共設(shè)置了1個(gè)單一任務(wù)和8個(gè)循環(huán)任務(wù)。在程序進(jìn)行到 5s 中時(shí)威兜,所有任務(wù)被中斷销斟。

六、Semaphore

無(wú)論是使用 synchronized 亦或是 lock 的方式椒舵,都能保證某項(xiàng)資源只能被一個(gè)任務(wù)獲取和使用蚂踊。但有時(shí)候,我們或許會(huì)希望能夠允許指定數(shù)量的任務(wù)來(lái)獲取同一個(gè)資源笔宿。JUC 為我們提供了 Semaphore 來(lái)實(shí)現(xiàn)這方面的需求犁钟。

在 Thinking in Java 的關(guān)于 Semaphore 的示例中,首先創(chuàng)建了一個(gè)使用 Semaphore 來(lái)進(jìn)行控制的對(duì)象池泼橘,然后通過(guò)這個(gè)對(duì)象池來(lái)實(shí)現(xiàn)“允許指定數(shù)量的任務(wù)來(lái)獲取同一個(gè)資源”這一功能涝动,我們先看代碼。

在看示例錢(qián)侥加,我們需要簡(jiǎn)單理解下 Semaphore 的運(yùn)作方式捧存,Seamaphore 的構(gòu)造方法里,包含兩個(gè)參數(shù):permits(int)担败,fair(bool)昔穴。permits 就是所謂的計(jì)數(shù)器的值,即我們希望資源能同時(shí)被多少任務(wù)訪(fǎng)問(wèn)提前。而 fair 是一個(gè)布爾值吗货,它決定我們使用的是公平鎖還是非公平鎖,關(guān)于公平鎖狈网,在之后的拓展章節(jié)再詳細(xì)敘述宙搬。

創(chuàng)建完 Semaphore 之后笨腥,我們通過(guò)它的 aquire() 來(lái)獲取進(jìn)入資源的權(quán)限,此時(shí)計(jì)數(shù)器 -1勇垛,通過(guò)它的 release() 方法脖母,來(lái)釋放一個(gè)權(quán)限,此時(shí)計(jì)數(shù)器 +1闲孤。

以下是 Thinking in Java 示例中所用的對(duì)象池:

package JUCTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class Pool<T> {
    private int size;
    private List<T> items = new ArrayList<T>();
    private volatile boolean[] checkedOut;
    private Semaphore available;
    public Pool(Class<T> classObject,int size) {
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(10,true);
        for(int i=0;i<size;i++) {
            try {
                items.add(classObject.newInstance());
            }catch(Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public T checkOut() throws InterruptedException{
        available.acquire();
        return getItem();
    }

    public void checkIn(T x) {
        if(releaseItem(x)) {
            available.release();
        }
    }
    private synchronized T getItem() {
        for(int i=0;i<size;i++) {
            if(!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null;
    }

    private synchronized boolean releaseItem(T item) {
        int index = items.indexOf(item);
        if(index == -1) {
            checkedOut[index] = false;
            return true;
        }
        return false;
    }
}

在 pool 的構(gòu)造函數(shù)中谆级,我們創(chuàng)建一個(gè)可以放置對(duì)象(泛型 )的 List,初始化 Semaphore讼积。同時(shí)使用了 newInstance() 的方式創(chuàng)建了 size 個(gè)對(duì)象肥照。

在更詳細(xì)的說(shuō)明之前,先來(lái)看看這個(gè)對(duì)象池的應(yīng)用勤众。首先据沈,我們需要新建一個(gè)類(lèi):

package JUCTest;

public class Fat {
    private volatile double d;
    private static int counter = 0;
    private int id = counter++;
    public Fat() {
        for(int i=1;i<10000;i++) {
            d += (Math.PI + Math.E);
        }
    }

    public void operation() {
        System.out.println("this");
    }

    public String toString() {
        return "Fat id: " + id;
    }
}

然后攻人,我么通過(guò) Pool 來(lái)對(duì)該對(duì)象進(jìn)行管理:

package JUCTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CheckoutTask<T> implements Runnable {

    private static int counter = 0;
    private final int id = counter++;
    private Pool<T> pool;
    public CheckoutTask(Pool<T> pool) {
        this.pool = pool;
    }

    public void run() {
        try {
            T item = pool.checkOut();
            System.out.println(this + " checked out " + item);
            TimeUnit.SECONDS.sleep(1);
            System.out.println(this + "cheked in " + item);
            pool.checkIn(item);
        }catch(InterruptedException e) {
            System.out.println("InterruptedException");
        }
    }

    public String toString() {
        return "CheckoutTesk " + id + " ";
    }




}

public class SemaphoreDemo{
    final static int SIZE = 25;
    public static void main(String[] args) throws InterruptedException {
        final Pool<Fat> pool = new Pool<Fat>(Fat.class,SIZE);
        ExecutorService exec = Executors.newCachedThreadPool();
        List<Fat> lists =  new ArrayList<Fat>();
        for(int i=0;i<SIZE;i++) {
            Fat f = pool.checkOut();
            System.out.println(i + ": main() thread check out");
            f.operation();
            lists.add(f);
        }
        Future<?> blocked = exec.submit(new Runnable() {
            public void run() {
                try {
                    pool.checkOut();
                }catch(InterruptedException e) {
                    System.out.println("Check out Interrupted");
                }
            }
        });

        TimeUnit.SECONDS.sleep(2);
        blocked.cancel(true);
        System.out.println("Check in object in " + lists);
        for(Fat f : lists) {
            pool.checkIn(f);
        }
        for(Fat f : lists) {
            pool.checkIn(f);
        }
        exec.shutdown();
    }
}

在 SemaphoreDemo 中,創(chuàng)建了一個(gè)容量為 SIZE 的 pool茫经,在 pool 的構(gòu)造方法中拂蝎,我們根據(jù)傳入的模板參數(shù)账阻,創(chuàng)建了 SIZE 個(gè) Fat 對(duì)象沮明,然后所有的 Fat 的對(duì)象全部通過(guò) checkout 從 pool 里取出矾麻。

在最后往 pool 中 checkin Fat 時(shí),我們發(fā)現(xiàn)不論我們往里添加了多個(gè)對(duì)象波岛,在 pool 中始終最多只有 SIZE 個(gè)對(duì)象。那么后來(lái)的添加的對(duì)象哪里去了音半?checkin 的操作并沒(méi)有消失则拷,也沒(méi)有出錯(cuò),只是被阻塞了曹鸠,如果我們此時(shí)通過(guò) checkout 釋放出一些位置煌茬,那些消失的 Fat 就會(huì)順利的插入到 pool 里。

那么這是如何實(shí)現(xiàn)的彻桃?

在 Checkout() 中坛善,我們?cè)佾@取到 Fat 對(duì)象前,需要進(jìn)行一次 acquire() 每次的 acquire 操作邻眷,都會(huì)使得 Semaphore 中的計(jì)數(shù)器 -1眠屎,當(dāng)技術(shù)器為 0 時(shí),我們繼續(xù)進(jìn)行 checkout()(或者繼續(xù)進(jìn)行 checkout() 里的 acquire() 操作)肆饶,就會(huì)被阻塞改衩。直到我們使用 checkin() (或者說(shuō)是 checkin() 里的 release()),使得計(jì)數(shù)器 +1驯镊。被阻塞的 checkout 操作才會(huì)繼續(xù)執(zhí)行葫督。

在上面的代碼中竭鞍,我們先進(jìn)行了 SIZE 次 checkout() 操作,然后橄镜,再新建一個(gè)任務(wù)繼續(xù)使用 checkout 操作偎快,其被阻塞,直到我們將其中斷洽胶。后臺(tái)輸出 Check out Interrupted晒夹,如果我們?cè)?blocked.cancel(true) ——中斷操作之前,執(zhí)行 checkin 操作妖异,就會(huì)使阻塞的任務(wù)能夠繼續(xù)進(jìn)行下去惋戏。

七、Exchanger

Exchanger 是在兩個(gè)任務(wù)之間交換對(duì)象的柵欄他膳。當(dāng) A 和 B 任務(wù)進(jìn)入柵欄時(shí)响逢,它們各自擁有一個(gè)對(duì)象 C 和 D,當(dāng)他們離開(kāi)時(shí)棕孙,擁有的對(duì)象互換舔亭,即 A 擁有 D,B 有用 C蟀俊。在創(chuàng)建 A 和 B 時(shí)钦铺,需要把他們和同一個(gè) Exchanger 綁定。

package JUCTest;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExchangerProducer implements Runnable{

   private Exchanger<Integer> exchanger;
   public ExchangerProducer(Exchanger<Integer> exchanger) {
       this.exchanger = exchanger;
   }
   @Override
   public void run() {
       for(int i=1;i<10;i++) {
           Integer data = i;
           System.out.println(i + " : producer before exchange : " + data);
           try {
               data = exchanger.exchange(data);
           } catch (InterruptedException e) {
               System.out.println("Interrupted...");
           }
           System.out.println(i + ": producer after exchange : " + data);
       }
   }

}

class ExchagerConsumer implements Runnable{

   private Exchanger<Integer> exchanger;
   public ExchagerConsumer(Exchanger<Integer> exchanger) {
       this.exchanger = exchanger;
       }
   @Override
   public void run() {
       for(int i=1;i<10;i++) {
           Integer data = i * 2;
           System.out.println(i + " : consumer before exchange : " + data);
           try {
               data = exchanger.exchange(data);
           } catch (InterruptedException e) {
               System.out.println("Interrupted...");
           }
           System.out.println(i + " : consumer after exchange : " + data);
       }
   }
}
public class ExchagerDemo {

   public static void main(String[] args) throws InterruptedException {
       Exchanger<Integer> exchanger = new Exchanger<Integer>();
       ExchangerProducer exchangerProducer = new ExchangerProducer(exchanger);
       ExchagerConsumer exchagerConsumer = new ExchagerConsumer(exchanger);
       ExecutorService exec = Executors.newCachedThreadPool();
       exec.execute(exchangerProducer);
       exec.execute(exchagerConsumer);
       TimeUnit.SECONDS.sleep(3);
       exec.shutdownNow();

   }

}

在上面的示例中肢预,producer 負(fù)責(zé)生產(chǎn)奇數(shù)矛洞,consumer 負(fù)責(zé)生產(chǎn)偶數(shù),在 producer 或者 consumer 生產(chǎn)完一個(gè)數(shù)之后烫映,會(huì)將其放入 Exchanger 中等待交換沼本,雙方進(jìn)入到阻塞狀態(tài),等待交換完成之后锭沟,任務(wù)繼續(xù)進(jìn)行抽兆。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市族淮,隨后出現(xiàn)的幾起案子辫红,更是在濱河造成了極大的恐慌,老刑警劉巖祝辣,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贴妻,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蝙斜,警方通過(guò)查閱死者的電腦和手機(jī)揍瑟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)乍炉,“玉大人绢片,你說(shuō)我怎么就攤上這事滤馍。” “怎么了底循?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵巢株,是天一觀(guān)的道長(zhǎng)。 經(jīng)常有香客問(wèn)我熙涤,道長(zhǎng)阁苞,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任祠挫,我火速辦了婚禮那槽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘等舔。我一直安慰自己骚灸,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布慌植。 她就那樣靜靜地躺著甚牲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蝶柿。 梳的紋絲不亂的頭發(fā)上丈钙,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音交汤,去河邊找鬼雏赦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛芙扎,可吹牛的內(nèi)容都是我干的喉誊。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼纵顾,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了栋盹?” 一聲冷哼從身側(cè)響起施逾,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎例获,沒(méi)想到半個(gè)月后汉额,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡榨汤,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年蠕搜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片收壕。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡妓灌,死狀恐怖轨蛤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情虫埂,我是刑警寧澤祥山,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站掉伏,受9級(jí)特大地震影響缝呕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜斧散,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一供常、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鸡捐,春花似錦栈暇、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至鹿寨,卻和暖如春新博,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脚草。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工赫悄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人馏慨。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓埂淮,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親写隶。 傳聞我的和親對(duì)象是個(gè)殘疾皇子倔撞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容