基于Netty的IM簡單實(shí)現(xiàn)原理

最近在開發(fā)MobIM臼予,實(shí)現(xiàn)了消息傳輸和群等功能的IM功能鸣戴。SDK功能包小,而功能全面粘拾≌可以與原來的系統(tǒng)進(jìn)行無縫整合。

自己抽空也實(shí)現(xiàn)了一套IM Server和IMClient的業(yè)務(wù)通信模式缰雇。沒有實(shí)現(xiàn)復(fù)雜的UI界面入偷,實(shí)現(xiàn)簡單的登錄注冊(cè),發(fā)消息械哟,收消息疏之。服務(wù)器端與客戶端都使用Netty通信。

Netty基于非阻塞(nio)暇咆,事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具锋爪。

通過Netty面對(duì)大規(guī)模的并發(fā)請(qǐng)求可以處理的得心用手。用來替代原來的bio網(wǎng)絡(luò)應(yīng)用請(qǐng)求框架爸业。


BIO通信即平時(shí)使用的基于Socket其骄,ServerSocket的InputStream和OutStream。

Netty神奇的地方在于是否是阻塞的扯旷。

while(true){

//主線程死循環(huán)等待新連接到來

?Socket socket = serverSocket.accept();

//為新的連接創(chuàng)建新的線程,客戶端與服務(wù)器上的線程數(shù)1:1

?executor.submit(new ConnectIOnHandler(socket));

在BIO模型中拯爽,服務(wù)器通過ServerSocket來開啟監(jiān)聽,每當(dāng)有請(qǐng)求的時(shí)候開啟一個(gè)線程來接受處理和維持狀態(tài)钧忽。這種思想在低并發(fā)毯炮,小吞吐的應(yīng)用還可以應(yīng)付逼肯,一旦遇到大并發(fā),大吞吐的請(qǐng)求否副,必然歇菜汉矿。線程和客戶端保持著1:1的對(duì)應(yīng)關(guān)系,維持著線程备禀。維持那么的多的線程洲拇,JVM必然不堪重負(fù),服務(wù)器必然崩潰曲尸,宕機(jī)赋续。

而在非阻塞的Netty中,卻可以應(yīng)付自如另患。從容應(yīng)對(duì)纽乱。Tomcat就是基于BIO的網(wǎng)絡(luò)通信模式(Tomcat可以通過一定配置,改成非阻塞模式)昆箕,而JBoss卻是基于非阻塞的NIO實(shí)現(xiàn)鸦列。

NIO的網(wǎng)絡(luò)通信模式很強(qiáng)勁,但是上手卻一點(diǎn)都不容易鹏倘。其中解決和牽扯到好多網(wǎng)絡(luò)問題薯嗤。如:網(wǎng)絡(luò)延時(shí),TCP的粘包/拆包纤泵,網(wǎng)絡(luò)故障等一堆一堆的問題骆姐。而Netty呢,針對(duì)nio復(fù)雜的編程難題而進(jìn)行一系列的封裝實(shí)現(xiàn)捏题,提供給廣大開發(fā)者一套開源簡單玻褪,方便使用的API類庫,甚至青出于藍(lán)而勝于藍(lán)公荧,甚至幾乎完美的解決CPU突然飆升到100%的bug :http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933(其實(shí)也沒有真正的解決带射,只是把復(fù)現(xiàn)的概率降到了最低而已)。

用Netty來實(shí)現(xiàn)IM實(shí)在太合適了循狰∮褂眨可以在最短的時(shí)間里整出一套思路清晰,架構(gòu)簡明的IM通信底層模型晤揣。提下需求,底層用JSON 字符串String進(jìn)行通信朱灿,對(duì)象通過JSON序列化成JSON String昧识。收到JSON數(shù)據(jù)后再反序列化成對(duì)象。

首先盗扒,我們先看服務(wù)器是怎么實(shí)現(xiàn)的跪楞。

private static final StringDecoder DECODER = new StringDecoder();

private static final StringEncoder ENCODER = new StringEncoder();

...

//boss線程監(jiān)聽端口缀去,worker線程負(fù)責(zé)數(shù)據(jù)讀寫

bossGroup = new NioEventLoopGroup(1);

workerGroup = new NioEventLoopGroup();

//輔助啟動(dòng)類

ServerBootstrap bootstrap = new ServerBootstrap();

try {

//設(shè)置線程池

bootstrap.group(bossGroup, workerGroup);

//設(shè)置socket工廠

bootstrap.channel(NioServerSocketChannel.class);

bootstrap.handler(new LoggingHandler(LogLevel.INFO));

//設(shè)置管道工廠

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

//獲取管道

ChannelPipeline pipe = socketChannel.pipeline();

// Add the text line codec combination first,

pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

? ? ? ?// the encoder and decoder are static as these are sharable

//字符串編碼器

pipe.addLast(DECODER);

//字符串解碼器

pipe.addLast(ENCODER);

//業(yè)務(wù)處理類

pipe.addLast(new IMServerHandle());

}

});

//綁定端口

// Bind and start to accept incoming connections.

ChannelFuture f = bootstrap.bind(port).sync();

if (f.isSuccess()) {

Log.debug("server start success... port: " + port + ", main work thread: "

+ Thread.currentThread().getId());

}

////等待服務(wù)端監(jiān)聽端口關(guān)閉

// Wait until the server socket is closed.

f.channel().closeFuture().sync();

} finally {

//優(yōu)雅退出,釋放線程池資源

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

??以上是Netty服務(wù)器啟動(dòng)的代碼甸祭。其中需要注意childHandler方法缕碎。需要把我們要添加的業(yè)務(wù)處理handler來添加到這里。通過ChannelPipeline 添加ChannelHandler池户。而處理字符串的就在IMServerHandle里實(shí)現(xiàn)咏雌。IMServerHandle繼承了SimpleChannelInboundHandler類。其中泛型T就是要轉(zhuǎn)換成的對(duì)象校焦∩薅叮客戶端與服務(wù)器端通信是本質(zhì)上通過字節(jié)碼byte[]通信的,而通過StringDecoder 和StringEncoder工具類對(duì)byte[]進(jìn)行轉(zhuǎn)換寨典,在IMServerHandle中獲取到String進(jìn)行處理即可氛雪。

看下IMServerHandle的實(shí)現(xiàn)方式。

/***

?* 面向IM通信操作的業(yè)務(wù)類

?* @author xhj

?*

?*/

public class IMServerHandle extends SimpleChannelInboundHandler<String> {

/**

* user操作業(yè)務(wù)類

*/

private UserBiz userBiz = new UserBiz();

/***

* 消息操作的業(yè)務(wù)類

*/

private IMMessageBiz immessagebiz = new IMMessageBiz();

/***

* 處理接受到的String類型的JSON數(shù)據(jù)

*/

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

System.out.println(" get msg >> "+msg);

//把JSON數(shù)據(jù)進(jìn)行反序列化

? Request req = JSON.parseObject(msg, Request.class);

? Response respon = new Response();

? respon.setSendTime(System.currentTimeMillis());

? //判斷是否是合法的請(qǐng)求

? if(req != null ) {

? System.out.println("the req method >> "+req.getMethod());

? //獲取操作類型

? if(req.getMethod() == IMProtocol.LOGIN) {

? //獲取要操作的對(duì)象

? User user = JSON.parseObject(req.getBody(),User.class);

? //設(shè)置返回?cái)?shù)據(jù)的操作類型

? respon.setMethod(IMProtocol.LOGIN);

? //執(zhí)行業(yè)務(wù)操作

? boolean bl = userBiz.login(user);

? if(bl) {//檢驗(yàn)用戶有效

? //設(shè)置響應(yīng)數(shù)據(jù)

? respon.setBody("login ok");

? //設(shè)置狀態(tài)

? respon.setStatus(0);

? //登錄成功將連接channel保存到Groups里

? ChannelGroups.add(ctx.channel());

? //將用戶的uname和channelId進(jìn)行綁定耸成,服務(wù)器向指定用戶發(fā)送消息的時(shí)候需要用到uname和channelId

? ChannelGroups.putUser(user.getUname(), ctx.channel().id());

? //發(fā)送廣播通知某人登錄成功了

? userBiz.freshUserLoginStatus(user);

? } else {//用戶密碼錯(cuò)誤

? //設(shè)置錯(cuò)誤描述

? respon.setErrorStr("pwd-error");

? //設(shè)置狀態(tài)描述碼

? respon.setStatus(-1);

? }

? //將Response序列化為json字符串

? msg = JSON.toJSONString(respon);

? //發(fā)送josn字符串?dāng)?shù)據(jù)报亩,注意后面一定要加"\r\n"

? ctx.writeAndFlush(msg+"\r\n");

? } else if(req.getMethod() == IMProtocol.SEND) {

? IMMessage immsg = JSON.parseObject(req.getBody(), IMMessage.class);

? immsg.setSendTime(System.currentTimeMillis()); c

通過IMServerHandle可以十分方便的處理獲取到的String字符串。處理完后井氢,可以直接通過ChannelHandlerContext的writeAndFlush方法發(fā)送數(shù)據(jù)弦追。

再看下Netty客戶端如何實(shí)現(xiàn)。

private BlockingQueue<Request> requests = new LinkedBlockingQueue<>();


? ?/**

? ? * String字符串解碼器

? ? */

private static final StringDecoder DECODER = new StringDecoder();


? ?/***

? ? * String字符串編碼器

? ? */

private static final StringEncoder ENCODER = new StringEncoder();


? ?/**

? ? * 客戶端業(yè)務(wù)處理Handler

? ? */

? ?private IMClientHandler clientHandler ;


? ?/**

? ? * 添加發(fā)送請(qǐng)求Request

? ? * @param request

? ? */

? ?public void addRequest(Request request) {

? ? ? ?try {

? ? ? ? ? ?requests.put(request);

? ? ? ?} catch (InterruptedException e) {

? ? ? ? ? ?e.printStackTrace();

? ? ? ?}

? ?}


? ?/**

? ? * 是否繼續(xù)進(jìn)行運(yùn)行

? ? */

? ?private boolean run = true;


? ?public void run() {

? ? ? ?//遠(yuǎn)程IP

? ? ? ?String host = "172.20.10.7";

? ? ? ?//端口號(hào)

? ? ? ?int port = 10000;

? ? ? ?//工作線程

? ? ? ?EventLoopGroup workerGroup = new NioEventLoopGroup();

? ? ? ?try {

? ? ? ? ? ?//輔助啟動(dòng)類

? ? ? ? ? ?Bootstrap b = new Bootstrap(); // (1)

? ? ? ? ? ?//設(shè)置線程池

? ? ? ? ? ?b.group(workerGroup); // (2)

? ? ? ? ? ?//設(shè)置socket工廠 不是ServerSocket而是Socket

? ? ? ? ? ?b.channel(NioSocketChannel.class); // (3)

? ? ? ? ? ?b.handler(new LoggingHandler(LogLevel.INFO));

? ? ? ? ? ?//設(shè)置管道工廠

? ? ? ? ? ?b.handler(new ChannelInitializer<SocketChannel>() {

? ? ? ? ? ? ? ?public void initChannel(SocketChannel ch) throws Exception {

? ? ? ? ? ? ? ? ? ?ChannelPipeline pipe = ch.pipeline();

? ? ? ? ? ? ? ? ? ?// Add the text line codec combination first,

? ? ? ? ? ? ? ? ? ?pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

? ? ? ? ? ? ? ? ? ?// the encoder and decoder are static as these are sharable

? ? ? ? ? ? ? ? ? ?//字符串解碼器

? ? ? ? ? ? ? ? ? ?pipe.addLast(DECODER);

? ? ? ? ? ? ? ? ? ?//字符串編碼器

? ? ? ? ? ? ? ? ? ?pipe.addLast(ENCODER);

? ? ? ? ? ? ? ? ? ?clientHandler = new IMClientHandler();

? ? ? ? ? ? ? ? ? ?//IM業(yè)務(wù)處理類

? ? ? ? ? ? ? ? ? ?pipe.addLast(clientHandler);

? ? ? ? ? ? ? ?}

? ? ? ? ? ?});


? ? ? ? ? ?// Start the client.

? ? ? ? ? ?ChannelFuture f = b.connect(host, port).sync(); // (5)


? ? ? ? ? ?Channel ch = f.channel();

? ? ? ? ? ?ChannelFuture lastWriteFuture = null;

? ? ? ? ? ?while(run) {

? ? ? ? ? ? ? ?//將要發(fā)送的Request轉(zhuǎn)化為JSON String類型

? ? ? ? ? ? ? ?String line = JSON.toJSONString(requests.take());

? ? ? ? ? ? ? ?if(line != null && line.length() > 0) {//判斷非空

? ? ? ? ? ? ? ? ? ?// Sends the received line to the server.

? ? ? ? ? ? ? ? ? ?//發(fā)送數(shù)據(jù)到服務(wù)器

? ? ? ? ? ? ? ? ? ?lastWriteFuture = ch.writeAndFlush(line + "\r\n");

? ? ? ? ? ? ? ?}

? ? ? ? ? ?}

? ? ? ? ? ?// Wait until all messages are flushed before closing the channel.

? ? ? ? ? ?//關(guān)閉寫的端口

? ? ? ? ? ?if (lastWriteFuture != null) {

? ? ? ? ? ? ? ?lastWriteFuture.sync();

? ? ? ? ? ?}

? ? ? ?} catch(Exception ex){

? ? ? ? ? ?ex.printStackTrace();

? ? ? ?} finally {

? ? ? ? ? ?//優(yōu)雅的關(guān)閉工作線程

? ? ? ? ? ?workerGroup.shutdownGracefully();

? ? ? ?}

? ?}


? ?/**

? ? * 增加消息監(jiān)聽接受接口

? ? * @param messgeReceivedListener

? ? */

? ?public void addMessgeReceivedListener(MessageSender.MessgeReceivedListener messgeReceivedListener) {

? ? ? ?clientHandler.addMessgeReceivedListener(messgeReceivedListener);

? ?}


? ?/***

? ? * ?移除消息監(jiān)聽接口

? ? * @param messgeReceivedListener

? ? */

? ?public void remove(MessageSender.MessgeReceivedListener messgeReceivedListener) {

? ? ? ?clientHandler.remove(messgeReceivedListener);

? ?}?

??Netty的client端實(shí)現(xiàn)和Server實(shí)現(xiàn)方式大同小異毙沾。比Server端要簡要些了骗卜。少一個(gè)NIOEventLoop。在Bootstrap 的handle方法中增加ChannelInitializer初始化監(jiān)聽器左胞,并增加了IMClientHandler的監(jiān)聽操作寇仓。其中IMClientHandler具體處理服務(wù)器返回的通信信息。

通過ChannelFuture獲取Channel烤宙,通過Channel在一個(gè)循環(huán)里發(fā)送請(qǐng)求遍烦。如果消息隊(duì)列BlockingQueue非空的時(shí)候,獲取Request并發(fā)送躺枕。以上發(fā)送服猪,如何接受數(shù)據(jù)呢?接受到的json被反序列化直接變成了對(duì)象Response拐云,對(duì)Response進(jìn)行處理即可罢猪。

定義了一個(gè)消息接受到的監(jiān)聽接口。

public static interface MessgeReceivedListener {

? ? public void onMessageReceived(Response msg);

? ? public void onMessageDisconnect();

? ? public void onMessageConnect();

}

在接口onMessageReceived方法里直接對(duì)獲取成功的響應(yīng)進(jìn)行處理叉瘩。

而服務(wù)器端對(duì)某個(gè)客戶端進(jìn)行發(fā)送操作膳帕,把Channel添加到ChannelGroup里,將uname和channelid對(duì)應(yīng)起來薇缅。需要對(duì)某個(gè)用戶發(fā)送消息的時(shí)候通過uname獲取channelid危彩,通過channelid從ChannelGroup里獲取channel攒磨,通過channel發(fā)送即可。

具體操作如下:

public void transformMessage(IMMessage message) {

Channel channel = ChannelGroups.getChannel(ChannelGroups.getChannelId(message.getTo()));

if(channel != null && channel.isActive()) {

Response response = new Response();

response.setBody(JSON.toJSONString(message));

response.setStatus(0);

response.setMethod(IMProtocol.REV);

response.setSendTime(System.currentTimeMillis());

channel.writeAndFlush(JSON.toJSON(response)+"\r\n");

}

}

ChannelGroups的代碼實(shí)現(xiàn):

public class ChannelGroups {


private static final Map<String,ChannelId> userList = new ConcurrentHashMap();

private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups",

GlobalEventExecutor.INSTANCE);


public static void putUser(String uname,ChannelId id) {

userList.put(uname,id);

}

通過以上代碼解析應(yīng)該對(duì)IM的通信模式有了比較全面的認(rèn)識(shí)汤徽。具體實(shí)現(xiàn)過程可以下載源代碼進(jìn)行查看娩缰。歡迎大家反饋提出問題。

https://github.com/sinxiao/NettyIMServerAndAndroidClient?


運(yùn)行效果圖谒府。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末拼坎,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子狱掂,更是在濱河造成了極大的恐慌演痒,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件趋惨,死亡現(xiàn)場離奇詭異鸟顺,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)器虾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門讯嫂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人兆沙,你說我怎么就攤上這事欧芽。” “怎么了葛圃?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵千扔,是天一觀的道長。 經(jīng)常有香客問我库正,道長曲楚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任褥符,我火速辦了婚禮龙誊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘喷楣。我一直安慰自己趟大,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布铣焊。 她就那樣靜靜地躺著逊朽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪曲伊。 梳的紋絲不亂的頭發(fā)上惋耙,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼绽榛。 笑死,一個(gè)胖子當(dāng)著我的面吹牛婿屹,可吹牛的內(nèi)容都是我干的灭美。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼昂利,長吁一口氣:“原來是場噩夢啊……” “哼届腐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蜂奸,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤犁苏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后扩所,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體围详,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年祖屏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了助赞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡袁勺,死狀恐怖雹食,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情期丰,我是刑警寧澤群叶,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站钝荡,受9級(jí)特大地震影響街立,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜化撕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一几晤、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧植阴,春花似錦蟹瘾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至喷鸽,卻和暖如春众雷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來泰國打工砾省, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留鸡岗,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓编兄,卻偏偏與公主長得像轩性,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子狠鸳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容