如何在ASP.NET Core中使用Azure Service Bus Queue

原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod
譯文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代碼: https://github.com/lamondlu/AzureServiceBusMessaging

image

本文展示了如何使用Azure Service Bus Queue, 實(shí)現(xiàn)2個(gè)ASP.NET Core Api應(yīng)用之間的消息傳輸排拷。

配置Azure Service Bus Queue

你可以從官網(wǎng)文檔中了解到如何配置一個(gè)Azure Service Bus Queue.

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

這里我們使用Queue或者Topic來(lái)實(shí)現(xiàn)消息傳輸浪读。Queue是一種消息傳輸類(lèi)型,一旦一個(gè)消息被一個(gè)消費(fèi)者接收了流码,該消息就會(huì)從Queue中被移除。

與Queue不同辞做,Topic提供的是一對(duì)多的通訊方式莺掠。

架構(gòu)圖

整個(gè)應(yīng)用的實(shí)現(xiàn)如下:

image
  • Api 1負(fù)責(zé)發(fā)送消息
  • Api 2負(fù)責(zé)監(jiān)聽(tīng)Azure Service Bus,并處理接收到的消息

實(shí)現(xiàn)一個(gè)Service Bus Queue

這里我們首先需要引入** Microsoft.Azure.ServiceBus ** 程序集闯冷。Microsoft.Azure.ServiceBus是Azure Service Bus的客戶(hù)端庫(kù)砂心。針對(duì)Service Bus的連接字符串我們保存在項(xiàng)目的User Secret中。當(dāng)部署項(xiàng)目的時(shí)候蛇耀,我們可以使用Azure Key Valut來(lái)設(shè)置這個(gè)Secret值辩诞。

在Visual Studio中,右鍵點(diǎn)擊API1, API2項(xiàng)目屬性纺涤,選擇Manage User Secrets就可以管理當(dāng)前項(xiàng)目使用的所有私密信息译暂。

為了發(fā)送向Azure Service Bus Queue發(fā)送消息,我們需要?jiǎng)?chuàng)建一個(gè)SendMessage方法洒琢,并接收一個(gè)消息參數(shù)秧秉。這里我們創(chuàng)建了一個(gè)我們自己的消息內(nèi)容類(lèi)型MyPayload, 將當(dāng)前該MyPayload對(duì)象序列化成Json字符串, 添加到一個(gè)Message對(duì)象中衰抑。

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;
 
namespace ServiceBusMessaging
{
    public class ServiceBusSender
    {
        private readonly QueueClient _queueClient;
        private readonly IConfiguration _configuration;
        private const string QUEUE_NAME = "simplequeue";
 
        public ServiceBusSender(IConfiguration configuration)
        {
            _configuration = configuration;
            _queueClient = new QueueClient(
            _configuration
                .GetConnectionString("ServiceBusConnectionString"), 
                QUEUE_NAME);
        }
         
        public async Task SendMessage(MyPayload payload)
        {
            string data = JsonConvert.SerializeObject(payload);
            Message message = new Message(Encoding.UTF8.GetBytes(data));
 
            await _queueClient.SendAsync(message);
        }
    }
}

在API 1和API 2中象迎,我們需要將ServiceBusSender注冊(cè)到應(yīng)用程序的IOC容器中。這里為了測(cè)試方便,我們同時(shí)注冊(cè)Swagger服務(wù)砾淌。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
 
    services.AddScoped<ServiceBusSender>();
 
    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new Info
        {
            Version = "v1",
            Title = "Payload View API",
        });
    });
}

接下來(lái)啦撮,我們就可以在控制器中通過(guò)構(gòu)造函數(shù)注入的方式使用這個(gè)服務(wù)了。

在API1中汪厨,我們創(chuàng)建一個(gè)POST方法赃春,這個(gè)方法會(huì)將API接收到Payload對(duì)象發(fā)送到Azure Service Bus Queue中。

[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required]Payload request)
{
    if (data.Any(d => d.Id == request.Id))
    {
        return Conflict($"data with id {request.Id} already exists");
    }
 
    data.Add(request);
 
    // Send this to the bus for the other services
    await _serviceBusSender.SendMessage(new MyPayload
    {
        Goals = request.Goals,
        Name = request.Name,
        Delete = false
    });
 
    return Ok(request);
}

從Queue中獲取消息

為了監(jiān)聽(tīng)Azure Service Bus Queue, 并處理接收到的消息劫乱,我們創(chuàng)建了一個(gè)新類(lèi)ServiceBusConsumer织中,ServiceBusConsumer實(shí)現(xiàn)了IServiceBusConsumer接口。

Queue的連接字符串是使用IConfiguration讀取的衷戈。 RegisterOnMessageHandlerAndReceiveMessages方法負(fù)責(zé)注冊(cè)消息處理程序ProcessMessagesAsync處理消息狭吼。ProcessMessagesAsync方法會(huì)將得到的消息轉(zhuǎn)換成對(duì)象,并調(diào)用IProcessData接口完成最終的消息處理殖妇。

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ServiceBusMessaging
{
    public interface IServiceBusConsumer
    {
        void RegisterOnMessageHandlerAndReceiveMessages();
        Task CloseQueueAsync();
    }
 
    public class ServiceBusConsumer : IServiceBusConsumer
    {
        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private readonly QueueClient _queueClient;
        private const string QUEUE_NAME = "simplequeue";
        private readonly ILogger _logger;
 
        public ServiceBusConsumer(IProcessData processData, 
            IConfiguration configuration, 
            ILogger<ServiceBusConsumer> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;
            _queueClient = new QueueClient(
              _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
        }
 
        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            };
 
            _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }
 
        private async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
            _processData.Process(myPayload);
            await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }
 
        private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
 
            _logger.LogDebug($"- Endpoint: {context.Endpoint}");
            _logger.LogDebug($"- Entity Path: {context.EntityPath}");
            _logger.LogDebug($"- Executing Action: {context.Action}");
 
            return Task.CompletedTask;
        }
 
        public async Task CloseQueueAsync()
        {
            await _queueClient.CloseAsync();
        }
    }
}

其中IProcessData接口存在于類(lèi)庫(kù)項(xiàng)目ServiceBusMessaging中刁笙,它是用來(lái)處理消息的。

public interface IProcessData
{
    void Process(MyPayload myPayload);
}

在Api 2中谦趣,我們創(chuàng)建一個(gè)ProcessData類(lèi)疲吸,它實(shí)現(xiàn)了IProcessData接口。

public class ProcessData : IProcessData
{
    public void Process(MyPayload myPayload)
    {
        DataServiceSimi.Data.Add(new Payload
        {
            Name = myPayload.Name,
            Goals = myPayload.Goals
        });
    }
}

這里為了簡(jiǎn)單測(cè)試前鹅,我們創(chuàng)建了一個(gè)靜態(tài)類(lèi)DataServiceSimi摘悴,其中存放了API2中所有保存Payload對(duì)象。同時(shí)嫡纠,我們還創(chuàng)建了一個(gè)新的控制器ViewPayloadMessagesController烦租,在其中添加了一個(gè)GET Action,并返回了靜態(tài)類(lèi)DataServiceSimi中的所有數(shù)據(jù)。

[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{
    [HttpGet]
    [ProducesResponseType(StatusCodes.Status200OK)]
    public ActionResult<List<Payload>> Get()
    {
        return Ok(DataServiceSimi.Data);
    }
}

最后我們還需要將ProcessData注冊(cè)到API2的IOC容器中除盏。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
    services.AddTransient<IProcessData, ProcessData>();
}

最終效果

現(xiàn)在我們分別啟用2個(gè)Api項(xiàng)目叉橱,并在Api 1的Swagger文檔界面,調(diào)用POST請(qǐng)求者蠕,添加一個(gè)Payload

image

操作完成之后窃祝,我們?cè)L問(wèn)Api 2的/api/ViewPayloadMessages, 獲得結(jié)果如下,Api 1發(fā)出的消息出現(xiàn)在了Api 2的結(jié)果集中踱侣,這說(shuō)明Api 2從Azure Service Bus Queue中獲取了消息粪小,并保存在了自己的靜態(tài)類(lèi)DataServiceSimi中。

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末抡句,一起剝皮案震驚了整個(gè)濱河市探膊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌待榔,老刑警劉巖逞壁,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件流济,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡腌闯,警方通過(guò)查閱死者的電腦和手機(jī)绳瘟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)姿骏,“玉大人糖声,你說(shuō)我怎么就攤上這事》质荩” “怎么了蘸泻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)嘲玫。 經(jīng)常有香客問(wèn)我蟋恬,道長(zhǎng),這世上最難降的妖魔是什么趁冈? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮拜马,結(jié)果婚禮上渗勘,老公的妹妹穿的比我還像新娘。我一直安慰自己俩莽,他們只是感情好旺坠,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著扮超,像睡著了一般取刃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上出刷,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天璧疗,我揣著相機(jī)與錄音,去河邊找鬼馁龟。 笑死崩侠,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的坷檩。 我是一名探鬼主播却音,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼矢炼!你這毒婦竟也來(lái)了系瓢?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤句灌,失蹤者是張志新(化名)和其女友劉穎夷陋,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肌稻,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年清蚀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片爹谭。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡枷邪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出诺凡,到底是詐尸還是另有隱情东揣,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布腹泌,位于F島的核電站嘶卧,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏凉袱。R本人自食惡果不足惜芥吟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望专甩。 院中可真熱鬧钟鸵,春花似錦、人聲如沸涤躲。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)种樱。三九已至蒙袍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間嫩挤,已是汗流浹背害幅。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留俐镐,地道東北人矫限。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像佩抹,于是被迫代替她去往敵國(guó)和親叼风。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容