實(shí)時(shí)交易數(shù)據(jù)監(jiān)控系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)

文章分為四個(gè)部分
1牵啦、主要功能
2、運(yùn)用的技術(shù)
3妄痪、系統(tǒng)設(shè)計(jì)
4哈雏、優(yōu)化與總結(jié)

一、主要功能

對平臺(tái)支付網(wǎng)關(guān)的交易訂單進(jìn)行實(shí)時(shí)的統(tǒng)計(jì)衫生,包括實(shí)時(shí)的交易金額與交易訂單量裳瘪、不同支付方式的交易總額、訂單量以及占比罪针、當(dāng)天各個(gè)時(shí)間段的數(shù)據(jù)統(tǒng)計(jì)折線圖彭羹,實(shí)現(xiàn)效果圖如下:

untitled.jpg

二、運(yùn)用的技術(shù)

Redis:利用Redis的消息發(fā)布與訂閱功能泪酱、以及List派殷、SortedSet、Hash的數(shù)據(jù)結(jié)構(gòu)特性
WebSocket:負(fù)責(zé)將實(shí)時(shí)匯總的交易數(shù)據(jù)推送至瀏覽器客戶端

三墓阀、系統(tǒng)設(shè)計(jì)

實(shí)時(shí)交易數(shù)據(jù)監(jiān)控系統(tǒng)所涉及的工程包括交易服務(wù)毡惜、監(jiān)控統(tǒng)計(jì)服務(wù)、監(jiān)控應(yīng)用(Dubbo服務(wù)化)岂津。交易服務(wù)在交易成功后向Redis中發(fā)布消息并將數(shù)據(jù)發(fā)送至Redis的list隊(duì)列虱黄,監(jiān)控服務(wù)負(fù)責(zé)Redis消息的訂閱并進(jìn)行統(tǒng)計(jì),統(tǒng)計(jì)完成后將實(shí)時(shí)的統(tǒng)計(jì)結(jié)果再次發(fā)送至Redis吮成,監(jiān)控應(yīng)用作為WebSocket的服務(wù)端橱乱,也負(fù)責(zé)監(jiān)聽監(jiān)控服務(wù)推送過來的實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù)并通過WebSocket將數(shù)據(jù)推送至客戶端。
Redis數(shù)據(jù)結(jié)構(gòu)圖如下:


Redis數(shù)據(jù)結(jié)構(gòu).png

利用list的lpush粱甫、lpop功能進(jìn)行對數(shù)據(jù)的存取操作泳叠,SortedSet最開始主要是用于排序,將交易時(shí)間作為score進(jìn)行排序茶宵,但是因?yàn)樯婕暗揭恍?shù)據(jù)的計(jì)算危纫,在高并發(fā)以及分布式部署的情況下,利用SortedSet進(jìn)行數(shù)據(jù)統(tǒng)計(jì)是會(huì)存在問題的乌庶,文末會(huì)提到种蝶,hash結(jié)構(gòu)主要是用于對數(shù)據(jù)進(jìn)行原子性的計(jì)算。

UML時(shí)序圖如下:


Paste_Image.png
(一)交易系統(tǒng)——支付服務(wù)

支付服務(wù)在交易成功后瞒大,會(huì)給Redis發(fā)布一條訂單記錄消息螃征,并向Redis的list列表lpush一條同樣的訂單記錄信息,為了不影響正常的支付業(yè)務(wù)流程透敌,所以采用的是異步的方式盯滚,偽代碼如下:

/**
     * Redis消息通道
     */
    @Value("#{settings['redis.trade.channel']}")
    private String redisChannel;
    /**
     * 微信支付的訂單隊(duì)列key
     */
    @Value("#{settings['redis.trade.wxDetails']}")
    private String redisWxQueue;
    /**
     * 支付寶支付的訂單隊(duì)列key
     */
    @Value("#{settings['redis.trade.alipayDetails']}")
    private String redisAlipayQueue;
    /**
     * 單個(gè)線程的線程池
     */
    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    /**
     * 交易成功后需要執(zhí)行的業(yè)務(wù)邏輯
     * @param paymentRecord
     */
    public void successPayment(final PaymentRecord paymentRecord) {
        // do otherthing...
        // 異步發(fā)送消息
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    pushPaymentRecordMonitorVo(paymentRecord);
                } catch (Exception e) {
                    log.error("payment send to redis fail,PaymengRecord:" + JsonUtil.toJsonString(paymentRecord));
                }
            }
        });
    }
    /**
     * 將交易成功的訂單信息插入至Redis隊(duì)列并發(fā)送一條通知通知
     * @param paymentRecord
     * @throws Exception
     */
    private void pushPaymentRecordMonitorVo (PaymentRecord paymentRecord) throws Exception{
        PaymentRecordMonitorVo paymentRecordMonitorVo = new PaymentRecordMonitorVo();
        paymentRecordMonitorVo.setMerchantOrderNo(paymentRecord.getMerchantOrderNo());
        paymentRecordMonitorVo.setPayWay(paymentRecord.getPayWayCode() == null ? null : paymentRecord.getPayWayCode().name());
        paymentRecordMonitorVo.setTradeTime(paymentRecord.getPaySuccessTime());
        paymentRecordMonitorVo.setAmount(paymentRecord.getOrderAmount());
        log.info("訂單消息插入Redis隊(duì)列...");
        if (paymentRecord.getPayWayCode() != null && paymentRecord.getPayWayCode().equals(PayWayEnum.WEIXIN)) {
            JedisHelper.dataCluster().lpush(redisWxQueue,JsonUtil.toJsonString(paymentRecordMonitorVo));
        } else if (paymentRecord.getPayWayCode() != null && paymentRecord.getPayWayCode().equals(PayWayEnum.ALIPAY)) {
            JedisHelper.dataCluster().lpush(redisAlipayQueue,JsonUtil.toJsonString(paymentRecordMonitorVo));
        }
        log.info("訂單消息插入Redis隊(duì)列結(jié)束...");
        // 發(fā)布消息
        log.info("訂單消息發(fā)布到Redis...");
        JedisHelper.dataCluster().publish(redisChannel,JsonUtil.toJsonString(paymentRecordMonitorVo));
        log.info("訂單消息發(fā)布到Redis結(jié)束...");
    }
(二)監(jiān)控服務(wù)
1.主要功能包括:

(1)訂閱Redis中交易服務(wù)發(fā)布過來的訂單消息以及獲取list列表中的訂單數(shù)據(jù)
(2)根據(jù)訂單的交易時(shí)間踢械,按照每15分鐘為一個(gè)數(shù)據(jù)匯總點(diǎn)進(jìn)行匯總
(3)對每15分鐘匯總的SortedSet進(jìn)行統(tǒng)計(jì)后,將結(jié)果再發(fā)布至Redis的消息中

2.遇到的坑:

在監(jiān)控服務(wù)啟動(dòng)的時(shí)候會(huì)進(jìn)行Redis的list列表中數(shù)據(jù)的統(tǒng)計(jì)初始化魄藕,并開啟Redis消息訂閱者的監(jiān)聽内列。但有三個(gè)比較坑的地方就是:
(1)因?yàn)橛玫氖荝edis6個(gè)節(jié)點(diǎn)組成的一個(gè)集群,所以是用JedisCluster背率,但是JedisCluster在2.8.x版本以上才支持消息的發(fā)布與訂閱话瞧,項(xiàng)目原先用的是2.7.3版本
解決方案:把項(xiàng)目Jedis版本改為2.8.1,pom.xml內(nèi)容如下:

 <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.8.1</version>
    </dependency>

(2)Redis的消息發(fā)布與訂閱寝姿,在訂閱方必須要手動(dòng)的調(diào)用subscribe()方法移稳,并將監(jiān)聽者和需要監(jiān)聽的通道作為參數(shù)傳入才能開啟監(jiān)聽,而像ActiveMQ這種消息中間件是不需要顯示的調(diào)用会油,只需配置好消息監(jiān)聽者就會(huì)自動(dòng)監(jiān)聽的。還有一個(gè)坑就是subscribe()方法是一個(gè)線程阻塞方法古毛,本想在項(xiàng)目啟動(dòng)的時(shí)候就調(diào)用subscribe()開啟消息的訂閱翻翩,結(jié)果發(fā)現(xiàn)方法調(diào)用后,其他的代碼根本沒法往下執(zhí)行稻薇。
解決方案是:在項(xiàng)目啟動(dòng)的時(shí)候調(diào)用subscribe()方法開啟消息監(jiān)聽嫂冻,并且新開一個(gè)線程去調(diào)用subscribe()方法來避免阻塞主線程。

3.Redis不支持消息的持久化塞椎。在訂閱者沒有啟動(dòng)的時(shí)候桨仿,消息發(fā)布者將消息發(fā)出去了,訂閱者沒有收到案狠,那訂閱者重新啟動(dòng)的時(shí)候也不會(huì)收到之前發(fā)的消息了服傍,而像ActiveMQ是支持消息的持久化的。

解決方案:在往Redis發(fā)布消息的時(shí)候也同樣往Redis的list列表中l(wèi)push一條同樣消息的數(shù)據(jù)(參照上面交易服務(wù)中的代碼)骂铁,消息訂閱者接收到消息并進(jìn)行相應(yīng)的業(yè)務(wù)處理后吹零,再將list列表中的數(shù)據(jù)刪除,那在監(jiān)控服務(wù)掛掉的情況下拉庵,Redis消息無法正常被監(jiān)聽消費(fèi)灿椅,但是Redis的list列表中還是會(huì)存有消息的數(shù)據(jù),所以后續(xù)我們可以從list列表中取出消息數(shù)據(jù)再進(jìn)行相應(yīng)的業(yè)務(wù)處理钞支,這樣就間接的實(shí)現(xiàn)了Redis消息的持久化茫蛹。

3.部分代碼

(1)RedisSubscribeHelper.java:監(jiān)控服務(wù)啟動(dòng)時(shí),進(jìn)行Redis隊(duì)列中數(shù)據(jù)的統(tǒng)計(jì)初始化烁挟,并開啟Redis消息訂閱者的監(jiān)聽的

package com.ylp.core.monitor.redis;

import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.core.monitor.biz.MonitorBiz;
import com.ylp.facade.monitor.utils.JedisHelper;
import com.ylp.facade.monitor.utils.MonitorUtils;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author: Dreyer
 * @date: 16/8/25 下午2:24
 * @description:作用1:項(xiàng)目啟動(dòng)時(shí),對Redis消息未進(jìn)行處理的訂單數(shù)據(jù)進(jìn)行初始化處理; 作用2:Redis消息訂閱的啟動(dòng)器,項(xiàng)目啟動(dòng)時(shí)新啟動(dòng)一個(gè)線程進(jìn)行消息的訂閱
 */
@Component
public class RedisSubscribeHelper {
    private Logger logger = Logger.getLogger(RedisSubscribeHelper.class);

    @Autowired
    private RedisSubscribeListener redisSubscribeListener;
    @Autowired
    private MonitorBiz monitorBiz;

    /**
     * Redis消息訂閱的channel
     */
    @Value("#{settings['redis.trade.channel']}")
    private String redisChannel;

    /**
     * 微信支付的訂單數(shù)據(jù)key
     */
    @Value("#{settings['redis.trade.wxDetails']}")
    private String redisWxQueue;

    /**
     * 支付寶支付的訂單數(shù)據(jù)key
     */
    @Value("#{settings['redis.trade.alipayDetails']}")
    private String redisAlipayQueue;

    /**
     * redis消息訂閱的subscribe方法為阻塞方法,所以需要單獨(dú)啟動(dòng)一個(gè)線程進(jìn)行消息訂閱
     */
    private ExecutorService executorService = Executors.newSingleThreadExecutor();


    /**
     * 構(gòu)造函數(shù)執(zhí)行后,執(zhí)行數(shù)據(jù)初始化與消息訂閱
     */
    @PostConstruct
    public void doWork() {
        init();
        subscribe();
    }

    /**
     * 數(shù)據(jù)的初始化,不能一次性取出所有的數(shù)據(jù),因?yàn)橛锌赡芟到y(tǒng)在啟動(dòng)的時(shí)候Redis的list會(huì)有元素push進(jìn)來
     */
    private void init() {
        long start = System.currentTimeMillis();
        // 避免初始化加載時(shí),JedisCluste還沒完成初始化,所以需要sleep
        JedisCluster jedisCluster = JedisHelper.dataCluster();
        if (jedisCluster == null) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info("開始從Redis中獲取微信訂單初始化數(shù)據(jù)....");
        dealData(jedisCluster, redisWxQueue);
        logger.info("微信訂單記錄數(shù)據(jù)全部處理完畢...");

        logger.info("開始從Redis中獲取支付寶訂單初始化數(shù)據(jù)....");
        dealData(jedisCluster, redisAlipayQueue);
        logger.info("支付寶訂單記錄數(shù)據(jù)全部處理完畢...");

        logger.info("初始化數(shù)據(jù)工作耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
    }

    /**
     * 根據(jù)隊(duì)列來處理隊(duì)列中的數(shù)據(jù)
     *
     * @param queueName
     */
    private void dealData(JedisCluster jedisCluster, String queueName) {
        long startTime = MonitorUtils.getCurrentDayZeroTime().getTime();
        long endTime = MonitorUtils.getDateEndTime().getTime();
        Date tradeTime = null;
        String str = "";
        PaymentRecordMonitorVo paymentRecordMonitorVo = null;
        while (true) {
            try {
                str = jedisCluster.lpop(queueName);
                if (StringUtils.isEmpty(str)) {
                    break;
                }
                paymentRecordMonitorVo = JsonUtil.jsonToObject(str, PaymentRecordMonitorVo.class);
                tradeTime = paymentRecordMonitorVo.getTradeTime();
                // 只處理今天的數(shù)據(jù)
                if (tradeTime != null && tradeTime.getTime() >= startTime && tradeTime.getTime() <= endTime) {
                    monitorBiz.periodStatistics(paymentRecordMonitorVo);
                }
            } catch (IOException e) {
                e.printStackTrace();
                break;
            }
        }
    }

    /**
     * Redis消息訂閱
     */
    private void subscribe() {
        logger.info("啟動(dòng)Redis消息訂閱,channel:" + redisChannel);
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                JedisHelper.coreCluster().subscribe(redisSubscribeListener, redisChannel);
            }
        });
    }

}

(2)RedisSubscribeListener.java:Redis消息監(jiān)聽者

package com.ylp.core.monitor.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.core.monitor.biz.MonitorBiz;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
import java.io.IOException;
/**
 * @author: Dreyer
 * @date: 16/8/23 下午2:57
 * @description: Redis消息隊(duì)列監(jiān)聽者
 */
@Component("redisSubscribeListener")
public class RedisSubscribeListener extends JedisPubSub {
    private Logger logger = Logger.getLogger(RedisSubscribeListener.class);
    @Autowired
    private MonitorBiz monitorBiz;
    public RedisSubscribeListener() {
        super();
    }
    @Override
    public void onMessage(String channel, String message) {
        logger.info("Redis received...");
        logger.info("channel:" + channel + ",message:" + message);
        if (StringUtils.isEmpty(message)) {
            return;
        }
        PaymentRecordMonitorVo paymentRecordMonitorVo = null;
        try {
            paymentRecordMonitorVo = JsonUtil.jsonToObject(message, PaymentRecordMonitorVo.class);
        } catch (IOException e) {
            logger.error("message轉(zhuǎn)PaymentRecordMonitorVo異常,message=" + message, e);
            return;
        }
        // 先把該筆交易訂單信息解析存入對應(yīng)的統(tǒng)計(jì)時(shí)間段內(nèi),每隔15分鐘匯總一次數(shù)據(jù),存入Redis數(shù)據(jù)匯總集合
        long start = System.currentTimeMillis();
        monitorBiz.periodStatistics(paymentRecordMonitorVo);
        logger.info("monitorBiz.periodStatistics()執(zhí)行耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
        // 再統(tǒng)計(jì)當(dāng)天各個(gè)時(shí)間段的總數(shù)據(jù),即為實(shí)時(shí)的交易數(shù)據(jù)
        start = System.currentTimeMillis();
        monitorBiz.publishRealTimeData();
        logger.info("monitorBiz.publishRealTimeData()執(zhí)行耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
        super.onMessage(channel, message);
    }
    
}

(3)MonitorBiz.java:Redis數(shù)據(jù)統(tǒng)計(jì)業(yè)務(wù)類

package com.ylp.core.monitor.biz;
import com.ylp.common.tools.utils.DateUtils;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.facade.monitor.utils.JedisHelper;
import com.ylp.facade.monitor.utils.MonitorUtils;
import com.ylp.facade.monitor.utils.NumberUtil;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import com.ylp.facade.monitor.vo.ScreenDataVo;
import com.ylp.facade.monitor.vo.StatisticsVo;
import com.ylp.facade.trade.enums.PayWayEnum;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.Tuple;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author: Dreyer
 * @date: 16/8/25 下午6:12
 * @description: Redis數(shù)據(jù)統(tǒng)計(jì)業(yè)務(wù)類
 */
@Component("monitorBiz")
public class MonitorBiz {
    private Logger logger = Logger.getLogger(MonitorBiz.class);
    /**
     * 實(shí)時(shí)數(shù)據(jù)監(jiān)控的數(shù)據(jù)隊(duì)列
     */
    @Value("#{settings['redis.trade.screen.channel']}")
    private String redisTradeScreenChannel;
    /**
     * 微信支付的訂單詳細(xì)數(shù)據(jù)key
     */
    @Value("#{settings['redis.trade.wxDetails']}")
    private String redisWxQueue;
    /**
     * 支付寶支付的訂單詳細(xì)數(shù)據(jù)key
     */
    @Value("#{settings['redis.trade.alipayDetails']}")
    private String redisAlipayQueue;
    /**
     * 微信支付的訂單匯總數(shù)據(jù)
     */
    @Value("#{settings['redis.trade.wxSummary']}")
    private String redisWxStatisticsQueue;
    /**
     * 支付寶支付的訂單匯總數(shù)據(jù)
     */
    @Value("#{settings['redis.trade.alipaySummary']}")
    private String redisAlipayStatisticsQueue;
    /**
     * 單個(gè)線程的線程池
     */
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    /**
     * 分時(shí)間間隔的數(shù)據(jù)統(tǒng)計(jì),將時(shí)間段內(nèi)的數(shù)據(jù)存入Redis有序集合
     *
     * @param paymentRecordMonitorVo 交易訂單信息
     * @return
     */
    public void periodStatistics(final PaymentRecordMonitorVo paymentRecordMonitorVo) {
        // 根據(jù)支付訂單的交易時(shí)間獲取該筆交易應(yīng)該劃分至哪個(gè)統(tǒng)計(jì)時(shí)間段內(nèi)
        Map<String, Date> dateMap = MonitorUtils.getTimeSpace(paymentRecordMonitorVo.getTradeTime());
        Date start = dateMap.get("previous");
        Date end = dateMap.get("next");
        logger.info("start:" + DateUtils.dateToTime(start) + ",end:" + DateUtils.dateToTime(end));
        String payWay = paymentRecordMonitorVo.getPayWay();
        String queue = payWay.equals(PayWayEnum.WEIXIN.name()) ? redisWxStatisticsQueue : payWay.equals(PayWayEnum.ALIPAY.name()) ? redisAlipayStatisticsQueue : "";
        StatisticsVo result = new StatisticsVo();
        result.setPayWay(payWay);
        result.setPeriodTime(end);
        // 查詢該時(shí)間段內(nèi),統(tǒng)計(jì)集合中的數(shù)據(jù)
        Set<String> wxSet = JedisHelper.dataCluster().zrangeByScore(queue, end.getTime(), end.getTime());
        // 如果匯總集合中沒有該時(shí)間段的數(shù)據(jù),那么則新增
        if (CollectionUtils.isEmpty(wxSet)) {
            logger.info("wxSet is null.... ");
            result.setTotalOrderCount(1);
            result.setTotalOrderAmount(paymentRecordMonitorVo.getAmount().doubleValue());
            JedisHelper.dataCluster().zadd(queue, end.getTime(), JsonUtil.toJsonString(result));
        } else {
            logger.info("wxSet is not null and wxSet.size()=" + wxSet.size());
            Double amount = 0.0;
            Integer count = 0;
            Iterator<String> iterator = wxSet.iterator();
            while (iterator.hasNext()) {
                try {
                    StatisticsVo statisticsVo = JsonUtil.jsonToObject(iterator.next(), StatisticsVo.class);
                    logger.info("statisticsVo:" + JsonUtil.toJsonString(statisticsVo));
                    amount = NumberUtil.add(statisticsVo.getTotalOrderAmount().doubleValue(), amount.doubleValue());
                    count = count + statisticsVo.getTotalOrderCount();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            result.setTotalOrderCount(count + 1);
            result.setTotalOrderAmount(NumberUtil.add(amount, paymentRecordMonitorVo.getAmount().doubleValue()));
            // 先移除舊數(shù)據(jù)
            long removeRow = JedisHelper.dataCluster().zremrangeByScore(queue, end.getTime(), end.getTime());
            logger.info("移除舊數(shù)據(jù)記錄數(shù):" + removeRow);
            // 再插入最新的數(shù)據(jù)
            JedisHelper.dataCluster().zadd(queue, end.getTime(), JsonUtil.toJsonString(result));
        }
        // 異步刪除Redis隊(duì)列中的訂單詳細(xì)信息
        final String key = payWay.equals(PayWayEnum.WEIXIN.name()) ? redisWxQueue : payWay.equals(PayWayEnum.ALIPAY.name()) ? redisAlipayQueue : "";
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                // 指定元素進(jìn)行刪除
                Long row = JedisHelper.dataCluster().lrem(key, 0, JsonUtil.toJsonString(paymentRecordMonitorVo));
                logger.info("刪除明細(xì)記錄數(shù):" + row);
            }
        });
        logger.info("result:" + JsonUtil.toJsonString(result));
    }
    /**
     * 實(shí)時(shí)數(shù)據(jù)推送
     */
    public void publishRealTimeData() {
        ScreenDataVo screenDataVo = dayStatistics();
        // 將屏幕監(jiān)控的數(shù)據(jù)放入Redis消息中
        JedisHelper.dataCluster().publish(redisTradeScreenChannel, JsonUtil.toJsonString(screenDataVo));
        logger.info("監(jiān)控實(shí)時(shí)匯總數(shù)據(jù)消息發(fā)送成功...");
    }
}
(三)婴洼、監(jiān)控應(yīng)用系統(tǒng)

系統(tǒng)采用前后端分離的模式,后端提供獲取初始化數(shù)據(jù)的API給前端進(jìn)行調(diào)用信夫,數(shù)據(jù)初始化完成后窃蹋,每筆交易產(chǎn)生的數(shù)據(jù)變動(dòng)都通過WebSocket的方式實(shí)時(shí)傳輸?shù)綖g覽器客戶端進(jìn)行展示卡啰。

1.主要功能包括:

(1)實(shí)現(xiàn)WebSocket的服務(wù)端,將實(shí)時(shí)的交易數(shù)據(jù)推送至客戶端
(2)作為Redis消息的訂閱方警没,獲取到消息數(shù)據(jù)后匈辱,將數(shù)據(jù)通過WebSocket的方式廣播出去

2.遇到的坑

(1)在本地測試的時(shí)候,WebSocket服務(wù)端與客戶端能進(jìn)行正常的通信杀迹,部署至服務(wù)器后亡脸,客戶端卻無法連接到服務(wù)端,后來發(fā)現(xiàn)本地IDEA運(yùn)行的是Tomcat8树酪,服務(wù)器上部署的是Tomcat7.x版本浅碾,于是把服務(wù)器上的Tomcat版本換成8后就能進(jìn)行正常的通信了。
WebSocket所需要的jar包如下:

        <dependency>
            <groupId>javax.websocket</groupId>
            <artifactId>javax.websocket-api</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.tyrus.bundles</groupId>
            <artifactId>tyrus-standalone-client</artifactId>
            <version>1.9</version>
        </dependency>

3.部分代碼

(1)WebSocketServer.java:WebSocket服務(wù)端

package com.ylp.webSocket.server;
import org.apache.log4j.Logger;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author: Dreyer
 * @date: 16/8/24 上午11:25
 * @description:WebSocket服務(wù)端
 */
@ServerEndpoint("/tradeSocketServer")
public class WebSocketServer {
    private Logger logger = Logger.getLogger(this.getClass().getName());
    /**
     * 客戶端會(huì)話集合
     */
    private static Map<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
    /**
     * 連接建立成功需要執(zhí)行的方法
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        logger.info("onOpen,sessionId:" + session.getId());
        sessionMap.put(session.getId(), session);
    }
    /**
     * 收到客戶端調(diào)用后需要執(zhí)行的方法
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("received message:" + message);
        broadcastAll(message);
    }
    /**
     * 廣播給所有客戶端
     *
     * @param message
     */
    public static void broadcastAll(String message) {
        Set<Map.Entry<String, Session>> set = sessionMap.entrySet();
        for (Map.Entry<String, Session> i : set) {
            try {
                i.getValue().getBasicRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 連接關(guān)閉時(shí)執(zhí)行的方法
     *
     * @param session
     * @param closeReason
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        sessionMap.remove(session.getId());
        logger.info(String.format("Session %s closed because of %s", session.getId(), closeReason));
    }
    /**
     * 發(fā)生錯(cuò)誤時(shí)執(zhí)行的方法
     *
     * @param session
     * @param throwable
     */
    @OnError
    public void error(Session session, Throwable throwable) {
        sessionMap.remove(session.getId());
        logger.error("異常", throwable);
    }
}

(3)RedisSubscribeListener.java:Redis消息監(jiān)聽者续语,這個(gè)工程中又要實(shí)現(xiàn)一遍垂谢,只不過是收到消息后需要執(zhí)行的業(yè)務(wù)邏輯不一樣,同樣的疮茄,RedisSubcribeHelper.java類也要重新實(shí)現(xiàn)一遍滥朱,后續(xù)可以優(yōu)化

package com.ylp.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.facade.monitor.vo.ScreenDataVo;
import com.ylp.webSocket.server.WebSocketServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
import java.io.IOException;
/**
 * @author: Dreyer
 * @date: 16/8/23 下午2:57
 * @description: Redis消息隊(duì)列監(jiān)聽者
 */
@Component("redisSubscribeListener")
public class RedisSubscribeListener extends JedisPubSub {
    private Logger logger = Logger.getLogger(RedisSubscribeListener.class);
    public RedisSubscribeListener() {
        super();
    }
    @Override
    public void onMessage(String channel, String message) {
        logger.info("Redis received...");
        logger.info("channel:" + channel + ",message:" + message);
        if (StringUtils.isEmpty(message)) {
            return;
        }
        try {
            ScreenDataVo screenDataVo = JsonUtil.jsonToObject(message, ScreenDataVo.class);
            logger.info("screenDataVo:" + JsonUtil.toJsonString(screenDataVo));
        } catch (IOException e) {
            logger.error("message轉(zhuǎn)ScreenDataVo異常,message=" + message, e);
            e.printStackTrace();
        }
        // 將實(shí)時(shí)交易數(shù)據(jù)推送至WebSocketServer
        WebSocketServer.broadcastAll(message);
        super.onMessage(channel, message);
    }
}

(4)WebSocket的客戶端(模擬)

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>Index</title>
    <script type="text/javascript">
        var ws = null;
        function startWebSocket() {
            if ('WebSocket' in window)
                ws = new WebSocket("ws://172.xx.xx.xx/tradeSocketServer");
            else if ('MozWebSocket' in window)
                ws = new MozWebSocket("ws://172.xx.xx.xx/tradeSocketServer");
            else
                alert("not support");
            ws.onmessage = function (evt) {
                alert(evt.data);
            };
            ws.onclose = function (evt) {
                alert("close");
            };
            ws.onopen = function (evt) {
                alert("open");
            };
        }
        function sendMsg() {
            ws.send(document.getElementById('writeMsg').value);
        }
    </script>
</head>
<body onload="startWebSocket();">
<input type="text" id="writeMsg"></input>
<input type="button" value="send" onclick="sendMsg()"></input>
</body>
</html>
(四)WebSocket服務(wù)端通過域名訪問配置Nginx

1.WebSocket服務(wù)端如果要通過域名來訪問,需要配置Nginx力试,附上Nginx相關(guān)的配置信息徙邻,需要特別配置map選項(xiàng)
Nginx版本:Tengine version: Tengine/2.1.1 (nginx/1.6.2)

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}
server {
    listen   80;
    server_name xxx.xxx.com;
     location / {
        root   /usr/local/www/monitorsystem;
        index index.html;
        try_files $uri $uri/ /index.html =404;
    }
    location /monitor/ {
        proxy_pass http://172.xx.xx.xx:0000/;
        add_header 'Access-Control-Allow-Origin' '*';
        add_header 'Access-Control-Allow-Credentials' 'true';
        add_header 'Access-Control-Allow-Methods' 'POST, GET, OPTIONS,PUT,DELETE';
        add_header 'Access-Control-Allow-Headers' '*,token';
        proxy_set_header Host $http_host;
        proxy_set_header Cookie $http_cookie;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 1200;
        client_max_body_size  100m;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

2.遇到的坑
在客戶端不活動(dòng)的情況下,默認(rèn)60秒后會(huì)客戶端會(huì)自動(dòng)斷開與服務(wù)端的連接畸裳,通過Nginx中的proxy_red_timeout參數(shù)可以配置缰犁,客戶端也可以通過每隔一段時(shí)間去請求一次服務(wù)端,保持于服務(wù)端的連接怖糊。

四帅容、總結(jié)與優(yōu)化

1、在測試部門進(jìn)行高并發(fā)測試的情況下蓬抄,periodStatistics()統(tǒng)計(jì)方法會(huì)出現(xiàn)數(shù)據(jù)統(tǒng)計(jì)錯(cuò)亂的情況丰嘉,所以針對這個(gè)方法要加上同步代碼塊

2、上述的代碼在單臺(tái)服務(wù)器部署的情況下嚷缭,基本上沒啥問題饮亏,但是如果部署在多臺(tái)服務(wù)器上(分布式部署),那么很多問題就發(fā)生咯阅爽,比如說:
問題1:Redis消息發(fā)布與訂閱路幸,消息發(fā)布出去了,A服務(wù)器中的應(yīng)用會(huì)收到消息然后進(jìn)行統(tǒng)計(jì)付翁,B服務(wù)器中的應(yīng)用也會(huì)收到相同的消息然后進(jìn)行計(jì)算简肴,那計(jì)算出來的結(jié)果就多出一倍的數(shù)據(jù)了。

問題2:當(dāng)日實(shí)時(shí)數(shù)據(jù)是統(tǒng)計(jì)Redis中“每15分鐘匯總”數(shù)據(jù)的SortedSet百侧,即一天被分為96條匯總數(shù)據(jù)砰识,統(tǒng)計(jì)當(dāng)日實(shí)時(shí)數(shù)據(jù)的時(shí)候就把96條數(shù)據(jù)進(jìn)行累加即可能扒。但是 但是在每一筆訂單過來的時(shí)候,會(huì)先計(jì)算出此次統(tǒng)計(jì)的最新結(jié)果辫狼,再把舊的“每15分鐘匯總”數(shù)據(jù)記錄給刪除初斑,然后再插入此次計(jì)算的最新結(jié)果(這樣做的原因是SortedSet沒有更新的方法),代碼請看MonitorBiz的periodStatistics()方法膨处,這樣就會(huì)存在一個(gè)問題:A服務(wù)器計(jì)算出最新統(tǒng)計(jì)結(jié)構(gòu)见秤,然后把舊數(shù)據(jù)刪除了,還沒插入最新統(tǒng)計(jì)數(shù)據(jù)的時(shí)候真椿,B機(jī)器去獲取當(dāng)日實(shí)時(shí)數(shù)據(jù)鹃答,這時(shí)候就會(huì)缺少那條已刪除還未插入的數(shù)據(jù),造成數(shù)據(jù)的錯(cuò)誤突硝。

那針對上面兩個(gè)問題的解決方案是:
問題1解決方案:Redis消息發(fā)送的時(shí)候只發(fā)布一條簡單的消息通知即可测摔,把交易訂單的數(shù)據(jù)存入Redis的list隊(duì)列中,如果多個(gè)服務(wù)器都收到了消息解恰,便去list隊(duì)列中獲取最新的一條記錄避咆,取到則進(jìn)行處理,取不到則直接返回修噪,Redis的list隊(duì)列是能保證只有一臺(tái)機(jī)器能取到數(shù)據(jù)的。

問題2解決方案:最開始設(shè)計(jì)的時(shí)候決定用SortedSet這種數(shù)據(jù)結(jié)構(gòu)是因?yàn)樗軡M足排序的要求路媚,但是從上面的問題看來黄琼,它也只是能解決排序的問題而已(前端要求要按時(shí)間排好順序在“獲取初始化數(shù)據(jù)”的接口中將數(shù)據(jù)發(fā)給他),對于并發(fā)的讀寫數(shù)據(jù)的操作都不能很好的支持整慎,并不能解決上面所說的問題脏款,所以最后決定改用Hash這種數(shù)據(jù)結(jié)構(gòu),原因是因?yàn)樗С衷有缘臄?shù)據(jù)累加操作裤园,在高并發(fā)的情況下撤师,能有很好的支持,如hincrby拧揽、hincrbyfloat方法剃盾。

/**
     * 1.每隔15分鐘匯總一次數(shù)據(jù),存入Redis數(shù)據(jù)匯總集合
     * 2.當(dāng)日實(shí)時(shí)數(shù)據(jù)進(jìn)行累加操作
     *
     * @param message 交易訂單類型
     * @return
     */
    public void periodStatistics(String message) {
        String listStr = "";
        if (StringUtils.isNotEmpty(message) && message.equals(PayWayEnum.WEIXIN.name())) {
            listStr = JedisHelper.dataCluster().lpop(redisWxQueue);
        } else if (StringUtils.isNotEmpty(message) && message.equals(PayWayEnum.ALIPAY.name())) {
            listStr = JedisHelper.dataCluster().lpop(redisAlipayQueue);
        }
        // 為空則說明數(shù)據(jù)被其他服務(wù)器取走了,則不用進(jìn)行處理
        if (StringUtils.isEmpty(listStr)) {
            return;
        }
        PaymentRecordMonitorVo paymentRecordMonitorVo = null;
        try {
            paymentRecordMonitorVo = JsonUtil.jsonToObject(listStr, PaymentRecordMonitorVo.class);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        // 每日實(shí)時(shí)數(shù)據(jù)的hash結(jié)構(gòu)增量統(tǒng)計(jì)
        increaseBy(paymentRecordMonitorVo);
        // 分時(shí)間段的匯總統(tǒng)計(jì)
        periodStatistics(paymentRecordMonitorVo);
    }
    /**
     * 每日實(shí)時(shí)數(shù)據(jù)的hash結(jié)構(gòu)增量統(tǒng)計(jì)
     *
     * @param paymentRecordMonitorVo
     */
    private void increaseBy(PaymentRecordMonitorVo paymentRecordMonitorVo) {
        // step1:微信訂單數(shù)據(jù)處理
        if (paymentRecordMonitorVo.getPayWay().equals(PayWayEnum.ALIPAY.name())) {
            JedisHelper.dataCluster().hincrBy(daySummaryKey, dayAlipayCount, 1);
            JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayAlipayAmount, paymentRecordMonitorVo.getAmount().doubleValue());
            // step2:支付寶訂單數(shù)據(jù)處理
        } else if (paymentRecordMonitorVo.getPayWay().equals(PayWayEnum.WEIXIN.name())) {
            JedisHelper.dataCluster().hincrBy(daySummaryKey, dayWxCount, 1);
            JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayWxAmount, paymentRecordMonitorVo.getAmount().doubleValue());
        }
        // step3:總的數(shù)據(jù)增加
        JedisHelper.dataCluster().hincrBy(daySummaryKey, dayTotalCount, 1);
        JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayTotalAmount, paymentRecordMonitorVo.getAmount().doubleValue());
    }

3、由于系統(tǒng)在設(shè)計(jì)之初是滿足實(shí)時(shí)交易數(shù)據(jù)顯示的淤袜,所以在每一筆訂單過來的時(shí)候都會(huì)進(jìn)行數(shù)據(jù)統(tǒng)計(jì)痒谴,然后利用WebSocket進(jìn)行實(shí)時(shí)數(shù)據(jù)推送到客戶端,在進(jìn)行壓力測試的時(shí)候铡羡,每秒的并發(fā)達(dá)到上百個(gè)积蔚,于是瀏覽器客戶端就會(huì)發(fā)生崩潰的現(xiàn)象,因?yàn)槊棵攵紩?huì)向?yàn)g覽器客戶端推送上百次
解決方案:在服務(wù)端進(jìn)行數(shù)據(jù)統(tǒng)計(jì)后烦周,每隔2秒把最新的數(shù)據(jù)推送至瀏覽器客戶端尽爆。
4怎顾、前期開發(fā)的時(shí)候?yàn)榱烁玫恼{(diào)試與問題的定位,所以寫了很多l(xiāng)ogger.info()這種日志記錄操作漱贱,在上生產(chǎn)環(huán)境的時(shí)候要把這些內(nèi)容去掉或者日志級(jí)別改為debug槐雾,這種日志記錄會(huì)設(shè)計(jì)到磁盤IO讀寫,在某些時(shí)候可能會(huì)成為系統(tǒng)性能的瓶頸饱亿。

注:上述的代碼大部分都已經(jīng)過優(yōu)化蚜退,優(yōu)化后的代碼就不貼出來了。
聰明的讀者要是還發(fā)現(xiàn)設(shè)計(jì)與實(shí)現(xiàn)中有不足之處彪笼,歡迎指正钻注,bu'she
反思與總結(jié),嗯配猫。幅恋。。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末泵肄,一起剝皮案震驚了整個(gè)濱河市捆交,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌腐巢,老刑警劉巖品追,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異冯丙,居然都是意外死亡肉瓦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進(jìn)店門胃惜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泞莉,“玉大人,你說我怎么就攤上這事船殉■瓿茫” “怎么了?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵利虫,是天一觀的道長挨厚。 經(jīng)常有香客問我,道長糠惫,這世上最難降的妖魔是什么幽崩? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮寞钥,結(jié)果婚禮上慌申,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好蹄溉,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布咨油。 她就那樣靜靜地躺著,像睡著了一般柒爵。 火紅的嫁衣襯著肌膚如雪役电。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天棉胀,我揣著相機(jī)與錄音法瑟,去河邊找鬼。 笑死唁奢,一個(gè)胖子當(dāng)著我的面吹牛霎挟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播麻掸,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼酥夭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了脊奋?” 一聲冷哼從身側(cè)響起熬北,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎诚隙,沒想到半個(gè)月后讶隐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡久又,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年整份,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片籽孙。...
    茶點(diǎn)故事閱讀 40,861評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖火俄,靈堂內(nèi)的尸體忽然破棺而出犯建,到底是詐尸還是另有隱情,我是刑警寧澤瓜客,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布适瓦,位于F島的核電站,受9級(jí)特大地震影響谱仪,放射性物質(zhì)發(fā)生泄漏玻熙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一疯攒、第九天 我趴在偏房一處隱蔽的房頂上張望嗦随。 院中可真熱鬧,春花似錦、人聲如沸枚尼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽署恍。三九已至崎溃,卻和暖如春恋谭,著一層夾襖步出監(jiān)牢的瞬間匾旭,已是汗流浹背拓型。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工银舱, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留油挥,地道東北人稻扬。 一個(gè)月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓想诅,卻偏偏與公主長得像账锹,于是被迫代替她去往敵國和親朵逝。 傳聞我的和親對象是個(gè)殘疾皇子蔚袍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)配名,斷路器啤咽,智...
    卡卡羅2017閱讀 134,716評論 18 139
  • 本文將從Redis的基本特性入手宇整,通過講述Redis的數(shù)據(jù)結(jié)構(gòu)和主要命令對Redis的基本能力進(jìn)行直觀介紹。之后概...
    kelgon閱讀 61,177評論 23 625
  • 由于百度網(wǎng)盤的不靠譜,將我之前上傳的BT天堂全站種子全刪了为朋,所以我覺得呢這個(gè)還是不方便臂拓。授人以魚不如授人以漁,現(xiàn)在...
    Vaayne閱讀 11,111評論 0 7
  • 有沒有人會(huì)突然之間對一件事充滿興趣,或者對一個(gè)人霞溪,但也會(huì)突然間缺少了興趣孵滞。這就仿佛是當(dāng)我們在一件事件中受到打擊或者...
    七月小莫閱讀 235評論 0 0
  • 你在那里, 寂靜無語鸯匹, 眼里含著淚水坊饶, 我在這里, 無聲無息殴蓬, 心里藏著委屈匿级, 屋內(nèi)的空氣凝固, 幸福無法呼吸, ...
    loveneface閱讀 418評論 0 51