MQ
Message Queue贯底,消息隊(duì)列,F(xiàn)IFO 結(jié)構(gòu)乍恐。
例如電商平臺(tái)聚谁,在用戶支付訂單后執(zhí)行對(duì)應(yīng)的操作母剥;
優(yōu)點(diǎn):
- 異步
- 削峰
- 解耦
缺點(diǎn)
- 增加系統(tǒng)復(fù)雜性
- 數(shù)據(jù)一致性
- 可用性
JMS
Java Message Service,Java消息服務(wù)形导,類似 JDBC 提供了訪問(wèn)數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)婿牍,JMS 也制定了一套系統(tǒng)間消息通信的規(guī)范暂衡;
區(qū)別于 JDBC锦茁,JDK 原生包中并未定義 JMS 相關(guān)接口寄症。
ConnectionFactory
Connection
Destination
Session
MessageConsumer
MessageProducer
Message
協(xié)作方式圖示為;
業(yè)界產(chǎn)品
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
單機(jī)吞吐量 | 萬(wàn)級(jí) | 萬(wàn)級(jí) | 10 萬(wàn)級(jí) | 10 萬(wàn)級(jí) |
可用性 | 高 | 高 | 非常高 | 非常高 |
可靠性 | 較低概率丟失消息 | 基本不丟 | 可以做到 0 丟失 | 可以做到 0 丟失 |
功能支持 | 較為完善 | 基于 erlang憔披,并發(fā)強(qiáng)等限,性能好爸吮,延時(shí)低 | 分布式芬膝,拓展性好,支持分布式事務(wù) | 較為簡(jiǎn)單形娇,主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)計(jì)算锰霜,日志采集等 |
社區(qū)活躍度 | 低 | 中 | 高 | 高 |
ActiveMQ
作為 Apache 下的開源項(xiàng)目,完全支持 JMS 規(guī)范桐早。并且 Spring Boot 內(nèi)置了 ActiveMQ 的自動(dòng)化配置癣缅,作為入門再適合不過(guò)。
快速開始
添加依賴哄酝;
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
消息發(fā)送友存;
// 1. 創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工廠創(chuàng)建連接
Connection connection = factory.createConnection();
// 3. 啟動(dòng)連接
connection.start();
// 4. 創(chuàng)建連接會(huì)話session,第一個(gè)參數(shù)為是否在事務(wù)中處理陶衅,第二個(gè)參數(shù)為應(yīng)答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 根據(jù)session創(chuàng)建消息隊(duì)列目的地
Destination queue = session.createQueue("test-queue");
// 6. 根據(jù)session和目的地queue創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(queue);
// 7. 根據(jù)session創(chuàng)建消息實(shí)體
Message message = session.createTextMessage("hello world!");
// 8. 通過(guò)生產(chǎn)者producer發(fā)送消息實(shí)體
producer.send(message);
// 9. 關(guān)閉連接
connection.close();
Spring Boot 集成
自動(dòng)注入?yún)⒖迹簅rg.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration
添加依賴屡立;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
添加 yaml 配置;
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
#消息模式 true:廣播(Topic)搀军,false:隊(duì)列(Queue),默認(rèn)時(shí)false
pub-sub-domain: true
收發(fā)消息膨俐;
@Autowired
private JmsTemplate jmsTemplate;
// 接收消息
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
System.out.println(msg);
}
// 發(fā)送消息
public void sendMsg(String destination, String msg) {
jmsTemplate.convertAndSend(destination, msg);
}
高可用
基于 zookeeper 實(shí)現(xiàn)主從架構(gòu),修改 activemq.xml 節(jié)點(diǎn) persistenceAdapter 配置罩句;
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/levelDB"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
zkPath="/activemq/leveldb-stores"
hostname="localhost"
/>
</persistenceAdapter>
broker 地址為:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false
負(fù)載均衡
在高可用集群節(jié)點(diǎn) activemq.xml 添加節(jié)點(diǎn) networkConnectors焚刺;
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>
更多詳細(xì)信息可參考:https://blog.csdn.net/haoyuyang/article/details/53931710
集群消費(fèi)
由于發(fā)布訂閱模式,所有訂閱者都會(huì)接收到消息门烂,在生產(chǎn)環(huán)境乳愉,消費(fèi)者集群會(huì)產(chǎn)生消息重復(fù)消費(fèi)問(wèn)題。
ActiveMQ 提供 VirtualTopic 功能,解決多消費(fèi)端接收同一條消息的問(wèn)題匾委。于生產(chǎn)者而言拖叙,VirtualTopic 就是一個(gè) topic,對(duì)消費(fèi)而言則是 queue赂乐。
在 activemq.xml 添加節(jié)點(diǎn) destinationInterceptors薯鳍;
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
生產(chǎn)者正常往 testTopic 中發(fā)送消息,訂閱者可修改訂閱主題為類似 consumer.A.testTopic 這樣來(lái)消費(fèi)挨措。
更多詳細(xì)信息可參考:https://blog.csdn.net/java_collect/article/details/82154829
RocketMQ
是一個(gè)隊(duì)列模型的消息中間件挖滤,具有高性能、高可靠浅役、高實(shí)時(shí)斩松、分布式特點(diǎn)。
架構(gòu)圖示
-
Name Server
名稱服務(wù)器觉既,類似于 Zookeeper 注冊(cè)中心惧盹,提供 Broker 發(fā)現(xiàn);
-
Broker
RocketMQ 的核心組件瞪讼,絕大部分工作都在 Broker 中完成钧椰,接收請(qǐng)求,處理消費(fèi)符欠,消息持久化等嫡霞;
-
Producer
消息生產(chǎn)方;
-
Consumer
消息消費(fèi)方希柿;
快速開始
安裝后诊沪,依次啟動(dòng) nameserver 和 broker,可以用 mqadmin 管理主題曾撤、集群和 broker 等信息端姚;
添加依賴;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
消息發(fā)送挤悉;
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
"producer-topic",
"msg",
"hello world".getBytes()
);
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();
delayLevel 從 1 開始默認(rèn)依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h渐裸。
參考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。
消息接收尖啡;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876
Spring Boot 集成
添加依賴橄仆;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
添加 yaml 配置;
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer
發(fā)送消息衅斩;
@Autowired
private RocketMQTemplate mqTemplate;
public void sendMessage(String topic, String tag, String message) {
SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
System.out.println(JSON.toJSONString(result));
}
接收消息盆顾;
@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
Console 控制臺(tái)
RocketMQ 拓展包提供了管理控制臺(tái);
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
重復(fù)消費(fèi)
產(chǎn)生原因:
- 生產(chǎn)者重復(fù)投遞畏梆;
- 消息隊(duì)列異常您宪;
- 消費(fèi)者異常消費(fèi)奈懒;
怎么解決重復(fù)消費(fèi)的問(wèn)題,換句話怎么保證消息消費(fèi)的冪等性宪巨。
通沉仔樱基于本地消息表的方案實(shí)現(xiàn),消息處理過(guò)便不再處理捏卓。
順序消息
消息錯(cuò)亂的原因:
- 一個(gè)消息隊(duì)列 queue极祸,多個(gè) consumer 消費(fèi);
- 一個(gè) queue 對(duì)應(yīng)一個(gè) consumer怠晴,但是 consumer 多線程消費(fèi)遥金;
要保證消息的順序消費(fèi),有三個(gè)關(guān)鍵點(diǎn):
- 消息順序發(fā)送
- 消息順序存儲(chǔ)
- 消息順序消費(fèi)
參考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly蒜田。
分布式事務(wù)
在分布式系統(tǒng)中稿械,一個(gè)事務(wù)由多個(gè)本地事務(wù)組成。這里介紹一個(gè)基于 MQ 的分布式事務(wù)解決方案冲粤。
通過(guò) broker 的 HA 高可用美莫,和定時(shí)回查 prepare 消息的狀態(tài),來(lái)保證最終一致性梯捕。