文章分為四個(gè)部分
1牵啦、主要功能
2、運(yùn)用的技術(shù)
3妄痪、系統(tǒng)設(shè)計(jì)
4哈雏、優(yōu)化與總結(jié)
一、主要功能
對平臺(tái)支付網(wǎng)關(guān)的交易訂單進(jìn)行實(shí)時(shí)的統(tǒng)計(jì)衫生,包括實(shí)時(shí)的交易金額與交易訂單量裳瘪、不同支付方式的交易總額、訂單量以及占比罪针、當(dāng)天各個(gè)時(shí)間段的數(shù)據(jù)統(tǒng)計(jì)折線圖彭羹,實(shí)現(xiàn)效果圖如下:
二、運(yùn)用的技術(shù)
Redis:利用Redis的消息發(fā)布與訂閱功能泪酱、以及List派殷、SortedSet、Hash的數(shù)據(jù)結(jié)構(gòu)特性
WebSocket:負(fù)責(zé)將實(shí)時(shí)匯總的交易數(shù)據(jù)推送至瀏覽器客戶端
三墓阀、系統(tǒng)設(shè)計(jì)
實(shí)時(shí)交易數(shù)據(jù)監(jiān)控系統(tǒng)所涉及的工程包括交易服務(wù)毡惜、監(jiān)控統(tǒng)計(jì)服務(wù)、監(jiān)控應(yīng)用(Dubbo服務(wù)化)岂津。交易服務(wù)在交易成功后向Redis中發(fā)布消息并將數(shù)據(jù)發(fā)送至Redis的list隊(duì)列虱黄,監(jiān)控服務(wù)負(fù)責(zé)Redis消息的訂閱并進(jìn)行統(tǒng)計(jì),統(tǒng)計(jì)完成后將實(shí)時(shí)的統(tǒng)計(jì)結(jié)果再次發(fā)送至Redis吮成,監(jiān)控應(yīng)用作為WebSocket的服務(wù)端橱乱,也負(fù)責(zé)監(jiān)聽監(jiān)控服務(wù)推送過來的實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù)并通過WebSocket將數(shù)據(jù)推送至客戶端。
Redis數(shù)據(jù)結(jié)構(gòu)圖如下:
利用list的lpush粱甫、lpop功能進(jìn)行對數(shù)據(jù)的存取操作泳叠,SortedSet最開始主要是用于排序,將交易時(shí)間作為score進(jìn)行排序茶宵,但是因?yàn)樯婕暗揭恍?shù)據(jù)的計(jì)算危纫,在高并發(fā)以及分布式部署的情況下,利用SortedSet進(jìn)行數(shù)據(jù)統(tǒng)計(jì)是會(huì)存在問題的乌庶,文末會(huì)提到种蝶,hash結(jié)構(gòu)主要是用于對數(shù)據(jù)進(jìn)行原子性的計(jì)算。
UML時(shí)序圖如下:
(一)交易系統(tǒng)——支付服務(wù)
支付服務(wù)在交易成功后瞒大,會(huì)給Redis發(fā)布一條訂單記錄消息螃征,并向Redis的list列表lpush一條同樣的訂單記錄信息,為了不影響正常的支付業(yè)務(wù)流程透敌,所以采用的是異步的方式盯滚,偽代碼如下:
/**
* Redis消息通道
*/
@Value("#{settings['redis.trade.channel']}")
private String redisChannel;
/**
* 微信支付的訂單隊(duì)列key
*/
@Value("#{settings['redis.trade.wxDetails']}")
private String redisWxQueue;
/**
* 支付寶支付的訂單隊(duì)列key
*/
@Value("#{settings['redis.trade.alipayDetails']}")
private String redisAlipayQueue;
/**
* 單個(gè)線程的線程池
*/
protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* 交易成功后需要執(zhí)行的業(yè)務(wù)邏輯
* @param paymentRecord
*/
public void successPayment(final PaymentRecord paymentRecord) {
// do otherthing...
// 異步發(fā)送消息
executorService.submit(new Runnable() {
@Override
public void run() {
try {
pushPaymentRecordMonitorVo(paymentRecord);
} catch (Exception e) {
log.error("payment send to redis fail,PaymengRecord:" + JsonUtil.toJsonString(paymentRecord));
}
}
});
}
/**
* 將交易成功的訂單信息插入至Redis隊(duì)列并發(fā)送一條通知通知
* @param paymentRecord
* @throws Exception
*/
private void pushPaymentRecordMonitorVo (PaymentRecord paymentRecord) throws Exception{
PaymentRecordMonitorVo paymentRecordMonitorVo = new PaymentRecordMonitorVo();
paymentRecordMonitorVo.setMerchantOrderNo(paymentRecord.getMerchantOrderNo());
paymentRecordMonitorVo.setPayWay(paymentRecord.getPayWayCode() == null ? null : paymentRecord.getPayWayCode().name());
paymentRecordMonitorVo.setTradeTime(paymentRecord.getPaySuccessTime());
paymentRecordMonitorVo.setAmount(paymentRecord.getOrderAmount());
log.info("訂單消息插入Redis隊(duì)列...");
if (paymentRecord.getPayWayCode() != null && paymentRecord.getPayWayCode().equals(PayWayEnum.WEIXIN)) {
JedisHelper.dataCluster().lpush(redisWxQueue,JsonUtil.toJsonString(paymentRecordMonitorVo));
} else if (paymentRecord.getPayWayCode() != null && paymentRecord.getPayWayCode().equals(PayWayEnum.ALIPAY)) {
JedisHelper.dataCluster().lpush(redisAlipayQueue,JsonUtil.toJsonString(paymentRecordMonitorVo));
}
log.info("訂單消息插入Redis隊(duì)列結(jié)束...");
// 發(fā)布消息
log.info("訂單消息發(fā)布到Redis...");
JedisHelper.dataCluster().publish(redisChannel,JsonUtil.toJsonString(paymentRecordMonitorVo));
log.info("訂單消息發(fā)布到Redis結(jié)束...");
}
(二)監(jiān)控服務(wù)
1.主要功能包括:
(1)訂閱Redis中交易服務(wù)發(fā)布過來的訂單消息以及獲取list列表中的訂單數(shù)據(jù)
(2)根據(jù)訂單的交易時(shí)間踢械,按照每15分鐘為一個(gè)數(shù)據(jù)匯總點(diǎn)進(jìn)行匯總
(3)對每15分鐘匯總的SortedSet進(jìn)行統(tǒng)計(jì)后,將結(jié)果再發(fā)布至Redis的消息中
2.遇到的坑:
在監(jiān)控服務(wù)啟動(dòng)的時(shí)候會(huì)進(jìn)行Redis的list列表中數(shù)據(jù)的統(tǒng)計(jì)初始化魄藕,并開啟Redis消息訂閱者的監(jiān)聽内列。但有三個(gè)比較坑的地方就是:
(1)因?yàn)橛玫氖荝edis6個(gè)節(jié)點(diǎn)組成的一個(gè)集群,所以是用JedisCluster背率,但是JedisCluster在2.8.x版本以上才支持消息的發(fā)布與訂閱话瞧,項(xiàng)目原先用的是2.7.3版本
解決方案:把項(xiàng)目Jedis版本改為2.8.1,pom.xml內(nèi)容如下:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
(2)Redis的消息發(fā)布與訂閱寝姿,在訂閱方必須要手動(dòng)的調(diào)用subscribe()方法移稳,并將監(jiān)聽者和需要監(jiān)聽的通道作為參數(shù)傳入才能開啟監(jiān)聽,而像ActiveMQ這種消息中間件是不需要顯示的調(diào)用会油,只需配置好消息監(jiān)聽者就會(huì)自動(dòng)監(jiān)聽的。還有一個(gè)坑就是subscribe()方法是一個(gè)線程阻塞方法古毛,本想在項(xiàng)目啟動(dòng)的時(shí)候就調(diào)用subscribe()開啟消息的訂閱翻翩,結(jié)果發(fā)現(xiàn)方法調(diào)用后,其他的代碼根本沒法往下執(zhí)行稻薇。
解決方案是:在項(xiàng)目啟動(dòng)的時(shí)候調(diào)用subscribe()方法開啟消息監(jiān)聽嫂冻,并且新開一個(gè)線程去調(diào)用subscribe()方法來避免阻塞主線程。
3.Redis不支持消息的持久化塞椎。在訂閱者沒有啟動(dòng)的時(shí)候桨仿,消息發(fā)布者將消息發(fā)出去了,訂閱者沒有收到案狠,那訂閱者重新啟動(dòng)的時(shí)候也不會(huì)收到之前發(fā)的消息了服傍,而像ActiveMQ是支持消息的持久化的。
解決方案:在往Redis發(fā)布消息的時(shí)候也同樣往Redis的list列表中l(wèi)push一條同樣消息的數(shù)據(jù)(參照上面交易服務(wù)中的代碼)骂铁,消息訂閱者接收到消息并進(jìn)行相應(yīng)的業(yè)務(wù)處理后吹零,再將list列表中的數(shù)據(jù)刪除,那在監(jiān)控服務(wù)掛掉的情況下拉庵,Redis消息無法正常被監(jiān)聽消費(fèi)灿椅,但是Redis的list列表中還是會(huì)存有消息的數(shù)據(jù),所以后續(xù)我們可以從list列表中取出消息數(shù)據(jù)再進(jìn)行相應(yīng)的業(yè)務(wù)處理钞支,這樣就間接的實(shí)現(xiàn)了Redis消息的持久化茫蛹。
3.部分代碼
(1)RedisSubscribeHelper.java:監(jiān)控服務(wù)啟動(dòng)時(shí),進(jìn)行Redis隊(duì)列中數(shù)據(jù)的統(tǒng)計(jì)初始化烁挟,并開啟Redis消息訂閱者的監(jiān)聽的
package com.ylp.core.monitor.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.core.monitor.biz.MonitorBiz;
import com.ylp.facade.monitor.utils.JedisHelper;
import com.ylp.facade.monitor.utils.MonitorUtils;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: Dreyer
* @date: 16/8/25 下午2:24
* @description:作用1:項(xiàng)目啟動(dòng)時(shí),對Redis消息未進(jìn)行處理的訂單數(shù)據(jù)進(jìn)行初始化處理; 作用2:Redis消息訂閱的啟動(dòng)器,項(xiàng)目啟動(dòng)時(shí)新啟動(dòng)一個(gè)線程進(jìn)行消息的訂閱
*/
@Component
public class RedisSubscribeHelper {
private Logger logger = Logger.getLogger(RedisSubscribeHelper.class);
@Autowired
private RedisSubscribeListener redisSubscribeListener;
@Autowired
private MonitorBiz monitorBiz;
/**
* Redis消息訂閱的channel
*/
@Value("#{settings['redis.trade.channel']}")
private String redisChannel;
/**
* 微信支付的訂單數(shù)據(jù)key
*/
@Value("#{settings['redis.trade.wxDetails']}")
private String redisWxQueue;
/**
* 支付寶支付的訂單數(shù)據(jù)key
*/
@Value("#{settings['redis.trade.alipayDetails']}")
private String redisAlipayQueue;
/**
* redis消息訂閱的subscribe方法為阻塞方法,所以需要單獨(dú)啟動(dòng)一個(gè)線程進(jìn)行消息訂閱
*/
private ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* 構(gòu)造函數(shù)執(zhí)行后,執(zhí)行數(shù)據(jù)初始化與消息訂閱
*/
@PostConstruct
public void doWork() {
init();
subscribe();
}
/**
* 數(shù)據(jù)的初始化,不能一次性取出所有的數(shù)據(jù),因?yàn)橛锌赡芟到y(tǒng)在啟動(dòng)的時(shí)候Redis的list會(huì)有元素push進(jìn)來
*/
private void init() {
long start = System.currentTimeMillis();
// 避免初始化加載時(shí),JedisCluste還沒完成初始化,所以需要sleep
JedisCluster jedisCluster = JedisHelper.dataCluster();
if (jedisCluster == null) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("開始從Redis中獲取微信訂單初始化數(shù)據(jù)....");
dealData(jedisCluster, redisWxQueue);
logger.info("微信訂單記錄數(shù)據(jù)全部處理完畢...");
logger.info("開始從Redis中獲取支付寶訂單初始化數(shù)據(jù)....");
dealData(jedisCluster, redisAlipayQueue);
logger.info("支付寶訂單記錄數(shù)據(jù)全部處理完畢...");
logger.info("初始化數(shù)據(jù)工作耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
}
/**
* 根據(jù)隊(duì)列來處理隊(duì)列中的數(shù)據(jù)
*
* @param queueName
*/
private void dealData(JedisCluster jedisCluster, String queueName) {
long startTime = MonitorUtils.getCurrentDayZeroTime().getTime();
long endTime = MonitorUtils.getDateEndTime().getTime();
Date tradeTime = null;
String str = "";
PaymentRecordMonitorVo paymentRecordMonitorVo = null;
while (true) {
try {
str = jedisCluster.lpop(queueName);
if (StringUtils.isEmpty(str)) {
break;
}
paymentRecordMonitorVo = JsonUtil.jsonToObject(str, PaymentRecordMonitorVo.class);
tradeTime = paymentRecordMonitorVo.getTradeTime();
// 只處理今天的數(shù)據(jù)
if (tradeTime != null && tradeTime.getTime() >= startTime && tradeTime.getTime() <= endTime) {
monitorBiz.periodStatistics(paymentRecordMonitorVo);
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
}
/**
* Redis消息訂閱
*/
private void subscribe() {
logger.info("啟動(dòng)Redis消息訂閱,channel:" + redisChannel);
executorService.submit(new Runnable() {
@Override
public void run() {
JedisHelper.coreCluster().subscribe(redisSubscribeListener, redisChannel);
}
});
}
}
(2)RedisSubscribeListener.java:Redis消息監(jiān)聽者
package com.ylp.core.monitor.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.core.monitor.biz.MonitorBiz;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
import java.io.IOException;
/**
* @author: Dreyer
* @date: 16/8/23 下午2:57
* @description: Redis消息隊(duì)列監(jiān)聽者
*/
@Component("redisSubscribeListener")
public class RedisSubscribeListener extends JedisPubSub {
private Logger logger = Logger.getLogger(RedisSubscribeListener.class);
@Autowired
private MonitorBiz monitorBiz;
public RedisSubscribeListener() {
super();
}
@Override
public void onMessage(String channel, String message) {
logger.info("Redis received...");
logger.info("channel:" + channel + ",message:" + message);
if (StringUtils.isEmpty(message)) {
return;
}
PaymentRecordMonitorVo paymentRecordMonitorVo = null;
try {
paymentRecordMonitorVo = JsonUtil.jsonToObject(message, PaymentRecordMonitorVo.class);
} catch (IOException e) {
logger.error("message轉(zhuǎn)PaymentRecordMonitorVo異常,message=" + message, e);
return;
}
// 先把該筆交易訂單信息解析存入對應(yīng)的統(tǒng)計(jì)時(shí)間段內(nèi),每隔15分鐘匯總一次數(shù)據(jù),存入Redis數(shù)據(jù)匯總集合
long start = System.currentTimeMillis();
monitorBiz.periodStatistics(paymentRecordMonitorVo);
logger.info("monitorBiz.periodStatistics()執(zhí)行耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
// 再統(tǒng)計(jì)當(dāng)天各個(gè)時(shí)間段的總數(shù)據(jù),即為實(shí)時(shí)的交易數(shù)據(jù)
start = System.currentTimeMillis();
monitorBiz.publishRealTimeData();
logger.info("monitorBiz.publishRealTimeData()執(zhí)行耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
super.onMessage(channel, message);
}
}
(3)MonitorBiz.java:Redis數(shù)據(jù)統(tǒng)計(jì)業(yè)務(wù)類
package com.ylp.core.monitor.biz;
import com.ylp.common.tools.utils.DateUtils;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.facade.monitor.utils.JedisHelper;
import com.ylp.facade.monitor.utils.MonitorUtils;
import com.ylp.facade.monitor.utils.NumberUtil;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import com.ylp.facade.monitor.vo.ScreenDataVo;
import com.ylp.facade.monitor.vo.StatisticsVo;
import com.ylp.facade.trade.enums.PayWayEnum;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.Tuple;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: Dreyer
* @date: 16/8/25 下午6:12
* @description: Redis數(shù)據(jù)統(tǒng)計(jì)業(yè)務(wù)類
*/
@Component("monitorBiz")
public class MonitorBiz {
private Logger logger = Logger.getLogger(MonitorBiz.class);
/**
* 實(shí)時(shí)數(shù)據(jù)監(jiān)控的數(shù)據(jù)隊(duì)列
*/
@Value("#{settings['redis.trade.screen.channel']}")
private String redisTradeScreenChannel;
/**
* 微信支付的訂單詳細(xì)數(shù)據(jù)key
*/
@Value("#{settings['redis.trade.wxDetails']}")
private String redisWxQueue;
/**
* 支付寶支付的訂單詳細(xì)數(shù)據(jù)key
*/
@Value("#{settings['redis.trade.alipayDetails']}")
private String redisAlipayQueue;
/**
* 微信支付的訂單匯總數(shù)據(jù)
*/
@Value("#{settings['redis.trade.wxSummary']}")
private String redisWxStatisticsQueue;
/**
* 支付寶支付的訂單匯總數(shù)據(jù)
*/
@Value("#{settings['redis.trade.alipaySummary']}")
private String redisAlipayStatisticsQueue;
/**
* 單個(gè)線程的線程池
*/
private ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* 分時(shí)間間隔的數(shù)據(jù)統(tǒng)計(jì),將時(shí)間段內(nèi)的數(shù)據(jù)存入Redis有序集合
*
* @param paymentRecordMonitorVo 交易訂單信息
* @return
*/
public void periodStatistics(final PaymentRecordMonitorVo paymentRecordMonitorVo) {
// 根據(jù)支付訂單的交易時(shí)間獲取該筆交易應(yīng)該劃分至哪個(gè)統(tǒng)計(jì)時(shí)間段內(nèi)
Map<String, Date> dateMap = MonitorUtils.getTimeSpace(paymentRecordMonitorVo.getTradeTime());
Date start = dateMap.get("previous");
Date end = dateMap.get("next");
logger.info("start:" + DateUtils.dateToTime(start) + ",end:" + DateUtils.dateToTime(end));
String payWay = paymentRecordMonitorVo.getPayWay();
String queue = payWay.equals(PayWayEnum.WEIXIN.name()) ? redisWxStatisticsQueue : payWay.equals(PayWayEnum.ALIPAY.name()) ? redisAlipayStatisticsQueue : "";
StatisticsVo result = new StatisticsVo();
result.setPayWay(payWay);
result.setPeriodTime(end);
// 查詢該時(shí)間段內(nèi),統(tǒng)計(jì)集合中的數(shù)據(jù)
Set<String> wxSet = JedisHelper.dataCluster().zrangeByScore(queue, end.getTime(), end.getTime());
// 如果匯總集合中沒有該時(shí)間段的數(shù)據(jù),那么則新增
if (CollectionUtils.isEmpty(wxSet)) {
logger.info("wxSet is null.... ");
result.setTotalOrderCount(1);
result.setTotalOrderAmount(paymentRecordMonitorVo.getAmount().doubleValue());
JedisHelper.dataCluster().zadd(queue, end.getTime(), JsonUtil.toJsonString(result));
} else {
logger.info("wxSet is not null and wxSet.size()=" + wxSet.size());
Double amount = 0.0;
Integer count = 0;
Iterator<String> iterator = wxSet.iterator();
while (iterator.hasNext()) {
try {
StatisticsVo statisticsVo = JsonUtil.jsonToObject(iterator.next(), StatisticsVo.class);
logger.info("statisticsVo:" + JsonUtil.toJsonString(statisticsVo));
amount = NumberUtil.add(statisticsVo.getTotalOrderAmount().doubleValue(), amount.doubleValue());
count = count + statisticsVo.getTotalOrderCount();
} catch (IOException e) {
e.printStackTrace();
}
}
result.setTotalOrderCount(count + 1);
result.setTotalOrderAmount(NumberUtil.add(amount, paymentRecordMonitorVo.getAmount().doubleValue()));
// 先移除舊數(shù)據(jù)
long removeRow = JedisHelper.dataCluster().zremrangeByScore(queue, end.getTime(), end.getTime());
logger.info("移除舊數(shù)據(jù)記錄數(shù):" + removeRow);
// 再插入最新的數(shù)據(jù)
JedisHelper.dataCluster().zadd(queue, end.getTime(), JsonUtil.toJsonString(result));
}
// 異步刪除Redis隊(duì)列中的訂單詳細(xì)信息
final String key = payWay.equals(PayWayEnum.WEIXIN.name()) ? redisWxQueue : payWay.equals(PayWayEnum.ALIPAY.name()) ? redisAlipayQueue : "";
executorService.submit(new Runnable() {
@Override
public void run() {
// 指定元素進(jìn)行刪除
Long row = JedisHelper.dataCluster().lrem(key, 0, JsonUtil.toJsonString(paymentRecordMonitorVo));
logger.info("刪除明細(xì)記錄數(shù):" + row);
}
});
logger.info("result:" + JsonUtil.toJsonString(result));
}
/**
* 實(shí)時(shí)數(shù)據(jù)推送
*/
public void publishRealTimeData() {
ScreenDataVo screenDataVo = dayStatistics();
// 將屏幕監(jiān)控的數(shù)據(jù)放入Redis消息中
JedisHelper.dataCluster().publish(redisTradeScreenChannel, JsonUtil.toJsonString(screenDataVo));
logger.info("監(jiān)控實(shí)時(shí)匯總數(shù)據(jù)消息發(fā)送成功...");
}
}
(三)婴洼、監(jiān)控應(yīng)用系統(tǒng)
系統(tǒng)采用前后端分離的模式,后端提供獲取初始化數(shù)據(jù)的API給前端進(jìn)行調(diào)用信夫,數(shù)據(jù)初始化完成后窃蹋,每筆交易產(chǎn)生的數(shù)據(jù)變動(dòng)都通過WebSocket的方式實(shí)時(shí)傳輸?shù)綖g覽器客戶端進(jìn)行展示卡啰。
1.主要功能包括:
(1)實(shí)現(xiàn)WebSocket的服務(wù)端,將實(shí)時(shí)的交易數(shù)據(jù)推送至客戶端
(2)作為Redis消息的訂閱方警没,獲取到消息數(shù)據(jù)后匈辱,將數(shù)據(jù)通過WebSocket的方式廣播出去
2.遇到的坑
(1)在本地測試的時(shí)候,WebSocket服務(wù)端與客戶端能進(jìn)行正常的通信杀迹,部署至服務(wù)器后亡脸,客戶端卻無法連接到服務(wù)端,后來發(fā)現(xiàn)本地IDEA運(yùn)行的是Tomcat8树酪,服務(wù)器上部署的是Tomcat7.x版本浅碾,于是把服務(wù)器上的Tomcat版本換成8后就能進(jìn)行正常的通信了。
WebSocket所需要的jar包如下:
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.9</version>
</dependency>
3.部分代碼
(1)WebSocketServer.java:WebSocket服務(wù)端
package com.ylp.webSocket.server;
import org.apache.log4j.Logger;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: Dreyer
* @date: 16/8/24 上午11:25
* @description:WebSocket服務(wù)端
*/
@ServerEndpoint("/tradeSocketServer")
public class WebSocketServer {
private Logger logger = Logger.getLogger(this.getClass().getName());
/**
* 客戶端會(huì)話集合
*/
private static Map<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
/**
* 連接建立成功需要執(zhí)行的方法
*
* @param session
*/
@OnOpen
public void onOpen(Session session) {
logger.info("onOpen,sessionId:" + session.getId());
sessionMap.put(session.getId(), session);
}
/**
* 收到客戶端調(diào)用后需要執(zhí)行的方法
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("received message:" + message);
broadcastAll(message);
}
/**
* 廣播給所有客戶端
*
* @param message
*/
public static void broadcastAll(String message) {
Set<Map.Entry<String, Session>> set = sessionMap.entrySet();
for (Map.Entry<String, Session> i : set) {
try {
i.getValue().getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 連接關(guān)閉時(shí)執(zhí)行的方法
*
* @param session
* @param closeReason
*/
@OnClose
public void onClose(Session session, CloseReason closeReason) {
sessionMap.remove(session.getId());
logger.info(String.format("Session %s closed because of %s", session.getId(), closeReason));
}
/**
* 發(fā)生錯(cuò)誤時(shí)執(zhí)行的方法
*
* @param session
* @param throwable
*/
@OnError
public void error(Session session, Throwable throwable) {
sessionMap.remove(session.getId());
logger.error("異常", throwable);
}
}
(3)RedisSubscribeListener.java:Redis消息監(jiān)聽者续语,這個(gè)工程中又要實(shí)現(xiàn)一遍垂谢,只不過是收到消息后需要執(zhí)行的業(yè)務(wù)邏輯不一樣,同樣的疮茄,RedisSubcribeHelper.java類也要重新實(shí)現(xiàn)一遍滥朱,后續(xù)可以優(yōu)化
package com.ylp.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.facade.monitor.vo.ScreenDataVo;
import com.ylp.webSocket.server.WebSocketServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
import java.io.IOException;
/**
* @author: Dreyer
* @date: 16/8/23 下午2:57
* @description: Redis消息隊(duì)列監(jiān)聽者
*/
@Component("redisSubscribeListener")
public class RedisSubscribeListener extends JedisPubSub {
private Logger logger = Logger.getLogger(RedisSubscribeListener.class);
public RedisSubscribeListener() {
super();
}
@Override
public void onMessage(String channel, String message) {
logger.info("Redis received...");
logger.info("channel:" + channel + ",message:" + message);
if (StringUtils.isEmpty(message)) {
return;
}
try {
ScreenDataVo screenDataVo = JsonUtil.jsonToObject(message, ScreenDataVo.class);
logger.info("screenDataVo:" + JsonUtil.toJsonString(screenDataVo));
} catch (IOException e) {
logger.error("message轉(zhuǎn)ScreenDataVo異常,message=" + message, e);
e.printStackTrace();
}
// 將實(shí)時(shí)交易數(shù)據(jù)推送至WebSocketServer
WebSocketServer.broadcastAll(message);
super.onMessage(channel, message);
}
}
(4)WebSocket的客戶端(模擬)
<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Index</title>
<script type="text/javascript">
var ws = null;
function startWebSocket() {
if ('WebSocket' in window)
ws = new WebSocket("ws://172.xx.xx.xx/tradeSocketServer");
else if ('MozWebSocket' in window)
ws = new MozWebSocket("ws://172.xx.xx.xx/tradeSocketServer");
else
alert("not support");
ws.onmessage = function (evt) {
alert(evt.data);
};
ws.onclose = function (evt) {
alert("close");
};
ws.onopen = function (evt) {
alert("open");
};
}
function sendMsg() {
ws.send(document.getElementById('writeMsg').value);
}
</script>
</head>
<body onload="startWebSocket();">
<input type="text" id="writeMsg"></input>
<input type="button" value="send" onclick="sendMsg()"></input>
</body>
</html>
(四)WebSocket服務(wù)端通過域名訪問配置Nginx
1.WebSocket服務(wù)端如果要通過域名來訪問,需要配置Nginx力试,附上Nginx相關(guān)的配置信息徙邻,需要特別配置map選項(xiàng)
Nginx版本:Tengine version: Tengine/2.1.1 (nginx/1.6.2)
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 80;
server_name xxx.xxx.com;
location / {
root /usr/local/www/monitorsystem;
index index.html;
try_files $uri $uri/ /index.html =404;
}
location /monitor/ {
proxy_pass http://172.xx.xx.xx:0000/;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Credentials' 'true';
add_header 'Access-Control-Allow-Methods' 'POST, GET, OPTIONS,PUT,DELETE';
add_header 'Access-Control-Allow-Headers' '*,token';
proxy_set_header Host $http_host;
proxy_set_header Cookie $http_cookie;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 1200;
client_max_body_size 100m;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
2.遇到的坑
在客戶端不活動(dòng)的情況下,默認(rèn)60秒后會(huì)客戶端會(huì)自動(dòng)斷開與服務(wù)端的連接畸裳,通過Nginx中的proxy_red_timeout參數(shù)可以配置缰犁,客戶端也可以通過每隔一段時(shí)間去請求一次服務(wù)端,保持于服務(wù)端的連接怖糊。
四帅容、總結(jié)與優(yōu)化
1、在測試部門進(jìn)行高并發(fā)測試的情況下蓬抄,periodStatistics()統(tǒng)計(jì)方法會(huì)出現(xiàn)數(shù)據(jù)統(tǒng)計(jì)錯(cuò)亂的情況丰嘉,所以針對這個(gè)方法要加上同步代碼塊
2、上述的代碼在單臺(tái)服務(wù)器部署的情況下嚷缭,基本上沒啥問題饮亏,但是如果部署在多臺(tái)服務(wù)器上(分布式部署),那么很多問題就發(fā)生咯阅爽,比如說:
問題1:Redis消息發(fā)布與訂閱路幸,消息發(fā)布出去了,A服務(wù)器中的應(yīng)用會(huì)收到消息然后進(jìn)行統(tǒng)計(jì)付翁,B服務(wù)器中的應(yīng)用也會(huì)收到相同的消息然后進(jìn)行計(jì)算简肴,那計(jì)算出來的結(jié)果就多出一倍的數(shù)據(jù)了。
問題2:當(dāng)日實(shí)時(shí)數(shù)據(jù)是統(tǒng)計(jì)Redis中“每15分鐘匯總”數(shù)據(jù)的SortedSet百侧,即一天被分為96條匯總數(shù)據(jù)砰识,統(tǒng)計(jì)當(dāng)日實(shí)時(shí)數(shù)據(jù)的時(shí)候就把96條數(shù)據(jù)進(jìn)行累加即可能扒。但是 但是在每一筆訂單過來的時(shí)候,會(huì)先計(jì)算出此次統(tǒng)計(jì)的最新結(jié)果辫狼,再把舊的“每15分鐘匯總”數(shù)據(jù)記錄給刪除初斑,然后再插入此次計(jì)算的最新結(jié)果(這樣做的原因是SortedSet沒有更新的方法),代碼請看MonitorBiz的periodStatistics()方法膨处,這樣就會(huì)存在一個(gè)問題:A服務(wù)器計(jì)算出最新統(tǒng)計(jì)結(jié)構(gòu)见秤,然后把舊數(shù)據(jù)刪除了,還沒插入最新統(tǒng)計(jì)數(shù)據(jù)的時(shí)候真椿,B機(jī)器去獲取當(dāng)日實(shí)時(shí)數(shù)據(jù)鹃答,這時(shí)候就會(huì)缺少那條已刪除還未插入的數(shù)據(jù),造成數(shù)據(jù)的錯(cuò)誤突硝。
那針對上面兩個(gè)問題的解決方案是:
問題1解決方案:Redis消息發(fā)送的時(shí)候只發(fā)布一條簡單的消息通知即可测摔,把交易訂單的數(shù)據(jù)存入Redis的list隊(duì)列中,如果多個(gè)服務(wù)器都收到了消息解恰,便去list隊(duì)列中獲取最新的一條記錄避咆,取到則進(jìn)行處理,取不到則直接返回修噪,Redis的list隊(duì)列是能保證只有一臺(tái)機(jī)器能取到數(shù)據(jù)的。
問題2解決方案:最開始設(shè)計(jì)的時(shí)候決定用SortedSet這種數(shù)據(jù)結(jié)構(gòu)是因?yàn)樗軡M足排序的要求路媚,但是從上面的問題看來黄琼,它也只是能解決排序的問題而已(前端要求要按時(shí)間排好順序在“獲取初始化數(shù)據(jù)”的接口中將數(shù)據(jù)發(fā)給他),對于并發(fā)的讀寫數(shù)據(jù)的操作都不能很好的支持整慎,并不能解決上面所說的問題脏款,所以最后決定改用Hash這種數(shù)據(jù)結(jié)構(gòu),原因是因?yàn)樗С衷有缘臄?shù)據(jù)累加操作裤园,在高并發(fā)的情況下撤师,能有很好的支持,如hincrby拧揽、hincrbyfloat方法剃盾。
/**
* 1.每隔15分鐘匯總一次數(shù)據(jù),存入Redis數(shù)據(jù)匯總集合
* 2.當(dāng)日實(shí)時(shí)數(shù)據(jù)進(jìn)行累加操作
*
* @param message 交易訂單類型
* @return
*/
public void periodStatistics(String message) {
String listStr = "";
if (StringUtils.isNotEmpty(message) && message.equals(PayWayEnum.WEIXIN.name())) {
listStr = JedisHelper.dataCluster().lpop(redisWxQueue);
} else if (StringUtils.isNotEmpty(message) && message.equals(PayWayEnum.ALIPAY.name())) {
listStr = JedisHelper.dataCluster().lpop(redisAlipayQueue);
}
// 為空則說明數(shù)據(jù)被其他服務(wù)器取走了,則不用進(jìn)行處理
if (StringUtils.isEmpty(listStr)) {
return;
}
PaymentRecordMonitorVo paymentRecordMonitorVo = null;
try {
paymentRecordMonitorVo = JsonUtil.jsonToObject(listStr, PaymentRecordMonitorVo.class);
} catch (IOException e) {
e.printStackTrace();
return;
}
// 每日實(shí)時(shí)數(shù)據(jù)的hash結(jié)構(gòu)增量統(tǒng)計(jì)
increaseBy(paymentRecordMonitorVo);
// 分時(shí)間段的匯總統(tǒng)計(jì)
periodStatistics(paymentRecordMonitorVo);
}
/**
* 每日實(shí)時(shí)數(shù)據(jù)的hash結(jié)構(gòu)增量統(tǒng)計(jì)
*
* @param paymentRecordMonitorVo
*/
private void increaseBy(PaymentRecordMonitorVo paymentRecordMonitorVo) {
// step1:微信訂單數(shù)據(jù)處理
if (paymentRecordMonitorVo.getPayWay().equals(PayWayEnum.ALIPAY.name())) {
JedisHelper.dataCluster().hincrBy(daySummaryKey, dayAlipayCount, 1);
JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayAlipayAmount, paymentRecordMonitorVo.getAmount().doubleValue());
// step2:支付寶訂單數(shù)據(jù)處理
} else if (paymentRecordMonitorVo.getPayWay().equals(PayWayEnum.WEIXIN.name())) {
JedisHelper.dataCluster().hincrBy(daySummaryKey, dayWxCount, 1);
JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayWxAmount, paymentRecordMonitorVo.getAmount().doubleValue());
}
// step3:總的數(shù)據(jù)增加
JedisHelper.dataCluster().hincrBy(daySummaryKey, dayTotalCount, 1);
JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayTotalAmount, paymentRecordMonitorVo.getAmount().doubleValue());
}
3、由于系統(tǒng)在設(shè)計(jì)之初是滿足實(shí)時(shí)交易數(shù)據(jù)顯示的淤袜,所以在每一筆訂單過來的時(shí)候都會(huì)進(jìn)行數(shù)據(jù)統(tǒng)計(jì)痒谴,然后利用WebSocket進(jìn)行實(shí)時(shí)數(shù)據(jù)推送到客戶端,在進(jìn)行壓力測試的時(shí)候铡羡,每秒的并發(fā)達(dá)到上百個(gè)积蔚,于是瀏覽器客戶端就會(huì)發(fā)生崩潰的現(xiàn)象,因?yàn)槊棵攵紩?huì)向?yàn)g覽器客戶端推送上百次
解決方案:在服務(wù)端進(jìn)行數(shù)據(jù)統(tǒng)計(jì)后烦周,每隔2秒把最新的數(shù)據(jù)推送至瀏覽器客戶端尽爆。
4怎顾、前期開發(fā)的時(shí)候?yàn)榱烁玫恼{(diào)試與問題的定位,所以寫了很多l(xiāng)ogger.info()這種日志記錄操作漱贱,在上生產(chǎn)環(huán)境的時(shí)候要把這些內(nèi)容去掉或者日志級(jí)別改為debug槐雾,這種日志記錄會(huì)設(shè)計(jì)到磁盤IO讀寫,在某些時(shí)候可能會(huì)成為系統(tǒng)性能的瓶頸饱亿。
注:上述的代碼大部分都已經(jīng)過優(yōu)化蚜退,優(yōu)化后的代碼就不貼出來了。
聰明的讀者要是還發(fā)現(xiàn)設(shè)計(jì)與實(shí)現(xiàn)中有不足之處彪笼,歡迎指正钻注,bu'she
反思與總結(jié),嗯配猫。幅恋。。