springboot rabbitmq入門使用

RabbitMQ作為一款能實現(xiàn)高性能存儲分發(fā)消息的分布式中間件串结,具有異步通信弟晚、服務解耦、接口限流、消息分發(fā)和業(yè)務延遲處理等功能,在實際生產環(huán)境中具有很廣泛的應用绞绒。

為了能在項目中使用RabbitMQ,需要在本地安裝RabbitMQ并能進行簡單的使用√韫模可參考改教程安裝RabbitMQ:安裝教程

一、總的代碼目錄結構如下:
rabbitmq使用代碼目錄結構.png
  • entity包實體類order唐础,作為消息載體箱歧;
  • ackmodel包是寫消息確認消費機制矾飞,自動確認消費和手工確認消費;
  • delay包是寫延遲隊列呀邢,在業(yè)務延遲處理時可用到洒沦,如訂單30分鐘內未付款自動取消訂單相關業(yè)務;
  • exchangemodel包是寫不同類型交換機下的消息模型价淌,常見的三種消息模型是:direct-直接傳輸申眼、fanout-廣播、topic-主題消息模型蝉衣;
  • springevent包是spring事件驅動模型的demo括尸,要了解消息隊列,先了解一下spring的事件驅動會更好买乃;
  • 數(shù)據庫是用mysql姻氨,項目啟動需要連接本地數(shù)據庫,新建一個exercisegroup數(shù)據庫剪验;
  • appilication.yaml配置rabbitmq地址肴焊,pom文件引入amqp starter即可。
二功戚、spring事件驅動

(1)spring事件驅動模型由三部分構成娶眷,生產者、事件(消息)啸臀、消費者届宠,即生成者采用異步的方式把事件(消息)發(fā)送給消費者,消費者監(jiān)聽到事件(消息)再進一步處理乘粒。


spring事件驅動.jpeg

(2)代碼目錄結構如下:


springevent.png

(3)示例代碼
  • OrderEvent類豌注,訂單事件,繼承ApplicationEvent灯萍,:
public class OrderEvent extends ApplicationEvent {
    public OrderEvent(Order source) {
        super(source);
    }
}
  • OrderPublisher類轧铁,生產者,異步發(fā)送事件:
@Component
public class OrderPublisher {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    public void sendMsg() {
        Order order = new Order();
        order.setOrdernum("123456");
        OrderEvent orderEvent = new OrderEvent(order);
        //發(fā)送消息
        applicationEventPublisher.publishEvent(orderEvent);
    }
}
  • OrderConsumer類旦棉,消費者齿风,監(jiān)聽生產者發(fā)送過來的事件
@Component//加入Spring的IOC容器
@EnableAsync//允許異步執(zhí)行
@Slf4j
public class OrderConsumer implements ApplicationListener<OrderEvent> {
    @Override
    @Async
    public void onApplicationEvent(OrderEvent event) {
        log.info("監(jiān)聽到訂單,訂單號:{}", ((Order) event.getSource()).getOrdernum());
    }
}

(4)執(zhí)行MessageQueueApplicationTests的test方法即可看結果,RabbitMQ本質也是異步通信绑洛,消息在生產者端進行發(fā)送救斑,在消費者端進行監(jiān)聽,對監(jiān)聽到的消息進一步處理真屯,其功能更加強大脸候。


test-springevent.png
三、RabbitMQ一些專有名詞

Producer/Publisher生產者,投遞消息的一方纪他。
Consumer消費者鄙煤,接收消息的一方。
Message消息:實際的數(shù)據茶袒,如demo中的order訂單消息載體梯刚。
Queue隊列:是RabbitMQ的內部對象,用于存儲消息薪寓,最終將消息傳輸?shù)较M者亡资。
Exchange交換機:在RabbitMQ中,生產者發(fā)送消息到交換機向叉,由交換機將消息路由到一個或者多個隊列中
RoutingKey路由鍵:生產者將消息發(fā)給交換器的時候锥腻,一般會指定一個RoutingKey,用來指定這個消息的路由規(guī)則母谎。
Binding綁定:RabbitMQ中通過綁定將交換器與隊列關聯(lián)起來瘦黑,在綁定的時候一般會指定一個綁定鍵(BindingKey),這樣RabbitMQ就知道如何正確地將消息路由到隊列奇唤。

四幸斥、RabbitMQ使用

(1)簡單的demo


消息隊列流程圖.jpeg
簡單的demo.png

(2)示例代碼

  • RabbitmqConfig配置類,SimpleRabbitListenerContainerFactory Bean是消息監(jiān)聽容器咬扇,服務于監(jiān)聽者甲葬;RabbitTemplate是RabbitMQ發(fā)送消息的操作組件RabbitTemplate,此外配置類還有三個Bean懈贺,一個是隊列basicQueue用于存儲消息最終消息會被消費者監(jiān)聽到经窖,basicExchange是交換機,生產者發(fā)送消息到交換機根據路由規(guī)則發(fā)送到相應的隊列basicQueue上梭灿,basicBinding是負責綁定交換機basicExchange和隊列basicQueue画侣,根據路由規(guī)則綁定起來。創(chuàng)建隊列堡妒、交換機的名詞以及路由規(guī)則我都放到常量類RabbitMqConstants里面棉钧。
@Slf4j
@Configuration
public class RabbitmqConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自動裝配消息監(jiān)聽器所在的容器工廠配置類實例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面為單一消費者實例的配置
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定義消息監(jiān)聽器所在的容器工廠
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //設置容器工廠所用的實例
        factory.setConnectionFactory(connectionFactory);
        //設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //設置并發(fā)消費者實例的初始數(shù)量涕蚤。在這里為1個
        factory.setConcurrentConsumers(1);
        //設置并發(fā)消費者實例的最大數(shù)量。在這里為1個
        factory.setMaxConcurrentConsumers(1);
        //設置并發(fā)消費者實例中每個實例拉取的消息數(shù)量-在這里為1個
        factory.setPrefetchCount(1);
        return factory;
    }
    //自定義配置RabbitMQ發(fā)送消息的操作組件RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //設置“發(fā)送消息后進行確認”
        connectionFactory.setPublisherConfirms(true);
        //設置“發(fā)送消息后返回確認信息”
        connectionFactory.setPublisherReturns(true);
        //構造發(fā)送消息組件實例對象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //發(fā)送消息后的诵,如果發(fā)送成功万栅,則輸出“消息發(fā)送成功”的反饋信息
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
        //發(fā)送消息后,如果發(fā)送失敗西疤,則輸出“消息發(fā)送失敗-消息丟失”的反饋信息
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
        //定義消息傳輸?shù)母袷綖镴SON字符串格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //最終返回RabbitMQ的操作組件實例RabbitTemplate
        return rabbitTemplate;
    }
    //創(chuàng)建隊列
    @Bean
    public Queue basicQueue(){
        return new Queue(RabbitMqConstants.BASIC_QUEUE,true);
    }
    //創(chuàng)建交換機:在這里以DirectExchange為例
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(RabbitMqConstants.BASIC_EXCHANGE,true,false);
    }
    //創(chuàng)建綁定
    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RabbitMqConstants.BASICE_ROUTING_KEY);
    }
}
  • RabbitMqConstants常量類烦粒,存放創(chuàng)建隊列、交換機的名詞以及路由規(guī)則。
@Data
public class RabbitMqConstants {
    //隊列名詞
    public static final String BASIC_QUEUE = "mq.basic.info.queue";
    //交換機名詞
    public static final String BASIC_EXCHANGE = "mq.basic.info.exchange";
    //路由規(guī)則,實際為字符串
    public static final String BASICE_ROUTING_KEY = "mq.basic.info.routing.key";
}
  • BasicPublisher 類扰她,生產者兽掰,異步發(fā)送消息
@Component
@Slf4j
public class BasicPublisher {
    //定義RabbitMQ消息操作組件RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送消息
     * @param message 待發(fā)送的消息
     */
    public void sendMsg(Order message){
        try {
            //指定消息模型中的交換機
            rabbitTemplate.setExchange(RabbitMqConstants.BASIC_EXCHANGE);
            //指定消息模型中的路由
            rabbitTemplate.setRoutingKey(RabbitMqConstants.BASICE_ROUTING_KEY);
            //轉化并發(fā)送消息
            rabbitTemplate.convertAndSend(message);
            log.info("rabbitmq demo-生產者-發(fā)送消息:{} ", JSONUtil.toJsonStr(message));
        } catch (Exception e) {
            log.error("rabbitmq demo-生產者-發(fā)送消息發(fā)生異常:{} ", message, e.fillInStackTrace());
        }
    }
}
  • BasicConsumer 類,消費者徒役,監(jiān)聽到消息時對消息進行處理孽尽,需要為消費者設置監(jiān)聽的隊列mq.basic.info.queue以及監(jiān)聽容器singleListenerContainer。
@Component
@Slf4j
public class BasicConsumer {
    /**
     * 監(jiān)聽并接收消費隊列中的消息-在這里采用單一容器工廠實例即可
     */
    @RabbitListener(queues = RabbitMqConstants.BASIC_QUEUE, containerFactory = "singleListenerContainer")
    public void consumeMsg(Order message) {
        try {
            log.info("rabbitmq demo-消費者-監(jiān)聽消費到消息:{} ", JSONUtil.toJsonStr(message));
        } catch (Exception e) {
            log.error("rabbitmq demo-消費者-發(fā)生異常:", e.fillInStackTrace());
        }
    }
}

(3)安裝好erlang語言以及rabbitmq之后忧勿,項目啟動杉女,訪問http://127.0.0.1:15672,輸入默認賬號密碼鸳吸,可以看到:


消息隊列.png
交換機.png
交換機綁定關系.png

(4)運行test方法:

    @Test
    public void testBasicPublish() {
        Order order = new Order();
        order.setOrdernum("123456");
        basicPublisher.sendMsg(order);
    }
生產者發(fā)送消息.png
消費者監(jiān)聽消息.png

下一篇:springboot rabbitmq不同交換機類型實戰(zhàn)

下一篇:springboot rabbitmq高可用消息確認消費實戰(zhàn)

參考資料:
《分布式中間件實戰(zhàn)》
《rabbitmq實戰(zhàn)指南》

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末熏挎,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子晌砾,更是在濱河造成了極大的恐慌坎拐,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件养匈,死亡現(xiàn)場離奇詭異哼勇,居然都是意外死亡,警方通過查閱死者的電腦和手機乖寒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進店門猴蹂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人楣嘁,你說我怎么就攤上這事磅轻。” “怎么了逐虚?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵聋溜,是天一觀的道長。 經常有香客問我叭爱,道長撮躁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任买雾,我火速辦了婚禮把曼,結果婚禮上,老公的妹妹穿的比我還像新娘漓穿。我一直安慰自己嗤军,他們只是感情好,可當我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布晃危。 她就那樣靜靜地躺著叙赚,像睡著了一般老客。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上震叮,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天胧砰,我揣著相機與錄音,去河邊找鬼苇瓣。 笑死尉间,一個胖子當著我的面吹牛,可吹牛的內容都是我干的钓简。 我是一名探鬼主播乌妒,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼外邓!你這毒婦竟也來了撤蚊?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤损话,失蹤者是張志新(化名)和其女友劉穎侦啸,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體丧枪,經...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡光涂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拧烦。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片忘闻。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖恋博,靈堂內的尸體忽然破棺而出齐佳,到底是詐尸還是另有隱情,我是刑警寧澤债沮,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布炼吴,位于F島的核電站,受9級特大地震影響疫衩,放射性物質發(fā)生泄漏硅蹦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一闷煤、第九天 我趴在偏房一處隱蔽的房頂上張望童芹。 院中可真熱鬧,春花似錦鲤拿、人聲如沸辐脖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春幕庐,著一層夾襖步出監(jiān)牢的瞬間久锥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工异剥, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留瑟由,地道東北人。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓冤寿,卻偏偏與公主長得像歹苦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子督怜,可洞房花燭夜當晚...
    茶點故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內容