之前的項(xiàng)目需要用到mina疤坝,實(shí)現(xiàn)的功能主要是:服務(wù)端主動(dòng)發(fā)送消息到客戶端,這個(gè)的服務(wù)端為外網(wǎng)的tomcat馆铁,客戶端為內(nèi)網(wǎng)的tomcat跑揉,由于無(wú)法知道內(nèi)網(wǎng)tomcat 的地址,也就不能直接通過(guò)http的方式發(fā)送信息回來(lái)埠巨,最后想來(lái)想去用mina實(shí)現(xiàn)了這個(gè)功能畔裕。
當(dāng)然衣撬,我這里的服務(wù)端是整合的了spring 的,也可以直接把服務(wù)端獨(dú)立出來(lái)扮饶,不整合spring具练,這個(gè)都一樣,區(qū)別不大甜无。
代碼和配置如下:
---------------------------
1扛点,jar包,我這里使用的是spring4.0.5岂丘,mina2.0.7
maven部分文件如下陵究,這個(gè)包會(huì)自動(dòng)也依賴進(jìn)來(lái)mina-filter-ssl-1.1.7.jar
[java]?view plain?copy
[java]?view plain?copy
??????????????????
????????????org.springframework??
????????????spring-context-support??
????????????${springframework-version}??
??????????
??????????
????????????org.springframework??
????????????spring-jdbc??
????????????${springframework-version}??
??????????
??????????
????????????org.springframework??
????????????spring-orm??
????????????${springframework-version}??
??????????
??????????
????????????org.springframework??
????????????spring-aop??
????????????${springframework-version}??
??????????
??????????
??????????
????????????org.springframework??
????????????spring-web??
????????????${springframework-version}??
??????????????
??????????????????
????????????????????commons-logging??
????????????????????commons-logging??
??????????
????????????org.springframework??
????????????spring-webmvc??
????????????${springframework-version}??
??????????
????????????org.aspectj??
????????????aspectjrt??
????????????${aspectj-version}??
??????????
????????????org.aspectj??
????????????aspectjweaver??
????????????${aspectj-version}??
??????????
????????????org.apache.mina??
????????????mina-core??
2.0.7??
??????????
????????????org.apache.mina??
????????????mina-integration-spring??
1.1.7??
??????????
????????????org.apache.mina??
????????????mina-integration-beans??
2.0.8??
2,spring-ztc_app-mina.xml (spring與mina 的配置文件)
spring與mina 的配置文件奥帘,需要導(dǎo)入到spring 的總配置文件中铜邮,或者加入到web.xml的spring監(jiān)聽(tīng)掃描中
[java]?view plain?copy
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"??
xsi:schemaLocation="http://www.springframework.org/schema/beans???
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">??
??
??
??????????????
??
??
????????????????-->??
value="org.apache.mina.integration.beans.InetSocketAddressEditor">??
??
init-method="bind"?destroy-method="unbind">??
??
??
??
??
??????????
????????????????/>?-->??
??
??
class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">??
??
??????????????
??
??
3,mina服務(wù)端業(yè)務(wù)處理類
[java]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?org.apache.commons.logging.Log;??
import?org.apache.commons.logging.LogFactory;??
import?org.apache.mina.core.service.IoHandlerAdapter;??
import?org.apache.mina.core.session.IdleStatus;??
import?org.apache.mina.core.session.IoSession;??
/**
?*?@Description:?mina服務(wù)端業(yè)務(wù)處理類
?*?@author?whl
?*?@date?2014-9-30?下午12:36:28
?*
?*/??
public?class?ServerHandler?extends?IoHandlerAdapter?{??
private?final?static?Log?log?=?LogFactory.getLog(ServerHandler.class);??
public?ServerHandler()?{??
//?TODO?Auto-generated?constructor?stub??
?????}??
@Override??
public?void?exceptionCaught(IoSession?session,?Throwable?cause)??
throws?Exception?{??
?????}??
@Override??
public?void?messageReceived(IoSession?session,?Object?message)??
throws?Exception?{??
log.debug("服務(wù)端收到信息-------------");??
//獲取客戶端發(fā)過(guò)來(lái)的key??
??????????String?key?=?message.toString();??
System.out.println("message?:"+message.toString());??
String?carPark_id?=?key.substring(key.indexOf("=")?+?1);??
System.out.println("carPark_id?:"+carPark_id);??
//保存客戶端的會(huì)話session??
??????????SessionMap?sessionMap?=?SessionMap.newInstance();??
??????????sessionMap.addSession(carPark_id,?session);??
?????}??
@Override??
public?void?messageSent(IoSession?session,?Object?message)?throws?Exception?{??
log.debug("------------服務(wù)端發(fā)消息到客戶端---");??
?????}??
@Override??
public?void?sessionClosed(IoSession?session)?throws?Exception?{??
//?TODO?Auto-generated?method?stub??
log.debug("遠(yuǎn)程session關(guān)閉了一個(gè)..."?+?session.getRemoteAddress().toString());??
?????}??
@Override??
public?void?sessionCreated(IoSession?session)?throws?Exception?{??
log.debug(session.getRemoteAddress().toString()?+"----------------------create");??
?????}??
@Override??
public?void?sessionIdle(IoSession?session,?IdleStatus?status)??
throws?Exception?{??
log.debug(session.getServiceAddress()?+"IDS");??
?????}??
@Override??
public?void?sessionOpened(IoSession?session)?throws?Exception?{??
log.debug("連接打開(kāi):"+session.getLocalAddress());??
?????}??
????}??
4,服務(wù)端緩存客戶端socket連接的單例類
[html]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?java.util.HashMap;??
import?java.util.Map;??
import?org.apache.commons.logging.Log;??
import?org.apache.commons.logging.LogFactory;??
import?org.apache.mina.core.session.IoSession;??
/**??
?*?@Description:?單例工具類寨蹋,保存所有mina客戶端連接??
?*?@author?whl??
?*?@date?2014-9-29?上午10:09:15??
?*??
?*/??
public?class?SessionMap?{??
private?final?static?Loglog?=?LogFactory.getLog(SessionMap.class);??
private?static?SessionMapsessionMap?=?null;??
private?Mapmap?=?new?HashMap();??
????//構(gòu)造私有化?單例??
????private?SessionMap(){}??
????/**??
?????*?@Description:?獲取唯一實(shí)例??
?????*?@author?whl??
?????*?@date?2014-9-29?下午1:29:33??
?????*/??
????public?static?SessionMap?newInstance(){??
????????log.debug("SessionMap單例獲取---");??
if(sessionMap?==?null){??
sessionMap?=?new?SessionMap();??
????????}??
????????return?sessionMap;??
????}??
????/**??
?????*?@Description:?保存session會(huì)話??
?????*?@author?whl??
?????*?@date?2014-9-29?下午1:31:05??
?????*/??
????public?void?addSession(String?key,?IoSession?session){??
log.debug("保存會(huì)話到SessionMap單例---key="?+?key);??
????????this.map.put(key,?session);??
????}??
????/**??
?????*?@Description:?根據(jù)key查找緩存的session??
?????*?@author?whl??
?????*?@date?2014-9-29?下午1:31:55??
?????*/??
????public?IoSession?getSession(String?key){??
log.debug("獲取會(huì)話從SessionMap單例---key="?+?key);??
????????return?this.map.get(key);??
????}??
????/**??
?????*?@Description:?發(fā)送消息到客戶端??
?????*?@author?whl??
?????*?@date?2014-9-29?下午1:57:51??
?????*/??
????public?void?sendMessage(String[]?keys,?Object?message){??
????????for(String?key?:?keys){??
IoSessionsession?=?getSession(key);??
log.debug("反向發(fā)送消息到客戶端Session---key="?+?key?+?"----------消息="?+?message);??
if(session?==?null){??
????????????????return;??
????????????}??
????????????session.write(message);??
????????}??
????}??
}??
5松蒜,編碼解碼器,
HCoderFactory.java
[java]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?java.nio.charset.Charset;??
import?org.apache.mina.core.session.IoSession;??
import?org.apache.mina.filter.codec.ProtocolCodecFactory;??
import?org.apache.mina.filter.codec.ProtocolDecoder;??
import?org.apache.mina.filter.codec.ProtocolEncoder;??
/**
?*?@Description:?編碼和解碼器工廠類.
?*?@author?whl
?*?@date?2014-9-30?下午12:34:59
?*
?*/??
public?class?HCoderFactory?implements?ProtocolCodecFactory?{??
private?final?HEncoder?encoder;??
private?final?HDecoder?decoder;??
public?HCoderFactory()?{??
//this(Charset.defaultCharset());??
this(Charset.forName("UTF-8"));??
????}??
public?HCoderFactory(Charset?charSet)?{??
this.encoder?=?new?HEncoder(charSet);??
this.decoder?=?new?HDecoder(charSet);??
????}??
@Override??
public?ProtocolDecoder?getDecoder(IoSession?arg0)?throws?Exception?{??
return?decoder;??
????}??
@Override??
public?ProtocolEncoder?getEncoder(IoSession?arg0)?throws?Exception?{??
return?encoder;??
????}??
}??
HDecoder.java
[java]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?java.nio.charset.Charset;??
import?java.nio.charset.CharsetDecoder;??
import?org.apache.mina.core.buffer.IoBuffer;??
import?org.apache.mina.core.session.IoSession;??
import?org.apache.mina.filter.codec.CumulativeProtocolDecoder;??
import?org.apache.mina.filter.codec.ProtocolDecoderOutput;??
/**
?*?@Description:?解碼工具類
?*?@author?whl
?*?@date?2014-9-30?下午12:35:22
?*
?*/??
public?class?HDecoder?extends?CumulativeProtocolDecoder?{??
private?final?Charset?charset;??
public?HDecoder(Charset?charset)?{??
this.charset?=?charset;??
????}??
public?boolean?doDecode(IoSession?session,?IoBuffer?in,??
ProtocolDecoderOutput?out)throws?Exception?{??
//System.out.println("-------doDecode----------");??
????????CharsetDecoder?cd?=?charset.newDecoder();??
????????String?mes?=?in.getString(cd);??
????????out.write(mes);??
return?true;??
/*
????????if?(in.remaining()?>?4)?{//?有數(shù)據(jù)時(shí)已旧,讀取字節(jié)判斷消息長(zhǎng)度
????????????in.mark();//?標(biāo)記當(dāng)前位置秸苗,以便reset
????????????int?size?=?in.getInt();
????????????//?如果消息內(nèi)容不夠,則重置运褪,相當(dāng)于不讀取size
????????????if?(size?>?in.remaining())?{
????????????????in.reset();
????????????????return?false;//?接收新數(shù)據(jù)惊楼,以拼湊成完整數(shù)據(jù)
????????????}?else?if?(size?!=?0?&&?(size?-?4?>=?0))?{
????????????????byte[]?bytes?=?new?byte[size?-?4];
????????????????//int?protocol?=?in.getInt();
????????????????//?拿到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù)組裝成基礎(chǔ)包寫出去
????????????????in.get(bytes,?0,?size?-?4);
????????????????//in.get(bytes,?size?-?4,?size);
????????????????PackageBeanFactory?beanFactory?=?(PackageBeanFactory)?session
????????????????????????.getAttribute(ServerHandler.BEAN_FACTORY);
????????????????//out.write(beanFactory.getPackage(protocol,?size,?bytes));
????????????????String?mes?=?in.getString(cd);
????????????????out.write(mes);
????????????????//?如果讀取內(nèi)容后還粘了包,就讓父類再給讀取進(jìn)行下次解析
????????????????if?(in.remaining()?>?0)?{
????????????????????return?true;
????????????????}
????????????}
????????}
????????return?false;//?處理成功秸讹,讓父類進(jìn)行接收下個(gè)包
*/????
????}??
}??
HEncoder.java
[java]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?java.nio.charset.Charset;??
import?java.nio.charset.CharsetDecoder;??
import?java.nio.charset.CharsetEncoder;??
import?org.apache.commons.logging.Log;??
import?org.apache.commons.logging.LogFactory;??
import?org.apache.mina.core.buffer.IoBuffer;??
import?org.apache.mina.core.session.IoSession;??
import?org.apache.mina.filter.codec.ProtocolEncoder;??
import?org.apache.mina.filter.codec.ProtocolEncoderOutput;??
/**
?*?@Description:?編碼工具類
?*?@author?whl
?*?@date?2014-9-30?下午12:35:35
?*
?*/??
public?class?HEncoder?implements?ProtocolEncoder?{??
private?final?static?Log?log?=?LogFactory.getLog(HEncoder.class);??
private?final?Charset?charset;??
public?HEncoder(Charset?charset)?{??
this.charset?=?charset;??
????}??
@Override??
public?void?encode(IoSession?session,?Object?message,??
ProtocolEncoderOutput?out)throws?Exception?{??
????????CharsetEncoder?ce?=?charset.newEncoder();??
????????String?mes?=?(String)?message;??
IoBuffer?buffer?=?IoBuffer.allocate(100).setAutoExpand(true);??
????????buffer.putString(mes,ce);??
????????buffer.flip();??
????????out.write(buffer);??
/*System.out.println("---------encode-------------");
????????String?mes?=?(String)?message;
????????byte[]?data?=?mes.getBytes("UTF-8");
????????IoBuffer?buffer?=?IoBuffer.allocate(data.length?+?4);
????????buffer.putInt(data.length);
????????buffer.put(data);
????????buffer.flip();
????????out.write(buffer);
????????out.flush();*/??
????}??
@Override??
public?void?dispose(IoSession?session)?throws?Exception?{??
log.info("Dispose?called,session?is?"?+?session);??
????}??
}??
6,客戶端程序
ClentMain.java
[java]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?java.net.InetSocketAddress;??
import?java.text.SimpleDateFormat;??
import?java.util.Date;??
import?org.apache.commons.logging.Log;??
import?org.apache.commons.logging.LogFactory;??
import?org.apache.mina.core.filterchain.IoFilterAdapter;??
import?org.apache.mina.core.future.ConnectFuture;??
import?org.apache.mina.core.session.IoSession;??
import?org.apache.mina.filter.codec.ProtocolCodecFilter;??
import?org.apache.mina.filter.logging.LoggingFilter;??
import?org.apache.mina.transport.socket.nio.NioSocketConnector;??
/**
?*?@Description:?mina客戶端檀咙,包含斷線重連機(jī)制,空閑重連機(jī)制
?*?@author?whl
?*?@date?2014-11-2
?*/??
public?class?ClentMain?extends?Thread{??
private?final?static?Log?log?=?LogFactory.getLog(ClentMain.class);??
@Override??
public?void?run()?{??
//ip??
String?host?="192.168.0.38";??
//端口??
int?port?=?6007;??
//停車場(chǎng)id??
final?String?carPark_id?=?"1";??
//?創(chuàng)建客戶端連接器.??
final?NioSocketConnector?connector?=?new?NioSocketConnector();??
//設(shè)置連接超時(shí)????
connector.setConnectTimeoutMillis(30000);???
//?設(shè)置默認(rèn)訪問(wèn)地址????
connector.setDefaultRemoteAddress(new?InetSocketAddress(host,?port));??
//將IoSession的主鍵屬性注入線程映射表MDC中??
//connector.getFilterChain().addLast("mdc",?new?MdcInjectionFilter());????
//日志過(guò)濾器??
connector.getFilterChain().addLast("logger",?new?LoggingFilter());??
//?設(shè)置編碼過(guò)濾器??
connector.getFilterChain().addLast("codec",new?ProtocolCodecFilter(new?HCoderFactory()));??????????
//添加處理器????
connector.setHandler(new?ClintHandler());??
//?設(shè)置接收緩沖區(qū)的大小????
connector.getSessionConfig().setReceiveBufferSize(10240);??
//?設(shè)置輸出緩沖區(qū)的大小????
connector.getSessionConfig().setSendBufferSize(10240);??
/**
?????????*?空閑重連的機(jī)制璃诀,根據(jù)需要選擇相應(yīng)的配置
?????????*/??
//?讀寫都空閑時(shí)間:30秒????
//connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,?30);???
//?讀(接收通道)空閑時(shí)間:40秒???
//connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE,?40);??
//?寫(發(fā)送通道)空閑時(shí)間:50秒???
//connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE,?50);???
//斷線重連回調(diào)攔截器????
connector.getFilterChain().addFirst("reconnection",?new?IoFilterAdapter()?{????
@Override????
public?void?sessionClosed(NextFilter?nextFilter,?IoSession?ioSession)?throws?Exception?{????
for(;;){????
try{????
Thread.sleep(3000);???
????????????????????????ConnectFuture?future?=?connector.connect();????
future.awaitUninterruptibly();//?等待連接創(chuàng)建成功????
IoSession?session?=?future.getSession();//?獲取會(huì)話????
session.write("key="+carPark_id);??
if(session.isConnected()){????
log.info("斷線重連["+?connector.getDefaultRemoteAddress().getHostName()?+":"+?connector.getDefaultRemoteAddress().getPort()+"]成功");????
//System.out.println("斷線重連["+?connector.getDefaultRemoteAddress().getHostName()?+":"+?connector.getDefaultRemoteAddress().getPort()+"]成功");??
break;????
????????????????????????}????
}catch(Exception?ex){????
log.info("重連服務(wù)器登錄失敗,3秒再連接一次:"?+?ex.getMessage());????
//System.out.println("重連服務(wù)器登錄失敗,3秒再連接一次:"?+?ex.getMessage());????
????????????????????}????
????????????????}????
????????????}????
????????});??
//開(kāi)始連接??
for?(;;)?{????
try?{????
????????????????ConnectFuture?future?=?connector.connect();????
//?等待連接創(chuàng)建成功????
????????????????future.awaitUninterruptibly();????
//?獲取會(huì)話????
????????????????IoSession?session?=?future.getSession();????
//發(fā)送消息??
session.write("key="?+?carPark_id);??
log.error("連接服務(wù)端"?+?host?+?":"?+?port?+?"[成功]"?+?",,時(shí)間:"?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss").format(new?Date()));????
break;????
}catch?(Exception?e)?{????
//System.out.println("連接服務(wù)端"?+?host?+?":"?+?port?+?"失敗"?+?",,時(shí)間:"?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss").format(new?Date())?+?",?連接MSG異常,請(qǐng)檢查MSG端口弧可、IP是否正確,MSG服務(wù)是否啟動(dòng),異常內(nèi)容:"?+?e.getMessage());????
log.error("連接服務(wù)端"?+?host?+?":"?+?port?+?"失敗"?+?",,時(shí)間:"?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss").format(new?Date())?+?",?連接MSG異常,請(qǐng)檢查MSG端口、IP是否正確,MSG服務(wù)是否啟動(dòng),異常內(nèi)容:"?+?e.getMessage(),?e);????
//?連接失敗后,重連10次,間隔30s????
try?{??
Thread.sleep(5000);??
}catch?(InterruptedException?e1)?{??
????????????????????e1.printStackTrace();??
log.error("連接服務(wù)端失敗后文虏,睡眠5秒發(fā)生異常侣诺!");??
????????????????}??
????????????}????
????????}??
//?cf.getSession().write("quit");//發(fā)送消息??
//cf.getSession().close();??
//cf.getSession().getCloseFuture().awaitUninterruptibly();//?等待連接斷開(kāi)??
//connector.dispose();??
????}??
}??
ClintHandler.java
[java]?view plain?copy
package?cn.hydom.ztc.ztc_app.controller.mina;??
import?org.apache.commons.logging.Log;??
import?org.apache.commons.logging.LogFactory;??
import?org.apache.mina.core.service.IoHandlerAdapter;??
import?org.apache.mina.core.session.IdleStatus;??
import?org.apache.mina.core.session.IoSession;??
/**
?*?@Description:?客戶端業(yè)務(wù)處理類
?*?@author?whl
?*?@date?2014-11-2
?*/??
public?class?ClintHandler?extends?IoHandlerAdapter?{??
private?final?static?Log?log?=?LogFactory.getLog(ClintHandler.class);??
/**
?????*?寫處理服務(wù)端推送的信息的邏輯
?????*/??
@Override??
public?void?messageReceived(IoSession?session,?Object?message)??
throws?Exception?{??
System.out.println("-----服務(wù)頓返回的json數(shù)據(jù)----");??
????????String?s?=?message.toString();??
System.out.println("message?:"?+?s);??
System.out.println("message?length:"?+?s.length());??
????}??
@Override????
public?void?sessionIdle(IoSession?session,?IdleStatus?status)?throws?Exception?{????
log.info("-客戶端與服務(wù)端連接[空閑]?-?"?+?status.toString());????
System.out.println("-客戶端與服務(wù)端連接[空閑]?-?"?+?status.toString());??
if(session?!=?null){????
session.close(true);????
????????}????
????}????
}??
7,測(cè)試
這里就不寫實(shí)際的代碼了氧秘,
1年鸳,首先部署web工程啟動(dòng)tomcat,讓服務(wù)端先運(yùn)行起來(lái)丸相,
2搔确,然后運(yùn)行客戶端
[java]?view plain?copy
public?class?ClintTest1?{??
public?static?void?main(String[]?args)?throws?Exception?{??
ClentMain?mina?=new?ClentMain();??
????????mina.start();??
????}??
}??
3,想要服務(wù)端主動(dòng)發(fā)回來(lái)信息,還得在服務(wù)端的web工程的action中寫一個(gè)htpp訪問(wèn)的方法膳算,方法中去sessionMap類中緩存的map中查找session座硕,然后發(fā)送消息。
代碼如下涕蜂,我用的是springmvc
[java]?view plain?copy
/**
?????*?@Description:?發(fā)送消息到客戶端
?????*?@author?whl
?????*?@date?2014-9-29?下午1:18:54
?????*/??
@ResponseBody??
@RequestMapping(value="/sendMessage")??
public?String?sendMessage(HttpServletRequest?request,?String[]?carPark_id){??
try{??
//??
//獲取鏈接的參數(shù)carPark_id??
log.debug("carPark_id[].length---------?"?+?carPark_id.length);??
for(String?id?:?carPark_id){??
log.debug("carPark_id?---------?"?+?id);??
????????????}??
//這里用的假數(shù)據(jù)????????????????????????????????????????????????carPark_id?=?new?String[]{"1"};??
//發(fā)送的信息String jsonstr = "123";//反向發(fā)送信息log.debug("開(kāi)始反向發(fā)送消息到客戶端-------- ");SessionMap sessionMap = SessionMap.newInstance();sessionMap.sendMessage(carPark_id, jsonstr);//返回信息return "發(fā)送的信息為" +?jsonstr;
}catch (Exception e) {log.error(e);return "出錯(cuò)了华匾。。";}}
4机隙,好了蜘拉,現(xiàn)在重新發(fā)布工程,啟動(dòng)服務(wù)端和客戶端有鹿,然后訪問(wèn)這個(gè)鏈接就可以了旭旭,當(dāng)然springmvc需要自己配置,這也不是這里的重點(diǎn)葱跋。
注意:客戶端發(fā)過(guò)來(lái)的carPark_id必須與服務(wù)端查找的一致持寄,不讓就找不到相應(yīng)的客戶端session連接,消息無(wú)法發(fā)送成功娱俺。