初看Tigase的packet內(nèi)部流轉(zhuǎn)機(jī)制有巧。Tigase通過(guò)tigase.io包當(dāng)中的代碼讀取網(wǎng)絡(luò)中的字節(jié)數(shù)組扭屁,然后通過(guò)tigase.net包當(dāng)中的類(lèi)把字節(jié)數(shù)組轉(zhuǎn)換為字符西剥,最后通過(guò)tigase.xml包當(dāng)中的XML解析器把這些字符轉(zhuǎn)換成XML DOM對(duì)象。
圖片流程說(shuō)明
看tigase源碼你會(huì)發(fā)現(xiàn)所有的tigase處理都是基于多線程削葱,每個(gè)component都有自己的in和out處理線程,線程間的數(shù)據(jù)傳輸通過(guò)queue
總的流程大致就是:
文字流程說(shuō)明
下面是請(qǐng)求和響應(yīng)的步驟說(shuō)明(只列出了關(guān)鍵的步驟)模软。
A. 從client到server的過(guò)程(請(qǐng)求-Request)
ClientConnectionManager和MessageRouter都間接或直接繼承了AbstractMessageReceiver躺涝。
tigase.server.AbstractMessageReceiver
– 它已經(jīng)實(shí)現(xiàn)了四個(gè)接口:ServerComponent厨钻,MessageReceiver,Configurable和StatisticsContainer坚嗜。
它通過(guò)自己的多個(gè)線程來(lái)管理內(nèi)部數(shù)據(jù)隊(duì)列夯膀,且能避免死鎖。
它使用事件驅(qū)動(dòng)的方式來(lái)處理數(shù)據(jù)惶傻,當(dāng)packet被發(fā)送到AbstractMessageReceiver實(shí)例的abstract void processPacket(Packet packet)方法時(shí)棍郎,就立即啟動(dòng)了packet的處理工作。當(dāng)然你還是需要實(shí)現(xiàn)抽象類(lèi)當(dāng)中的抽象方法银室,如果你還希望輸出packet數(shù)據(jù)(例如當(dāng)它收到請(qǐng)求時(shí)還需要發(fā)送響應(yīng))涂佃,可以調(diào)用boolean addOutPacket(Packet packet)方法。
a. XMPPIOService負(fù)責(zé)接收客戶(hù)端報(bào)文并轉(zhuǎn)換為對(duì)應(yīng)的packet放入隊(duì)列receivedPackets(相當(dāng)于in_queues)中,而SocketThread負(fù)責(zé)創(chuàng)建ReadThread和WriteThread線程蜈敢,
ResultsListener則是SocketThread的一個(gè)內(nèi)部類(lèi)辜荠。ResultsListener負(fù)責(zé)調(diào)度socketReadThread()和
socketWriteThread(), Client2Server讀取和寫(xiě)入數(shù)據(jù)包的IO操作主要由XMPPIOService來(lái)完XMPPIOService實(shí)例則在read和write線程中被使用。
b. ClientConnectionManager則負(fù)責(zé)從XMPPIOService獲取receivedPackets抓狭,并對(duì)收到的報(bào)文進(jìn)行處理伯病,并將處理后的報(bào)文放入MessageRouter的out_queues隊(duì)列中。
c.MessageRouter重寫(xiě)了AbstractMessageReceiver的processPacket方法否过,在該方法中MessageRouter通過(guò)packet.getTo()得到組件的名稱(chēng)并轉(zhuǎn)發(fā)packet到該組件的in_queues隊(duì)列中午笛,該目標(biāo)服務(wù)組件(如ses-man,即SessionManager)可能在本機(jī)服務(wù)器上,也可能在該域(domain)集群的其他服務(wù)器上運(yùn)行MessageRouter的查找順序是先查本機(jī)服務(wù)器苗桂,找不到的話再去集群中查找药磺;如果找到則將packet轉(zhuǎn)發(fā)給目標(biāo)組件處理,如果沒(méi)找到則返回沒(méi)有找到目標(biāo)組件的錯(cuò)誤消息煤伟。
d. 我們的組件一般都有自己的in_queues和out_queues. 當(dāng)前組件的in_queues的數(shù)據(jù)來(lái)自于上一組件的out_queues癌佩。
int queueIdx = Math.abs(hashCodeForPacket(packet) % in_queues_size*);
boolean result = in_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());
//這里的packet數(shù)據(jù)來(lái)自于packetFrom="xxx",即傳給當(dāng)前組件的上一組件木缝。
B.從Service到Client的過(guò)程(響應(yīng)-Response)
服務(wù)端響應(yīng)的數(shù)據(jù)也是放到out_queues中的,各組件的對(duì)應(yīng)的線程會(huì)對(duì)out_queues中的packet的to屬性做解析围辙,并將消息轉(zhuǎn)發(fā)到指定目標(biāo)我碟。其實(shí)消息的流轉(zhuǎn)傳遞機(jī)制實(shí)現(xiàn)的核心就是packet的 from包含packetFrom和stanzaFrom) 和 to(packetTo和stanzaTo)屬性,路由的路徑會(huì)默認(rèn)先取packetFrom或packetTo姚建,其次再去取stanzaFrom和stanzaTo矫俺。
客戶(hù)端發(fā)送一條ping命令
<iq type='get' id='purplee4ad721'>
<ping xmlns='urn:xmpp:ping'/>
</iq>
IOService.java
public abstract class IOService<RefObject>
implements Callable<IOService<?>>, TLSEventHandler,
IOListener {
/**接收?qǐng)?bào)文后call->處理客戶(hù)端報(bào)文數(shù)據(jù)信息*/
@Override
public IOService<?> call() throws IOException {
writeData(null);
boolean readLock = true;
if (stopping) {
stop();
} else {
readLock = readInProgress.tryLock();
if (readLock) {
try {
processSocketData();
if ((receivedPackets() > 0) && (serviceListener != null)) {
serviceListener.packetsReady(this);
} // end of if (receivedPackets.size() > 0)
} finally {
readInProgress.unlock();
if (!isConnected()) {
// added to sooner detect disconnection of peer - ie. client
if (log.isLoggable(Level.FINEST)) {
log.log(Level.FINEST, "{0}, stopping connection due to the fact that it was disconnected, forceStop()", toString());
}
forceStop();
}
}
}
}
return readLock
? this
: null;
}
/**
* Describe
* <code>writeData</code> method here.
* 這里是最后響應(yīng)消息給客戶(hù)端的方法,寫(xiě)入TCP連接中的Socket
* @param data a
* <code>String</code> value
*/
protected void writeData(final String data) {
返回消息前還調(diào)用了下面兩個(gè)類(lèi)的對(duì)應(yīng)write方法
SocketIO.java
@Override
public int write(final ByteBuffer buff) throws IOException {
}
TLSIO.java
public class TLSIO implements IOInterface {
private int writeBuff(ByteBuffer buff) throws IOException {
}
}
IOService打包好了消息之后形成響應(yīng)的packet包桥胞,->然后再執(zhí)行到ConnectionManager的writePacketToSocket()方法->再執(zhí)行到ClientConnectionManager的processPacket方法恳守。
至此本次消息的請(qǐng)求和響應(yīng)結(jié)束!server會(huì)繼續(xù)通過(guò)AbstractMessageReceiver獲取下一條消息進(jìn)行處理贩虾,如此循環(huán)催烘。。缎罢。
tigase.server.ServerComponent – 這是一個(gè)非骋寥海基本的component接口。所有的component都必須實(shí)現(xiàn)接口中定義的方法策精。
tigase.server.MessageReceiver – 這個(gè)接口extends ServerComponent舰始,所有希望接收數(shù)據(jù)packets的Component都需要實(shí)現(xiàn)接口中定義的方法,比如session manager和c2s connection manager咽袜。
tigase.conf.Configurable – 如果希望components可以被配置丸卷,則需要實(shí)現(xiàn)這個(gè)接口,所有默認(rèn)定義基本都在這。在運(yùn)行時(shí)询刹,配置信息會(huì)被推送到這種類(lèi)型的對(duì)象谜嫉。components必須能夠在運(yùn)行時(shí)對(duì)變更的配置項(xiàng)進(jìn)行處理,這一點(diǎn)在實(shí)現(xiàn)時(shí)要留神凹联。tigase.disco.XMPPService – 實(shí)現(xiàn)了這個(gè)對(duì)象的類(lèi)可以對(duì)“ServiceDiscovery”請(qǐng)求做出響應(yīng)沐兰。
tigase.stats.StatisticsContainer – 實(shí)現(xiàn)了這個(gè)對(duì)象的類(lèi)可以返回運(yùn)行時(shí)的統(tǒng)計(jì)信息。任何一個(gè)對(duì)象都可以實(shí)現(xiàn)這個(gè)接口用來(lái)收集統(tǒng)計(jì)信息
tigase.server.AbstractMessageReceiver – 它已經(jīng)實(shí)現(xiàn)了四個(gè)接口:ServerComponent蔽挠,MessageReceiver住闯,Configurable和StatisticsContainer。它通過(guò)自己的多個(gè)線程來(lái)管理內(nèi)部數(shù)據(jù)隊(duì)列澳淑,且能避免死鎖比原。
它使用事件驅(qū)動(dòng)的方式來(lái)處理數(shù)據(jù),當(dāng)packet被發(fā)送到AbstractMessageReceiver實(shí)例的abstract void processPacket(Packet packet)方法時(shí)杠巡,就立即啟動(dòng)了packet的處理工作春寿。當(dāng)然你還是需要實(shí)現(xiàn)抽象類(lèi)當(dāng)中的抽象方法,如果你還希望輸出packet數(shù)據(jù)(例如當(dāng)它收到請(qǐng)求時(shí)還需要發(fā)送響應(yīng))忽孽,可以調(diào)用boolean addOutPacket(Packet packet)方法。tigase.server.ConnectionManager – 這是一個(gè)extend AbstractMessageReceiver的抽象類(lèi)。正如其名兄一,這個(gè)類(lèi)專(zhuān)注于對(duì)連接進(jìn)行管理工作厘线。如果你的組件需要通過(guò)網(wǎng)絡(luò)直接發(fā)送或接受數(shù)據(jù)(比如c2s connection,s2s connection 或者 連接到外部第三方j(luò)abber服務(wù))出革,你可以把它作為基類(lèi)進(jìn)行擴(kuò)展造壮。它會(huì)幫你把所有和網(wǎng)絡(luò)有關(guān)的工作都打理好(例如io,重連骂束,socket監(jiān)聽(tīng)和連接握手等工作)耳璧。
如果你extend這個(gè)類(lèi),你需要知道數(shù)據(jù)來(lái)源于哪里:如果來(lái)源于MessageRouter展箱,那么abstract void processPacket(Packet packet)方法會(huì)被調(diào)用; 如果來(lái)源于網(wǎng)絡(luò)連接旨枯,那么abstract Queue processSocketData(XMPPIOService serv)方法會(huì)被調(diào)用。
網(wǎng)絡(luò)
connectionManager同時(shí)協(xié)調(diào)ConnectionOpenThread與SocketThread混驰。
ConnectionOpenThread脫離上述組件攀隔,屬于網(wǎng)絡(luò)層實(shí)現(xiàn),操作selector栖榨。它負(fù)責(zé)Selector.open昆汹。
IOService提供線程安全的call方法,XMPPIOService繼承它婴栽,保存了連接信息满粗,每個(gè)連接一個(gè)IOService。
SocketThread在實(shí)例化時(shí)愚争,會(huì)啟動(dòng)多個(gè)線程映皆,同時(shí)盯住selector。負(fù)責(zé)將每個(gè)確定的IOService進(jìn)行數(shù)據(jù)處理准脂。
實(shí)現(xiàn)ConnectionOpenListener接口accept方法接收SocketChannel劫扒,組裝IOService,交由SocketThread處理狸膏。
ConnectionManager用ConcurrentHashMap記錄了所有的連接沟饥。
零碎
AbstractMessageReceiver.addPacket 往自己的in_queue里加數(shù)據(jù),是阻塞的湾戳,如果滿(mǎn)了會(huì)出事贤旷。
AbstractMessageReceiver.addPacketNB 往自己的in_queue里加數(shù)據(jù),非阻塞的砾脑,和上一個(gè)的區(qū)別在于幼驶,一個(gè)是put一個(gè)是offer到queue。
AbstractMessageReceiver.addPackets 來(lái)一堆數(shù)據(jù)韧衣。
所有in_queue里的數(shù)據(jù)盅藻,會(huì)被processPacket方法所處理购桑。
對(duì)應(yīng)有addOutPacket。
所有out_queue里的數(shù)據(jù)氏淑,都默認(rèn)扔給parent的in_queue勃蜘,沒(méi)有parent就扔到自己的in_queue。
所有in_queue的數(shù)據(jù)假残,都由processPacket具體的實(shí)現(xiàn)來(lái)處理缭贡。
企業(yè)級(jí)獨(dú)立部署應(yīng)用:知行辦公http://zx.naton.cn
【總監(jiān)】十二春秋之,3483099@qq.com辉懒;
【Master】zelo阳惹,616701261@qq.com;
【運(yùn)營(yíng)】運(yùn)維艄公眶俩,897221533@qq.com莹汤;
【產(chǎn)品設(shè)計(jì)】流浪貓,364994559@qq.com仿便;
【體驗(yàn)設(shè)計(jì)】兜兜体啰,2435632247@qq.com;
【iOS】淘碼小工嗽仪,492395860@qq.com荒勇;iMcG33K,imcg33k@gmail.com闻坚;
【Android】人猿居士沽翔,1059604515@qq.com;思路的頓悟窿凤,1217022114@qq.com仅偎;
【java】首席工程師MR_W,feixue300@qq.com雳殊;
【測(cè)試】土鏡問(wèn)道橘沥,847071279@qq.com;
【數(shù)據(jù)】fox009521夯秃,42151960@qq.com座咆;
【安全】保密,你懂的仓洼。