概述
阿里云消息服務(wù)(Message Service藤滥,原 MQS)是阿里云商用的消息中間件服務(wù),基于阿里的飛天系統(tǒng)颗味,具有大規(guī)模超陆、高可靠、高可用以及較強(qiáng)的消息堆積能力浦马。由于項(xiàng)目架構(gòu)中时呀,對數(shù)據(jù)的及時(shí)性沒有過高的要求,故采用消息服務(wù)來解耦接口的數(shù)據(jù)同步功能晶默。
場景描述
在項(xiàng)目中有個前端管理界面稱為manager模塊谨娜,主要功能是配置管理一些配置數(shù)據(jù),支持新增磺陡、修改趴梢、刪除以及批量新增、批量修改币他、批量刪除功能坞靶,甚至批量從excel文件中導(dǎo)入的功能。對數(shù)據(jù)操作的同時(shí)蝴悉,需要將數(shù)據(jù)同步到各個agent的模塊中彰阴,agent模塊需要對這些數(shù)據(jù)進(jìn)行入庫、入redis的操作拍冠,系統(tǒng)架構(gòu)圖如下尿这。
在不使用消息服務(wù)前,通過接口的方式庆杜,在manager模塊增刪改的時(shí)候射众,將修改的數(shù)據(jù)實(shí)時(shí)同步到各個agent中。這會出現(xiàn)一個問題晃财,如果數(shù)據(jù)量較小的話叨橱,同步速度還是可以理解,如果同步量過大的話断盛,會出現(xiàn)很明顯的卡頓情況雏逾。可能很多朋友會問郑临,為何不做異步尼栖博? 異步的確也是一種解決方法,不過當(dāng)某一條數(shù)據(jù)同步失敗的時(shí)候厢洞,我們不好獲取這些異常仇让,并且無法追蹤這些異常并進(jìn)行回滾操作典奉,因?yàn)樾枰3謒anager模塊與agent模塊同步回滾,只要agent模塊失敗丧叽,manager模塊必須要回滾卫玖。由于對這些數(shù)據(jù)的實(shí)時(shí)性沒有太高的要求,為此引入消息隊(duì)列踊淳。
架構(gòu)設(shè)計(jì)
阿里云消息服務(wù)MNS 已經(jīng)提供隊(duì)列(queue)和主題(topic)兩種模型假瞬。其中隊(duì)列提供的是一對多的共享消息消費(fèi)模型,采用客戶端主動拉扔爻ⅰ(Pull)模式脱茉;主題模型提供一對多的廣播消息消費(fèi)模型,并且采用服務(wù)端主動推送(Push)模式垄开。上面兩種模型基本能滿足我們大多數(shù)應(yīng)用場景琴许。
推送模式的好處是即時(shí)性能比較好,但是需要暴露客戶端地址來接收服務(wù)端的消息推送溉躲。有些情況下榜田,比如企業(yè)內(nèi)網(wǎng),我們無法暴露推送地址锻梳,希望改用拉燃(Pull)的方式。雖然MNS不直接提供這種消費(fèi)模型疑枯,但是我們可以結(jié)合主題和隊(duì)列來實(shí)現(xiàn)一對多的拉取消息消費(fèi)模型邦鲫。
阿里的官方文檔中,其廣播拉取消息模型最佳實(shí)踐正好符合項(xiàng)目的架構(gòu)模型神汹,故采用其模型。下圖是廣播拉取消息模型古今。
根據(jù)該模型的改造屁魏,改造了符合當(dāng)前項(xiàng)目場景的業(yè)務(wù)模型。
接口說明
Java SDK(1.1.5)中的CloudPullTopic 默認(rèn)支持上述解決方案捉腥。其中MNSClient 提供下面兩個接口來快速創(chuàng)建CloudPullTopic:
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
其中氓拼,TopicMeta 是創(chuàng)建topic的meta 設(shè)置, queueNameList里指定topic消息推送的隊(duì)列名列表抵碟;needCreateQueue表明queueNameList是否需要創(chuàng)建桃漾;queueMetaTemplate是創(chuàng)建queue需要的queue meta 參數(shù)設(shè)置;
生產(chǎn)者manager代碼
作為數(shù)據(jù)生產(chǎn)者的manager端拟逮,bean配置文件中配置如下代碼的實(shí)例bean撬统,目的為不斷的插入數(shù)據(jù)到隊(duì)列為消費(fèi)者各個agent提供數(shù)據(jù)源。
@Value("${aliyun.mns.accessKeyId}")
private String accessKeyId;
@Value("${aliyun.mns.accessKeySecret}")
private String accessKeySecret;
@Value("${aliyun.mns.accountEndpoint}")
private String accountEndpoint;
@Value("${aliyun.mns.queueNameProducers}")
private String queueNameProducers;
@Value("${aliyun.mns.topicNameProducer}")
private String topicName;
@Bean
public MNSClient mnsClient() {
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
MNSClient client = account.getMNSClient();
return client;
}
@Bean
public CloudPullTopic cloudPullTopic(@Qualifier("mnsClient")MNSClient mnsClient){
String[] queueArr = queueNameProducers.split(",");
List list = Arrays.asList(queueArr);
Vector<String> queueNameList = new Vector<>();
queueNameList.addAll(list);
for (String queueName : queueNameList) {
if (!mnsClient.getQueueRef(queueName).isQueueExist()) {
QueueMeta qMeta = new QueueMeta();
qMeta.setQueueName(queueName);
qMeta.setPollingWaitSeconds(WAIT_SECONDS);
mnsClient.createQueue(qMeta);
}
}
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
return mnsClient.createPullTopic(topicMeta,queueNameList,false,null);
}
如上代碼中敦迄,對當(dāng)前隊(duì)列做了一個為空判斷恋追,如果第一個隊(duì)列不存在凭迹,采用新增隊(duì)列的方式:createPullTopic(topicMeta,queueNameList,true,queueMetaTemplate);
其中第三個參數(shù)true代表當(dāng)前是重新創(chuàng)建隊(duì)列的方式。
同步到隊(duì)列的統(tǒng)一數(shù)據(jù)格式:
/**
* 結(jié)果類
*
* @author huwk
* @date 2018/12/5
*/
public class SynModel {
/**
* 操作方法
*/
private String action;
/**
* 返回對象
*/
private Object result;
public SynModel(){}
public SynModel(String action, Object result) {
this.action = action;
this.result = result;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
因?yàn)闃I(yè)務(wù)需要苦囱,考慮到隊(duì)列中會推入不同的格式的數(shù)據(jù)嗅绸,故統(tǒng)一定義一個實(shí)體類,統(tǒng)一下數(shù)據(jù)結(jié)構(gòu)撕彤,action表示具體操作的類型鱼鸠,result表示放入的實(shí)體對象。
如下是發(fā)送消息代碼:
/* 同步隊(duì)列 */
try {
/*封裝對象*/
EnterpriseWhite white = new EnterpriseWhite();
white.setStatus(status);
white.setEnterpriseId(enterpriseId);
white.setComment(comment);
white.setCreateTime(new Date());
SynModel synModel = new SynModel(QueueAction.CreateEnterpriseWhite.name(),white);
/*發(fā)送消息*/
publishObjectMessage(synModel);
} catch (Exception e) {
logger.error("數(shù)據(jù)接口異常", e);
}
使用了一個QueueAction枚舉類型羹铅,定義了各種操作某種實(shí)體的類型蚀狰,這樣下次消費(fèi)者去取數(shù)據(jù)的時(shí)候,根據(jù)其類型可以給指定的消費(fèi)者消費(fèi)睦裳。
抽取的推送數(shù)據(jù)給隊(duì)列的公共方法
/**
* 抽取解析對象以及發(fā)送消息方法
*
* @param synModel
* @throws JsonProcessingException
*/
private void publishObjectMessage(SynModel synModel) throws JsonProcessingException {
/*轉(zhuǎn)換為json字符串*/
String objStr = objectMapper.writeValueAsString(synModel);
/*主題消息封裝*/
TopicMessage topicMessage = new Base64TopicMessage();
topicMessage.setBaseMessageBody(objStr);
/*發(fā)送主題消息*/
cloudPullTopic.publishMessage(topicMessage);
}
消費(fèi)者agent代碼
定義一個線程類造锅,讓其在項(xiàng)目啟動的時(shí)候就開啟線程。agent模塊根據(jù)FIFO廉邑,根據(jù)不同的SynModel類型去從消息隊(duì)列中取消息并消費(fèi)掉哥蔚,完成一個完整的消息隊(duì)列的過程。
@Component
public class ManagerSynDataEngine extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private ObjectMapper objectMapper;
@Autowired
private MNSClient mnsClient;
@Autowired
@Qualifier("managerSynDataTreadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Override
public void run() {
try {
MessageReceiver receiver = new MessageReceiver(mnsClient, queueName);
while (true) {
Message message = receiver.receiveMessage();
String messageBody = message.getMessageBody();
logger.info("隊(duì)列中的消息: " + messageBody);
// 當(dāng)線程池內(nèi)的線程全部繁忙時(shí)蛛蒙,暫停向線程池派發(fā)任務(wù)糙箍,等待線程釋放
while (threadPoolTaskExecutor.getActiveCount() >= threadPoolTaskExecutor.getMaxPoolSize()) {
logger.warn("ManagerSynDataEngine thread busy!");
Thread.sleep(1000);
}
threadPoolTaskExecutor.execute(() -> {
try {
SynModel synModel = objectMapper.readValue(messageBody, SynModel.class);
Class clazz = QueueAction.valueOf(synModel.getAction()).getClazz();
Object object = objectMapper.convertValue(synModel.getResult(), clazz);
operate(synModel, object);
} catch (Exception e) {
logger.error("寫入數(shù)據(jù)庫出錯。" + "隊(duì)列名:;" + "消息詳情:" + messageBody, e);
} finally {
/*刪除隊(duì)列元素*/
mnsClient.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
}
});
}
} catch (Exception e) {
logger.error("解析對象錯誤", e);
}
}
/**
* 業(yè)務(wù)代碼
**/
private void operate(SynModel synModel, Object object) throws Exception {
/*業(yè)務(wù)代碼*/
if (QueueAction.CreateEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.insert((EnterpriseWhite) object);
} else if (QueueAction.UpdateEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.updateById((EnterpriseWhite) object);
} else if (QueueAction.DeleteEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.deleteByEnterpriseId(((EnterpriseWhite) object).getEnterpriseId());
}
}
}
附錄
長輪詢模式
MNS提供了LongPolling類型的ReceiveMessage的方法牵祟,只需要在ReceiveMessage的時(shí)候把WaitSecond設(shè)為一個1-30之間的數(shù)就可以了深夯。使用LongPolling可以讓Request一直掛在Server上,等到有Message的時(shí)候才返回诺苹,在保證了第一時(shí)間收到消息的同時(shí)也避免用戶發(fā)送大量無效Request厂榛。LongPolling也是MNS的推薦用法聂喇。
LongPolling是需要掛HTTP層的長連接在Server上,而對于Server來說,HTTP層的長連接的資源是比較有限的证芭。為了避免受到一些惡意攻擊毫目,所以MNS對單用戶的LongPolling連接數(shù)是有限制的帝际。
這里使用了阿里的推薦的最佳實(shí)踐長輪詢模式挠轴,防止當(dāng)隊(duì)列中沒有數(shù)據(jù)時(shí)候,大量線程去請求的問題翩肌,減少不必要的調(diào)用量模暗,節(jié)省成本。
/**
* 隊(duì)列消息接收
*
* @author huwk
* @date 2018/12/4
*/
public class MessageReceiver {
private Logger logger = LoggerFactory.getLogger(getClass());
public static final int WAIT_SECONDS = 30;
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
protected Object lockObj;
protected String queueName;
protected CloudQueue cloudQueue;
public MessageReceiver(MNSClient mnsClient, String queue) {
cloudQueue = mnsClient.getQueueRef(queue);
cloudQueue.popMessage();
queueName = queue;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
}
public boolean setPolling() {
synchronized (lockObj) {
Boolean ret = sPollingMap.get(queueName);
if (ret == null || !ret) {
sPollingMap.put(queueName, true);
return true;
}
return false;
}
}
public void clearPolling() {
synchronized (lockObj) {
sPollingMap.put(queueName, false);
lockObj.notifyAll();
logger.info("喚醒所有線程開始工作!");
}
}
public Message receiveMessage() {
boolean polling = false;
while (true) {
synchronized (lockObj) {
Boolean p = sPollingMap.get(queueName);
if (p != null && p) {
try {
logger.info(" 線程睡眠!");
polling = false;
lockObj.wait();
} catch (InterruptedException e) {
logger.error("MessageReceiver 中斷! 隊(duì)列名為 " + queueName);
return null;
}
}
}
try {
Message message = null;
if (!polling) {
message = cloudQueue.popMessage();
if (message == null) {
polling = true;
continue;
}
} else {
if (setPolling()) {
logger.info("線程" + " Polling!");
} else {
continue;
}
do {
logger.info("線程" + " 保持 Polling!");
try {
message = cloudQueue.popMessage(WAIT_SECONDS);
} catch (Exception e) {
logger.error("線程異常 polling popMessage: " + e);
}
} while (message == null);
clearPolling();
}
return message;
} catch (Exception e) {
// it could be network exception
logger.error("popMessage時(shí)發(fā)生異常: " + e);
}
}
}
}
當(dāng)隊(duì)列為空的時(shí)候念祭,只會保持一個線程去請求兑宇,并且保持長連接為30秒,如果30秒內(nèi)有數(shù)據(jù)后粱坤,喚醒所有的線程去消費(fèi)顾孽,一旦發(fā)現(xiàn)隊(duì)列為空祝钢,繼續(xù)讓所有線程等待,只保留一個線程若厚。
至此拦英,一個簡單的消息服務(wù)實(shí)戰(zhàn)結(jié)束,后續(xù)還會有更多的高級應(yīng)用测秸,一步步等待去開辟疤估。
不會寫文章的程序員不是好的吉他手o( ̄︶ ̄)o