1.Exclusive write / Concurrent read access 互斥讀寫
有時(shí)候我們會對一份數(shù)據(jù)同時(shí)進(jìn)行讀和寫的操作
ReadWriteLock 接口還有他的實(shí)現(xiàn)類ReentrantReadWriteLock 可以讓我們實(shí)現(xiàn)如下場景的功能:
- 可能有任意數(shù)量的同步讀取操作。如果有至少一個(gè)讀取操作獲得允許,那么就不會產(chǎn)生寫入操作奖磁。
- 最多只能有一個(gè)寫操作,如果已經(jīng)有一個(gè)寫操作已經(jīng)被允許那么就不能進(jìn)行讀操作豁辉。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions. protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access protected static int data = 0;
/**
* This will write to the data, in an exclusive access
*/
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}
public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}
}
備注:如上場景我們應(yīng)該使用AtomicInteger庶橱,但是我們在這邊只是用來舉例夷恍,這個(gè)鎖操作并不關(guān)心這個(gè)數(shù)據(jù)是否是一個(gè)原子類型的變量拱雏。
在讀操作這一邊的鎖是非常有必要的棉安,雖然這個(gè)操作看起來像是針對普通讀操作的。事實(shí)上如果你不在讀文件時(shí)候進(jìn)加鎖铸抑,那么任何操作都有可能會出錯(cuò):
- 基本類型的寫入操作在任何行虛擬機(jī)上都不保證是原子類型的操作贡耽。在寫入一個(gè)64bits 的 long型數(shù)據(jù)最后只會有32bits。
為了更高的性能要求鹊汛,還有一種更快類型的鎖蒲赂,叫做StampedLock ,除此之外還有一些一起繼承樂觀鎖的類型刁憋。這個(gè)所以與ReadWriteLock工作情況區(qū)別很大凳宙。
Producer-Consumer 生產(chǎn)者-消費(fèi)者模型
- 一個(gè)簡單的 Producer-Consumer 問題解決方法。注意 JDK的 類 (AtomicBoolean and BlockingQueue) 是用來同步的职祷,他么不能減少了創(chuàng)建無效方法。有興趣的話可以看一下 BlockingQueue届囚。通過幾種不同的實(shí)現(xiàn)有梆,可能會產(chǎn)生不同的行為。例如e DelayQueue 延遲隊(duì)列 or Priority Queue 優(yōu)先隊(duì)列意系。
public class Producer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Producer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int producedCount = 0;
try {
while (true) {
producedCount++; //put throws an InterruptedException when the thread is interrupted queue.put(new ProducedData()); } } catch (InterruptedException e) { // the thread has been interrupted: cleanup and exit producedCount--; //re-interrupt the thread in case the interrupt flag is needeed higher up Thread.currentThread().interrupt(); } System.out.println("Produced " + producedCount + " objects"); } }
}
public class Consumer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Consumer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int consumedCount = 0;
try {
while (true) { //put throws an InterruptedException when the thread is interrupted ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS); // process data consumedCount++; } } catch (InterruptedException e) { // the thread has been interrupted: cleanup and exit consumedCount--; //re-interrupt the thread in case the interrupt flag is needeed higher up Thread.currentThread().interrupt(); } System.out.println("Consumed " + consumedCount + " objects"); } }
}
public class ProducerConsumerExample {
static class ProducedData { // empty data object }
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000); // choice of queue determines the actual behavior: see various BlockingQueue implementations
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
Thread.sleep(1000);
producer.interrupt();
Thread.sleep(10);
consumer.interrupt();
}
}
}
使用synchronized / volatile 對讀寫操作可見性的影響
- 正如我們了解的那樣泥耀,我們應(yīng)該使用synchronized 關(guān)鍵字來進(jìn)行同步方法,或者同步代碼塊蛔添。但是我們有些人可能會注意到 synchronized 與 volatile 關(guān)鍵字痰催。提供了read / write barrier。那么問題來了什么是 read / write barrier迎瞧。我們來看一下下面你的例子:
class Counter {
private Integer count = 10;
public synchronized void incrementCount() {
count++;
}
public Integer getCount() {
return count;
}
}
- 我們假設(shè)線程A 調(diào)用了incrementCount() 線程B調(diào)用了getCount夸溶。在這個(gè)場景中我不能保證數(shù)據(jù)更新對線程B可見,甚至很有可能線程B永遠(yuǎn)看不到數(shù)值更新凶硅。
- 為了理解這個(gè)行為缝裁,我們需要理解java 內(nèi)存模型與硬件之間的關(guān)系。在Java中每個(gè)線程都有自己的線程stack足绅。這個(gè)stack 里面包含:調(diào)用線程的方法還有線程中創(chuàng)建的本地變量在多核操作系統(tǒng)中捷绑,這個(gè)場景中很有可能這個(gè)線程存在于某個(gè)cpu核心或者緩存中韩脑。如果存在于某個(gè)線程中,一個(gè)對象使用 synchronized (or volatile) 關(guān)鍵字在synchronized代碼塊之后線程與主內(nèi)存同步自己的本地變量粹污。 synchronized (or volatile)關(guān)鍵字創(chuàng)建了一個(gè)讀寫屏障并且確保線程的最新數(shù)據(jù)可見段多。
- 在我們的這個(gè)案例中,既然線程B還沒有使用synchronized 來同步計(jì)算壮吩,或許線程B永遠(yuǎn)看不到線程A的數(shù)據(jù)更新了进苍。為了確保最新的數(shù)據(jù)我們需要用synchronized 來修飾getCount方法。
public synchronized Integer getCount() { return count; }
- 現(xiàn)在當(dāng)線程A更新完數(shù)據(jù) 粥航,然后釋放Couter 對象的鎖琅捏。與此同時(shí)創(chuàng)建一個(gè)寫屏障并且將線程A的變化更新到主內(nèi)存。類似的當(dāng)線程B請求一個(gè)鎖递雀,在主內(nèi)存讀取數(shù)進(jìn)入讀屏障柄延,然后久可以看到所有的更新改動。
- 使用volatile 關(guān)鍵字也可以代理同樣的效果缀程。volatile關(guān)鍵字修飾的變量的寫入操作搜吧, 會將所有更新同步到主內(nèi)存。volatile關(guān)鍵字修飾的變量的讀取操作將會讀取主內(nèi)存中的值杨凑。
獲取你的程序中的所有線程狀態(tài)
代碼片段 Code snippet
import java.util.Set;
public class ThreadStatus {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new MyThread());
t.setName("MyThread:" + i);
t.start();
}
int threadCount = 0;
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread t : threadSet) {
if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
System.out.println("Thread :" + t + ":" + "state:" + t.getState());
++threadCount;
}
}
System.out.println("Thread count started by Main thread:" + threadCount);
}
}
class MyThread implements Runnable {
public void run() {
try {
Thread.sleep(2000);
} catch (Exception err) {
err.printStackTrace();
}
}
}
解釋:
Thread.getAllStackTraces().keySet()返回包含application 和 系統(tǒng)的所有線程滤奈。如果你只對你創(chuàng)建的線程的狀態(tài)感興趣,那么遍歷Thread set 然后通過檢查 Thread Group 來判斷線程是否屬于app的線程撩满。
使用ThreadLocal
- ThreadLocal 是在Java并發(fā)編程中經(jīng)常用到的工具蜒程。他允許一個(gè)變量在不同線程有不同的值。這樣拿來說伺帘,即使是相同的代碼在不同的線程中運(yùn)行昭躺,這些操作將不貢獻(xiàn)value,而且每個(gè)線程都有自己的本地變量伪嫁。
- 例如這個(gè)在servlet中經(jīng)常被發(fā)布的context 领炫。你可能會這么做:
private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();
public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}
public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}
使用共享全局隊(duì)列的多producer/consumer 案例
- 如下代碼展示了 多producer/consumer 編程。生產(chǎn)者和消費(fèi)者模型共享一個(gè)全局隊(duì)列张咳。
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
- 輸出
Produced:497:by thread:1
Produced:300:by thread:2
Consumed: 497:by thread:1
Consumed: 300:by thread:2
Produced:64:by thread:2
Produced:984:by thread:1
Consumed: 64:by thread:1
Consumed: 984:by thread:2
Produced:102:by thread:2
Produced:498:by thread:1
Consumed: 102:by thread:1
Consumed: 498:by thread:2
Produced:168:by thread:2
Produced:69:by thread:1
Consumed: 69:by thread:2
Consumed: 168:by thread:1
- 說明
- sharedQueue帝洪,是一個(gè)LinkedBlockingQueue,在生產(chǎn)者和消費(fèi)者線程之間共享
- 生產(chǎn)者線程每隔200ms 生產(chǎn)一個(gè)數(shù)字 然后持續(xù)的添加入隊(duì)列
- 消費(fèi)者從sharedQueue 持續(xù)消耗數(shù)字
- 這個(gè)程序?qū)崿F(xiàn)無需 synchronized或者鎖結(jié)構(gòu)脚猾。 BlockingQueue 是實(shí)現(xiàn)這個(gè)模型的關(guān)鍵葱峡。
- BlockingQueue 就是為了生產(chǎn)/消費(fèi) 模型來設(shè)計(jì)的
- BlockingQueue是線程安全的。所有對列的方法都是原子類型的操作婚陪,其使用了 內(nèi)部鎖或者其他類型的并發(fā)控制族沃。
使用Threadpool 相加兩個(gè) int 類型的數(shù)組
- 一個(gè)線程池就是一個(gè)隊(duì)列的任務(wù),其中每個(gè)任務(wù)都會被其中的線程執(zhí)行。
- 如下的案例展示了如何使用線程池添加兩個(gè)int 類型的數(shù)組脆淹。
public static void testThreadpool() {
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };
ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}
// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}
System.out.println(Arrays.toString(result));
}
說明:
- 這個(gè)案例只是單純展示用常空。在實(shí)際使用中,我們不會僅僅為了這點(diǎn)任務(wù)就使用線程池盖溺。
- Java7 中你將看到使用匿名內(nèi)部類而不是lamda 來實(shí)現(xiàn)這個(gè)任務(wù)漓糙。
Pausing Execution 暫停執(zhí)行處理器時(shí)間對其他線程可用。sleep 方法有兩個(gè)復(fù)寫方法在Thread 類制作烘嘱。
- 指定sleep 時(shí)間
public static void sleep(long millis) throws InterruptedException
- 指定sleep時(shí)間
public static void sleep(long millis, int nanos)
Thread 源碼介紹
這對于系統(tǒng)核心的調(diào)度是非常重要的昆禽。這個(gè)可能產(chǎn)生不可預(yù)測的結(jié)果,而且有些實(shí)現(xiàn)甚至不考慮nano s參數(shù)蝇庭。我們建議在 用try catch 包住 Thread.sleep 操作并且catch InterruptedException. 異常醉鳖。
線程中斷/終止線程
- 每個(gè)Java線程都有一個(gè)interrupt flag,默認(rèn)的是false。打斷一個(gè)線程哮内,最基礎(chǔ)的就是將這個(gè)flag 設(shè)置成true盗棵。在這個(gè)線程中執(zhí)行的代碼會暗中觀察這個(gè)標(biāo)記,然后做出反應(yīng)北发。當(dāng)然代碼也可以忽略這個(gè)flag纹因。但是為啥每個(gè)線程都要樹flag?畢竟在線程中有一個(gè)Boolean 變量我們可以更好的管理線程琳拨。當(dāng)然了在線程里面還有一些特別的方法瞭恰,他們會在線程被中斷的時(shí)候運(yùn)行。這些方法叫做阻塞方法狱庇。這些方法會將線程設(shè)置成WAITING或是WAITING 狀態(tài)惊畏。當(dāng)線程是這個(gè)狀態(tài)的話,那么打斷線程會拋出一個(gè)InterruptedException密任。而不是interrupt flag 被設(shè)置成true陕截。然后這個(gè)線程狀態(tài)有一次變成RUNNABLE。調(diào)用阻塞方法的時(shí)候會強(qiáng)制要求處理InterruptedException批什。之后在這個(gè)線程中打斷的時(shí)候就會產(chǎn)生一個(gè)WAIT 狀態(tài)。注意社搅,不是所有方法都要響應(yīng)中斷行為驻债。最終線程被設(shè)置成中斷狀態(tài),然后進(jìn)入一個(gè)阻塞方法形葬,然后立刻拋出一個(gè)InterruptedException合呐, interrupt flag 將被清除。
- 與這些原理不同笙以,java 并沒有特別的特別語義描述中斷淌实。代碼非常容易描述打斷。但是大多數(shù)情況下中斷是用來通知一個(gè)線程應(yīng)該盡快停下來。從上面的描述可以清楚的看出拆祈,這取決于線程上的代碼恨闪,對中斷作出適當(dāng)?shù)姆磻?yīng)以停止運(yùn)行。停止線程是一種寫作放坏。當(dāng)一個(gè)線程被打斷了咙咽,它在運(yùn)行的代碼可能會在棧空間下沉好幾個(gè)level淤年。大多數(shù)方法不調(diào)用阻塞方法钧敞,并且結(jié)束時(shí)間充足,無須延遲 關(guān)閉 線程麸粮。代碼在一個(gè)loop 中執(zhí)行溉苛,處理任務(wù)應(yīng)該首先關(guān)注中斷。Loop 應(yīng)該盡可能的初始化任務(wù)弄诲,檢測打斷狀態(tài)來推出loop愚战。對于一個(gè)有限的loop,所有任務(wù)必須在loop終止之前被執(zhí)行完畢威根,以防有任務(wù)沒有被執(zhí)行凤巨。如果在語義上是可能的,那么它可以
簡單地傳遞InterruptedException斷洛搀,并聲明拋出它敢茁。那么它對于它的調(diào)用者來說的話它就是一個(gè)阻塞方法。如果不能傳遞異常留美,那么它至少應(yīng)該設(shè)置打斷狀態(tài)彰檬,那么調(diào)用者就會知道線程被打斷了。在一些案例中谎砾,需要持續(xù)等待而無視等待異常逢倍。在這種情況下,必須延遲設(shè)置打斷狀態(tài)景图,知道它不再等待较雕。這可能調(diào)用本地變量,這個(gè)本地變量用來檢查推出方法和打斷方法的優(yōu)先級挚币。
案例
用中斷線程來打斷任務(wù)執(zhí)行
class TaskHandler implements Runnable {
private final BlockingQueue<Task> queue;
TaskHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) { //
try {
Task task = queue.take(); // blocking call, responsive to interruption
handle(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private void handle(Task task) {
// actual handling
}
}
}
等待程序執(zhí)行完畢亮蒋,延遲設(shè)置打斷flag
class MustFinishHandler implements Runnable {
private final BlockingQueue<Task> queue;
MustFinishHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean shouldInterrupt = false;
while (true) {
try {
Task task = queue.take();
if (task.isEndOfTasks()) {
if (shouldInterrupt) {
Thread.currentThread().interrupt();
}
return;
}
handle(task);
} catch (InterruptedException e) {
shouldInterrupt = true; // must finish, remember to set interrupt flag when we're
done
}
}
}
private void handle(Task task) {
// actual handling
}
}
固定的任務(wù)列表不過在中斷時(shí)候可能會提前退出。
class GetAsFarAsPossible implements Runnable {
private final List<Task> tasks = new ArrayList<>();
@Override
public void run() {
for (Task task : tasks) {
if (Thread.currentThread().isInterrupted()) {
return;
}
handle(task);
}
}
private void handle(Task task) {
// actual handling
}
}