1. 生產(chǎn)端服務(wù)類定義
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
import * as amqp from 'amqp-connection-manager'
@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 單例,全局唯一(進(jìn)程級別)
export class RabbitmqService {
private connection: amqp.AmqpConnectionManager;
private channelWrapper;
@Config("rabbitmq")
rabbitmqConfig;
@Init()
async connect() {
// 創(chuàng)建連接壤圃,你可以把配置放在 Config 中抽米,然后注入進(jìn)來
this.connection = await amqp.connect(this.rabbitmqConfig.url);
// 創(chuàng)建 channel
this.channelWrapper = this.connection.createChannel({
json: true,
setup: function (channel) { //在使用通道之前準(zhǔn)備好必要的資源
return Promise.all([
// 綁定隊列
// channel.assertQueue("wx.template.message", { durable: true }),
// channel.assertExchange('logs', 'fanout', { durable: false }),
// channel.assertExchange('direct_logs', 'direct', { durable: false }),
// // 創(chuàng)建延遲隊列和交換機(jī)
// channel.assertExchange("delayExchange", 'direct', { durable: true }),
// channel.assertQueue('delayQueue', {
// durable: true,
// deadLetterExchange: 'dead_topic', // 這里為空,因為不需要死信隊列
// deadLetterRoutingKey: 'dead_topic_key',
// messageTtl: 15000 //創(chuàng)建一個延遲15秒的隊列
// }),
// channel.bindQueue('delayQueue', "delayExchange", "delay")
]);
}
});
}
// 發(fā)送消息到隊列
public async sendToQueue(queueName: string, data: any) {
return this.channelWrapper.sendToQueue(queueName, data, { persistent: true });
}
// 發(fā)送廣播消息到交換機(jī)
public async sendToExchangeFanoutMessage(ex: string, data: any) {
return this.channelWrapper.publish(ex, '', data, { persistent: true });
}
// 發(fā)送普通消息到交換機(jī)
public async sendToExchangeMessage(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true });
}
// 發(fā)送優(yōu)先級消息到交換機(jī)
public async sendToExchangePriorityMessage(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, priority: 5 });
}
/* 延時隊列掸刊,異步發(fā)布確認(rèn) */
public async sendToExchangeDelayMessage(ex: string, routerKey, data: any, delay: number) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, headers: { 'x-delay': delay }, }, (err, ok) => {
console.log("err, ok", err, ok);
});
}
@Destroy()
async close() {
await this.channelWrapper.close();
await this.connection.close();
}
}
// interface Publish {
// expiration?: string | number | undefined;
// userId?: string | undefined;
// CC?: string | string[] | undefined;
// mandatory?: boolean | undefined;
// persistent?: boolean | undefined;
// deliveryMode?: boolean | number | undefined;
// BCC?: string | string[] | undefined;
// contentType?: string | undefined;
// contentEncoding?: string | undefined;
// headers?: any;
// priority?: number | undefined;
// correlationId?: string | undefined;
// replyTo?: string | undefined;
// messageId?: string | undefined;
// timestamp?: number | undefined;
// type?: string | undefined;
// appId?: string | undefined;
// }
- 生產(chǎn)端發(fā)送消息
import { Controller, Get, Inject } from '@midwayjs/core';
// import { Context } from '@midwayjs/rabbitmq';
import { RabbitmqService } from '../service/rabbitmq';
@Controller('/')
export class HomeController {
// @Inject()
// ctx: Context;
@Inject()
mqService: RabbitmqService;
@Get('/')
async home(): Promise<string> {
for (let i = 0; i < 100; i++) {
// 直接發(fā)送給隊列,不經(jīng)過交換機(jī)
let result = await this.mqService.sendToQueue("wx.template.message", { name: `litao${i}` })
//發(fā)給廣播交換機(jī)
let result1 = await this.mqService.sendToExchangeFanout("logs", {name: "litao"})
//發(fā)給路由交換機(jī)赢乓,不通的routerkey
await this.mqService.sendToExchangeDirect("direct_logs", "direct_key", {name: `litao${i}`})
await this.mqService.sendToExchangeDirect("direct_logs", "direct_key1", {name: `litao${i}-${i}`})
await this.mqService.sendToExchangeDirect("direct_logs", "direct_key2", {name: `litao${i}-${i}`})
//發(fā)給路由交換機(jī)忧侧,不通的routerkey
await this.mqService.sendToDelayQueue("delayQueue", { name: `litao${i}` }) //死信隊列
await this.mqService.sendToExchangeDelayDirect("delayExchange", "delay", {name: `litao${i}-${i}`}) //插件延時隊列
//優(yōu)先級隊列
if (i % 8 === 0) {
this.mqService.sendToExchangePriorityMessage("priority", 'priority_key', { name: `litao${i}` })
} else {
this.mqService.sendToExchangeMessage("priority", "priority_key", { name: `litao${i}`})
}
// console.log(`------result${i}------`, result, result1, result2, result3);
}
return 'Hello Midwayjs!';
}
}