RockeMQ quickstart

1梢什、RocketMQ

[官網(wǎng)地址] (http://rocketmq.apache.org)

function : 應(yīng)用解耦践图、流量消峰、消息分發(fā)菇曲、保證最終一致性冠绢、方便動態(tài)擴容等

2、linux 單機RocketMQ

可以參考官網(wǎng)quick-start

1常潮、準(zhǔn)備RocketMQ
  1. 官網(wǎng)下載編譯好的二進制文件弟胀,或者下載源碼自己編譯。

    RocketMQ 當(dāng)前的最新版本是4.2.0

  2. 系統(tǒng)要求: 64bit 的Linux 喊式、Unix 或Mac 孵户。
    Java 版本大于等于JDKl.8 。
    如果需要從GitHub 上下載源碼和編譯的話需要安裝Maven 3.2.x 和Git 岔留。

[root@aliyun rocketmq-all-4.2.0-bin]#>  unzip rocketmq-all-4.2.0-bin-release.zip -d ./rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]#  cd rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]#  ls 

里面含有以下內(nèi)容: LICENSE NOTICE README.md benchmark/ bin/ conf/ lib/

+ LICENSE 夏哭、NOTICE 和README.md 包括一些版權(quán)聲明和功能說明信息;

+ benchmark 里包括運行benchmark 程序的shell 腳本献联; 

+ bin 文件夾里含有各種使用RocketMQ的shell腳本和cmd 腳本竖配,比如啟動NameServer的mqnamesrv啟動Broker的mqbroker,集群管理腳本mqadmin 等里逆; 

+ conf 文件夾里有一些示例配置文件械念,包括三種方式的broker 配置文件、logback 日志配置文件等运悲,用戶在寫配置文件的時候,一般基于這些示例配置文件项钮,加上自己特殊的需求即可班眯; 

+ lib 文件夾里包括RocketMQ各個模塊編譯成的jar 包,以及RocketMQ 依賴的一些jar包烁巫,比如Netty署隘、commons-lang禽绪、FastJSON 等况鸣。
2、啟動RocketMQ服務(wù)

啟動單機的消息隊列服務(wù)比較簡單幼衰,不需要寫配置文件,只需要依次啟動本機的NameServer 和Broker 即可诊霹。

啟動NameServer:
[root@aliyun rocketmq-all-4.2.0-bin]#> nohup sh bin/mqnamesrv &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/namesrv.log

The Name Server boot success . ..

啟動B roker :
[root@aliyun rocketmq-all-4.2.0-bin]# nohup sh bin/mqbroker -n localhost:9876 &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/broker.log

The broker[%s, 192.168.0.233 : 10911] boot success .. .

!!!!!! 1->內(nèi)存不足

如果Java運行時環(huán)境的內(nèi)存不足羞延,修改jdk參數(shù)配置

rocketmq-all-4.2.0-bin/bin/runserver.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" 

rocketmq-all-4.2.0-bin/bin/runbroker.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"

!!!!!! 2-> org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

3、用命令行發(fā)送和接收消息

實際上就是運行寫好的demo 程序脾还,后續(xù)我們可以參考這些demo 來寫自己的發(fā)送和接收程序伴箩。

運行示例程序,發(fā)送和接收消息:

[root@aliyun rocketmq-all-4.2.0-bin]# export NAMESRV_ADDR=localhost:9876

[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

SendResult [sendStatus=SEND OK, msgid= ...

[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

ConsumeMessageThread 主d Receive New Messages : [MessageExt . ..
4鄙漏、關(guān)閉消息隊列
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown broker

Send shutdown request to mqbroker (36695 ) OK

[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown namesrv

Send shutdown request t o mqnamesrv (36664) OK
5嗤谚、java client發(fā)送、消費消息 demo
  • java client
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.0.0-incubating</version>
        </dependency> 
  • Producer
    創(chuàng)建DefaultMQProducer對象怔蚌,設(shè)置好GroupName和NameServer后啟動巩步,把待發(fā)送的消息拼裝成Message對象,用Producer發(fā)送桦踊。
     public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("producer1");
            // 設(shè)置NameServer地址 , 多個地址之間用椅野;分隔 ,但是也可以通過環(huán)境變量的方式設(shè)置 钞钙。
            producer.setNamesrvAddr("47.104.209.137:9876");
            producer.start();
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message(
                         "TopicTest",
                         "TagA",
                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                    );
                    //這里調(diào)用的是同步的方式鳄橘,所以會有返回結(jié)果
                    SendResult sendResult = producer.send(msg);
                    //打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
        }
    } 
  • Consumer
    設(shè)置GroupName 芒炼、NameServer 地址以及端口號瘫怜。然后指明要操作的Topic 名稱,最后進入發(fā)送和接收邏輯本刽。
     public class Consumer {
             public static void main(String[] args) throws InterruptedException, MQClientException {
                 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
                 consumer.setNamesrvAddr("47.104.209.137:9876");
                 //這里設(shè)置的是一個consumer的消費策略
                 //CONSUME_FROM_LAST_OFFSET 默認(rèn)策略鲸湃,從該隊列最尾開始消費,即跳過歷史消息
                 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費子寓,即歷史消息(還儲存在broker的)全部消費一遍
                 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費暗挑,和setConsumeTimestamp()配合使用,默認(rèn)是半個小時以前
                 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                 //設(shè)置consumer所訂閱的Topic和Tag斜友,*代表全部的Tag
                 consumer.subscribe("TopicTest", "*");
                 //設(shè)置一個Listener炸裆,主要進行消息的邏輯處理
                 consumer.registerMessageListener(new MessageListenerConcurrently() {
                     @Override
                     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                     ConsumeConcurrentlyContext context) {
                         System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                         //返回消費狀態(tài)
                         //CONSUME_SUCCESS 消費成功
                         //RECONSUME_LATER 消費失敗,需要稍后重新消費
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     }
                 });
                 //調(diào)用start()方法啟動consumer
                 consumer.start();
                 System.out.println("Consumer Started.");
             }
     }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鲜屏,一起剝皮案震驚了整個濱河市烹看,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌洛史,老刑警劉巖惯殊,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異也殖,居然都是意外死亡土思,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來己儒,“玉大人崎岂,你說我怎么就攤上這事≈吩福” “怎么了该镣?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長响谓。 經(jīng)常有香客問我损合,道長,這世上最難降的妖魔是什么娘纷? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任嫁审,我火速辦了婚禮,結(jié)果婚禮上赖晶,老公的妹妹穿的比我還像新娘律适。我一直安慰自己,他們只是感情好遏插,可當(dāng)我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布捂贿。 她就那樣靜靜地躺著,像睡著了一般胳嘲。 火紅的嫁衣襯著肌膚如雪厂僧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天了牛,我揣著相機與錄音颜屠,去河邊找鬼。 笑死鹰祸,一個胖子當(dāng)著我的面吹牛甫窟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蛙婴,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼粗井,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了街图?” 一聲冷哼從身側(cè)響起背传,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎台夺,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體痴脾,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡颤介,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片滚朵。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡冤灾,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出辕近,到底是詐尸還是另有隱情韵吨,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布移宅,位于F島的核電站归粉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏漏峰。R本人自食惡果不足惜糠悼,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浅乔。 院中可真熱鬧倔喂,春花似錦、人聲如沸靖苇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贤壁。三九已至悼枢,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間芯砸,已是汗流浹背萧芙。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留假丧,地道東北人双揪。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像包帚,于是被迫代替她去往敵國和親渔期。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容