原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod
譯文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代碼: https://github.com/lamondlu/AzureServiceBusMessaging
本文展示了如何使用Azure Service Bus Queue, 實(shí)現(xiàn)2個(gè)ASP.NET Core Api應(yīng)用之間的消息傳輸排拷。
配置Azure Service Bus Queue
你可以從官網(wǎng)文檔中了解到如何配置一個(gè)Azure Service Bus Queue.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
這里我們使用Queue或者Topic來(lái)實(shí)現(xiàn)消息傳輸浪读。Queue是一種消息傳輸類(lèi)型,一旦一個(gè)消息被一個(gè)消費(fèi)者接收了流码,該消息就會(huì)從Queue中被移除。
與Queue不同辞做,Topic提供的是一對(duì)多的通訊方式莺掠。
架構(gòu)圖
整個(gè)應(yīng)用的實(shí)現(xiàn)如下:
- Api 1負(fù)責(zé)發(fā)送消息
- Api 2負(fù)責(zé)監(jiān)聽(tīng)Azure Service Bus,并處理接收到的消息
實(shí)現(xiàn)一個(gè)Service Bus Queue
這里我們首先需要引入** Microsoft.Azure.ServiceBus ** 程序集闯冷。Microsoft.Azure.ServiceBus是Azure Service Bus的客戶(hù)端庫(kù)砂心。針對(duì)Service Bus的連接字符串我們保存在項(xiàng)目的User Secret中。當(dāng)部署項(xiàng)目的時(shí)候蛇耀,我們可以使用Azure Key Valut來(lái)設(shè)置這個(gè)Secret值辩诞。
在Visual Studio中,右鍵點(diǎn)擊API1, API2項(xiàng)目屬性纺涤,選擇Manage User Secrets就可以管理當(dāng)前項(xiàng)目使用的所有私密信息译暂。
為了發(fā)送向Azure Service Bus Queue發(fā)送消息,我們需要?jiǎng)?chuàng)建一個(gè)SendMessage
方法洒琢,并接收一個(gè)消息參數(shù)秧秉。這里我們創(chuàng)建了一個(gè)我們自己的消息內(nèi)容類(lèi)型MyPayload
, 將當(dāng)前該MyPayload
對(duì)象序列化成Json字符串, 添加到一個(gè)Message
對(duì)象中衰抑。
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;
namespace ServiceBusMessaging
{
public class ServiceBusSender
{
private readonly QueueClient _queueClient;
private readonly IConfiguration _configuration;
private const string QUEUE_NAME = "simplequeue";
public ServiceBusSender(IConfiguration configuration)
{
_configuration = configuration;
_queueClient = new QueueClient(
_configuration
.GetConnectionString("ServiceBusConnectionString"),
QUEUE_NAME);
}
public async Task SendMessage(MyPayload payload)
{
string data = JsonConvert.SerializeObject(payload);
Message message = new Message(Encoding.UTF8.GetBytes(data));
await _queueClient.SendAsync(message);
}
}
}
在API 1和API 2中象迎,我們需要將ServiceBusSender
注冊(cè)到應(yīng)用程序的IOC容器中。這里為了測(cè)試方便,我們同時(shí)注冊(cè)Swagger
服務(wù)砾淌。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddScoped<ServiceBusSender>();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new Info
{
Version = "v1",
Title = "Payload View API",
});
});
}
接下來(lái)啦撮,我們就可以在控制器中通過(guò)構(gòu)造函數(shù)注入的方式使用這個(gè)服務(wù)了。
在API1中汪厨,我們創(chuàng)建一個(gè)POST方法赃春,這個(gè)方法會(huì)將API接收到Payload
對(duì)象發(fā)送到Azure Service Bus Queue中。
[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required]Payload request)
{
if (data.Any(d => d.Id == request.Id))
{
return Conflict($"data with id {request.Id} already exists");
}
data.Add(request);
// Send this to the bus for the other services
await _serviceBusSender.SendMessage(new MyPayload
{
Goals = request.Goals,
Name = request.Name,
Delete = false
});
return Ok(request);
}
從Queue中獲取消息
為了監(jiān)聽(tīng)Azure Service Bus Queue, 并處理接收到的消息劫乱,我們創(chuàng)建了一個(gè)新類(lèi)ServiceBusConsumer
织中,ServiceBusConsumer
實(shí)現(xiàn)了IServiceBusConsumer
接口。
Queue的連接字符串是使用IConfiguration
讀取的衷戈。 RegisterOnMessageHandlerAndReceiveMessages
方法負(fù)責(zé)注冊(cè)消息處理程序ProcessMessagesAsync
處理消息狭吼。ProcessMessagesAsync
方法會(huì)將得到的消息轉(zhuǎn)換成對(duì)象,并調(diào)用IProcessData
接口完成最終的消息處理殖妇。
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServiceBusMessaging
{
public interface IServiceBusConsumer
{
void RegisterOnMessageHandlerAndReceiveMessages();
Task CloseQueueAsync();
}
public class ServiceBusConsumer : IServiceBusConsumer
{
private readonly IProcessData _processData;
private readonly IConfiguration _configuration;
private readonly QueueClient _queueClient;
private const string QUEUE_NAME = "simplequeue";
private readonly ILogger _logger;
public ServiceBusConsumer(IProcessData processData,
IConfiguration configuration,
ILogger<ServiceBusConsumer> logger)
{
_processData = processData;
_configuration = configuration;
_logger = logger;
_queueClient = new QueueClient(
_configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
}
public void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
_processData.Process(myPayload);
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
_logger.LogDebug($"- Endpoint: {context.Endpoint}");
_logger.LogDebug($"- Entity Path: {context.EntityPath}");
_logger.LogDebug($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
public async Task CloseQueueAsync()
{
await _queueClient.CloseAsync();
}
}
}
其中IProcessData
接口存在于類(lèi)庫(kù)項(xiàng)目ServiceBusMessaging
中刁笙,它是用來(lái)處理消息的。
public interface IProcessData
{
void Process(MyPayload myPayload);
}
在Api 2中谦趣,我們創(chuàng)建一個(gè)ProcessData
類(lèi)疲吸,它實(shí)現(xiàn)了IProcessData
接口。
public class ProcessData : IProcessData
{
public void Process(MyPayload myPayload)
{
DataServiceSimi.Data.Add(new Payload
{
Name = myPayload.Name,
Goals = myPayload.Goals
});
}
}
這里為了簡(jiǎn)單測(cè)試前鹅,我們創(chuàng)建了一個(gè)靜態(tài)類(lèi)
DataServiceSimi
摘悴,其中存放了API2中所有保存Payload
對(duì)象。同時(shí)嫡纠,我們還創(chuàng)建了一個(gè)新的控制器ViewPayloadMessagesController
烦租,在其中添加了一個(gè)GET Action,并返回了靜態(tài)類(lèi)DataServiceSimi
中的所有數(shù)據(jù)。
[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{
[HttpGet]
[ProducesResponseType(StatusCodes.Status200OK)]
public ActionResult<List<Payload>> Get()
{
return Ok(DataServiceSimi.Data);
}
}
最后我們還需要將ProcessData
注冊(cè)到API2的IOC容器中除盏。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddTransient<IProcessData, ProcessData>();
}
最終效果
現(xiàn)在我們分別啟用2個(gè)Api項(xiàng)目叉橱,并在Api 1的Swagger文檔界面,調(diào)用POST請(qǐng)求者蠕,添加一個(gè)Payload
操作完成之后窃祝,我們?cè)L問(wèn)Api 2的/api/ViewPayloadMessages, 獲得結(jié)果如下,Api 1發(fā)出的消息出現(xiàn)在了Api 2的結(jié)果集中踱侣,這說(shuō)明Api 2從Azure Service Bus Queue中獲取了消息粪小,并保存在了自己的靜態(tài)類(lèi)DataServiceSimi
中。