消息推送

1 什么是消息推送

很多手機(jī)APP會不定時的給用戶推送消息皇筛,例如一些新聞APP會給用戶推送用戶可能感興趣的新聞救军,或者APP有更新了硼端,會給用戶推送是否選擇更新的消息等等,這就是所謂的“消息推送”醋安。

對于APP或者桌面客戶端這種C/S架構(gòu)的軟件杂彭,實(shí)現(xiàn)消息推送其實(shí)比較簡單,只需要維護(hù)TCP連接就行了吓揪,因?yàn)門CP本身是全雙工的亲怠,客戶端和服務(wù)端都能發(fā)送消息。但Web環(huán)境就不太一樣了柠辞,目前的Web軟件大多數(shù)都是B/S架構(gòu)(即瀏覽器/服務(wù)端)团秽,使用的消息傳輸協(xié)議也大多數(shù)是HTTP,HTTP1.0和HTTP1.1都無法實(shí)現(xiàn)服務(wù)端向客戶端(瀏覽器)主動發(fā)送消息叭首,所以實(shí)現(xiàn)的手段主要就是客戶端定時或者不定時輪詢(例如間隔時間動態(tài)變化)习勤,這種方式實(shí)現(xiàn)并不算復(fù)雜,最大的問題就是性能焙格,輪詢是需要消耗CPU資源的图毕,如果很長一段時間內(nèi),服務(wù)端都沒有消息要給客戶端眷唉,那么這個CPU空輪詢的占比就比較大了予颤,而且輪詢也會對服務(wù)端造成壓力,因?yàn)槿绻?wù)端沒有消息要給客戶端冬阳,那么其實(shí)這樣的“請求-響應(yīng)”是沒有意義的蛤虐,算是服務(wù)端的額外壓力。

可能有朋友難以理解上面所描述的情況肝陪,下面我畫個圖來描述這個問題:

可以從圖中看到驳庭,客戶端老是不斷的孜孜不倦的跑去問服務(wù)端“有沒有新的推送消息”,而且還不長記性氯窍,每次都問同樣的問題(HTTP是無狀態(tài)的協(xié)議)饲常,這事給誰誰都得煩,是吧荞驴,服務(wù)端老被客戶端“騷擾”不皆,所以有時候就會“罷工”不干了!(服務(wù)端壓力過大熊楼,短暫不可用)那怎么解決這個問題呢?也就是說讓服務(wù)端過得舒服一些能犯?

2 解決方案

目前主要有兩種主流的解決方案:

  • 利用新版的HTTP2.0鲫骗,重構(gòu)原有的代碼,因?yàn)镠TTP2.0支持服務(wù)端主動發(fā)送數(shù)據(jù)給客戶端踩晶,但這種方案實(shí)現(xiàn)其實(shí)是比較困難的执泰,一是服務(wù)端實(shí)現(xiàn)起來并不簡單,而是一般也不用來推送大量的數(shù)據(jù)渡蜻,常見的使用場景是請求.html术吝,然后服務(wù)端把HTML作為響應(yīng)給前端计济,并且同時把CSS,JS文件都“推”給前端(在傳統(tǒng)的HTTP中排苍,要拿到這三個東西沦寂,至少需要三次HTTP請求)。本文不討論該方案淘衙。
  • 利用WebSocket協(xié)議传藏,WebSocket是一種在瀏覽器環(huán)境下可以全雙工通信的應(yīng)用層協(xié)議(等會兒會給出簡單介紹)。這種方案就是利用WebSocket的全雙工通信的特性彤守,使得B/S架構(gòu)的軟件也能像C/S架構(gòu)的軟件那樣簡單的實(shí)現(xiàn)消息推送毯侦。本文主要介紹的就是這種解決方案。

3 WebSocket協(xié)議

WebSocket和HTTP,FTP等一樣具垫,都屬于應(yīng)用層協(xié)議侈离,誕生于2008年,與2011年稱為國際標(biāo)準(zhǔn)(說實(shí)話筝蚕,這段時間真的很短霍狰,說明WebSocket確實(shí)解決了一些問題)。下面是維基百科上的定義:

WebSocket是一種在單個TCP連接上進(jìn)行全雙工通信的協(xié)議饰及。WebSocket通信協(xié)議于2011年被IETF定為標(biāo)準(zhǔn)RFC 6455蔗坯,并由RFC7936補(bǔ)充規(guī)范。WebSocket API也被W3C定為標(biāo)準(zhǔn)燎含。

WebSocket使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單宾濒,允許服務(wù)端主動向客戶端推送數(shù)據(jù)。在WebSocket API中屏箍,瀏覽器和服務(wù)器只需要完成一次握手绘梦,兩者之間就直接可以創(chuàng)建持久性的連接,并進(jìn)行雙向數(shù)據(jù)傳輸赴魁。

其優(yōu)點(diǎn)也不少(這里所提到的優(yōu)點(diǎn)是相對于HTTP來說的):

  • 支持全雙工通信卸奉,這對于某些場景尤其重要,例如消息推送颖御。
  • 更好的二進(jìn)制支持榄棵,HTTP本身是文本傳輸協(xié)議,對于二進(jìn)制的數(shù)據(jù)需要特殊處理(不過2.0版本也對二進(jìn)制做了補(bǔ)充)潘拱,而WebSocket定義了二進(jìn)制幀疹鳄,所以傳輸二進(jìn)制數(shù)據(jù)的時候不需要像HTTP那樣特殊處理。
  • 較少的控制開銷芦岂。連接創(chuàng)建后瘪弓,ws客戶端、服務(wù)端進(jìn)行數(shù)據(jù)交換時禽最,協(xié)議控制的數(shù)據(jù)包頭部較小腺怯。在不包含頭部的情況下袱饭,服務(wù)端到客戶端的包頭只有2~10字節(jié)(取決于數(shù)據(jù)包長度),客戶端到服務(wù)端的的話呛占,需要加上額外的4字節(jié)的掩碼虑乖。而HTTP協(xié)議每次通信都需要攜帶完整的頭部。
  • ...............

那WebSocket連接是如何建立的呢栓票?答案是:握手協(xié)議决左。

在真正建立WebSocket之前,會先建立一個HTTP連接走贪,然后服務(wù)端響應(yīng)狀態(tài)碼101佛猛,表示切換協(xié)議,之后通信協(xié)議會升級成WebSocket坠狡,這樣WebSocket連接才算是建立起來继找。下面是一個WebSocket握手的示例:

客戶端請求:

GET / HTTP/1.1
#Upgrade就表示要把協(xié)議升級成WebSocket
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13

服務(wù)端響應(yīng):

#客戶端收到該HTTP報文之后,會將通信協(xié)議升級成WebSocket逃沿,之后的數(shù)據(jù)傳輸就都使用WebSocket了
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/

這就是所謂的握手了(雙方建立友好關(guān)系)婴渡。

4 利用Netty + WebSocket實(shí)現(xiàn)消息推送

在動手之前,先聲明一點(diǎn):本文不會介紹WebSocket的簡單使用凯亮,因?yàn)楸疚牡臉?biāo)題是“消息推送”边臼,而不是“WebSocket入門”,Netty也是同理假消。

首先柠并,我們先確定一些設(shè)計方案:

  1. 客戶端和服務(wù)端使用WebSocket作為通信協(xié)議,當(dāng)服務(wù)端有新的推送消息的時候富拗,主動把消息“推”給客戶端臼予。
  2. Netty作為網(wǎng)絡(luò)通信的基礎(chǔ)框架。
  3. 服務(wù)端監(jiān)聽消息隊(duì)列啃沪,當(dāng)消息隊(duì)列中有新的消息時粘拾,把消息發(fā)送給客戶端。
  4. 可以還有另外一個專門往消息隊(duì)列里放入消息的服務(wù)创千,至此整個系統(tǒng)就形成一個完整的消息推送系統(tǒng)了缰雇。

大致了解了方案之后,可以著手實(shí)現(xiàn)了签餐,先來看看服務(wù)端的實(shí)現(xiàn):

服務(wù)啟動類:

public class WebSocketServer {

    //RabbitMQ客戶端連接工廠
    private static ConnectionFactory connectionFactory = new ConnectionFactory();

    //客戶端連接
    private static Connection connection;

    //客戶端Channel
    private static com.rabbitmq.client.Channel channel;

    //Jackson寓涨,序列化用的
    private static ObjectMapper objectMapper = new ObjectMapper();


    public static void main(String[] args) {
        //初始化ServerBootstrap
        ServerBootstrap server = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        server.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new HttpServerCodec());
                        ch.pipeline().addLast(new HttpObjectAggregator(16 * 16 * 1024));
                        ch.pipeline().addLast(new ChunkedWriteHandler());
                        ch.pipeline().addLast(new WebSocketServerHandler());
                    }
                });

        //初始化RabbitMQ的配置
        connectionFactory.setHost("xxx.xxx.xxx.xxx");
        connectionFactory.setUsername("xxx");
        connectionFactory.setPassword("xxx");

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RabbitMQConfig.PUSH_MSG_QUEUE, false, false, true, null);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

        //服務(wù)端綁定8081端口
        server.bind(8081).syncUninterruptibly().addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("綁定成功");
                startPushMessage();
                startMQListener();
            } else {
                System.out.println("綁定失敗");
            }
        });


    }

    //開啟消息隊(duì)列的監(jiān)聽,當(dāng)有消息的時候氯檐,就把消息推送給客戶端
    private static void startMQListener() {
        new Thread(() -> {
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    //這里就比較粗暴的獲取ChannleGroup了,建議讀者嘗試的時候用更好的方法
                    ChannelGroup group = WebSocketServerHandler.group;
                    if (group != null) {
                    
                        group.writeAndFlush(new TextWebSocketFrame(message));
                    }
                }


            };
            System.out.println("開始監(jiān)聽");
            try {
                channel.basicConsume(RabbitMQConfig.PUSH_MSG_QUEUE, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private static final AtomicLong id = new AtomicLong(0);

    //隨機(jī)生成一個消息
    private static Notify generateNotify() {
        Notify notify = new Notify();
        notify.setId(id.getAndIncrement());
        notify.setTitle(UUID.randomUUID().toString());
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            builder.append(getRandomChar());
        }
        notify.setContent(builder.toString());
        notify.setPushTime(new Date());
        return notify;
    }

    private static char getRandomChar() {
        String str = "";
        int hightPos; //
        int lowPos;

        Random random = new Random();

        hightPos = (176 + Math.abs(random.nextInt(39)));
        lowPos = (161 + Math.abs(random.nextInt(93)));

        byte[] b = new byte[2];
        b[0] = (Integer.valueOf(hightPos)).byteValue();
        b[1] = (Integer.valueOf(lowPos)).byteValue();

        try {
            str = new String(b, "GBK");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            System.out.println("錯誤");
        }

        return str.charAt(0);
    }


    //這里我不另外寫專門的生產(chǎn)消息的服務(wù)了体捏,直接定時的往消息隊(duì)列里放入消息
    private static void startPushMessage() {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(() -> {
            try {
                Notify notify = generateNotify();
                String message = objectMapper.writeValueAsString(notify);
                channel.basicPublish("", RabbitMQConfig.PUSH_MSG_QUEUE, null, message.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, 5, 5, TimeUnit.SECONDS);
    }
}

WebSocketHandler類:

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    //WebSocket握手
    private static WebSocketServerHandshaker handshaker;

    //客戶端的群組
    public static ChannelGroup group;

    //客戶端在線人數(shù)
    private static AtomicLong onlineCount = new AtomicLong(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        onlineCount.incrementAndGet();
        System.out.println("有用戶上線冠摄,當(dāng)前在線人數(shù)是: " + onlineCount.get());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //第一次請求肯定是HTTP請求糯崎,所以先去處理HTTP請求,在該處理方法里做WebSocket握手的操作
        if (msg instanceof FullHttpRequest) {
            handlerHttpRequest(ctx, (FullHttpRequest)msg);
        }

        //能到這河泳,肯定是連接成功了的
        if (onlineCount.get() == 1) {
            //創(chuàng)建群組
            group = new DefaultChannelGroup(ctx.executor());
            group.add(ctx.channel());
        } else {
            group.add(ctx.channel());
        }

        //之后的請求就都是WebSocket幀了沃呢,不過對于我們的系統(tǒng)來說,這倒不是特別主要的
        //之所以還要處理拆挥,是為了處理客戶端主動關(guān)閉連接的情況以及維持心跳
        if (msg instanceof WebSocketFrame) {
            handlerWebSocketFrame(ctx, (WebSocketFrame)msg);
        }
    }



    private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        //如果request解析失敗或者upgrade不是websocket薄霜,那么就直接發(fā)送BAD_REQUEST狀態(tài)即可
        if (!request.decoderResult().isSuccess()
                || !"websocket".equals(request.headers().get("upgrade"))) {
            try {
                sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return;
        }

        //如果一切正常,那么就開始進(jìn)行WebSocket握手
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
        handshaker = factory.newHandshaker(request);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            //這里就是握手操作了
            handshaker.handshake(ctx.channel(), request);
        }
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request,
                                         FullHttpResponse response) throws UnsupportedEncodingException {
        if (response.status().code() != 200) {
            ByteBuf buf = ctx.alloc().buffer();
            buf.writeBytes("發(fā)生錯誤哦".getBytes("utf-8"));
            response.content().writeBytes(buf);
            buf.release();
        }

        ChannelFuture future = ctx.channel().writeAndFlush(response);
        if (request.headers().get("Keep-Alive") == null) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            //如果該幀是CloseWebSocketFrame類型的纸兔,也就是說客戶端主動關(guān)閉連接
            //那么就做相應(yīng)的處理
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            group.remove(ctx.channel());
            onlineCount.decrementAndGet();
            System.out.println("有用戶下線惰瓜,當(dāng)前在線人數(shù)是: " + onlineCount.get());
            return;
        }

        //WebSocket的客戶段會發(fā)送心跳數(shù)據(jù)包,返回PongWebSocketFrame就行了
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        //本系統(tǒng)只支持本文數(shù)據(jù)
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("不支持該類型消息");

        }
        //向所有在線的用戶發(fā)送消息
        group.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
    }
}

不知道各位注意到?jīng)]有汉矿,我們的系統(tǒng)中不存在WebSocketFrame的編解碼器崎坊,熟悉Netty的朋友應(yīng)該知道,如果真的沒有WebSocketFrame的編解碼器的話洲拇,我們的系統(tǒng)是無法處理WebSocket傳輸?shù)臄?shù)據(jù)的奈揍。其實(shí)Netty在進(jìn)行WebSocket握手的時候,就自動的幫我們添加了編解碼器赋续,如下是handshaker.handshake(ctx.channel(), request)的源碼:

public ChannelFuture handshake(Channel channel, FullHttpRequest req) {
    return handshake(channel, req, null, channel.newPromise());
}

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                        HttpHeaders responseHeaders, final ChannelPromise promise) {

    if (logger.isDebugEnabled()) {
        logger.debug("{} WebSocket version {} server handshake", channel, version());
    }
    FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
    ChannelPipeline p = channel.pipeline();
    if (p.get(HttpObjectAggregator.class) != null) {
        p.remove(HttpObjectAggregator.class);
    }
    if (p.get(HttpContentCompressor.class) != null) {
        p.remove(HttpContentCompressor.class);
    }
    ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
    final String encoderName;
    if (ctx == null) {
        // this means the user use a HttpServerCodec
        ctx = p.context(HttpServerCodec.class);
        if (ctx == null) {
            promise.setFailure(
                    new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
            return promise;
        }
        //就是這里了男翰,加入默認(rèn)的編解碼器
        p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
        p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
        encoderName = ctx.name();
    } else {
        p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());

        encoderName = p.context(HttpResponseEncoder.class).name();
        p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
    }
    channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                ChannelPipeline p = future.channel().pipeline();
                p.remove(encoderName);
                promise.setSuccess();
            } else {
                promise.setFailure(future.cause());
            }
        }
    });
    return promise;
}

服務(wù)端完事了,接下來看看客戶端的代碼纽乱,其實(shí)就是前端代碼了(代碼是我網(wǎng)上直接抄的):

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>WebSocket測試</title>
    </head>
    <body>
        <h1>WebSocket測試</h1>
        <div id="context"></div>
        <script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
        <script type="application/javascript">
            var websocket = null;

            // 判斷當(dāng)前瀏覽器是否支持WebSocket
            if ('WebSocket' in window) {
                // 創(chuàng)建WebSocket 對象蛾绎,連接服務(wù)器端點(diǎn)
                websocket = new WebSocket("ws://localhost:8081/ws");
            } else {
                alert('您的瀏覽器不支持websocket');
            }
 
            // 連接發(fā)生錯誤的回調(diào)方法
            websocket.onerror = function() {
                appendMessage ("WebSocket連接失敗");
            }
 
            // 連接成功建立的回調(diào)方法
            websocket.onopen = function(event) {
                appendMessage ("WebSocket連接成功");
            }
 
            // 接收到消息的回調(diào)方法
            websocket.onmessage = function (event) {
                console.log("收到消息")
                jsonObject = JSON.parse(event.data)
                console.log(jsonObject)
                appendMessage(jsonObject.title);
            }
 
            websocket.onclose = function() {
                appendMessage("關(guān)閉連接");
            }
 
            websocket.onbeforeupload = function() {
                websocket.close();
            }
 
            function appendMessage(message) {
                var context = $('#context').html() + '<br>' + message;
                $('#context').html(context);
            }
 
            function closeWebSocket() {
                websocket.close();
            }
 
            function sendMessage() {
                var message = $('#message').val();
                websocket.send(message);
            }
        </script>
    </body>
</html>

接下來啟動服務(wù),然后直接打開該文件迫淹,應(yīng)該就能看到效果了秘通,如下所示:

你也可以多打開幾個客戶端試試,會發(fā)現(xiàn)消息會傳遞給每個客戶端了敛熬,而且這期間不存在什么客戶端主動請求的情況肺稀,即對于客戶端來說,這些個消息就好像“天上掉下來”的一樣应民,這就簡單實(shí)現(xiàn)了消息推送系統(tǒng)话原,現(xiàn)在再來看看客戶端和服務(wù)端的通信情況:

對比上面的那張圖,是不是覺得更加“清爽”了诲锹?現(xiàn)在不再需要在客戶端不斷去輪詢繁仁,去騷擾服務(wù)端了,當(dāng)有新的推送消息的時候归园,服務(wù)端就主動的把消息“推”給客戶端了黄虱,這樣服務(wù)端的壓力也減少了很多,客戶端的CPU也不用一直做沒有意義的事了庸诱。

5 Spring Boot + WebSocket實(shí)現(xiàn)消息推送

我還想介紹一種實(shí)現(xiàn)方案捻浦,這種方案相較于Netty的實(shí)現(xiàn)更加的簡單晤揣,那就是利用Spring 對WebSocket的支持來實(shí)現(xiàn)。就不多說廢話了朱灿,直接來看實(shí)現(xiàn)吧:

建立好Spring Boot項(xiàng)目之后昧识,加入如下依賴(maven):

<dependency>
    <!--RabbitMQ的支持-->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <!--web mvc的支持-->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <!--websocket的支持-->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

之后來做兩個配置,一是配置RabbitMQ盗扒,而是配置WebSocket:

//RabbitMQ配置
@Configuration
public class RabbitMQConfig {

    //三個分別是隊(duì)列跪楞,交換器以及路由鍵
    public static final String PUSH_MSG_EXCHANGE = "push_msg_exchange";

    public static final String PUSH_MSG_QUEUE = "push_msg_queue";

    public static final String PUSH_MSG_ROUTE_KEY = "push_msg.direct";

    @Bean
    public DirectExchange pushMsgExchange() {
        return new DirectExchange(PUSH_MSG_EXCHANGE, true, true);
    }

    @Bean
    public Queue pushMsgQueue() {
        return new Queue(PUSH_MSG_QUEUE, true, false, true);
    }

    //將隊(duì)列和交換器綁定
    @Bean
    public Binding pushMsgBinding() {
        return BindingBuilder.bind(pushMsgQueue()).to(pushMsgExchange()).with(PUSH_MSG_ROUTE_KEY);
    }
}

//WebSocket配置
@Configuration
public class WebSocketConfig {

    //只需要配置ServerEndpointExporter這個Bean就行了
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

接下來就是實(shí)現(xiàn)類了:

@ServerEndpoint("/ws")
@Service
public class WebSocketService {

    private static Set<WebSocketService> webSocketServiceSet = new CopyOnWriteArraySet<>();

    private Session session;

    private static AtomicLong onlineCount = new AtomicLong(0);

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        onlineCount.incrementAndGet();
        webSocketServiceSet.add(this);
        System.out.println("有用戶上線,當(dāng)前在線人數(shù)有:" + onlineCount.get());
    }

    @OnClose
    public void onClose() {
        webSocketServiceSet.remove(this);
        onlineCount.decrementAndGet();
        System.out.println("有用戶下線侣灶,當(dāng)前在線人數(shù)有: " + onlineCount.get());
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        System.out.println("來自客戶端的消息甸祭,客戶端IP:PORT是 : ");
        System.out.println(session.getRequestURI().getHost() + ":" + session.getRequestURI().getPort());
        System.out.println("消息是: " + message);
    }

    @OnError
    public void onError(Throwable throwable) {
        System.out.println("服務(wù)端異常");
        throwable.printStackTrace();
    }

    private void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = RabbitMQConfig.PUSH_MSG_QUEUE)
    public void ListenMessageFromMQ(String message) {
        for (WebSocketService webSocketService : webSocketServiceSet) {
            //消息推送了,向每個在線客戶端發(fā)送消息
            webSocketService.sendMessage(message);
        }
    }
}

對于每個客戶端炫隶,都會有一個與之對應(yīng)的WebSocketService實(shí)例對象以及Session淋叶,就好像Netty里的Channel一樣(只是一個比喻,并不等同)伪阶,所以用一個靜態(tài)的Set來保存這些實(shí)例對象煞檩,當(dāng)需要發(fā)送消息的時候,直接取出來栅贴,調(diào)用Session的發(fā)送消息的方法就行了斟湃。

好了,就是那么簡單檐薯,幾個注解@OnOpen凝赛,@OnMessage,@OnClose坛缕,@OnError墓猎,聽名字應(yīng)該就知道啥意思了吧,不多說了赚楚,相比于Netty的實(shí)現(xiàn)毙沾,簡單的太多了。不過最后還差一點(diǎn)宠页,消息從哪來呢左胞?和之前一樣,我這里開啟定時任務(wù)举户,定時的往消息隊(duì)列里塞消息就行了烤宙,如下所示:

@Component
public class MyTask {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(cron = "*/5 * * * * ?")
    public void sendMessageToMQ() {
        //這里的消息我就隱編碼了,實(shí)際上可以有多種方式來構(gòu)造消息
        String message = "Hello, Websocket!!!";
        rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_MSG_EXCHANGE,
                                    RabbitMQConfig.PUSH_MSG_ROUTE_KEY,
                                    message);
    }
}

最后俭嘁,別忘了應(yīng)用主類上加入@EnableScheduling躺枕,否則定時任務(wù)不會生效。這里的實(shí)現(xiàn)效果和之前的實(shí)現(xiàn)幾乎一樣,客戶端也不需要修改什么屯远,就不多說了蔓姚。

6 小結(jié)

消息推送系統(tǒng)是一個用途廣泛的系統(tǒng)捕虽,本文簡單介紹了兩種實(shí)現(xiàn)方法慨丐,分別是Netty+WebSocket和Spring Boot+WebSocket,后者其實(shí)是基于Servlet實(shí)現(xiàn)的泄私,所以性能上和Netty還是有一些差異的房揭。不過無論哪種實(shí)現(xiàn)吧,最核心的部分還是WebSocket協(xié)議晌端,

有些代碼寫的不太合理捅暴,望諒解。

7 參考資料

《Netty 權(quán)威指南》WebSocket相關(guān)章節(jié)

Spring Boot 使用 WebSocket 實(shí)現(xiàn)消息推送 及 WebSocket原理

WebSocket 教程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末咧纠,一起剝皮案震驚了整個濱河市蓬痒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌漆羔,老刑警劉巖梧奢,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異演痒,居然都是意外死亡亲轨,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門鸟顺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惦蚊,“玉大人,你說我怎么就攤上這事讯嫂”姆妫” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵欧芽,是天一觀的道長莉掂。 經(jīng)常有香客問我,道長渐裸,這世上最難降的妖魔是什么巫湘? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮昏鹃,結(jié)果婚禮上尚氛,老公的妹妹穿的比我還像新娘。我一直安慰自己洞渤,他們只是感情好阅嘶,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般讯柔。 火紅的嫁衣襯著肌膚如雪抡蛙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天魂迄,我揣著相機(jī)與錄音粗截,去河邊找鬼。 笑死捣炬,一個胖子當(dāng)著我的面吹牛熊昌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播湿酸,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼婿屹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了推溃?” 一聲冷哼從身側(cè)響起昂利,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎铁坎,沒想到半個月后蜂奸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡厢呵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年窝撵,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片襟铭。...
    茶點(diǎn)故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡碌奉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出寒砖,到底是詐尸還是另有隱情赐劣,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布哩都,位于F島的核電站魁兼,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏漠嵌。R本人自食惡果不足惜咐汞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望儒鹿。 院中可真熱鬧化撕,春花似錦、人聲如沸约炎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至掠手,卻和暖如春憾朴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背喷鸽。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工众雷, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人魁衙。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓报腔,卻偏偏與公主長得像,于是被迫代替她去往敵國和親剖淀。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 今天我將為大家介紹常用的幾種消息推送:JMS,MQTT,XMPP,WebSocket,AMQP,友盟纤房,環(huán)信纵隔。 首先...
    馬小悅閱讀 2,086評論 0 7
  • 本篇結(jié)構(gòu): 背景 HTTP協(xié)議特點(diǎn) 消息推送方案 Websocket簡介 Websocket實(shí)例 一、背景 HTT...
    w1992wishes閱讀 18,262評論 3 6
  • 主題:引導(dǎo)小孩說出在外面發(fā)生的事情 今天早上跟爸爸出去玩了炮姨,回來的時候我問她出去做了一些什么捌刮? 星:吃東西了 我:...
    星之媽媽閱讀 240評論 0 1
  • 謙讓是種美德,可是很多人都沒有舒岸。 雪地狹窄绅作,狹路相逢。走一路都是我讓路蛾派。沒有人謙讓俄认。 出了家門,對于很多人而言洪乍,就...
    神奇魔刀小姐姐閱讀 223評論 0 0
  • 陽光會抹去這窗上的冰花 就連黎明醒來眯杏,都是如此的罪惡 而我,也終會爬上房頂 十月的黎明壳澳,是這濃密的霧色 把眼睛和山...
    羊與遠(yuǎn)方閱讀 426評論 0 1