1 什么是消息推送
很多手機(jī)APP會不定時的給用戶推送消息皇筛,例如一些新聞APP會給用戶推送用戶可能感興趣的新聞救军,或者APP有更新了硼端,會給用戶推送是否選擇更新的消息等等,這就是所謂的“消息推送”醋安。
對于APP或者桌面客戶端這種C/S架構(gòu)的軟件杂彭,實(shí)現(xiàn)消息推送其實(shí)比較簡單,只需要維護(hù)TCP連接就行了吓揪,因?yàn)門CP本身是全雙工的亲怠,客戶端和服務(wù)端都能發(fā)送消息。但Web環(huán)境就不太一樣了柠辞,目前的Web軟件大多數(shù)都是B/S架構(gòu)(即瀏覽器/服務(wù)端)团秽,使用的消息傳輸協(xié)議也大多數(shù)是HTTP,HTTP1.0和HTTP1.1都無法實(shí)現(xiàn)服務(wù)端向客戶端(瀏覽器)主動發(fā)送消息叭首,所以實(shí)現(xiàn)的手段主要就是客戶端定時或者不定時輪詢(例如間隔時間動態(tài)變化)习勤,這種方式實(shí)現(xiàn)并不算復(fù)雜,最大的問題就是性能焙格,輪詢是需要消耗CPU資源的图毕,如果很長一段時間內(nèi),服務(wù)端都沒有消息要給客戶端眷唉,那么這個CPU空輪詢的占比就比較大了予颤,而且輪詢也會對服務(wù)端造成壓力,因?yàn)槿绻?wù)端沒有消息要給客戶端冬阳,那么其實(shí)這樣的“請求-響應(yīng)”是沒有意義的蛤虐,算是服務(wù)端的額外壓力。
可能有朋友難以理解上面所描述的情況肝陪,下面我畫個圖來描述這個問題:
可以從圖中看到驳庭,客戶端老是不斷的孜孜不倦的跑去問服務(wù)端“有沒有新的推送消息”,而且還不長記性氯窍,每次都問同樣的問題(HTTP是無狀態(tài)的協(xié)議)饲常,這事給誰誰都得煩,是吧荞驴,服務(wù)端老被客戶端“騷擾”不皆,所以有時候就會“罷工”不干了!(服務(wù)端壓力過大熊楼,短暫不可用)那怎么解決這個問題呢?也就是說讓服務(wù)端過得舒服一些能犯?
2 解決方案
目前主要有兩種主流的解決方案:
- 利用新版的HTTP2.0鲫骗,重構(gòu)原有的代碼,因?yàn)镠TTP2.0支持服務(wù)端主動發(fā)送數(shù)據(jù)給客戶端踩晶,但這種方案實(shí)現(xiàn)其實(shí)是比較困難的执泰,一是服務(wù)端實(shí)現(xiàn)起來并不簡單,而是一般也不用來推送大量的數(shù)據(jù)渡蜻,常見的使用場景是請求.html术吝,然后服務(wù)端把HTML作為響應(yīng)給前端计济,并且同時把CSS,JS文件都“推”給前端(在傳統(tǒng)的HTTP中排苍,要拿到這三個東西沦寂,至少需要三次HTTP請求)。本文不討論該方案淘衙。
- 利用WebSocket協(xié)議传藏,WebSocket是一種在瀏覽器環(huán)境下可以全雙工通信的應(yīng)用層協(xié)議(等會兒會給出簡單介紹)。這種方案就是利用WebSocket的全雙工通信的特性彤守,使得B/S架構(gòu)的軟件也能像C/S架構(gòu)的軟件那樣簡單的實(shí)現(xiàn)消息推送毯侦。本文主要介紹的就是這種解決方案。
3 WebSocket協(xié)議
WebSocket和HTTP,FTP等一樣具垫,都屬于應(yīng)用層協(xié)議侈离,誕生于2008年,與2011年稱為國際標(biāo)準(zhǔn)(說實(shí)話筝蚕,這段時間真的很短霍狰,說明WebSocket確實(shí)解決了一些問題)。下面是維基百科上的定義:
WebSocket是一種在單個TCP連接上進(jìn)行全雙工通信的協(xié)議饰及。WebSocket通信協(xié)議于2011年被IETF定為標(biāo)準(zhǔn)RFC 6455蔗坯,并由RFC7936補(bǔ)充規(guī)范。WebSocket API也被W3C定為標(biāo)準(zhǔn)燎含。
WebSocket使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單宾濒,允許服務(wù)端主動向客戶端推送數(shù)據(jù)。在WebSocket API中屏箍,瀏覽器和服務(wù)器只需要完成一次握手绘梦,兩者之間就直接可以創(chuàng)建持久性的連接,并進(jìn)行雙向數(shù)據(jù)傳輸赴魁。
其優(yōu)點(diǎn)也不少(這里所提到的優(yōu)點(diǎn)是相對于HTTP來說的):
- 支持全雙工通信卸奉,這對于某些場景尤其重要,例如消息推送颖御。
- 更好的二進(jìn)制支持榄棵,HTTP本身是文本傳輸協(xié)議,對于二進(jìn)制的數(shù)據(jù)需要特殊處理(不過2.0版本也對二進(jìn)制做了補(bǔ)充)潘拱,而WebSocket定義了二進(jìn)制幀疹鳄,所以傳輸二進(jìn)制數(shù)據(jù)的時候不需要像HTTP那樣特殊處理。
- 較少的控制開銷芦岂。連接創(chuàng)建后瘪弓,ws客戶端、服務(wù)端進(jìn)行數(shù)據(jù)交換時禽最,協(xié)議控制的數(shù)據(jù)包頭部較小腺怯。在不包含頭部的情況下袱饭,服務(wù)端到客戶端的包頭只有2~10字節(jié)(取決于數(shù)據(jù)包長度),客戶端到服務(wù)端的的話呛占,需要加上額外的4字節(jié)的掩碼虑乖。而HTTP協(xié)議每次通信都需要攜帶完整的頭部。
- ...............
那WebSocket連接是如何建立的呢栓票?答案是:握手協(xié)議决左。
在真正建立WebSocket之前,會先建立一個HTTP連接走贪,然后服務(wù)端響應(yīng)狀態(tài)碼101佛猛,表示切換協(xié)議,之后通信協(xié)議會升級成WebSocket坠狡,這樣WebSocket連接才算是建立起來继找。下面是一個WebSocket握手的示例:
客戶端請求:
GET / HTTP/1.1
#Upgrade就表示要把協(xié)議升級成WebSocket
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
服務(wù)端響應(yīng):
#客戶端收到該HTTP報文之后,會將通信協(xié)議升級成WebSocket逃沿,之后的數(shù)據(jù)傳輸就都使用WebSocket了
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/
這就是所謂的握手了(雙方建立友好關(guān)系)婴渡。
4 利用Netty + WebSocket實(shí)現(xiàn)消息推送
在動手之前,先聲明一點(diǎn):本文不會介紹WebSocket的簡單使用凯亮,因?yàn)楸疚牡臉?biāo)題是“消息推送”边臼,而不是“WebSocket入門”,Netty也是同理假消。
首先柠并,我們先確定一些設(shè)計方案:
- 客戶端和服務(wù)端使用WebSocket作為通信協(xié)議,當(dāng)服務(wù)端有新的推送消息的時候富拗,主動把消息“推”給客戶端臼予。
- Netty作為網(wǎng)絡(luò)通信的基礎(chǔ)框架。
- 服務(wù)端監(jiān)聽消息隊(duì)列啃沪,當(dāng)消息隊(duì)列中有新的消息時粘拾,把消息發(fā)送給客戶端。
- 可以還有另外一個專門往消息隊(duì)列里放入消息的服務(wù)创千,至此整個系統(tǒng)就形成一個完整的消息推送系統(tǒng)了缰雇。
大致了解了方案之后,可以著手實(shí)現(xiàn)了签餐,先來看看服務(wù)端的實(shí)現(xiàn):
服務(wù)啟動類:
public class WebSocketServer {
//RabbitMQ客戶端連接工廠
private static ConnectionFactory connectionFactory = new ConnectionFactory();
//客戶端連接
private static Connection connection;
//客戶端Channel
private static com.rabbitmq.client.Channel channel;
//Jackson寓涨,序列化用的
private static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) {
//初始化ServerBootstrap
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
server.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(16 * 16 * 1024));
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new WebSocketServerHandler());
}
});
//初始化RabbitMQ的配置
connectionFactory.setHost("xxx.xxx.xxx.xxx");
connectionFactory.setUsername("xxx");
connectionFactory.setPassword("xxx");
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RabbitMQConfig.PUSH_MSG_QUEUE, false, false, true, null);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
//服務(wù)端綁定8081端口
server.bind(8081).syncUninterruptibly().addListener(future -> {
if (future.isSuccess()) {
System.out.println("綁定成功");
startPushMessage();
startMQListener();
} else {
System.out.println("綁定失敗");
}
});
}
//開啟消息隊(duì)列的監(jiān)聽,當(dāng)有消息的時候氯檐,就把消息推送給客戶端
private static void startMQListener() {
new Thread(() -> {
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
//這里就比較粗暴的獲取ChannleGroup了,建議讀者嘗試的時候用更好的方法
ChannelGroup group = WebSocketServerHandler.group;
if (group != null) {
group.writeAndFlush(new TextWebSocketFrame(message));
}
}
};
System.out.println("開始監(jiān)聽");
try {
channel.basicConsume(RabbitMQConfig.PUSH_MSG_QUEUE, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
private static final AtomicLong id = new AtomicLong(0);
//隨機(jī)生成一個消息
private static Notify generateNotify() {
Notify notify = new Notify();
notify.setId(id.getAndIncrement());
notify.setTitle(UUID.randomUUID().toString());
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 500; i++) {
builder.append(getRandomChar());
}
notify.setContent(builder.toString());
notify.setPushTime(new Date());
return notify;
}
private static char getRandomChar() {
String str = "";
int hightPos; //
int lowPos;
Random random = new Random();
hightPos = (176 + Math.abs(random.nextInt(39)));
lowPos = (161 + Math.abs(random.nextInt(93)));
byte[] b = new byte[2];
b[0] = (Integer.valueOf(hightPos)).byteValue();
b[1] = (Integer.valueOf(lowPos)).byteValue();
try {
str = new String(b, "GBK");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
System.out.println("錯誤");
}
return str.charAt(0);
}
//這里我不另外寫專門的生產(chǎn)消息的服務(wù)了体捏,直接定時的往消息隊(duì)列里放入消息
private static void startPushMessage() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(() -> {
try {
Notify notify = generateNotify();
String message = objectMapper.writeValueAsString(notify);
channel.basicPublish("", RabbitMQConfig.PUSH_MSG_QUEUE, null, message.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}, 5, 5, TimeUnit.SECONDS);
}
}
WebSocketHandler類:
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
//WebSocket握手
private static WebSocketServerHandshaker handshaker;
//客戶端的群組
public static ChannelGroup group;
//客戶端在線人數(shù)
private static AtomicLong onlineCount = new AtomicLong(0);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
onlineCount.incrementAndGet();
System.out.println("有用戶上線冠摄,當(dāng)前在線人數(shù)是: " + onlineCount.get());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//第一次請求肯定是HTTP請求糯崎,所以先去處理HTTP請求,在該處理方法里做WebSocket握手的操作
if (msg instanceof FullHttpRequest) {
handlerHttpRequest(ctx, (FullHttpRequest)msg);
}
//能到這河泳,肯定是連接成功了的
if (onlineCount.get() == 1) {
//創(chuàng)建群組
group = new DefaultChannelGroup(ctx.executor());
group.add(ctx.channel());
} else {
group.add(ctx.channel());
}
//之后的請求就都是WebSocket幀了沃呢,不過對于我們的系統(tǒng)來說,這倒不是特別主要的
//之所以還要處理拆挥,是為了處理客戶端主動關(guān)閉連接的情況以及維持心跳
if (msg instanceof WebSocketFrame) {
handlerWebSocketFrame(ctx, (WebSocketFrame)msg);
}
}
private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
//如果request解析失敗或者upgrade不是websocket薄霜,那么就直接發(fā)送BAD_REQUEST狀態(tài)即可
if (!request.decoderResult().isSuccess()
|| !"websocket".equals(request.headers().get("upgrade"))) {
try {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return;
}
//如果一切正常,那么就開始進(jìn)行WebSocket握手
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
handshaker = factory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
//這里就是握手操作了
handshaker.handshake(ctx.channel(), request);
}
}
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request,
FullHttpResponse response) throws UnsupportedEncodingException {
if (response.status().code() != 200) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("發(fā)生錯誤哦".getBytes("utf-8"));
response.content().writeBytes(buf);
buf.release();
}
ChannelFuture future = ctx.channel().writeAndFlush(response);
if (request.headers().get("Keep-Alive") == null) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
//如果該幀是CloseWebSocketFrame類型的纸兔,也就是說客戶端主動關(guān)閉連接
//那么就做相應(yīng)的處理
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
group.remove(ctx.channel());
onlineCount.decrementAndGet();
System.out.println("有用戶下線惰瓜,當(dāng)前在線人數(shù)是: " + onlineCount.get());
return;
}
//WebSocket的客戶段會發(fā)送心跳數(shù)據(jù)包,返回PongWebSocketFrame就行了
if (frame instanceof PingWebSocketFrame) {
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
//本系統(tǒng)只支持本文數(shù)據(jù)
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("不支持該類型消息");
}
//向所有在線的用戶發(fā)送消息
group.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
}
}
不知道各位注意到?jīng)]有汉矿,我們的系統(tǒng)中不存在WebSocketFrame的編解碼器崎坊,熟悉Netty的朋友應(yīng)該知道,如果真的沒有WebSocketFrame的編解碼器的話洲拇,我們的系統(tǒng)是無法處理WebSocket傳輸?shù)臄?shù)據(jù)的奈揍。其實(shí)Netty在進(jìn)行WebSocket握手的時候,就自動的幫我們添加了編解碼器赋续,如下是handshaker.handshake(ctx.channel(), request)的源碼:
public ChannelFuture handshake(Channel channel, FullHttpRequest req) {
return handshake(channel, req, null, channel.newPromise());
}
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug("{} WebSocket version {} server handshake", channel, version());
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
if (p.get(HttpContentCompressor.class) != null) {
p.remove(HttpContentCompressor.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
final String encoderName;
if (ctx == null) {
// this means the user use a HttpServerCodec
ctx = p.context(HttpServerCodec.class);
if (ctx == null) {
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
}
//就是這里了男翰,加入默認(rèn)的編解碼器
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
encoderName = ctx.name();
} else {
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());
encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
}
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
服務(wù)端完事了,接下來看看客戶端的代碼纽乱,其實(shí)就是前端代碼了(代碼是我網(wǎng)上直接抄的):
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket測試</title>
</head>
<body>
<h1>WebSocket測試</h1>
<div id="context"></div>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script type="application/javascript">
var websocket = null;
// 判斷當(dāng)前瀏覽器是否支持WebSocket
if ('WebSocket' in window) {
// 創(chuàng)建WebSocket 對象蛾绎,連接服務(wù)器端點(diǎn)
websocket = new WebSocket("ws://localhost:8081/ws");
} else {
alert('您的瀏覽器不支持websocket');
}
// 連接發(fā)生錯誤的回調(diào)方法
websocket.onerror = function() {
appendMessage ("WebSocket連接失敗");
}
// 連接成功建立的回調(diào)方法
websocket.onopen = function(event) {
appendMessage ("WebSocket連接成功");
}
// 接收到消息的回調(diào)方法
websocket.onmessage = function (event) {
console.log("收到消息")
jsonObject = JSON.parse(event.data)
console.log(jsonObject)
appendMessage(jsonObject.title);
}
websocket.onclose = function() {
appendMessage("關(guān)閉連接");
}
websocket.onbeforeupload = function() {
websocket.close();
}
function appendMessage(message) {
var context = $('#context').html() + '<br>' + message;
$('#context').html(context);
}
function closeWebSocket() {
websocket.close();
}
function sendMessage() {
var message = $('#message').val();
websocket.send(message);
}
</script>
</body>
</html>
接下來啟動服務(wù),然后直接打開該文件迫淹,應(yīng)該就能看到效果了秘通,如下所示:
你也可以多打開幾個客戶端試試,會發(fā)現(xiàn)消息會傳遞給每個客戶端了敛熬,而且這期間不存在什么客戶端主動請求的情況肺稀,即對于客戶端來說,這些個消息就好像“天上掉下來”的一樣应民,這就簡單實(shí)現(xiàn)了消息推送系統(tǒng)话原,現(xiàn)在再來看看客戶端和服務(wù)端的通信情況:
對比上面的那張圖,是不是覺得更加“清爽”了诲锹?現(xiàn)在不再需要在客戶端不斷去輪詢繁仁,去騷擾服務(wù)端了,當(dāng)有新的推送消息的時候归园,服務(wù)端就主動的把消息“推”給客戶端了黄虱,這樣服務(wù)端的壓力也減少了很多,客戶端的CPU也不用一直做沒有意義的事了庸诱。
5 Spring Boot + WebSocket實(shí)現(xiàn)消息推送
我還想介紹一種實(shí)現(xiàn)方案捻浦,這種方案相較于Netty的實(shí)現(xiàn)更加的簡單晤揣,那就是利用Spring 對WebSocket的支持來實(shí)現(xiàn)。就不多說廢話了朱灿,直接來看實(shí)現(xiàn)吧:
建立好Spring Boot項(xiàng)目之后昧识,加入如下依賴(maven):
<dependency>
<!--RabbitMQ的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<!--web mvc的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<!--websocket的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
之后來做兩個配置,一是配置RabbitMQ盗扒,而是配置WebSocket:
//RabbitMQ配置
@Configuration
public class RabbitMQConfig {
//三個分別是隊(duì)列跪楞,交換器以及路由鍵
public static final String PUSH_MSG_EXCHANGE = "push_msg_exchange";
public static final String PUSH_MSG_QUEUE = "push_msg_queue";
public static final String PUSH_MSG_ROUTE_KEY = "push_msg.direct";
@Bean
public DirectExchange pushMsgExchange() {
return new DirectExchange(PUSH_MSG_EXCHANGE, true, true);
}
@Bean
public Queue pushMsgQueue() {
return new Queue(PUSH_MSG_QUEUE, true, false, true);
}
//將隊(duì)列和交換器綁定
@Bean
public Binding pushMsgBinding() {
return BindingBuilder.bind(pushMsgQueue()).to(pushMsgExchange()).with(PUSH_MSG_ROUTE_KEY);
}
}
//WebSocket配置
@Configuration
public class WebSocketConfig {
//只需要配置ServerEndpointExporter這個Bean就行了
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
接下來就是實(shí)現(xiàn)類了:
@ServerEndpoint("/ws")
@Service
public class WebSocketService {
private static Set<WebSocketService> webSocketServiceSet = new CopyOnWriteArraySet<>();
private Session session;
private static AtomicLong onlineCount = new AtomicLong(0);
@OnOpen
public void onOpen(Session session) {
this.session = session;
onlineCount.incrementAndGet();
webSocketServiceSet.add(this);
System.out.println("有用戶上線,當(dāng)前在線人數(shù)有:" + onlineCount.get());
}
@OnClose
public void onClose() {
webSocketServiceSet.remove(this);
onlineCount.decrementAndGet();
System.out.println("有用戶下線侣灶,當(dāng)前在線人數(shù)有: " + onlineCount.get());
}
@OnMessage
public void onMessage(Session session, String message) {
System.out.println("來自客戶端的消息甸祭,客戶端IP:PORT是 : ");
System.out.println(session.getRequestURI().getHost() + ":" + session.getRequestURI().getPort());
System.out.println("消息是: " + message);
}
@OnError
public void onError(Throwable throwable) {
System.out.println("服務(wù)端異常");
throwable.printStackTrace();
}
private void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
@RabbitListener(queues = RabbitMQConfig.PUSH_MSG_QUEUE)
public void ListenMessageFromMQ(String message) {
for (WebSocketService webSocketService : webSocketServiceSet) {
//消息推送了,向每個在線客戶端發(fā)送消息
webSocketService.sendMessage(message);
}
}
}
對于每個客戶端炫隶,都會有一個與之對應(yīng)的WebSocketService實(shí)例對象以及Session淋叶,就好像Netty里的Channel一樣(只是一個比喻,并不等同)伪阶,所以用一個靜態(tài)的Set來保存這些實(shí)例對象煞檩,當(dāng)需要發(fā)送消息的時候,直接取出來栅贴,調(diào)用Session的發(fā)送消息的方法就行了斟湃。
好了,就是那么簡單檐薯,幾個注解@OnOpen凝赛,@OnMessage,@OnClose坛缕,@OnError墓猎,聽名字應(yīng)該就知道啥意思了吧,不多說了赚楚,相比于Netty的實(shí)現(xiàn)毙沾,簡單的太多了。不過最后還差一點(diǎn)宠页,消息從哪來呢左胞?和之前一樣,我這里開啟定時任務(wù)举户,定時的往消息隊(duì)列里塞消息就行了烤宙,如下所示:
@Component
public class MyTask {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "*/5 * * * * ?")
public void sendMessageToMQ() {
//這里的消息我就隱編碼了,實(shí)際上可以有多種方式來構(gòu)造消息
String message = "Hello, Websocket!!!";
rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_MSG_EXCHANGE,
RabbitMQConfig.PUSH_MSG_ROUTE_KEY,
message);
}
}
最后俭嘁,別忘了應(yīng)用主類上加入@EnableScheduling躺枕,否則定時任務(wù)不會生效。這里的實(shí)現(xiàn)效果和之前的實(shí)現(xiàn)幾乎一樣,客戶端也不需要修改什么屯远,就不多說了蔓姚。
6 小結(jié)
消息推送系統(tǒng)是一個用途廣泛的系統(tǒng)捕虽,本文簡單介紹了兩種實(shí)現(xiàn)方法慨丐,分別是Netty+WebSocket和Spring Boot+WebSocket,后者其實(shí)是基于Servlet實(shí)現(xiàn)的泄私,所以性能上和Netty還是有一些差異的房揭。不過無論哪種實(shí)現(xiàn)吧,最核心的部分還是WebSocket協(xié)議晌端,
有些代碼寫的不太合理捅暴,望諒解。
7 參考資料
《Netty 權(quán)威指南》WebSocket相關(guān)章節(jié)