前言
秒殺架構(gòu)到后期抛蚤,我們采用了消息隊列的形式實現(xiàn)搶購邏輯若锁,那么之前拋出過這樣一個問題:消息隊列異步處理完每個用戶請求后,如何通知給相應(yīng)用戶秒殺成功什荣?
場景映射
首先,我們舉一個生活中比較常見的例子:我們?nèi)ャy行辦理業(yè)務(wù)怀酷,一般會選擇相關(guān)業(yè)務(wù)打印一個排號紙稻爬,然后就可以坐在小板凳上玩著手機,等待被小喇叭報號蜕依。當(dāng)小喇叭喊到你所持有的號碼桅锄,就可以拿著排號紙去柜臺辦理自己的業(yè)務(wù)。
這里样眠,假設(shè)當(dāng)我們?nèi)∨盘柤埖臅r候友瘤,銀行根據(jù)時間段內(nèi)的排隊情況,比較人性化的提示用戶:排隊人數(shù)較多檐束,您是否繼續(xù)等待辫秧?否的話我們可以換個時間段再來辦理。
由此我們把生活場景映射到真實的秒殺業(yè)務(wù)邏輯中來:
我們可以把柜臺比喻成商品下單處理邏輯單元
拿到排號紙說明你進入相應(yīng)商品處理隊列
拿到排號紙的請求直接返回前臺被丧,提示用戶搶購進行中
排號紙進入隊列后盟戏,等待商品業(yè)務(wù)處理邏輯
小喇叭叫到自己的排號相當(dāng)于服務(wù)端通知用戶秒殺成功,這時候可以進行支付邏輯
那些拿不到票號的同學(xué)甥桂,相當(dāng)于隊列已滿直接返回秒殺失敗
解決方案
通過上面的場景柿究,我們很容易能夠想到一種方案就是服務(wù)端通知,那么如何做到服務(wù)端異步通知的呢黄选?下面蝇摸,主角開始登場了,就是我們的Websocket。
WebSocket是HTML5開始提供的一種瀏覽器與服務(wù)器間進行全雙工通訊的網(wǎng)絡(luò)技術(shù)探入。依靠這種技術(shù)可以實現(xiàn)客戶端和服務(wù)器端的長連接狡孔,雙向?qū)崟r通信。
特點:
異步蜂嗽、事件觸發(fā)
可以發(fā)送文本苗膝,圖片等流文件
數(shù)據(jù)格式比較輕量,性能開銷小植旧,通信高效
使用ws或者wss協(xié)議的客戶端socket辱揭,能夠?qū)崿F(xiàn)真正意義上的推送功能
缺點:
部分瀏覽器不支持,瀏覽器支持的程度與方式有區(qū)別病附,需要各種兼容寫法问窃。
集成案例
由于我們的秒殺架構(gòu)項目案例中使用了SpringBoot,因此集成webSocket也是相對比較簡單的完沪。
首先pom.xml引入以下依賴:
org.springframework.bootspring-boot-starter-websocket
WebSocketConfig 配置:
/**
* WebSocket配置
* 創(chuàng)建者? 爪哇筆記
* 創(chuàng)建時間 2018年5月29日
*/@ConfigurationpublicclassWebSocketConfig{@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter();? ? ? }? }
WebSocketServer 配置:
@ServerEndpoint("/websocket/{userId}")@ComponentpublicclassWebSocketServer{privatefinalstaticLogger log = LoggerFactory.getLogger(WebSocketServer.class);//靜態(tài)變量域庇,用來記錄當(dāng)前在線連接數(shù)。應(yīng)該把它設(shè)計成線程安全的覆积。privatestaticintonlineCount =0;//concurrent包的線程安全Set听皿,用來存放每個客戶端對應(yīng)的MyWebSocket對象。privatestaticCopyOnWriteArraySet webSocketSet =newCopyOnWriteArraySet();//與某個客戶端的連接會話宽档,需要通過它來給客戶端發(fā)送數(shù)據(jù)privateSession session;//接收userIdprivateString userId="";/**
? ? * 連接建立成功調(diào)用的方法*/@OnOpenpublicvoidonOpen(Session session,@PathParam("userId")String userId){this.session = session;? ? ? ? webSocketSet.add(this);//加入set中addOnlineCount();//在線數(shù)加1log.info("有新窗口開始監(jiān)聽:"+userId+",當(dāng)前在線人數(shù)為"+ getOnlineCount());this.userId=userId;try{? ? ? ? ? ? sendMessage("連接成功");? ? ? ? }catch(IOException e) {? ? ? ? ? ? log.error("websocket IO異常");? ? ? ? }? ? }/**
? ? * 連接關(guān)閉調(diào)用的方法
? ? */@OnClosepublicvoidonClose(){? ? ? ? webSocketSet.remove(this);//從set中刪除subOnlineCount();//在線數(shù)減1log.info("有一連接關(guān)閉尉姨!當(dāng)前在線人數(shù)為"+ getOnlineCount());? ? }/**? ? * 收到客戶端消息后調(diào)用的方法? ? *@parammessage 客戶端發(fā)送過來的消息*/@OnMessagepublicvoidonMessage(String message, Session session){? ? ? ? log.info("收到來自窗口"+userId+"的信息:"+message);//群發(fā)消息for(WebSocketServer item : webSocketSet) {try{? ? ? ? ? ? ? ? item.sendMessage(message);? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? }? ? ? ? }? ? }/**? ? *@paramsession? ? *@paramerror? ? */@OnErrorpublicvoidonError(Session session, Throwable error){? ? ? ? log.error("發(fā)生錯誤");? ? ? ? error.printStackTrace();? ? }/**
? ? * 實現(xiàn)服務(wù)器主動推送
? ? */publicvoidsendMessage(String message)throwsIOException{this.session.getBasicRemote().sendText(message);? ? }/**
? ? * 群發(fā)自定義消息
? ? * */publicstaticvoidsendInfo(String message,@PathParam("userId")String userId){? ? ? ? log.info("推送消息到窗口"+userId+",推送內(nèi)容:"+message);for(WebSocketServer item : webSocketSet) {try{//這里可以設(shè)定只推送給這個userId的吗冤,為null則全部推送if(userId==null) {? ? ? ? ? ? ? ? ? ? item.sendMessage(message);? ? ? ? ? ? ? ? }elseif(item.userId.equals(userId)){? ? ? ? ? ? ? ? ? ? item.sendMessage(message);? ? ? ? ? ? ? ? }? ? ? ? ? ? }catch(IOException e) {continue;? ? ? ? ? ? }? ? ? ? }? ? }publicstaticsynchronizedintgetOnlineCount(){returnonlineCount;? ? }publicstaticsynchronizedvoidaddOnlineCount(){? ? ? ? WebSocketServer.onlineCount++;? ? }publicstaticsynchronizedvoidsubOnlineCount(){? ? ? ? WebSocketServer.onlineCount--;? ? }}
KafkaConsumer 消費配置又厉,通知用戶是否秒殺成功:
/** * 消費者 spring-kafka 2.0 + 依賴JDK8 *@author科幫網(wǎng) By https://blog.52itstyle.com */@ComponentpublicclassKafkaConsumer{@AutowiredprivateISeckillService seckillService;privatestaticRedisUtil redisUtil =newRedisUtil();/**? ? * 監(jiān)聽seckill主題,有消息就讀取? ? *@parammessage? ? */@KafkaListener(topics = {"seckill"})publicvoidreceiveMessage(String message){//收到通道的消息之后執(zhí)行秒殺操作String[] array = message.split(";");if(redisUtil.getValue(array[0])!=null){//control層已經(jīng)判斷了,其實這里不需要再判斷了Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));if(result.equals(Result.ok())){? ? ? ? ? ? ? ? WebSocketServer.sendInfo(array[0].toString(),"秒殺成功");//推送給前臺}else{? ? ? ? ? ? ? ? WebSocketServer.sendInfo(array[0].toString(),"秒殺失敗");//推送給前臺redisUtil.cacheValue(array[0],"ok");//秒殺結(jié)束}? ? ? ? }else{? ? ? ? ? ? WebSocketServer.sendInfo(array[0].toString(),"秒殺失敗");//推送給前臺}? ? }}
webSocket.js 前臺通知邏輯:
$(function(){? ? socket.init();});varbasePath ="ws://localhost:8080/seckill/";socket = {webSocket:"",init:function(){//userId:自行追加if('WebSocket'inwindow) {? ? ? ? ? ? webSocket =newWebSocket(basePath+'websocket/1');? ? ? ? }elseif('MozWebSocket'inwindow) {? ? ? ? ? ? webSocket =newMozWebSocket(basePath+"websocket/1");? ? ? ? }else{? ? ? ? ? ? webSocket =newSockJS(basePath+"sockjs/websocket");? ? ? ? }? ? ? ? webSocket.onerror =function(event){? ? ? ? ? ? alert("websockt連接發(fā)生錯誤椎瘟,請刷新頁面重試!")? ? ? ? };? ? ? ? webSocket.onopen =function(event){? ? ? ? };? ? ? ? webSocket.onmessage =function(event){varmessage = event.data;? ? ? ? ? ? alert(message)//判斷秒殺是否成功覆致、自行處理邏輯};? ? }}
客戶端API
客戶端與服務(wù)器通信
send() 向遠程服務(wù)器發(fā)送數(shù)據(jù)
close() 關(guān)閉該websocket鏈接
監(jiān)聽函數(shù)
onopen 當(dāng)網(wǎng)絡(luò)連接建立時觸發(fā)該事件
onerror 當(dāng)網(wǎng)絡(luò)發(fā)生錯誤時觸發(fā)該事件
onclose 當(dāng)websocket被關(guān)閉時觸發(fā)該事件
onmessage 當(dāng)websocket接收到服務(wù)器發(fā)來的消息的時觸發(fā)的事件,也是通信中最重要的一個監(jiān)聽事件降传。msg.data
readyState屬性
這個屬性可以返回websocket所處的狀態(tài)篷朵。
CONNECTING(0) websocket正嘗試與服務(wù)器建立連接
OPEN(1) websocket與服務(wù)器已經(jīng)建立連接
CLOSING(2) websocket正在關(guān)閉與服務(wù)器的連接
CLOSED(3) websocket已經(jīng)關(guān)閉了與服務(wù)器的連接
開源方案
goeasy
GoEasy實時Web推送勾怒,支持后臺推送和前臺推送兩種:后臺推送可以選擇Java SDK婆排、 Restful API支持所有開發(fā)語言;前臺推送:JS推送笔链。無論選擇哪種方式推送代碼都十分簡單(10分鐘可搞定)段只。由于它支持websocket 和polling兩種連接方式所以兼顧大多數(shù)主流瀏覽器,低版本的IE瀏覽器也是支持的鉴扫。
Pushlets
Pushlets 是通過長連接方式實現(xiàn)“推”消息的赞枕。推送模式分為:Poll(輪詢)、Pull(拉)。
Pushlet
Pushlet 是一個開源的 Comet 框架,Pushlet 使用了觀察者模型:客戶端發(fā)送請求炕婶,訂閱感興趣的事件姐赡;服務(wù)器端為每個客戶端分配一個會話 ID 作為標(biāo)記,事件源會把新產(chǎn)生的事件以多播的方式發(fā)送到訂閱者的事件隊列里柠掂。
總結(jié)
其實前面有提過项滑,盡管WebSocket有諸多優(yōu)點,但是涯贞,如果服務(wù)端維護很多長連接也是挺耗費資源的枪狂,服務(wù)器集群以及覽器或者客戶端兼容性問題,也會帶來了一些不確定性因素宋渔。大體了解了一下各大廠的做法州疾,大多數(shù)都還是基于輪詢的方式實現(xiàn)的,比如:騰訊PC端微信掃碼登錄皇拣、京東商城支付成功通知等等严蓖。
有些小伙伴可能會問了,輪詢豈不是會更耗費資源氧急?其實在我看來谈飒,有些輪詢是不可能穿透到后端數(shù)據(jù)庫查詢服務(wù)的,比如秒殺态蒂,一個緩存標(biāo)記位就可以判定是否秒殺成功杭措。相對于WS的長連接以及其不確定因素,在秒殺場景下钾恢,輪詢還是相對比較合適的手素。
1、具有1-5工作經(jīng)驗的瘩蚪,面對目前流行的技術(shù)不知從何下手泉懦,
需要突破技術(shù)瓶頸的可以加。
2疹瘦、在公司待久了崩哩,過得很安逸,
但跳槽時面試碰壁言沐。
需要在短時間內(nèi)進修邓嘹、跳槽拿高薪的可以加。
3险胰、如果沒有工作經(jīng)驗汹押,但基礎(chǔ)非常扎實,對java工作機制起便,
常用設(shè)計思想棚贾,常用java開發(fā)框架掌握熟練的窖维,可以加。
4妙痹、覺得自己很牛B铸史,一般需求都能搞定。
但是所學(xué)的知識點沒有系統(tǒng)化怯伊,很難在技術(shù)領(lǐng)域繼續(xù)突破的可以加沛贪。
5. 群號:高級架構(gòu)群 Java進階群:180705916.備注好信息!送架構(gòu)視頻震贵。
6.阿里Java高級大牛直播講解知識點利赋,分享知識,
多年工作經(jīng)驗的梳理和總結(jié)猩系,帶著大家全面媚送、