Work Queues介紹
Work Queues簡而言之就是Producer將Message發(fā)送到Queues中滴须,公平調(diào)度的發(fā)送到個個worker處理。
Work Queues模型
值得注意的地方梁厉,在消息接受過程中,worker會遇到異常而崩潰劈猿,導致接收到的消息處理失敗入热,但是Queues發(fā)送Message并不知道這個是否已經(jīng)正確處理而自動刪除這條message。這樣會導致Message的丟失解总,所以需要實現(xiàn)手動Message acknowledgment贮匕。當處理成功是告知RabbitMQ 這條message處理OK并刪除。
除此之外還有一個Message的丟失風險花枫,就是當RabbitMQ 退出或者異常崩潰時刻盐,會導致queue和message的丟失,所以也要配置Message durability(持久化)劳翰。
公平調(diào)度(Fair dispatch)敦锌,RabbitMQ默認是平均分配message到各個worker。為防止出現(xiàn)某些worker因為處理比較復雜佳簸,大量的數(shù)據(jù)而一直處理繁忙狀態(tài)乙墙,其他的worker卻處于閑置狀態(tài),還不停的進行調(diào)度繁忙的worker,需要使用basicQos 方法設(shè)置 prefetchCount = 1 听想,就是告知RabbitMQ 不要同時的給一個worker大于1條Message腥刹。
Fair dispatch
演示代碼
producer:
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************producer***************");
Console.WriteLine("please Input send message:");
//連接到RabbitMQ
var factory = new RabbitMQ.Client.ConnectionFactory();
//第一種方式
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//第二種方式
//factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
//產(chǎn)生一個連接對象
using (var conncetion = factory.CreateConnection())
{
//通過conncetion產(chǎn)生一個連接通道
using (var channel = conncetion.CreateModel())
{
//用代碼實現(xiàn) exchanges和Queues
//定義exchanges
string exchangeName = "Ewrokqueues";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
//定義Queues
string queueName = "Qwrokqueues";
bool durable = true;//設(shè)RabbitMQ置持久化
channel.QueueDeclare(queueName, durable, false, false, null);
//綁定exchanges 和Queues
string routingKey = "task_queue";
channel.QueueBind(queueName, exchangeName, "", null);
//簡單設(shè)置隊列方式
//channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,
// arguments: null);
for (int i =0;i<20;i++)
{
string message = i.ToString();
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "Ewrokqueues", routingKey: "", mandatory: false, basicProperties: properties, body: body);
Console.WriteLine("[producer] send : {0}", message);
Thread.Sleep(1000);
}
}
}
Console.ReadLine();
}
worker 代碼:
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************worker1(sleep 5s)***************");
//連接MQ
var factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//產(chǎn)生連接對象
using (var connection = factory.CreateConnection())
{
//通道
using (var channel = connection.CreateModel())
{
//公平調(diào)用
channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
//訂閱方式獲取message
var consumer = new EventingBasicConsumer(channel);
//實現(xiàn)獲取message處理事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
//睡眠5s 另一個是1s
Thread.Sleep(5000);
Console.WriteLine("[worker1] received : {0}", message);
//手動設(shè)置回復
channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
};
//設(shè)置手動回復認證
channel.BasicConsume(queue: "Qwrokqueues", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
P會循環(huán)發(fā)送20次,每秒發(fā)送一個數(shù)字到queue中哗魂,兩個worker接受message肛走。最后從運行結(jié)果可以看到整個分配情況,worker1第一個接受到“0”录别,在5秒處理完成后才接受“5”,而worker2會一直在處理邻吞,而不是出于等待閑置组题。
運行結(jié)果