MQ應(yīng)用

消息發(fā)送方式

同步發(fā)送消息

同步發(fā)送消息是指赔嚎,Producer發(fā)送一條消息后旭贬,會(huì)在收到MQ返回的ack后才發(fā)送下一條消息凯傲,該方式的消息可靠性最高,但是消息發(fā)送效率太低


同步發(fā)送消息.png
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個(gè)producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 設(shè)置當(dāng)發(fā)送失敗時(shí)重試發(fā)送的次數(shù)磕诊,默認(rèn)兩次
        producer.setRetryTimesWhenSendFailed(3);

        // 設(shè)置發(fā)送超時(shí)時(shí)間
        producer.setSendMsgTimeout(5000);
        // 開(kāi)啟生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for(int i =0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("topic","tag",body);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

異步發(fā)送消息

異步發(fā)送消息是指削彬,Producer發(fā)出消息后無(wú)需等待MQ返回ack,直接發(fā)送下一條消息,該方式的消息可靠性可以得到保障秀仲,消息發(fā)送效率也可以


異步發(fā)送消息.png
public class AsyncProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個(gè)producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 設(shè)置發(fā)送超時(shí)時(shí)間
        producer.setSendMsgTimeout(5000);
        // 開(kāi)啟生產(chǎn)者
        producer.start();

        for(int i=0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            try{
                Message msg = new Message("async-topic","async-tag",body);
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        // 因?yàn)槭钱惒降娜谕矗孕枰骶€程休眠一會(huì)等待異步任務(wù)
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}

單向發(fā)送消息

單向發(fā)送消息是指,Producer僅發(fā)負(fù)責(zé)發(fā)送消息神僵,不等待雁刷,不處理MQ的ack,該發(fā)送方式MQ也不返回ack,該方式消息發(fā)送效率最高,但是消息可靠性差保礼。


單向發(fā)送.png
public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個(gè)producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 設(shè)置發(fā)送超時(shí)時(shí)間
        producer.setSendMsgTimeout(5000);
        // 開(kāi)啟生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for(int i =0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("oneway-topic","oneway-tag",body);
            // 方法沒(méi)有返回值
            producer.sendOneway(msg);
        }
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

消息消費(fèi)

public class SomeConsumer {
    public static void main(String[] args) throws Exception{
        // 定義一個(gè)push消費(fèi)者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("someTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            // 一但broker中有了其訂閱的消息就會(huì)觸發(fā)該方法的執(zhí)行
            // 方法返回值為當(dāng)前consumer消費(fèi)的狀態(tài)
            // 這里雖然為一個(gè)列表沛励,但是每次默認(rèn)只能消費(fèi)一條消息,通過(guò) consumer.getConsumeMessageBatchMaxSize();可以得到默認(rèn)值炮障,也可以改成批量消費(fèi)
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 逐條消費(fèi)消息
                for(MessageExt msg:list){
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });
        consumer.start();
    }
}
Consumer的pullBatchSize屬性與consumeMessageBatchMaxSize屬性是否設(shè)置越大越好目派,當(dāng)然不是
- pullBatchSize值設(shè)置的越大,Consumer每拉取一次需要的時(shí)間就會(huì)越長(zhǎng)胁赢,且在網(wǎng)絡(luò)上傳輸問(wèn)題的可能性就越高企蹭,若在拉取過(guò)程中出現(xiàn)問(wèn)題,那么本批次所有的消息都需要全部重新拉取智末。
- consumerMessageBatchMaxSize值設(shè)置的越大谅摄,Consumer的消息并發(fā)消費(fèi)能力越低,且這批被消費(fèi)的消息具有相同的消費(fèi)結(jié)果系馆,因?yàn)閏onsumerMessageBatchSize指定的一批消息只會(huì)使用一個(gè)線程進(jìn)行處理送漠,且在處理過(guò)程中只要有一個(gè)消息處理異常,則這批消息需要全部重新再次消費(fèi)處理由蘑。

有序性分類

根據(jù)有序范圍的不同闽寡,Rocketmq可以嚴(yán)格的保證消息的有序性:分區(qū)有序性與全局有序性。

  • 當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè)時(shí)所保證的有序性是整個(gè)Topic中的消息順序尼酿,稱為全局有序爷狈。


    image.png
  • 如果有多個(gè)queue參與,其僅可保證在該queue分區(qū)隊(duì)列上的消息順序谓媒,則稱為分區(qū)有序淆院。


    image.png
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個(gè)producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 開(kāi)啟生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for(int i =0;i<10;i++){
            Integer orderId = i;
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("topic","tag",body);
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    Integer id = (Integer) o;
                    int index = id % list.size();
                    return list.get(index);
                }
            },orderId);
            System.out.println(sendResult);
        }
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

延遲消息

當(dāng)消息寫(xiě)入到Broker后,在指定的時(shí)長(zhǎng)后才可以被消費(fèi)處理,稱為延遲消息
采用rocketmq的延遲消息可以實(shí)現(xiàn)定時(shí)任務(wù)的功能土辩,而不用使用定時(shí)器支救,典型的應(yīng)用場(chǎng)景是,電商交易中超時(shí)未支付關(guān)閉訂單的場(chǎng)景


image.png

事務(wù)消息

代碼舉例

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();


    /**
     * 回調(diào)方法
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 消息回查方法:
     * 1拷淘、回調(diào)操作返回UNKNOW
     * 2各墨、TC沒(méi)有收到TM的最終全局事務(wù)確認(rèn)指令
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事務(wù)消息場(chǎng)景舉例

工行用戶A向建行用戶B轉(zhuǎn)賬1萬(wàn)元


image.png

問(wèn)題點(diǎn):這里 1,2,3 沒(méi)有實(shí)現(xiàn)原子性,那么A賬號(hào)沒(méi)有扣款成功启涯,但是消息已經(jīng)發(fā)送成功了贬堵,這時(shí)候就會(huì)導(dǎo)致B的賬號(hào)增加了1萬(wàn)元,就會(huì)出現(xiàn)問(wèn)題结洼,這時(shí)候就需要事務(wù)消息來(lái)解決這個(gè)問(wèn)題黎做。


image.png

該分布式事務(wù)的解決方案是依賴于XA模式的,上圖中的第三步與TC向Broker發(fā)送預(yù)提交消息松忍,這里的預(yù)提交消息(半事務(wù)消息)就是消費(fèi)者還不能消費(fèi)的消息蒸殿。當(dāng)執(zhí)行到圖中第9步驟的時(shí)候,才會(huì)真正的寫(xiě)入消息到Broker中鸣峭,簡(jiǎn)單理解TC就是管理各個(gè)分支事務(wù)的狀態(tài)宏所,這里可以看到工行系統(tǒng)熄浓,Broker系統(tǒng)是兩個(gè)分支事務(wù)离福。TM是事務(wù)管理者,一般由Producer擔(dān)任贱纠。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末莫换,一起剝皮案震驚了整個(gè)濱河市霞玄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浓镜,老刑警劉巖溃列,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件劲厌,死亡現(xiàn)場(chǎng)離奇詭異膛薛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)补鼻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門哄啄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人风范,你說(shuō)我怎么就攤上這事咨跌。” “怎么了硼婿?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵锌半,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我寇漫,道長(zhǎng)刊殉,這世上最難降的妖魔是什么殉摔? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮记焊,結(jié)果婚禮上逸月,老公的妹妹穿的比我還像新娘。我一直安慰自己遍膜,他們只是感情好碗硬,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著瓢颅,像睡著了一般恩尾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挽懦,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天特笋,我揣著相機(jī)與錄音,去河邊找鬼巾兆。 笑死猎物,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的角塑。 我是一名探鬼主播蔫磨,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼圃伶!你這毒婦竟也來(lái)了堤如?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤窒朋,失蹤者是張志新(化名)和其女友劉穎搀罢,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體侥猩,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡榔至,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了欺劳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唧取。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖划提,靈堂內(nèi)的尸體忽然破棺而出枫弟,到底是詐尸還是另有隱情,我是刑警寧澤鹏往,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布淡诗,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏韩容。R本人自食惡果不足惜绪爸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望宙攻。 院中可真熱鬧奠货,春花似錦、人聲如沸座掘。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)溢陪。三九已至萍虽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間形真,已是汗流浹背杉编。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留咆霜,地道東北人邓馒。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蛾坯,于是被迫代替她去往敵國(guó)和親光酣。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一. 認(rèn)識(shí)消息隊(duì)列 1. 隊(duì)列 隊(duì)列(queue)是只允許在一端進(jìn)行插入操作脉课,而在另一端進(jìn)行刪除操作的線性表(數(shù)據(jù)...
    Serializable_dx閱讀 1,798評(píng)論 0 1
  • 說(shuō)明 主要內(nèi)容是在網(wǎng)上的一些文章中整理出來(lái)救军; 加粗的字體是比較重要的內(nèi)容,部分是自己的經(jīng)驗(yàn)和理解倘零; 整理的目的主要...
    猴子頂呱呱閱讀 1,596評(píng)論 0 52
  • 消息隊(duì)列有什么優(yōu)點(diǎn)和缺點(diǎn)? 為什么使用消息隊(duì)列?假設(shè)你的業(yè)務(wù)場(chǎng)景遇到個(gè)技術(shù)挑戰(zhàn)唱遭,如果不用 MQ 可能會(huì)很麻煩,但是...
    java高并發(fā)閱讀 700評(píng)論 0 0
  • 一呈驶、 關(guān)鍵特性 1 消息發(fā)送和消費(fèi) 1)消息發(fā)送者步驟分析: 創(chuàng)建消息生產(chǎn)者producer拷泽,并制定生產(chǎn)者組名 指...
    TiaNa_na閱讀 1,993評(píng)論 0 2
  • 各種MQ的比較 MQ有rabbitMq,rocketMq,kafka,activeMq MQ有什么優(yōu)缺點(diǎn) 導(dǎo)致系統(tǒng)...
    80fd1d54878f閱讀 148評(píng)論 0 0