RocketMQ知識(shí)(及開發(fā)實(shí)戰(zhàn))

MQ基礎(chǔ)概念:

  • MQ:
    消息總線(Message Queue)贡避,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息予弧。在互聯(lián)網(wǎng)架構(gòu)中刮吧,MQ是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。使用MQ之后掖蛤,消息發(fā)送上游只需要依賴MQ杀捻,邏輯上和物理上都不用依賴其他服務(wù)。
    MQ的不足
    (1)系統(tǒng)更加復(fù)雜蚓庭,多了一個(gè)MQ組件
    (2)消息傳遞路徑更長(zhǎng)致讥,延時(shí)會(huì)增加
    (3)消息可能會(huì)被重復(fù)消費(fèi)
    (4)上游無法知道下游的執(zhí)行結(jié)果(因此,調(diào)用方實(shí)時(shí)依賴執(zhí)行結(jié)果的業(yè)務(wù)場(chǎng)景器赞,請(qǐng)使用調(diào)用垢袱,而不是MQ)

    使用場(chǎng)景
    (1)上游不關(guān)注執(zhí)行結(jié)果
    (2)上游關(guān)注結(jié)果,但執(zhí)行時(shí)間比較長(zhǎng)港柜。舉個(gè)例子请契,微信支付,跨公網(wǎng)調(diào)用微信的接口夏醉,執(zhí)行時(shí)間會(huì)比較長(zhǎng)爽锥,但調(diào)用方又非常關(guān)注執(zhí)行結(jié)果,此時(shí)一般怎么玩呢授舟?

    image.png

    一般采用“回調(diào)網(wǎng)關(guān)+MQ”方案來解耦:
    a救恨、調(diào)用方直接跨公網(wǎng)調(diào)用微信接口
    b贸辈、微信返回調(diào)用成功释树,此時(shí)并不代表返回成功
    c肠槽、微信執(zhí)行完成后,回調(diào)統(tǒng)一網(wǎng)關(guān)
    d奢啥、網(wǎng)關(guān)將返回結(jié)果通知MQ
    e秸仙、請(qǐng)求方收到結(jié)果通知

  • rocketMQ:
    RocketMQ 是什么?
    Github 上關(guān)于 RocketMQ 的介紹:
    RcoketMQ 是一款低延遲桩盲、高可靠寂纪、可伸縮、易于使用的消息中間件赌结。具有以下特性:

    支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對(duì)點(diǎn)(P2P)消息模型
    在一個(gè)隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
    支持拉(pull)和推(push)兩種消息模式
    單一隊(duì)列百萬消息的堆積能力
    支持多種消息協(xié)議捞蛋,如 JMS、MQTT 等
    分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
    提供 docker 鏡像用于隔離測(cè)試和云集群部署
    提供配置柬姚、指標(biāo)和監(jiān)控等功能豐富的 Dashboard

  • consumer group:
    1拟杉、概念:消費(fèi)者分組,多個(gè)消費(fèi)者在一個(gè)消費(fèi)者分組中量承。
    2搬设、注意點(diǎn):一個(gè)consumer group中的機(jī)器相當(dāng)于一個(gè)集群,consumer group中只有一臺(tái)機(jī)器會(huì)接收到消息撕捍,并進(jìn)行消費(fèi)拿穴。每一個(gè)consumer group都會(huì)接收到消息。這樣子的設(shè)計(jì)要求消費(fèi)端需要保證冪等性忧风。

  • topic:
    1默色、概念:Topic 是一種消息的邏輯分類,比如說你有訂單類的消息阀蒂,也有庫存類的消息该窗,那么就需要進(jìn)行分類,一個(gè)是訂單 Topic 存放訂單相關(guān)的消息蚤霞,一個(gè)是庫存 Topic 存儲(chǔ)庫存相關(guān)的消息酗失。
    2、生產(chǎn)方發(fā)出的消息綁定某個(gè)topic昧绣,然后消費(fèi)方監(jiān)聽某個(gè)topic规肴,消費(fèi)方(各個(gè)group)接收到消息,進(jìn)行消費(fèi)
    3夜畴、topic應(yīng)用級(jí)別:整個(gè)應(yīng)用最好都使用一個(gè)topic拖刃,而更加細(xì)的區(qū)分,使用tags來區(qū)分贪绘。

  • tag:
    1兑牡、概念:標(biāo)簽,用于對(duì)消息分類税灌,在topic的基礎(chǔ)上進(jìn)行更細(xì)的劃分均函。

  • nameServer:
    1亿虽、概念:Name Server 為 producer 和 consumer 提供路由信息。類似rpc中的注冊(cè)中心苞也。當(dāng)producer需要發(fā)送消息首先去詢問nameServer需要請(qǐng)求哪個(gè)broker洛勉。而當(dāng)consumer需要拉取消息,也會(huì)先詢問nameServer需要請(qǐng)求哪個(gè)broker如迟。

  • broker:
    1收毫、概念:rocketMQ中負(fù)責(zé)接收生產(chǎn)者消息、給消費(fèi)者發(fā)送消息的組件殷勘。

  • Message:
    1此再、概念:Message 是消息的載體。一個(gè) Message 必須指定 topic玲销。Message 還有一個(gè)可選的 tag 設(shè)置引润,以便消費(fèi)端可以基于 tag 進(jìn)行過濾消息。也可以添加額外的鍵值對(duì)痒玩,例如你需要一個(gè)業(yè)務(wù) key 來查找 broker 上的消息淳附,方便在開發(fā)過程中診斷問題。

MQ生產(chǎn)者者實(shí)例:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //聲明并初始化一個(gè)producer
        //需要一個(gè)producer group名字作為構(gòu)造方法的參數(shù)蠢古,這里為producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        
        //設(shè)置NameServer地址,此處應(yīng)改為實(shí)際NameServer地址奴曙,多個(gè)地址之間用;分隔
        //NameServer的地址必須有草讶,但是也可以通過環(huán)境變量的方式設(shè)置洽糟,不一定非得寫死在代碼里
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
        
        //調(diào)用start()方法啟動(dòng)一個(gè)producer實(shí)例
        producer.start();

        //發(fā)送10條消息到Topic為TopicTest,tag為TagA堕战,消息內(nèi)容為“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                
                //調(diào)用producer的send()方法發(fā)送消息
                //這里調(diào)用的是同步的方式坤溃,所以會(huì)有返回結(jié)果
                SendResult sendResult = producer.send(msg);
                
                //打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        //發(fā)送完消息之后嘱丢,調(diào)用shutdown()方法關(guān)閉producer
        producer.shutdown();
    }
}

MQ消費(fèi)者實(shí)例:

在開發(fā)過程中薪介,如果想測(cè)試生產(chǎn)者是否發(fā)出了mq,可以編寫一個(gè)消費(fèi)者進(jìn)行測(cè)試

@Test
public void testMqConsumer() throws Exception {
    String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876";

    int threadNum = 5;
    String topics = "WechatUnionCoreTemplateNotifyTopic";
    String instanceName = "TemplateComsumer";
    String groupName = "wechatUnionTemplateNotifyConsumer";
    DefaultMQPushConsumer consumer = null;

    consumer = new DefaultMQPushConsumer(groupName);
    consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
    consumer.setClientCallbackExecutorThreads(threadNum);//消費(fèi)現(xiàn)場(chǎng)數(shù)量
    consumer.setInstanceName(instanceName);//實(shí)例名稱
    consumer.subscribe(topics, "*");


    //注冊(cè)監(jiān)聽
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            for (int i = 0; i < msgs.size(); i++) {
                MessageExt msgExt =  msgs.get(i);
                String msgId = msgExt.getMsgId();
                Integer flag = msgExt.getFlag();
                TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class);
                logger.info("receive new Msg:    " + "  msgId=" + msgId + "   flag=" + flag + "  templateNotifyItem=" + templateNotifyItem);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    logger.info("監(jiān)聽執(zhí)行中");



    Thread.sleep(1000000);
}

參考:
http://blog.csdn.net/manzhizhen/article/details/52606733
http://www.reibang.com/p/824066d70da8
架構(gòu)師之路-mq系列

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末越驻,一起剝皮案震驚了整個(gè)濱河市汁政,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌缀旁,老刑警劉巖记劈,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異并巍,居然都是意外死亡目木,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門懊渡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刽射,“玉大人怀跛,你說我怎么就攤上這事”澹” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵忠蝗,是天一觀的道長(zhǎng)现横。 經(jīng)常有香客問我,道長(zhǎng)阁最,這世上最難降的妖魔是什么戒祠? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮速种,結(jié)果婚禮上姜盈,老公的妹妹穿的比我還像新娘。我一直安慰自己配阵,他們只是感情好馏颂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著棋傍,像睡著了一般救拉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瘫拣,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天亿絮,我揣著相機(jī)與錄音,去河邊找鬼麸拄。 笑死派昧,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的拢切。 我是一名探鬼主播蒂萎,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼淮椰!你這毒婦竟也來了岖是?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤实苞,失蹤者是張志新(化名)和其女友劉穎豺撑,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體黔牵,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡聪轿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了猾浦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片陆错。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡灯抛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出音瓷,到底是詐尸還是另有隱情对嚼,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布绳慎,位于F島的核電站纵竖,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏杏愤。R本人自食惡果不足惜靡砌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望珊楼。 院中可真熱鬧通殃,春花似錦、人聲如沸厕宗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽已慢。三九已至骗炉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛇受,已是汗流浹背句葵。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留兢仰,地道東北人乍丈。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像把将,于是被迫代替她去往敵國(guó)和親轻专。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容