RabbitMQ筆記二十一 :priority queue

優(yōu)先級隊列(priority queue)

創(chuàng)建具有優(yōu)先級屬性的隊列

示列

生產(chǎn)端:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    private static MessageProperties getmessageProperties(){
        int priority = new Random().nextInt(5);
        System.out.println("====優(yōu)先級==="+priority);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text");
        messageProperties.setPriority(priority);
        return messageProperties;

    }
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        byte[] body = "hello world".getBytes();

        //一次性發(fā)送10條消息,優(yōu)先級分別是1到10
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.send("","zhihao.miao.user",new Message(body,getmessageProperties()));
        }

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

配置:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

}

控制臺打印:

====優(yōu)先級===1
十月 29, 2017 2:12:52 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#1184ab05:0/SimpleConnection@6a400542 [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 52105]
====優(yōu)先級===1
====優(yōu)先級===4
====優(yōu)先級===3
====優(yōu)先級===3
====優(yōu)先級===1
====優(yōu)先級===3
====優(yōu)先級===2
====優(yōu)先級===1
====優(yōu)先級===4

消費端進行消費,首先看管控臺,


控制臺中get是按照順序進行獲取到的

代碼消費呢革娄?
應用啟動類:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        System.out.println(rabbitTemplate);
        TimeUnit.SECONDS.sleep(40);
        context.close();
    }
}

配置

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }


    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.user");
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println("=====消費消息======");
                System.out.println("消息的優(yōu)先級是:"+message.getMessageProperties().getPriority()+
                        " 消息內(nèi)容是:"+new String(message.getBody()));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
        });
        return container;
    }
}
信息: Created new connection: connectionFactory#687681be:0/SimpleConnection@61b4ee6c [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 52155]
=====消費消息======
消息的優(yōu)先級是:4消息內(nèi)容是:hello world
org.springframework.amqp.rabbit.core.RabbitTemplate@7193666c
=====消費消息======
消息的優(yōu)先級是:4消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:3消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:3消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:3消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:2消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:1消息內(nèi)容是:hello world

很明顯也是按照優(yōu)先級順序來消費的。

示列2

如果我們設置的發(fā)送消息的優(yōu)先級都高于隊列zhihao.miao.order設置的x-max-priority屬性呢冕碟?

@ComponentScan
public class Application {

    private static MessageProperties getmessageProperties(){
        int priority = new Random().nextInt(5)+10;
        System.out.println("=====優(yōu)先級==="+priority);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text");
        messageProperties.setPriority(priority);
        return messageProperties;

    }
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        byte[] body = "hello world".getBytes();

        //一次性發(fā)送10條消息拦惋,優(yōu)先級分別是1到10
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.send("","zhihao.miao.order",new Message(body,getmessageProperties()));
        }

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

客戶端發(fā)送的消息的優(yōu)先級,控制臺打印出:

====優(yōu)先級===13
十月 29, 2017 2:25:36 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#1184ab05:0/SimpleConnection@6a400542 [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 52235]
====優(yōu)先級===14
====優(yōu)先級===14
====優(yōu)先級===13
====優(yōu)先級===11
====優(yōu)先級===10
====優(yōu)先級===10
====優(yōu)先級===12
====優(yōu)先級===14
====優(yōu)先級===12

消費端代碼和上面一樣安寺,執(zhí)行程序厕妖,驗證消費消息順序

=====消費消息======
消息的優(yōu)先級是:13 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:14 消息內(nèi)容是:hello world
org.springframework.amqp.rabbit.core.RabbitTemplate@7193666c
=====消費消息======
消息的優(yōu)先級是:14 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:13 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:11 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:10 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:10 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:12 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:14 消息內(nèi)容是:hello world
=====消費消息======
消息的優(yōu)先級是:12 消息內(nèi)容是:hello world

發(fā)現(xiàn)沒有嚴格的順序,驗證了如果設置的優(yōu)先級大于隊列設置的x-max-priority屬性挑庶,則優(yōu)先級失效言秸。

發(fā)送消息之后可以通過http監(jiān)控可以看到消息的詳情:

http://192.168.1.131:15672/api/queues/%2F/zhihao.miao.user(/api/queues/vhost/name)
隊列的信息詳情

示列3

如果生產(chǎn)端發(fā)送很慢软能,消費者消息很快,則有可能不會嚴格的按照優(yōu)先級來進行消費举畸。

生產(chǎn)端每隔3s鐘發(fā)送一條消息查排,很明顯消費端消費也是按照發(fā)送的順序。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    private static MessageProperties getmessageProperties(){
        int priority = new Random().nextInt(5);
        System.out.println("====優(yōu)先級==="+priority);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text");
        messageProperties.setPriority(priority);
        return messageProperties;

    }
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        byte[] body = "hello world".getBytes();

        //一次性發(fā)送10條消息抄沮,優(yōu)先級分別是1到10
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.send("","zhihao.miao.user",new Message(body,getmessageProperties()));
            TimeUnit.SECONDS.sleep(3);
        }

        TimeUnit.SECONDS.sleep(30);

        context.close();
    }
}

我們發(fā)現(xiàn)生產(chǎn)端生產(chǎn)的順序和消費端消費的消息都是一致的跋核。

總結:

  • 創(chuàng)建優(yōu)先級隊列,需要增加x-max-priority參數(shù)叛买,指定一個數(shù)字砂代。表示最大的優(yōu)先級,建議優(yōu)先級設置為1~10之間率挣。
  • 發(fā)送消息的時候刻伊,需要設置priority屬性,最好不要超過上面指定的最大的優(yōu)先級难礼。
  • 如果生產(chǎn)端發(fā)送很慢娃圆,消費者消息很快玫锋,則有可能不會嚴格的按照優(yōu)先級來進行消費蛾茉。

第一,如果發(fā)送的消息的優(yōu)先級屬性小于設置的隊列屬性x-max-priority值撩鹿,則按優(yōu)先級的高低進行消費谦炬,數(shù)字越高則優(yōu)先級越高。
第二节沦,如果發(fā)送的消息的優(yōu)先級屬性都大于設置的隊列屬性x-max-priority值键思,則設置的優(yōu)先級失效,按照入隊列的順序進行消費甫贯。
第三吼鳞,如果消費端一直進行監(jiān)聽,而發(fā)送端一條條的發(fā)送消息叫搁,優(yōu)先級屬性也會失效赔桌。

RabbitMQ不能保證消息的嚴格的順序消費。

參考資料

priority queue

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末渴逻,一起剝皮案震驚了整個濱河市疾党,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌惨奕,老刑警劉巖雪位,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異梨撞,居然都是意外死亡雹洗,警方通過查閱死者的電腦和手機香罐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來队伟,“玉大人穴吹,你說我怎么就攤上這事∈任辏” “怎么了港令?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锈颗。 經(jīng)常有香客問我顷霹,道長,這世上最難降的妖魔是什么击吱? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任淋淀,我火速辦了婚禮,結果婚禮上覆醇,老公的妹妹穿的比我還像新娘朵纷。我一直安慰自己,他們只是感情好永脓,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布袍辞。 她就那樣靜靜地躺著,像睡著了一般常摧。 火紅的嫁衣襯著肌膚如雪搅吁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天落午,我揣著相機與錄音谎懦,去河邊找鬼。 笑死溃斋,一個胖子當著我的面吹牛界拦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播梗劫,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼享甸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了在跳?” 一聲冷哼從身側響起枪萄,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎猫妙,沒想到半個月后瓷翻,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年齐帚,在試婚紗的時候發(fā)現(xiàn)自己被綠了妒牙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡对妄,死狀恐怖湘今,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情剪菱,我是刑警寧澤摩瞎,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站孝常,受9級特大地震影響旗们,放射性物質發(fā)生泄漏。R本人自食惡果不足惜构灸,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一上渴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧喜颁,春花似錦稠氮、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至稿茉,卻和暖如春锹锰,著一層夾襖步出監(jiān)牢的瞬間芥炭,已是汗流浹背漓库。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留园蝠,地道東北人渺蒿。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像彪薛,于是被迫代替她去往敵國和親茂装。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容