本文參考至《Java7并發(fā)編程實戰(zhàn)手冊》
Exchanger類允許在兩個線程之間定義同步點,當兩個線程都到達同步點時,它們交換數(shù)據(jù)。也就是第一個線程的數(shù)據(jù)進入到第二個線程中,第二線程的數(shù)據(jù)進入到第一個線程中事期。
這里有一個生產(chǎn)者和消費者的例子:生產(chǎn)者每次都會生產(chǎn)10個數(shù)據(jù),這10個數(shù)據(jù)放在一個籮筐(List)里面僧免,Exchanger會在:生產(chǎn)者生產(chǎn)完10個數(shù)據(jù)并且消費者消費完10個數(shù)據(jù)的時候刑赶,將兩者的籮筐換過來。這樣生產(chǎn)者的籮筐里面就沒有數(shù)據(jù)(確保生產(chǎn)者生產(chǎn)的東西有地方裝)懂衩;消費者的籮筐里面就有10個數(shù)據(jù)(確保消費者有數(shù)據(jù)消費)撞叨。
生產(chǎn)者:
package Exchanger;
import java.util.List;
import java.util.concurrent.Exchanger;
/**
* Producer:生產(chǎn)者
*
* @author JM
* @date 2017-2-28 下午10:33:08
* @since JDK 1.7
*/
public class Producer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
super();
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
String message = "Event " + ((i * 10) + j);
System.out.printf("Producer:create---------- %s\n", message);
buffer.add(message);
}
System.out.println("Before exchange Producer:have11111111111@@@@@@@@@@ " + buffer.size());
try {
// 如果這個時候消費者也剛好在這里等待,那么生產(chǎn)者和消費者的數(shù)據(jù)產(chǎn)生對換(否則將等待消費者到達這里再進行數(shù)據(jù)對換)
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("After exchange Producer:have222222222222@@@@@@@@@@ " + buffer.size());
}
}
}
消費者:
package Exchanger;
import java.util.List;
import java.util.concurrent.Exchanger;
public class Consumer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
super();
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Before exchange Consumer:have111111111******** " + buffer.size());
try {
// 如果這個時候生產(chǎn)者也剛好在這里等待浊洞,那么生產(chǎn)者和消費者的數(shù)據(jù)產(chǎn)生對換(否則將等待生產(chǎn)者到達這里再進行數(shù)據(jù)對換)
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("After exchange Consumer:have2222222******** " + buffer.size());
for (int j = 0; j < 10; j++) {
String message = buffer.get(0);
System.out.printf("Consumer:use----------- %s\n", message);
buffer.remove(0);
}
}
}
}
測試類:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class Main {
public static void main(String[] args) {
List<String> consumerBuffer = new ArrayList<String>();
List<String> producerBuffer = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Producer producer = new Producer(producerBuffer, exchanger);
Consumer consumer = new Consumer(consumerBuffer, exchanger);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
}
}