前言
秒殺架構(gòu)到后期筹淫,我們采用了消息隊(duì)列的形式實(shí)現(xiàn)搶購(gòu)邏輯,那么之前拋出過這樣一個(gè)問題:消息隊(duì)列異步處理完每個(gè)用戶請(qǐng)求后呢撞,如何通知給相應(yīng)用戶秒殺成功损姜?
場(chǎng)景映射
首先,我們舉一個(gè)生活中比較常見的例子:我們?nèi)ャy行辦理業(yè)務(wù)狸相,一般會(huì)選擇相關(guān)業(yè)務(wù)打印一個(gè)排號(hào)紙薛匪,然后就可以坐在小板凳上玩著手機(jī),等待被小喇叭報(bào)號(hào)脓鹃。當(dāng)小喇叭喊到你所持有的號(hào)碼逸尖,就可以拿著排號(hào)紙去柜臺(tái)辦理自己的業(yè)務(wù)這里,假設(shè)當(dāng)我們?nèi)∨盘?hào)紙的時(shí)候瘸右,銀行根據(jù)時(shí)間段內(nèi)的排隊(duì)情況娇跟,比較人性化的提示戶:排隊(duì)人數(shù)較多,您是否繼續(xù)等待太颤?否的話我們可以換個(gè)時(shí)間段再來辦理苞俘。
由此我們把生活場(chǎng)景映射到真實(shí)的秒殺業(yè)務(wù)邏輯中來:
?我們可以把柜臺(tái)比喻成商品下單處理邏輯單元
?拿到排號(hào)紙說明你進(jìn)入相應(yīng)商品處理隊(duì)列
?拿到排號(hào)紙的請(qǐng)求直接返回前臺(tái),提示用戶搶購(gòu)進(jìn)行中?排號(hào)紙進(jìn)入隊(duì)列后龄章,等待商品業(yè)務(wù)處理邏輯?小喇叭叫到自己的排號(hào)相當(dāng)于服務(wù)端通知用戶秒殺成功吃谣,這時(shí)候可以進(jìn)行支付邏輯
?那些拿不到票號(hào)的同學(xué),相當(dāng)于隊(duì)列已滿直接返回秒殺失敗解決方案通過上面的場(chǎng)景做裙,我們很容易能夠想到一種方案就是服務(wù)端通知岗憋,那么如何做到服務(wù)端異步通知的呢?
下面锚贱,主角開始登場(chǎng)了仔戈,就是我們的Websocket纵朋。WebSocket是HTML5開始提供的一種瀏覽器與服務(wù)器間進(jìn)行全雙工通訊的網(wǎng)絡(luò)技術(shù)靶擦。依靠這種技術(shù)可以實(shí)現(xiàn)客戶端和服務(wù)器端的長(zhǎng)連接,雙向?qū)崟r(shí)通信六水。
HTTP VS WebSocket特點(diǎn):
?異步吧碾、事件觸發(fā)
?可以發(fā)送文本凰盔,圖片等流文件
?數(shù)據(jù)格式比較輕量,性能開銷小滤港,通信高效
?使用ws或者wss協(xié)議的客戶端socket
缺點(diǎn):
?部分瀏覽器不支持廊蜒,瀏覽器支持的程度與方式有區(qū)別趴拧,需要各種兼容寫法。
集成案例由于我們的秒殺架構(gòu)項(xiàng)目案例中使用了SpringBoot山叮,因此集成webSocket也是相對(duì)比較簡(jiǎn)單的著榴。
首先pom.xml引入以下依賴:org.springframework.bootspring-boot-starter-websocketWebSocketConfig
配置:/** * WebSocket配置? */
@Configuration public class WebSocketConfig
{ @Bean public ServerEndpointExporter serverEndpointExporter()
{ return new ServerEndpointExporter(); } }
WebSocketServer 配置:
@ServerEndpoint("/websocket/{userId}")
@Componentpublic class WebSocketServer { private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class); //靜態(tài)變量,用來記錄當(dāng)前在線連接數(shù)屁倔。應(yīng)該把它設(shè)計(jì)成線程安全的脑又。
private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象锐借。
private static CopyOnWriteArraySetwebSocketSet = new CopyOnWriteArraySet(); //與某個(gè)客戶端的連接會(huì)話问麸,需要通過它來給客戶端發(fā)送數(shù)據(jù)
private Session session; //接收userId private String userId=""; /** * 連接建立成功調(diào)用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId)
{ this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount();
//在線數(shù)加1 log.info("有新窗口開始監(jiān)聽:"+userId+",當(dāng)前在線人數(shù)為" + getOnlineCount());
this.userId=userId; try { sendMessage("連接成功"); } catch (IOException e) { log.error("websocket IO異常"); } }
/** * 連接關(guān)閉調(diào)用的方法 */
@OnClose public void onClose()
?{ webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數(shù)減1 log.info("有一連接關(guān)閉!當(dāng)前在線人數(shù)為" + getOnlineCount()); } /** * 收到客戶端消息后調(diào)用的方法 *
@param message 客戶端發(fā)送過來的消息*/
@OnMessage public void onMessage(String message, Session session) { log.info("收到來自窗口"+userId+"的信息:"+message); //群發(fā)消息
for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } /** * @param session * @param error */
@OnError public void onError(Session session, Throwable error) { log.error("發(fā)生錯(cuò)誤"); error.printStackTrace(); } /** * 實(shí)現(xiàn)服務(wù)器主動(dòng)推送 */
public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發(fā)自定義消息 * */
public static void sendInfo(String message,@PathParam("userId") String userId){ log.info("推送消息到窗口"+userId+"钞翔,推送內(nèi)容:"+message);
for (WebSocketServer item : webSocketSet) { try { //這里可以設(shè)定只推送給這個(gè)userId的严卖,為null則全部推送
if(userId==null) { item.sendMessage(message); }else if(item.userId.equals(userId)){ item.sendMessage(message); } } catch (IOException e) { continue; } } }
public static synchronized int getOnlineCount() { return onlineCount; }
public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; }
public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
KafkaConsumer 消費(fèi)配置,通知用戶是否秒殺成功: /** * 消費(fèi)者 spring-kafka 2.0 + 依賴JDK8 *?
?@Component public class KafkaConsumer { @Autowired private ISeckillService seckillService; private static RedisUtil redisUtil = new RedisUtil(); /** * 監(jiān)聽seckill主題,有消息就讀取 * @param message */
@KafkaListener(topics = {"seckill"})
public void receiveMessage(String message)
{ //收到通道的消息之后執(zhí)行秒殺操作 String[] array = message.split(";");
if(redisUtil.getValue(array[0])!=null){//control層已經(jīng)判斷了布轿,其實(shí)這里不需要再判斷了
Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok())){ WebSocketServer.sendInfo(array[0].toString(), "秒殺成功");//推送給前臺(tái) }
else{ WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺(tái)
redisUtil.cacheValue(array[0], "ok");//秒殺結(jié)束 } }else{ WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺(tái) } } } webSocket.js 前臺(tái)通知邏輯: $(function(){ socket.init(); });
var basePath = "ws://localhost:8080/seckill/";
socket = { webSocket : "", init : function() { //userId:自行追加 if ('WebSocket' in window) { webSocket = new WebSocket(basePath+'websocket/1'); } else if ('MozWebSocket' in window) { webSocket = new MozWebSocket(basePath+"websocket/1"); } else { webSocket = new SockJS(basePath+"sockjs/websocket"); } webSocket.onerror = function(event) { alert("websockt連接發(fā)生錯(cuò)誤哮笆,請(qǐng)刷新頁(yè)面重試!") }; webSocket.onopen = function(event) { }; webSocket.onmessage = function(event) { var message = event.data; alert(message)//判斷秒殺是否成功、自行處理邏輯 }; } }
客戶端API 客戶端與服務(wù)器通信
?send() 向遠(yuǎn)程服務(wù)器發(fā)送數(shù)據(jù)
?close() 關(guān)閉該websocket鏈接 監(jiān)聽函數(shù)
?onopen 當(dāng)網(wǎng)絡(luò)連接建立時(shí)觸發(fā)該事件
?onerror 當(dāng)網(wǎng)絡(luò)發(fā)生錯(cuò)誤時(shí)觸發(fā)該事件
?onclose 當(dāng)websocket被關(guān)閉時(shí)觸發(fā)該事件
?onmessage
當(dāng)websocket接收到服務(wù)器發(fā)來的消息的時(shí)觸發(fā)的事件汰扭,也是通信中最重要的一個(gè)監(jiān)聽事件稠肘。
Java初高級(jí)一起學(xué)習(xí)分享,共同學(xué)習(xí)才是最明智的選擇萝毛,喜歡的話可以我的學(xué)習(xí)群64弍46衣3凌9项阴,或加資料群69似64陸0吧3
readyState屬性 這個(gè)屬性可以返回websocket所處的狀態(tài)。
?CONNECTING(0) websocket正嘗試與服務(wù)器建立連接
??OPEN(1) websocket與服務(wù)器已經(jīng)建立連接
?CLOSING(2) websocket正在關(guān)閉與服務(wù)器的連接
?CLOSED(3) websocket已經(jīng)關(guān)閉了與服務(wù)器的連接 開源方案 goeasy GoEasy實(shí)時(shí)Web推送笆包,支持后臺(tái)推送和前臺(tái)推送兩種:后臺(tái)推送可以選擇Java SDK环揽、 Restful API支持所有開發(fā)語言;
前臺(tái)推送:JS推送庵佣。無論選擇哪種方式推送代碼都十分簡(jiǎn)單(10分鐘可搞定)薯演。由于它支持websocket 和polling兩種連接方式所以兼顧大多數(shù)主流瀏覽器,低版本的IE瀏覽器也是支持的秧了。
?地址:http://goeasy.io/ Pushlets Pushlets 是通過長(zhǎng)連接方式實(shí)現(xiàn)“推”消息的。
推送模式分為:Poll(輪詢)序无、Pull(拉)验毡。
Pushlet 是一個(gè)開源的 Comet 框架,Pushlet 使用了觀察者模型:客戶端發(fā)送請(qǐng)求,訂閱感興趣的事件帝嗡;服務(wù)器端為每個(gè)客戶端分配一個(gè)會(huì)話 ID 作為標(biāo)記晶通,事件源會(huì)把新產(chǎn)生的事件以多播的方式發(fā)送到訂閱者的事件隊(duì)列里。
?地址:https://github.com/wjw465150/Pushlet
總結(jié)
其實(shí)前面有提過哟玷,盡管WebSocket有諸多優(yōu)點(diǎn)狮辽,但是一也,如果服務(wù)端維護(hù)很多長(zhǎng)連接也是挺耗費(fèi)資源的,服務(wù)器集群以及覽器或者客戶端兼容性問題喉脖,也會(huì)帶來了一些不確定性因素椰苟。大體了解了一下各大廠的做法,大多數(shù)都還是基于輪詢的方式實(shí)現(xiàn)的树叽,比如:騰訊PC端微信掃碼登錄舆蝴、京東商城支付成功通知等等。