Kotlin+SpringBoot與RabbitMQ整合詳解

1钥组、前言簡(jiǎn)介

  • RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn)输硝,服務(wù)器端用Erlang語(yǔ)言編寫,支持多種客戶端者铜,如:Python腔丧、Ruby、.NET作烟、Java、JMS砾医、C拿撩、PHP、ActionScript如蚜、XMPP压恒、STOMP等,支持AJAX错邦。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息探赫,在易用性、擴(kuò)展性撬呢、高可用性等方面表現(xiàn)不俗伦吠。

  • 此工程是基于之前公司的Java架構(gòu)SpringBoot2.x+MyBatis+RabbitMQ進(jìn)行改寫開發(fā)過(guò)來(lái)的案例。

  • 整個(gè)工程架構(gòu)是基于Kotlin +SpringBoot + MyBatis完美搭建最簡(jiǎn)潔最酷的前后端分離框架

2、RabbitMQ安裝

3毛仪、RabbitMQ協(xié)議

  • RabbitMQ是基于AMQP協(xié)議(Advanced Message Queuing Protocol)高級(jí)消息隊(duì)列協(xié)議搁嗓,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)箱靴。消息中間件主要用于組件之間的解耦腺逛,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然衡怀。

  • AMQP的主要特征是面向消息棍矛、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)抛杨、可靠性茄靠、安全。

4蝶桶、為什么要用RabbitMQ(場(chǎng)景)慨绳?

  • 1、當(dāng)處理一些第三方的接口的時(shí)候可以選擇它真竖,比如:在處理訂單的時(shí)候較為常見脐雪。就用親身經(jīng)歷參與的開發(fā)過(guò)的一個(gè)物流撮合平臺(tái)來(lái)說(shuō),當(dāng)初就大量使用MQ處理不同業(yè)務(wù)的場(chǎng)景(異步下單到第三方系統(tǒng)恢共、延遲取消訂單等)

  • 2战秋、系統(tǒng)之間進(jìn)行高度解偶

  • 3、流量削峰讨韭,比如:秒殺活動(dòng)脂信,流量過(guò)大時(shí),容易導(dǎo)致應(yīng)用掛掉,為了解決這個(gè)問(wèn)題透硝,一般在應(yīng)用加入消息隊(duì)列來(lái)緩解短時(shí)間的負(fù)載

  • 4狰闪、擁有異步處理機(jī)制.

  • 5、MQ擁有良好的隊(duì)列算法濒生,有先進(jìn)先出的特性埋泵,在處理第三方下單和處理延遲取消訂單的等更好不過(guò)了,可以減少傳統(tǒng)做法掃描全表取消訂單罪治。

生活場(chǎng)景: 消息隊(duì)列(Message丽声、Queue)就好比廣州的brt的公交車、每天的車(來(lái)來(lái)來(lái)往往)都會(huì)在一條道路上面行走偶遇觉义、而這個(gè)道路類似MQ的Channel(io管道)當(dāng)紅綠燈的紅燈亮的時(shí)候雁社、道路就開始擁擠阻塞、當(dāng)綠燈亮起的時(shí)晒骇、排在最前車輛會(huì)優(yōu)先行駛走出這條道路霉撵、這就是MQ的先進(jìn)先出的特性原則磺浙、除非開車的人不按常規(guī)行駛、這時(shí)運(yùn)氣不好的話可能會(huì)進(jìn)入try喊巍、catch處理屠缭。

  • 生活場(chǎng)景

5、MQ技術(shù)有哪些

MQ.png

6崭参、Kotlin代碼實(shí)現(xiàn)

  • RabbitMQ依賴jar

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 關(guān)鍵代碼類


    配置類
  • RabbitMQProperties.kt 代碼

EnableConfigurationProperties支持下滑線讀取配置的屬性,Value注解不支持

@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
open class RabbitMQProperties {
    // MQ的主機(jī)
    var host: String = ""
    // 主機(jī)端口號(hào)
    var port: Int = 5672
    // 用戶名
    var username: String = ""
    // 密碼
    var password: String = ""
    // 虛擬主機(jī)路徑
    /**EnableConfigurationProperties支持下滑線讀取配置的屬性,Value注解不支持 */
    var virtualHost: String = ""
}
  • RabbitConstants.kt 代碼

/**
 *常量
 */
open class RabbitConstants {
    //伴生靜態(tài)對(duì)象呵曹,const val不可以修改的常量
    companion object {

        //交換器名稱
        const val TOPIC_EXCHANGE        = "topic_exchange";

        //queue名稱
        const val USER_QUEUE            = "user_queue";

        //route_key名稱
        const val USER_QUEUE_ROUTE_KEY  = "user_queue_route_key";
    }
}
  • RabbitTemplateClient 代碼

  • object是kotlin的關(guān)鍵字所以要用引號(hào)
@Component
open class RabbitTemplateClient {
    @Autowired lateinit var amqpTemplate: AmqpTemplate
    companion object {
        private val log: Logger = LoggerFactory.getLogger(RabbitTemplateClient::class.java)
    }

    /**
     * @Description 生產(chǎn)者發(fā)送消息
     * @Author    liangjl
     * @Date    2018年5月10日 下午5:57:43
     * @param content 生產(chǎn)者的內(nèi)容
     * @param exchangeName 交換機(jī)名稱
     * @param routingKey 路由key
     * @return void 返回類型
     */
    fun sendMessage(`object`: Any, exchangeName: String, routingKey: String) {
        var properties = MessagePropertiesBuilder.newInstance().setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        var message = Message(SerializationUtils.serialize(`object`), properties);
        this.amqpTemplate.convertAndSend(exchangeName, routingKey, message);
    }

    /**
     * @Description 發(fā)送消息
     * @Author     liangjl
     * @Date      2018年6月4日 下午6:35:36
     * @param object
     * @param queueName 參數(shù)
     * @return void 返回類型
     */
    fun sendMessage(`object`: Object, queueName: String) {
        //序列化對(duì)象
        this.amqpTemplate.convertAndSend(queueName, SerializationUtils.serialize(`object`));
    }
    /**
     * @Description basic
     * @Author        liangjl
     * @Date        2018年6月4日 下午8:11:56
     * @param @param message
     * @param @param channel
     * @param @param tipMsg 參數(shù)
     * @return void 返回類型
     */
    fun basic(message: Message, channel: Channel, tipMsg: String) {
        try {
            // 消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到何暮,true確認(rèn)所有consumer獲得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (e: IOException) {
            log.error("【" + tipMsg + "】RabbitMQ消費(fèi)者執(zhí)行,確認(rèn)回答出現(xiàn)異常奄喂,異常信息為:" + e.message);
            try {
                // ack返回false,并重新回到隊(duì)列海洼,
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (e1: IOException) {
                e1.printStackTrace();
            }
        } finally {
            try {
                //釋放資源
                if (channel != null) {
                    channel.close();
                }
            } catch (e: Exception) {
                e.printStackTrace();
            }
        }
    }
}
  • RabbitMQConfiguration代碼

@Component
@Configuration
@EnableConfigurationProperties(RabbitMQProperties::class)
open class RabbitMQConfiguration {

    @Autowired lateinit var rabbitMQProperties:RabbitMQProperties
    
    // MQ的主機(jī)
    var host: String = ""
    // 主機(jī)端口號(hào)
    var port: Int = 5672
    // 用戶名
    var username: String = ""
    // 密碼
    var password: String = ""
    // 虛擬主機(jī)路徑
    /**EnableConfigurationProperties支持下滑線讀取配置的屬性,Value注解不支持 */
    var virtualHost: String = ""
    
    //無(wú)參數(shù)構(gòu)造方法
    constructor()
    
    //有參數(shù)的構(gòu)造方法
    constructor(host: String,port: Int,username: String,password: String,virtualHost: String){
        this.host = host
        this.port = port
        this.username = username
        this.password =password
        this.virtualHost= virtualHost
    }
    
    /**
     * 在創(chuàng)建了多個(gè)ConnectionFactory時(shí)跨新,必須定義RabbitAdmin,
否則無(wú)法自動(dòng)創(chuàng)建exchange,queue
     * @param connectionFactory
     */
    @Bean(name = arrayOf("rabbitAdmin"))
    @Qualifier("rabbitAdmin")
    open fun rabbitAdmin(@Qualifier("connectionFactory") connectionFactory: ConnectionFactory): RabbitAdmin {
        var rabbitAdmin = RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    /**
     * 在創(chuàng)建了多個(gè)ConnectionFactory 
     * @param connectionFactory
     */
    @Bean(name = arrayOf("connectionFactory"))
    open fun connectionFactory(): ConnectionFactory {
        var connectionFactory =  CachingConnectionFactory();
        connectionFactory.setHost(rabbitMQProperties.host);
        connectionFactory.setPort(rabbitMQProperties.port);
        connectionFactory.setUsername(rabbitMQProperties.username);
        connectionFactory.setPassword(rabbitMQProperties.password);
        connectionFactory.setVirtualHost(rabbitMQProperties.virtualHost);
        connectionFactory.setConnectionTimeout(1000 * 20);
        return connectionFactory;
    }
    
    ///////////////////////////////////////////////////////
    //////////////////////// Exchange ////////////
    /////////////////////////////////////////////////////
    /**
     * @Description Topic交換器
     * @Author liangjl
     * @Date 2018年6月4日 下午5:49:01
     * @return TopicExchange 返回類型
     */
    @Bean(name = arrayOf("topicExchange"))
    open fun topicExchange(@Qualifier("rabbitAdmin") rabbitAdmin: RabbitAdmin): T
opicExchange {
        var exchange = TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
        exchange.setShouldDeclare(true);
        exchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return exchange;
    }

    ///////////////////////////////////////////////////////
    //////////////////////// Queue ////////////////
    /////////////////////////////////////////////////////
    
    @Bean(name = arrayOf("userQueue"))
    open fun userQueue(@Qualifier("rabbitAdmin")  rabbitAdmin:RabbitAdmin):
Queue {
        // 隊(duì)列持久
        var queue = Queue(RabbitConstants.USER_QUEUE, true);
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        queue.setShouldDeclare(true);
        return queue;
    }

    ///////////////////////////////////////////////////////
    //////////////////////// Binding////////////////
    /////////////////////////////////////////////////////
    
    @Bean
    open fun bindingUserQueue(@Qualifier("userQueue")queue:Queue, 
@Qualifier("topicExchange")topicExchange:TopicExchange):Binding {
        // 路由Key
        var routeKey = RabbitConstants.USER_QUEUE_ROUTE_KEY;
        return BindingBuilder.bind(queue).to(topicExchange).with(routeKey);
    }
    
}
  • META-INF的spring.factories代碼

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.flong.kotlin.core.rabbitmq.RabbitMQAutoConfiguration
  • RabbitMQAutoConfiguration代碼

【SpringBoot精髓所在】:
1坏逢、在springboot啟動(dòng)加載大量的自動(dòng)配置
2域帐、我們寫好自動(dòng)配置類交給EnableAutoConfiguration進(jìn)行處理
3、然后交給容器自動(dòng)裝配類添加到組件時(shí)候會(huì)從META-INF的spring.factories讀取相關(guān)屬性
4是整、一般有XXProperties肖揣,這個(gè)類是處理配置的屬性自動(dòng)的key-value

@EnableConfigurationProperties(RabbitMQProperties::class)
open class RabbitMQAutoConfiguration {
    
    @Autowired lateinit var rabbitMQProperties: RabbitMQProperties;
    
    @Bean
    @ConditionalOnMissingBean(RabbitMQConfiguration::class)
    @ConditionalOnProperty(name = arrayOf("rabbitmq.host"))
    open fun rabbitMQTemplateClient():RabbitMQConfiguration {
        return RabbitMQConfiguration(rabbitMQProperties.host,
rabbitMQProperties.port,
            rabbitMQProperties.username,rabbitMQProperties.password
,rabbitMQProperties.virtualHost )
    }
}
  • RabbitMq連接配置

rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/rabbitmq
  • Provider提供者或生產(chǎn)者

@Service
open class UserProviderService {
    companion object {
        private val log: Logger = LoggerFactory.getLogger(UserProviderService::class.java)
    }
    
    @Autowired lateinit var rabbitTemplateClient :RabbitTemplateClient

    /**
     * @Description 異步處理MQ生產(chǎn)者
     * @Author    liangjl
     * @Date    2018年5月10日 下午8:01:02
     * @param orderDataList 參數(shù)
     * @return void 返回類型
     */
    fun syncUserQueue(objectVo: Any) {
        rabbitTemplateClient.sendMessage(objectVo, RabbitConstants.TOPIC_EXCHANGE, RabbitConstants.USER_QUEUE_ROUTE_KEY);
        log.info("[syncUserQueue]RabbitMQ生產(chǎn)者執(zhí)行完成,完成時(shí)間為:" + SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    }

}
  • Consumer消費(fèi)者

@Service
open class UserConsumerService {
    
    companion object{
        private val log: Logger = LoggerFactory.getLogger(UserConsumerService::class.java)
    }
    
    
    @RabbitHandler
    @RabbitListener(queues = arrayOf(RabbitConstants.USER_QUEUE))
    fun syncUserQueue(message:Message, channel: Channel ) {
        try {
            var user = SerializationUtils.deserialize(message.getBody());//反序列化
            
            println("rabbitmq獲取到的用戶信息為:" + user )
     
        } catch ( e:Exception) {
            log.error("syncUserQueue異常:" + e.message);
        }finally{
            
            if(channel != null){
                channel.close()
            }
        }
    }
}
  • Controller代碼

@Autowired private lateinit var userProviderService :UserProviderService
//rabbitMq簡(jiǎn)單測(cè)試
    @RequestMapping("/rabbitMq/{userId}")
    fun rabbitMq(@PathVariable("userId") userId:Long ){
        var user = userService.getUserId(userId);
        userProviderService.syncUserQueue(user.toString())
    }
  • 運(yùn)行結(jié)果

2019-06-14 15:07:49.316 INFO 1996 --- [nio-8080-exec-4] c.f.k.m.r.provider.UserProviderService : [syncUserQueue]RabbitMQ生產(chǎn)者執(zhí)行完成浮入,完成時(shí)間為:2019-06-14 15:07:49.316

rabbitmq獲取到的用戶信息為:[User(userId = 12345678,userName = 小梁, passWord=1,isDeleted=0,createTime=Fri Jun 07 02:22:01 CST 2019),]

  • 運(yùn)行結(jié)果.png

7 龙优、工程架構(gòu)源代碼

Kotlin+SpringBoot與RabbitMQ整合工程源代碼

8 、總結(jié)與建議

1 事秀、以上問(wèn)題根據(jù)搭建 kotlin與RabbitMQ實(shí)際情況進(jìn)行總結(jié)整理彤断,除了技術(shù)問(wèn)題查很多網(wǎng)上資料,通過(guò)自身進(jìn)行學(xué)習(xí)之后梳理與分享易迹。

2宰衙、 在學(xué)習(xí)過(guò)程中也遇到很多困難和疑點(diǎn),如有問(wèn)題或誤點(diǎn)赴蝇,望各位老司機(jī)多多指出或者提出建議菩浙、留言。本人會(huì)采納各種好建議和正確方式不斷完善現(xiàn)況句伶,人在成長(zhǎng)過(guò)程中的需要優(yōu)質(zhì)的養(yǎng)料攒庵。

3晨仑、 希望此文章能幫助各位老鐵們更好去了解如何在 kotlin上搭建RabbitMQ彼哼,也希望您看了此文檔或者通過(guò)找資料親身經(jīng)歷實(shí)操學(xué)習(xí)效果會(huì)更好吐辙。

備注:此文章屬于本人原創(chuàng),歡迎轉(zhuǎn)載和收藏.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末绪杏,一起剝皮案震驚了整個(gè)濱河市樟遣,隨后出現(xiàn)的幾起案子淑翼,更是在濱河造成了極大的恐慌塔粒,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件身冬,死亡現(xiàn)場(chǎng)離奇詭異衅胀,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)酥筝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門滚躯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人嘿歌,你說(shuō)我怎么就攤上這事掸掏。” “怎么了宙帝?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵丧凤,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我步脓,道長(zhǎng)愿待,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任靴患,我火速辦了婚禮仍侥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蚁廓。我一直安慰自己访圃,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布相嵌。 她就那樣靜靜地躺著腿时,像睡著了一般。 火紅的嫁衣襯著肌膚如雪饭宾。 梳的紋絲不亂的頭發(fā)上批糟,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音看铆,去河邊找鬼徽鼎。 笑死,一個(gè)胖子當(dāng)著我的面吹牛弹惦,可吹牛的內(nèi)容都是我干的否淤。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼棠隐,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼石抡!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起助泽,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤啰扛,失蹤者是張志新(化名)和其女友劉穎嚎京,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體隐解,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鞍帝,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了煞茫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帕涌。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖溜嗜,靈堂內(nèi)的尸體忽然破棺而出宵膨,到底是詐尸還是另有隱情,我是刑警寧澤炸宵,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布辟躏,位于F島的核電站,受9級(jí)特大地震影響土全,放射性物質(zhì)發(fā)生泄漏捎琐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一裹匙、第九天 我趴在偏房一處隱蔽的房頂上張望瑞凑。 院中可真熱鬧,春花似錦概页、人聲如沸籽御。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)技掏。三九已至,卻和暖如春项鬼,著一層夾襖步出監(jiān)牢的瞬間哑梳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工绘盟, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留鸠真,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓龄毡,卻偏偏與公主長(zhǎng)得像吠卷,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子沦零,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,892評(píng)論 2 11
  • 一撤嫩、前言 RabbitMQ是一個(gè)開源的消息代理軟件(面向消息的中間件),它的核心作用就是創(chuàng)建消息隊(duì)列蠢终,異步接收和發(fā)...
    Java中文社群_老王閱讀 949評(píng)論 0 50
  • 1寻拂、安裝 1.1程奠、Erlang: Erlang下載地址,下載后安裝即可祭钉。 1.2瞄沙、RabbitMQ安裝 Rabbi...
    木石前盟Caychen閱讀 873評(píng)論 0 12
  • 什么叫消息隊(duì)列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)慌核。消息可以非常簡(jiǎn)單距境,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,370評(píng)論 0 24
  • 關(guān)于消息隊(duì)列垮卓,從前年開始斷斷續(xù)續(xù)看了些資料垫桂,想寫很久了,但一直沒(méi)騰出空粟按,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型诬滩,是時(shí)...
    預(yù)流閱讀 584,522評(píng)論 51 785