RocketMQ與Springboot封裝

消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件歇僧,主要解決應(yīng)用解耦图张,異步消息,日志記錄,流量削鋒祸轮、分布式事務(wù)等問(wèn)題兽埃,實(shí)現(xiàn)高性能,高可用适袜,可伸縮和最終一致性架構(gòu)柄错。

zebra架構(gòu)選用RocketMQ作為消息隊(duì)列組件,下面介紹下RocketMQ如何與Springboot進(jìn)行組合封裝苦酱。

1鄙陡、引入依賴包

image

2、設(shè)置配置項(xiàng)信息

namesrvAddr地址

zebra.rocketmq.namesrvAddr=0.0.0.0:9876

生產(chǎn)者group名稱

zebra.rocketmq.producerGroupName=producerGroupName

事務(wù)生產(chǎn)者group名稱

zebra.rocketmq.transactionProducerGroupName=transactionProducerGroupName

消費(fèi)者group名稱

zebra.rocketmq.consumerGroupName=consumerGroupName

生產(chǎn)者實(shí)例名稱

zebra.rocketmq.producerInstanceName=producerInstanceName

消費(fèi)者實(shí)例名稱

zebra.rocketmq.consumerInstanceName=consumerInstanceName

事務(wù)生產(chǎn)者實(shí)例名稱

zebra.rocketmq.producerTranInstanceName=producerTranInstanceName

一次最大消費(fèi)多少數(shù)量消息

zebra.rocketmq.consumerBatchMaxSize=1

廣播消費(fèi)

zebra.rocketmq.consumerBroadcasting=false

消費(fèi)的topic:tag

zebra.rocketmq.subscribe[0]=TopicTest1:TagA

啟動(dòng)的時(shí)候是否消費(fèi)歷史記錄

zebra.rocketmq.enableHisConsumer=false

啟動(dòng)順序消費(fèi)

zebra.rocketmq.enableOrderConsumer=false

3躏啰、編寫(xiě)配置類


@ConfigurationProperties(RocketmqProperties.PREFIX)
public class RocketmqProperties {
   public static final String PREFIX = "zebra.rocketmq";
   private String namesrvAddr;
   private String producerGroupName; 
   private String transactionProducerGroupName; 
   private String consumerGroupName; 
   private String producerInstanceName;
   private String consumerInstanceName;
   private String producerTranInstanceName;
   private int consumerBatchMaxSize;
   private boolean consumerBroadcasting;
   private boolean enableHisConsumer;
   private boolean enableOrderConsumer;
   private List subscribe = new ArrayList<>();
}

4趁矾、編寫(xiě)producer和consumer初始化類

@Configuration
@EnableConfigurationProperties(RocketmqProperties.class)
@ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "namesrvAddr")
public class RocketmqAutoConfiguration {
   private static final Logger log = LogManager.getLogger(RocketmqAutoConfiguration.class);
   @Autowired
   private RocketmqProperties properties;
   @Autowired
   private ApplicationEventPublisher publisher;

   private static boolean isFirstSub = true;

   private static long startTime = System.currentTimeMillis();

   /**
    * 初始化向rocketmq發(fā)送普通消息的生產(chǎn)者
    */
   @Bean
   @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "producerInstanceName")
   @ConditionalOnBean(EtcdClient.class)
   public DefaultMQProducer defaultProducer() throws MQClientException {
       /**
        * 一個(gè)應(yīng)用創(chuàng)建一個(gè)Producer,由應(yīng)用來(lái)維護(hù)此對(duì)象给僵,可以設(shè)置為全局對(duì)象或者單例<br>
        * 注意:ProducerGroupName需要由應(yīng)用來(lái)保證唯一<br>
        * ProducerGroup這個(gè)概念發(fā)送普通的消息時(shí)毫捣,作用不大,但是發(fā)送分布式事務(wù)消息時(shí)帝际,比較關(guān)鍵蔓同,
        * 因?yàn)榉?wù)器會(huì)回查這個(gè)Group下的任意一個(gè)Producer
        */
       DefaultMQProducer producer = new DefaultMQProducer(properties.getProducerGroupName());
       producer.setNamesrvAddr(properties.getNamesrvAddr());
       producer.setInstanceName(properties.getProducerInstanceName());
       producer.setVipChannelEnabled(false);
       producer.setRetryTimesWhenSendAsyncFailed(10);

       /**
        * Producer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可<br>
        * 注意:切記不可以在每次發(fā)送消息時(shí)蹲诀,都調(diào)用start方法
        */
       producer.start();
       log.info("RocketMq defaultProducer Started.");
       return producer;
   }

   /**
    * 初始化向rocketmq發(fā)送事務(wù)消息的生產(chǎn)者
    */
   @Bean
   @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "producerTranInstanceName")
   @ConditionalOnBean(EtcdClient.class)
   public TransactionMQProducer transactionProducer() throws MQClientException {
       /**
        * 一個(gè)應(yīng)用創(chuàng)建一個(gè)Producer斑粱,由應(yīng)用來(lái)維護(hù)此對(duì)象,可以設(shè)置為全局對(duì)象或者單例<br>
        * 注意:ProducerGroupName需要由應(yīng)用來(lái)保證唯一<br>
        * ProducerGroup這個(gè)概念發(fā)送普通的消息時(shí)脯爪,作用不大则北,但是發(fā)送分布式事務(wù)消息時(shí),比較關(guān)鍵痕慢,
        * 因?yàn)榉?wù)器會(huì)回查這個(gè)Group下的任意一個(gè)Producer
        */
       TransactionMQProducer producer = new TransactionMQProducer(properties.getTransactionProducerGroupName());
       producer.setNamesrvAddr(properties.getNamesrvAddr());
       producer.setInstanceName(properties.getProducerTranInstanceName());
       producer.setRetryTimesWhenSendAsyncFailed(10);

       // 事務(wù)回查最小并發(fā)數(shù)
       producer.setCheckThreadPoolMinSize(2);
       // 事務(wù)回查最大并發(fā)數(shù)
       producer.setCheckThreadPoolMaxSize(2);
       // 隊(duì)列數(shù)
       producer.setCheckRequestHoldMax(2000);

       // TODO 由于社區(qū)版本的服務(wù)器閹割調(diào)了消息回查的功能尚揣,所以這個(gè)地方?jīng)]有意義
       // TransactionCheckListener transactionCheckListener = new
       // TransactionCheckListenerImpl();
       // producer.setTransactionCheckListener(transactionCheckListener);

       /**
        * Producer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可<br>
        * 注意:切記不可以在每次發(fā)送消息時(shí)掖举,都調(diào)用start方法
        */
       producer.start();

       log.info("RocketMq TransactionMQProducer Started.");
       return producer;
   }

   /**
    * 初始化rocketmq消息監(jiān)聽(tīng)方式的消費(fèi)者
    */
   @Bean
   @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "consumerInstanceName")
   @ConditionalOnBean(EtcdClient.class)
   public DefaultMQPushConsumer pushConsumer() throws MQClientException {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(properties.getConsumerGroupName());
       consumer.setNamesrvAddr(properties.getNamesrvAddr());
       consumer.setInstanceName(properties.getConsumerInstanceName());
       if (properties.isConsumerBroadcasting()) {
           consumer.setMessageModel(MessageModel.BROADCASTING);
       }
       consumer.setConsumeMessageBatchMaxSize(
               properties.getConsumerBatchMaxSize() == 0 ? 1 : properties.getConsumerBatchMaxSize());// 設(shè)置批量消費(fèi)快骗,以提升消費(fèi)吞吐量,默認(rèn)是1
       /**
        * 訂閱指定topic下tags
        */
       List<String> subscribeList = properties.getSubscribe();
       for (String sunscribe : subscribeList) {
           consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);
       }
       if (properties.isEnableOrderConsumer()) {
           consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
               try {
                   context.setAutoCommit(true);
                   msgs =filter(msgs);
                   if(msgs.size()==0) return ConsumeOrderlyStatus.SUCCESS;
                   this.publisher.publishEvent(new RocketmqEvent(msgs, consumer));
               } catch (Exception e) {
                   e.printStackTrace();
                   return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
               }
               // 如果沒(méi)有return success塔次,consumer會(huì)重復(fù)消費(fèi)此信息方篮,直到success。
               return ConsumeOrderlyStatus.SUCCESS;
           });
       } else {
           consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
               try {
                   msgs=filter(msgs);
                   if(msgs.size()==0) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                   this.publisher.publishEvent(new RocketmqEvent(msgs, consumer));
               } catch (Exception e) {
                   e.printStackTrace();
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
               }
               // 如果沒(méi)有return success励负,consumer會(huì)重復(fù)消費(fèi)此信息藕溅,直到success。
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           });
       }
       new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(5000);// 延遲5秒再啟動(dòng)熄守,主要是等待spring事件監(jiān)聽(tīng)相關(guān)程序初始化完成蜈垮,否則,回出現(xiàn)對(duì)RocketMQ的消息進(jìn)行消費(fèi)后立即發(fā)布消息到達(dá)的事件裕照,然而此事件的監(jiān)聽(tīng)程序還未初始化攒发,從而造成消息的丟失
                   /**
                    * Consumer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可<br>
                    */
                   try {
                       consumer.start();
                   } catch (Exception e) {
                       log.info("RocketMq pushConsumer Start failure!!!.");
                       log.error(e.getMessage(), e);
                   }
                   log.info("RocketMq pushConsumer Started.");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }

       }).start();

       return consumer;
   }
   
   private List<MessageExt> filter(List<MessageExt> msgs){
       if(isFirstSub&&!properties.isEnableHisConsumer()){
           msgs =msgs.stream().filter(item ->startTime - item.getBornTimestamp() < 0).collect(Collectors.toList());
       }
       if(isFirstSub && msgs.size()>0){
           isFirstSub = false;
       }
       return msgs;
   }

4晋南、編寫(xiě)Event惠猿,方便Consumer使用

public class RocketmqEvent extends ApplicationEvent {
   private static final long serialVersionUID = -4468405250074063206L;
   private DefaultMQPushConsumer consumer;
   private List<MessageExt> msgs;

   public RocketmqEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
       super(msgs);
       this.consumer = consumer;
       this.setMsgs(msgs);
   }

   public String getMsg(int idx) {
       try {
           return new String(getMsgs().get(idx).getBody(), "utf-8");
       } catch (UnsupportedEncodingException e) {
           return null;
       }
   }

   public String getMsg(int idx,String code) {
       try {
           return new String(getMsgs().get(idx).getBody(), code);
       } catch (UnsupportedEncodingException e) {
           return null;
       }
   }

   public DefaultMQPushConsumer getConsumer() {
       return consumer;
   }

   public void setConsumer(DefaultMQPushConsumer consumer) {
       this.consumer = consumer;
   }

   public MessageExt getMessageExt(int idx) {
       return getMsgs().get(idx);
   }


   public String getTopic(int idx) {
       return getMsgs().get(idx).getTopic();
   }


   public String getTag(int idx) {
       return getMsgs().get(idx).getTags();
   }


   public byte[] getBody(int idx) {
       return getMsgs().get(idx).getBody();
   }


   public String getKeys(int idx) {
       return getMsgs().get(idx).getKeys();
   }

   public List<MessageExt> getMsgs() {
       return msgs;
   }

   public void setMsgs(List<MessageExt> msgs) {
       this.msgs = msgs;
   }
}

范例

Producer
@RestController
public class ProducerDemo {
   @Autowired
   private DefaultMQProducer defaultProducer;

   @Autowired
   private TransactionMQProducer transactionProducer;

   private int i = 0;

   @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
   public void sendMsg() {
       Message msg = new Message("TopicTest1", // topic
               "TagA", // tag
               "OrderID00" + i, // key
               ("Hello zebra mq" + i).getBytes());// body
       try {
           defaultProducer.send(msg, new SendCallback() {

               @Override
               public void onSuccess(SendResult sendResult) {
                   System.out.println(sendResult);
                   // TODO 發(fā)送成功處理
               }

               @Override
               public void onException(Throwable e) {
                   System.out.println(e);
                   // TODO 發(fā)送失敗處理
               }
           });
           i++;
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

   @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET)
   public String sendTransactionMsg() {
       SendResult sendResult = null;
       try {
           // 構(gòu)造消息
           Message msg = new Message("TopicTest1", // topic
                   "TagA", // tag
                   "OrderID001", // key
                   ("Hello zebra mq").getBytes());// body

           // 發(fā)送事務(wù)消息,LocalTransactionExecute的executeLocalTransactionBranch方法中執(zhí)行本地邏輯
           sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1, Object arg) -> {
               int value = 1;

               // TODO 執(zhí)行本地事務(wù)负间,改變value的值
               // ===================================================
               System.out.println("執(zhí)行本地事務(wù)偶妖。。政溃。完成");
               if (arg instanceof Integer) {
                   value = (Integer) arg;
               }
               // ===================================================

               if (value == 0) {
                   throw new RuntimeException("Could not find db");
               } else if ((value % 5) == 0) {
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               } else if ((value % 4) == 0) {
                   return LocalTransactionState.COMMIT_MESSAGE;
               }
               return LocalTransactionState.ROLLBACK_MESSAGE;
           }, 4);
           System.out.println(sendResult);
       } catch (Exception e) {
           e.printStackTrace();
       }
       return sendResult.toString();
   }

   @RequestMapping(value = "/sendMsgOrder", method = RequestMethod.GET)
   public void sendMsgOrder() {
       Message msg = new Message("TopicTest1", // topic
               "TagA", // tag
               "OrderID00" + i, // key
               ("Hello zebra mq" + i).getBytes());// body
       try {
           defaultProducer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   System.out.println("MessageQueue" + arg);
                   int index = ((Integer) arg) % mqs.size();
                   return mqs.get(index);
               }
           }, i);// i==arg
           i++;
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
Consumer
@Component
public class ConsumerDemo {
   @EventListener(condition = "#event.msgs[0].topic=='TopicTest1' && #event.msgs[0].tags=='TagA'")
   public void rocketmqMsgListen(RocketmqEvent event) {
//      DefaultMQPushConsumer consumer = event.getConsumer();
       try {
           System.out.println("com.guosen.client.controller.consumerDemo監(jiān)聽(tīng)到一個(gè)消息達(dá)到:" + event.getMsgs().get(0).getMsgId());
           // TODO 進(jìn)行業(yè)務(wù)處理
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
zebra
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末趾访,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子董虱,更是在濱河造成了極大的恐慌扼鞋,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件愤诱,死亡現(xiàn)場(chǎng)離奇詭異云头,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)淫半,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)溃槐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人科吭,你說(shuō)我怎么就攤上這事昏滴。” “怎么了对人?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵影涉,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我规伐,道長(zhǎng)蟹倾,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任猖闪,我火速辦了婚禮鲜棠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘培慌。我一直安慰自己豁陆,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布吵护。 她就那樣靜靜地躺著盒音,像睡著了一般表鳍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上祥诽,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天譬圣,我揣著相機(jī)與錄音,去河邊找鬼雄坪。 笑死厘熟,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的维哈。 我是一名探鬼主播绳姨,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼阔挠!你這毒婦竟也來(lái)了飘庄?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤购撼,失蹤者是張志新(化名)和其女友劉穎竭宰,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體份招,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡切揭,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锁摔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片廓旬。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖谐腰,靈堂內(nèi)的尸體忽然破棺而出孕豹,到底是詐尸還是另有隱情,我是刑警寧澤十气,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布励背,位于F島的核電站,受9級(jí)特大地震影響砸西,放射性物質(zhì)發(fā)生泄漏叶眉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一芹枷、第九天 我趴在偏房一處隱蔽的房頂上張望衅疙。 院中可真熱鬧,春花似錦鸳慈、人聲如沸饱溢。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)绩郎。三九已至潘鲫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肋杖,已是汗流浹背溉仑。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留兽愤,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓挪圾,卻偏偏與公主長(zhǎng)得像浅萧,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子哲思,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359