消息隊(duì)列之RocketMQ

一摄悯、基礎(chǔ)概念

  • Producer:消息生產(chǎn)者
  • Producer Group:消息生產(chǎn)者組瘪松,發(fā)送同類消息的一個(gè)消息生產(chǎn)組
  • Consumer:消費(fèi)者
  • Consumer Group:消費(fèi)同個(gè)消息的多個(gè)實(shí)例
  • Tag:標(biāo)簽预明,子主題(二級(jí)分類),用于區(qū)分同一個(gè)主題下的不同業(yè)務(wù)的消息
  • Topic:主題
  • Message:消息
  • Broker:MQ程序任柜,接收生產(chǎn)的消息哨鸭,提供給消費(fèi)者消費(fèi)的程序
  • Name Server:給生產(chǎn)和消費(fèi)者提供路由信息,提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由

關(guān)于其服務(wù)端的構(gòu)建:

  • 通過數(shù)據(jù)存儲(chǔ)服務(wù)broker往枣,這玩意只做數(shù)據(jù)存儲(chǔ)處理相關(guān),不接受對外收發(fā)請求座享,只和name server交互婉商,為保證數(shù)據(jù)的可靠和穩(wěn)定性,提供主從策略渣叛,多節(jié)點(diǎn)備份丈秩,當(dāng)主節(jié)點(diǎn)掛了從節(jié)點(diǎn)繼續(xù)提供服務(wù);
  • 服務(wù)發(fā)現(xiàn)和路由器name server淳衙,這個(gè)相當(dāng)于微服務(wù)中的注冊中心蘑秽,它負(fù)責(zé)進(jìn)行路由的分發(fā),以及broker集群的管理箫攀,同時(shí)為了提供高可用肠牲,name server集群其實(shí)不叫集群,它們互不影響靴跛,任意一個(gè)掛了對整個(gè)集群依然正常工作缀雳。

相關(guān)文檔資料

二、簡單使用示例

2.1 使用基礎(chǔ)的 rocketmq-client包來實(shí)現(xiàn)

這個(gè)包中包含了封裝的RocketMQ相關(guān)的TCP連接操作梢睛。

2.1.1 添加maven依賴和配置

 <dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.7.1</version>
 </dependency>

在application配置文件添加如下配置

rocketmq:
  name-server: 192.168.111.63:9876
  producer:
    group: client-server

創(chuàng)建DefaultMQProducer構(gòu)建工廠類

@Slf4j
@Component
public class RocketProducerBuilder implements DisposableBean {

    /**
     * NameServer 地址
     */
    @Value(value = "${rocketmq.name-server}")
    private String nameServerAddr;

    /**
     * 生產(chǎn)者的組名
     */
    @Value(value = "${rocketmq.producer.group}")
    private String producerGroup;

    private DefaultMQProducer producer;

    /**
     * 初始化DefaultMQProducer
     *
     * 參考rocketmq-spring-boot包中的org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration類
     *
     * @throws MQClientException 啟動(dòng)消息生產(chǎn)者異常
     */
    @PostConstruct
    void init() throws MQClientException {
        //生產(chǎn)者的組名
        producer = new DefaultMQProducer(producerGroup);
        /// 指定NameServer地址肥印,多個(gè)地址以 ; 隔開
        producer.setNamesrvAddr(nameServerAddr);
        // 關(guān)閉Channel通道
        producer.setVipChannelEnabled(false);
        // 發(fā)送消息超時(shí)時(shí)間,單位毫秒
        producer.setSendMsgTimeout(3000);
        // 在同步模式下绝葡,消息發(fā)送失敗后重試次數(shù)深碱,注意這個(gè)可能導(dǎo)致重復(fù)消息
        producer.setRetryTimesWhenSendFailed(2);
        // 在異步模式下,消息發(fā)送失敗后重試次數(shù)藏畅,注意這個(gè)可能導(dǎo)致重復(fù)消息
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // 發(fā)送消息的消息體網(wǎng)絡(luò)包最大值
        producer.setMaxMessageSize(1024 * 1024 * 4);
        // 當(dāng)消息體網(wǎng)絡(luò)包大于4k時(shí)壓縮消息
        producer.setCompressMsgBodyOverHowmuch(1024 * 4);
        // 當(dāng)向一個(gè)broker發(fā)送消息失敗了敷硅,是否重新嘗試下一個(gè)
        producer.setRetryAnotherBrokerWhenNotStoreOK(false);
        // Producer對象在使用之前必須要調(diào)用start初始化,只能初始化一次
        producer.start();
    }

    /**
     * 獲取DefaultMQProducer
     * @return  返回消息生產(chǎn)者DefaultMQProducer
     */
    public DefaultMQProducer build(){
        return this.producer;
    }

    @Override
    public void destroy() {
        if(null!=producer){
            producer.shutdown();
            log.info("Rocket Producer Destroyed");
        }
    }
}

2.1.2 發(fā)送普通消息

普通消息分為:同步(Sync)發(fā)送愉阎、異步(Async)發(fā)送和單向(Oneway)發(fā)送绞蹦。

  • 發(fā)送同步消息示例;注意同步消息會(huì)阻塞等待消息發(fā)送結(jié)果榜旦,適用場景:重要通知郵件坦辟、報(bào)名短信通知、營銷短信系統(tǒng)等章办。
@Autowired
private RocketProducerBuilder producerBuilder;

/**
* 同步消息
*/
@PostMapping("/general")
public Mono<SendResult> sendMessage(@RequestBody OrderDTO orderDTO){

  DefaultMQProducer producer = producerBuilder.build();
  Message message = new Message();
  message.setTopic("demo-pay");
  message.setTags("train");
  message.setKeys(UUID.randomUUID().toString());
  message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
  return Mono.defer(()-> {
    try {
        // 注意這個(gè)方法在發(fā)送失敗了會(huì)重試锉走,消費(fèi)者需要做好處理
      return Mono.just(producer.send(message, 3000));
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
      e.printStackTrace();
      return Mono.error(e);
    }
  });
}

普通消息應(yīng)該是最常用的消息滨彻,需要注意的是DefaultMQProducer的send方法有重試機(jī)制,具體查看org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl類中的sendDefaultImpl()方法挪蹭;
因此消費(fèi)者需要處理注意接口的冥等性亭饵。

  • 發(fā)送異步消息;異步發(fā)送是指發(fā)送方發(fā)出一條消息后梁厉,不等服務(wù)端返回響應(yīng)辜羊,接著發(fā)送下一條消息的通訊方式。適用場景:異步發(fā)送一般用于鏈路耗時(shí)較長词顾,對響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場景八秃。
/**
* 異步消息
*/
@PostMapping("/async")
public String asyncSend(@Validated @RequestBody OrderDTO orderDTO) {

  DefaultMQProducer producer = producerBuilder.build();
  Message message = new Message();
  message.setTopic("demo-pay");
  message.setTags("train");
  message.setKeys(UUID.randomUUID().toString());
  message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
  try {
    producer.send(message, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
        log.info("異步消息發(fā)送結(jié)果:{}", sendResult);
      }
      @Override
      public void onException(Throwable e) {
        log.error("異步消息發(fā)送異常:", e);
      }
    }, 3000);
  } catch (Exception e) {
    log.error("異步消息發(fā)送異常:", e);
  }
  return "發(fā)送成功";
}
  • 單向發(fā)送;發(fā)送方只負(fù)責(zé)發(fā)送消息肉盹,不等待服務(wù)端返回響應(yīng)且沒有回調(diào)函數(shù)觸發(fā)昔驱,即只發(fā)送請求不等待應(yīng)答。此方式發(fā)送消息的過程耗時(shí)非常短上忍,一般在微秒級(jí)別骤肛。適用場景:適用于某些耗時(shí)非常短,但對可靠性要求并不高的場景窍蓝,例如日志收集腋颠。
/**
* 單向消息
*/
@PostMapping("/oneway")
public String sendOneway(@Validated @RequestBody OrderDTO orderDTO) {

  DefaultMQProducer producer = producerBuilder.build();
  Message message = new Message();
  message.setTopic("demo-pay");
  message.setTags("train");
  message.setKeys(UUID.randomUUID().toString());
  message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
  try {
    producer.sendOneway(message);
  } catch (Exception e) {
    log.error("單向消息發(fā)送異常:", e);
  }
  return "發(fā)送成功";
}

2.1.3 發(fā)送延時(shí)消息

延時(shí)消息用于指定消息發(fā)送后,延時(shí)一段時(shí)間才被投遞到客戶端進(jìn)行消費(fèi)(例如 3 秒后才被消費(fèi))吓笙,適用于解決一些消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求的場景淑玫,或者通過消息觸發(fā)延遲任務(wù)的場景,類似于延遲隊(duì)列面睛。

注意:開源版本的僅支持18個(gè)等級(jí)的延遲消息絮蒿,阿里云官方的商業(yè)版支持任意時(shí)間的延時(shí)消息停撞;

延時(shí)等級(jí)(delayLevel)對應(yīng)的時(shí)間: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

/**
* 延時(shí)消息
*/
@PostMapping("/delay")
public SendResult delayMessage(@Validated @RequestBody OrderDTO orderDTO){

    Message message = new Message();
    message.setTopic("demo-pay");
    message.setTags("train");
    message.setKeys(UUID.randomUUID().toString());
    message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
    // 延時(shí)等級(jí)(delayLevel)對應(yīng)的時(shí)間 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h档插,從1開始
    message.setDelayTimeLevel(3);
    try {
        return producerBuilder.build().send(message);
    } catch (Exception e) {
        e.printStackTrace();
        log.error("延時(shí)消息發(fā)送異常:", e);
    }
    return null;
}

2.1.4 發(fā)送順序消息

順序消息(FIFO 消息)是消息隊(duì)列 RocketMQ 版提供的一種嚴(yán)格按照順序來發(fā)布和消費(fèi)的消息類型。
其分為下面2類:

  • 全局順序:對于指定的一個(gè) Topic,所有消息按照嚴(yán)格的先入先出 FIFO(First In First Out)的順序進(jìn)行發(fā)布和消費(fèi)亲茅。
  • 局部順序:對于指定的一個(gè) Topic,所有消息根據(jù) Sharding Key 進(jìn)行區(qū)塊分區(qū)狗准。同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi)克锣。Sharding Key 是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的 Key 是完全不同的概念腔长。

示例:用戶A袭祟、B都下了訂單,需要以順序發(fā)送3條消息捞附,

A1 A2 A3 B1 B2 B3  全局順序巾乳,但是系統(tǒng)性能很受影響您没。
A1 B1 A2 A3 B2 B3  局部順序,只需要保證A或B的消息順序即可胆绊,中間可以穿插其他的消息
A2 B2 A1 A3 B1 B3  這樣的就不符合要求

為了實(shí)現(xiàn)消息的順序消費(fèi)氨鹏,我們需要最生產(chǎn)者和消息者做特殊些要求;對應(yīng)全局順序压状,必須設(shè)置1個(gè)Topic下讀寫隊(duì)列都為1仆抵,同時(shí)業(yè)務(wù)端還需要對消息的重試機(jī)制進(jìn)行處理,性能自然也就差了种冬,因此不建議使用镣丑。

對于局部順序,需要保證消息的發(fā)送順序娱两、消息的存儲(chǔ)順序莺匠、消息的消費(fèi)順序;

  • 消息的發(fā)送:多線程中需要保證同一個(gè)業(yè)務(wù)編號(hào)的消息在一個(gè)線程中完成谷婆,同時(shí)使用同步的消息慨蛙;
  • 消息的存儲(chǔ):mq的topic下會(huì)存在多個(gè)queue,要保證消息的順序存儲(chǔ)纪挎,同一個(gè)業(yè)務(wù)編號(hào)的消息需要被發(fā)送到一個(gè)queue中期贫。對應(yīng)到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue异袄,即對業(yè)務(wù)編號(hào)進(jìn)行hash通砍,然后根據(jù)隊(duì)列數(shù)量對hash值取余,將消息發(fā)送到一個(gè)queue中烤蜕。
  • 消息的消費(fèi):要保證消息順序消費(fèi)封孙,同一個(gè)queue就只能被一個(gè)消費(fèi)者所消費(fèi),因此對broker中消費(fèi)隊(duì)列加鎖是無法避免的讽营。同一時(shí)刻虎忌,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者消費(fèi),消費(fèi)者內(nèi)部橱鹏,也只能有一個(gè)消費(fèi)線程來消費(fèi)該隊(duì)列膜蠢。即,同一時(shí)刻莉兰,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者中的一個(gè)線程消費(fèi)挑围。

全局順序和局部順序的代碼實(shí)現(xiàn)幾乎是一樣。

/**
* 順序消息
*/
@PostMapping
public String sendMessage(@Validated @RequestBody OrderNotify orderNotify){

    Message message = new Message();
    message.setTopic("order-notify");
    message.setTags("train");
    message.setKeys(UUID.randomUUID().toString());
    message.setBody(JSONObject.toJSONString(orderNotify).getBytes(StandardCharsets.UTF_8));
    try {
        producerBuilder.build().send(message, (mqs, msg, arg) -> {
            // 這里就是進(jìn)行隊(duì)列的選擇糖荒,這里的arg參數(shù)就是后面?zhèn)魅氲哪莻€(gè)參數(shù)
            int index = Math.abs(arg.hashCode()%mqs.size());
            return mqs.get(index);
        }, orderNotify.getOrderNo());
    } catch (Exception e) {
        log.error("順序消息發(fā)送異常", e);
        return "順序消息發(fā)送異常:"+e.getMessage();
    }
    return "消息發(fā)送完成";
}

消費(fèi)者實(shí)現(xiàn)杉辙,和這里需要選擇有序的監(jiān)聽類實(shí)現(xiàn),同時(shí)需要從隊(duì)列開始處開始消費(fèi)捶朵。(其他的是一樣的蜘矢,具體的見消費(fèi)者那塊的代碼)

@Override
public void init() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getConsumerGroup());
    consumer.setNamesrvAddr(getNameServer());
    try {
        // 設(shè)置consumer所訂閱的Topic和Tag, *代表所有的Tag
        consumer.subscribe(this.topics, this.tags);
        // CONSUME_FROM_FIRST_OFFSET, 從隊(duì)列最開始開始消費(fèi)狂男,即歷史消息(還存在broker的)全部消費(fèi)一遍
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 設(shè)置線程數(shù),默認(rèn)是20品腹;這里先將其設(shè)置小點(diǎn)
        consumer.setConsumeThreadMin(3);
        consumer.setConsumeThreadMax(6);

        // MessageListenerOrderly 有序的
        consumer.registerMessageListener((MessageListenerOrderly) (list, context) -> {
            try{
                // 其實(shí)這里默認(rèn)每次只會(huì)傳入一條消息
                log.warn("本次消息數(shù):{}", list.size());
                for(MessageExt messageExt:list){
                    //打印消息內(nèi)容
                    log.info("messageExt: [{}]: {}", getNumber(), messageExt);
                    String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    ConsumerResult consumerResult = rocketConsumerHandler.handler(ConsumerMessage.builder()
                                                                                  .number(getNumber())
                                                                                  .message(messageBody)
                                                                                  .build());
                    if(!consumerResult.isSuccess() && consumerResult.isRetry()){
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            }catch (Exception e){
                log.error("順序消息消費(fèi)異常:", e);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        super.setMqPushConsumer(consumer);
        consumer.start();
        log.info("start rocketmq consumer success");
    }catch (Exception e){
        throw new BizRunTimeException("注冊rocketmq消費(fèi)者異常", e);
    }
}

2.1.5 訂閱消息發(fā)布

Rocket的消息訂閱方式分為集群模式(默認(rèn)的)和廣播模式并淋,這個(gè)只需要在消費(fèi)端進(jìn)行設(shè)置即可;廣播模式下珍昨,同一個(gè) Group ID 所標(biāo)識(shí)的所有 Consumer 都會(huì)各自消費(fèi)某條消息一次县耽。同時(shí)廣播模式下不支持順序消息,消費(fèi)點(diǎn)重置即消費(fèi)失敗不會(huì)重新投遞镣典。

額外說明:同一個(gè)Group ID 所標(biāo)識(shí)的消費(fèi)者訂閱的設(shè)置需要保持一致兔毙,即消費(fèi)者分組A中所有消費(fèi)者的topic和tag必須設(shè)置為一樣的。

下面這個(gè)就是不正確的兄春,如果這樣設(shè)置澎剥,那么消息消費(fèi)的邏輯就會(huì)混亂,甚至導(dǎo)致消息丟失


就是在創(chuàng)建消費(fèi)者時(shí)增加下面這行代碼即可

consumer.setMessageModel(MessageModel.BROADCASTING);

2.1.6 事務(wù)消息

2.1.7 消費(fèi)者

下面使用基于springboot的簡單實(shí)現(xiàn)赶舆;application配置文件如下:

rocketmq:
  name-server: 192.168.111.63:9876
  producer:
    group: client-server
  consumer:
    - consumerGroup: trian-order
      consumeFromWhere: CONSUME_FROM_LAST_OFFSET
      topics: demo-pay
      rocketConsumerHandler: "top.vchar.rocketmq.config.rocketmq.handler.SimpleRocketConsumerHandler"

消費(fèi)者接口定義

public interface RocketConsumer {

    /**
     * 初始化
     */
    void init();

}
/**
 * <p> rocketmq 消費(fèi)者基礎(chǔ)信息 </p>
 *
 * @version 1.0
 * @create_date 2020/10/27
 */
@Slf4j
public abstract class AbstractRocketConsumer implements RocketConsumer, DisposableBean {

    protected MQPushConsumer consumer;

    @Getter
    private final String nameServer;

    @Getter
    private final String consumerGroup;

    @Getter
    private final ConsumeFromWhere consumeFromWhere;

    public AbstractRocketConsumer(String nameServer, String consumerGroup, ConsumeFromWhere consumeFromWhere){
        this.nameServer = nameServer;
        this.consumerGroup = consumerGroup;
        this.consumeFromWhere = Optional.ofNullable(consumeFromWhere).orElse(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }

    /**
     * 初始化
     */
    @Override
    public abstract void init();

    public void setMqPushConsumer(MQPushConsumer consumer){
        this.consumer = consumer;
    }

    /**
     * 銷毀
     */
    @Override
    public void destroy() throws Exception {
        if (Objects.nonNull(consumer)) {
            consumer.shutdown();
        }
        log.info("container destroyed, {}", this.toString());
    }
}
/**
 * <p> 普通消息哑姚,延時(shí)消息消費(fèi)者 </p>
 *
 * @author vchar fred
 * @version 1.0
 * @create_date 2020/10/29
 */
@Slf4j
public class SimpleRocketConsumer extends AbstractRocketConsumer {

    private final String topics;

    private final String tags;

    private final RocketConsumerHandler rocketConsumerHandler;

    /**
     * 創(chuàng)建簡單的消費(fèi)者
     * @param nameServer name server
     * @param consumerGroup 消費(fèi)者組
     * @param consumeFromWhere 消費(fèi)策略
     *                         CONSUME_FROM_LAST_OFFSET 默認(rèn)策略。從該隊(duì)列最尾開始消費(fèi)芜茵,跳過歷史消息
     *                         CONSUME_FROM_FIRST_OFFSET, 從隊(duì)列最開始開始消費(fèi)叙量,即歷史消息(還存在broker的)全部消費(fèi)一遍
     *                         CONSUME_FROM_TIMESTAMP; 根據(jù)時(shí)間消費(fèi)
     * @param topics 主題
     * @param tags 標(biāo)簽,默認(rèn)為*
     * @param rocketConsumerHandler  消息接收業(yè)務(wù)處理器
     */
    public SimpleRocketConsumer(String nameServer, String consumerGroup, ConsumeFromWhere consumeFromWhere, String topics, String tags, RocketConsumerHandler rocketConsumerHandler){
        super(nameServer, consumerGroup, consumeFromWhere);
        Assert.notNull(nameServer, "RocketMQ name server can't null");
        Assert.notNull(consumerGroup, "RocketMQ consumer group can't null");
        Assert.notNull(topics, "RocketMQ topics can't null");
        Assert.notNull(rocketConsumerHandler, "RocketMQ SimpleRocketConsumerHandler can't null");

        this.topics = topics;
        this.tags = Optional.ofNullable(tags).orElse("*");
        this.rocketConsumerHandler = rocketConsumerHandler;
    }

    @Override
    public void init() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getConsumerGroup());
        consumer.setNamesrvAddr(getNameServer());
        try {
            //設(shè)置consumer所訂閱的Topic和Tag, *代表所有的Tag
            consumer.subscribe(this.topics, this.tags);

            // CONSUME_FROM_LAST_OFFSET 默認(rèn)策略九串。從該隊(duì)列最尾開始消費(fèi)绞佩,跳過歷史消息
            // CONSUME_FROM_FIRST_OFFSET, 從隊(duì)列最開始開始消費(fèi),即歷史消息(還存在broker的)全部消費(fèi)一遍
            // CONSUME_FROM_TIMESTAMP;//根據(jù)時(shí)間消費(fèi)
            consumer.setConsumeFromWhere(getConsumeFromWhere());

            // MessageListenerOrderly 有序的猪钮,
            // 注意有序的在返回消費(fèi)失敗后品山,其會(huì)馬上就將消息再次發(fā)過來,并且其消費(fèi)次數(shù)不變烤低,
            //    也就是其會(huì)永遠(yuǎn)的重試(因此建議不要把異常拋出肘交,程序里面手動(dòng)處理下)
            // MessageListenerConcurrently無序的,效率更高
            consumer.registerMessageListener((MessageListenerConcurrently)(list, context)->{
                try{
                    for(MessageExt messageExt:list){
                        //打印消息內(nèi)容
                        log.info("messageExt: {}", messageExt);
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        ConsumerResult consumerResult = rocketConsumerHandler.handler(messageBody);
                        if(!consumerResult.isSuccess() && consumerResult.isRetry()){
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                }catch (Exception e){
                    log.error("消息消費(fèi)異常", e);
                    // 消費(fèi)失敗扑馁,稍后mq會(huì)再次將消息發(fā)過來涯呻,注意mq默認(rèn)最大重試次數(shù)為16,可以修改檐蚜。
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                // 消費(fèi)成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            super.setMqPushConsumer(consumer);
            log.info("start rocketmq consumer success");
        }catch (Exception e){
            throw new BizRunTimeException("注冊rocketmq消費(fèi)者異常", e);
        }
    }

    public static SimpleRocketConsumerBuilder builder(){
        return new SimpleRocketConsumerBuilder();
    }

    static class SimpleRocketConsumerBuilder {

        private String nameServer;

        private String consumerGroup;

        private ConsumeFromWhere consumeFromWhere;

        private String topics;

        private String tags;

        private RocketConsumerHandler rocketConsumerHandler;

        public SimpleRocketConsumerBuilder(){

        }

        public SimpleRocketConsumer build(){
            return new SimpleRocketConsumer(this.nameServer, this.consumerGroup, this.consumeFromWhere, this.topics, this.tags, this.rocketConsumerHandler);
        }

        public SimpleRocketConsumerBuilder nameServer(String nameServer){
            this.nameServer = nameServer;
            return this;
        }

        public SimpleRocketConsumerBuilder consumerGroup(String consumerGroup){
            this.consumerGroup = consumerGroup;
            return this;
        }

        public SimpleRocketConsumerBuilder consumeFromWhere(ConsumeFromWhere consumeFromWhere){
            this.consumeFromWhere = consumeFromWhere;
            return this;
        }

        public SimpleRocketConsumerBuilder topics(String topics){
            this.topics = topics;
            return this;
        }

        public SimpleRocketConsumerBuilder tags(String tags){
            this.tags = tags;
            return this;
        }

        public SimpleRocketConsumerBuilder rocketConsumerHandler(RocketConsumerHandler rocketConsumerHandler){
            this.rocketConsumerHandler = rocketConsumerHandler;
            return this;
        }

    }

}

業(yè)務(wù)處理handler

public interface RocketConsumerHandler {

    /**
     * 消息處理
     * @param message 消息內(nèi)容
     * @return 返回處理結(jié)果
     */
    ConsumerResult handler(String message);

}
/**
 * <p> 消息業(yè)務(wù)handler實(shí)現(xiàn) </p>
 *
 * @version 1.0
 * @create_date 2020/10/29
 */
@Slf4j
@Component
public class SimpleRocketConsumerHandler implements RocketConsumerHandler {

    @Override
    public ConsumerResult handler(String message) {
        log.info("消費(fèi)消息: {}", message);
        // TODO 這里是業(yè)務(wù)處理魄懂,ConsumerResult類就是個(gè)簡單的處理結(jié)果類
        return new ConsumerResult(true);
    }
}

RocketMQ配置類

@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties implements Serializable {

    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     */
    private String nameServer;

    private Producer producer;

    private List<Consumer> consumer;

    @Data
    public static final class Producer {
        private String group;
    }

    @Data
    public static final class Consumer {
        private String consumerGroup;

        private ConsumeFromWhere consumeFromWhere;

        private String topics;

        private String tags;

        private String rocketConsumerHandler;
    }
}

注冊消費(fèi)者

/**
 * <p> 消費(fèi)者注冊 </p>
 *
 * spring容器將所有的bean加載完畢后會(huì)執(zhí)行run方法
 *
 * @version 1.0
 * @create_date 2020/10/29
 */
@Slf4j
@Component
public class RocketMQConsumerRegister implements CommandLineRunner {

    private final RocketMQProperties properties;

    public RocketMQConsumerRegister(RocketMQProperties properties) {
        this.properties = properties;
    }

    @Override
    public void run(String... args) throws Exception {
        List<RocketMQProperties.Consumer> consumers = properties.getConsumer();
        if(consumers!=null && !consumers.isEmpty()){
            for(RocketMQProperties.Consumer consumer:consumers){
                SimpleRocketConsumer.builder()
                        .nameServer(this.properties.getNameServer())
                        .consumerGroup(consumer.getConsumerGroup())
                        .consumeFromWhere(consumer.getConsumeFromWhere())
                        .topics(consumer.getTopics())
                        .tags(consumer.getTags())
                        .rocketConsumerHandler(getHandler(consumer.getRocketConsumerHandler()))
                        .build().init();
            }
        }
    }

    private RocketConsumerHandler getHandler(String handlerClass){
        try {
            // 由于這些業(yè)務(wù)處理handler可能依賴些基礎(chǔ)組件沿侈,比如數(shù)據(jù)庫等闯第,因此這里從spring容器中獲取bean
            return (RocketConsumerHandler) SpringBeanUtil.getBean(Class.forName(handlerClass));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

}
/**
 * <p> spring bean 工具類 </p>
 *
 * @version 1.0
 * @create_date 2020/10/29
 */
@Component
public class SpringBeanUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (null == SpringBeanUtil.applicationContext) {
            SpringBeanUtil.applicationContext = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通過Bean名字獲取Bean
     *
     * @param beanName bean 名稱
     * @return 返回獲取到的對象
     */
    public static Object getBean(String beanName) {
        return getApplicationContext().getBean(beanName);
    }

    /**
     * 通過Bean類型獲取Bean
     *
     * @param beanClass bean class
     * @param <T>       beanClass
     * @return 返回對象
     */
    public static <T> T getBean(Class<T> beanClass) {
        return getApplicationContext().getBean(beanClass);
    }

    /**
     * 通過Bean名字和Bean類型獲取Bean
     *
     * @param beanName  bean 名稱
     * @param beanClass class
     * @param <T>       beanClass
     * @return 返回對象
     */
    public static <T> T getBean(String beanName, Class<T> beanClass) {
        return getApplicationContext().getBean(beanName, beanClass);
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市缀拭,隨后出現(xiàn)的幾起案子咳短,更是在濱河造成了極大的恐慌填帽,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咙好,死亡現(xiàn)場離奇詭異篡腌,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)勾效,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門嘹悼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人层宫,你說我怎么就攤上這事杨伙。” “怎么了萌腿?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵限匣,是天一觀的道長。 經(jīng)常有香客問我毁菱,道長米死,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任贮庞,我火速辦了婚禮峦筒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘窗慎。我一直安慰自己勘天,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布捉邢。 她就那樣靜靜地躺著脯丝,像睡著了一般。 火紅的嫁衣襯著肌膚如雪伏伐。 梳的紋絲不亂的頭發(fā)上宠进,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音藐翎,去河邊找鬼材蹬。 笑死,一個(gè)胖子當(dāng)著我的面吹牛吝镣,可吹牛的內(nèi)容都是我干的堤器。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼末贾,長吁一口氣:“原來是場噩夢啊……” “哼闸溃!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤辉川,失蹤者是張志新(化名)和其女友劉穎表蝙,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乓旗,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡府蛇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了屿愚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汇跨。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖妆距,靈堂內(nèi)的尸體忽然破棺而出扰法,到底是詐尸還是另有隱情,我是刑警寧澤毅厚,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布塞颁,位于F島的核電站,受9級(jí)特大地震影響吸耿,放射性物質(zhì)發(fā)生泄漏祠锣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一咽安、第九天 我趴在偏房一處隱蔽的房頂上張望伴网。 院中可真熱鬧,春花似錦妆棒、人聲如沸澡腾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽动分。三九已至,卻和暖如春红选,著一層夾襖步出監(jiān)牢的瞬間澜公,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國打工喇肋, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留坟乾,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓蝶防,卻偏偏與公主長得像甚侣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子间学,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355