MQ 入門實(shí)踐

MQ

Message Queue贯底,消息隊(duì)列,F(xiàn)IFO 結(jié)構(gòu)乍恐。

image

例如電商平臺(tái)聚谁,在用戶支付訂單后執(zhí)行對(duì)應(yīng)的操作母剥;

image

優(yōu)點(diǎn):

  • 異步
  • 削峰
  • 解耦

缺點(diǎn)

  • 增加系統(tǒng)復(fù)雜性
  • 數(shù)據(jù)一致性
  • 可用性

JMS

Java Message Service,Java消息服務(wù)形导,類似 JDBC 提供了訪問(wèn)數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)婿牍,JMS 也制定了一套系統(tǒng)間消息通信的規(guī)范暂衡;

區(qū)別于 JDBC锦茁,JDK 原生包中并未定義 JMS 相關(guān)接口寄症。

  1. ConnectionFactory

  2. Connection

  3. Destination

  4. Session

  5. MessageConsumer

  6. MessageProducer

  7. Message

協(xié)作方式圖示為;

image

業(yè)界產(chǎn)品

ActiveMQ RabbitMQ RocketMQ kafka
單機(jī)吞吐量 萬(wàn)級(jí) 萬(wàn)級(jí) 10 萬(wàn)級(jí) 10 萬(wàn)級(jí)
可用性 非常高 非常高
可靠性 較低概率丟失消息 基本不丟 可以做到 0 丟失 可以做到 0 丟失
功能支持 較為完善 基于 erlang憔披,并發(fā)強(qiáng)等限,性能好爸吮,延時(shí)低 分布式芬膝,拓展性好,支持分布式事務(wù) 較為簡(jiǎn)單形娇,主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)計(jì)算锰霜,日志采集等
社區(qū)活躍度

ActiveMQ

作為 Apache 下的開源項(xiàng)目,完全支持 JMS 規(guī)范桐早。并且 Spring Boot 內(nèi)置了 ActiveMQ 的自動(dòng)化配置癣缅,作為入門再適合不過(guò)。

快速開始

添加依賴哄酝;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

消息發(fā)送友存;

// 1. 創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工廠創(chuàng)建連接
Connection connection = factory.createConnection();
// 3. 啟動(dòng)連接
connection.start();
// 4. 創(chuàng)建連接會(huì)話session,第一個(gè)參數(shù)為是否在事務(wù)中處理陶衅,第二個(gè)參數(shù)為應(yīng)答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 根據(jù)session創(chuàng)建消息隊(duì)列目的地
Destination queue = session.createQueue("test-queue");
// 6. 根據(jù)session和目的地queue創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(queue);
// 7. 根據(jù)session創(chuàng)建消息實(shí)體
Message message = session.createTextMessage("hello world!");
// 8. 通過(guò)生產(chǎn)者producer發(fā)送消息實(shí)體
producer.send(message);
// 9. 關(guān)閉連接
connection.close();

Spring Boot 集成

自動(dòng)注入?yún)⒖迹簅rg.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

添加依賴屡立;

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

添加 yaml 配置;

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    #消息模式 true:廣播(Topic)搀军,false:隊(duì)列(Queue),默認(rèn)時(shí)false
    pub-sub-domain: true

收發(fā)消息膨俐;

@Autowired
private JmsTemplate jmsTemplate;

// 接收消息
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
    System.out.println(msg);
}

// 發(fā)送消息
public void sendMsg(String destination, String msg) {
    jmsTemplate.convertAndSend(destination, msg);
}

高可用

基于 zookeeper 實(shí)現(xiàn)主從架構(gòu),修改 activemq.xml 節(jié)點(diǎn) persistenceAdapter 配置罩句;

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/levelDB"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
    />
</persistenceAdapter>

broker 地址為:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

負(fù)載均衡

在高可用集群節(jié)點(diǎn) activemq.xml 添加節(jié)點(diǎn) networkConnectors焚刺;

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>

更多詳細(xì)信息可參考:https://blog.csdn.net/haoyuyang/article/details/53931710

集群消費(fèi)

由于發(fā)布訂閱模式,所有訂閱者都會(huì)接收到消息门烂,在生產(chǎn)環(huán)境乳愉,消費(fèi)者集群會(huì)產(chǎn)生消息重復(fù)消費(fèi)問(wèn)題。

ActiveMQ 提供 VirtualTopic 功能,解決多消費(fèi)端接收同一條消息的問(wèn)題匾委。于生產(chǎn)者而言拖叙,VirtualTopic 就是一個(gè) topic,對(duì)消費(fèi)而言則是 queue赂乐。

image

在 activemq.xml 添加節(jié)點(diǎn) destinationInterceptors薯鳍;

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

生產(chǎn)者正常往 testTopic 中發(fā)送消息,訂閱者可修改訂閱主題為類似 consumer.A.testTopic 這樣來(lái)消費(fèi)挨措。

更多詳細(xì)信息可參考:https://blog.csdn.net/java_collect/article/details/82154829

RocketMQ

是一個(gè)隊(duì)列模型的消息中間件挖滤,具有高性能、高可靠浅役、高實(shí)時(shí)斩松、分布式特點(diǎn)。

架構(gòu)圖示

image
  1. Name Server

    名稱服務(wù)器觉既,類似于 Zookeeper 注冊(cè)中心惧盹,提供 Broker 發(fā)現(xiàn);

  2. Broker

    RocketMQ 的核心組件瞪讼,絕大部分工作都在 Broker 中完成钧椰,接收請(qǐng)求,處理消費(fèi)符欠,消息持久化等嫡霞;

  3. Producer

    消息生產(chǎn)方;

  4. Consumer

    消息消費(fèi)方希柿;

快速開始

安裝后诊沪,依次啟動(dòng) nameserver 和 broker,可以用 mqadmin 管理主題曾撤、集群和 broker 等信息端姚;

https://segmentfault.com/a/1190000017841402

添加依賴;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

消息發(fā)送挤悉;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
    "producer-topic",
    "msg",
    "hello world".getBytes()
);
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();

delayLevel 從 1 開始默認(rèn)依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h渐裸。

參考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

消息接收尖啡;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
    for (MessageExt msg : list) {
        System.out.println(new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

Spring Boot 集成

添加依賴橄仆;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

添加 yaml 配置;

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer

發(fā)送消息衅斩;

@Autowired
private RocketMQTemplate mqTemplate;

public void sendMessage(String topic, String tag, String message) {
    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
    System.out.println(JSON.toJSONString(result));
}

接收消息盆顾;

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

Console 控制臺(tái)

RocketMQ 拓展包提供了管理控制臺(tái);

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

image

重復(fù)消費(fèi)

產(chǎn)生原因:

  1. 生產(chǎn)者重復(fù)投遞畏梆;
  2. 消息隊(duì)列異常您宪;
  3. 消費(fèi)者異常消費(fèi)奈懒;

怎么解決重復(fù)消費(fèi)的問(wèn)題,換句話怎么保證消息消費(fèi)的冪等性宪巨。

通沉仔樱基于本地消息表的方案實(shí)現(xiàn),消息處理過(guò)便不再處理捏卓。

順序消息

消息錯(cuò)亂的原因:

  1. 一個(gè)消息隊(duì)列 queue极祸,多個(gè) consumer 消費(fèi);
  2. 一個(gè) queue 對(duì)應(yīng)一個(gè) consumer怠晴,但是 consumer 多線程消費(fèi)遥金;

要保證消息的順序消費(fèi),有三個(gè)關(guān)鍵點(diǎn):

  1. 消息順序發(fā)送
  2. 消息順序存儲(chǔ)
  3. 消息順序消費(fèi)
image

參考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly蒜田。

分布式事務(wù)

在分布式系統(tǒng)中稿械,一個(gè)事務(wù)由多個(gè)本地事務(wù)組成。這里介紹一個(gè)基于 MQ 的分布式事務(wù)解決方案冲粤。

image

通過(guò) broker 的 HA 高可用美莫,和定時(shí)回查 prepare 消息的狀態(tài),來(lái)保證最終一致性梯捕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末厢呵,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子科阎,更是在濱河造成了極大的恐慌述吸,老刑警劉巖忿族,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锣笨,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡道批,警方通過(guò)查閱死者的電腦和手機(jī)错英,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)隆豹,“玉大人椭岩,你說(shuō)我怎么就攤上這事×模” “怎么了判哥?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)碉考。 經(jīng)常有香客問(wèn)我塌计,道長(zhǎng),這世上最難降的妖魔是什么侯谁? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任锌仅,我火速辦了婚禮章钾,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘热芹。我一直安慰自己贱傀,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布伊脓。 她就那樣靜靜地躺著府寒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪报腔。 梳的紋絲不亂的頭發(fā)上椰棘,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音榄笙,去河邊找鬼邪狞。 笑死,一個(gè)胖子當(dāng)著我的面吹牛茅撞,可吹牛的內(nèi)容都是我干的帆卓。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼米丘,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼剑令!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起拄查,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤吁津,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后堕扶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碍脏,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年稍算,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了典尾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡糊探,死狀恐怖钾埂,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情科平,我是刑警寧澤褥紫,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站瞪慧,受9級(jí)特大地震影響髓考,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜汞贸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一绳军、第九天 我趴在偏房一處隱蔽的房頂上張望印机。 院中可真熱鬧,春花似錦门驾、人聲如沸射赛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)楣责。三九已至,卻和暖如春聂沙,著一層夾襖步出監(jiān)牢的瞬間秆麸,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工及汉, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留沮趣,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓坷随,卻偏偏與公主長(zhǎng)得像房铭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子温眉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345