阿里云之消息服務(wù)實(shí)戰(zhàn)

概述

阿里云消息服務(wù)(Message Service藤滥,原 MQS)是阿里云商用的消息中間件服務(wù),基于阿里的飛天系統(tǒng)颗味,具有大規(guī)模超陆、高可靠、高可用以及較強(qiáng)的消息堆積能力浦马。由于項(xiàng)目架構(gòu)中时呀,對數(shù)據(jù)的及時(shí)性沒有過高的要求,故采用消息服務(wù)來解耦接口的數(shù)據(jù)同步功能晶默。

場景描述

在項(xiàng)目中有個前端管理界面稱為manager模塊谨娜,主要功能是配置管理一些配置數(shù)據(jù),支持新增磺陡、修改趴梢、刪除以及批量新增、批量修改币他、批量刪除功能坞靶,甚至批量從excel文件中導(dǎo)入的功能。對數(shù)據(jù)操作的同時(shí)蝴悉,需要將數(shù)據(jù)同步到各個agent的模塊中彰阴,agent模塊需要對這些數(shù)據(jù)進(jìn)行入庫、入redis的操作拍冠,系統(tǒng)架構(gòu)圖如下尿这。


初版架構(gòu)圖.png

在不使用消息服務(wù)前,通過接口的方式庆杜,在manager模塊增刪改的時(shí)候射众,將修改的數(shù)據(jù)實(shí)時(shí)同步到各個agent中。這會出現(xiàn)一個問題晃财,如果數(shù)據(jù)量較小的話叨橱,同步速度還是可以理解,如果同步量過大的話断盛,會出現(xiàn)很明顯的卡頓情況雏逾。可能很多朋友會問郑临,為何不做異步尼栖博? 異步的確也是一種解決方法,不過當(dāng)某一條數(shù)據(jù)同步失敗的時(shí)候厢洞,我們不好獲取這些異常仇让,并且無法追蹤這些異常并進(jìn)行回滾操作典奉,因?yàn)樾枰3謒anager模塊與agent模塊同步回滾,只要agent模塊失敗丧叽,manager模塊必須要回滾卫玖。由于對這些數(shù)據(jù)的實(shí)時(shí)性沒有太高的要求,為此引入消息隊(duì)列踊淳。

架構(gòu)設(shè)計(jì)

阿里云消息服務(wù)MNS 已經(jīng)提供隊(duì)列(queue)和主題(topic)兩種模型假瞬。其中隊(duì)列提供的是一對多的共享消息消費(fèi)模型,采用客戶端主動拉扔爻ⅰ(Pull)模式脱茉;主題模型提供一對多的廣播消息消費(fèi)模型,并且采用服務(wù)端主動推送(Push)模式垄开。上面兩種模型基本能滿足我們大多數(shù)應(yīng)用場景琴许。
推送模式的好處是即時(shí)性能比較好,但是需要暴露客戶端地址來接收服務(wù)端的消息推送溉躲。有些情況下榜田,比如企業(yè)內(nèi)網(wǎng),我們無法暴露推送地址锻梳,希望改用拉燃(Pull)的方式。雖然MNS不直接提供這種消費(fèi)模型疑枯,但是我們可以結(jié)合主題和隊(duì)列來實(shí)現(xiàn)一對多的拉取消息消費(fèi)模型邦鲫。

阿里的官方文檔中,其廣播拉取消息模型最佳實(shí)踐正好符合項(xiàng)目的架構(gòu)模型神汹,故采用其模型。下圖是廣播拉取消息模型古今。


廣播拉取消息模型.png

根據(jù)該模型的改造屁魏,改造了符合當(dāng)前項(xiàng)目場景的業(yè)務(wù)模型。


架構(gòu)圖.png

接口說明

Java SDK(1.1.5)中的CloudPullTopic 默認(rèn)支持上述解決方案捉腥。其中MNSClient 提供下面兩個接口來快速創(chuàng)建CloudPullTopic:

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

其中氓拼,TopicMeta 是創(chuàng)建topic的meta 設(shè)置, queueNameList里指定topic消息推送的隊(duì)列名列表抵碟;needCreateQueue表明queueNameList是否需要創(chuàng)建桃漾;queueMetaTemplate是創(chuàng)建queue需要的queue meta 參數(shù)設(shè)置;

生產(chǎn)者manager代碼

作為數(shù)據(jù)生產(chǎn)者的manager端拟逮,bean配置文件中配置如下代碼的實(shí)例bean撬统,目的為不斷的插入數(shù)據(jù)到隊(duì)列為消費(fèi)者各個agent提供數(shù)據(jù)源。

    @Value("${aliyun.mns.accessKeyId}")
    private String accessKeyId;

    @Value("${aliyun.mns.accessKeySecret}")
    private String accessKeySecret;

    @Value("${aliyun.mns.accountEndpoint}")
    private String accountEndpoint;

    @Value("${aliyun.mns.queueNameProducers}")
    private String queueNameProducers;

    @Value("${aliyun.mns.topicNameProducer}")
    private String topicName;
    @Bean
    public MNSClient mnsClient() {
        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
        MNSClient client = account.getMNSClient();
        return client;
    }

    @Bean
    public CloudPullTopic cloudPullTopic(@Qualifier("mnsClient")MNSClient mnsClient){

        String[] queueArr = queueNameProducers.split(",");
        List list = Arrays.asList(queueArr);

        Vector<String> queueNameList = new Vector<>();
        queueNameList.addAll(list);

        for (String queueName : queueNameList) {
            if (!mnsClient.getQueueRef(queueName).isQueueExist()) {
                QueueMeta qMeta = new QueueMeta();
                qMeta.setQueueName(queueName);
                qMeta.setPollingWaitSeconds(WAIT_SECONDS);
                mnsClient.createQueue(qMeta);
            }
        }

        TopicMeta topicMeta = new TopicMeta();
        topicMeta.setTopicName(topicName);

        return mnsClient.createPullTopic(topicMeta,queueNameList,false,null);
    }

如上代碼中敦迄,對當(dāng)前隊(duì)列做了一個為空判斷恋追,如果第一個隊(duì)列不存在凭迹,采用新增隊(duì)列的方式:createPullTopic(topicMeta,queueNameList,true,queueMetaTemplate);
其中第三個參數(shù)true代表當(dāng)前是重新創(chuàng)建隊(duì)列的方式。
同步到隊(duì)列的統(tǒng)一數(shù)據(jù)格式:

/**
 * 結(jié)果類
 *
 * @author huwk
 * @date 2018/12/5
 */
public class SynModel {

    /**
     * 操作方法
     */
    private String action;

    /**
     * 返回對象
     */
    private Object result;

    public SynModel(){}

    public SynModel(String action, Object result) {
        this.action = action;
        this.result = result;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

因?yàn)闃I(yè)務(wù)需要苦囱,考慮到隊(duì)列中會推入不同的格式的數(shù)據(jù)嗅绸,故統(tǒng)一定義一個實(shí)體類,統(tǒng)一下數(shù)據(jù)結(jié)構(gòu)撕彤,action表示具體操作的類型鱼鸠,result表示放入的實(shí)體對象。
如下是發(fā)送消息代碼:

/* 同步隊(duì)列 */
        try {
            /*封裝對象*/
            EnterpriseWhite white = new EnterpriseWhite();
            white.setStatus(status);
            white.setEnterpriseId(enterpriseId);
            white.setComment(comment);
            white.setCreateTime(new Date());

            SynModel synModel = new SynModel(QueueAction.CreateEnterpriseWhite.name(),white);
            /*發(fā)送消息*/
            publishObjectMessage(synModel);

        } catch (Exception e) {
            logger.error("數(shù)據(jù)接口異常", e);
        }

使用了一個QueueAction枚舉類型羹铅,定義了各種操作某種實(shí)體的類型蚀狰,這樣下次消費(fèi)者去取數(shù)據(jù)的時(shí)候,根據(jù)其類型可以給指定的消費(fèi)者消費(fèi)睦裳。
抽取的推送數(shù)據(jù)給隊(duì)列的公共方法

/**
     * 抽取解析對象以及發(fā)送消息方法
     *
     * @param synModel
     * @throws JsonProcessingException
     */
    private void publishObjectMessage(SynModel synModel) throws JsonProcessingException {
        /*轉(zhuǎn)換為json字符串*/
        String objStr = objectMapper.writeValueAsString(synModel);

        /*主題消息封裝*/
        TopicMessage topicMessage = new Base64TopicMessage();
        topicMessage.setBaseMessageBody(objStr);

        /*發(fā)送主題消息*/
        cloudPullTopic.publishMessage(topicMessage);
    }

消費(fèi)者agent代碼

定義一個線程類造锅,讓其在項(xiàng)目啟動的時(shí)候就開啟線程。agent模塊根據(jù)FIFO廉邑,根據(jù)不同的SynModel類型去從消息隊(duì)列中取消息并消費(fèi)掉哥蔚,完成一個完整的消息隊(duì)列的過程。

@Component
public class ManagerSynDataEngine extends Thread {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private MNSClient mnsClient;

    @Autowired
    @Qualifier("managerSynDataTreadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Override
    public void run() {
        try {
            MessageReceiver receiver = new MessageReceiver(mnsClient, queueName);
            while (true) {
                Message message = receiver.receiveMessage();

                String messageBody = message.getMessageBody();

                logger.info("隊(duì)列中的消息: " + messageBody);
                // 當(dāng)線程池內(nèi)的線程全部繁忙時(shí)蛛蒙,暫停向線程池派發(fā)任務(wù)糙箍,等待線程釋放
                while (threadPoolTaskExecutor.getActiveCount() >= threadPoolTaskExecutor.getMaxPoolSize()) {
                    logger.warn("ManagerSynDataEngine thread busy!");
                    Thread.sleep(1000);
                }

                threadPoolTaskExecutor.execute(() -> {
                    try {
                        SynModel synModel = objectMapper.readValue(messageBody, SynModel.class);
                        Class clazz = QueueAction.valueOf(synModel.getAction()).getClazz();
                        Object object = objectMapper.convertValue(synModel.getResult(), clazz);
                        operate(synModel, object);

                    } catch (Exception e) {
                        logger.error("寫入數(shù)據(jù)庫出錯。" + "隊(duì)列名:;" + "消息詳情:" + messageBody, e);
                    } finally {
                        /*刪除隊(duì)列元素*/
                        mnsClient.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
                    }
                });
            }
        } catch (Exception e) {
            logger.error("解析對象錯誤", e);
        }
    }
/**
* 業(yè)務(wù)代碼
**/
private void operate(SynModel synModel, Object object) throws Exception {
        /*業(yè)務(wù)代碼*/
        if (QueueAction.CreateEnterpriseWhite.name().equals(synModel.getAction())) {
            enterpriseWhiteService.insert((EnterpriseWhite) object);
        } else if (QueueAction.UpdateEnterpriseWhite.name().equals(synModel.getAction())) {
            enterpriseWhiteService.updateById((EnterpriseWhite) object);
        } else if (QueueAction.DeleteEnterpriseWhite.name().equals(synModel.getAction())) {
            enterpriseWhiteService.deleteByEnterpriseId(((EnterpriseWhite) object).getEnterpriseId());
        }
    }
}

附錄

長輪詢模式

MNS提供了LongPolling類型的ReceiveMessage的方法牵祟,只需要在ReceiveMessage的時(shí)候把WaitSecond設(shè)為一個1-30之間的數(shù)就可以了深夯。使用LongPolling可以讓Request一直掛在Server上,等到有Message的時(shí)候才返回诺苹,在保證了第一時(shí)間收到消息的同時(shí)也避免用戶發(fā)送大量無效Request厂榛。LongPolling也是MNS的推薦用法聂喇。
LongPolling是需要掛HTTP層的長連接在Server上,而對于Server來說,HTTP層的長連接的資源是比較有限的证芭。為了避免受到一些惡意攻擊毫目,所以MNS對單用戶的LongPolling連接數(shù)是有限制的帝际。

這里使用了阿里的推薦的最佳實(shí)踐長輪詢模式挠轴,防止當(dāng)隊(duì)列中沒有數(shù)據(jù)時(shí)候,大量線程去請求的問題翩肌,減少不必要的調(diào)用量模暗,節(jié)省成本。

/**
 * 隊(duì)列消息接收
 *
 * @author huwk
 * @date 2018/12/4
 */
public class MessageReceiver {

    private Logger logger = LoggerFactory.getLogger(getClass());
    public static final int WAIT_SECONDS = 30;

    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    protected Object lockObj;
    protected String queueName;
    protected CloudQueue cloudQueue;

    public MessageReceiver(MNSClient mnsClient, String queue) {
        cloudQueue = mnsClient.getQueueRef(queue);
        cloudQueue.popMessage();
        queueName = queue;

        synchronized (sLockObjMap) {
            lockObj = sLockObjMap.get(queueName);
            if (lockObj == null) {
                lockObj = new Object();
                sLockObjMap.put(queueName, lockObj);
            }
        }
    }

    public boolean setPolling() {
        synchronized (lockObj) {
            Boolean ret = sPollingMap.get(queueName);
            if (ret == null || !ret) {
                sPollingMap.put(queueName, true);
                return true;
            }
            return false;
        }
    }

    public void clearPolling() {
        synchronized (lockObj) {
            sPollingMap.put(queueName, false);
            lockObj.notifyAll();
            logger.info("喚醒所有線程開始工作!");
        }
    }

    public Message receiveMessage() {
        boolean polling = false;
        while (true) {
            synchronized (lockObj) {
                Boolean p = sPollingMap.get(queueName);
                if (p != null && p) {
                    try {
                        logger.info(" 線程睡眠!");
                        polling = false;
                        lockObj.wait();
                    } catch (InterruptedException e) {
                        logger.error("MessageReceiver 中斷! 隊(duì)列名為 " + queueName);
                        return null;
                    }
                }
            }

            try {
                Message message = null;
                if (!polling) {
                    message = cloudQueue.popMessage();
                    if (message == null) {
                        polling = true;
                        continue;
                    }
                } else {
                    if (setPolling()) {
                        logger.info("線程" + " Polling!");
                    } else {
                        continue;
                    }
                    do {
                        logger.info("線程" + " 保持 Polling!");
                        try {
                            message = cloudQueue.popMessage(WAIT_SECONDS);
                        } catch (Exception e) {
                            logger.error("線程異常 polling popMessage: " + e);
                        }
                    } while (message == null);
                    clearPolling();
                }
                return message;
            } catch (Exception e) {
                // it could be network exception
                logger.error("popMessage時(shí)發(fā)生異常: " + e);
            }
        }
    }
}

當(dāng)隊(duì)列為空的時(shí)候念祭,只會保持一個線程去請求兑宇,并且保持長連接為30秒,如果30秒內(nèi)有數(shù)據(jù)后粱坤,喚醒所有的線程去消費(fèi)顾孽,一旦發(fā)現(xiàn)隊(duì)列為空祝钢,繼續(xù)讓所有線程等待,只保留一個線程若厚。

至此拦英,一個簡單的消息服務(wù)實(shí)戰(zhàn)結(jié)束,后續(xù)還會有更多的高級應(yīng)用测秸,一步步等待去開辟疤估。

不會寫文章的程序員不是好的吉他手o( ̄︶ ̄)o

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市霎冯,隨后出現(xiàn)的幾起案子铃拇,更是在濱河造成了極大的恐慌,老刑警劉巖沈撞,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件慷荔,死亡現(xiàn)場離奇詭異,居然都是意外死亡缠俺,警方通過查閱死者的電腦和手機(jī)显晶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來壹士,“玉大人磷雇,你說我怎么就攤上這事□锞龋” “怎么了唯笙?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長盒使。 經(jīng)常有香客問我崩掘,道長,這世上最難降的妖魔是什么少办? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任苞慢,我火速辦了婚禮,結(jié)果婚禮上凡泣,老公的妹妹穿的比我還像新娘。我一直安慰自己皮假,他們只是感情好鞋拟,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著惹资,像睡著了一般贺纲。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上褪测,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天猴誊,我揣著相機(jī)與錄音潦刃,去河邊找鬼。 笑死懈叹,一個胖子當(dāng)著我的面吹牛乖杠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播澄成,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼胧洒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了墨状?” 一聲冷哼從身側(cè)響起卫漫,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎肾砂,沒想到半個月后列赎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡镐确,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年包吝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片辫塌。...
    茶點(diǎn)故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡漏策,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出臼氨,到底是詐尸還是另有隱情掺喻,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布储矩,位于F島的核電站感耙,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏持隧。R本人自食惡果不足惜即硼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望屡拨。 院中可真熱鬧只酥,春花似錦、人聲如沸呀狼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哥艇。三九已至绝编,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背十饥。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工窟勃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人逗堵。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓秉氧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親砸捏。 傳聞我的和親對象是個殘疾皇子谬运,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容