Springboot2.0 集成Rocketmq實(shí)現(xiàn)事物消息

Springboot2.0 集成Rocketmq實(shí)現(xiàn)事物消息

廢話不多說(shuō)佑惠,直接進(jìn)入正題

1.引入相關(guān)Maven依賴:

注意:當(dāng)前的RELEASE.VERSION=2.0.1

<!--在pom.xml中添加依賴-->

<dependency>

? ? <groupId>org.apache.rocketmq</groupId>

? ? <artifactId>rocketmq-spring-boot-starter</artifactId>

? ? <version>${RELEASE.VERSION}</version>

</dependency>

2.配置生產(chǎn)者:

(1).application.yml配置如下:

rocketmq:

? ? ? name-server: '127.0.0.1:9876'

? ? ? producer:

? ? ? ? ? group: my-group

? ? ? ? ? send-message-timeout: 300000

? ? ? ? ? compress-message-body-threshold: 4096

? ? ? ? ? max-message-size: 4194304

? ? ? ? ? retry-times-when-send-async-failed: 0

? ? ? ? ? retry-next-server: true

? ? ? ? ? retry-times-when-send-failed: 2

(2).發(fā)送事物消息:

@Resource

private RocketMQTemplate rocketMQTemplate;

{

? ? ? ? ? ?Message msg = MessageBuilder.withPayload(s).build();

? ? ? ? ?// rocketMQTemplate.getProducer().setVipChannelEnabled(false);

?// test1事務(wù)組晌块,對(duì)應(yīng)RocketMQLocalTransactionListener中的事務(wù)生產(chǎn)者組名稱

? ? ? ? ? ?rocketMQTemplate.sendMessageInTransaction("test1", "ts", msg, null);

}

實(shí)現(xiàn)事務(wù)監(jiān)聽(tīng):

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import com.alibaba.fastjson.JSON;


@RocketMQTransactionListener(txProducerGroup = "test1")

public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

@Override

? ? public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

? ? ? // ... local transaction process, return bollback, commit or unknown

? ? ? System.out.println("本地事務(wù)和消息發(fā)送:" + JSON.toJSONString(msg));

? ? ? return RocketMQLocalTransactionState.UNKNOWN;

? ? }

? ? @Override

? ? public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

? ? ? // ... check transaction status and return bollback, commit or unknown

? ? ? System.out.println("回查信息:" + JSON.toJSONString(msg));

? ? ? return RocketMQLocalTransactionState.COMMIT;

? ? }

}

注意liunx上面需要開(kāi)放端口:9876弊仪、10909、10910、10911,不然會(huì)出現(xiàn)消費(fèi)端獲取失敗,連不上nameserv和broker

3.消費(fèi)者配置:

application.yml:

rocketmq:

? ? ? name-server: '127.0.0.1:9876'

代碼實(shí)現(xiàn):

對(duì)消費(fèi)者實(shí)現(xiàn)ack機(jī)制:

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;

import org.springframework.stereotype.Component;

@Component

@RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumerGroup")

public class RocketmqConsumer implements RocketMQListener<String>,RocketMQPushConsumerLifecycleListener{


? ? private int count = 0;


? ? @Override

? ? public void prepareStart(DefaultMQPushConsumer consumer) {


? ? ? ? consumer.registerMessageListener(new MessageListenerConcurrently() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

? ? ? ? ? ? ? ? ConsumeConcurrentlyContext context) {

? ? ? ? ? ? ? ? for (MessageExt messageExt : msgs) {

? ? ? ? ? ? ? ? ? ? System.out.println("重試次數(shù):" + messageExt.getReconsumeTimes());

? ? ? ? ? ? ? ? ? // 注意可以在此處判斷重試次數(shù)症副,實(shí)現(xiàn)入庫(kù)插入,記錄相關(guān)消息政基,進(jìn)行下面的業(yè)務(wù)邏輯處理

? ? ? ? ? ? ? ? ? ? if(count == 0) {

? ? ? ? ? ? ? ? ? ? ? ? count++;

? ? ? ? ? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.RECONSUME_LATER;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? System.out.println("接受到的消息:" + new String(messageExt.getBody()));

? ? ? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

? ? ? ? ? ? }

? ? ? ? });


? ? }

? ? @Override

? ? public void onMessage(String message) {

? ? ? ?實(shí)現(xiàn)RocketMQPushConsumerLifecycleListener監(jiān)聽(tīng)器之后贞铣,此方法不調(diào)用

? ? }


}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市沮明,隨后出現(xiàn)的幾起案子辕坝,更是在濱河造成了極大的恐慌,老刑警劉巖荐健,帶你破解...
    沈念sama閱讀 216,843評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酱畅,死亡現(xiàn)場(chǎng)離奇詭異琳袄,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)圣贸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)挚歧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)扛稽,“玉大人吁峻,你說(shuō)我怎么就攤上這事≡谡牛” “怎么了用含?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,187評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)帮匾。 經(jīng)常有香客問(wèn)我啄骇,道長(zhǎng),這世上最難降的妖魔是什么瘟斜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,264評(píng)論 1 292
  • 正文 為了忘掉前任缸夹,我火速辦了婚禮,結(jié)果婚禮上螺句,老公的妹妹穿的比我還像新娘虽惭。我一直安慰自己,他們只是感情好蛇尚,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布芽唇。 她就那樣靜靜地躺著,像睡著了一般取劫。 火紅的嫁衣襯著肌膚如雪匆笤。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,231評(píng)論 1 299
  • 那天谱邪,我揣著相機(jī)與錄音炮捧,去河邊找鬼。 笑死惦银,一個(gè)胖子當(dāng)著我的面吹牛寓盗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播璧函,決...
    沈念sama閱讀 40,116評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼傀蚌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蘸吓?” 一聲冷哼從身側(cè)響起善炫,我...
    開(kāi)封第一講書(shū)人閱讀 38,945評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎库继,沒(méi)想到半個(gè)月后箩艺,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體窜醉,經(jīng)...
    沈念sama閱讀 45,367評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評(píng)論 2 333
  • 正文 我和宋清朗相戀三年艺谆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了榨惰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,754評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡静汤,死狀恐怖琅催,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情虫给,我是刑警寧澤藤抡,帶...
    沈念sama閱讀 35,458評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站抹估,受9級(jí)特大地震影響缠黍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜药蜻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評(píng)論 3 327
  • 文/蒙蒙 一瓷式、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧语泽,春花似錦贸典、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,692評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至颊埃,卻和暖如春蔬充,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背班利。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,842評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工饥漫, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人罗标。 一個(gè)月前我還...
    沈念sama閱讀 47,797評(píng)論 2 369
  • 正文 我出身青樓庸队,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親闯割。 傳聞我的和親對(duì)象是個(gè)殘疾皇子彻消,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容