ipc.Server 類分析
Hadoop采用了Master/Slave結(jié)構(gòu)碱呼。其中,Master通過ipc.Server接收并處理所有Slave發(fā)送的請求蔗牡,這就要求ipc.Server將高并發(fā)和可擴(kuò)展性作為設(shè)計目標(biāo)颖系。為此,ipc.Server采用了很多具有提高并發(fā)處理能力的技術(shù)辩越,主要包括線程池嘁扼、事件驅(qū)動和Reactor設(shè)計模式等。這些技術(shù)采用了JDK自帶的庫實現(xiàn)黔攒。我們先重點分析它是如何利用Reactor設(shè)計模式提高整體性能的趁啸。
01 Reactor設(shè)計模式
Reactor是并發(fā)編程中的一種基于事件驅(qū)動的設(shè)計模式强缘。它具有以下2個特點:
- 通過派發(fā)/分離IO操作時間提高系統(tǒng)的并發(fā)性能
- 提供了粗粒度的并發(fā)控制,使用單線程實現(xiàn)莲绰,避免了復(fù)雜的同步處理
一個典型的Reactor模式中主要包括以下幾個角色:
- Reactor:IO事件的派發(fā)者
- Acceptor:接收來自Client的連接欺旧,建立與Client對應(yīng)的Handler姑丑,并向Reactor注冊此Handler
- Handler:與一個Client通信的實體蛤签,并按一定的過程實現(xiàn)業(yè)務(wù)的處理。Handler內(nèi)部往往會有更進(jìn)一步的層次劃分栅哀,用來抽象諸如read震肮,decode,compute留拾,encode和send等過程戳晌。在Reactor模式中,業(yè)務(wù)邏輯被分散的IO事件所打破痴柔,所以Handler需要有適當(dāng)?shù)臋C(jī)制在所需的信息還不全(讀到一半)的時候保存上下文沦偎,并在下一次IO事件到來的時候(另一半可讀了)能繼續(xù)上次中斷的處理。
- Reader/Sender:為了加速處理速度咳蔚,Reactor模式往往構(gòu)建一個存放數(shù)據(jù)處理線程的線程池豪嚎,這樣數(shù)據(jù)讀出后,立即扔到線程池中等待后續(xù)處理即可谈火。為此侈询,Reactor模式一般分離Handler中的讀和寫兩個過程,分別注冊成單獨的讀事件和寫事件糯耍,并由對應(yīng)的Reader和Sender線程處理扔字。
ipc.Server實現(xiàn)了一個典型的Reactor設(shè)計模式,其整體架構(gòu)與上述完全一致温技。了解了Reactor的架構(gòu)之后革为,能夠幫助理解和學(xué)習(xí)ipc.Server的設(shè)計思路及實現(xiàn)。下面就分析Ipc.Server的實現(xiàn)細(xì)節(jié)舵鳞。
02 ipc.Server實現(xiàn)細(xì)節(jié)
用eclipse打開已經(jīng)編譯好的源碼震檩,找到ipc.Server,使用eclipse的quick outline查看一下該類的大致結(jié)構(gòu)系任。
源碼內(nèi)容很多恳蹲,要先找到下手的地方,通過outline可以捕獲到Server有幾個內(nèi)部類俩滥,這幾個類是什么作用這是需要關(guān)心的嘉蕾,然后就是程序的入口,這個start()方法霜旧。先看看start()做了什么错忱。
啟動服務(wù)
Server.start()
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
start()方法啟動了幾個對象儡率,通過名稱可以知道他們是幾個內(nèi)部類的實例,那下一步就應(yīng)該分析一下每個類的作用以清。大致查看一下這幾個類儿普,發(fā)現(xiàn)都繼承自Thread類,也就是說每個類都啟動了一個新的線程掷倔,那么重點就是去考察這個幾個線程主體干了什么眉孩。
Responder、Listener和Handler
1.Responder
private class Responder extends Thread {
// 代碼.... 此處省略
@Override
public void run() {
// 代碼.... 此處省略
while (running) {
try {
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e) {
// 代碼.... 此處省略
}
}
}
// 代碼.... 此處省略
}
}
- Listener
private class Listener extends Thread {
// 代碼.... 此處省略
@Override
public void run() {
// 代碼.... 此處省略
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// 代碼.... 此處省略
} catch (Exception e) {
// 代碼.... 此處省略
}
// 代碼.... 此處省略
}
// 代碼.... 此處省略
}
- Handler
private class Handler extends Thread {
// 代碼.... 此處省略
@Override
public void run() {
// 代碼.... 此處省略
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
// 代碼.... 此處省略
try {
if (call.connection.user == null) {
value = call(call.connection.protocol, call.param,
call.timestamp);
} else {
}
} catch (Throwable e) {
// 代碼.... 此處省略
}
// 代碼.... 此處省略
synchronized (call.connection.responseQueue) {
// 代碼.... 此處省略
setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
responder.doRespond(call);
}
} catch (InterruptedException e) {
// 代碼.... 此處省略
} catch (Exception e) {
// 代碼.... 此處省略
}
}
// 代碼.... 此處省略
}
}
縱觀這幾個線程的主體勒葱,發(fā)現(xiàn)Responder和Listener的代碼很熟悉浪汪,NIO里的知識。Listener負(fù)責(zé)監(jiān)聽op_accept事件凛虽,然后調(diào)用doAccept()方法處理連接死遭;Responder負(fù)責(zé)監(jiān)聽op_write事件,然后調(diào)用doAsyncWrite()方法凯旋;Handler里只能大致知道調(diào)用了Server.call()這個抽象方法(應(yīng)該會在某個地方實現(xiàn))得到了value呀潭,然后setupResponse()把處理結(jié)果關(guān)聯(lián)到Call,再用responder.doRespond()向客戶端做出回應(yīng)至非,至于Call钠署,Connection,這也正是我們還沒有弄清楚的幾個類睡蟋;還有Call是從一個叫做callQueue的變量里拿到的踏幻,這個變量也成為了我們進(jìn)一步需要關(guān)心的地方。
目前能知道的就是:Listener是監(jiān)聽連接的戳杀,但對連接是如何處理的還需要解讀doRead()方法该面;Handler是處理業(yè)務(wù)邏輯的,起點是存放在callQueue中的Call信卡,Call又與Connection聯(lián)系密切隔缀,但這2個類的作用還未知,處理完之后調(diào)用responder.doRespond()做出回應(yīng)傍菇,不過Responder功能不僅僅如此猾瘸,還負(fù)責(zé)doAsyncWrite()。
所以丢习,接下來的任務(wù)是分析一下Call類牵触、Connection類、callQueue變量咐低、doAccept()方法揽思、doRespond()方法和doAsyncWrite()方法。
Call见擦、Connection钉汗、callQueue羹令、doAccept()、doRespond()损痰、doAsyncWrite()
- Call類
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
private Writable param; // the parameter passed
private Connection connection; // connection to client
private long timestamp; // the time received when response is null
// the time served when response is not null
private ByteBuffer response; // the response for this call
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
}
@Override
public String toString() {
return param.toString() + " from " + connection.toString();
}
public void setResponse(ByteBuffer response) {
this.response = response;
}
}
- Connection類
/** Reads calls from a connection and queues them for handling. */
public class Connection {
// 代碼.... 此處省略
public Connection(SelectionKey key, SocketChannel channel,
long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.unwrappedData = null;
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
this.responseQueue = new LinkedList<Call>();
if (socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(socketSendBufferSize);
} catch (IOException e) {
}
}
}
}
Call類的代碼比較少福侈,聯(lián)系RPC的目的,可以分析出這個類是對RPC請求的封裝卢未,有傳遞的參數(shù)param肪凛,還有連接客戶端的Connection,以及處理的結(jié)果response尝丐。而Connection類的成員變量多显拜,方法也多,所以觀察一下構(gòu)造器爹袁,留意到變量responseQueue,應(yīng)該是用來存放經(jīng)過handle之后的Call矮固。
- callQueue變量
public abstract class Server {
//省略代碼
private BlockingQueue<Call> callQueue;
//省略代碼
}
callQueue是一個全局變量失息,專門用來存放封裝請求的Call。call從哪生產(chǎn)档址,又是被誰消費呢盹兢。使用eclipse的Call Hierarchy查看一下調(diào)用層次。
依次查看可以發(fā)現(xiàn)在Connection的processData()方法里面出現(xiàn)了
private void processData(byte[] buf) throws IOException, InterruptedException {
//省略代碼
Call call = new Call(id, param, this);
callQueue.put(call);
//省略代碼
}
Call將一些參數(shù)封裝守伸,并放入隊列callQueue中绎秒。這些參數(shù)是從字節(jié)數(shù)組buf里讀到的,所以繼續(xù)往上找:
終于找到了我們認(rèn)識的Listener尼摹,點開doRead()方法见芹。
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();
//代碼....此處省略
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
//代碼....此處省略
}
//代碼....此處省略
}
NIO里的SelectionKey對象,doRead()方法中將Connection從SelectionKey中取出蠢涝,然后通過Connection的readAndRrocess()方法封裝call玄呛,也就是doRead()中生產(chǎn)了Call,并存放在callQueue中和二。
public int readAndProcess() throws IOException, InterruptedException {
//代碼....此處省略
count = channelRead(channel, data);
//代碼....此處省略
if (useSasl) {
saslReadAndProcess(data.array());
} else {
processOneRpc(data.array());
}
}
readAndProcess()是從channel中讀取傳遞過來的字節(jié)徘铝,然后從里拿到封裝Call需要的那些參數(shù),至于具體的細(xì)節(jié)就不再鉆了惯吕。
- doAccept()
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
c = new Connection(readKey, channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
//代碼....此處省略
} finally {
reader.finishAdd();
}
}
}
doAccept()中生產(chǎn)了Connection并attach到SelectionKey對象中惕它。 這里涉及到一個新的類Reader,我們看看Reader是干什么用的废登。
private class Reader implements Runnable {
private volatile boolean adding = false;
private Selector readSelector = null;
Reader(Selector readSelector) {
this.readSelector = readSelector;
}
public void run() {
LOG.info("Starting SocketReader");
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
//代碼....此處省略
} catch (IOException ex) {
//代碼....此處省略
}
}
}
}
}
Reader繼承自Thread淹魄,那么就要搞清楚是在哪里啟動的線程。調(diào)用Call Hierarchy查看钳宪,發(fā)現(xiàn)是在Listener初始化的時候啟動的揭北,代碼如下:
public Listener() throws IOException {
//代碼....此處省略
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
//代碼....此處省略
}
Reader的線程體主要是通過doRead()在解析請求扳炬,從上面我們知道了doRead()內(nèi)部是使用Connection.readAndProcess()來解析的。
- doRespond()
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}
doRespond()調(diào)用了processResponse():
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
//代碼....此處省略
try {
synchronized (responseQueue) {
//代碼....此處省略
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
//代碼....此處省略
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channelWrite(channel, call.response);
if (!call.response.hasRemaining()) {
if (inHandler) {
//代碼....此處省略
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
//代碼....此處省略
}
}
} finally {
//代碼....此處省略
}
}
return done;
}
channelWrite(channel, call.response)把處理的結(jié)果返回給客戶端搔体,“Send as much data as we can in the non-blocking fashion”恨樟,如果有剩余的data就會注冊寫事件:
channel.register(writeSelector, SelectionKey.OP_WRITE, call),也就會調(diào)用doAysnWrite()去處理剩下的數(shù)據(jù)疚俱。
到這里大概的原理就清楚了劝术,畫一個不科學(xué)的示意圖(但比較直觀哈),如下:
總結(jié)一下就是:
(1)接收請求
該階段主要任務(wù)是接收來自各個客戶端的RPC請求呆奕,并將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中养晋,以便進(jìn)行后續(xù)處理。該階段內(nèi)部又分為建立連接和接收請求兩個子階段梁钾,分別由Listener和Reader兩種線程完成绳泉。整個Server只有一個Listener線程,統(tǒng)一負(fù)責(zé)監(jiān)聽來自客戶端的連接請求姆泻,一旦有新的請求到達(dá)零酪,它會采用輪詢的方式從線程池中選擇一個Reader線程進(jìn)行處理,而Reader線程可同時存在多個拇勃,它們分別負(fù)責(zé)接收一部分客戶端連接的RPC請求四苇,至于每個Reader線程負(fù)責(zé)哪些客戶端連接,完全由Listener決定方咆,當(dāng)前Listener只是采用了簡單的輪詢分配機(jī)制月腋。
(2)處理請求
該階段主要任務(wù)是從共享隊列callQueue中獲取Call對象,執(zhí)行對應(yīng)的函數(shù)調(diào)用瓣赂,并將結(jié)果返回給客戶端榆骚,這全部由Handler線程完成钩述。
Server端可同時存在多個Handler線程寨躁,它們并行從共享隊列中讀取Call對象牙勘,經(jīng)執(zhí)行對應(yīng)的函數(shù)調(diào)用后,將嘗試著直接將結(jié)果返回給對應(yīng)的客戶端方面。但考慮到某些函數(shù)調(diào)用返回結(jié)果很大或者網(wǎng)絡(luò)速度過慢放钦,可能難以將結(jié)果一次性發(fā)送到客戶端,此時Handler將嘗試著將后續(xù)發(fā)送任務(wù)交給Responder線程恭金。
(3)返回結(jié)果
前面提到,每個Handler線程執(zhí)行完函數(shù)調(diào)用后横腿,會嘗試著將執(zhí)行結(jié)果返回給客戶端颓屑,但對于特殊情況,比如函數(shù)調(diào)用返回結(jié)果過大或者網(wǎng)絡(luò)異常情況(網(wǎng)速過慢)揪惦,會將發(fā)送任務(wù)交給Responder線程遍搞。
Server端僅存在一個Responder線程,它的內(nèi)部包含一個Selector對象器腋,用于監(jiān)聽SelectionKey.OP_WRITE事件溪猿。當(dāng)Handler沒能將結(jié)果一次性發(fā)送到客戶端時纫塌,會向該Selector對象注冊SelectionKey.OP_WRITE事件,進(jìn)而由Responder線程采用異步方式繼續(xù)發(fā)送未發(fā)送完成的結(jié)果措左。