生產(chǎn)者
class RabbitMq {
constructor(options) {
this.ex = 'XXX'
this.exType = 'direct'
this.durable = true
this.routeKey = 'XX'
this.autoDelete = true
this.q = 'hello'
}
async send() {
const conn = await amqp.connect(url)
const msg = JSON.stringify({ a: "aa" })
try {
// const ch = await conn.createChannel()
// 確認(rèn)消息發(fā)送 ok 猜測(cè)是開啟 confirm 機(jī)制只厘,對(duì)應(yīng)的監(jiān)聽函數(shù)是什么呢?
const ch = await conn.createConfirmChannel()
const res = await ch.assertExchange(this.ex, this.exType, { durable: this.durable })
var flag = 0
while(flag < 4) {
// 實(shí)現(xiàn)消息持久化, 要exchange,queue,msg 三者同時(shí)持久化
/*
如果exchange根據(jù)自身類型和消息routeKey無法找到一個(gè)符合條件的queue稀拐,
那么會(huì)調(diào)用basic.return方法將消息返回給生產(chǎn)者(Basic.Return + Content-Header + Content-Body)总珠;
當(dāng)mandatory設(shè)置為false時(shí)穆律,出現(xiàn)上述情形broker會(huì)直接將消息扔掉
*/
ch.publish(this.ex, this.routeKey, Buffer.from(msg), {
persistent: true, // 消息持久化
mandatory: true
})
// 確認(rèn)消息已經(jīng)入隊(duì), 返回錯(cuò)誤 是啥樣? 錯(cuò)誤怎么處理?直接close?
const res2 = await ch.waitForConfirms()
console.log('==res2==', res2)
console.log(" [x] Sent '%s'", msg);
await timeout(2000)
flag++
}
ch.close()
} catch (e) {
console.log('==e==', e)
ch.close()
}
}
}
const rabbit = new RabbitMq({})
rabbit.send()
消費(fèi)端
class RabbitMq {
constructor(options) {
this.ex = 'XXX'
this.exType = 'direct'
this.durable = true
this.routeKey = 'XX'
this.autoDelete = true
this.q = 'hello'
}
async send() {
const conn = await amqp.connect(url)
try {
const ch = await conn.createChannel()
// 確認(rèn)消息發(fā)送 ok
const res = await ch.assertExchange(this.ex, this.exType, { durable: this.durable })
// 此處 q 置空伦忠,用的是rabbitmq自動(dòng)生成的隊(duì)列名, exclusive 是生成排他隊(duì)列, 連接斷開后就會(huì)自動(dòng)刪除
const q = await ch.assertQueue('', { exclusive: false })
console.log('==q=', q)
// 隊(duì)列綁定 exchange
ch.bindQueue(q.queue, this.ex, this.routeKey)
ch.consume(q.queue, msg => {
console.log('收到消息: ', msg)
// 發(fā)送確認(rèn)消息
ch.ack(msg)
}, { noAck: false })
// ch.close()
} catch (e) {
console.log('==e==', e)
ch.close()
}
}
}
const rabbit = new RabbitMq({})
rabbit.send()