優(yōu)先級隊列(priority queue)
創(chuàng)建具有優(yōu)先級屬性的隊列
示列
生產(chǎn)端:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
private static MessageProperties getmessageProperties(){
int priority = new Random().nextInt(5);
System.out.println("====優(yōu)先級==="+priority);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text");
messageProperties.setPriority(priority);
return messageProperties;
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
byte[] body = "hello world".getBytes();
//一次性發(fā)送10條消息,優(yōu)先級分別是1到10
for (int i = 0; i < 10; i++) {
rabbitTemplate.send("","zhihao.miao.user",new Message(body,getmessageProperties()));
}
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
配置:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
控制臺打印:
====優(yōu)先級===1
十月 29, 2017 2:12:52 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#1184ab05:0/SimpleConnection@6a400542 [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 52105]
====優(yōu)先級===1
====優(yōu)先級===4
====優(yōu)先級===3
====優(yōu)先級===3
====優(yōu)先級===1
====優(yōu)先級===3
====優(yōu)先級===2
====優(yōu)先級===1
====優(yōu)先級===4
消費端進行消費,首先看管控臺,
控制臺中get是按照順序進行獲取到的
代碼消費呢革娄?
應用啟動類:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
TimeUnit.SECONDS.sleep(40);
context.close();
}
}
配置
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.user");
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("=====消費消息======");
System.out.println("消息的優(yōu)先級是:"+message.getMessageProperties().getPriority()+
" 消息內(nèi)容是:"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
});
return container;
}
}
信息: Created new connection: connectionFactory#687681be:0/SimpleConnection@61b4ee6c [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 52155]
=====消費消息======
消息的優(yōu)先級是:4消息內(nèi)容是:hello world
org.springframework.amqp.rabbit.core.RabbitTemplate@7193666c
=====消費消息======
消息的優(yōu)先級是:4消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:3消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:3消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:3消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:2消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
很明顯也是按照優(yōu)先級順序來消費的。
示列2
如果我們設置的發(fā)送消息的優(yōu)先級都高于隊列zhihao.miao.order
設置的x-max-priority
屬性呢冕碟?
@ComponentScan
public class Application {
private static MessageProperties getmessageProperties(){
int priority = new Random().nextInt(5)+10;
System.out.println("=====優(yōu)先級==="+priority);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text");
messageProperties.setPriority(priority);
return messageProperties;
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
byte[] body = "hello world".getBytes();
//一次性發(fā)送10條消息拦惋,優(yōu)先級分別是1到10
for (int i = 0; i < 10; i++) {
rabbitTemplate.send("","zhihao.miao.order",new Message(body,getmessageProperties()));
}
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
客戶端發(fā)送的消息的優(yōu)先級,控制臺打印出:
====優(yōu)先級===13
十月 29, 2017 2:25:36 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#1184ab05:0/SimpleConnection@6a400542 [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 52235]
====優(yōu)先級===14
====優(yōu)先級===14
====優(yōu)先級===13
====優(yōu)先級===11
====優(yōu)先級===10
====優(yōu)先級===10
====優(yōu)先級===12
====優(yōu)先級===14
====優(yōu)先級===12
消費端代碼和上面一樣安寺,執(zhí)行程序厕妖,驗證消費消息順序
=====消費消息======
消息的優(yōu)先級是:13 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:14 消息內(nèi)容是:hello world
org.springframework.amqp.rabbit.core.RabbitTemplate@7193666c
=====消費消息======
消息的優(yōu)先級是:14 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:13 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:11 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:10 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:10 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:12 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:14 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:12 消息內(nèi)容是:hello world
發(fā)現(xiàn)沒有嚴格的順序,驗證了如果設置的優(yōu)先級大于隊列設置的x-max-priority
屬性挑庶,則優(yōu)先級失效言秸。
發(fā)送消息之后可以通過http監(jiān)控可以看到消息的詳情:
http://192.168.1.131:15672/api/queues/%2F/zhihao.miao.user(/api/queues/vhost/name)
隊列的信息詳情
示列3
如果生產(chǎn)端發(fā)送很慢软能,消費者消息很快,則有可能不會嚴格的按照優(yōu)先級來進行消費举畸。
生產(chǎn)端每隔3s鐘發(fā)送一條消息查排,很明顯消費端消費也是按照發(fā)送的順序。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
private static MessageProperties getmessageProperties(){
int priority = new Random().nextInt(5);
System.out.println("====優(yōu)先級==="+priority);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text");
messageProperties.setPriority(priority);
return messageProperties;
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
byte[] body = "hello world".getBytes();
//一次性發(fā)送10條消息抄沮,優(yōu)先級分別是1到10
for (int i = 0; i < 10; i++) {
rabbitTemplate.send("","zhihao.miao.user",new Message(body,getmessageProperties()));
TimeUnit.SECONDS.sleep(3);
}
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
我們發(fā)現(xiàn)生產(chǎn)端生產(chǎn)的順序和消費端消費的消息都是一致的跋核。
總結:
- 創(chuàng)建優(yōu)先級隊列,需要增加
x-max-priority
參數(shù)叛买,指定一個數(shù)字砂代。表示最大的優(yōu)先級,建議優(yōu)先級設置為1~10之間率挣。 - 發(fā)送消息的時候刻伊,需要設置priority屬性,最好不要超過上面指定的最大的優(yōu)先級难礼。
- 如果生產(chǎn)端發(fā)送很慢娃圆,消費者消息很快玫锋,則有可能不會嚴格的按照優(yōu)先級來進行消費蛾茉。
第一,如果發(fā)送的消息的優(yōu)先級屬性小于設置的隊列屬性x-max-priority
值撩鹿,則按優(yōu)先級的高低進行消費谦炬,數(shù)字越高則優(yōu)先級越高。
第二节沦,如果發(fā)送的消息的優(yōu)先級屬性都大于設置的隊列屬性x-max-priority
值键思,則設置的優(yōu)先級失效,按照入隊列的順序進行消費甫贯。
第三吼鳞,如果消費端一直進行監(jiān)聽,而發(fā)送端一條條的發(fā)送消息叫搁,優(yōu)先級屬性也會失效赔桌。
RabbitMQ不能保證消息的嚴格的順序消費。