千言萬(wàn)語(yǔ)不如擼碼實(shí)戰(zhàn)蟋恬,begin!
首先說(shuō)明此篇文章只是重要部分代碼示例度硝,完整代碼,末尾會(huì)有彩蛋涯贞。
netty配置類代碼塊:
創(chuàng)建constant類庭瑰,描述:常量類星持,目前存放的是所有推送類型
/**常量類*/
public class Constant {
//測(cè)試推送1
public static final int PUSH_TEST_ONE=1;
//測(cè)試推送2
public static final int PUSH_TEST_TWO=2;
}
創(chuàng)建ChildChannelHandler類,描述:初始化SokcetChannel(socket通道)弹灭,添加socket通道到pipeline(管道),注冊(cè)到WorkGroup(負(fù)責(zé)讀寫(xiě)的nioEventLoop中)督暂,注冊(cè)成功后揪垄,會(huì)調(diào)用pipeline的channelRegistered方法,將registered事件在pipeline上傳播逻翁。
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**通過(guò)管道饥努,添加handler,HttpServerCodec是由netty自己提供的助手類,可以理解為攔截器八回,當(dāng)請(qǐng)求到服務(wù)端酷愧,我們需要做解碼,響應(yīng)到客戶端做解碼*/
ch.pipeline().addLast("http-codec",new HttpServerCodec());
ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//添加自定義的助手類
ch.pipeline().addLast("handler",new NettyServerHandler());
}
}
創(chuàng)建nettyCache類缠诅,描述:處理所有類型的通道保存類
import com.alibaba.fastjson.JSONObject;
import com.king.common.Constant;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**所有類型的通道保存類*/
@Component
public class NettyCache {
public static Map<Integer,ChannelGroup> channels = new HashMap<Integer,ChannelGroup>();
public static Map<ChannelId,JSONObject> channelMessage = new ConcurrentHashMap<ChannelId,JSONObject>();
public static ChannelGroup defaultGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@PostConstruct
public void initAllChannel(){
String[] pushTypes = getAllPushType();
for (int i=0;i<pushTypes.length;i++){
channels.put(Integer.valueOf(pushTypes[i]),new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
}
}
/**
* 獲取所有推送的類型
*/
private String[] getAllPushType(){
Field[] fields = Constant.class.getDeclaredFields();
String[] allType = null;
List<String> pushType = new ArrayList<String>();
for(int i = 0;i<fields.length;i++){
String fieldName = fields[i].getName();
if(fieldName.startsWith("PUSH_")){//
try {
Object s = Constant.class.getField(fieldName).get(null);
pushType.add(s.toString());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}
}
allType = pushType.toArray(new String[pushType.size()]);
return allType;
}
}
創(chuàng)建nettyServer類溶浴,描述:處理netty服務(wù)初始化啟動(dòng),監(jiān)聽(tīng)socket等操作管引。
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.king.common.CommonConfig;
import com.king.util.ApplicationContextHelper;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Service
public class NettyServer {
private static Logger log=LoggerFactory.getLogger(NettyServer.class);
@PostConstruct
public void initServer() {
new Thread() {
public void run() {
new NettyServer().run();
}
}.start();
}
public void run() {
//定義一對(duì)線程組
//主線程組士败,用于接收客戶端的連接,但是不做任何處理汉匙,跟老板一樣不做事
EventLoopGroup bossGroup=new NioEventLoopGroup();
//從線程組拱烁,老板線程組會(huì)把任務(wù)丟給它,讓手下線程組去做任務(wù)
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
//netty服務(wù)器的創(chuàng)建噩翠,ServerBootstrap 是一個(gè)啟動(dòng)類
ServerBootstrap socket=new ServerBootstrap();
//設(shè)置主從線程組
socket.group(bossGroup,workGroup);
//設(shè)置nio的雙向通道
socket.channel(NioServerSocketChannel.class);
//子處理器戏自,用于處理workGroup
socket.childHandler(new ChildChannelHandler());
CommonConfig config=(CommonConfig) ApplicationContextHelper.getBean(CommonConfig.class);
//啟動(dòng)server,并綁定端口號(hào)伤锚,同時(shí)啟動(dòng)方式為同步
Channel channel=socket.bind(config.getWebSocketPort()).sync().channel();
//服務(wù)端管道關(guān)閉的監(jiān)聽(tīng)器并同步阻塞擅笔,知道channel關(guān)閉,線程才會(huì)往下執(zhí)行屯援,結(jié)束進(jìn)程猛们。主線程執(zhí)行到這里就wait子線程結(jié)束,子線程才是真正監(jiān)聽(tīng)和接收請(qǐng)求的狞洋。子線程就是netty啟動(dòng)的監(jiān)聽(tīng)端口的線程弯淘。
//即closeFuture()是 開(kāi)啟了一個(gè)channel的監(jiān)聽(tīng)器,負(fù)責(zé)監(jiān)聽(tīng)channel是否關(guān)閉的狀態(tài)吉懊,如果未來(lái)監(jiān)聽(tīng)到channel關(guān)閉了庐橙,子線程才會(huì)釋放,syncUninterruptibly()讓主線程同步等待子線程結(jié)果借嗽。
//補(bǔ)充:channel.close()才是主動(dòng)關(guān)閉通道的方法态鳖。
//監(jiān)聽(tīng)關(guān)閉的channel,設(shè)置為同步方式
channel.closeFuture().sync();
log.info("服務(wù)端啟動(dòng)成功");
} catch (Exception e) {
log.error(e.getMessage(),e);
}finally {
//優(yōu)雅的關(guān)閉線程組
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
創(chuàng)建nettyServerHandler類,描述:處理打開(kāi)關(guān)閉通道恶导,接收消息分配處理等操作
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**通道打開(kāi)浆竭,關(guān)閉,接收消息處理類,異常捕獲*/
public class NettyServerHandler extends SimpleChannelInboundHandler<Object>{
private static Logger log=LoggerFactory.getLogger(NettyServerHandler.class);
//捕獲異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//打開(kāi)通道
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//連接默認(rèn)添加
NettyCache.defaultGroup.add(ctx.channel());
log.info("客戶端與服務(wù)端連接開(kāi)啟:"+ctx.channel().remoteAddress().toString());
}
//關(guān)閉通道
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//移除通道以及通道對(duì)應(yīng)的消息
if(NettyCache.defaultGroup.contains(ctx.channel())) {
NettyCache.defaultGroup.remove(ctx.channel());
}
if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType"))
.remove(ctx.channel());
}
if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
NettyCache.channelMessage.remove(ctx.channel().id());
log.info("客戶端與服務(wù)端連接關(guān)閉:"+ctx.channel().remoteAddress().toString());
}
}
//通道讀取數(shù)據(jù)完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//接收消息
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof WebSocketFrame) {
handlerWebocketFrame(ctx, (WebSocketFrame) msg);
}
}
//返回應(yīng)答消息
private void handlerWebocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame) {
String requestMessage=((TextWebSocketFrame)frame).text();
JSONObject jsonMessage=JSONObject.parseObject(requestMessage);
log.info("接收到客戶端發(fā)送的消息"+requestMessage);
if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType")).remove(ctx.channel());
}
NettyCache.channels.get(jsonMessage.getIntValue("pushType")).add(ctx.channel());
NettyCache.channelMessage.put(ctx.channel().id(), jsonMessage);
if(NettyCache.defaultGroup.contains(ctx.channel())) {
NettyCache.defaultGroup.remove(ctx.channel());
}
}
}
創(chuàng)建html邦泄,很簡(jiǎn)單一個(gè)頁(yè)面:websocket.html
<html>
<head>
<title>推送消息頁(yè)面</title>
</head>
<body>
<div>
你好删窒!<p th:text="${name}"></p>
</div>
<input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button>
<div id="message"></div>
<script type="text/javascript">
var websocket = null;
//判斷當(dāng)前瀏覽器是否支持WebSocket
if('WebSocket' in window){
websocket = new WebSocket("ws://localhost:28095/websocket");
}
else{
alert('Not support websocket')
}
//連接發(fā)生錯(cuò)誤的回調(diào)方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//連接成功建立的回調(diào)方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
//接收到消息的回調(diào)方法
websocket.onmessage = function(event){
setMessageInnerHTML(event.data);
}
//連接關(guān)閉的回調(diào)方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//監(jiān)聽(tīng)窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時(shí)顺囊,主動(dòng)去關(guān)閉websocket連接易稠,防止連接還沒(méi)斷開(kāi)就關(guān)閉窗口,server端會(huì)拋異常包蓝。
window.onbeforeunload = function(){
websocket.close();
}
//將消息顯示在網(wǎng)頁(yè)上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//關(guān)閉連接
function closeWebSocket(){
websocket.close();
}
//發(fā)送消息
function send(){
var zhi = document.getElementById('text').value;
var message={
"pushType":"1",
"message":zhi
}
websocket.send(JSON.stringify(message));
}
</script>
</body>
</html>
好了,到此主要代碼完畢企量,完整代碼测萎,請(qǐng)移步:
https://github.com/wangdonghuihuang/HappyKing
敬請(qǐng)F(tuán)ork跟Star。有問(wèn)題可企鵝群--535296702届巩,群里是一群可愛(ài)的小伙伴硅瞧,探討技術(shù)與吹牛打咖樣樣俱全哦!