已經(jīng)在群暉的 Docker 組件中,安裝了 RabbitMQ(含 Delay 插件)撬腾,安裝方法見這里奋姿。
在 .NET Core 中脉课,使用 RabbitMQ 的文章已經(jīng)很多了立哑,這里不再熬述品腹。
本文主要是記錄使用 EasyNetQ 建立 RabbitMQ 的延時(shí)隊(duì)列的經(jīng)歷馒索。
.NET Core 訪問 RabbitMQ 的庫(kù)很多(官方推薦的列表)
一般都使用 RabbitMQ .NET Client
在官方的推薦組件中莹妒,還有一個(gè):EasyNetQ,看了下 Nuget 下載量和 Github 的情況绰上,貌似很不錯(cuò)旨怠,就嘗試用了。(EasyNetQ 官網(wǎng))
注:還有一個(gè)組件蜈块,NServiceBus鉴腻,也是很不錯(cuò)的,不過百揭,該組件已經(jīng)商業(yè)了爽哎,要收費(fèi),免費(fèi)版有訪問量限制器一,就放棄了课锌。(NServiceBus 官網(wǎng))
特別說明:EasyNetQ 的設(shè)計(jì)初衷,是共享業(yè)務(wù)實(shí)體類祈秕,如果不想共享業(yè)務(wù)實(shí)體類渺贤,可考慮在發(fā)送消息時(shí),將業(yè)務(wù)實(shí)體序列化成字符串请毛,再發(fā)送(不過志鞍,該方法也不是全部場(chǎng)景都可行,如果跨平臺(tái)了方仿,也會(huì)報(bào)錯(cuò))固棚。
額外說明:
- 要完美解決上述問題街州,應(yīng)該是要繼承
ITypeNameSerializer
接口,實(shí)現(xiàn)相關(guān)方法玻孟,然后在創(chuàng)建 Bus 時(shí)唆缴,注冊(cè)進(jìn)去,應(yīng)該可以解決黍翎,不過由于目前還沒有細(xì)看面徽,所以,暫時(shí)不能確定是否能解決匣掸√宋桑可以看看這些資料:資料一(中文)、資料二(中文)碰酝、資料三(英文)霎匈、資料四(英文)。 - 如果不想用自帶的 Newtonsoft.Json送爸,甚至铛嘱,不想用 Json 作為消息傳輸?shù)母袷交绞剑勺孕欣^承
EasyNetQ.ISerializer
接口袭厂,并實(shí)現(xiàn)里面的方法墨吓,然后在創(chuàng)建 Bus 時(shí),注冊(cè)進(jìn)去即可纹磺,參考這里(英文)帖烘。
關(guān)于 RabbitMQ 的延時(shí)隊(duì)列方案,總結(jié)起來橄杨,主要有三種秘症,各方案的優(yōu)劣,可自行百度式矫。
總之乡摹,推薦:delayed_message_exchange 插件,該方案在大多數(shù)云服務(wù)商的消息隊(duì)列產(chǎn)品中衷佃,均支持趟卸,兼容性好蹄葱。
如果購(gòu)買了阿里云的 RabbitMQ 版消息隊(duì)列氏义,除了上述三種方案,還提供了阿里云定制的方案图云,如下圖所示:
阿里云的延時(shí)隊(duì)列方案介紹惯悠,參考這里。
正式開始(平臺(tái):.NET Core 5.0)
EasyNetQ 的連接字符串竣况,如下所示:
host=<IP 或域名>:<端口克婶,默認(rèn)“5672”>;virtualhost=<vHost名稱,默認(rèn)“/”>;username=<連接用戶名>;password=<連接密碼>
連接字符串各參數(shù)說明,參考這里情萤。
- 建立兩個(gè) Console 項(xiàng)目:
- MsgProducer:消息生產(chǎn)者鸭蛙,用于發(fā)送消息到延時(shí)隊(duì)列中
- MsgConsumer:消息消費(fèi)者,用于消費(fèi)延時(shí)隊(duì)列中的消息
- MsgProducer 項(xiàng)目
主要邏輯:
- 創(chuàng)建延時(shí) exchange
- 創(chuàng)建普通 exchange筋岛,并和延時(shí) exchange 綁定
- 數(shù)據(jù)發(fā)送到延時(shí) exchange 中
MsgProducer 主要代碼如下(所有方法娶视,可使用異步 Async):
using EasyNetQ;
using EasyNetQ.Topology;
static void Main(string[] args)
{
// 通常建議單例該連接對(duì)象(比如在 StartUp.cs 中單例注入)
using var bus = RabbitHutch.CreateBus("<EasyNetQ 連接字符串>");
// 建立延時(shí) exchange
var exDelay = bus.Advanced.ExchangeDeclare("<延時(shí) exchange 名稱>", cfg => cfg.AsDelayedExchange(ExchangeType.Direct));
// 建立普通 exchange
var exNormal = bus.Advanced.ExchangeDeclare("<普通 exchange 名稱>", ExchangeType.Direct);
// 綁定兩個(gè) exchange,設(shè)置好 RoutingKey
bus.Advanced.Bind(exDelay, exNormal, "delay");
// 創(chuàng)建普通隊(duì)列睁宰,并和普通 exchange 綁定肪获,設(shè)置好 RoutingKey
var qNormal = bus.Advanced.QueueDeclare("<普通隊(duì)列名稱>");
bus.Advanced.Bind(exNormal, qNormal, "delay");
// 以下是測(cè)試發(fā)送消息的代碼
string msg;
while ((msg = Console.ReadLine()) != "exit")
{
// 針對(duì)單個(gè)消息,設(shè)置延遲時(shí)間(毫秒)
var msgHeaders = new MessageProperties();
msgHeaders.Headers.Add("x-delay", new Random().Next(1000, 5000));
// 發(fā)送消息
bus.Advanced.Publish(exDelay, "delay", false, new Message<string>(msg, msgHeaders));
Console.WriteLine($"{DateTimeOffset.Now.ToUnixTimeMilliseconds()} 發(fā)送成功 {msg}");
}
}
在 EasyNetQ 的官方文檔中柒傻,有關(guān)于 Delayed Message 插件的推薦使用說明孝赫,本人實(shí)在受不了 EasyNetQ 自動(dòng)建立的 exchange 和 queue,就放棄了官方的方法红符,自己使用了 Advanced 模式青柄。
關(guān)于 EasyNetQ 官方文檔推薦的使用方式,參考這里(英文)预侯。
- MsgConsumer 項(xiàng)目
主要邏輯
- 從普通隊(duì)列中讀取消息并處理
MsgConsumer 主要代碼如下(所有方法刹前,可使用異步 Async):
using EasyNetQ;
static void Main(string[] args)
{
// 通常建議單例該連接對(duì)象(比如在 StartUp.cs 中單例注入)
using var bus = RabbitHutch.CreateBus(<EasyNetQ 連接字符串>);
// 創(chuàng)建普通隊(duì)列,設(shè)置好 RoutingKey
// 和第二步 MsgProducer 項(xiàng)目中的普通隊(duì)列名稱一致4粕!@怼!
var qNormal = bus.Advanced.QueueDeclare("<MsgProducer 的普通隊(duì)列名稱>");
bus.Advanced.Consume<string>(qNormal, MsgHandle); // MsgHandle 也可以用匿名方法
Console.WriteLine("等待中……(回車退出)");
Console.ReadLine();
}
static void MsgHandle(IMessage<string> msg, MessageReceivedInfo msgInfo)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"{DateTimeOffset.Now.ToUnixTimeMilliseconds()} 收到消息:{msg.Body}");
Console.ResetColor();
}
至此校坑,即可正常收發(fā)延時(shí)消息了拣技。