線程是操作系統(tǒng)中獨立的個體,但這些個體如果不經過特殊的處理就不能成為一個整體实抡。線程間的通信就是成為整體的必用方案之一抡笼,是線程間進行通信之后,系統(tǒng)之間的交互性就會跟強大甜奄,在大大提高cpu利用率的同時還會是程序員對各線程任務在處理的過程中進行的把控與監(jiān)督柠横。在本次主要講解以下的技術點:
- 使用wait/notify實現(xiàn)線程間的通信
- 生產者/消費者模式的實現(xiàn)
- 方法join的使用
- ThreadLocal類的使用
利用wait/notify實現(xiàn)線程間的通信
方法wait()的作用就是是當前執(zhí)行的代碼線程進行等待窃款,wait()方法是Object類的方法,該方法用來將當前線程置入“預執(zhí)行隊列”中牍氛,并且在wait()所在的代碼出停止執(zhí)行晨继,知道接到通知或被中斷為止。在調用wait()之前搬俊,線程必須獲得該對象的對象級別鎖紊扬,即只能在同步方法或同步代碼塊中調用wait()方法。在執(zhí)行wait()方法后唉擂,當前線程釋放鎖餐屎。在從wait()返回前,線程與其他線程競爭重新獲得鎖玩祟。如果調用wait()時沒有持有適當?shù)逆i啤挎,則拋出異常,但不需要顯式的捕獲卵凑。
方法notify()也要在同步方法或者同步方法塊中調用庆聘,即在調用前,線程也必須獲得該對象的對象級別鎖勺卢。如果調用notify()時沒有持有適當?shù)逆i伙判,也會拋出異常。該方法用來通知那些可能等待該對象的對象鎖的其他線程黑忱,對其發(fā)出通知,并使他等待獲取該對象的對象鎖甫煞。需要說明的是菇曲,在執(zhí)行notify()方法方法后,當前線程不會馬上釋放該對象鎖抚吠,呈wait狀態(tài)的線程也并不能馬上獲取該對象鎖常潮,要等到執(zhí)行notify()方法的線程將程序執(zhí)行完,也就是說退出synchronized代碼塊后楷力,當前線程才會釋放鎖喊式,而呈wait狀態(tài)所在線程才可以獲取該對象鎖、當?shù)谝粋€獲得了該對象鎖的wait線程完畢之后萧朝,他會釋放調該對象鎖岔留,此時如果該對象沒有再次使用notify語句,則即使該對象已經空閑检柬,其他wait裝袋等待的線程由于沒有得到該對象的通知献联,還會繼續(xù)阻塞在wait狀態(tài),直到這個對象發(fā)出一個notify或者ntotifyall。
用一句話來總結一下wait和notify:wait使線程停止運行里逆,而notiy是停止的線程繼續(xù)運行进胯。
下面通過一個例子來了解wait和notfiy具體的使用。
創(chuàng)建myList具體實現(xiàn)類运悲,具體代碼如下:
import java.util.ArrayList;
import java.util.List;
public class myList {
private List<String> list = new ArrayList<String>();
public myList(List<String> list){
this.list = list;
}
public void add(){
list.add("hello");
}
public int size(){
return list.size();
}
}
創(chuàng)建線程類
//線程類A,只要實現(xiàn)往list中添加數(shù)據(jù)
public class ThreadA extends Thread{
myList my;
public ThreadA(myList my){
this.my = my;
}
public void run(){
synchronized (my) {
for(int i=0;i<10;i++){
my.add();
System.out.println("add successful ! and size = "+my.size());
if(my.size()==5){
my.notify();
System.out.println("notify send~项钮!");
}
}
}
}
}
//線程類B班眯,等待notify才能繼續(xù)運行。
public class ThreadB extends Thread{
myList my ;
public ThreadB(myList my){
this.my = my;
}
public void run(){
System.out.println("threadb start!");
synchronized (my) {
try {
System.out.println("threadb start! and wait lock");
my.wait();
System.out.println("threadb start! and had the lock");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
測試主體類:
import java.util.ArrayList;
import java.util.List;
public class Run {
public static void main(String[] args) throws InterruptedException {
List list = new ArrayList<String>();
myList my = new myList(list);
ThreadA a = new ThreadA(my);
ThreadB b = new ThreadB(my);
b.start();
Thread.sleep(100);//確保B線程先運行烁巫。
a.start();
}
}
運行結果如圖:
可以看到署隘,在size=5時,確實發(fā)出了一個通知亚隙,但是等到ThreadA中run中的代碼執(zhí)行完畢之后磁餐,ThreadB才獲取了鎖,并且開始運行阿弃。這也證實了前面所說的nofify并不能馬上就喚醒wait诊霹,而是要等到對象鎖中的代碼執(zhí)行完畢之后才能喚醒wait中大代碼。
也許你會很好奇渣淳,如果在wait之前就已經發(fā)出了notify脾还,那么結果會怎么樣?
修改運行類中代碼入愧,讓ThreadA先執(zhí)行鄙漏,及可以觀察到這樣的結果。
import java.util.ArrayList;
import java.util.List;
public class Run {
public static void main(String[] args) throws InterruptedException {
List list = new ArrayList<String>();
myList my = new myList(list);
ThreadA a = new ThreadA(my);
ThreadB b = new ThreadB(my);
a.start();
Thread.sleep(100);
b.start();
}
}
運行結果如圖:
可以看到棺蛛,ThreadB還一直在wait怔蚌,但是因為在ThreadA之前就已經notify過了,所以ThreadB將會一直處于阻塞狀態(tài)旁赊。那么如何修改程序桦踊,達到只要nofify過,wait就不會一直阻塞呢终畅?有興趣的可以自己去嘗試嘗試钞钙。
另外,這里有幾點要進行說明:
- 當方法wait()被執(zhí)行完后声离,鎖自動被釋放芒炼,而執(zhí)行notify()之后,鎖卻不會自動釋放术徊。
- 當存在多個wait(),而只執(zhí)行一次notify()時本刽,會隨機喚醒一個wait,如果想一次性將所有持有相同鎖的wait喚醒的話,可以使用notifyAll()子寓。
- wait()方法還可以帶一個參數(shù)暗挑,形如:wait(long),功能是等待某一時間內是否有線程對鎖進行喚醒斜友,如果超過這個時間則自動喚醒炸裆。
- 在使用wait/notify模式時,還需要注意另一種情況鲜屏,就是wait等待的條件發(fā)生了變化時烹看,容易造成程序邏輯的混亂。
通過管道進行線程間的通信
在java中提供了各種各樣的輸入/輸出流洛史,使我們能夠很方便的對數(shù)據(jù)進行操作惯殊,其中的管道流(PipeStream)是一種很特殊的流,用于在不同線程間直接傳送數(shù)據(jù)也殖。一個線程發(fā)送數(shù)據(jù)到輸出管道土思,另一個線程從輸入管道中讀取數(shù)據(jù)。通過使用管道忆嗜,實現(xiàn)不同線程間的通信己儒,而無須借助類似于臨時文件之類的東西。
這里我們主要介紹下面4個類:
1 PipedInputStream 和 PipedOutputStream
2 PipedReader 和 PipedWriter
下面我們直接通過例子來看這4個類該如何使用:
通過字節(jié)流實現(xiàn)線程間的數(shù)據(jù)傳遞:
WriterData寫如數(shù)據(jù)的文件代碼如下:
import java.io.IOException;
import java.io.PipedOutputStream;
public class WriterData {
public void writer(PipedOutputStream out){
try{
System.out.println("writer:");
for(int i=0;i<310;i++){
String outData = ""+(i+1);
out.write(outData.getBytes());
System.out.printf(" "+outData);
}
System.out.println();
out.close();
}catch(IOException e){
e.printStackTrace();
}
}
}
讀數(shù)據(jù)ReadData的文件代碼如下:
import java.io.IOException;
import java.io.PipedInputStream;
public class ReadData {
public void readData(PipedInputStream in) {
try {
System.out.println("read :");
byte[] byteArray = new byte[32];
int readLength = in.read(byteArray);
while(readLength != -1){
String newData = new String(byteArray,0,readLength);
System.out.printf(""+newData);
readLength = in.read(byteArray);
}
System.out.println();
in.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
線程類:
import java.io.PipedOutputStream;
public class ThreadA implements Runnable{
private WriterData writerData;
private PipedOutputStream out;
public ThreadA(WriterData writerData,PipedOutputStream out){
this.writerData = writerData;
this.out = out;
}
@Override
public void run() {
writerData.writer(out);
}
}
//線程B
import java.io.PipedInputStream;
public class ThreadB implements Runnable{
private ReadData readData;
private PipedInputStream in;
public ThreadB(ReadData readData,PipedInputStream in){
this.readData = readData;
this.in = in;
}
@Override
public void run() {
readData.readData(in);
}
}
測試主體類:
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class runTest {
public static void main(String[] args) throws Exception{
WriterData writerData = new WriterData();
ReadData readData = new ReadData();
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream();
out.connect(in);//使兩個Stream之間產生通信連接
//in.connect(out);
Runnable a = new ThreadA(writerData,out);
Runnable b = new ThreadB(readData,in);
Thread thA = new Thread(a);
Thread thB = new Thread(b);
thA.start();
Thread.sleep(1000);
thB.start();
}
}
運行結果如圖:
[圖片上傳中捆毫。址愿。。(3)]
從程序的結果來看冻璃,兩個線程通過管道成功進行了數(shù)據(jù)傳遞响谓。
字符流實現(xiàn):
因為代碼同字節(jié)流的大部分相同,這里一并粘貼顯示出來
//寫數(shù)據(jù)
import java.io.IOException;
import java.io.PipedWriter;
public class WriterData {
public void writerDate(PipedWriter out ) throws IOException{
System.out.println("writer:");
for(int i=0;i<300;i++){
String outData = ""+(i+1);
out.write(outData);
System.out.print(outData);
}
System.out.println();
out.close();
}
}
//讀數(shù)據(jù)
import java.io.IOException;
import java.io.PipedReader;
public class ReadData {
public void readData(PipedReader in) throws IOException{
System.out.println("read:");
char[] charArray = new char[32];
int readLength =in.read(charArray);
while(readLength != -1){
String newData = new String(charArray,0,readLength);
System.out.print(newData);
readLength = in.read(charArray);
}
System.out.println();
in.close();
}
}
//線程A的代碼
import java.io.IOException;
import java.io.PipedWriter;
public class ThreadA extends Thread {
private PipedWriter writer;
private WriterData writerData;
public ThreadA(WriterData writerData,PipedWriter writer){
this.writer = writer;
this.writerData = writerData;
}
public void run(){
try {
writerData.writerDate(writer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//線程B的代碼
import java.io.IOException;
import java.io.PipedReader;
public class ThreadB extends Thread{
private ReadData readData ;
private PipedReader reader;
public ThreadB(ReadData readData,PipedReader reader){
this.readData = readData;
this.reader = reader;
}
public void run() {
try {
readData.readData(reader);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//測試類主體
import java.io.PipedReader;
import java.io.PipedWriter;
public class runTest {
public static void main(String[] args) throws Exception{
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
WriterData writerData = new WriterData();
ReadData readData = new ReadData();
writer.connect(reader);
ThreadA a = new ThreadA(writerData,writer);
ThreadB b = new ThreadB(readData,reader);
a.start();
Thread.sleep(1000);
((Thread) b).start();
}
}
運行結果同字節(jié)流的輸出結果是一樣的省艳。這里就不在粘貼結果了娘纷。更多關于這四個關鍵字的用法,參考javaI/O中的內容跋炕。
join()方法
方法join的作用是使所屬的線程對象x正常執(zhí)行run()方法中的任務赖晶,而使當前線程y進行無限期的阻塞。等待線程x銷毀后在繼續(xù)執(zhí)行線程z后面的代碼辐烂。
方法join具有使線程排隊運行的作用遏插,有些類似同步的運行效果。join與synchronized的區(qū)別是:join在內部使用wait()方法進行等待纠修,而synchronized關鍵字使用的是“對象監(jiān)視器”原理作為同步胳嘲。
在join過程中,如果當前線程對象被中斷扣草,則當前線程出現(xiàn)異常了牛。下面以一個例子來說明join方法的使用:
線程類:
public class MyThread extends Thread{
public void run(){
int randValue = (int)(Math.random()*10000);
System.out.println("我要運行"+randValue+"毫秒颜屠,這個時間每次都是是不確定的");
try {
Thread.sleep(randValue);
System.out.println("終于運行完畢了,我要退出了");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
測試類鹰祸。
public class runTest {
public static void main(String[] args) throws InterruptedException{
MyThread myThread = new MyThread();
myThread.start();
//myThread.join();
System.out.println("這句話甫窟,我想等線程執(zhí)行完我在說出來");
}
}
當沒有加入join方法時的運行效果:
當加入join方法時的運行效果:
從例子中,應該能明顯感受到join的威力了吧蛙婴,但join還可以當成join(long)來進行使用粗井。如果上例中的運行類修改呈下面的代碼:
public class runTest {
public static void main(String[] args) throws InterruptedException{
MyThread myThread = new MyThread();
myThread.start();
myThread.join(2000);
System.out.println("這句話,我想等線程執(zhí)行完我在說出來");
}
}
可能出現(xiàn)的結果如圖:
補充說明:
程序一開始就輸出第一句:“我要運行5687毫秒街图,這個時間每次都是是不確定的”
間隔2000毫秒后輸出:“這句話浇衬,我想等線程執(zhí)行完我在說出來”
最后在距離程序開始5687毫秒的時候輸出最后一句話。
join(long)方法的作用就不言而喻了台夺,其實質就是給某個線程執(zhí)行多長時間径玖。與sleep(long)有一定的相同之處痴脾。
但是join(long)與sleep(long)的最大區(qū)別就是join(long)方法具有釋放鎖的特點(其內部是使用wait(long)來實現(xiàn)的)颤介,而sleep(long)不具備釋放鎖的特點。
ThreadLocal類的使用
變量值的共享可以使用public static變量的形式赞赖,所有的線程都使用同一個public static變量滚朵。如果想實現(xiàn)每一個線程都有自己的共享變量則可以通過ThreadLocal類來實現(xiàn)。
該類提供了線程局部 (thread-local) 變量前域。這些變量不同于它們的普通對應物辕近,因為訪問某個變量(通過其 get 或 set 方法)的每個線程都有自己的局部變量,它獨立于變量的初始化副本匿垄。ThreadLocal 實例通常是類中的 private static 字段移宅,它們希望將狀態(tài)與某一個線程(例如,用戶 ID 或事務 ID)相關聯(lián)椿疗。(源碼的解讀 可以參android開發(fā)藝術探索的第*章)
例如漏峰,以下類生成對每個線程唯一的局部標識符。線程 ID 是在第一次調用UniqueThreadIdGenerator.getCurrentThreadId() 時分配的届榄,在后續(xù)調用中不會更改浅乔。
import java.util.concurrent.atomic.AtomicInteger;
public class UniqueThreadIdGenerator {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
private static final ThreadLocal < Integer > uniqueNum =
new ThreadLocal < Integer > () {
@Override protected Integer initialValue() {
return uniqueId.getAndIncrement();
}
};
public static int getCurrentThreadId() {
return uniqueId.get();
}
} // UniqueThreadIdGenerator
每個線程都保持對其線程局部變量副本的隱式引用,只要線程是活動的并且 ThreadLocal 實例是可訪問的铝条;在線程消失之后靖苇,其線程局部實例的所有副本都會被垃圾回收(除非存在對這些副本的其他引用)。
該類有四個方法:
- protected T initialValue();
返回此線程局部變量的當前線程的“初始值”班缰。線程第一次使用 get() 方法訪問變量時將調用此方法贤壁,但如果線程之前調用了 set(T) 方法,則不會對該線程再調用 initialValue 方法埠忘。通常芯砸,此方法對每個線程最多調用一次萧芙,但如果在調用 get() 后又調用了 remove(),則可能再次調用此方法假丧。
該實現(xiàn)返回 null双揪;如果程序員希望線程局部變量具有 null 以外的值,則必須為 ThreadLocal 創(chuàng)建子類包帚,并重寫此方法渔期。通常將使用匿名內部類完成此操作。 - public T get()
返回此線程局部變量的當前線程副本中的值渴邦。如果變量沒有用于當前線程的值疯趟,則先將其初始化為調用 initialValue() 方法返回的值。 - public void set(T value)
將此線程局部變量的當前線程副本中的值設置為指定值谋梭。大部分子類不需要重寫此方法信峻,它們只依靠 initialValue() 方法來設置線程局部變量的值。 - public void remove()
移除此線程局部變量當前線程的值瓮床。如果此線程局部變量隨后被當前線程讀取盹舞,且這期間當前線程沒有設置其值,則將調用其 initialValue() 方法重新初始化其值隘庄。這將導致在當前線程多次調用 initialValue 方法踢步。
如果在子線程中取得父線程繼承下來的值,則需要用到InheritableThreadLocal丑掺。
修改默認值可以實例代碼如下:
public class Locat extends InheritableThreadLocal<String>{
@Override
protected String initialValue() {
//return super.initialValue();
return "默認的值在這里進行修改";
}
}
//運行類
public class runTest {
static Locat t1 = new Locat();
public static void main(String [] args){
System.out.println(t1.get());
}
}
輸出結果就是:
如果在繼承的同時還需要對值進行進一步的處理获印,可以這么干:
通過繼承InheritableThreadLocal修改默認值和繼承值
public class Locat extends InheritableThreadLocal<String>{
@Override
protected String initialValue() {
//return super.initialValue();
return "默認的值在這里進行修改";
}
@Override
protected String childValue(String parentValue) {
//return super.childValue(parentValue);
return parentValue+"這里修改子線程的值";
}
}
把數(shù)據(jù)封裝到tools中
public class Tools {
public static Locat t1 = new Locat();
}
線程a的代碼
public class ThreadA extends Thread{
public void run(){
System.out.println(Tools.t1.get());
}
}
運行類的代碼如下:
public class runTest {
public static void main(String [] args){
System.out.println(Tools.t1.get());
ThreadA a = new ThreadA();
a.start();
}
}
運行結果如下:
可見。我們成功修改了初始值和子線程的初始值街州。
值得注意的是兼丰,如果子線程取得值的同時,主線程將InheritableLocal中的值進行修改唆缴,那么子線程取得到的值還是舊值鳍征。
JA