33. 從零開始學(xué)springboot-一文讀懂消息隊列-發(fā)布訂閱(附redis實現(xiàn))

前言

實際生產(chǎn)中,我們經(jīng)常會碰到這樣的場景: 業(yè)務(wù)方觸發(fā)了某些預(yù)料之中的bug,(比如項目中調(diào)用了第三方的服務(wù),但是第三方的服務(wù)出問題導(dǎo)致無法訪問,這類錯,我們往往不會直接提示用戶,而是選擇屏蔽此類錯誤,寫入錯誤日志),我們處理此類bug往往需要去生產(chǎn)導(dǎo)出日志記錄,然后排查,最后找到第三方服務(wù)的提供者去解決問題.

那么,與其等“被動”業(yè)務(wù)反饋,能不能讓這類問題“主動”推送給開發(fā)呢? 我們能不做個“錯誤預(yù)警”的服務(wù).

消息推送技術(shù),即是解決這類問題的良方.

消息隊列

消息隊列,一般我們會簡稱它為MQ(Message Queue),再介紹消息隊列前,我們還是先簡單解釋一下隊列這種數(shù)據(jù)結(jié)構(gòu)

隊列

隊列是一種先進先出的數(shù)據(jù)結(jié)構(gòu)

隊列.png

如圖,數(shù)據(jù)從隊尾(右)進,從隊頭(左)出.

消息隊列

消息隊列可以簡單的理解為:把要傳輸?shù)臄?shù)據(jù)放在隊列中讲仰。
當(dāng)我們需要使用消息的時候可以取出數(shù)據(jù)供自己使用赢赊。

消息隊列的兩種場景

從以上概念中我們不難看出有兩個角色對隊列至關(guān)重要,一個是放數(shù)據(jù)的,一個是取數(shù)據(jù)的.
當(dāng)然,這兩個角色都有是有規(guī)范的名字的,同時,消息隊列有兩種場景,在這兩種不同的場景里,這兩個角色名字是不同的:

  • 生產(chǎn)者消費者模式
    • 生產(chǎn)者: 放數(shù)據(jù)進隊列
    • 消費者: 從隊列取數(shù)據(jù)
  • 發(fā)布者訂閱者模式
    • 發(fā)布者: 放數(shù)據(jù)進隊列
    • 訂閱者: 從隊列取數(shù)據(jù)

場景區(qū)別

  • 生產(chǎn)者消費者模式(一對一)
生產(chǎn)者將生產(chǎn)消息放到隊列里淑际,多個消費者同時監(jiān)聽隊列便斥,誰先搶到消息誰就
會從隊列中取走消息;即對于每個消息只能被最多一個消費者擁有谆甜。

包括三個角色:

  1. 消息隊列
  2. 發(fā)送者(生產(chǎn)者)
  3. 接收者(消費者)
消息隊列-生產(chǎn)消費.png

生產(chǎn)消費者模式特點:

  1. 每個消息只有一個接收者(Consumer)(即一旦被消費畦攘,消息就不再在消息隊列中)
  2. 發(fā)送者和接收者間沒有依賴性,發(fā)送者發(fā)送消息之后李茫,不管有沒有接收者在運行揭保,都不會影響到發(fā)送者下次發(fā)送消息
  3. 接收者在成功接收消息之后需向隊列應(yīng)答成功,以便消息隊列刪除當(dāng)前接收的消息
  • 發(fā)布者訂閱者模式(一對多)
發(fā)布者將生產(chǎn)消息放到隊列里魄宏,多個監(jiān)聽隊列的消費者都會收到同一份消息秸侣;
即正常情況下每個消費者收到的消息應(yīng)該都是一樣的。

包括三個角色:

  1. 角色主題(Topic)
  2. 發(fā)布者(Publisher)
  3. 訂閱者(Subscriber)
消息隊列-發(fā)布訂閱.png

發(fā)布訂閱模式特點:

  1. 每個消息可以有多個訂閱者
  2. 發(fā)布者和訂閱者之間有時間上的依賴性宠互。針對某個主題(Topic)的訂閱者味榛,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息
  3. 為了消費消息予跌,訂閱者需要提前訂閱該角色主題搏色,并保持在線運行

消息隊列解決的問題

消息隊列為了實現(xiàn)實現(xiàn)高性能,高可用券册,可伸縮和最終一致性架構(gòu),主要可以解決如下問題:

  • 異步處理
多應(yīng)用對消息隊列中同一消息進行處理频轿,應(yīng)用間并發(fā)處理消息,相比串行處理烁焙,減少處理時間

場景舉例:
用戶注冊后略吨,需要發(fā)注冊郵件和注冊短信.

傳統(tǒng)的做法有兩種

  1. 串行方式
將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件考阱,再發(fā)送注冊短信翠忠。
以上三個任務(wù)全部完成后,返回給客戶端
串行.png
  1. 并行方式
將注冊信息寫入數(shù)據(jù)庫成功后乞榨,發(fā)送注冊郵件的同時秽之,發(fā)送注冊短信当娱。以上三個任務(wù)完成后,返回給客戶端考榨。與串行的差別是跨细,并行的方式可以提高處理的時間
并行.png
  1. 消息隊列異步處理方式
將注冊信息寫入數(shù)據(jù)庫,再將任務(wù)寫入消息隊列后,立即返回成功給客戶端,
則總的響應(yīng)時間依賴于寫入消息隊列的時間河质,
而寫入消息隊列的時間本身是可以很快的冀惭,基本可以忽略不計,
因此總的處理時間相比串行提高了2倍掀鹅,相比并行提高了一倍
消息隊列-異步.png
  • 應(yīng)用耦合
多應(yīng)用間通過消息隊列對同一消息進行處理散休,避免調(diào)用接口失敗導(dǎo)致整個過程失敗

場景舉例:
銀行身份證人臉識別系統(tǒng),用戶上傳身份證圖片,人臉識別系統(tǒng)會對該圖片進行人臉識別.

一般的做法是:
服務(wù)器接收到圖片后乐尊,圖片上傳系統(tǒng)立即調(diào)用人臉識別系統(tǒng)戚丸,調(diào)用完成后再返回成功


應(yīng)用耦合.png

該方法有如下缺點:

  1. 人臉識別系統(tǒng)被調(diào)失敗,導(dǎo)致圖片上傳失敗
  2. 延遲高扔嵌,需要人臉識別系統(tǒng)處理完成后限府,再返回給客戶端,即使用戶并不需要立即知道結(jié)果
  3. 圖片上傳系統(tǒng)與人臉識別系統(tǒng)之間互相調(diào)用痢缎,需要做耦合

為了解決以上缺點,我們采用消息隊列解決應(yīng)用間的耦合問題:

消息隊列的做法:
用戶上傳圖片后胁勺,圖片上傳系統(tǒng)將圖片信息順序?qū)懭胂㈥犃性镎苯臃祷爻晒Γ?br> 人臉識別系統(tǒng)則定時從消息隊列中取數(shù)據(jù)奖唯,完成對圖片的識別。

圖片上傳系統(tǒng)并不需要關(guān)心人臉識別系統(tǒng)是否對這些圖片信息的處理朱监、以及何時對這些圖片信息進行處理势告。事實上,由于用戶并不需要立即知道人臉識別結(jié)果抚恒,人臉識別系統(tǒng)可以選擇不同的調(diào)度策略咱台,按照閑時、忙時俭驮、正常時間回溺,對隊列中的圖片信息進行處理。

消息隊列-應(yīng)用耦合.png
  • 限流削峰
廣泛應(yīng)用于秒殺或搶購活動中混萝,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況

場景舉例:
電商秒殺活動,常見的形式是數(shù)量極少的熱門商品讓大量的用戶搶購
傳統(tǒng)的做法是用戶直接請求業(yè)務(wù)系統(tǒng),但往往因為并發(fā)用戶過大,或?qū)е聵I(yè)務(wù)系統(tǒng)崩潰,或著出現(xiàn)超賣等等現(xiàn)象.

采用消息隊列后,系統(tǒng)可以從消息隊列中取數(shù)據(jù)遗遵,相當(dāng)于消息隊列做了一次緩沖


消息隊列-限流削峰.png

采用消息隊列處理秒殺有如下優(yōu)點:

  1. 請求先入消息隊列,而不是由業(yè)務(wù)處理系統(tǒng)直接處理逸嘀,做了一次緩沖,極大地減少了業(yè)務(wù)處理系統(tǒng)的壓力车要;
  2. 隊列長度可以做限制,事實上崭倘,秒殺時翼岁,后入隊列的用戶無法秒殺到商品类垫,這些請求可以直接被拋棄,返回活動已結(jié)束或商品已售完信息琅坡;
  • 消息驅(qū)動的系統(tǒng)
    場景舉例:
    用戶新上傳了一批照片悉患,人臉識別系統(tǒng)需要對這個用戶的所有照片進行聚類,聚類完成后由對賬系統(tǒng)重新生成用戶的人臉?biāo)饕?加快查詢)榆俺。這三個子系統(tǒng)間由消息隊列連接起來售躁,前一個階段的處理結(jié)果放入隊列中,后一個階段從隊列中獲取消息繼續(xù)處理.
消息隊列-消息驅(qū)動.png

使用消息隊列有如下優(yōu)點:

  1. 避免了直接調(diào)用下一個系統(tǒng)導(dǎo)致當(dāng)前系統(tǒng)失斳罱陪捷;
  2. 每個子系統(tǒng)對于消息的處理方式可以更為靈活,可以選擇收到消息時就處理晃跺,可以選擇定時處理揩局,也可以劃分時間段按不同處理速度處理;

Redis實現(xiàn)發(fā)布訂閱模式

消息隊列是分布式系統(tǒng)中重要的組件掀虎,使用消息隊列主要是為了通過異步處理提高系統(tǒng)性能和削峰凌盯、降低系統(tǒng)耦合性。目前使用較多的消息隊列有ActiveMQ烹玉,RabbitMQ驰怎,Kafka,RocketMQ,這些消息中間件我們暫時不講,本章,我們使用最為簡單的方式REDIS來實現(xiàn)消息隊列的發(fā)布訂閱模式.

Redis

Redis從2.X版本開始,就支持一種基于非持久化消息的二打、使用發(fā)布/訂閱模式實現(xiàn)的事件通知機制.
所謂基于非連接保持,是因為一旦消息訂閱者由于各種異常情況而被迫斷開連接,在其重新連接后,
其離線期間的事件是無法被重新通知的(一些Redis資料中也稱為即發(fā)即棄).
而其使用的發(fā)布/訂閱模式,意味著其機制并不是由訂閱者周期性的從Redis服務(wù)拉取事件通知,
而是由Redis服務(wù)主動推送事件通知到符合條件的若干訂閱者.

通俗的來講,Redis實現(xiàn)的發(fā)布訂閱模式有如下注意點:

  • 基于Redis服務(wù)主動推送消息,而非訂閱者循環(huán)拉取.
  • 消息即發(fā)即丟(就是消息一發(fā)布,就丟失了,不會保存)

Springboot+Redis實現(xiàn)

  • 引入redis依賴
    pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • application.yml
spring:
  redis:
    port: 6379
    database: 0
    host: 127.0.0.1
    password: 123456
    jedis:
      pool:
        max-active: 8
        max-wait: -1ms
        max-idle: 8
        min-idle: 0
    timeout: 5000ms
server:
  port: 9999
  • redis配置類
package com.mrcoder.sbredispubsub.config.redis;


import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mrcoder.sbredispubsub.model.MessageSubscriber;
import com.mrcoder.sbredispubsub.utils.RedisPubSubUtil;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;


/**
 * @Description: Redis配置類
 */
@Configuration
@ConditionalOnClass({RedisTemplate.class})
public class RedisConfig {

    /**
     * Redis操作模板配置
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
        template.setConnectionFactory(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
        RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
        template.setConnectionFactory(connectionFactory);
        // 設(shè)置key/hashkey序列化
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);

        // 設(shè)置值序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();

        return template;
    }

    @Bean
    public RedisPubSubUtil redisPubSubUtil(@Qualifier("redisTemplate") RedisTemplate<String, Object> redis) {
        return new RedisPubSubUtil(redis);
    }

    /**
     * 序列化定制
     *
     * @return
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
                Object.class);

        // 初始化objectmapper
        ObjectMapper mapper = new ObjectMapper();
        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);
        return jackson2JsonRedisSerializer;
    }

    /**
     * 將訂閱器綁定到容器
     *
     * @param connectionFactory
     * @param listener
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listener, new PatternTopic("redis.pubsub.*"));
        return container;
    }

    /**
     * 消息監(jiān)聽器县忌,使用MessageAdapter可實現(xiàn)自動化解碼及方法代理
     *
     * @param jackson2JsonRedisSerializer
     * @param subscriber
     * @return
     */
    @Bean
    public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
        adapter.setSerializer(jackson2JsonRedisSerializer);
        adapter.afterPropertiesSet();
        return adapter;
    }

}
  • 定義消息實體類
package com.mrcoder.sbredispubsub.model;

import lombok.Data;

import java.util.Date;

@Data
public class SimpleMessage {
    private String publisher;
    private String content;
    private Date createTime;
}
  • 消息發(fā)布類
package com.mrcoder.sbredispubsub.utils;

import com.mrcoder.sbredispubsub.model.SimpleMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

import java.util.Date;

/**
 * @Description: Redis發(fā)布訂閱
 */
public class RedisPubSubUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisPubSubUtil.class);

    private RedisTemplate<String, Object> redisTemplate;

    public RedisPubSubUtil(RedisTemplate<String, Object> redisTemplate) {
        super();
        this.redisTemplate = redisTemplate;
    }

    public void publish(String publisher, String content) {
        logger.info("message send {} by {}", content, publisher);
        SimpleMessage simpleMessage = new SimpleMessage();
        simpleMessage.setContent(content);
        simpleMessage.setPublisher(publisher);
        simpleMessage.setCreateTime(new Date());
        ChannelTopic channelTopic = new ChannelTopic("redis.pubsub.msg");
        redisTemplate.convertAndSend(channelTopic.getTopic(), simpleMessage);
    }
}

  • 定義訂閱者實體類
package com.mrcoder.sbredispubsub.model;

import com.mrcoder.sbredispubsub.utils.FastJsonUtil;
import org.springframework.stereotype.Component;

/**
 * @Description: 消息訂閱類
 */
@Component
public class MessageSubscriber {
    public void onMessage(SimpleMessage simpleMessage, String pattern) {
        logger.info("topic {} received {}", pattern, FastJsonUtil.javaToJsonSnakeCase(simpleMessage));
    }
}

  • 控制器
package com.mrcoder.sbredispubsub.controller;
import com.mrcoder.sbredispubsub.utils.RedisPubSubUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.*;

@CrossOrigin
@RestController
public class RedisPubSubController {

    @Autowired
    private RedisPubSubUtil redisPubSubUtil;

    @GetMapping("redisPubSub")
    public void redisPubSub(){
        redisPubSubUtil.publish("echo", "testMessage"));
    }
}

改造

以上已經(jīng)實現(xiàn)了基于redis簡單的發(fā)布訂閱了.

那么,在此之上我們多做一點來更好的理解發(fā)布訂閱這塊的內(nèi)容.

  • 我們實現(xiàn)推送內(nèi)容到企業(yè)微信
  • 我們實現(xiàn)讀取文件的內(nèi)容來推送(可用于版本新功能的發(fā)布推送)

項目地址:

https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-redis-pubsub

請關(guān)注我的訂閱號

訂閱號.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市继效,隨后出現(xiàn)的幾起案子症杏,更是在濱河造成了極大的恐慌,老刑警劉巖瑞信,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件厉颤,死亡現(xiàn)場離奇詭異,居然都是意外死亡凡简,警方通過查閱死者的電腦和手機逼友,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來秤涩,“玉大人帜乞,你說我怎么就攤上這事】鹁欤” “怎么了黎烈?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我怨喘,道長津畸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任必怜,我火速辦了婚禮肉拓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘梳庆。我一直安慰自己暖途,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布膏执。 她就那樣靜靜地躺著驻售,像睡著了一般。 火紅的嫁衣襯著肌膚如雪更米。 梳的紋絲不亂的頭發(fā)上欺栗,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音征峦,去河邊找鬼迟几。 笑死,一個胖子當(dāng)著我的面吹牛栏笆,可吹牛的內(nèi)容都是我干的类腮。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼蛉加,長吁一口氣:“原來是場噩夢啊……” “哼蚜枢!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起针饥,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤厂抽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后丁眼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體筷凤,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年户盯,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片饲化。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡莽鸭,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出吃靠,到底是詐尸還是另有隱情硫眨,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布巢块,位于F島的核電站礁阁,受9級特大地震影響巧号,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜姥闭,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一丹鸿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧棚品,春花似錦靠欢、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锅纺,卻和暖如春掷空,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背囤锉。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工坦弟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人嚼锄。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓减拭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親区丑。 傳聞我的和親對象是個殘疾皇子拧粪,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351