首先線程基礎(chǔ):
public class MultiThread{
public static void main(String[] args) {
"獲取Java線程管理MXBean"
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
"不需要獲取同步的monitor和synchronizer信息廊酣,僅獲取線程和線程堆棧信息"
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
"遍歷線程信息能耻,僅打印線程ID和線程名稱信息"
for (ThreadInfo threadInfo : threadInfos) {
System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.
getThreadName());
}
}
}
輸出結(jié)果:
[6] Monitor Ctrl-Break
[5] Attach Listener
[4] Signal Dispatcher
[3] Finalizer
[2] Reference Handler
[1] main
首先,結(jié)論是在運(yùn)行main方法的時候亡驰,并不是一個單獨(dú)的線程晓猛,Attach Listener Signal Dispatcher,然后這兩個線程凡辱,是和jvm的attach機(jī)制相關(guān)戒职,F(xiàn)inalizer Reference Handler 這兩個是和垃圾回收機(jī)制有關(guān),不可達(dá)對象要被垃圾回收透乾,至少要經(jīng)歷兩次標(biāo)記過程洪燥。第一次標(biāo)記時執(zhí)行finalize()方法磕秤,并做記號,第二次標(biāo)記則不會再執(zhí)行finalize()方法了捧韵。
線程狀態(tài)
public class ThreadState {
public static void main(String[] args) {
new Thread(new TimeWaiting (), "TimeWaitingThread").start();
new Thread(new Waiting(), "WaitingThread").start();
"使用兩個Blocked線程市咆,一個獲取鎖成功,另一個被阻塞"
new Thread(new Blocked(), "BlockedThread-1").start();
new Thread(new Blocked(), "BlockedThread-2").start();
}
"該線程不斷地進(jìn)行睡眠"
static class TimeWaiting implements Runnable {
@Override
public void run() {
while (true) {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
"該線程在Waiting.class實(shí)例上等待"
static class Waiting implements Runnable {@Override
public void run() {
while (true) {
synchronized (Waiting.class) {
try {
Waiting.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
"該線程在Blocked.class實(shí)例上加鎖后再来,不會釋放該鎖"
static class Blocked implements Runnable {
public void run() {
synchronized (Blocked.class) {
while (true) {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
線程的通信
- 一種是通過共享變量蒙兰,然后為了解決多線程數(shù)據(jù)一致性問題 ,加上JMM芒篷。比如volatile,synchronized 關(guān)鍵字搜变。
- wait(),notify(),notifyAll() .Thread.join(),Thread.yield()等jdk自帶的api . 這個就不說了,但是強(qiáng)調(diào)的一點(diǎn)是只有獲取鎖之后梭伐,才能使用 wait(),方法痹雅,因?yàn)閣ait() 方法的含義就是釋放鎖,然后放棄cpu的調(diào)度糊识,進(jìn)入WAITING狀態(tài),然后其他線程調(diào)用notify()的時候摔蓝,也不一定是喚醒他這個線程赂苗,假如說有一個等待隊(duì)列的話,應(yīng)該是等待隊(duì)列的頭部贮尉,重新進(jìn)入ready狀態(tài)拌滋,參與cpu時間調(diào)度算法骤竹。如果是非公平的快鱼,那么也不一定是頭部。
- 管道(線程之間傳遞數(shù)據(jù)世落,一般不是用字節(jié)流魏铅,而是對象昌犹,少用)
- ThreadLocal (這里涉及到強(qiáng)引用,弱引用等览芳,放在下一篇講)
接下來講講管道斜姥,字節(jié)流的PipedOutputStream,PipedInputStream沧竟,還有字符流的PipedReader和PipedWriter铸敏。
PipedOutputStream,PipedInputStream
管道流向流程圖:
PipedOutputStream從內(nèi)存輸出數(shù)據(jù)寫入到PipedInputStream的緩沖區(qū)悟泵,PipedInputStream從PipedInputStream緩沖區(qū)讀取管道流數(shù)據(jù)杈笔。
截取一段源碼。
public class PipedInputStream extends InputStream {
boolean closedByWriter = false;
volatile boolean closedByReader = false;
boolean connected = false;
"REMIND: identification of the read and write sides needs to be
more sophisticated. Either using thread groups (but what about
pipes within a thread?) or using finalization (but it may be a
long time until the next GC). "
Thread readSide;
Thread writeSide;
private static final int DEFAULT_PIPE_SIZE = 1024;
"
* The default size of the pipe's circular input buffer.
* @since JDK1.1
*/
// This used to be a constant before the pipe size was allowed
// to change. This field will continue to be maintained
// for backward compatibility.
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
/**
* The circular buffer into which incoming data is placed.
* @since JDK1.1
"
protected byte buffer[];
connect 方法是同步方法synchronized 糕非,看不懂???蒙具,這里好像是涉及到和disruptor框架核心類似的ringBuffer .先擱在這里敦第,我們后續(xù)再研究。 在ThreadLocal后一篇店量,我們補(bǔ)上ringBuffer,然后研究這里的circularBuffer .
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) {
throw new NullPointerException();
} else if (sink != null || snk.connected) {
throw new IOException("Already connected");
}
sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
protected synchronized void receive(int b) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
if (in == out)
awaitSpace();
if (in < 0) {
in = 0;
out = 0;
}
buffer[in++] = (byte)(b & 0xFF);
if (in >= buffer.length) {
in = 0;
}
}
查看PipedOutputStream的源碼我們發(fā)現(xiàn)芜果,PipedOutputStream本身沒有緩沖區(qū),1024的緩沖區(qū)在輸入流中融师。PipedOutputStream的寫入方法
public void write(byte b[], int off, int len) flush() 都是調(diào)用的PipedInputStream的
synchronized void receive(byte b[], int off, int len) 方法
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
最后我們上代碼:
public class MultiThreadPipedTest {
public static class Write extends Thread{
public PipedOutputStream pos = null;
"獲取線程中的管道輸出流"
public PipedOutputStream getPos(){
pos = new PipedOutputStream();
return pos;
}
"把數(shù)據(jù)通過管道輸出流發(fā)送出去"
public void SentData(){
PrintStream p = new PrintStream(pos);
for(int i=1;i<10;i++){
p.println("hello");
p.flush();
}
p.close();
}
@Override
public void run(){
while(true); //模擬耗時工作
}
}
public static class Read extends Thread{
public PipedInputStream pis = null;
public String line = "null";
"獲得線程中的管道輸入流"
public PipedInputStream getPis(){
pis = new PipedInputStream();
return pis;
}
"利用管道輸入流接收管道數(shù)據(jù)"
public void ReceiveData(){
BufferedReader r = new BufferedReader(new InputStreamReader(pis));
try {
while(line!=null){
line = r.readLine();
System.out.println("read: "+line);
}
r.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run(){
while(true); //模擬耗時工作
}
}
public static class Other_Thread extends Thread{
public PipedInputStream pis = null;
public String line = "null";
"獲得線程中的管道輸入流"
public PipedInputStream getPis(){
pis = new PipedInputStream();
return pis;
}
"利用管道輸入流接收管道數(shù)據(jù)"
public void ReceiveData(){
BufferedReader r = new BufferedReader(new InputStreamReader(pis));
try {
while(line!=null){
line = r.readLine();
System.out.println("Other thread: "+line);
}
r.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run(){
while(true); //模擬耗時操作
}
}
public static void main(String args[]) throws InterruptedException, IOException{
Write write = new Write();
Read read = new Read();
Other_Thread other = new Other_Thread();
"連接兩個線程的管道流 ---read和write線程"
write.getPos().connect(read.getPis());
write.start();
read.start();
other.start();
write.SentData();
read.ReceiveData();
Thread.sleep(2000);
"重新連接兩個線程的管道流 ---Other_Thread和write線程"
write.getPos().connect(other.getPis());
write.SentData();
other.ReceiveData();
}
}
管道泄漏 右钾,一個線程寫,多個線程讀取數(shù)據(jù)旱爆,本來connect應(yīng)該是1對1 的舀射。但是下面的例子,write和read本來是一對的怀伦,但是Other_Thread 竊取了脆烟,read線程的數(shù)據(jù)。
public class PipedStreamLeakTest {
public static class Write extends Thread{
public PipedOutputStream pos;
Write(PipedOutputStream pos){
this.pos = pos;
}
public void run(){
PrintStream p = new PrintStream(pos);
for(int i=1;i<1000;i++){
p.println("hello");
p.flush();
}
p.close();
}
}
public static class Read extends Thread{
public PipedInputStream pis;
public String line = "null";
Read(PipedInputStream pis){
this.pis = pis;
}
public void run(){
BufferedReader r = new BufferedReader(new InputStreamReader(pis));
try {
while(line!=null){
line = r.readLine();
System.out.println("read: "+line);
}
r.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static class Other_Thread extends Thread{
public PipedInputStream pis;
public String line = "null";
Other_Thread(PipedInputStream pis){
this.pis = pis;
}
public void run(){
BufferedReader r = new BufferedReader(new InputStreamReader(pis));
try {
while(line!=null){
line = r.readLine();
System.out.println("Other_Thread: "+line);
}
r.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws InterruptedException, IOException{
"創(chuàng)建管道通信流"
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
new Write(pos).start();
new Read(pis).start();
new Other_Thread(pis).start();
}
}