springboot2 整合rocketMQ4.3

2019.5.27日更新
目前springcloud alibaba的項(xiàng)目已經(jīng)快要更新到第一個(gè)relase版本了,如果想要整合rocketmq建議移步springcloud alibaba的github哪里有更加方便的操作芥颈,下面僅供參考

前言
前段時(shí)間更新技術(shù)架構(gòu)的時(shí)候妇垢,看到rocketmq出了4的版本,而且本身這個(gè)mq有事務(wù)消息,在分布式的場(chǎng)景中有很好的啟發(fā)性箩做,和作用燃辖,而且本身它也是阿里開(kāi)源到apache的一個(gè)項(xiàng)目俘陷,從出身還是實(shí)力來(lái)說(shuō)都很不錯(cuò)倒得,所以今天就抱著支持國(guó)產(chǎn)的心態(tài)開(kāi)看一看這個(gè)mq

1.項(xiàng)目代碼
maven

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
 </dependency>

producer
單從分類producer的官網(wǎng)doc來(lái)看主要分成3種,
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要說(shuō)的是秉馏,DefaultMQProducer琴昆,TransactionMQProducer
DefaultMQProducer
默認(rèn)的producer,從官方的文檔來(lái)看复颈,前四個(gè)都是對(duì)這個(gè)producer的運(yùn)用只是set的值不同而已衅谷,而且是很細(xì)微的變化而已


1B48.tmp.png

那么從最簡(jiǎn)單的開(kāi)始
application.yml文件

rocketmq: 
  # 生產(chǎn)者配置
  producer: 
    groupName: ${spring.application.name}
    namesrvAddr: 192.168.40.133:9876
    default: false

yml文件配置讀取類

@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
@ToString
public class ProducerConfig {
    private String namesrvAddr;
    
    private String groupName;
}

producer類的創(chuàng)建類渗蟹,需要注意的是這個(gè)producer一個(gè)程序里面只能出現(xiàn)一個(gè)膘怕,當(dāng)重復(fù)創(chuàng)建時(shí)就會(huì)報(bào)錯(cuò)

@Log4j2
@Configuration
public class ProducerConfigure {

    @Autowired
    private ProducerConfig producerConfigure;

    /**
     * 創(chuàng)建普通消息發(fā)送者實(shí)例
     * 
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        log.info(producerConfigure.toString());
        log.info("defaultProducer 正在創(chuàng)建---------------------------------------");
        DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName());
        producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
        producer.setVipChannelEnabled(false);
        producer.setRetryTimesWhenSendAsyncFailed(10);
        producer.start();
        log.info("rocketmq producer server開(kāi)啟成功---------------------------------.");
        return producer;
    }
}

當(dāng)producer創(chuàng)建完畢之后就是consumer的公用設(shè)置
首先也是yml和配置類的定義

rocketmq: 
  # 消費(fèi)者配置
  consumer: 
    groupName: ${spring.application.name}
    namesrvAddr: 192.168.40.133:9876
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
@ToString
public class ConsumerConfig {
    private String groupName;
    
    private String namesrvAddr;
}
@Configuration
@Log4j2
public abstract class DefaultConsumerConfigure {

    @Autowired
    private ConsumerConfig consumerConfig;

    // 開(kāi)啟消費(fèi)者監(jiān)聽(tīng)服務(wù)
    public void listener(String topic, String tag) throws MQClientException {
        log.info("開(kāi)啟" + topic + ":" + tag + "消費(fèi)者-------------------");
        log.info(consumerConfig.toString());

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());

        consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());

        consumer.subscribe(topic, tag);

        // 開(kāi)啟內(nèi)部類實(shí)現(xiàn)監(jiān)聽(tīng)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                return DefaultConsumerConfigure.this.dealBody(msgs);
            }
        });

        consumer.start();

        log.info("rocketmq啟動(dòng)成功---------------------------------------");

    }
    
    // 處理body的業(yè)務(wù)
    public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);

}

值得注意的是争群,這里DefaultConsumerConfigure沒(méi)有定義在什么時(shí)候運(yùn)行轻要,還有對(duì)body的操作也抽象出來(lái)了幸冻,提供給實(shí)現(xiàn)類做處理碑定,方便業(yè)務(wù)抽取

@Log4j2
@Configuration
public class TestConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent>{

    @Override
    public void onApplicationEvent(ContextRefreshedEvent arg0) {
        try {
            super.listener("t_TopicTest", "Tag1");
        } catch (MQClientException e) {
            log.error("消費(fèi)者監(jiān)聽(tīng)器啟動(dòng)失敗", e);
        }
        
    }

    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs)  {
        int num = 1;
        log.info("進(jìn)入");
        for(MessageExt msg : msgs) {
            log.info("第" + num + "次消息");
            try {
                String msgStr = new String(msg.getBody(), "utf-8");
                log.info(msgStr);
            } catch (UnsupportedEncodingException e) {
                log.error("body轉(zhuǎn)字符串解析失敗");
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

那么這些寫(xiě)完之后基本就做測(cè)試類的編寫(xiě)了
這個(gè)地實(shí)現(xiàn)了ApplicationListener,讓他在啟動(dòng)的時(shí)候就開(kāi)始執(zhí)行這個(gè)consumer,相信有些同學(xué)會(huì)喜歡用@PostConstruct勘究,但是不要這么做,因?yàn)樗麜?huì)在init之前執(zhí)行,那么有些類會(huì)加載不完全虏等,會(huì)導(dǎo)致無(wú)法開(kāi)機(jī)啟動(dòng)的

然后再controller里面引入producer澄干,然后直接調(diào)用即可

@RestController
@RequestMapping("/test")
@Log4j2
public class TestController {

    @Autowired
    private DefaultMQProducer defaultMQProducer;
    
//  @Autowired
//  private TransactionMQProducer producer;
    
    @Autowired
    private TestTransactionListener testTransactionListener;

    @GetMapping("/test")
    public void test(String info) throws Exception {
        Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq測(cè)試成功".getBytes());
                // 這里用到了這個(gè)mq的異步處理,類似ajax拂檩,可以得到發(fā)送到mq的情況加矛,并做相應(yīng)的處理
               //不過(guò)要注意的是這個(gè)是異步的
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("傳輸成功");
                log.info(GsonUtil.GSON.toJson(sendResult));
            }
            @Override
            public void onException(Throwable e) {
                log.error("傳輸失敗", e);
            }
        });
    }
}

跑一跑妓羊,就可以看到結(jié)果了

說(shuō)完了DefaultMQProducer那么就來(lái)說(shuō)transactionMQProducer
同樣參考官網(wǎng)的例子來(lái)做整合
在原來(lái)的ProducerConfigure類的基礎(chǔ)上加上即可

需要注意的是ConditionalOnProperty這個(gè)必須得有星持,而且配置文件中
transaction和default中只能有一個(gè)是true捡鱼,不然就會(huì)同時(shí)創(chuàng)建兩個(gè)producer管引,那么就會(huì)報(bào)錯(cuò)

    /**
     * 創(chuàng)建事務(wù)消息發(fā)送者實(shí)例
     * 
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
    public TransactionMQProducer transactionMQProducer() throws MQClientException {
        log.info(producerConfigure.toString());
        log.info("TransactionMQProducer 正在創(chuàng)建---------------------------------------");
        TransactionMQProducer producer = new TransactionMQProducer(producerConfigure.getGroupName());

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                });
        producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
        producer.setExecutorService(executorService);
        producer.start();
        log.info("TransactionMQProducer server開(kāi)啟成功---------------------------------.");
        return producer;
    }

因?yàn)閠ransaction的流程下,rocketmq會(huì)先發(fā)送一個(gè)consumer不可見(jiàn)的消息,然后在調(diào)用
TransactionListener這個(gè)接口中的executeLocalTransaction,中的方法執(zhí)行事務(wù),然后方法內(nèi)部需要返回
一個(gè)LocalTransactionState的枚舉信息恶导,分別為

public enum LocalTransactionState {
    COMMIT_MESSAGE, // 提交
    ROLLBACK_MESSAGE, // 回滾
    UNKNOW, // 未知
}

相應(yīng)的當(dāng)我們返回的是COMMIT_MESSAGE時(shí),那么producer會(huì)把消息提交到mq上,
如果是ROLLBACK_MESSAGE那么producer就會(huì)結(jié)束闸准,并且不提交到mq,

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

需要注意的是checkLocalTransaction是用作mq長(zhǎng)時(shí)間沒(méi)有收到producer的executeLocalTransaction響應(yīng)的時(shí)候調(diào)用的,這個(gè)類在3.0之后的版本就被閹割了察迟,只有接口秕岛,卻沒(méi)有實(shí)現(xiàn)改鲫,那么直接寫(xiě)一個(gè)空實(shí)現(xiàn)即可,在我這邊的代碼上,我做了一個(gè)抽象壳嚎,把需要實(shí)現(xiàn)的executeLocalTransaction抽象出來(lái)

@Configuration
public abstract class AbstractTransactionListener implements TransactionListener {

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

這個(gè)是executeLocalTransaction的實(shí)現(xiàn)類寡润,簡(jiǎn)單的做了些業(yè)務(wù)瞬沦,然后返回了一個(gè)commit

@Configuration
@Log4j2
public class TestTransactionListener extends AbstractTransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                log.info(new String(msg.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

consumer是沒(méi)有變化的边坤,基本相同区拳,那么就直接貼上controller的測(cè)試代碼

    
    @GetMapping("t_test")
    public void Ttest(String info) throws Exception {
        Message message = new Message("t_TopicTest", "Tag1", "12345", "rocketmq測(cè)試成功".getBytes());
        producer.setTransactionListener(testTransactionListener);
        producer.setSendMsgTimeout(10000);
        producer.sendMessageInTransaction(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("傳輸成功");
                log.info(GsonUtil.GSON.toJson(sendResult));
            }
            @Override
            public void onException(Throwable e) {
                log.error("傳輸失敗", e);
            }
        });
    }

跑一跑即可看到結(jié)果參數(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末放祟,一起剝皮案震驚了整個(gè)濱河市纽疟,隨后出現(xiàn)的幾起案子晦款,更是在濱河造成了極大的恐慌,老刑警劉巖锈死,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蛤袒,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡嫡锌,警方通過(guò)查閱死者的電腦和手機(jī)及皂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門弧满,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人地来,你說(shuō)我怎么就攤上這事芽突∩反希” “怎么了静陈?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵牵敷,是天一觀的道長(zhǎng)著觉。 經(jīng)常有香客問(wèn)我典徘,道長(zhǎng)嗤栓,這世上最難降的妖魔是什么全封? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任妒御,我火速辦了婚禮边灭,結(jié)果婚禮上券册,老公的妹妹穿的比我還像新娘膳殷。我一直安慰自己九火,他們只是感情好勒极,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布江兢。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪褐啡。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,301評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音残家,去河邊找鬼。 笑死茴晋,一個(gè)胖子當(dāng)著我的面吹牛回窘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播烁涌,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼酒觅,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了抒钱?” 一聲冷哼從身側(cè)響起颜凯,我...
    開(kāi)封第一講書(shū)人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蕾额,沒(méi)想到半個(gè)月后彼城,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體逼友,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡秤涩,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年筐眷,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片匀谣。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡武翎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出宝恶,到底是詐尸還是另有隱情,我是刑警寧澤霹疫,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布综芥,位于F島的核電站,受9級(jí)特大地震影響屠阻,放射性物質(zhì)發(fā)生泄漏额各。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一蛉加、第九天 我趴在偏房一處隱蔽的房頂上張望缸逃。 院中可真熱鬧厂抽,春花似錦、人聲如沸筷凤。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至惠啄,卻和暖如春任内,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背死嗦。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工越除, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摘盆。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓骡澈,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親肋殴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容