本文主要介紹的是Java 并發(fā)編程的里面幾個工具類: CountDownLatch, CyclicBarrier, Semaphore, Exchanger, 分析以及使用介紹。
(1) CountDownLatch 類用一個繼承了AQS 抽象類作為內(nèi)部類,實現(xiàn)了讓一個線程或者多個線程等待到達到某一個條件狸页,源碼里面使用State 來記錄考廉,當state 等于 0 時掏膏,所有等待線程都被釋放拜轨。
A code CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A
CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
Example :
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*
* @author Eric
*
*/
public class CountDownLatchUsage {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("This is the thread one");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("This is the thread Two");
}
}).start();
System.out.println("Going to release the latch");
latch.countDown();//state = 1
TimeUnit.SECONDS.sleep(10);
latch.countDown();//state = 0
System.out.println("The end");
}
}
(2) CyclicBarrier 用于一些線程等待其他線程到達同一個柵欄點盯拱,最后一個線程到達之前弥喉,所有的線程都被阻塞郁竟,而且所有等待線程釋放后,CyclicBarrier 可以被復用由境。
Example :
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierUsage {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println("Let's go!!!");
}
});
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Thread 1 is waiting");
try {
barrier.await();
System.out.println("Thread 1 is running");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Thread 2 is waiting");
try {
barrier.await();
System.out.println("Thread 2 is running");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
while (Thread.activeCount() > 1) {
Thread.yield();
}
}
}
(3) Semaphore 用來維護一系列許可證棚亩,經(jīng)常用來限制線程的數(shù)目蓖议,用于資源的訪問,同樣在內(nèi)部有一個繼承了AQS的Sync讥蟆,用于公平NonfairSync和FairSync,兩者的區(qū)別是
公平鎖(首先檢查等待隊列中是否已有線程在等待)
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平鎖: 直接檢查是否能獲取許可勒虾,可以的話直接運行,否者進入等待隊列瘸彤。
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
Example :
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreUsage {
public static void main(String[] args) {
Semaphore sem = new Semaphore(2, true);
new Thread(new Runnable(){
@Override
public void run() {
try {
sem.acquire(1);
System.out.println("This is Thread 1, get the permit");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
sem.release();
}
System.out.println("This is Thread 1");
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
try {
sem.acquire(1);
System.out.println("This is Thread 2, get the permit");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
sem.release();
}
System.out.println("This is Thread 2");
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
try {
sem.acquire(1);
System.out.println("This is Thread 3, get the permit");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
sem.release();
}
System.out.println("This is Thread 3");
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
try {
sem.acquire(1);
System.out.println("This is Thread 4, get the permit");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
sem.release();
}
System.out.println("This is Thread 4");
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
try {
sem.acquire(1);
System.out.println("This is Thread 5, get the permit");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
sem.release();
}
System.out.println("This is Thread 5");
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
try {
sem.acquire(1);
System.out.println("This is Thread 6, get the permit");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
sem.release();
}
System.out.println("This is Thread 6");
}
}).start();
}
}
(4) Exchanger: 用來實現(xiàn)線程之間在同步點交換數(shù)據(jù)修然,用于基因算法以及管道的設計。
A synchronization point at which threads can pair and swap elements
within pairs. Each thread presents some object on entry to the
{@link #exchange exchange} method, matches with a partner thread,
and receives its partner's object on return. An Exchanger may be
viewed as a bidirectional form of a {@link SynchronousQueue}.
Exchangers may be useful in applications such as genetic algorithms
and pipeline designs.
核心算法:
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
Example :
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}}
Exception one(會一直waiting,找找原因):
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ExchangerUsage {
private static Exchanger<String> data = new Exchanger<String>();
public static void main(String[] args) {
ExchangerServer server = new ExchangerServer();
ExchangerClient client = new ExchangerClient();
server.run();
client.run();
}
private static class ExchangerServer implements Runnable{
@Override
public void run() {
String A = "A";
try {
try {
System.out.println("Data from client" + data.exchange(A,5,TimeUnit.SECONDS));
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private static class ExchangerClient implements Runnable{
@Override
public void run() {
String B = "B";
try {
try {
System.out.println("Data from client" + data.exchange(B,5,TimeUnit.SECONDS));
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}