RabbitMQ作為一款能實現(xiàn)高性能存儲分發(fā)消息的分布式中間件串结,具有異步通信弟晚、服務解耦、接口限流、消息分發(fā)和業(yè)務延遲處理等功能,在實際生產環(huán)境中具有很廣泛的應用绞绒。
為了能在項目中使用RabbitMQ,需要在本地安裝RabbitMQ并能進行簡單的使用√韫模可參考改教程安裝RabbitMQ:安裝教程
一、總的代碼目錄結構如下:
- entity包實體類order唐础,作為消息載體箱歧;
- ackmodel包是寫消息確認消費機制矾飞,自動確認消費和手工確認消費;
- delay包是寫延遲隊列呀邢,在業(yè)務延遲處理時可用到洒沦,如訂單30分鐘內未付款自動取消訂單相關業(yè)務;
- exchangemodel包是寫不同類型交換機下的消息模型价淌,常見的三種消息模型是:direct-直接傳輸申眼、fanout-廣播、topic-主題消息模型蝉衣;
- springevent包是spring事件驅動模型的demo括尸,要了解消息隊列,先了解一下spring的事件驅動會更好买乃;
- 數(shù)據庫是用mysql姻氨,項目啟動需要連接本地數(shù)據庫,新建一個exercisegroup數(shù)據庫剪验;
- appilication.yaml配置rabbitmq地址肴焊,pom文件引入amqp starter即可。
二功戚、spring事件驅動
(1)spring事件驅動模型由三部分構成娶眷,生產者、事件(消息)啸臀、消費者届宠,即生成者采用異步的方式把事件(消息)發(fā)送給消費者,消費者監(jiān)聽到事件(消息)再進一步處理乘粒。
(2)代碼目錄結構如下:
(3)示例代碼
- OrderEvent類豌注,訂單事件,繼承ApplicationEvent灯萍,:
public class OrderEvent extends ApplicationEvent {
public OrderEvent(Order source) {
super(source);
}
}
- OrderPublisher類轧铁,生產者,異步發(fā)送事件:
@Component
public class OrderPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void sendMsg() {
Order order = new Order();
order.setOrdernum("123456");
OrderEvent orderEvent = new OrderEvent(order);
//發(fā)送消息
applicationEventPublisher.publishEvent(orderEvent);
}
}
- OrderConsumer類旦棉,消費者齿风,監(jiān)聽生產者發(fā)送過來的事件
@Component//加入Spring的IOC容器
@EnableAsync//允許異步執(zhí)行
@Slf4j
public class OrderConsumer implements ApplicationListener<OrderEvent> {
@Override
@Async
public void onApplicationEvent(OrderEvent event) {
log.info("監(jiān)聽到訂單,訂單號:{}", ((Order) event.getSource()).getOrdernum());
}
}
(4)執(zhí)行MessageQueueApplicationTests的test方法即可看結果,RabbitMQ本質也是異步通信绑洛,消息在生產者端進行發(fā)送救斑,在消費者端進行監(jiān)聽,對監(jiān)聽到的消息進一步處理真屯,其功能更加強大脸候。
三、RabbitMQ一些專有名詞
Producer/Publisher生產者,投遞消息的一方纪他。
Consumer消費者鄙煤,接收消息的一方。
Message消息:實際的數(shù)據茶袒,如demo中的order訂單消息載體梯刚。
Queue隊列:是RabbitMQ的內部對象,用于存儲消息薪寓,最終將消息傳輸?shù)较M者亡资。
Exchange交換機:在RabbitMQ中,生產者發(fā)送消息到交換機向叉,由交換機將消息路由到一個或者多個隊列中
RoutingKey路由鍵:生產者將消息發(fā)給交換器的時候锥腻,一般會指定一個RoutingKey,用來指定這個消息的路由規(guī)則母谎。
Binding綁定:RabbitMQ中通過綁定將交換器與隊列關聯(lián)起來瘦黑,在綁定的時候一般會指定一個綁定鍵(BindingKey),這樣RabbitMQ就知道如何正確地將消息路由到隊列奇唤。
四幸斥、RabbitMQ使用
(1)簡單的demo
(2)示例代碼
- RabbitmqConfig配置類,SimpleRabbitListenerContainerFactory Bean是消息監(jiān)聽容器咬扇,服務于監(jiān)聽者甲葬;RabbitTemplate是RabbitMQ發(fā)送消息的操作組件RabbitTemplate,此外配置類還有三個Bean懈贺,一個是隊列basicQueue用于存儲消息最終消息會被消費者監(jiān)聽到经窖,basicExchange是交換機,生產者發(fā)送消息到交換機根據路由規(guī)則發(fā)送到相應的隊列basicQueue上梭灿,basicBinding是負責綁定交換機basicExchange和隊列basicQueue画侣,根據路由規(guī)則綁定起來。創(chuàng)建隊列堡妒、交換機的名詞以及路由規(guī)則我都放到常量類RabbitMqConstants里面棉钧。
@Slf4j
@Configuration
public class RabbitmqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
//自動裝配消息監(jiān)聽器所在的容器工廠配置類實例
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 下面為單一消費者實例的配置
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
//定義消息監(jiān)聽器所在的容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//設置容器工廠所用的實例
factory.setConnectionFactory(connectionFactory);
//設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//設置并發(fā)消費者實例的初始數(shù)量涕蚤。在這里為1個
factory.setConcurrentConsumers(1);
//設置并發(fā)消費者實例的最大數(shù)量。在這里為1個
factory.setMaxConcurrentConsumers(1);
//設置并發(fā)消費者實例中每個實例拉取的消息數(shù)量-在這里為1個
factory.setPrefetchCount(1);
return factory;
}
//自定義配置RabbitMQ發(fā)送消息的操作組件RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(){
//設置“發(fā)送消息后進行確認”
connectionFactory.setPublisherConfirms(true);
//設置“發(fā)送消息后返回確認信息”
connectionFactory.setPublisherReturns(true);
//構造發(fā)送消息組件實例對象
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//發(fā)送消息后的诵,如果發(fā)送成功万栅,則輸出“消息發(fā)送成功”的反饋信息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
//發(fā)送消息后,如果發(fā)送失敗西疤,則輸出“消息發(fā)送失敗-消息丟失”的反饋信息
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
//定義消息傳輸?shù)母袷綖镴SON字符串格式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//最終返回RabbitMQ的操作組件實例RabbitTemplate
return rabbitTemplate;
}
//創(chuàng)建隊列
@Bean
public Queue basicQueue(){
return new Queue(RabbitMqConstants.BASIC_QUEUE,true);
}
//創(chuàng)建交換機:在這里以DirectExchange為例
@Bean
public DirectExchange basicExchange(){
return new DirectExchange(RabbitMqConstants.BASIC_EXCHANGE,true,false);
}
//創(chuàng)建綁定
@Bean
public Binding basicBinding(){
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RabbitMqConstants.BASICE_ROUTING_KEY);
}
}
- RabbitMqConstants常量類烦粒,存放創(chuàng)建隊列、交換機的名詞以及路由規(guī)則。
@Data
public class RabbitMqConstants {
//隊列名詞
public static final String BASIC_QUEUE = "mq.basic.info.queue";
//交換機名詞
public static final String BASIC_EXCHANGE = "mq.basic.info.exchange";
//路由規(guī)則,實際為字符串
public static final String BASICE_ROUTING_KEY = "mq.basic.info.routing.key";
}
- BasicPublisher 類扰她,生產者兽掰,異步發(fā)送消息
@Component
@Slf4j
public class BasicPublisher {
//定義RabbitMQ消息操作組件RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息
* @param message 待發(fā)送的消息
*/
public void sendMsg(Order message){
try {
//指定消息模型中的交換機
rabbitTemplate.setExchange(RabbitMqConstants.BASIC_EXCHANGE);
//指定消息模型中的路由
rabbitTemplate.setRoutingKey(RabbitMqConstants.BASICE_ROUTING_KEY);
//轉化并發(fā)送消息
rabbitTemplate.convertAndSend(message);
log.info("rabbitmq demo-生產者-發(fā)送消息:{} ", JSONUtil.toJsonStr(message));
} catch (Exception e) {
log.error("rabbitmq demo-生產者-發(fā)送消息發(fā)生異常:{} ", message, e.fillInStackTrace());
}
}
}
- BasicConsumer 類,消費者徒役,監(jiān)聽到消息時對消息進行處理孽尽,需要為消費者設置監(jiān)聽的隊列mq.basic.info.queue以及監(jiān)聽容器singleListenerContainer。
@Component
@Slf4j
public class BasicConsumer {
/**
* 監(jiān)聽并接收消費隊列中的消息-在這里采用單一容器工廠實例即可
*/
@RabbitListener(queues = RabbitMqConstants.BASIC_QUEUE, containerFactory = "singleListenerContainer")
public void consumeMsg(Order message) {
try {
log.info("rabbitmq demo-消費者-監(jiān)聽消費到消息:{} ", JSONUtil.toJsonStr(message));
} catch (Exception e) {
log.error("rabbitmq demo-消費者-發(fā)生異常:", e.fillInStackTrace());
}
}
}
(3)安裝好erlang語言以及rabbitmq之后忧勿,項目啟動杉女,訪問http://127.0.0.1:15672,輸入默認賬號密碼鸳吸,可以看到:
(4)運行test方法:
@Test
public void testBasicPublish() {
Order order = new Order();
order.setOrdernum("123456");
basicPublisher.sendMsg(order);
}
下一篇:springboot rabbitmq不同交換機類型實戰(zhàn)
下一篇:springboot rabbitmq高可用消息確認消費實戰(zhàn)
參考資料:
《分布式中間件實戰(zhàn)》
《rabbitmq實戰(zhàn)指南》