在前面的幾篇博客里面已經(jīng)把RabbitMQ的一些理論詳細(xì)了說(shuō)明了赴穗,在這一篇中將記錄下Spring整合RabbitMQ,本文只是簡(jiǎn)單一個(gè)整合介紹加矛,屬于拋磚引玉斟览,具體實(shí)現(xiàn)還需大家深入研究哈..
代碼我會(huì)上傳到我的碼云上辑奈,如需下載請(qǐng)?jiān)谖恼碌哪┪矊ふ蚁螺d地址
1、POM引入
<!-- RabbitMQ -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
2妓羊、RabbitMQ配置信息
添加rabbitmq.properties配置文件
rabbit.hosts=127.0.0.1
rabbit.username=hrabbit
rabbit.password=123
rabbit.port=5672
rabbit.virtualHost=/hrabbit
# 統(tǒng)一XML配置中易變部分的命名
rabbit.queue=rabbitmq_test
3躁绸、添加FastJson轉(zhuǎn)化類
spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列净刮,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn)
package www.hrabbit.cn.configer;
import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午6:35
* @Description:
*/
public class FastJsonMessageConverter extends AbstractJsonMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
private static ClassMapper classMapper = new DefaultClassMapper();
public FastJsonMessageConverter() {
super();
}
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
byte[] bytes = null;
try {
String jsonString = JSON.toJSONString(object);
bytes = jsonString.getBytes(getDefaultCharset());
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(getDefaultCharset());
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
classMapper.fromClass(object.getClass(), messageProperties);
return new Message(bytes, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.contains("json")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = getDefaultCharset();
}
try {
Class<?> targetClass = getClassMapper().toClass(message.getMessageProperties());
content = convertBytesToObject(message.getBody(), encoding, targetClass);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
} else {
log.warn("Could not convert incoming message with content-type [" + contentType + "]");
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
throws UnsupportedEncodingException {
String contentAsString = new String(body, encoding);
return JSON.parseObject(contentAsString, clazz);
}
}
4怎虫、添加amqp-application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
<description>rabbitmq 連接服務(wù)配置</description>
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}" virtual-host="${rabbit.virtualHost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列大审,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn) -->
<bean id="jsonMessageConverter" class="www.hrabbit.cn.util.FastJsonMessageConverter"></bean>
<!-- spring template聲明-->
<rabbit:template exchange="koms" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
<!--
durable:是否持久化
exclusive: 僅創(chuàng)建者可以使用的私有隊(duì)列徒扶,斷開后自動(dòng)刪除
auto_delete: 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列
-->
<!-- 申明一個(gè)消息隊(duì)列Queue -->
<rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
<!--
rabbit:direct-exchange:定義exchange模式為direct驾诈,意思就是消息與一個(gè)特定的路由鍵完全匹配溶浴,才會(huì)轉(zhuǎn)發(fā)士败。
rabbit:binding:設(shè)置消息queue匹配的key
-->
<!-- 交換機(jī)定義 -->
<rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
<rabbit:bindings>
<rabbit:binding queue="order" key="order"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--
queues:監(jiān)聽(tīng)的隊(duì)列谅将,多個(gè)的話用逗號(hào)(,)分隔
ref:監(jiān)聽(tīng)器
-->
<bean class="www.hrabbit.cn.rabbitMq.listener.MessageListener" id="messageListener"></bean>
<!-- 配置監(jiān)聽(tīng) acknowledeg = "manual" 設(shè)置手動(dòng)應(yīng)答 當(dāng)消息處理失敗時(shí):會(huì)一直重發(fā) 直到消息處理成功 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<!-- 配置監(jiān)聽(tīng)器 -->
<rabbit:listener queues="order" ref="messageListener"/>
</rabbit:listener-container>
</beans>
在這個(gè)項(xiàng)目中我的生產(chǎn)者和消費(fèi)者是放到同一個(gè)項(xiàng)目中的饥臂。項(xiàng)目中的監(jiān)聽(tīng)器,即為消費(fèi)者稽煤。
5囚戚、生產(chǎn)者
注入AmqpTemplate
模板驰坊,調(diào)用convertAndSend ();
方法添加消息;
package www.hrabbit.cn.rabbitMq.service.impl;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import www.hrabbit.cn.rabbitMq.service.SpittleService;
import javax.annotation.Resource;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午4:26
* @Description:
*/
@Service("spittleService")
public class SpittleServiceImpl implements SpittleService {
@Resource
private AmqpTemplate amqpTemplate;
/**
* 生產(chǎn)消息
* @return
*/
public Map<String,Object> spittleMsg(){
Map<String,Object> dataList = new LinkedHashMap<>();
for (int i=0;i<10;i++){
dataList.put("order","msgResult:"+i);
amqpTemplate.convertAndSend("order","msgResult:"+i);
}
return dataList;
}
}
6、添加監(jiān)聽(tīng)器(即消費(fèi)者)
package www.hrabbit.cn.rabbitMq.listener;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午4:47
* @Description:
*/
@Component
public class MessageListener implements ChannelAwareMessageListener {
private Logger logger= LoggerFactory.getLogger(MessageListener.class);
@Transactional
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//業(yè)務(wù)處理转培,放到action層浸须,并返回處理成功還是異常的flag
//boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0);
//還有一個(gè)點(diǎn)就是如何獲取mq消息的報(bào)文部分message邦泄?
String result=new String(message.getBody(),"UTF-8");
System.out.println("消息:"+result);
if(true){
basicACK(message,channel);//處理正常--ack
}else{
basicNACK(message,channel);//處理異常--nack
}
}
//正常消費(fèi)掉后通知mq服務(wù)器移除此條mq
private void basicACK(Message message,Channel channel){
try{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(IOException e){
logger.error("通知服務(wù)器移除mq時(shí)異常顺囊,異常信息:"+e);
}
}
//處理異常特碳,mq重回隊(duì)列
private void basicNACK(Message message,Channel channel) {
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException e) {
logger.error("mq重新進(jìn)入服務(wù)器時(shí)出現(xiàn)異常,異常信息:" + e);
}
}
}
7站宗、啟動(dòng)項(xiàng)目益愈,測(cè)試
訪問(wèn)地址:http://localhost:8080/amqp/spittleMsg
生產(chǎn)了10條消息,此時(shí)查看控制臺(tái)10條消息都被消費(fèi)了库快!
項(xiàng)目地址:https://gitee.com/hrabbit/spring-rabbitMQ
系列文章:
RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)