關(guān)于Java多線程的一些称『簦考知識點

前言

Java多線程也是面試中經(jīng)常會提起到的一個點卧秘。面試官會問:實現(xiàn)多線程的兩種方式以及區(qū)別,死鎖發(fā)生的4個條件以及如何避免發(fā)生死鎖官扣,死鎖和活鎖的區(qū)別翅敌,常見的線程池以及區(qū)別,怎么理解有界隊列與無界隊列惕蹄,多線程生產(chǎn)者消費者模型蚯涮,怎么設(shè)計一個線程池治专,線程池的大致實現(xiàn),ReetrantLockSynchronizedReadWriteLock的源碼和區(qū)別遭顶、具體業(yè)務場景分析等等张峰。

ashin_is_handsome.jpg

生產(chǎn)者消費者模型

其實生產(chǎn)者消費者模型挺像觀察者模式的,對于該模型我們應該明確以下4點:

  • 當生產(chǎn)者生產(chǎn)出產(chǎn)品時棒旗,應該通知消費者去消費喘批。
  • 當消費者消費完產(chǎn)品,應該通知生產(chǎn)者去生產(chǎn)铣揉。
  • 當產(chǎn)品的庫存滿了的時候饶深,生產(chǎn)者不應該再去生產(chǎn),而是通知消費者去消費逛拱。
  • 當產(chǎn)品的庫存為0的時候敌厘,消費者不應該去消費,而是通知生產(chǎn)者去生產(chǎn)橘券。
wait()和notify()實現(xiàn)
  • 定義生產(chǎn)者额湘。
public class Producter implements Runnable {

    private Storage resource;

    public Producter(Storage resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            resource.produce();
        }
    }
}
  • 定義消費者。
public class Consumer implements Runnable {

    private Storage resource;

    public Consumer(Storage resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.cosume();
        }
    }
}

  • 定義Storage倉庫旁舰。
public class Storage {

    private int MAX_SIZE = 20;

    private int count = 0;

    public synchronized void cosume() {
        while (count == 0) {
            try {
                System.out.println("【消費者】庫存已經(jīng)為空了锋华,暫時不能進行消費任務!");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        count--;
        System.out.println("【消費者】" + Thread.currentThread().getName() + "消費產(chǎn)品, 庫存:" + count);
        this.notify();
    }

    public synchronized void produce() {
        while (count >= MAX_SIZE) {
            try {
                System.out.println("【生產(chǎn)者】庫存已經(jīng)滿了,暫時不能進行生產(chǎn)任務箭窜!");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        count++;
        System.out.println("【生產(chǎn)者】" + Thread.currentThread().getName() + "生產(chǎn)產(chǎn)品, 庫存" + count);
        this.notify();
    }
}

  • 測試demo
public class ProducterCosumerXDemo {

    public static void main(String[] args) {
        Storage storage = new Storage();
        ExecutorService service = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            service.submit(new Producter(storage));
        }

        for (int i = 0; i < 5; i++) {
            service.submit(new Consumer(storage));
        }

        service.shutdown();
    }
}
image.png

image.png

image.png

我們這里創(chuàng)建了5個生產(chǎn)者毯焕,5個消費者。生產(chǎn)者生產(chǎn)的速度比消費者消費的速度要快磺樱,從圖中很明顯看到生產(chǎn)者率先生產(chǎn)出20個產(chǎn)品纳猫,已是庫存極大值,往后不能再去生產(chǎn)了竹捉,然后通知消費者去消費芜辕。

用BlockingQueue實現(xiàn)

BlockingQueue是一個阻塞隊列击狮,也是面試官常喜歡問的一個點劣摇。BlockingQueue是線程安全的,內(nèi)部可以自動調(diào)用wait()喻鳄,notify()方法憨闰。在多線程環(huán)境下状蜗,我們可以使用BlockingQueue去完成數(shù)據(jù)的共享,同時可以兼顧到效率和線程安全鹉动。

如果生產(chǎn)者生產(chǎn)商品的速度遠大于消費者消費的速度轧坎,并且生產(chǎn)的商品累積到一定的數(shù)量,已經(jīng)超過了BlockingQueue的最大容量泽示,那么生產(chǎn)者就會被阻塞缸血。那為什么時候撤銷生產(chǎn)者的阻塞呢蜜氨?只有消費者開始消費累積的商品,且累積的商品數(shù)量小于BlockingQueue的最大容量属百,才能撤銷生產(chǎn)者的阻塞记劝。

如果庫存為0的話,消費者自動被阻塞族扰。只有生產(chǎn)者生產(chǎn)出商品,才會撤銷消費者的阻塞定欧。

  • 定義Storage倉庫
public class Storage {

    private BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);

    public void push(Product p) throws InterruptedException {
        queues.put(p);
    }

    public Product pop() throws InterruptedException {
        return queues.take();
    }
}
  • 定義Product商品
public class Product {
    private int id;

    public static int MAX = 20;

    public Product(int id) {
        super();
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}
  • 定義生產(chǎn)者
public class Producer implements Runnable {

    private String name;
    private Storage storage = null;

    public Producer(String name, Storage storage) {
        this.name = name;
        this.storage = storage;
    }

    @Override
    public void run() {
        int i = 0;
        try {
            while (true) {
                System.out.println(name + "已經(jīng)生產(chǎn)一個: id為" + i + "的商品");
                System.out.println("===========================");

                Product product = new Product(i++);

                storage.push(product);

                Thread.sleep(100);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 定義消費者
public class Consumer implements Runnable {

    private String name;
    private Storage storage = null;

    public Consumer(String name, Storage s) {
        this.name = name;
        this.storage = s;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Product product = storage.pop();

                System.out.println(name + "已經(jīng)消費一個: id為" + product.getId() + "的商品");
                System.out.println("===========================");
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 測試demo
public class ProducterConsumerDemo {

    public static void main(String[] args) {
        Storage storage = new Storage();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer p0 = new Producer("騰訊", storage);
        Consumer c0 = new Consumer("cmazxiaoma", storage);
        
        service.submit(p0);
        service.submit(c0);

        service.shutdown();

    }
}
image.png
image.png

我們可以清晰看到在"騰訊已經(jīng)生產(chǎn)一個:id為13的商品"之前渔呵,生產(chǎn)者是隨便的生產(chǎn),消費者是隨便的消費砍鸠,生產(chǎn)者的速度遠遠大于消費者的速度扩氢。然而在"騰訊已經(jīng)生產(chǎn)一個:id為13的商品"之后,累積的商品數(shù)量已經(jīng)要到達BlockingQueue的最大容量10了爷辱。此時生產(chǎn)者已經(jīng)被阻塞了录豺,已經(jīng)不能再生產(chǎn)了,只有當消費者開始產(chǎn)品時饭弓,才能喚醒生產(chǎn)者双饥。所以之后的打印內(nèi)容就很有規(guī)律了:騰訊一生產(chǎn)商品,cmazxiaoma就開始消費弟断。

從這個例子中咏花,我們可以看到BlockingQueue通過平衡生產(chǎn)者和消費者的處理能力,因此提高了整體處理數(shù)據(jù)的速度阀趴。


死鎖和活鎖

  • 死鎖:兩個或者多個線程相互等待對方釋放鎖昏翰,則會出現(xiàn)循環(huán)等待的現(xiàn)象,也就是死鎖刘急。Java虛擬機沒有有效的措施去解決死鎖情況棚菊,所以在多線程編程中應該采用措施去避免死鎖的出現(xiàn)。

  • 活鎖:任務或者執(zhí)行者沒有被阻塞叔汁,由于某些條件沒有滿足统求,導致一直重復嘗試 -> 失敗 -> 嘗試 -> 失敗」ツ活鎖和死鎖的區(qū)別在于球订,處于活鎖的實體在不斷的改變狀態(tài)。而處于死鎖的實體表現(xiàn)為等待瑰钮,活鎖有可能自行解開冒滩,死鎖則不能。


死鎖產(chǎn)生的原因以及四個必要條件

  • 產(chǎn)生死鎖的原因:

    • 系統(tǒng)資源不足浪谴。
    • 資源分配不當开睡。
    • 進程運行推進的順序不合適因苹。
  • 發(fā)生死鎖的必要條件:

    • 互斥條件: 一個資源只能被一個進程占用,直到被該進程釋放篇恒。

    • 請求和保持條件:一個進程因請求被占用資源而發(fā)生阻塞時扶檐,對已獲得的資源保持不放。

    • 不可剝奪條件:任何一個資源在沒被該進程釋放之前胁艰,任何其他進程都無法對它進行剝奪占用款筑。

    • 循環(huán)等待條件:當發(fā)生死鎖時,所等待的進程必定會發(fā)生環(huán)路腾么,造成永久阻塞奈梳。

  • 防止死鎖的一些方法:

    • 破除互斥等待:一般無法做到。

    • 破除請求和保持:一次性獲取所有的資源解虱。

    • 破除循環(huán)等待:按順序獲取資源攘须。

    • 破除無法剝奪的等待:加入超時機制。

  • 手寫死鎖的demo

public class DeadLockDemo {
    public static String obj1 = "obj1";
    public static String obj2 = "obj2";

    public static void main(String[] args) {
        Thread a = new Thread(new LockThread1());
        Thread b = new Thread(new LockThread2());
        a.start();
        b.start();
    }
}

class LockThread1 implements Runnable {

    @Override
    public void run() {

        System.out.println("lockThread1 running");
        while (true) {
            synchronized (DeadLockDemo.obj1) {
                System.out.println("lockThrea1 lock obj1");
                try {
                    Thread.sleep(3000);
                    synchronized (DeadLockDemo.obj2) {
                        System.out.println("lockThrea1 lock obj2");
                    }
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}

class LockThread2 implements Runnable {

    @Override
    public void run() {

        System.out.println("lockThread2 running");
        while (true) {
            synchronized (DeadLockDemo.obj2) {
                System.out.println("lockThrea2 lock obj2");
                try {
                    Thread.sleep(3000);
                    synchronized (DeadLockDemo.obj1) {
                        System.out.println("lockThrea2 lock obj1");
                    }
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}
image.png

很明顯殴泰,發(fā)生了死鎖現(xiàn)象于宙,美滋滋。


線程池

  • 首先來說線程的好處悍汛。

    • 重用存在的線程捞魁,減少對象創(chuàng)建,消亡的開銷员凝,性能佳署驻。

    • 可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率健霹,同時避免過多資源競爭旺上,避免阻塞。

    • 提供定時定期執(zhí)行糖埋,單線程宣吱,并發(fā)數(shù)控制等功能。

  • 我們來看看ThreadPoolExecutor的構(gòu)造器瞳别。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

通過閱讀注釋征候,我們可以知道參數(shù)的含義:

  • corePoolSize:核心池的大小,默認情況下祟敛,在創(chuàng)建線程池后疤坝,線程池中的線程數(shù)為0,當有任務來之后馆铁,就會創(chuàng)建一個線程來執(zhí)行任務跑揉,在線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務放到緩存隊列中。

  • maximumPoolSize:線程池最大的線程數(shù)历谍。

  • keepAliveTime:表示線程沒有任務執(zhí)行時现拒,最多保持多久時間會終止。默認情況下望侈,只有當線程池中的線程數(shù)大于corePoolSize時印蔬,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize脱衙。

  • unit:參數(shù)keepAliveTime的時間單位侥猬。

  • workQueue:一個阻塞隊列,用來存儲等待執(zhí)行的任務岂丘。有ArrayBlockingQueue陵究,LinkedBlockingQueueSynchronousQueue奥帘。

  • threadFactory:線程工廠,主要用來創(chuàng)建線程仪召。

  • handler:表示當拒絕處理任務時的策略寨蹋。我們可以在ThreadPoolExecutor類中,看到4種拒絕策略扔茅。

    • AbortPolicy已旧,拒絕處理任務時會拋出一個RejectedExecutionException異常。
    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • DiscardPolicy召娜,拒絕處理任務時默認丟棄該任務运褪。
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
  • DiscardOldestPolicy,去處理拒絕的任務玖瘸,丟棄最舊的未處理的任務秸讹,然后重新執(zhí)行該任務。除非執(zhí)行者被關(guān)閉雅倒,在這種情況下璃诀,任務會被丟棄。
   /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
  • CallerRunsPolicy蔑匣,拒絕的任務由調(diào)用線程去處理劣欢,除非執(zhí)行者被關(guān)閉,那么拒絕的任務直接丟棄裁良。
 /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • Java通過Executors提供了4種線程池凿将,分別為:
    • newCachedThreadPool:創(chuàng)建一個可緩存線程池,里面有我們需要的線程价脾。但是當線程池里面的線程可用的時候牧抵,我們可以重用它們。當我們需要執(zhí)行生命周期很短的任務時彼棍,我們將重復使用之前構(gòu)造的線程處理任務灭忠,這樣線程池通常能提高性能膳算。如果沒有現(xiàn)成的線程可使用,會創(chuàng)建一個新的線程并添加到線程池中弛作。如果有線程在60s中未使用涕蜂,我們會終結(jié)它并把它從緩存中刪除。因此一個閑置時間足夠長的線程池映琳,將不會消耗任何資源机隙。如果我們需要自定義超時參數(shù),我們可以通過ThreadPoolExecutor進行構(gòu)建線程池萨西。
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newFixedThreadPool:定長線程池有鹿,可控制并發(fā)數(shù),超出指定數(shù)量的線程將在隊列中等待谎脯。創(chuàng)建一個線程池葱跋,重用固定的數(shù)量的線程池,使用的是共享的無界隊列源梭。在任何時候娱俺,至多nThreads線程被激活的處理事務。當所有的線程處于繁忙的狀態(tài)下废麻,有其他的任務被提交過來荠卷,它們會在隊列中等待,直到有可用的線程為止烛愧。如果任何線程在執(zhí)行期間由于失敗而終止油宜,在shutdown之前,如果有需要的話怜姿,將會有新的線程去取代它并執(zhí)行后續(xù)任務慎冤。線程池中的存在會一直存在除非明確的進行shutdown
    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newScheduledThreadPool:創(chuàng)建一個線程池社牲,可以調(diào)度命令在給定的延遲后運行粪薛,或定時執(zhí)行。其corePoolSize參數(shù)是保留在線程池中的線程數(shù)量搏恤,即使它們閑置违寿。
    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

  • newSingleThreadExecutor:創(chuàng)建一個使用單線程執(zhí)行無界隊列的Executor。(如果這個單線程在關(guān)閉之前的執(zhí)行期由于失敗而終止熟空,如果需要執(zhí)行后續(xù)任務的話藤巢,那么新的線程會取代它)可以保證任務保持按順序進行,并且在任何給定時間不會超過一個任務處于活躍狀態(tài)息罗。它返回的executor不同于newFixedThreadPool返回的executor掂咒。newFixedThreadPool返回的executor保證不需要重新配置,即可使用其他的線程。
    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

怎么理解無界隊列和有界隊列

ArrayBlockingQueue就是一個有界隊列绍刮,而LinkedBlockingQueue温圆、SynchronousQueue則是無界隊列。

  • SynchronousQueue:它是無界的孩革,是一種無緩沖的等待隊列岁歉,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走才能繼續(xù)添加膝蜈。有2種模式锅移,一個是公平模式,采用的是公平鎖饱搏,并配合一個FIFO隊列(Queue)來管理多余的生產(chǎn)者和消費者非剃。另一個是非公平模式,采用的是非公平鎖推沸,并配合一個LIFO(Stack)來管理多余的生產(chǎn)者和消費者备绽,這也是SynchronousQueue默認的模式。后一者模式鬓催,如果生產(chǎn)者和消費者的處理速度有差距的話疯坤,很容易出現(xiàn)饑渴情況進而導致有些數(shù)據(jù)得不到處理。(公平鎖:加鎖前檢查是否有排隊的線程深浮,優(yōu)先排隊等待的線程,先到先得眠冈。 非公平鎖:加鎖時不考慮排隊等待問題飞苇,直接嘗試獲取鎖,獲取不到自動到隊尾等待)

  • LinkedBlockingQueue:它是一個無界的蜗顽,是一個無界緩存的等待隊列布卡。LinkedBlockingQueue之所以能高效的處理并發(fā)數(shù)據(jù),正因為消費者和生產(chǎn)者分別采用了獨立的鎖來控制數(shù)據(jù)的同步雇盖,這也意味著在高并發(fā)的情況下忿等,生產(chǎn)者和消費者可以并行的操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能崔挖。

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
  • ArrayBlockingQueue:它是有界的贸街,是一個有界緩存的等待隊列。內(nèi)部維護著一個定長數(shù)據(jù)緩存隊列狸相,由數(shù)組構(gòu)成薛匪。takeIndex標識著隊列的頭部,putIndex標識著隊列的尾部脓鹃。ArrayBlockingQueue只有一個主鎖逸尖,證明生產(chǎn)者和消費者無法并行運行。
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;

無界隊列和有界隊列其實會給ThreadPoolExecutor造成一定的影響。

  • corePoolSize = maximumPoolSize下娇跟,有界隊列也滿了的話岩齿,那么線程池就會采取拒絕任務策略。

  • 與有界隊列相比苞俘,除非系統(tǒng)資源耗盡盹沈,否則無界的任務隊列不存在任務入隊失敗的情況。如果任務創(chuàng)建和處理的速度差異很大苗胀,無界隊列會快速增長襟诸,直到耗盡內(nèi)存。


ReetrantLock基协、Synchronized歌亲、ReadWriteLock

Lockjdk1.5引進的,ReetrantLockjdk1.6引進的澜驮。上一次二面機器人公司陷揪,就是掛在這里了。面試官讓我說說它們的源碼實現(xiàn)以及需求場景杂穷。

使用場景:

  • 當讀寫頻率幾乎相等悍缠,而且不需要特殊需求的時候,優(yōu)先考慮synchronized

  • 當我們需要定制我們自己的Lock時耐量,或者需要更多的功能(類似定時鎖飞蚓,可中斷鎖等
    待),我們可以使用ReetrantLock廊蜒。

  • 當我們很少的進行寫操作趴拧,更多的讀操作,并且讀操作是一個相對耗時的操作山叮,那么就可以使用ReadWriteLock著榴。

  • Synchronized是一種互斥鎖。在實際開發(fā)中屁倔,當某個變量需要在多個線程之間共享的話脑又,需要分析具體場景。如果多個線程對該共享變量的讀和寫沒有競爭關(guān)系锐借,則可以使用Concurrent包下提供的并發(fā)數(shù)據(jù)結(jié)構(gòu)问麸。但是如果多個線程對共享變量之間的讀和寫之間有競爭關(guān)系的話,則需要鎖住整個變量了瞎饲。SynchronizedJava內(nèi)置鎖口叙,JVM是通過monitorentermonitorexit指令實現(xiàn)內(nèi)置鎖。

    • 每個對象都有一個monitor鎖嗅战,當monitor被占用時妄田,該對象就處于鎖定狀態(tài)俺亮,其他試圖訪問該對象的線程將會被阻塞。

    • 對于同一個線程來說疟呐,monitor是可重入的脚曾,重入的時候會將占有數(shù)加1。

    • 當一個線程試圖訪問某一個變量時启具,如果發(fā)現(xiàn)該變量的monitor占有數(shù)為0本讥,就可以美滋滋的占用該對象,如果大于等于1的話鲁冯,那么苦滋滋的進入阻塞狀態(tài)拷沸。

    • 執(zhí)行monitorexit的線程必須是持有monitor的某一個對象。當執(zhí)行完這個命令時薯演,如果占用數(shù)為0撞芍,則當前線程釋放monitor

  • ReetrantLock是基于AQS( AbstractQueuedSynchronizer)的鎖跨扮。

    • 有一個state變量序无,初始值為0,假設(shè)當前線程為A衡创,每當A獲取一次鎖帝嗡,state++。每當A釋放一次鎖的時候璃氢,state--哟玷。

    • A擁有鎖的時候,state肯定大于0一也。B線程嘗試獲取鎖的時候碗降,會對這個state有一個CAS(0,1)的操作塘秦,嘗試幾次失敗后就掛起線程,進入等待隊列动看。

    • 如果A線程恰好釋放鎖尊剔,state等于0,就會去喚醒等待隊列中的B菱皆。B被喚醒之后回去檢查這個state的值须误,嘗試CAS(0,1)仇轻,如果這時恰好C線程也嘗試爭搶這把鎖京痢。

    • 公平鎖的實現(xiàn),C發(fā)現(xiàn)線程B在等待隊列篷店,直接將自己進入等待隊列并掛起祭椰,然后B獲得鎖臭家。

    • 非公平鎖的實現(xiàn),C直接嘗試CAS(0,1)操作方淤,并成功改變了state的值钉赁,B獲得鎖失敗,再次掛起携茂。BC之前嘗試獲取鎖你踩,而最終C搶到了鎖。


參考文章


尾言

書山有路勤為徑,學海無涯苦作舟卿堂。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末束莫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子草描,更是在濱河造成了極大的恐慌览绿,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,723評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件穗慕,死亡現(xiàn)場離奇詭異饿敲,居然都是意外死亡,警方通過查閱死者的電腦和手機逛绵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評論 2 382
  • 文/潘曉璐 我一進店門怀各,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人术浪,你說我怎么就攤上這事瓢对。” “怎么了胰苏?”我有些...
    開封第一講書人閱讀 152,998評論 0 344
  • 文/不壞的土叔 我叫張陵硕蛹,是天一觀的道長。 經(jīng)常有香客問我硕并,道長法焰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,323評論 1 279
  • 正文 為了忘掉前任倔毙,我火速辦了婚禮埃仪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘陕赃。我一直安慰自己卵蛉,他們只是感情好颁股,可當我...
    茶點故事閱讀 64,355評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著毙玻,像睡著了一般豌蟋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上桑滩,一...
    開封第一講書人閱讀 49,079評論 1 285
  • 那天梧疲,我揣著相機與錄音,去河邊找鬼运准。 笑死幌氮,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的胁澳。 我是一名探鬼主播该互,決...
    沈念sama閱讀 38,389評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼韭畸!你這毒婦竟也來了宇智?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,019評論 0 259
  • 序言:老撾萬榮一對情侶失蹤胰丁,失蹤者是張志新(化名)和其女友劉穎随橘,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體锦庸,經(jīng)...
    沈念sama閱讀 43,519評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡机蔗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,971評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了甘萧。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萝嘁。...
    茶點故事閱讀 38,100評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖扬卷,靈堂內(nèi)的尸體忽然破棺而出牙言,到底是詐尸還是另有隱情,我是刑警寧澤怪得,帶...
    沈念sama閱讀 33,738評論 4 324
  • 正文 年R本政府宣布嬉挡,位于F島的核電站,受9級特大地震影響汇恤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拔恰,卻給世界環(huán)境...
    茶點故事閱讀 39,293評論 3 307
  • 文/蒙蒙 一因谎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧颜懊,春花似錦财岔、人聲如沸风皿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽桐款。三九已至,卻和暖如春夷恍,著一層夾襖步出監(jiān)牢的瞬間魔眨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評論 1 262
  • 我被黑心中介騙來泰國打工酿雪, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留遏暴,地道東北人。 一個月前我還...
    沈念sama閱讀 45,547評論 2 354
  • 正文 我出身青樓指黎,卻偏偏與公主長得像朋凉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子醋安,可洞房花燭夜當晚...
    茶點故事閱讀 42,834評論 2 345

推薦閱讀更多精彩內(nèi)容