一. 什么是消息隊(duì)列凤覆?
消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)链瓦。消息可以非常簡(jiǎn)單,比如只包含文本字符串盯桦,也可以更復(fù)雜慈俯,可能包含嵌入對(duì)象。
消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式俺附,消息發(fā)送后可以立即返回肥卡,由消息系統(tǒng)來確保消息的可靠傳遞。消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取事镣,消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的步鉴。這樣發(fā)布者和使用者都不用知道對(duì)方的存在。
二. 常用的消息隊(duì)列有哪些璃哟?
RabbitMQ氛琢、RocketMQ、ActiveMQ随闪、Kafka阳似、ZeroMQ、MetaMq铐伴。
甚至現(xiàn)在部分NoSQL也可做消息隊(duì)列撮奏,如Redis。
三. 消息隊(duì)列的使用場(chǎng)景当宴?
-
異步處理
-
應(yīng)用解耦
-
流量削峰
四. 使用案例
上規(guī)模的公司都會(huì)有自己的日志分析系統(tǒng)畜吊,日志系統(tǒng)是怎么實(shí)現(xiàn)的呢?
圖解:用戶在訪問應(yīng)用的時(shí)候户矢,我們要記錄下用戶的操作記錄和系統(tǒng)的異常日志玲献,常規(guī)的做法是將系統(tǒng)產(chǎn)生的日志保存到服務(wù)器磁盤,在服務(wù)器中開啟定時(shí)任務(wù)梯浪,定時(shí)將磁盤的日志信息傳入mq中(生產(chǎn)者)捌年,也定時(shí)將mq中的消息取出并存到相應(yīng)的數(shù)據(jù)庫,如ElasticSearch或Hive中挂洛。
五. 如何安裝RabbitMQ礼预?
上面的案例介紹了MQ的一個(gè)使用場(chǎng)景,我這里是用RabbitMQ舉例虏劲,現(xiàn)實(shí)項(xiàng)目中可能用到的是Kafka逆瑞。
-
首先安裝brew(mac為例)
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"復(fù)制代碼
-
安裝RabbitMQ
brew install rabbitmq復(fù)制代碼
-
運(yùn)行RabbitMQ
進(jìn)入到 /usr/local/Cellar/rabbitmq/3.7.7荠藤,執(zhí)行
localhost:3.6.6 lidong$ sbin/rabbitmq-server復(fù)制代碼
-
啟動(dòng)插件
進(jìn)入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin
./rabbitmq-plugins enable rabbitmq_management復(fù)制代碼
-
登陸管理界面
打開瀏覽器輸入:http://localhost:15672,RabbitMQ默認(rèn)15672端口
六. Nodejs操作RabbitMQ
網(wǎng)上可以找到好幾個(gè)相應(yīng)的Node SDK,這里推薦amqplib
- 生產(chǎn)者
/**
* 對(duì)RabbitMQ的封裝
*/
let amqp = require('amqplib');
class RabbitMQ {
constructor() {
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);
}
sendQueueMsg(queueName, msg, errCallBack) {
let self = this;
self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName).then(function (ok) {
return channel.sendToQueue(queueName, new Buffer(msg), {
persistent: true
});
})
.then(function (data) {
if (data) {
errCallBack && errCallBack("success");
channel.close();
}
})
.catch(function () {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
.catch(function () {
let num = self.index++;
if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index == 0;
}
});
}
}
復(fù)制代碼
2. 消費(fèi)者
/**
* 對(duì)RabbitMQ的封裝
*/
let amqp = require('amqplib');
class RabbitMQ {
constructor() {
this.open = amqp.connect(this.hosts[this.index]);
}
receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
let self = this;
self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName)
.then(function (ok) {
return channel.consume(queueName, function (msg) {
if (msg !== null) {
let data = msg.content.toString();
channel.ack(msg);
receiveCallBack && receiveCallBack(data);
}
})
.finally(function () {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
})
.catch(function () {
let num = self.index++;
if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index = 0;
self.open = amqp.connect(self.hosts[0]);
}
});
}復(fù)制代碼
3. 通過生產(chǎn)者向MQ發(fā)送一個(gè)消息兼都,并創(chuàng)建隊(duì)列
let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
console.log(error)
})復(fù)制代碼
執(zhí)行之后生巡,我們打開管理平臺(tái),發(fā)現(xiàn)RabbbitMQ已經(jīng)接受到了一條消息:
并且RabbbitMQ新增了一個(gè)隊(duì)列testQueue
4. 獲取指定隊(duì)列的消息
let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) =>
{
console.log(msg)
})
// 輸出結(jié)果:my first message復(fù)制代碼
此時(shí)打開RabbitMQ管理平臺(tái)冕臭,消息數(shù)量已經(jīng)變?yōu)?:
綜上:我們簡(jiǎn)單講述了消息隊(duì)列及RabbitMQ相關(guān)的一些知識(shí),以及我們?nèi)绾瓮ㄟ^nodejs來生產(chǎn)與消費(fèi)消息,上面講的比較簡(jiǎn)單摊趾,之后會(huì)發(fā)表更多文章講述消息隊(duì)列集群搭建及容災(zāi)的實(shí)現(xiàn)。
作者:吳空
鏈接:https://juejin.im/post/5b4839b2f265da0f6640142b
來源:掘金
著作權(quán)歸作者所有游两。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán)砾层,非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。