Exchanger(交換機(jī))
交換機(jī)(Exchanger)主要用于線程之間數(shù)據(jù)交換的工具,它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù).如果第一個(gè)線程先執(zhí)行exchange
方法,它會(huì)等待第二個(gè)線程也執(zhí)行exchange
方法.當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這個(gè)兩個(gè)線程就可以交換數(shù)據(jù).如下圖所示:
例如生活中A和B約定地點(diǎn)進(jìn)行商品交易,A是買家B是賣家.我們可以使用Exchanger來(lái)描述兩個(gè)人的交易過(guò)程.
public class App11 {
public static void main(String[] args) {
//約定交易地點(diǎn)
Exchanger<String> exchanger = new Exchanger<>();
//買家?guī)?00塊錢去買辣條
Thread buyer = new Thread(new Buyer("500元",exchanger));
//賣家?guī)Ю睏l去賣
Thread seller = new Thread(new Seller("500根辣條",exchanger));
//買家和賣家出發(fā)去交易
buyer.start();
seller.start();
}
}
/**
* 買家
*/
class Buyer implements Runnable{
private String amount;
private Exchanger<String> exchanger;
public Buyer(String amount, Exchanger<String> exchanger) {
this.amount = amount;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("買家去交易地點(diǎn)");
//模擬去交易地點(diǎn)耗時(shí)
Thread.sleep(2000);
//到了交易地點(diǎn)等賣家
String goods = exchanger.exchange(amount);
//買到東西
System.out.println(String.format("買家買到[%s]回家",goods));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 賣家
*/
class Seller implements Runnable{
private String goods;
private Exchanger<String> exchanger;
public Seller(String goods, Exchanger<String> exchanger) {
this.goods = goods;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("賣家去交易地點(diǎn)");
//模擬去交易地點(diǎn)
Thread.sleep(5000);
//到了交易地點(diǎn)等待買家
String data = exchanger.exchange(goods);
//賣出了東西
System.out.println(String.format("賣家賣了[%s]錢回家",data));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后的打印結(jié)果如下:
賣家去交易地點(diǎn)
買家去交易地點(diǎn)
買家買到[500根辣條]回家
賣家賣了[500元]錢回家
上面例子中,買家和賣家調(diào)用exchange沒(méi)有設(shè)置超時(shí)時(shí)間.如果當(dāng)前等待的人一直沒(méi)有等待到對(duì)方,當(dāng)前的人將會(huì)一直等待下去.所以exchange為我們提供了帶有超時(shí)時(shí)間參數(shù)的方法,通過(guò)超時(shí)時(shí)間將會(huì)通過(guò)拋出TimeoutException
中斷線程的等待.下面我們模擬買家中途錢被小偷偷了,然后賣家等了時(shí)間久了就放棄交易回去了.下面修改買家和賣家的代碼如下:
/**
* 買家
*/
class Buyer implements Runnable{
private String amount;
private Exchanger<String> exchanger;
public Buyer(String amount, Exchanger<String> exchanger) {
this.amount = amount;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("買家去交易地點(diǎn)");
//模擬去交易地點(diǎn)耗時(shí)
Thread.sleep(2000);
//模擬中途可能遇到小偷錢被偷走
Random random = new Random();
int i = random.nextInt(10);
if (i % 2 == 0){
throw new RuntimeException("買家中途遇見(jiàn)小偷,錢丟了");
}
//到了交易地點(diǎn)等賣家
String goods = exchanger.exchange(amount);
//買到東西
System.out.println(String.format("買家買到[%s]回家",goods));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 賣家
*/
class Seller implements Runnable{
private String goods;
private Exchanger<String> exchanger;
public Seller(String goods, Exchanger<String> exchanger) {
this.goods = goods;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("賣家去交易地點(diǎn)");
//模擬去交易地點(diǎn)
Thread.sleep(5000);
//到了交易地點(diǎn)等待買家
String data = exchanger.exchange(goods,2,TimeUnit.SECONDS);
//賣出了東西
System.out.println(String.format("賣家賣了[%s]錢回家",data));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("賣家等了太久了,不賣了回家");
e.printStackTrace();
}
}
}
多次執(zhí)行代碼,當(dāng)買家的錢被小偷偷了,打印的結(jié)果語(yǔ)句如下:
賣家去交易地點(diǎn)
買家去交易地點(diǎn)
Exception in thread "Thread-0" java.lang.RuntimeException: 買家中途遇見(jiàn)小偷,錢丟了
at com.buydeem.Buyer.run(App11.java:49)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.TimeoutException
賣家等了太久了,不賣了回家
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.buydeem.Seller.run(App11.java:80)
at java.lang.Thread.run(Thread.java:745)
Semaphore (信號(hào)量)
Semaphore
通常我們叫它信號(hào)量
,可以用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù).它提供acquire
方法用來(lái)獲取許可,在沒(méi)有足夠的許可之前將調(diào)用該方法的線程將一直阻塞知道有可用的許可.同時(shí)還提供release
方法用來(lái)添加許可,用來(lái)釋放一個(gè)正在阻塞的獲取者.
它的使用場(chǎng)景類似于我們平常去窗口買票.如果現(xiàn)在有10個(gè)人去買票.可是窗口只有三個(gè).也就是說(shuō)最多同時(shí)容納三個(gè)人同時(shí)買票,而其他的人必須等待當(dāng)前窗口買票的人離開(kāi)了才能去窗口買票.這個(gè)例子中的三個(gè)窗口就可以理解為信號(hào)量初始的許可個(gè)數(shù)為3
,而去買票的用戶理解為線程
.我們使用下面的代碼來(lái)描述整個(gè)過(guò)程:
public class App12 {
public static void main(String[] args) throws InterruptedException {
//創(chuàng)建買票的窗口
Semaphore semaphore = new Semaphore(3);
//創(chuàng)建乘客買票
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(new Passenger("乘客"+i+"號(hào)",semaphore));
}
service.shutdown();
}
}
/**
* 乘客
*/
class Passenger implements Runnable{
private String name;
private Semaphore semaphore;
public Passenger(String name, Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
boolean success = false;
try {
//等待空余窗口買票
semaphore.acquire();
success = true;
System.out.println(String.format("[%s]開(kāi)始買票",name));
//模擬當(dāng)前用戶買票耗時(shí)
Random random = new Random();
int second = random.nextInt(4) + 1;
Thread.sleep(second * 1000);
//當(dāng)前用戶買票完成
System.out.println(String.format("[%s]買票結(jié)束",name));
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (success){
semaphore.release();
}
}
}
}
最后的打印結(jié)果如下:
[乘客1號(hào)]開(kāi)始買票
[乘客2號(hào)]開(kāi)始買票
[乘客0號(hào)]開(kāi)始買票
[乘客0號(hào)]買票結(jié)束
[乘客3號(hào)]開(kāi)始買票
[乘客1號(hào)]買票結(jié)束
[乘客4號(hào)]開(kāi)始買票
[乘客2號(hào)]買票結(jié)束
[乘客5號(hào)]開(kāi)始買票
[乘客5號(hào)]買票結(jié)束
[乘客6號(hào)]開(kāi)始買票
[乘客3號(hào)]買票結(jié)束
[乘客7號(hào)]開(kāi)始買票
[乘客4號(hào)]買票結(jié)束
[乘客8號(hào)]開(kāi)始買票
[乘客6號(hào)]買票結(jié)束
[乘客9號(hào)]開(kāi)始買票
[乘客9號(hào)]買票結(jié)束
[乘客8號(hào)]買票結(jié)束
[乘客7號(hào)]買票結(jié)束
acquire,acquireUninterruptibly和tryAcquire
這三個(gè)方法都是用來(lái)獲取許可的,但是它們之間還是有部分不同之處.
方法 | 是否阻塞 | 是否響應(yīng)中斷 |
---|---|---|
acquire | 是 | 是 |
acquireUninterruptibly | 是 | 否 |
tryAcquire | 否 | 否 |
tryAcquire帶超時(shí)時(shí)間 | 是 | 是 |
獲取許可的方法基本上就是上面這幾種,同時(shí)每次獲取許可時(shí)還可以指定獲取的個(gè)數(shù).上面的所指的是否會(huì)響應(yīng)中斷指的是當(dāng)線程因?yàn)闊o(wú)法獲取許可而阻塞,該調(diào)用該線程的**interrupt**將會(huì)拋出**InterruptedException**
.
relase
該方法用來(lái)釋放許可,將其返回到信號(hào)量.同時(shí)該方法也提供了釋放許可數(shù)量的參數(shù),可以一次釋放多個(gè)信號(hào)量.
公平與非公平
在構(gòu)建信號(hào)量時(shí)可以在構(gòu)造方法中傳入true
設(shè)置該信號(hào)量為公平模式.該公平性是針對(duì)獲取許可的線程來(lái)說(shuō)的.該模式能保證對(duì)于任何調(diào)用獲取
相關(guān)方法的線程FIFO
.需要注意的是可能線程A先于線程B調(diào)用了acquire
,但是A線程卻在B線程之后到達(dá)排序點(diǎn),這將導(dǎo)致看上去不公平
.還有一點(diǎn)值得注意的就是公平設(shè)置對(duì)于tryAcquire
非阻塞方法無(wú)效.