springboot整合netty推送

千言萬(wàn)語(yǔ)不如擼碼實(shí)戰(zhàn)蟋恬,begin!

首先說(shuō)明此篇文章只是重要部分代碼示例度硝,完整代碼,末尾會(huì)有彩蛋涯贞。

netty配置類代碼塊:

創(chuàng)建constant類庭瑰,描述:常量類星持,目前存放的是所有推送類型

/**常量類*/
public class Constant {
//測(cè)試推送1
    public static final int PUSH_TEST_ONE=1;
    //測(cè)試推送2
    public static final int PUSH_TEST_TWO=2;
}

創(chuàng)建ChildChannelHandler類,描述:初始化SokcetChannel(socket通道)弹灭,添加socket通道到pipeline(管道),注冊(cè)到WorkGroup(負(fù)責(zé)讀寫(xiě)的nioEventLoop中)督暂,注冊(cè)成功后揪垄,會(huì)調(diào)用pipeline的channelRegistered方法,將registered事件在pipeline上傳播逻翁。

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
/**通過(guò)管道饥努,添加handler,HttpServerCodec是由netty自己提供的助手類,可以理解為攔截器八回,當(dāng)請(qǐng)求到服務(wù)端酷愧,我們需要做解碼,響應(yīng)到客戶端做解碼*/
        ch.pipeline().addLast("http-codec",new HttpServerCodec());
        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
        ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//添加自定義的助手類
        ch.pipeline().addLast("handler",new NettyServerHandler());
    }

}

創(chuàng)建nettyCache類缠诅,描述:處理所有類型的通道保存類

import com.alibaba.fastjson.JSONObject;
import com.king.common.Constant;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**所有類型的通道保存類*/
@Component
public class NettyCache {

    public static Map<Integer,ChannelGroup> channels = new HashMap<Integer,ChannelGroup>();
    public static Map<ChannelId,JSONObject> channelMessage = new ConcurrentHashMap<ChannelId,JSONObject>();
    public static ChannelGroup defaultGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    @PostConstruct
    public void initAllChannel(){
        String[] pushTypes = getAllPushType();
        for (int i=0;i<pushTypes.length;i++){
            channels.put(Integer.valueOf(pushTypes[i]),new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        }
    }

    /**
     * 獲取所有推送的類型
     */
    private String[] getAllPushType(){
        Field[] fields = Constant.class.getDeclaredFields();
        String[] allType = null;
        List<String> pushType = new ArrayList<String>();
        for(int i = 0;i<fields.length;i++){
            String fieldName = fields[i].getName();
            if(fieldName.startsWith("PUSH_")){//
                try {
                    Object s = Constant.class.getField(fieldName).get(null);
                    pushType.add(s.toString());
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (NoSuchFieldException e) {
                    e.printStackTrace();
                }
            }
        }
        allType = pushType.toArray(new String[pushType.size()]);
        return allType;
    }
}

創(chuàng)建nettyServer類溶浴,描述:處理netty服務(wù)初始化啟動(dòng),監(jiān)聽(tīng)socket等操作管引。

import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.king.common.CommonConfig;
import com.king.util.ApplicationContextHelper;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Service
public class NettyServer {
    private static Logger log=LoggerFactory.getLogger(NettyServer.class);
@PostConstruct
    public void initServer() {
    new Thread() {
        public void run() {
            new NettyServer().run();
        }
    }.start();
}
public void run() {
//定義一對(duì)線程組
//主線程組士败,用于接收客戶端的連接,但是不做任何處理汉匙,跟老板一樣不做事
    EventLoopGroup bossGroup=new NioEventLoopGroup();
//從線程組拱烁,老板線程組會(huì)把任務(wù)丟給它,讓手下線程組去做任務(wù)
    EventLoopGroup workGroup=new NioEventLoopGroup();
    try {
        //netty服務(wù)器的創(chuàng)建噩翠,ServerBootstrap 是一個(gè)啟動(dòng)類
        ServerBootstrap socket=new ServerBootstrap();
//設(shè)置主從線程組
        socket.group(bossGroup,workGroup);
        //設(shè)置nio的雙向通道
        socket.channel(NioServerSocketChannel.class);
//子處理器戏自,用于處理workGroup
        socket.childHandler(new ChildChannelHandler());
        CommonConfig config=(CommonConfig) ApplicationContextHelper.getBean(CommonConfig.class);
        //啟動(dòng)server,并綁定端口號(hào)伤锚,同時(shí)啟動(dòng)方式為同步
        Channel channel=socket.bind(config.getWebSocketPort()).sync().channel();
        //服務(wù)端管道關(guān)閉的監(jiān)聽(tīng)器并同步阻塞擅笔,知道channel關(guān)閉,線程才會(huì)往下執(zhí)行屯援,結(jié)束進(jìn)程猛们。主線程執(zhí)行到這里就wait子線程結(jié)束,子線程才是真正監(jiān)聽(tīng)和接收請(qǐng)求的狞洋。子線程就是netty啟動(dòng)的監(jiān)聽(tīng)端口的線程弯淘。
        //即closeFuture()是 開(kāi)啟了一個(gè)channel的監(jiān)聽(tīng)器,負(fù)責(zé)監(jiān)聽(tīng)channel是否關(guān)閉的狀態(tài)吉懊,如果未來(lái)監(jiān)聽(tīng)到channel關(guān)閉了庐橙,子線程才會(huì)釋放,syncUninterruptibly()讓主線程同步等待子線程結(jié)果借嗽。
        //補(bǔ)充:channel.close()才是主動(dòng)關(guān)閉通道的方法态鳖。
//監(jiān)聽(tīng)關(guān)閉的channel,設(shè)置為同步方式
        channel.closeFuture().sync();
        log.info("服務(wù)端啟動(dòng)成功");
    } catch (Exception e) {
        log.error(e.getMessage(),e);
    }finally {
//優(yōu)雅的關(guān)閉線程組
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}
}

創(chuàng)建nettyServerHandler類,描述:處理打開(kāi)關(guān)閉通道恶导,接收消息分配處理等操作

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**通道打開(kāi)浆竭,關(guān)閉,接收消息處理類,異常捕獲*/
public class NettyServerHandler extends SimpleChannelInboundHandler<Object>{
    private static Logger log=LoggerFactory.getLogger(NettyServerHandler.class);
    //捕獲異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        }
    //打開(kāi)通道
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//連接默認(rèn)添加
        NettyCache.defaultGroup.add(ctx.channel());
        log.info("客戶端與服務(wù)端連接開(kāi)啟:"+ctx.channel().remoteAddress().toString());
    }
    //關(guān)閉通道
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//移除通道以及通道對(duì)應(yīng)的消息
        if(NettyCache.defaultGroup.contains(ctx.channel())) {
            NettyCache.defaultGroup.remove(ctx.channel());
        }
        if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
            NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType"))
            .remove(ctx.channel());
        }
        if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
            NettyCache.channelMessage.remove(ctx.channel().id());
            
            log.info("客戶端與服務(wù)端連接關(guān)閉:"+ctx.channel().remoteAddress().toString());
        }
    }
    //通道讀取數(shù)據(jù)完成
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    //接收消息
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof WebSocketFrame) {
            handlerWebocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
    
    //返回應(yīng)答消息
    private void handlerWebocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame) {
        String requestMessage=((TextWebSocketFrame)frame).text();
        JSONObject jsonMessage=JSONObject.parseObject(requestMessage);
        log.info("接收到客戶端發(fā)送的消息"+requestMessage);
        if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
            NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType")).remove(ctx.channel());
        }
        NettyCache.channels.get(jsonMessage.getIntValue("pushType")).add(ctx.channel());
        NettyCache.channelMessage.put(ctx.channel().id(), jsonMessage);
        if(NettyCache.defaultGroup.contains(ctx.channel())) {
            NettyCache.defaultGroup.remove(ctx.channel());
        }
    }
}

創(chuàng)建html邦泄,很簡(jiǎn)單一個(gè)頁(yè)面:websocket.html

<html>
<head>
<title>推送消息頁(yè)面</title>
</head>
<body>
<div>
你好删窒!<p th:text="${name}"></p>
</div>
<input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
 <div id="message"></div>
 <script type="text/javascript">
    var websocket = null;
    //判斷當(dāng)前瀏覽器是否支持WebSocket
    if('WebSocket' in window){
        websocket = new WebSocket("ws://localhost:28095/websocket");
    }
    else{
        alert('Not support websocket')
    }
    //連接發(fā)生錯(cuò)誤的回調(diào)方法
    websocket.onerror = function(){
        setMessageInnerHTML("error");
    };
    //連接成功建立的回調(diào)方法
    websocket.onopen = function(event){
        setMessageInnerHTML("open");
    }
    //接收到消息的回調(diào)方法
    websocket.onmessage = function(event){
        setMessageInnerHTML(event.data);
    }
    //連接關(guān)閉的回調(diào)方法
    websocket.onclose = function(){
        setMessageInnerHTML("close");
    }
    //監(jiān)聽(tīng)窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時(shí)顺囊,主動(dòng)去關(guān)閉websocket連接易稠,防止連接還沒(méi)斷開(kāi)就關(guān)閉窗口,server端會(huì)拋異常包蓝。
    window.onbeforeunload = function(){
        websocket.close();
    }
    //將消息顯示在網(wǎng)頁(yè)上
    function setMessageInnerHTML(innerHTML){
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
    //關(guān)閉連接
    function closeWebSocket(){
        websocket.close();
    }
    //發(fā)送消息
    function send(){
        var zhi = document.getElementById('text').value;
        var message={
            "pushType":"1",
            "message":zhi
        }
        websocket.send(JSON.stringify(message));
    }
</script>
</body>
</html>

好了,到此主要代碼完畢企量,完整代碼测萎,請(qǐng)移步:
https://github.com/wangdonghuihuang/HappyKing
敬請(qǐng)F(tuán)ork跟Star。有問(wèn)題可企鵝群--535296702届巩,群里是一群可愛(ài)的小伙伴硅瞧,探討技術(shù)與吹牛打咖樣樣俱全哦!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末恕汇,一起剝皮案震驚了整個(gè)濱河市腕唧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瘾英,老刑警劉巖枣接,帶你破解...
    沈念sama閱讀 221,888評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異缺谴,居然都是意外死亡但惶,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門湿蛔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)膀曾,“玉大人,你說(shuō)我怎么就攤上這事阳啥√硪辏” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,386評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵察迟,是天一觀的道長(zhǎng)斩狱。 經(jīng)常有香客問(wèn)我,道長(zhǎng)卷拘,這世上最難降的妖魔是什么喊废? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,726評(píng)論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮栗弟,結(jié)果婚禮上污筷,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好瓣蛀,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,729評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布陆蟆。 她就那樣靜靜地躺著,像睡著了一般惋增。 火紅的嫁衣襯著肌膚如雪叠殷。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,337評(píng)論 1 310
  • 那天诈皿,我揣著相機(jī)與錄音林束,去河邊找鬼。 笑死稽亏,一個(gè)胖子當(dāng)著我的面吹牛壶冒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播截歉,決...
    沈念sama閱讀 40,902評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼胖腾,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了瘪松?” 一聲冷哼從身側(cè)響起咸作,我...
    開(kāi)封第一講書(shū)人閱讀 39,807評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宵睦,沒(méi)想到半個(gè)月后记罚,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,349評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡壳嚎,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,439評(píng)論 3 340
  • 正文 我和宋清朗相戀三年毫胜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诬辈。...
    茶點(diǎn)故事閱讀 40,567評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡酵使,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出焙糟,到底是詐尸還是另有隱情口渔,我是刑警寧澤,帶...
    沈念sama閱讀 36,242評(píng)論 5 350
  • 正文 年R本政府宣布穿撮,位于F島的核電站缺脉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏悦穿。R本人自食惡果不足惜攻礼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,933評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望栗柒。 院中可真熱鬧礁扮,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,420評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至僚焦,卻和暖如春锰提,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背芳悲。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,531評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工立肘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人名扛。 一個(gè)月前我還...
    沈念sama閱讀 48,995評(píng)論 3 377
  • 正文 我出身青樓赛不,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親罢洲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,585評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容