1钥组、前言簡(jiǎn)介
RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn)输硝,服務(wù)器端用Erlang語(yǔ)言編寫,支持多種客戶端者铜,如:Python腔丧、Ruby、.NET作烟、Java、JMS砾医、C拿撩、PHP、ActionScript如蚜、XMPP压恒、STOMP等,支持AJAX错邦。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息探赫,在易用性、擴(kuò)展性撬呢、高可用性等方面表現(xiàn)不俗伦吠。
此工程是基于之前公司的Java架構(gòu)SpringBoot2.x+MyBatis+RabbitMQ進(jìn)行改寫開發(fā)過(guò)來(lái)的案例。
整個(gè)工程架構(gòu)是基于Kotlin +SpringBoot + MyBatis完美搭建最簡(jiǎn)潔最酷的前后端分離框架
2、RabbitMQ安裝
RabbitMQ安裝過(guò)程請(qǐng)參考CentOS-7下安裝rabbitMQ-3.7.3與集群搭建
3毛仪、RabbitMQ協(xié)議
RabbitMQ是基于AMQP協(xié)議(Advanced Message Queuing Protocol)高級(jí)消息隊(duì)列協(xié)議搁嗓,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)箱靴。消息中間件主要用于組件之間的解耦腺逛,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然衡怀。
AMQP的主要特征是面向消息棍矛、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)抛杨、可靠性茄靠、安全。
4蝶桶、為什么要用RabbitMQ(場(chǎng)景)慨绳?
1、當(dāng)處理一些第三方的接口的時(shí)候可以選擇它真竖,比如:在處理訂單的時(shí)候較為常見脐雪。就用親身經(jīng)歷參與的開發(fā)過(guò)的一個(gè)物流撮合平臺(tái)來(lái)說(shuō),當(dāng)初就大量使用MQ處理不同業(yè)務(wù)的場(chǎng)景(異步下單到第三方系統(tǒng)恢共、延遲取消訂單等)
2战秋、系統(tǒng)之間進(jìn)行高度解偶
3、流量削峰讨韭,比如:秒殺活動(dòng)脂信,流量過(guò)大時(shí),容易導(dǎo)致應(yīng)用掛掉,為了解決這個(gè)問(wèn)題透硝,一般在應(yīng)用加入消息隊(duì)列來(lái)緩解短時(shí)間的負(fù)載
4狰闪、擁有異步處理機(jī)制.
5、MQ擁有良好的隊(duì)列算法濒生,有先進(jìn)先出的特性埋泵,在處理第三方下單和處理延遲取消訂單的等更好不過(guò)了,可以減少傳統(tǒng)做法掃描全表取消訂單罪治。
生活場(chǎng)景: 消息隊(duì)列(Message丽声、Queue)就好比廣州的brt的公交車、每天的車(來(lái)來(lái)來(lái)往往)都會(huì)在一條道路上面行走偶遇觉义、而這個(gè)道路類似MQ的Channel(io管道)當(dāng)紅綠燈的紅燈亮的時(shí)候雁社、道路就開始擁擠阻塞、當(dāng)綠燈亮起的時(shí)晒骇、排在最前車輛會(huì)優(yōu)先行駛走出這條道路霉撵、這就是MQ的先進(jìn)先出的特性原則磺浙、除非開車的人不按常規(guī)行駛、這時(shí)運(yùn)氣不好的話可能會(huì)進(jìn)入try喊巍、catch處理屠缭。
5、MQ技術(shù)有哪些
6崭参、Kotlin代碼實(shí)現(xiàn)
-
RabbitMQ依賴jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
-
關(guān)鍵代碼類
-
RabbitMQProperties.kt 代碼
EnableConfigurationProperties支持下滑線讀取配置的屬性,Value注解不支持
@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
open class RabbitMQProperties {
// MQ的主機(jī)
var host: String = ""
// 主機(jī)端口號(hào)
var port: Int = 5672
// 用戶名
var username: String = ""
// 密碼
var password: String = ""
// 虛擬主機(jī)路徑
/**EnableConfigurationProperties支持下滑線讀取配置的屬性,Value注解不支持 */
var virtualHost: String = ""
}
-
RabbitConstants.kt 代碼
/**
*常量
*/
open class RabbitConstants {
//伴生靜態(tài)對(duì)象呵曹,const val不可以修改的常量
companion object {
//交換器名稱
const val TOPIC_EXCHANGE = "topic_exchange";
//queue名稱
const val USER_QUEUE = "user_queue";
//route_key名稱
const val USER_QUEUE_ROUTE_KEY = "user_queue_route_key";
}
}
-
RabbitTemplateClient 代碼
- object是kotlin的關(guān)鍵字所以要用引號(hào)
@Component
open class RabbitTemplateClient {
@Autowired lateinit var amqpTemplate: AmqpTemplate
companion object {
private val log: Logger = LoggerFactory.getLogger(RabbitTemplateClient::class.java)
}
/**
* @Description 生產(chǎn)者發(fā)送消息
* @Author liangjl
* @Date 2018年5月10日 下午5:57:43
* @param content 生產(chǎn)者的內(nèi)容
* @param exchangeName 交換機(jī)名稱
* @param routingKey 路由key
* @return void 返回類型
*/
fun sendMessage(`object`: Any, exchangeName: String, routingKey: String) {
var properties = MessagePropertiesBuilder.newInstance().setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
var message = Message(SerializationUtils.serialize(`object`), properties);
this.amqpTemplate.convertAndSend(exchangeName, routingKey, message);
}
/**
* @Description 發(fā)送消息
* @Author liangjl
* @Date 2018年6月4日 下午6:35:36
* @param object
* @param queueName 參數(shù)
* @return void 返回類型
*/
fun sendMessage(`object`: Object, queueName: String) {
//序列化對(duì)象
this.amqpTemplate.convertAndSend(queueName, SerializationUtils.serialize(`object`));
}
/**
* @Description basic
* @Author liangjl
* @Date 2018年6月4日 下午8:11:56
* @param @param message
* @param @param channel
* @param @param tipMsg 參數(shù)
* @return void 返回類型
*/
fun basic(message: Message, channel: Channel, tipMsg: String) {
try {
// 消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到何暮,true確認(rèn)所有consumer獲得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (e: IOException) {
log.error("【" + tipMsg + "】RabbitMQ消費(fèi)者執(zhí)行,確認(rèn)回答出現(xiàn)異常奄喂,異常信息為:" + e.message);
try {
// ack返回false,并重新回到隊(duì)列海洼,
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (e1: IOException) {
e1.printStackTrace();
}
} finally {
try {
//釋放資源
if (channel != null) {
channel.close();
}
} catch (e: Exception) {
e.printStackTrace();
}
}
}
}
-
RabbitMQConfiguration代碼
@Component
@Configuration
@EnableConfigurationProperties(RabbitMQProperties::class)
open class RabbitMQConfiguration {
@Autowired lateinit var rabbitMQProperties:RabbitMQProperties
// MQ的主機(jī)
var host: String = ""
// 主機(jī)端口號(hào)
var port: Int = 5672
// 用戶名
var username: String = ""
// 密碼
var password: String = ""
// 虛擬主機(jī)路徑
/**EnableConfigurationProperties支持下滑線讀取配置的屬性,Value注解不支持 */
var virtualHost: String = ""
//無(wú)參數(shù)構(gòu)造方法
constructor()
//有參數(shù)的構(gòu)造方法
constructor(host: String,port: Int,username: String,password: String,virtualHost: String){
this.host = host
this.port = port
this.username = username
this.password =password
this.virtualHost= virtualHost
}
/**
* 在創(chuàng)建了多個(gè)ConnectionFactory時(shí)跨新,必須定義RabbitAdmin,
否則無(wú)法自動(dòng)創(chuàng)建exchange,queue
* @param connectionFactory
*/
@Bean(name = arrayOf("rabbitAdmin"))
@Qualifier("rabbitAdmin")
open fun rabbitAdmin(@Qualifier("connectionFactory") connectionFactory: ConnectionFactory): RabbitAdmin {
var rabbitAdmin = RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 在創(chuàng)建了多個(gè)ConnectionFactory
* @param connectionFactory
*/
@Bean(name = arrayOf("connectionFactory"))
open fun connectionFactory(): ConnectionFactory {
var connectionFactory = CachingConnectionFactory();
connectionFactory.setHost(rabbitMQProperties.host);
connectionFactory.setPort(rabbitMQProperties.port);
connectionFactory.setUsername(rabbitMQProperties.username);
connectionFactory.setPassword(rabbitMQProperties.password);
connectionFactory.setVirtualHost(rabbitMQProperties.virtualHost);
connectionFactory.setConnectionTimeout(1000 * 20);
return connectionFactory;
}
///////////////////////////////////////////////////////
//////////////////////// Exchange ////////////
/////////////////////////////////////////////////////
/**
* @Description Topic交換器
* @Author liangjl
* @Date 2018年6月4日 下午5:49:01
* @return TopicExchange 返回類型
*/
@Bean(name = arrayOf("topicExchange"))
open fun topicExchange(@Qualifier("rabbitAdmin") rabbitAdmin: RabbitAdmin): T
opicExchange {
var exchange = TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
exchange.setShouldDeclare(true);
exchange.setAdminsThatShouldDeclare(rabbitAdmin);
return exchange;
}
///////////////////////////////////////////////////////
//////////////////////// Queue ////////////////
/////////////////////////////////////////////////////
@Bean(name = arrayOf("userQueue"))
open fun userQueue(@Qualifier("rabbitAdmin") rabbitAdmin:RabbitAdmin):
Queue {
// 隊(duì)列持久
var queue = Queue(RabbitConstants.USER_QUEUE, true);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
queue.setShouldDeclare(true);
return queue;
}
///////////////////////////////////////////////////////
//////////////////////// Binding////////////////
/////////////////////////////////////////////////////
@Bean
open fun bindingUserQueue(@Qualifier("userQueue")queue:Queue,
@Qualifier("topicExchange")topicExchange:TopicExchange):Binding {
// 路由Key
var routeKey = RabbitConstants.USER_QUEUE_ROUTE_KEY;
return BindingBuilder.bind(queue).to(topicExchange).with(routeKey);
}
}
-
META-INF的spring.factories代碼
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.flong.kotlin.core.rabbitmq.RabbitMQAutoConfiguration
-
RabbitMQAutoConfiguration代碼
【SpringBoot精髓所在】:
1坏逢、在springboot啟動(dòng)加載大量的自動(dòng)配置
2域帐、我們寫好自動(dòng)配置類交給EnableAutoConfiguration進(jìn)行處理
3、然后交給容器自動(dòng)裝配類添加到組件時(shí)候會(huì)從META-INF的spring.factories讀取相關(guān)屬性
4是整、一般有XXProperties肖揣,這個(gè)類是處理配置的屬性自動(dòng)的key-value
@EnableConfigurationProperties(RabbitMQProperties::class)
open class RabbitMQAutoConfiguration {
@Autowired lateinit var rabbitMQProperties: RabbitMQProperties;
@Bean
@ConditionalOnMissingBean(RabbitMQConfiguration::class)
@ConditionalOnProperty(name = arrayOf("rabbitmq.host"))
open fun rabbitMQTemplateClient():RabbitMQConfiguration {
return RabbitMQConfiguration(rabbitMQProperties.host,
rabbitMQProperties.port,
rabbitMQProperties.username,rabbitMQProperties.password
,rabbitMQProperties.virtualHost )
}
}
-
RabbitMq連接配置
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/rabbitmq
-
Provider提供者或生產(chǎn)者
@Service
open class UserProviderService {
companion object {
private val log: Logger = LoggerFactory.getLogger(UserProviderService::class.java)
}
@Autowired lateinit var rabbitTemplateClient :RabbitTemplateClient
/**
* @Description 異步處理MQ生產(chǎn)者
* @Author liangjl
* @Date 2018年5月10日 下午8:01:02
* @param orderDataList 參數(shù)
* @return void 返回類型
*/
fun syncUserQueue(objectVo: Any) {
rabbitTemplateClient.sendMessage(objectVo, RabbitConstants.TOPIC_EXCHANGE, RabbitConstants.USER_QUEUE_ROUTE_KEY);
log.info("[syncUserQueue]RabbitMQ生產(chǎn)者執(zhí)行完成,完成時(shí)間為:" + SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
}
-
Consumer消費(fèi)者
@Service
open class UserConsumerService {
companion object{
private val log: Logger = LoggerFactory.getLogger(UserConsumerService::class.java)
}
@RabbitHandler
@RabbitListener(queues = arrayOf(RabbitConstants.USER_QUEUE))
fun syncUserQueue(message:Message, channel: Channel ) {
try {
var user = SerializationUtils.deserialize(message.getBody());//反序列化
println("rabbitmq獲取到的用戶信息為:" + user )
} catch ( e:Exception) {
log.error("syncUserQueue異常:" + e.message);
}finally{
if(channel != null){
channel.close()
}
}
}
}
-
Controller代碼
@Autowired private lateinit var userProviderService :UserProviderService
//rabbitMq簡(jiǎn)單測(cè)試
@RequestMapping("/rabbitMq/{userId}")
fun rabbitMq(@PathVariable("userId") userId:Long ){
var user = userService.getUserId(userId);
userProviderService.syncUserQueue(user.toString())
}
-
運(yùn)行結(jié)果
2019-06-14 15:07:49.316 INFO 1996 --- [nio-8080-exec-4] c.f.k.m.r.provider.UserProviderService : [syncUserQueue]RabbitMQ生產(chǎn)者執(zhí)行完成浮入,完成時(shí)間為:2019-06-14 15:07:49.316
rabbitmq獲取到的用戶信息為:[User(userId = 12345678,userName = 小梁, passWord=1,isDeleted=0,createTime=Fri Jun 07 02:22:01 CST 2019),]
7 龙优、工程架構(gòu)源代碼
8 、總結(jié)與建議
1 事秀、以上問(wèn)題根據(jù)搭建 kotlin與RabbitMQ實(shí)際情況進(jìn)行總結(jié)整理彤断,除了技術(shù)問(wèn)題查很多網(wǎng)上資料,通過(guò)自身進(jìn)行學(xué)習(xí)之后梳理與分享易迹。
2宰衙、 在學(xué)習(xí)過(guò)程中也遇到很多困難和疑點(diǎn),如有問(wèn)題或誤點(diǎn)赴蝇,望各位老司機(jī)多多指出或者提出建議菩浙、留言。本人會(huì)采納各種好建議和正確方式不斷完善現(xiàn)況句伶,人在成長(zhǎng)過(guò)程中的需要優(yōu)質(zhì)的養(yǎng)料攒庵。
3晨仑、 希望此文章能幫助各位老鐵們更好去了解如何在 kotlin上搭建RabbitMQ彼哼,也希望您看了此文檔或者通過(guò)找資料親身經(jīng)歷實(shí)操學(xué)習(xí)效果會(huì)更好吐辙。
備注:此文章屬于本人原創(chuàng),歡迎轉(zhuǎn)載和收藏.