Spring Cloud Alibaba之消息中間件 - RocketMQ(二十)

MQ消息隊(duì)列對(duì)比

CentOS7上搭建RocketMQ

環(huán)境要求:

  • CentOS 7.2
  • 64位JDK1.8+
  • 4G+的可用磁盤空間

1、下載RocketMQ的二進(jìn)制包唁影,我這里使用的是4.5.1版本盏触,下載地址如下:

http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

使用wget命令下載:

[root@study-01 ~]# cd /usr/local/src
[root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

2九孩、解壓下載好的壓縮包哪替,并移動(dòng)到合適的目錄下:

[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
[root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1

注:若沒有安裝unzip命令則使用如下命令安裝:
yum install -y unzip

3柱彻、進(jìn)入rocketmq的根目錄并查看是否包含如下目錄及文件:

[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1
[root@study-01 /usr/local/rocketmq-4.5.1]# ls
benchmark  bin  conf  lib  LICENSE  NOTICE  README.md

4拄查、沒問題后吁津,使用如下命令啟動(dòng)Name Server:

[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
[1] 2448
[root@study-01 /usr/local/rocketmq-4.5.1]# 

5、查看默認(rèn)的9876端口是否被監(jiān)聽堕扶,以驗(yàn)證Name Server是否啟動(dòng)成功:

[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
tcp6       0      0 :::9876                 :::*                    LISTEN      2454/java           
[root@study-01 /usr/local/rocketmq-4.5.1]#

6碍脏、啟動(dòng)Broker:

[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
[2] 2485
[root@study-01 /usr/local/rocketmq-4.5.1]# 

7、驗(yàn)證Broker是否啟動(dòng)成功稍算,如果啟動(dòng)成功典尾,能看到類似如下的日志::

[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]# 

若想停止Name Server和Broker,則依次執(zhí)行以下兩條命令即可:

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
The mqbroker(2492) is running...
Send shutdown request to mqbroker(2492) OK  # 輸出該信息說明停止成功
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
The mqnamesrv(2454) is running...
Send shutdown request to mqnamesrv(2454) OK  # 輸出該信息說明停止成功
[2]+  退出 143              nohup sh bin/mqbroker -n localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]#

驗(yàn)證RocketMQ功能是否正常

1糊探、驗(yàn)證生產(chǎn)消息正常钾埂,執(zhí)行如下命令:

[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情況下,會(huì)看到一堆的類似于如下的輸出科平,這是生產(chǎn)消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]

2褥紫、驗(yàn)證消費(fèi)消息正常,執(zhí)行如下命令:

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情況下瞪慧,會(huì)看到一堆的類似于如下的輸出髓考,這是消費(fèi)的消息內(nèi)容:

ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]

搭建RocketMQ控制臺(tái)

RocketMQ官方提供了一個(gè)基于Spring Boot開發(fā)的可視化控制臺(tái),可以方便我們查看RocketMQ的運(yùn)行情況以及提升運(yùn)維效率弃酌。所以本小節(jié)將介紹一下如何搭建搭建RocketMQ的控制臺(tái)氨菇,由于我們使用的RocketMQ版本是4.5.1,所以需要對(duì)控制臺(tái)的源碼進(jìn)行一些改動(dòng)以適配RocketMQ的4.5.1版本矢腻。

1门驾、首先需要下載源碼射赛,有兩種方式多柑,一是使用git克隆代碼倉(cāng)庫(kù),二是直接下載rocketmq-externals的zip包楣责,我這里使用git方式竣灌,執(zhí)行如下命令:

git clone https://github.com/apache/rocketmq-externals.git

2聂沙、修改控制臺(tái)代碼,使用IDE打開rocketmq-console項(xiàng)目初嘹,如下圖所示:

2.1及汉、修改項(xiàng)目中的application.properties配置文件,我這里主要是修改了監(jiān)聽端口和Name Server的連接地址屯烦,至于其他配置項(xiàng)有需要的話可按照說明自行修改:

# console的監(jiān)聽端口坷随,默認(rèn)是8080
server.port=8011
# Name Server的連接地址;非必須驻龟,可以在啟動(dòng)了console后温眉,在控制臺(tái)導(dǎo)航欄 - 運(yùn)維 - NameSvrAddrList一欄設(shè)置
rocketmq.config.namesrvAddr=192.168.190.129:9876

2.2、修改依賴翁狐,由于console項(xiàng)目默認(rèn)使用的rocketmq版本是4.4.0类溢,與我們這里使用的是4.5.1不完全兼容,所以需要修改一下依賴版本露懒,找到這一行:

<rocketmq.version>4.4.0</rocketmq.version>

修改為:

<rocketmq.version>4.5.1</rocketmq.version>

2.3闯冷、修改代碼,由于修改了rocketmq的版本懈词,會(huì)導(dǎo)致org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法編譯報(bào)錯(cuò)蛇耀,所以需要改動(dòng)一下此處代碼 ,將:

@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
    ...

修改為:

@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
    RPCHook rpcHook = null;
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
    ...

3坎弯、打包構(gòu)建并啟動(dòng)蒂窒,打開idea的terminal,執(zhí)行如下命令:

# 在rocketmq-console目錄下執(zhí)行
mvn clean package -DskipTests

# 進(jìn)入jar包存放目錄
cd target

# 啟動(dòng)rocketmq console
java -jar rocketmq-console-ng-1.0.1.jar

4荞怒、使用瀏覽器訪問控制臺(tái)洒琢,我這里由于修改了端口,所以訪問地址是:http://localhost:8011褐桌,正常的情況下能看到如下界面:


不習(xí)慣英文的話可以在右上角切換語(yǔ)言:

由于控制臺(tái)是可視化界面并且支持中文衰抑,這里就不過多介紹了,可以參考官方的控制臺(tái)使用說明文檔:

RocketMQ術(shù)語(yǔ)與概念

官方文檔:

Spring消息編程模型 - 編寫生產(chǎn)者

在以上小節(jié)搭建完RocketMQ之后荧嵌,我們來使用Spring的消息編程模型呛踊,編寫一個(gè)簡(jiǎn)單的示例。首先需要在項(xiàng)目中添加相關(guān)依賴如下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

在配置文件中添加rocketmq相關(guān)的配置如下:

rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必須指定group
    group: test-group

編寫生產(chǎn)者的代碼啦撮,這里以Controller做示例谭网,具體代碼如下:

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 生產(chǎn)者
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    /**
     * 用于發(fā)送消息到 RocketMQ 的api
     */
    private final RocketMQTemplate rocketMQTemplate;

    @GetMapping("/test-rocketmq/sendMsg")
    public String testSendMsg() {
        String topic = "test-topic";
        // 發(fā)送消息
        rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance());

        return "send message success";
    }
}

@Data
class MyMessage {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;

    static MyMessage getInstance() {
        MyMessage message = new Message();
        message.id = 1;
        message.name = "×××";
        message.status = "default";
        message.createTime = new Date();

        return message;
    }
}

編寫完成后,啟動(dòng)項(xiàng)目赃春,訪問該接口:



消息發(fā)送成功后愉择,可以到RocketMQ的控制臺(tái)中進(jìn)行查看:



消息體可以在消息詳情中查看,如下:

從生產(chǎn)者的代碼來看,可以說是十分的簡(jiǎn)單了锥涕,只需要使用一個(gè)RocketMQTemplate就可以實(shí)現(xiàn)將對(duì)象轉(zhuǎn)換成消息體并發(fā)送消息衷戈。實(shí)際上除了RocketMQ外,其他的MQ也有對(duì)應(yīng)的Template层坠,如下:

  • RocketMQ:RocketMQTemplate
  • ActiveMQ/Artemis:JmsTemplate
  • RabbitMQ:AmqpTemplate
  • Kafka:KafkaTemplate

Spring消息編程模型 - 編寫消費(fèi)者

在消費(fèi)者項(xiàng)目中殖妇,也需要添加rocketmq的依賴:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

同樣需要配置Name Server的連接地址:

rocketmq:
  name-server: 192.168.190.129:9876

編寫消費(fèi)者的代碼,具體代碼如下:

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消費(fèi)者監(jiān)聽器
 **/
@Slf4j
@Component
// topic需要和生產(chǎn)者的topic一致破花,consumerGroup屬性是必須指定的谦趣,內(nèi)容可以隨意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener<MyMessage> {

    /**
     * 監(jiān)聽到消息的時(shí)候就會(huì)調(diào)用該方法
     *
     * @param message 消息體
     */
    @Override
    public void onMessage(MyMessage message) {
        log.info("從test-topic中監(jiān)聽到消息");
        log.info(JSON.toJSONString(message));
    }
}

/**
 * 消息體結(jié)構(gòu)需要一致
 */
@Data
class MyMessage {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;
}

編寫完成后啟動(dòng)項(xiàng)目,由于之前我們已經(jīng)往隊(duì)列里發(fā)送了消息座每,所以此時(shí)消費(fèi)者項(xiàng)目一啟動(dòng)蔚润,就可以監(jiān)聽到消息并消費(fèi),控制臺(tái)就會(huì)輸出如下日志:



RocketMQ事務(wù)消息

眾所周知RocketMQ是支持事務(wù)消息的尺栖,這也是很多人選擇使用RocketMQ作為消息中間件的一大原因嫡纠,也是RocketMQ的一大特定理疙。RocketMQ事務(wù)消息的流程如下圖所示:



簡(jiǎn)單剖析一下流程:

1畜挨、生產(chǎn)者向MQ Server發(fā)送半消息,半消息是一種特殊的消息胖烛,這種消息會(huì)被存儲(chǔ)到MQ Server里挫以,但是會(huì)標(biāo)記為暫時(shí)不能投遞的狀態(tài)者蠕,所以此時(shí)消費(fèi)者不會(huì)消費(fèi)該消息
2、當(dāng)半消息發(fā)送成功后掐松,生產(chǎn)者就會(huì)去執(zhí)行本地事務(wù)
3踱侣、生產(chǎn)者根據(jù)本地事務(wù)的執(zhí)行結(jié)果,向MQ Server發(fā)送commit或rollback消息進(jìn)行二次確認(rèn)大磺。如果MQ Server接收到的是commit則會(huì)將半消息標(biāo)記為可投遞狀態(tài)抡句,那么消費(fèi)者就可以進(jìn)行消費(fèi)。反之杠愧,MQ Server接收到的是rollback則會(huì)將半消息丟棄掉待榔,消費(fèi)者就無法進(jìn)行消費(fèi)
4、若MQ Server未接收到二次確認(rèn)的消息或生產(chǎn)者暫停了本地事務(wù)的執(zhí)行流济,MQ Server則會(huì)定時(shí)(默認(rèn)1分鐘)向生產(chǎn)者發(fā)送回查消息锐锣,檢查生產(chǎn)者的本地事務(wù)狀態(tài)。然后生產(chǎn)者會(huì)根據(jù)回查的本地事務(wù)執(zhí)行結(jié)果向MQ Server再次發(fā)送commit或rollback消息

概念術(shù)語(yǔ)

  • 半消息(Half Message):暫時(shí)無法消費(fèi)的消息绳瘟,生產(chǎn)者將消息發(fā)送到了MQ Server雕憔,但這個(gè)消息會(huì)被標(biāo)記為“暫不能投遞”狀態(tài),先存儲(chǔ)起來糖声;消費(fèi)者不會(huì)消費(fèi)這條消息
  • 消息回查(Message Status Check):網(wǎng)絡(luò)斷開或生產(chǎn)者重啟可能導(dǎo)致丟失事務(wù)消息的第二次確認(rèn)斤彼。當(dāng)MQ Server發(fā)現(xiàn)消息長(zhǎng)時(shí)間處于半消息狀態(tài)時(shí)分瘦,將向消息生產(chǎn)者發(fā)送請(qǐng)求,詢問該消息的最終狀態(tài)(提交或回滾)

消息三態(tài)

Commit:提交事務(wù)消息畅卓,消費(fèi)者可以消費(fèi)此消息
Rollback:回滾事務(wù)消息,broker會(huì)刪除該消息蟋恬,消費(fèi)者不能消費(fèi)
UNKNOWN:broker需要回查確認(rèn)該消息的狀態(tài)

編碼實(shí)現(xiàn)RocketMQ事務(wù)消息

要想實(shí)現(xiàn)RocketMQ事務(wù)消息的話翁潘,需要按照流程圖編寫一些代碼。在開始編碼之前歼争,先在數(shù)據(jù)庫(kù)中創(chuàng)建一張RocketMQ的事務(wù)日志表拜马,用作于本地事務(wù)回查的依據(jù),表結(jié)構(gòu)如下:


然后再建一張表沐绒,作為事務(wù)方法操作的數(shù)據(jù)表俩莽,表結(jié)構(gòu)如下:



接著開始寫代碼,首先定義一個(gè)service乔遮,里面有帶有事務(wù)注解的方法以及發(fā)送事務(wù)消息的方法扮超。具體代碼如下:

import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.UUID;

@Service
@RequiredArgsConstructor
public class TestProducerService {

    private final RocketMQTemplate rocketMQTemplate;
    private final NoticeMapper noticeMapper;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    public String testSendMsg(Notice notice) {
        // topic
        String topic = "test-topic";
        // 生產(chǎn)者所在的事務(wù)組
        String txProducerGroup = "tx-test-producer-group";
        // 生產(chǎn)事務(wù)id
        String transactionId = UUID.randomUUID().toString();
        // 發(fā)送半消息
        rocketMQTemplate.sendMessageInTransaction(
                txProducerGroup, topic,
                // 消息體
                MessageBuilder.withPayload("事務(wù)消息")
                        // header是消息的頭部分,可以用作傳參
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .setHeader("notice_id", notice.getId())
                        .build(),
                // 傳遞到executeLocalTransaction的參數(shù)
                notice);

        return "send message success";
    }

    @Transactional(rollbackFor = Exception.class)
    public void updateNotice(Integer noticeId, Notice notice) {
        Notice newNotice = new Notice();
        newNotice.setId(noticeId);
        newNotice.setContent(notice.getContent());

        noticeMapper.updateByPrimaryKeySelective(newNotice);
    }

    @Transactional(rollbackFor = Exception.class)
    public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
        updateNotice(noticeId, notice);
        // 寫入事務(wù)日志
        rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("updateNotice")
                        .build()
        );
    }
}

實(shí)現(xiàn)一個(gè)本地事務(wù)監(jiān)聽器蹋肮,用于執(zhí)行事務(wù)方法及提供本地事務(wù)狀態(tài)的回查方法出刷。具體代碼如下:

package com.zj.node.contentcenter.rocketmq;

import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * 本地事務(wù)監(jiān)聽器
 **/
@Slf4j
@RequiredArgsConstructor
// 這里的txProducerGroup需要與sendMessageInTransaction里設(shè)置的一致
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {

    private final TestProducerService service;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 用于執(zhí)行本地事務(wù)的方法
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
                log.info("執(zhí)行本地事務(wù)方法. 事務(wù)id: {}", transactionId);
                // header里拿出來的都是String類型
        Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));

        try {
            // 執(zhí)行帶有事務(wù)注解的方法
            service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId);
            // 正常執(zhí)行,向MQ Server發(fā)送commit消息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事務(wù)方法發(fā)生異常坯辩,消息將被回滾", e);
            // 發(fā)生異常向MQ Server發(fā)送rollback消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 用于回查本地事務(wù)的執(zhí)行結(jié)果
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.warn("回查本地事務(wù)狀態(tài). 事務(wù)id: {}", transactionId);

        // 按事務(wù)id查詢?nèi)罩緮?shù)據(jù)
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );

        // 如果能按事務(wù)id查詢出來數(shù)據(jù)表示本地事務(wù)執(zhí)行成功馁龟,沒有數(shù)據(jù)則表示本地事務(wù)執(zhí)行失敗
        if (transactionLog == null) {
            log.warn("本地事務(wù)執(zhí)行失敗,事務(wù)日志不存在漆魔,消息將被回滾. 事務(wù)id: {}", transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

簡(jiǎn)單說明一下這些方法的執(zhí)行流程:

首先調(diào)用TestProducerService.testSendMsg向MQ Server發(fā)送半消息坷檩,從代碼也可以看到該方法里不會(huì)執(zhí)行本地事務(wù)方法。當(dāng)MQ Server接收半消息成功后改抡,會(huì)告訴生產(chǎn)者接收成功矢炼,接著就會(huì)執(zhí)行本地事務(wù)監(jiān)聽器里的executeLocalTransaction方法,該方法里會(huì)調(diào)用TestProducerService里帶有事務(wù)注解的方法updateNoticeWithRocketMQLog阿纤,并在事務(wù)方法執(zhí)行完畢后返回本地事務(wù)狀態(tài)給MQ Server裸删。若executeLocalTransaction方法返回的事務(wù)狀態(tài)是UNKNOWN或者該方法出于某種原因沒有被執(zhí)行完畢,那么MQ Server就接收不到二次確認(rèn)消息阵赠,默認(rèn)會(huì)在一分鐘后向生產(chǎn)者發(fā)送回查消息涯塔,生產(chǎn)者接收到回查消息的話就會(huì)執(zhí)行本地事務(wù)監(jiān)聽器里的checkLocalTransaction方法,通過事務(wù)日志記錄表的數(shù)據(jù)來確認(rèn)該事務(wù)狀態(tài)并返回清蚀。


RocketMQ日志相關(guān)的坑

由于rocketmq有自己內(nèi)部的日志體系匕荸,所以默認(rèn)不會(huì)使用Slf4j。體現(xiàn)到executeLocalTransaction方法的話枷邪,就是如果該方法的執(zhí)行過程中拋出了異常的話榛搔,異常信息不會(huì)被打印到控制臺(tái),而是輸出到rocketmq_client.log日志文件中。相關(guān)源碼:org.apache.rocketmq.client.log.ClientLogger

如果希望rocketmq的日志輸出到控制臺(tái)的話践惑,需要在啟動(dòng)類的main方法中增加如下代碼:

// 讓rocketmq使用slf4j日志
System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");

原文:https://blog.51cto.com/zero01/2426303

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末腹泌,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子尔觉,更是在濱河造成了極大的恐慌凉袱,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件侦铜,死亡現(xiàn)場(chǎng)離奇詭異专甩,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)钉稍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門涤躲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人贡未,你說我怎么就攤上這事种樱。” “怎么了俊卤?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵缸托,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我瘾蛋,道長(zhǎng)俐镐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任哺哼,我火速辦了婚禮佩抹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘取董。我一直安慰自己棍苹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布茵汰。 她就那樣靜靜地躺著枢里,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹂午。 梳的紋絲不亂的頭發(fā)上栏豺,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音豆胸,去河邊找鬼奥洼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛晚胡,可吹牛的內(nèi)容都是我干的灵奖。 我是一名探鬼主播嚼沿,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼瓷患!你這毒婦竟也來了骡尽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤擅编,失蹤者是張志新(化名)和其女友劉穎攀细,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沙咏,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡辨图,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年班套,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了肢藐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡吱韭,死狀恐怖吆豹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情理盆,我是刑警寧澤痘煤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站猿规,受9級(jí)特大地震影響衷快,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜姨俩,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一蘸拔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧环葵,春花似錦调窍、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至菊卷,卻和暖如春缔恳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背洁闰。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工褐耳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人渴庆。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓铃芦,卻偏偏與公主長(zhǎng)得像雅镊,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子刃滓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容