最近在開發(fā)MobIM臼予,實(shí)現(xiàn)了消息傳輸和群等功能的IM功能鸣戴。SDK功能包小,而功能全面粘拾≌可以與原來的系統(tǒng)進(jìn)行無縫整合。
自己抽空也實(shí)現(xiàn)了一套IM Server和IMClient的業(yè)務(wù)通信模式缰雇。沒有實(shí)現(xiàn)復(fù)雜的UI界面入偷,實(shí)現(xiàn)簡單的登錄注冊(cè),發(fā)消息械哟,收消息疏之。服務(wù)器端與客戶端都使用Netty通信。
Netty基于非阻塞(nio)暇咆,事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具锋爪。
通過Netty面對(duì)大規(guī)模的并發(fā)請(qǐng)求可以處理的得心用手。用來替代原來的bio網(wǎng)絡(luò)應(yīng)用請(qǐng)求框架爸业。
BIO通信即平時(shí)使用的基于Socket其骄,ServerSocket的InputStream和OutStream。
Netty神奇的地方在于是否是阻塞的扯旷。
while(true){
//主線程死循環(huán)等待新連接到來
?Socket socket = serverSocket.accept();
//為新的連接創(chuàng)建新的線程,客戶端與服務(wù)器上的線程數(shù)1:1
?executor.submit(new ConnectIOnHandler(socket));
在BIO模型中拯爽,服務(wù)器通過ServerSocket來開啟監(jiān)聽,每當(dāng)有請(qǐng)求的時(shí)候開啟一個(gè)線程來接受處理和維持狀態(tài)钧忽。這種思想在低并發(fā)毯炮,小吞吐的應(yīng)用還可以應(yīng)付逼肯,一旦遇到大并發(fā),大吞吐的請(qǐng)求否副,必然歇菜汉矿。線程和客戶端保持著1:1的對(duì)應(yīng)關(guān)系,維持著線程备禀。維持那么的多的線程洲拇,JVM必然不堪重負(fù),服務(wù)器必然崩潰曲尸,宕機(jī)赋续。
而在非阻塞的Netty中,卻可以應(yīng)付自如另患。從容應(yīng)對(duì)纽乱。Tomcat就是基于BIO的網(wǎng)絡(luò)通信模式(Tomcat可以通過一定配置,改成非阻塞模式)昆箕,而JBoss卻是基于非阻塞的NIO實(shí)現(xiàn)鸦列。
NIO的網(wǎng)絡(luò)通信模式很強(qiáng)勁,但是上手卻一點(diǎn)都不容易鹏倘。其中解決和牽扯到好多網(wǎng)絡(luò)問題薯嗤。如:網(wǎng)絡(luò)延時(shí),TCP的粘包/拆包纤泵,網(wǎng)絡(luò)故障等一堆一堆的問題骆姐。而Netty呢,針對(duì)nio復(fù)雜的編程難題而進(jìn)行一系列的封裝實(shí)現(xiàn)捏题,提供給廣大開發(fā)者一套開源簡單玻褪,方便使用的API類庫,甚至青出于藍(lán)而勝于藍(lán)公荧,甚至幾乎完美的解決CPU突然飆升到100%的bug :http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933(其實(shí)也沒有真正的解決带射,只是把復(fù)現(xiàn)的概率降到了最低而已)。
用Netty來實(shí)現(xiàn)IM實(shí)在太合適了循狰∮褂眨可以在最短的時(shí)間里整出一套思路清晰,架構(gòu)簡明的IM通信底層模型晤揣。提下需求,底層用JSON 字符串String進(jìn)行通信朱灿,對(duì)象通過JSON序列化成JSON String昧识。收到JSON數(shù)據(jù)后再反序列化成對(duì)象。
首先盗扒,我們先看服務(wù)器是怎么實(shí)現(xiàn)的跪楞。
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
...
//boss線程監(jiān)聽端口缀去,worker線程負(fù)責(zé)數(shù)據(jù)讀寫
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//輔助啟動(dòng)類
ServerBootstrap bootstrap = new ServerBootstrap();
try {
//設(shè)置線程池
bootstrap.group(bossGroup, workerGroup);
//設(shè)置socket工廠
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
//設(shè)置管道工廠
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//獲取管道
ChannelPipeline pipe = socketChannel.pipeline();
// Add the text line codec combination first,
pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
? ? ? ?// the encoder and decoder are static as these are sharable
//字符串編碼器
pipe.addLast(DECODER);
//字符串解碼器
pipe.addLast(ENCODER);
//業(yè)務(wù)處理類
pipe.addLast(new IMServerHandle());
}
});
//綁定端口
// Bind and start to accept incoming connections.
ChannelFuture f = bootstrap.bind(port).sync();
if (f.isSuccess()) {
Log.debug("server start success... port: " + port + ", main work thread: "
+ Thread.currentThread().getId());
}
////等待服務(wù)端監(jiān)聽端口關(guān)閉
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
//優(yōu)雅退出,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
??以上是Netty服務(wù)器啟動(dòng)的代碼甸祭。其中需要注意childHandler方法缕碎。需要把我們要添加的業(yè)務(wù)處理handler來添加到這里。通過ChannelPipeline 添加ChannelHandler池户。而處理字符串的就在IMServerHandle里實(shí)現(xiàn)咏雌。IMServerHandle繼承了SimpleChannelInboundHandler類。其中泛型T就是要轉(zhuǎn)換成的對(duì)象校焦∩薅叮客戶端與服務(wù)器端通信是本質(zhì)上通過字節(jié)碼byte[]通信的,而通過StringDecoder 和StringEncoder工具類對(duì)byte[]進(jìn)行轉(zhuǎn)換寨典,在IMServerHandle中獲取到String進(jìn)行處理即可氛雪。
看下IMServerHandle的實(shí)現(xiàn)方式。
/***
?* 面向IM通信操作的業(yè)務(wù)類
?* @author xhj
?*
?*/
public class IMServerHandle extends SimpleChannelInboundHandler<String> {
/**
* user操作業(yè)務(wù)類
*/
private UserBiz userBiz = new UserBiz();
/***
* 消息操作的業(yè)務(wù)類
*/
private IMMessageBiz immessagebiz = new IMMessageBiz();
/***
* 處理接受到的String類型的JSON數(shù)據(jù)
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" get msg >> "+msg);
//把JSON數(shù)據(jù)進(jìn)行反序列化
? Request req = JSON.parseObject(msg, Request.class);
? Response respon = new Response();
? respon.setSendTime(System.currentTimeMillis());
? //判斷是否是合法的請(qǐng)求
? if(req != null ) {
? System.out.println("the req method >> "+req.getMethod());
? //獲取操作類型
? if(req.getMethod() == IMProtocol.LOGIN) {
? //獲取要操作的對(duì)象
? User user = JSON.parseObject(req.getBody(),User.class);
? //設(shè)置返回?cái)?shù)據(jù)的操作類型
? respon.setMethod(IMProtocol.LOGIN);
? //執(zhí)行業(yè)務(wù)操作
? boolean bl = userBiz.login(user);
? if(bl) {//檢驗(yàn)用戶有效
? //設(shè)置響應(yīng)數(shù)據(jù)
? respon.setBody("login ok");
? //設(shè)置狀態(tài)
? respon.setStatus(0);
? //登錄成功將連接channel保存到Groups里
? ChannelGroups.add(ctx.channel());
? //將用戶的uname和channelId進(jìn)行綁定耸成,服務(wù)器向指定用戶發(fā)送消息的時(shí)候需要用到uname和channelId
? ChannelGroups.putUser(user.getUname(), ctx.channel().id());
? //發(fā)送廣播通知某人登錄成功了
? userBiz.freshUserLoginStatus(user);
? } else {//用戶密碼錯(cuò)誤
? //設(shè)置錯(cuò)誤描述
? respon.setErrorStr("pwd-error");
? //設(shè)置狀態(tài)描述碼
? respon.setStatus(-1);
? }
? //將Response序列化為json字符串
? msg = JSON.toJSONString(respon);
? //發(fā)送josn字符串?dāng)?shù)據(jù)报亩,注意后面一定要加"\r\n"
? ctx.writeAndFlush(msg+"\r\n");
? } else if(req.getMethod() == IMProtocol.SEND) {
? IMMessage immsg = JSON.parseObject(req.getBody(), IMMessage.class);
? immsg.setSendTime(System.currentTimeMillis()); c
通過IMServerHandle可以十分方便的處理獲取到的String字符串。處理完后井氢,可以直接通過ChannelHandlerContext的writeAndFlush方法發(fā)送數(shù)據(jù)弦追。
再看下Netty客戶端如何實(shí)現(xiàn)。
private BlockingQueue<Request> requests = new LinkedBlockingQueue<>();
? ?/**
? ? * String字符串解碼器
? ? */
private static final StringDecoder DECODER = new StringDecoder();
? ?/***
? ? * String字符串編碼器
? ? */
private static final StringEncoder ENCODER = new StringEncoder();
? ?/**
? ? * 客戶端業(yè)務(wù)處理Handler
? ? */
? ?private IMClientHandler clientHandler ;
? ?/**
? ? * 添加發(fā)送請(qǐng)求Request
? ? * @param request
? ? */
? ?public void addRequest(Request request) {
? ? ? ?try {
? ? ? ? ? ?requests.put(request);
? ? ? ?} catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? ?}
? ?}
? ?/**
? ? * 是否繼續(xù)進(jìn)行運(yùn)行
? ? */
? ?private boolean run = true;
? ?public void run() {
? ? ? ?//遠(yuǎn)程IP
? ? ? ?String host = "172.20.10.7";
? ? ? ?//端口號(hào)
? ? ? ?int port = 10000;
? ? ? ?//工作線程
? ? ? ?EventLoopGroup workerGroup = new NioEventLoopGroup();
? ? ? ?try {
? ? ? ? ? ?//輔助啟動(dòng)類
? ? ? ? ? ?Bootstrap b = new Bootstrap(); // (1)
? ? ? ? ? ?//設(shè)置線程池
? ? ? ? ? ?b.group(workerGroup); // (2)
? ? ? ? ? ?//設(shè)置socket工廠 不是ServerSocket而是Socket
? ? ? ? ? ?b.channel(NioSocketChannel.class); // (3)
? ? ? ? ? ?b.handler(new LoggingHandler(LogLevel.INFO));
? ? ? ? ? ?//設(shè)置管道工廠
? ? ? ? ? ?b.handler(new ChannelInitializer<SocketChannel>() {
? ? ? ? ? ? ? ?public void initChannel(SocketChannel ch) throws Exception {
? ? ? ? ? ? ? ? ? ?ChannelPipeline pipe = ch.pipeline();
? ? ? ? ? ? ? ? ? ?// Add the text line codec combination first,
? ? ? ? ? ? ? ? ? ?pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
? ? ? ? ? ? ? ? ? ?// the encoder and decoder are static as these are sharable
? ? ? ? ? ? ? ? ? ?//字符串解碼器
? ? ? ? ? ? ? ? ? ?pipe.addLast(DECODER);
? ? ? ? ? ? ? ? ? ?//字符串編碼器
? ? ? ? ? ? ? ? ? ?pipe.addLast(ENCODER);
? ? ? ? ? ? ? ? ? ?clientHandler = new IMClientHandler();
? ? ? ? ? ? ? ? ? ?//IM業(yè)務(wù)處理類
? ? ? ? ? ? ? ? ? ?pipe.addLast(clientHandler);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?});
? ? ? ? ? ?// Start the client.
? ? ? ? ? ?ChannelFuture f = b.connect(host, port).sync(); // (5)
? ? ? ? ? ?Channel ch = f.channel();
? ? ? ? ? ?ChannelFuture lastWriteFuture = null;
? ? ? ? ? ?while(run) {
? ? ? ? ? ? ? ?//將要發(fā)送的Request轉(zhuǎn)化為JSON String類型
? ? ? ? ? ? ? ?String line = JSON.toJSONString(requests.take());
? ? ? ? ? ? ? ?if(line != null && line.length() > 0) {//判斷非空
? ? ? ? ? ? ? ? ? ?// Sends the received line to the server.
? ? ? ? ? ? ? ? ? ?//發(fā)送數(shù)據(jù)到服務(wù)器
? ? ? ? ? ? ? ? ? ?lastWriteFuture = ch.writeAndFlush(line + "\r\n");
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? ?// Wait until all messages are flushed before closing the channel.
? ? ? ? ? ?//關(guān)閉寫的端口
? ? ? ? ? ?if (lastWriteFuture != null) {
? ? ? ? ? ? ? ?lastWriteFuture.sync();
? ? ? ? ? ?}
? ? ? ?} catch(Exception ex){
? ? ? ? ? ?ex.printStackTrace();
? ? ? ?} finally {
? ? ? ? ? ?//優(yōu)雅的關(guān)閉工作線程
? ? ? ? ? ?workerGroup.shutdownGracefully();
? ? ? ?}
? ?}
? ?/**
? ? * 增加消息監(jiān)聽接受接口
? ? * @param messgeReceivedListener
? ? */
? ?public void addMessgeReceivedListener(MessageSender.MessgeReceivedListener messgeReceivedListener) {
? ? ? ?clientHandler.addMessgeReceivedListener(messgeReceivedListener);
? ?}
? ?/***
? ? * ?移除消息監(jiān)聽接口
? ? * @param messgeReceivedListener
? ? */
? ?public void remove(MessageSender.MessgeReceivedListener messgeReceivedListener) {
? ? ? ?clientHandler.remove(messgeReceivedListener);
? ?}?
??Netty的client端實(shí)現(xiàn)和Server實(shí)現(xiàn)方式大同小異毙沾。比Server端要簡要些了骗卜。少一個(gè)NIOEventLoop。在Bootstrap 的handle方法中增加ChannelInitializer初始化監(jiān)聽器左胞,并增加了IMClientHandler的監(jiān)聽操作寇仓。其中IMClientHandler具體處理服務(wù)器返回的通信信息。
通過ChannelFuture獲取Channel烤宙,通過Channel在一個(gè)循環(huán)里發(fā)送請(qǐng)求遍烦。如果消息隊(duì)列BlockingQueue非空的時(shí)候,獲取Request并發(fā)送躺枕。以上發(fā)送服猪,如何接受數(shù)據(jù)呢?接受到的json被反序列化直接變成了對(duì)象Response拐云,對(duì)Response進(jìn)行處理即可罢猪。
定義了一個(gè)消息接受到的監(jiān)聽接口。
public static interface MessgeReceivedListener {
? ? public void onMessageReceived(Response msg);
? ? public void onMessageDisconnect();
? ? public void onMessageConnect();
}
在接口onMessageReceived方法里直接對(duì)獲取成功的響應(yīng)進(jìn)行處理叉瘩。
而服務(wù)器端對(duì)某個(gè)客戶端進(jìn)行發(fā)送操作膳帕,把Channel添加到ChannelGroup里,將uname和channelid對(duì)應(yīng)起來薇缅。需要對(duì)某個(gè)用戶發(fā)送消息的時(shí)候通過uname獲取channelid危彩,通過channelid從ChannelGroup里獲取channel攒磨,通過channel發(fā)送即可。
具體操作如下:
public void transformMessage(IMMessage message) {
Channel channel = ChannelGroups.getChannel(ChannelGroups.getChannelId(message.getTo()));
if(channel != null && channel.isActive()) {
Response response = new Response();
response.setBody(JSON.toJSONString(message));
response.setStatus(0);
response.setMethod(IMProtocol.REV);
response.setSendTime(System.currentTimeMillis());
channel.writeAndFlush(JSON.toJSON(response)+"\r\n");
}
}
ChannelGroups的代碼實(shí)現(xiàn):
public class ChannelGroups {
private static final Map<String,ChannelId> userList = new ConcurrentHashMap();
private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups",
GlobalEventExecutor.INSTANCE);
public static void putUser(String uname,ChannelId id) {
userList.put(uname,id);
}
通過以上代碼解析應(yīng)該對(duì)IM的通信模式有了比較全面的認(rèn)識(shí)汤徽。具體實(shí)現(xiàn)過程可以下載源代碼進(jìn)行查看娩缰。歡迎大家反饋提出問題。
https://github.com/sinxiao/NettyIMServerAndAndroidClient?