RocketMQ 實戰(zhàn)之快速入門

最近 RocketMQ 剛剛上生產(chǎn)環(huán)境鸿秆,閑暇之時在這里做一些分享鸠项,主要目的是讓初學(xué)者能快速上手RocketMQ。

RocketMQ 是什么

Github 上關(guān)于 RocketMQ 的介紹:
RcoketMQ 是一款低延遲嗽测、高可靠、可伸縮铐炫、易于使用的消息中間件。具有以下特性:

  1. 支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型
  2. 在一個隊列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
  3. 支持拉(pull)和推(push)兩種消息模式
  4. 單一隊列百萬消息的堆積能力
  5. 支持多種消息協(xié)議钓瞭,如 JMS驳遵、MQTT 等
  6. 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
  7. 提供 docker 鏡像用于隔離測試和云集群部署
  8. 提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard

對于這些特性描述山涡,大家簡單過一眼就即可堤结,深入學(xué)習(xí)之后自然就明白了。

專業(yè)術(shù)語

Producer

消息生產(chǎn)者鸭丛,生產(chǎn)者的作用就是將消息發(fā)送到 MQ竞穷,生產(chǎn)者本身既可以產(chǎn)生消息,如讀取文本信息等鳞溉。也可以對外提供接口瘾带,由外部應(yīng)用來調(diào)用接口,再由生產(chǎn)者將收到的消息發(fā)送到 MQ熟菲。

Producer Group

生產(chǎn)者組看政,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組。在這里可以不用關(guān)心抄罕,只要知道有這么一個概念即可允蚣。

Consumer

消息消費者,簡單來說呆贿,消費 MQ 上的消息的應(yīng)用程序就是消費者嚷兔,至于消息是否進(jìn)行邏輯處理,還是直接存儲到數(shù)據(jù)庫等取決于業(yè)務(wù)需要做入。

Consumer Group

消費者組冒晰,和生產(chǎn)者類似,消費同一類消息的多個 consumer 實例組成一個消費者組竟块。

Topic

Topic 是一種消息的邏輯分類壶运,比如說你有訂單類的消息,也有庫存類的消息浪秘,那么就需要進(jìn)行分類前弯,一個是訂單 Topic 存放訂單相關(guān)的消息,一個是庫存 Topic 存儲庫存相關(guān)的消息秫逝。

Message

Message 是消息的載體。一個 Message 必須指定 topic询枚,相當(dāng)于寄信的地址违帆。Message 還有一個可選的 tag 設(shè)置,以便消費端可以基于 tag 進(jìn)行過濾消息金蜀。也可以添加額外的鍵值對刷后,例如你需要一個業(yè)務(wù) key 來查找 broker 上的消息的畴,方便在開發(fā)過程中診斷問題。

Tag

標(biāo)簽可以被認(rèn)為是對 Topic 進(jìn)一步細(xì)化尝胆。一般在相同業(yè)務(wù)模塊中通過引入標(biāo)簽來標(biāo)記不同用途的消息丧裁。

Broker

Broker 是 RocketMQ 系統(tǒng)的主要角色,其實就是前面一直說的 MQ含衔。Broker 接收來自生產(chǎn)者的消息煎娇,儲存以及為消費者拉取消息的請求做好準(zhǔn)備。

Name Server

Name Server 為 producer 和 consumer 提供路由信息贪染。

RocketMQ 架構(gòu)

RocketMQ 架構(gòu)

由這張圖可以看到有四個集群缓呛,分別是 NameServer 集群、Broker 集群杭隙、Producer 集群和 Consumer 集群:

  1. NameServer: 提供輕量級的服務(wù)發(fā)現(xiàn)和路由哟绊。 每個 NameServer 記錄完整的路由信息,提供等效的讀寫服務(wù)痰憎,并支持快速存儲擴(kuò)展票髓。
  2. Broker: 通過提供輕量級的 Topic 和 Queue 機(jī)制來處理消息存儲,同時支持推(push)和拉(pull)模式以及主從結(jié)構(gòu)的容錯機(jī)制。
  3. Producer:生產(chǎn)者铣耘,產(chǎn)生消息的實例洽沟,擁有相同 Producer Group 的 Producer 組成一個集群。
  4. Consumer:消費者涡拘,接收消息進(jìn)行消費的實例玲躯,擁有相同 Consumer Group 的
    Consumer 組成一個集群。

簡單說明一下圖中箭頭含義鳄乏,從 Broker 開始跷车,Broker Master1 和 Broker Slave1 是主從結(jié)構(gòu),它們之間會進(jìn)行數(shù)據(jù)同步橱野,即 Date Sync朽缴。同時每個 Broker 與
NameServer 集群中的所有節(jié)
點建立長連接,定時注冊 Topic 信息到所有 NameServer 中水援。

Producer 與 NameServer 集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接密强,定期從 NameServer 獲取 Topic 路由信息,并向提供 Topic 服務(wù)的 Broker Master 建立長連接蜗元,且定時向 Broker 發(fā)送心跳或渤。Producer 只能將消息發(fā)送到 Broker master,但是 Consumer 則不一樣奕扣,它同時和提供 Topic 服務(wù)的 Master 和 Slave
建立長連接薪鹦,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。

RocketMQ 集群部署模式

  1. 單 master 模式
    也就是只有一個 master 節(jié)點池磁,稱不上是集群奔害,一旦這個 master 節(jié)點宕機(jī),那么整個服務(wù)就不可用地熄,適合個人學(xué)習(xí)使用华临。
  2. 多 master 模式
    多個 master 節(jié)點組成集群,單個 master 節(jié)點宕機(jī)或者重啟對應(yīng)用沒有影響端考。
    優(yōu)點:所有模式中性能最高
    缺點:單個 master 節(jié)點宕機(jī)期間雅潭,未被消費的消息在節(jié)點恢復(fù)之前不可用,消息的實時性就受到影響跛梗。
    注意:使用同步刷盤可以保證消息不丟失寻馏,同時 Topic 相對應(yīng)的 queue 應(yīng)該分布在集群中各個節(jié)點,而不是只在某各節(jié)點上核偿,否則诚欠,該節(jié)點宕機(jī)會對訂閱該 topic 的應(yīng)用造成影響。
  3. 多 master 多 slave 異步復(fù)制模式
    在多 master 模式的基礎(chǔ)上漾岳,每個 master 節(jié)點都有至少一個對應(yīng)的 slave轰绵。master
    節(jié)點可讀可寫,但是 slave 只能讀不能寫尼荆,類似于 mysql 的主備模式左腔。
    優(yōu)點: 在 master 宕機(jī)時,消費者可以從 slave 讀取消息捅儒,消息的實時性不會受影響液样,性能幾乎和多 master 一樣。
    缺點:使用異步復(fù)制的同步方式有可能會有消息丟失的問題巧还。
  4. 多 master 多 slave 同步雙寫模式
    同多 master 多 slave 異步復(fù)制模式類似鞭莽,區(qū)別在于 master 和 slave 之間的數(shù)據(jù)同步方式。
    優(yōu)點:同步雙寫的同步模式能保證數(shù)據(jù)不丟失麸祷。
    缺點:發(fā)送單個消息 RT 會略長澎怒,性能相比異步復(fù)制低10%左右。
    刷盤策略:同步刷盤和異步刷盤(指的是節(jié)點自身數(shù)據(jù)是同步還是異步存儲)
    同步方式:同步雙寫和異步復(fù)制(指的一組 master 和 slave 之間數(shù)據(jù)的同步)
    注意:要保證數(shù)據(jù)可靠阶牍,需采用同步刷盤和同步雙寫的方式喷面,但性能會較其他方式低。

RocketMQ 單主部署

鑒于是快速入門走孽,我選擇的是第一種單 master 的部署模式惧辈。先說明一下我的安裝環(huán)境:

  1. Centos 7.2
  2. jdk 1.8
  3. Maven 3.2.x
  4. Git

這里 git 可用可不用,主要是用來直接下載 github 上的源碼磕瓷。也可以選擇自己到
github 上下載盒齿,然后上傳到服務(wù)器上。以git操作為示例。

  1. clone 源碼并用 maven 編譯
> git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
> cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
> cd target/alibaba-rocketmq-broker/alibaba-rocketmq

此處可能遇到的問題
一县昂、執(zhí)行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ"時出現(xiàn)以下提示:

fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error

解決辦法:一般是由于網(wǎng)絡(luò)原因造成的,執(zhí)行以下命令

> ping github.com

確定可以 ping 通之后陷舅,再重新執(zhí)行 git clone 命令倒彰。
二、執(zhí)行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U"編譯時莱睁,可能出現(xiàn)下載相關(guān)jar很慢的情況待讳。
這也是由于默認(rèn) maven 中央倉庫在國外的原因,可以根據(jù)需要在 /home/maven/conf/setting.xml 中的 <mirrors></mirrors> 添加以下內(nèi)容后重新編譯:

<mirror>
    <id>aliyun</id>
    <mirrorOf>central</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
  1. 啟動 Name Server
> nohup sh /opt/RocketMQ/bin/mqnamesrv &
//執(zhí)行 jps 查看進(jìn)程
> jps
25913 NamesrvStartup
//查看日志確保服務(wù)已正常啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
  1. 啟動 broker
> nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &
//執(zhí)行 jps 查看進(jìn)程
> jps
25954 BrokerStartup
//查看日志確保服務(wù)已正常啟動
> tail -f ~/logs/rocketmqlogs/broker.log 
The broker[broker-a, 10.1.54.121:10911] boot success...
  1. 發(fā)送和接收消息
    發(fā)送/接收消息之前,我們需要告訴客戶端 NameServer 地址仰剿。RocketMQ 提供了多種方式來實現(xiàn)這一目標(biāo)创淡。為簡單起見,我們使用環(huán)境變量 NAMESRV_ADDR。
> export NAMESRV_ADDR=localhost:9876
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
  1. 關(guān)閉服務(wù)
> sh /opt/RocketMQ/bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh /opt/RocketMQ/bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

生產(chǎn)者南吮、消費者 Demo

  1. 生產(chǎn)者
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //聲明并初始化一個producer
        //需要一個producer group名字作為構(gòu)造方法的參數(shù)琳彩,這里為producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        
        //設(shè)置NameServer地址,此處應(yīng)改為實際NameServer地址,多個地址之間用部凑;分隔
        //NameServer的地址必須有露乏,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
        
        //調(diào)用start()方法啟動一個producer實例
        producer.start();

        //發(fā)送10條消息到Topic為TopicTest涂邀,tag為TagA瘟仿,消息內(nèi)容為“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                
                //調(diào)用producer的send()方法發(fā)送消息
                //這里調(diào)用的是同步的方式,所以會有返回結(jié)果
                SendResult sendResult = producer.send(msg);
                
                //打印返回結(jié)果比勉,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        //發(fā)送完消息之后劳较,調(diào)用shutdown()方法關(guān)閉producer
        producer.shutdown();
    }
}
  1. 消費者
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
    
        //聲明并初始化一個consumer
        //需要一個consumer group名字作為構(gòu)造方法的參數(shù),這里為consumer1
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");

        //同樣也要設(shè)置NameServer地址
        consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

        //這里設(shè)置的是一個consumer的消費策略
        //CONSUME_FROM_LAST_OFFSET 默認(rèn)策略浩聋,從該隊列最尾開始消費观蜗,即跳過歷史消息
        //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
        //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費赡勘,和setConsumeTimestamp()配合使用嫂便,默認(rèn)是半個小時以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //設(shè)置consumer所訂閱的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");

        //設(shè)置一個Listener闸与,主要進(jìn)行消息的邏輯處理
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {

                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                
                //返回消費狀態(tài)
                //CONSUME_SUCCESS 消費成功
                //RECONSUME_LATER 消費失敗毙替,需要稍后重新消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //調(diào)用start()方法啟動consumer
        consumer.start();

        System.out.println("Consumer Started.");
    }
}

本篇到此完結(jié),第一次寫文章践樱,表達(dá)不清楚的地方還請各位小伙伴海涵厂画。如有問題,可以多多交流拷邢。如果后面有時間的話袱院,我也會分享更多的干貨,包括
producer、consumer 的 API 詳解忽洛、如何與 spring boot 整合腻惠、生產(chǎn)環(huán)境部署的一些注意事項,以及學(xué)習(xí) RocketMQ 這一路踩過的坑等等欲虚。

最后集灌,如果覺得寫的還過得去,點個喜歡或者小小打賞一下都是一種肯定复哆,謝謝大家的支持欣喧。

ps:偶然的機(jī)會看到自己寫的文章出現(xiàn)在一個個人網(wǎng)站,當(dāng)我通過QQ聯(lián)系那個人詢問這件事時梯找,還被直接拉黑唆阿,真的很郁悶。在此锈锤,我想說驯鳖,想要用我的作品可以,但一定要征得我同意牙咏,這是對一個作者最基本的尊重臼隔。

最新更新文章如下,包括修改的內(nèi)容都會重新整合:

  1. 3分鐘快速入門RocketMQ(上)
  2. 3分鐘快速入門RocketMQ(下)
  3. 必知必會的RocketMQ消息類型
  4. RocketMQ的消息發(fā)送方式(20180121更新)
  5. 細(xì)談RocketMQ的消費模式(20180126更新)
  6. 使用RocketMQ的小細(xì)節(jié)(上)(20180205更新)
  7. 使用RocketMQ的小細(xì)節(jié)(下)(20180301更新)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末妄壶,一起剝皮案震驚了整個濱河市摔握,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌丁寄,老刑警劉巖氨淌,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異伊磺,居然都是意外死亡盛正,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門屑埋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來豪筝,“玉大人,你說我怎么就攤上這事摘能⌒拢” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵团搞,是天一觀的道長严望。 經(jīng)常有香客問我,道長逻恐,這世上最難降的妖魔是什么像吻? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任峻黍,我火速辦了婚禮,結(jié)果婚禮上拨匆,老公的妹妹穿的比我還像新娘姆涩。我一直安慰自己,他們只是感情好惭每,可當(dāng)我...
    茶點故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布阵面。 她就那樣靜靜地躺著,像睡著了一般洪鸭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上仑扑,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天览爵,我揣著相機(jī)與錄音,去河邊找鬼镇饮。 笑死蜓竹,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的储藐。 我是一名探鬼主播俱济,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼钙勃!你這毒婦竟也來了蛛碌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤辖源,失蹤者是張志新(化名)和其女友劉穎蔚携,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體克饶,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡酝蜒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了矾湃。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片亡脑。...
    茶點故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖邀跃,靈堂內(nèi)的尸體忽然破棺而出霉咨,到底是詐尸還是另有隱情,我是刑警寧澤坞嘀,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布躯护,位于F島的核電站,受9級特大地震影響丽涩,放射性物質(zhì)發(fā)生泄漏棺滞。R本人自食惡果不足惜裁蚁,卻給世界環(huán)境...
    茶點故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望继准。 院中可真熱鬧枉证,春花似錦、人聲如沸移必。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽崔泵。三九已至秒赤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間憎瘸,已是汗流浹背入篮。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留幌甘,地道東北人潮售。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像锅风,于是被迫代替她去往敵國和親酥诽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容