一.BlockingQueue
在Concurrent包中,BlockingQueue很好的解決了多線程中子巾,如何高效安全“傳輸”數(shù)據(jù)的問題帆赢。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利线梗。本文詳細(xì)介紹了BlockingQueue家庭中的所有成員椰于,包括他們各自的功能以及常見使用場景。
BlockingQueue 的核心方法
1.放入數(shù)據(jù)
add(E): boolean
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
將指定的元素插入到此隊列中(如果立即可行且不會違反容量限制)仪搔,在成功時返回 true瘾婿,如果當(dāng)前沒有可用空間,則拋出 IllegalStateException。
offer(E e): boolean
offer(E e, long timeout, TimeUnit unit): boolean
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//方法內(nèi)部通過 putIndex 索引直接將 元素添加到數(shù)組 items
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;// 當(dāng)putIndex 等于數(shù)組長度時偏陪,將 putIndex 重置為 0
count++;
notEmpty.signal();//喚醒處于等待狀態(tài)下的線程抢呆,表示當(dāng)前隊列中的元素不為空,如果存在消費者線程阻塞,就可以開始取出元素
}
將指定元素插入到此隊列的尾部(如果立即可行且不會超出此隊列的容量)笛谦,在成功時返回 true抱虐,如果此隊列已滿,則返回 false饥脑。當(dāng)使用有容量限制的隊列時恳邀,此方法通常要優(yōu)于 add 方法,后者可能無法插入元素灶轰,而只是拋出一個異常轩娶。
put(E e): boolean
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//這個也是獲得鎖,但 是和lock的區(qū)別是框往,這個方法優(yōu)先允許在等待時由其他線程調(diào) 用等待線程的 interrupt 方法來中斷等待直接返回鳄抒。而 lock 方法是嘗試獲得鎖成功后才響應(yīng)中斷
try {
while (count == items.length)
notFull.await();//隊列滿了的情況下,當(dāng)前 線程將會被 notFull 條件對象掛起加到等待隊列中
enqueue(e);
} finally {
lock.unlock();
}
}
將指定元素插入到此隊列的尾部椰弊,如有必要许溅,則等待空間變得可用。
2.獲取數(shù)據(jù)
poll() 取走BlockingQueue里排在首位的對象
poll(long timeout, TimeUnit unit)同poll ,限制了超時時間
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù))秉版,通過該方法贤重,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖清焕。
JUC提供的阻塞隊列
隊列 | 介紹 |
---|---|
ArrayBlockingQueue | 數(shù)組實現(xiàn)的有界阻塞隊列, 此隊列按照先進先出(FIFO)的原則 對元素進行排序并蝗。 |
LinkedBlockingQueue | 鏈表實現(xiàn)的有界阻塞隊列, 此隊列的默認(rèn)和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行 排序 |
PriorityBlockingQueue | 支持優(yōu)先級排序的無界阻塞隊列, 默認(rèn)情況下元素采取自然順序 升序排列秸妥。也可以自定義類實現(xiàn) compareTo()方法來指定元素 排序規(guī)則滚停,或者初始化 PriorityBlockingQueue 時,指定構(gòu)造 參數(shù) Comparator 來對元素進行排序粥惧。 |
DelayQueue | 優(yōu)先級隊列實現(xiàn)的無界阻塞隊列 |
SynchronousQueue | 不存儲元素的阻塞隊列, 每一個 put 操作必須等待一個 take 操 作键畴,否則不能繼續(xù)添加元素。 |
LinkedTransferQueue | 鏈表實現(xiàn)的無界阻塞隊列 |
LinkedBlockingDeque | 鏈表實現(xiàn)的雙向阻塞隊列 |
使用案例
生產(chǎn)者消費者的實際使用
用戶注冊的時候突雪,在注冊成功以后發(fā)放積分起惕。
主要考慮兩方面的問題:
1.性能。注冊時需要創(chuàng)建用戶和發(fā)放積分咏删,假設(shè)創(chuàng)建需要1s惹想,發(fā)放積分需要1s,注冊過程就會大于2s
2.耦合督函。添加用戶和增加積分嘀粱,可以認(rèn)為是兩個領(lǐng)域激挪,也就是說,增加積分并不是注冊必須要具備的功能草穆,但是一旦增加積分這個邏輯出現(xiàn)異常,就會導(dǎo)致注冊失敗搓译。
public class BlockingQueueTest {
//用線程池是為了熟悉線程池悲柱,無傷大雅
private final ExecutorService addIntegral =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
private final ExecutorService createUser =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
private ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
private volatile boolean isRunning = true;
{
init();
}
private class User {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
public boolean register() {
createUser.execute(()->{
while (true){
User user = new User();
user.setName("dage");
addUser(user);
// sendPoints(user);
queue.add(user);
}
});
return true;
}
public void addUser(User user) {
System.out.println("添加用戶:" + user.getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sendPoints(User user) {
System.out.println("發(fā)送積分給指定用戶:" + user);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void init() {
addIntegral.execute(() -> {
while (isRunning) {
try {
User user = (User) queue.take();
sendPoints(user);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
new BlockingQueueTest().register();
}
}
二.JUC中的原子操作類
多線程下可使用的線程安全的類
原子性這個概念,在多線程編程里是一個老生常談的問題些己。 所謂的原子性表示一個或者多個操作豌鸡,要么全部執(zhí)行完,要么一個也不執(zhí)行段标。不能出現(xiàn)成功一部分失敗一部分的情況涯冠。
多線程里面,要實現(xiàn)原子性逼庞,有幾 種方法蛇更,其中一種就是加 Synchronized 同步鎖。而從 JDK1.5 開始赛糟,在 J.U.C 包中提供了 Atomic 包派任,提供了對于常用數(shù)據(jù)結(jié)構(gòu)的原子操作。它提供了簡單璧南、高效掌逛、以及線程安全的更新一個變量的方式。
1. 原子更新基本類型
AtomicBoolean司倚、AtomicInteger豆混、AtomicLong
2. 原子更新數(shù)組
AtomicIntegerArray 、 AtomicLongArray 动知、AtomicReferenceArray
3. 原子更新引用
AtomicReference 皿伺、 AtomicReferenceFieldUpdater 、AtomicMarkableReference(更新帶有標(biāo)記位的引用類 型)
4. 原子更新字段
AtomicIntegerFieldUpdater盒粮、AtomicLongFieldUpdater心傀、 AtomicStampedReference
AtomicInteger 分析
貫徹大部分AtomicInteger 中方法的變量valueOffset,通過 unsafe.objectFieldOffset() 獲取當(dāng)前 Value 這個變量在內(nèi)存中的偏移量拆讯。后續(xù)多個方法中都通過比較valueOffset 和expect 值脂男,然后更新AtomicInteger 的值。
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
private AtomicInteger integer= new AtomicInteger(0);
public void atomicIntegerTest(){
while (true){
new Thread(()->{
System.out.println(integer.getAndDecrement());
}).start();
new Thread(()->{
System.out.println(integer.getAndDecrement());
}).start();
new Thread(()->{
System.out.println(integer.getAndDecrement());
}).start();
}
}
public static void main(String[] args) {
new BlockingQueueTest().atomicIntegerTest();;
}