wait(),notify() ,線程的中斷唆涝,管道找都,管道泄漏 (?byte[] circular buffer)

首先線程基礎(chǔ):

public class MultiThread{
    public static void main(String[] args) {
 "獲取Java線程管理MXBean"
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
"不需要獲取同步的monitor和synchronizer信息廊酣,僅獲取線程和線程堆棧信息"
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
"遍歷線程信息能耻,僅打印線程ID和線程名稱信息"
        for (ThreadInfo threadInfo : threadInfos) {
            System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.
                    getThreadName());
        }
    }
}

輸出結(jié)果:

[6] Monitor Ctrl-Break
[5] Attach Listener
[4] Signal Dispatcher
[3] Finalizer
[2] Reference Handler
[1] main

首先,結(jié)論是在運(yùn)行main方法的時候亡驰,并不是一個單獨(dú)的線程晓猛,Attach Listener Signal Dispatcher,然后這兩個線程凡辱,是和jvm的attach機(jī)制相關(guān)戒职,F(xiàn)inalizer Reference Handler 這兩個是和垃圾回收機(jī)制有關(guān),不可達(dá)對象要被垃圾回收透乾,至少要經(jīng)歷兩次標(biāo)記過程洪燥。第一次標(biāo)記時執(zhí)行finalize()方法磕秤,并做記號,第二次標(biāo)記則不會再執(zhí)行finalize()方法了捧韵。

線程狀態(tài)

public class ThreadState {
    public static void main(String[] args) {
        new Thread(new TimeWaiting (), "TimeWaitingThread").start();
        new Thread(new Waiting(), "WaitingThread").start();
"使用兩個Blocked線程市咆,一個獲取鎖成功,另一個被阻塞"
        new Thread(new Blocked(), "BlockedThread-1").start();
        new Thread(new Blocked(), "BlockedThread-2").start();
    }
         "該線程不斷地進(jìn)行睡眠"
    static class TimeWaiting implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
          "該線程在Waiting.class實(shí)例上等待"
    static class Waiting implements Runnable {@Override
    public void run() {
        while (true) {
            synchronized (Waiting.class) {
                try {
                    Waiting.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    }
          "該線程在Blocked.class實(shí)例上加鎖后再来,不會釋放該鎖"
    static class Blocked implements Runnable {
        public void run() {
            synchronized (Blocked.class) {
                while (true) {
                    try {
                        Thread.sleep(100000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

image.png

image.png

線程的通信

  • 一種是通過共享變量蒙兰,然后為了解決多線程數(shù)據(jù)一致性問題 ,加上JMM芒篷。比如volatile,synchronized 關(guān)鍵字搜变。
  • wait(),notify(),notifyAll() .Thread.join(),Thread.yield()等jdk自帶的api . 這個就不說了,但是強(qiáng)調(diào)的一點(diǎn)是只有獲取鎖之后梭伐,才能使用 wait(),方法痹雅,因?yàn)閣ait() 方法的含義就是釋放鎖,然后放棄cpu的調(diào)度糊识,進(jìn)入WAITING狀態(tài),然后其他線程調(diào)用notify()的時候摔蓝,也不一定是喚醒他這個線程赂苗,假如說有一個等待隊(duì)列的話,應(yīng)該是等待隊(duì)列的頭部贮尉,重新進(jìn)入ready狀態(tài)拌滋,參與cpu時間調(diào)度算法骤竹。如果是非公平的快鱼,那么也不一定是頭部。
  • 管道(線程之間傳遞數(shù)據(jù)世落,一般不是用字節(jié)流魏铅,而是對象昌犹,少用)
  • ThreadLocal (這里涉及到強(qiáng)引用,弱引用等览芳,放在下一篇講)
    接下來講講管道斜姥,字節(jié)流的PipedOutputStream,PipedInputStream沧竟,還有字符流的PipedReader和PipedWriter铸敏。

PipedOutputStream,PipedInputStream

管道流向流程圖:
PipedOutputStream從內(nèi)存輸出數(shù)據(jù)寫入到PipedInputStream的緩沖區(qū)悟泵,PipedInputStream從PipedInputStream緩沖區(qū)讀取管道流數(shù)據(jù)杈笔。


image.png

截取一段源碼。

public class PipedInputStream extends InputStream {
    boolean closedByWriter = false;
    volatile boolean closedByReader = false;
    boolean connected = false;

        "REMIND: identification of the read and write sides needs to be
           more sophisticated.  Either using thread groups (but what about
           pipes within a thread?) or using finalization (but it may be a
           long time until the next GC). "
    Thread readSide;
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

   "
     * The default size of the pipe's circular input buffer.
     * @since   JDK1.1
     */
    // This used to be a constant before the pipe size was allowed
    // to change. This field will continue to be maintained
    // for backward compatibility.
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    /**
     * The circular buffer into which incoming data is placed.
     * @since   JDK1.1
    "
    protected byte buffer[];

connect 方法是同步方法synchronized 糕非,看不懂???蒙具,這里好像是涉及到和disruptor框架核心類似的ringBuffer .先擱在這里敦第,我們后續(xù)再研究。 在ThreadLocal后一篇店量,我們補(bǔ)上ringBuffer,然后研究這里的circularBuffer .

public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;
    }
protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        if (in == out)
            awaitSpace();
        if (in < 0) {
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte)(b & 0xFF);
        if (in >= buffer.length) {
            in = 0;
        }
    }

查看PipedOutputStream的源碼我們發(fā)現(xiàn)芜果,PipedOutputStream本身沒有緩沖區(qū),1024的緩沖區(qū)在輸入流中融师。PipedOutputStream的寫入方法
public void write(byte b[], int off, int len) flush() 都是調(diào)用的PipedInputStream的
synchronized void receive(byte b[], int off, int len) 方法
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
最后我們上代碼:

public class MultiThreadPipedTest {
    public static class Write extends Thread{
        public PipedOutputStream pos = null;

        "獲取線程中的管道輸出流"
        public PipedOutputStream getPos(){
            pos = new PipedOutputStream();
            return pos;
        }
       "把數(shù)據(jù)通過管道輸出流發(fā)送出去"
        public void SentData(){
            PrintStream p = new PrintStream(pos);
            for(int i=1;i<10;i++){
                p.println("hello");
                p.flush();
            }
            p.close();
        }
        @Override
        public void run(){
            while(true);         //模擬耗時工作
        }
    }

    public static class Read extends Thread{
        public PipedInputStream pis = null;
        public String line = "null";

        "獲得線程中的管道輸入流"
        public PipedInputStream getPis(){
            pis = new PipedInputStream();
            return pis;
        }
        "利用管道輸入流接收管道數(shù)據(jù)"
        public void ReceiveData(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("read: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run(){
            while(true);        //模擬耗時工作
        }
    }

    public static class Other_Thread extends Thread{
        public PipedInputStream pis = null;
        public String line = "null";

        "獲得線程中的管道輸入流"
        public PipedInputStream getPis(){
            pis = new PipedInputStream();
            return pis;
        }
       "利用管道輸入流接收管道數(shù)據(jù)"
        public void ReceiveData(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("Other thread: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run(){
            while(true);   //模擬耗時操作
        }
    }
    public static void main(String args[]) throws InterruptedException, IOException{
        Write write = new Write();
        Read read = new Read();
        Other_Thread other = new Other_Thread();
        "連接兩個線程的管道流 ---read和write線程"
        write.getPos().connect(read.getPis());
        write.start();
        read.start();
        other.start();
        write.SentData();
        read.ReceiveData();
        Thread.sleep(2000);
        "重新連接兩個線程的管道流 ---Other_Thread和write線程"
        write.getPos().connect(other.getPis());
        write.SentData();
        other.ReceiveData();
    }
}
image.png

管道泄漏 右钾,一個線程寫,多個線程讀取數(shù)據(jù)旱爆,本來connect應(yīng)該是1對1 的舀射。但是下面的例子,write和read本來是一對的怀伦,但是Other_Thread 竊取了脆烟,read線程的數(shù)據(jù)。

public class PipedStreamLeakTest {
    public static class Write extends Thread{
        public PipedOutputStream pos;
        Write(PipedOutputStream pos){
            this.pos = pos;
        }
        public void run(){
            PrintStream p = new PrintStream(pos);
            for(int i=1;i<1000;i++){
                p.println("hello");
                p.flush();
            }
            p.close();
        }
    }

    public static class Read extends Thread{
        public PipedInputStream pis;
        public String line = "null";
        Read(PipedInputStream pis){
            this.pis = pis;
        }
        public void run(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("read: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Other_Thread extends Thread{
        public PipedInputStream pis;
        public String line = "null";
        Other_Thread(PipedInputStream pis){
            this.pis = pis;
        }
        public void run(){
            BufferedReader r = new BufferedReader(new InputStreamReader(pis));
            try {
                while(line!=null){
                    line = r.readLine();
                    System.out.println("Other_Thread: "+line);
                }
                r.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String args[]) throws InterruptedException, IOException{
        "創(chuàng)建管道通信流"
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream(pos);
        new Write(pos).start();
        new Read(pis).start();
        new Other_Thread(pis).start();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末房待,一起剝皮案震驚了整個濱河市邢羔,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌桑孩,老刑警劉巖拜鹤,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異流椒,居然都是意外死亡敏簿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進(jìn)店門宣虾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惯裕,“玉大人,你說我怎么就攤上這事绣硝◎呤疲” “怎么了?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵域那,是天一觀的道長咙边。 經(jīng)常有香客問我,道長次员,這世上最難降的妖魔是什么败许? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮淑蔚,結(jié)果婚禮上市殷,老公的妹妹穿的比我還像新娘。我一直安慰自己刹衫,他們只是感情好醋寝,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布搞挣。 她就那樣靜靜地躺著,像睡著了一般音羞。 火紅的嫁衣襯著肌膚如雪囱桨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天嗅绰,我揣著相機(jī)與錄音舍肠,去河邊找鬼。 笑死窘面,一個胖子當(dāng)著我的面吹牛翠语,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播财边,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼肌括,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了酣难?” 一聲冷哼從身側(cè)響起谍夭,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鲸鹦,沒想到半個月后慧库,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡馋嗜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了吵瞻。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片葛菇。...
    茶點(diǎn)故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖橡羞,靈堂內(nèi)的尸體忽然破棺而出眯停,到底是詐尸還是另有隱情,我是刑警寧澤卿泽,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布莺债,位于F島的核電站,受9級特大地震影響签夭,放射性物質(zhì)發(fā)生泄漏齐邦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一第租、第九天 我趴在偏房一處隱蔽的房頂上張望措拇。 院中可真熱鬧,春花似錦慎宾、人聲如沸丐吓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽券犁。三九已至术健,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間粘衬,已是汗流浹背荞估。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留色难,地道東北人泼舱。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像枷莉,于是被迫代替她去往敵國和親娇昙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 本文主要講了java中多線程的使用方法笤妙、線程同步冒掌、線程數(shù)據(jù)傳遞、線程狀態(tài)及相應(yīng)的一些線程函數(shù)用法蹲盘、概述等股毫。在這之前...
    4ea0af17fd67閱讀 587評論 2 17
  • 場景 假設(shè)我們需要上傳一組動態(tài)增加的數(shù)據(jù), 輸入端可以看作inputSteam, 輸入端是outputSteam,...
    AssIstne閱讀 3,204評論 1 6
  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司召衔,掛了不少铃诬,但最終還是拿到小米、百度苍凛、阿里趣席、京東、新浪醇蝴、CVTE宣肚、樂視家的研發(fā)崗...
    時芥藍(lán)閱讀 42,209評論 11 349
  • 概述 管道流是用來在多個線程之間進(jìn)行信息傳遞的Java流。管道流分為字節(jié)流管道流和字符管道流悠栓。字節(jié)管道流:Pipe...
    jijs閱讀 7,926評論 0 3
  • 你認(rèn)為的"偉大軟件"是什么霉涨? 保持低耦合,讓你的程序代碼因禁止修改而關(guān)閉惭适, 因允許拓展而開放笙瑟。重復(fù)利用。不必重做每...
    JocobZling閱讀 458評論 0 1