前言
??JDK中為了處理線程之間的同步問(wèn)題,除了提供鎖機(jī)制之外,還提供了幾個(gè)非常有用的并發(fā)工具類:CountDownLatch说莫、CyclicBarrier术羔、Semphore赢赊、Exchanger、Phaser级历;
??CountDownLatch释移、CyclicBarrier、Semphore寥殖、Phaser 這四個(gè)工具類提供一種并發(fā)流程的控制手段玩讳;而Exchanger工具類則提供了在線程之間交換數(shù)據(jù)的一種手段。
簡(jiǎn)介
Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量嚼贡,它通過(guò)協(xié)調(diào)各個(gè)線程锋边,以保證合理的使用公共資源。
很多年以來(lái)编曼,我都覺(jué)得從字面上很難理解Semaphore所表達(dá)的含義,只能把它比作是控制流量的紅綠燈剩辟,比如XX馬路要限制流量掐场,只允許同時(shí)有一百輛車在這條路上行使,其他的都必須在路口等待贩猎,所以前一百輛車會(huì)看到綠燈熊户,可以開(kāi)進(jìn)這條馬路,后面的車會(huì)看到紅燈吭服,不能駛?cè)隭X馬路嚷堡,但是如果前一百輛中有五輛車已經(jīng)離開(kāi)了XX馬路,那么后面就允許有5輛車駛?cè)腭R路艇棕,這個(gè)例子里說(shuō)的車就是線程蝌戒,駛?cè)腭R路就表示線程在執(zhí)行,離開(kāi)馬路就表示線程執(zhí)行完成沼琉,看見(jiàn)紅燈就表示線程被阻塞北苟,不能執(zhí)行。
應(yīng)用場(chǎng)景
??Semaphore可以用于做
流量控制
打瘪,特別公用資源有限的應(yīng)用場(chǎng)景友鼻,比如數(shù)據(jù)庫(kù)連接。假如有一個(gè)需求闺骚,要讀取幾萬(wàn)個(gè)文件的數(shù)據(jù)彩扔,因?yàn)槎际荌O密集型任務(wù),我們可以啟動(dòng)幾十個(gè)線程并發(fā)的讀取僻爽,但是如果讀到內(nèi)存后虫碉,還需要存儲(chǔ)到數(shù)據(jù)庫(kù)中,而數(shù)據(jù)庫(kù)的連接數(shù)只有10個(gè)胸梆,這時(shí)我們必須控制只有十個(gè)線程同時(shí)獲取數(shù)據(jù)庫(kù)連接保存數(shù)據(jù)蔗衡,否則會(huì)報(bào)錯(cuò)無(wú)法獲取數(shù)據(jù)庫(kù)連接纤虽。這個(gè)時(shí)候,我們就可以使用Semaphore來(lái)做流控绞惦,代碼如下:
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
在代碼中逼纸,雖然有30個(gè)線程在執(zhí)行,但是只允許10個(gè)并發(fā)的執(zhí)行济蝉。Semaphore的構(gòu)造方法Semaphore(int permits) 接受一個(gè)整型的數(shù)字杰刽,表示可用的許可證數(shù)量。Semaphore(10)表示允許10個(gè)線程獲取許可證王滤,也就是最大并發(fā)數(shù)是10贺嫂。Semaphore的用法也很簡(jiǎn)單,首先線程使用Semaphore的acquire()獲取一個(gè)許可證雁乡,使用完之后調(diào)用release()歸還許可證第喳。還可以用tryAcquire()方法嘗試獲取許可證。
Semphore的方法摘要
1踱稍、獲取許可
??API中提供了多種的方式獲取鎖:
可以獲取一個(gè)曲饱、多個(gè)許可;
提供阻塞珠月、非阻塞扩淀、超時(shí)的方式獲取許可;
除了可中斷啤挎、還提供一個(gè)非中斷的方式獲取鎖驻谆;
public void acquire() throws InterruptedException
從此信號(hào)量獲取一個(gè)許可,在提供一個(gè)許可前一直將線程阻塞庆聘,否則線程被
中斷胜臊。
public void acquire(int permits) throws InterruptedException
獲取多個(gè)許可。
從此信號(hào)量獲取給定數(shù)目的許可伙判,在提供這些許可前一直將線程阻塞区端,或者線程已被
中斷。
public void acquireUninterruptibly()
從此信號(hào)量中獲取許可澳腹,在有可用的許可前將其阻塞织盼。
不可中斷。
public void acquireUninterruptibly(int permits)
獲取多個(gè)許可酱塔。
從此信號(hào)量獲取給定數(shù)目的許可沥邻,在提供這些許可前一直將線程阻塞。
不可中斷羊娃。
public boolean tryAcquire()
僅在調(diào)用時(shí)此信號(hào)量存在一個(gè)可用許可唐全,才從信號(hào)量獲取許可。
非阻塞的方式嘗試獲取許可。
public boolean tryAcquire(int permits)
僅在調(diào)用時(shí)此信號(hào)量中有給定數(shù)目的許可時(shí)邮利,才從此信號(hào)量中獲取這些許可弥雹。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
如果在給定的等待時(shí)間內(nèi)此信號(hào)量有可用的所有許可,并且當(dāng)前線程未被中斷延届,則從此信號(hào)量獲取給定數(shù)目的許可剪勿。
超時(shí)等待獲取許可
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
如果在給定的等待時(shí)間內(nèi),此信號(hào)量有可用的許可并且當(dāng)前線程未被中斷方庭,則從此信號(hào)量獲取一個(gè)許可厕吉。
2、許可的釋放
public void release( ):
釋放一個(gè)許可械念,將其返回給信號(hào)量头朱。
public void release(int permits)
釋放給定數(shù)目的許可,將其返回到信號(hào)量龄减。
3项钮、提供的監(jiān)控方法
public int availablePermits( )
返回此信號(hào)量中當(dāng)前可用的許可數(shù)
public int drainPermits()
獲取并返回立即可用的所有許可
public final int getQueueLength()
返回正在等待獲取的線程的估計(jì)數(shù)目。該值僅是估計(jì)的數(shù)字希停,因?yàn)樵诖朔椒ū闅v內(nèi)部數(shù)據(jù)結(jié)構(gòu)的同時(shí)烁巫,線程的數(shù)目可能動(dòng)態(tài)地變化。此方法用于監(jiān)視系統(tǒng)狀態(tài)脖苏,不用于同步控制。
public final boolean hasQueuedThreads()
查詢是否有線程正在等待獲取定踱。
public boolean isFair()
如果此信號(hào)量的公平設(shè)置為 true棍潘,則返回 true再沧。
protected 方法:
protected Collection
返回一個(gè) collection欧芽,包含可能等待獲取的線程搀绣。因?yàn)樵跇?gòu)造此結(jié)果的同時(shí)實(shí)際的線程 set 可能動(dòng)態(tài)地變化僚饭,所以返回的 collection 僅是盡力的估計(jì)值歼指。所返回 collection 中的元素沒(méi)有特定的順序蒙保。
protected void reducePermits(int reduction)
根據(jù)指定的縮減量減小可用許可的數(shù)目鸟妙。此方法在使用信號(hào)量來(lái)跟蹤那些變?yōu)椴豢捎觅Y源的子類中很有用
@ Example 獲取扬绪、釋放多個(gè)許可
try {
Semaphore semaphore = new Semaphore(5);
//獲取一個(gè)許可
semaphore.acquire();
//一次性獲取4個(gè)許可
semaphore.acquire(4);
System.out.println("Semaphore 剩下的許可數(shù)量:"+semaphore.availablePermits());
//一次性釋放5個(gè)許可
semaphore.release(5);
System.out.println("Semaphore 剩下的許可數(shù)量:"+semaphore.availablePermits());
//再釋放5個(gè)許可
semaphore.release();
semaphore.release();
semaphore.release(3);
System.out.println("Semaphore 剩下的許可數(shù)量:"+semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
運(yùn)行結(jié)果:
Semaphore 剩下的許可數(shù)量:0
Semaphore 剩下的許可數(shù)量:5
Semaphore 剩下的許可數(shù)量:10
從上面的運(yùn)行結(jié)果可以看出荠呐,
構(gòu)造方法的 new Semaphore(5)中參數(shù)5并不是最終的許可數(shù)量赛蔫,可以通過(guò)release()方法增加許可數(shù)量。
本人測(cè)試用例
package com.wxx.demo;
import com.wxx.demo.util.IdUtiles;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@RunWith(SpringRunner.class)
//@SpringBootTest(classes = LeisureWebApplication.class)
public class TaskTest {
@Test
public void taskTest(){
Runnable task = new Runnable() {
int count = 0;
@Override
public void run() {
count ++;
try{
String id = IdUtiles.creatId();
System.out.println(id);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(count);
System.out.println("Thread : " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
};
double executeTime = this.executeTime(100, task);
System.out.println("執(zhí)行時(shí)間: " + executeTime);
}
private double executeTime(int taskCount,Runnable task){
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount ; i++) {
Thread thread = new Thread() {
public void run(){
try {
start.await();
try {
task.run();
}finally {
end.countDown();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
};
thread.start();
}
long startTime = System.nanoTime();
//開(kāi)啟開(kāi)關(guān)
start.countDown();
long endTime = System.nanoTime();
return endTime - startTime;
}
}
package com.wxx.demo.util;
import java.text.SimpleDateFormat;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
@Author : leisure
-
@Date : 2019/1/17
*/
public class IdUtiles {private static String lead = "leisure";
private static int Guid = 100;
private static Semaphore semaphore = new Semaphore(5,false);
/**- 創(chuàng)建以字符串打頭結(jié)尾自增的唯一id
- @return
*/
public static synchronized String creatId() throws InterruptedException{
//測(cè)試控制方法內(nèi)的并發(fā)線程數(shù) 測(cè)試放開(kāi)synchronized
//semaphore.acquire();
semaphore.tryAcquire(1,1000, TimeUnit.MILLISECONDS);
int i = semaphore.availablePermits();
System.out.println("當(dāng)前可用許可" + i);
int i1 = semaphore.drainPermits();
System.out.println("當(dāng)前立即可用的許可" + i1);
boolean b = semaphore.hasQueuedThreads();
System.out.println("當(dāng)前是否有線程等待" + b);
boolean fair = semaphore.isFair();
System.out.println("當(dāng)前信號(hào)是否公平" + fair);
long l = System.currentTimeMillis();
int queueLength = semaphore.getQueueLength();
System.out.println("等待線程數(shù)" + queueLength);
Thread.sleep(100);
Guid += 1;
String format = new SimpleDateFormat("yyyy").format(l);
if (Guid > 999){
Guid = 100;
}
String id = lead + format + l + Guid;
semaphore.release();
return id;
}