以下內容整理自互聯(lián)網社搅,僅用于個人學習
轉載自http://huachao1001.github.io/article.html?QhSkxKKX
生產者消費者問題:
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
生產者消費者模式的優(yōu)點:
- 解耦
- 支持并發(fā)
- 支持忙閑不均
解決方法可分為兩類:
- 用信號量和鎖機制實現(xiàn)生產者和消費者之間的同步
- wait() / notify()方法
- await() / signal()方法
- BlockingQueue阻塞隊列方法
- Semaphore方法
- 在生產者和消費者之間建立一個管道订歪。(一般不使用撤奸,緩沖區(qū)不易控制署咽、數(shù)據(jù)不易封裝和傳輸)
- PipedInputStream / PipedOutputStream
wait() / notify()方法
publicclassTest{
privatestaticIntegercount=0;
privatefinalIntegerFULL=5;
privatestaticStringlock="lock";
publicstaticvoidmain(String[]args){
Testt=newTest();
newThread(t.newProducer()).start();
newThread(t.newConsumer()).start();
newThread(t.newProducer()).start();
newThread(t.newConsumer()).start();
}
classProducerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
synchronized(lock){
while(count==FULL){
try{
lock.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
count++;
System.out.println("生產者"+Thread.currentThread().getName()
+"已生產完成漱牵,商品數(shù)量:"+count);
lock.notifyAll();
}
}
}
}
classConsumerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
synchronized(lock){
while(count==0){
try{
lock.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
count--;
System.out.println("消費者"+Thread.currentThread().getName()
+"已消費戳杀,剩余商品數(shù)量:"+count);
lock.notifyAll();
}
}
}
}
}
運行結果:
生產者Thread-0已生產完成艾少,商品數(shù)量:1
生產者Thread-2已生產完成卡乾,商品數(shù)量:2
消費者Thread-1已消費,剩余商品數(shù)量:1
消費者Thread-3已消費缚够,剩余商品數(shù)量:0
生產者Thread-0已生產完成幔妨,商品數(shù)量:1
消費者Thread-3已消費,剩余商品數(shù)量:0
生產者Thread-2已生產完成谍椅,商品數(shù)量:1
消費者Thread-1已消費误堡,剩余商品數(shù)量:0
生產者Thread-0已生產完成,商品數(shù)量:1
生產者Thread-2已生產完成雏吭,商品數(shù)量:2
消費者Thread-1已消費锁施,剩余商品數(shù)量:1
消費者Thread-3已消費,剩余商品數(shù)量:0
生產者Thread-0已生產完成杖们,商品數(shù)量:1
消費者Thread-1已消費悉抵,剩余商品數(shù)量:0
生產者Thread-2已生產完成,商品數(shù)量:1
消費者Thread-3已消費摘完,剩余商品數(shù)量:0
生產者Thread-0已生產完成姥饰,商品數(shù)量:1
消費者Thread-1已消費,剩余商品數(shù)量:0
生產者Thread-2已生產完成孝治,商品數(shù)量:1
消費者Thread-3已消費媳否,剩余商品數(shù)量:0
await() / signal()方法
await()/signal()是對wait()/notify()的改進,功能更加強大荆秦,更適用于高級用戶篱竭,synchronized是托管給JVM執(zhí)行的,而lock是Java寫的控制鎖的代碼步绸。
下面是使用ReentrantLock來實現(xiàn)生產者消費者問題:
publicclassTest{
privatestaticIntegercount=0;//緩沖區(qū)
privatefinalIntegerFULL=5;
finalLocklock=newReentrantLock();//獲得可重入鎖
finalConditionput=lock.newCondition();
finalConditionget=lock.newCondition();
publicstaticvoidmain(String[]args){
Testt=newTest();
newThread(t.newProducer()).start();
newThread(t.newConsumer()).start();
newThread(t.newConsumer()).start();
newThread(t.newProducer()).start();
}
//生產者
classProducerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
//加鎖
lock.lock();
try{
while(count==FULL){
try{
put.await();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
count++;
System.out.println("生產者"+Thread.currentThread().getName()
+"已生產完成掺逼,商品數(shù)量:"+count);
//通知消費者,現(xiàn)在可以消費
get.signal();
}finally{
lock.unlock();
}
}
}
}
classConsumerimplementsRunnable{
@Override
publicvoidrun(){
for(inti=0;i<5;i++){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione1){
e1.printStackTrace();
}
lock.lock();
try{
while(count==0){
try{
get.await();
}catch(Exceptione){
e.printStackTrace();
}
}
count--;
System.out.println("消費者"+Thread.currentThread().getName()
+"已消費瓤介,剩余商品數(shù)量:"+count);
put.signal();
}finally{
lock.unlock();
}
}
}
}
}
運行結果:
生產者Thread-3已生產完成吕喘,商品數(shù)量:1
生產者Thread-0已生產完成赘那,商品數(shù)量:2
消費者Thread-1已消費,剩余商品數(shù)量:1
消費者Thread-2已消費氯质,剩余商品數(shù)量:0
生產者Thread-3已生產完成募舟,商品數(shù)量:1
生產者Thread-0已生產完成,商品數(shù)量:2
消費者Thread-1已消費闻察,剩余商品數(shù)量:1
消費者Thread-2已消費拱礁,剩余商品數(shù)量:0
生產者Thread-0已生產完成,商品數(shù)量:1
生產者Thread-3已生產完成辕漂,商品數(shù)量:2
消費者Thread-1已消費呢灶,剩余商品數(shù)量:1
消費者Thread-2已消費,剩余商品數(shù)量:0
生產者Thread-0已生產完成钉嘹,商品數(shù)量:1
生產者Thread-3已生產完成鸯乃,商品數(shù)量:2
消費者Thread-2已消費,剩余商品數(shù)量:1
消費者Thread-1已消費跋涣,剩余商品數(shù)量:0
生產者Thread-3已生產完成缨睡,商品數(shù)量:1
生產者Thread-0已生產完成,商品數(shù)量:2
消費者Thread-2已消費陈辱,剩余商品數(shù)量:1
消費者Thread-1已消費宏蛉,剩余商品數(shù)量:0
BlockingQueue阻塞隊列方法
BlockingQueue實現(xiàn)主要用于生產者-使用者隊列,但它另外還支持Collection接口性置。是線程安全的,所有排隊方法都可以使用內部鎖或其他形式的并發(fā)控制來自動達到它們的目的揍堰。
用于阻塞的兩個方法:
- put()方法:將指定元素插入此隊列中鹏浅,將等待可用的空間(如果有必要)。
- take()方法:獲取并移除此隊列的頭部屏歹,在指定的等待時間前等待可用的元素(如果有必要)隐砸。
public class Test {
private static Integer count = 0;
final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(5);// 容量為5的阻塞隊列
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Producer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
try {
bq.put(1);
count++;
System.out.println("生產者" + Thread.currentThread().getName()
+ "已生產完成,商品數(shù)量:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
bq.take();
count--;
System.out.println("消費者" + Thread.currentThread().getName()
+ "已消費蝙眶,剩余商品數(shù)量:" + count);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
運行結果:
生產者Thread-0已生產完成季希,商品數(shù)量:0
消費者Thread-1已消費,剩余商品數(shù)量:0
生產者Thread-3已生產完成幽纷,商品數(shù)量:0
消費者Thread-2已消費式塌,剩余商品數(shù)量:0
生產者Thread-3已生產完成,商品數(shù)量:1
消費者Thread-2已消費友浸,剩余商品數(shù)量:1
生產者Thread-0已生產完成峰尝,商品數(shù)量:2
消費者Thread-1已消費,剩余商品數(shù)量:0
生產者Thread-0已生產完成收恢,商品數(shù)量:2
消費者Thread-1已消費武学,剩余商品數(shù)量:0
生產者Thread-3已生產完成祭往,商品數(shù)量:2
消費者Thread-2已消費,剩余商品數(shù)量:1
生產者Thread-0已生產完成火窒,商品數(shù)量:1
消費者Thread-1已消費硼补,剩余商品數(shù)量:1
生產者Thread-3已生產完成,商品數(shù)量:2
消費者Thread-2已消費熏矿,剩余商品數(shù)量:0
生產者Thread-0已生產完成已骇,商品數(shù)量:0
消費者Thread-2已消費,剩余商品數(shù)量:0
生產者Thread-3已生產完成曲掰,商品數(shù)量:1
消費者Thread-1已消費疾捍,剩余商品數(shù)量:0
Semaphore方法實現(xiàn)同步
信號量(Semaphore)維護了一個許可集。在許可可用前會阻塞每一個 acquire()栏妖,然后再獲取該許可乱豆。每個release()添加一個許可,從而可能釋放一個正在阻塞的獲取者吊趾。但是宛裕,不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數(shù)论泛,并采取相應的行動揩尸。Semaphore通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目。
注意屁奏,調用acquire()時無法保持同步鎖岩榆,因為這會阻止將項返回到池中。信號量封裝所需的同步坟瓢,以限制對池的訪問勇边,這同維持該池本身一致性所需的同步是分開的。
public class Test {
int count = 0;
final Semaphore put = new Semaphore(5);// 初始令牌個數(shù)
final Semaphore get = new Semaphore(0);
final Semaphore mutex = new Semaphore(1);
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Producer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
try {
put.acquire();// 注意順序
mutex.acquire();
count++;
System.out.println("生產者" + Thread.currentThread().getName()
+ "已生產完成折联,商品數(shù)量:" + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
get.release();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
get.acquire();// 注意順序
mutex.acquire();
count--;
System.out.println("消費者" + Thread.currentThread().getName()
+ "已消費粒褒,剩余商品數(shù)量:" + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
put.release();
}
}
}
}
}
運行結果:
生產者Thread-0已生產完成,商品數(shù)量:1
消費者Thread-2已消費诚镰,剩余商品數(shù)量:0
生產者Thread-3已生產完成奕坟,商品數(shù)量:1
消費者Thread-1已消費,剩余商品數(shù)量:0
生產者Thread-0已生產完成清笨,商品數(shù)量:1
生產者Thread-3已生產完成月杉,商品數(shù)量:2
消費者Thread-2已消費,剩余商品數(shù)量:1
消費者Thread-1已消費抠艾,剩余商品數(shù)量:0
生產者Thread-0已生產完成沙合,商品數(shù)量:1
生產者Thread-3已生產完成,商品數(shù)量:2
消費者Thread-2已消費,剩余商品數(shù)量:1
消費者Thread-1已消費首懈,剩余商品數(shù)量:0
生產者Thread-0已生產完成绊率,商品數(shù)量:1
生產者Thread-3已生產完成,商品數(shù)量:2
消費者Thread-2已消費究履,剩余商品數(shù)量:1
消費者Thread-1已消費滤否,剩余商品數(shù)量:0
生產者Thread-0已生產完成,商品數(shù)量:1
生產者Thread-3已生產完成最仑,商品數(shù)量:2
消費者Thread-2已消費藐俺,剩余商品數(shù)量:1
消費者Thread-1已消費,剩余商品數(shù)量:0
PipedInputStream / PipedOutputStream
這個類位于java.io包中泥彤,是解決同步問題的最簡單的辦法欲芹,一個線程將數(shù)據(jù)寫入管道,另一個線程從管道讀取數(shù)據(jù)吟吝,這樣便構成了一種生產者/消費者的緩沖區(qū)編程模式菱父。PipedInputStream/PipedOutputStream只能用于多線程模式,用于單線程下可能會引發(fā)死鎖剑逃。
public class Test {
final PipedInputStream pis = new PipedInputStream();
final PipedOutputStream pos = new PipedOutputStream();
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
try {
pis.connect(pos);
} catch (IOException e) {
e.printStackTrace();
}
try {
while (true) { // 不斷的產生數(shù)據(jù)
int n = (int) (Math.random() * 255);
System.out.println("生產者" + Thread.currentThread().getName()
+ "已生產完成浙宜,商品數(shù)量:" + n);
pos.write(n);
pos.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
pis.close();
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
int n;
try {
while (true) {
n = pis.read();
System.out.println("消費者" + Thread.currentThread().getName()
+ "已消費,剩余商品數(shù)量:" + n);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
pis.close();
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
運行結果:
生產者Thread-0已生產完成蛹磺,商品數(shù)量:6
生產者Thread-0已生產完成粟瞬,商品數(shù)量:158
生產者Thread-0已生產完成,商品數(shù)量:79
生產者Thread-0已生產完成萤捆,商品數(shù)量:119
生產者Thread-0已生產完成裙品,商品數(shù)量:93
生產者Thread-0已生產完成,商品數(shù)量:213
生產者Thread-0已生產完成俗或,商品數(shù)量:151
生產者Thread-0已生產完成市怎,商品數(shù)量:101
生產者Thread-0已生產完成,商品數(shù)量:125
生產者Thread-0已生產完成蕴侣,商品數(shù)量:109
生產者Thread-0已生產完成,商品數(shù)量:67
生產者Thread-0已生產完成臭觉,商品數(shù)量:109
生產者Thread-0已生產完成昆雀,商品數(shù)量:132
生產者Thread-0已生產完成,商品數(shù)量:139