RabbitMQ:Spring整合RabbitMQ

13F2925356653218AFBD71C70F752C7C.jpg

在前面的幾篇博客里面已經(jīng)把RabbitMQ的一些理論詳細(xì)了說(shuō)明了赴穗,在這一篇中將記錄下Spring整合RabbitMQ,本文只是簡(jiǎn)單一個(gè)整合介紹加矛,屬于拋磚引玉斟览,具體實(shí)現(xiàn)還需大家深入研究哈..

代碼我會(huì)上傳到我的碼云上辑奈,如需下載請(qǐng)?jiān)谖恼碌哪┪矊ふ蚁螺d地址

1、POM引入

<!-- RabbitMQ -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

2妓羊、RabbitMQ配置信息

添加rabbitmq.properties配置文件

rabbit.hosts=127.0.0.1
rabbit.username=hrabbit
rabbit.password=123
rabbit.port=5672
rabbit.virtualHost=/hrabbit
# 統(tǒng)一XML配置中易變部分的命名
rabbit.queue=rabbitmq_test

3躁绸、添加FastJson轉(zhuǎn)化類

spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列净刮,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn)

package www.hrabbit.cn.configer;


import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.*;

import java.io.IOException;
import java.io.UnsupportedEncodingException;


/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午6:35
 * @Description:
 */


public class FastJsonMessageConverter  extends AbstractJsonMessageConverter {

    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    private static ClassMapper classMapper = new DefaultClassMapper();

    public FastJsonMessageConverter() {
        super();
    }

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        classMapper.fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = getClassMapper().toClass(message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return JSON.parseObject(contentAsString, clazz);
    }
}

4怎虫、添加amqp-application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

    <description>rabbitmq 連接服務(wù)配置</description>

    <!-- 連接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}"  virtual-host="${rabbit.virtualHost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列大审,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn) -->
    <bean id="jsonMessageConverter"  class="www.hrabbit.cn.util.FastJsonMessageConverter"></bean>

    <!-- spring template聲明-->
    <rabbit:template exchange="koms" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <!--
        durable:是否持久化

        exclusive: 僅創(chuàng)建者可以使用的私有隊(duì)列徒扶,斷開后自動(dòng)刪除

        auto_delete: 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列
     -->

    <!--  申明一個(gè)消息隊(duì)列Queue   -->
    <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
    <!--
     rabbit:direct-exchange:定義exchange模式為direct驾诈,意思就是消息與一個(gè)特定的路由鍵完全匹配溶浴,才會(huì)轉(zhuǎn)發(fā)士败。

    rabbit:binding:設(shè)置消息queue匹配的key
     -->
    <!-- 交換機(jī)定義 -->
    <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
        <rabbit:bindings>
            <rabbit:binding queue="order" key="order"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--
         queues:監(jiān)聽(tīng)的隊(duì)列谅将,多個(gè)的話用逗號(hào)(,)分隔

        ref:監(jiān)聽(tīng)器
     -->
    <bean class="www.hrabbit.cn.rabbitMq.listener.MessageListener" id="messageListener"></bean>
    <!-- 配置監(jiān)聽(tīng)  acknowledeg = "manual"   設(shè)置手動(dòng)應(yīng)答  當(dāng)消息處理失敗時(shí):會(huì)一直重發(fā)  直到消息處理成功 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- 配置監(jiān)聽(tīng)器 -->
        <rabbit:listener queues="order" ref="messageListener"/>
    </rabbit:listener-container>
</beans>

在這個(gè)項(xiàng)目中我的生產(chǎn)者和消費(fèi)者是放到同一個(gè)項(xiàng)目中的饥臂。項(xiàng)目中的監(jiān)聽(tīng)器,即為消費(fèi)者稽煤。

5囚戚、生產(chǎn)者

注入AmqpTemplate模板驰坊,調(diào)用convertAndSend ();方法添加消息;

package www.hrabbit.cn.rabbitMq.service.impl;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import www.hrabbit.cn.rabbitMq.service.SpittleService;

import javax.annotation.Resource;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午4:26
 * @Description:
 */
@Service("spittleService")
public class SpittleServiceImpl implements SpittleService {

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 生產(chǎn)消息
     * @return
     */
    public Map<String,Object> spittleMsg(){

        Map<String,Object> dataList = new LinkedHashMap<>();

        for (int i=0;i<10;i++){
            dataList.put("order","msgResult:"+i);
            amqpTemplate.convertAndSend("order","msgResult:"+i);
        }
        return dataList;
    }

}

6、添加監(jiān)聽(tīng)器(即消費(fèi)者)

package www.hrabbit.cn.rabbitMq.listener;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午4:47
 * @Description:
 */
@Component
public class MessageListener implements ChannelAwareMessageListener {

    private Logger logger= LoggerFactory.getLogger(MessageListener.class);

    @Transactional
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //業(yè)務(wù)處理转培,放到action層浸须,并返回處理成功還是異常的flag
        //boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0);
        //還有一個(gè)點(diǎn)就是如何獲取mq消息的報(bào)文部分message邦泄?
        String result=new String(message.getBody(),"UTF-8");
        System.out.println("消息:"+result);
        if(true){
            basicACK(message,channel);//處理正常--ack
        }else{
            basicNACK(message,channel);//處理異常--nack
        }
    }


    //正常消費(fèi)掉后通知mq服務(wù)器移除此條mq
    private void basicACK(Message message,Channel channel){
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch(IOException e){
            logger.error("通知服務(wù)器移除mq時(shí)異常顺囊,異常信息:"+e);
        }
    }
    //處理異常特碳,mq重回隊(duì)列
    private void basicNACK(Message message,Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e) {
            logger.error("mq重新進(jìn)入服務(wù)器時(shí)出現(xiàn)異常,異常信息:" + e);
        }
    }
}

7站宗、啟動(dòng)項(xiàng)目益愈,測(cè)試

訪問(wèn)地址:http://localhost:8080/amqp/spittleMsg生產(chǎn)了10條消息,此時(shí)查看控制臺(tái)10條消息都被消費(fèi)了库快!

image.png

項(xiàng)目地址:https://gitee.com/hrabbit/spring-rabbitMQ

系列文章:

RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市湿蛔,隨后出現(xiàn)的幾起案子县爬,更是在濱河造成了極大的恐慌,老刑警劉巖察迟,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扎瓶,死亡現(xiàn)場(chǎng)離奇詭異泌枪,居然都是意外死亡碌燕,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)慈鸠,“玉大人青团,你說(shuō)我怎么就攤上這事÷簦” “怎么了?”我有些...
    開封第一講書人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵胖腾,是天一觀的道長(zhǎng)烟零。 經(jīng)常有香客問(wèn)我瘪松,道長(zhǎng)咸作,這世上最難降的妖魔是什么锨阿? 我笑而不...
    開封第一講書人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮记罚,結(jié)果婚禮上墅诡,老公的妹妹穿的比我還像新娘。我一直安慰自己桐智,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開白布说庭。 她就那樣靜靜地躺著然磷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪刊驴。 梳的紋絲不亂的頭發(fā)上姿搜,一...
    開封第一講書人閱讀 52,696評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音捆憎,去河邊找鬼舅柜。 笑死,一個(gè)胖子當(dāng)著我的面吹牛躲惰,可吹牛的內(nèi)容都是我干的致份。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼础拨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼氮块!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起诡宗,我...
    開封第一講書人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤雇锡,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后僚焦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體锰提,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年芳悲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了立肘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡名扛,死狀恐怖谅年,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情肮韧,我是刑警寧澤融蹂,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布旺订,位于F島的核電站,受9級(jí)特大地震影響超燃,放射性物質(zhì)發(fā)生泄漏区拳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一意乓、第九天 我趴在偏房一處隱蔽的房頂上張望樱调。 院中可真熱鬧,春花似錦届良、人聲如沸笆凌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)乞而。三九已至,卻和暖如春慢显,著一層夾襖步出監(jiān)牢的瞬間爪模,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工鳍怨, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呻右,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓鞋喇,卻偏偏與公主長(zhǎng)得像声滥,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子侦香,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理落塑,服務(wù)發(fā)現(xiàn),斷路器罐韩,智...
    卡卡羅2017閱讀 134,716評(píng)論 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,943評(píng)論 2 11
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,867評(píng)論 6 342
  • 要加“m”說(shuō)明是MB憾赁,否則就是KB了. -Xms:初始值 -Xmx:最大值 -Xmn:最小值 java -Xms1...
    阿B和阿C閱讀 7,347評(píng)論 0 7
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來(lái)...
    Chandler_玨瑜閱讀 6,586評(píng)論 2 39