前言
實際生產(chǎn)中,我們經(jīng)常會碰到這樣的場景: 業(yè)務(wù)方觸發(fā)了某些預(yù)料之中的bug,(比如項目中調(diào)用了第三方的服務(wù),但是第三方的服務(wù)出問題導(dǎo)致無法訪問,這類錯,我們往往不會直接提示用戶,而是選擇屏蔽此類錯誤,寫入錯誤日志),我們處理此類bug往往需要去生產(chǎn)導(dǎo)出日志記錄,然后排查,最后找到第三方服務(wù)的提供者去解決問題.
那么,與其等“被動”業(yè)務(wù)反饋,能不能讓這類問題“主動”推送給開發(fā)呢? 我們能不做個“錯誤預(yù)警”的服務(wù).
消息推送技術(shù),即是解決這類問題的良方.
消息隊列
消息隊列,一般我們會簡稱它為MQ(Message Queue),再介紹消息隊列前,我們還是先簡單解釋一下隊列這種數(shù)據(jù)結(jié)構(gòu)
隊列
隊列是一種先進先出的數(shù)據(jù)結(jié)構(gòu)
如圖,數(shù)據(jù)從隊尾(右)進,從隊頭(左)出.
消息隊列
消息隊列可以簡單的理解為:把要傳輸?shù)臄?shù)據(jù)放在隊列中讲仰。
當(dāng)我們需要使用消息的時候可以取出數(shù)據(jù)供自己使用赢赊。
消息隊列的兩種場景
從以上概念中我們不難看出有兩個角色對隊列至關(guān)重要,一個是放數(shù)據(jù)的,一個是取數(shù)據(jù)的.
當(dāng)然,這兩個角色都有是有規(guī)范的名字的,同時,消息隊列有兩種場景,在這兩種不同的場景里,這兩個角色名字是不同的:
- 生產(chǎn)者消費者模式
- 生產(chǎn)者: 放數(shù)據(jù)進隊列
- 消費者: 從隊列取數(shù)據(jù)
- 發(fā)布者訂閱者模式
- 發(fā)布者: 放數(shù)據(jù)進隊列
- 訂閱者: 從隊列取數(shù)據(jù)
場景區(qū)別
- 生產(chǎn)者消費者模式(一對一)
生產(chǎn)者將生產(chǎn)消息放到隊列里淑际,多個消費者同時監(jiān)聽隊列便斥,誰先搶到消息誰就
會從隊列中取走消息;即對于每個消息只能被最多一個消費者擁有谆甜。
包括三個角色:
- 消息隊列
- 發(fā)送者(生產(chǎn)者)
- 接收者(消費者)
生產(chǎn)消費者模式特點:
- 每個消息只有一個接收者(Consumer)(即一旦被消費畦攘,消息就不再在消息隊列中)
- 發(fā)送者和接收者間沒有依賴性,發(fā)送者發(fā)送消息之后李茫,不管有沒有接收者在運行揭保,都不會影響到發(fā)送者下次發(fā)送消息
- 接收者在成功接收消息之后需向隊列應(yīng)答成功,以便消息隊列刪除當(dāng)前接收的消息
- 發(fā)布者訂閱者模式(一對多)
發(fā)布者將生產(chǎn)消息放到隊列里魄宏,多個監(jiān)聽隊列的消費者都會收到同一份消息秸侣;
即正常情況下每個消費者收到的消息應(yīng)該都是一樣的。
包括三個角色:
- 角色主題(Topic)
- 發(fā)布者(Publisher)
- 訂閱者(Subscriber)
發(fā)布訂閱模式特點:
- 每個消息可以有多個訂閱者
- 發(fā)布者和訂閱者之間有時間上的依賴性宠互。針對某個主題(Topic)的訂閱者味榛,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息
- 為了消費消息予跌,訂閱者需要提前訂閱該角色主題搏色,并保持在線運行
消息隊列解決的問題
消息隊列為了實現(xiàn)實現(xiàn)高性能,高可用券册,可伸縮和最終一致性架構(gòu),主要可以解決如下問題:
- 異步處理
多應(yīng)用對消息隊列中同一消息進行處理频轿,應(yīng)用間并發(fā)處理消息,相比串行處理烁焙,減少處理時間
場景舉例:
用戶注冊后略吨,需要發(fā)注冊郵件和注冊短信.
傳統(tǒng)的做法有兩種
- 串行方式
將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件考阱,再發(fā)送注冊短信翠忠。
以上三個任務(wù)全部完成后,返回給客戶端
- 并行方式
將注冊信息寫入數(shù)據(jù)庫成功后乞榨,發(fā)送注冊郵件的同時秽之,發(fā)送注冊短信当娱。以上三個任務(wù)完成后,返回給客戶端考榨。與串行的差別是跨细,并行的方式可以提高處理的時間
- 消息隊列異步處理方式
將注冊信息寫入數(shù)據(jù)庫,再將任務(wù)寫入消息隊列后,立即返回成功給客戶端,
則總的響應(yīng)時間依賴于寫入消息隊列的時間河质,
而寫入消息隊列的時間本身是可以很快的冀惭,基本可以忽略不計,
因此總的處理時間相比串行提高了2倍掀鹅,相比并行提高了一倍
- 應(yīng)用耦合
多應(yīng)用間通過消息隊列對同一消息進行處理散休,避免調(diào)用接口失敗導(dǎo)致整個過程失敗
場景舉例:
銀行身份證人臉識別系統(tǒng),用戶上傳身份證圖片,人臉識別系統(tǒng)會對該圖片進行人臉識別.
一般的做法是:
服務(wù)器接收到圖片后乐尊,圖片上傳系統(tǒng)立即調(diào)用人臉識別系統(tǒng)戚丸,調(diào)用完成后再返回成功
該方法有如下缺點:
- 人臉識別系統(tǒng)被調(diào)失敗,導(dǎo)致圖片上傳失敗
- 延遲高扔嵌,需要人臉識別系統(tǒng)處理完成后限府,再返回給客戶端,即使用戶并不需要立即知道結(jié)果
- 圖片上傳系統(tǒng)與人臉識別系統(tǒng)之間互相調(diào)用痢缎,需要做耦合
為了解決以上缺點,我們采用消息隊列解決應(yīng)用間的耦合問題:
消息隊列的做法:
用戶上傳圖片后胁勺,圖片上傳系統(tǒng)將圖片信息順序?qū)懭胂㈥犃性镎苯臃祷爻晒Γ?br>
人臉識別系統(tǒng)則定時從消息隊列中取數(shù)據(jù)奖唯,完成對圖片的識別。
圖片上傳系統(tǒng)并不需要關(guān)心人臉識別系統(tǒng)是否對這些圖片信息的處理朱监、以及何時對這些圖片信息進行處理势告。事實上,由于用戶并不需要立即知道人臉識別結(jié)果抚恒,人臉識別系統(tǒng)可以選擇不同的調(diào)度策略咱台,按照閑時、忙時俭驮、正常時間回溺,對隊列中的圖片信息進行處理。
- 限流削峰
廣泛應(yīng)用于秒殺或搶購活動中混萝,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況
場景舉例:
電商秒殺活動,常見的形式是數(shù)量極少的熱門商品讓大量的用戶搶購
傳統(tǒng)的做法是用戶直接請求業(yè)務(wù)系統(tǒng),但往往因為并發(fā)用戶過大,或?qū)е聵I(yè)務(wù)系統(tǒng)崩潰,或著出現(xiàn)超賣等等現(xiàn)象.
采用消息隊列后,系統(tǒng)可以從消息隊列中取數(shù)據(jù)遗遵,相當(dāng)于消息隊列做了一次緩沖
采用消息隊列處理秒殺有如下優(yōu)點:
- 請求先入消息隊列,而不是由業(yè)務(wù)處理系統(tǒng)直接處理逸嘀,做了一次緩沖,極大地減少了業(yè)務(wù)處理系統(tǒng)的壓力车要;
- 隊列長度可以做限制,事實上崭倘,秒殺時翼岁,后入隊列的用戶無法秒殺到商品类垫,這些請求可以直接被拋棄,返回活動已結(jié)束或商品已售完信息琅坡;
- 消息驅(qū)動的系統(tǒng)
場景舉例:
用戶新上傳了一批照片悉患,人臉識別系統(tǒng)需要對這個用戶的所有照片進行聚類,聚類完成后由對賬系統(tǒng)重新生成用戶的人臉?biāo)饕?加快查詢)榆俺。這三個子系統(tǒng)間由消息隊列連接起來售躁,前一個階段的處理結(jié)果放入隊列中,后一個階段從隊列中獲取消息繼續(xù)處理.
使用消息隊列有如下優(yōu)點:
- 避免了直接調(diào)用下一個系統(tǒng)導(dǎo)致當(dāng)前系統(tǒng)失斳罱陪捷;
- 每個子系統(tǒng)對于消息的處理方式可以更為靈活,可以選擇收到消息時就處理晃跺,可以選擇定時處理揩局,也可以劃分時間段按不同處理速度處理;
Redis實現(xiàn)發(fā)布訂閱模式
消息隊列是分布式系統(tǒng)中重要的組件掀虎,使用消息隊列主要是為了通過異步處理提高系統(tǒng)性能和削峰凌盯、降低系統(tǒng)耦合性。目前使用較多的消息隊列有ActiveMQ烹玉,RabbitMQ驰怎,Kafka,RocketMQ,這些消息中間件我們暫時不講,本章,我們使用最為簡單的方式REDIS來實現(xiàn)消息隊列的發(fā)布訂閱模式.
Redis
Redis從2.X版本開始,就支持一種基于非持久化消息的二打、使用發(fā)布/訂閱模式實現(xiàn)的事件通知機制.
所謂基于非連接保持,是因為一旦消息訂閱者由于各種異常情況而被迫斷開連接,在其重新連接后,
其離線期間的事件是無法被重新通知的(一些Redis資料中也稱為即發(fā)即棄).
而其使用的發(fā)布/訂閱模式,意味著其機制并不是由訂閱者周期性的從Redis服務(wù)拉取事件通知,
而是由Redis服務(wù)主動推送事件通知到符合條件的若干訂閱者.
通俗的來講,Redis實現(xiàn)的發(fā)布訂閱模式有如下注意點:
- 基于Redis服務(wù)主動推送消息,而非訂閱者循環(huán)拉取.
- 消息即發(fā)即丟(就是消息一發(fā)布,就丟失了,不會保存)
Springboot+Redis實現(xiàn)
- 引入redis依賴
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- application.yml
spring:
redis:
port: 6379
database: 0
host: 127.0.0.1
password: 123456
jedis:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
timeout: 5000ms
server:
port: 9999
- redis配置類
package com.mrcoder.sbredispubsub.config.redis;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mrcoder.sbredispubsub.model.MessageSubscriber;
import com.mrcoder.sbredispubsub.utils.RedisPubSubUtil;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @Description: Redis配置類
*/
@Configuration
@ConditionalOnClass({RedisTemplate.class})
public class RedisConfig {
/**
* Redis操作模板配置
*
* @param connectionFactory
* @return
*/
@Bean
public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
template.setConnectionFactory(connectionFactory);
// 設(shè)置key/hashkey序列化
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
// 設(shè)置值序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public RedisPubSubUtil redisPubSubUtil(@Qualifier("redisTemplate") RedisTemplate<String, Object> redis) {
return new RedisPubSubUtil(redis);
}
/**
* 序列化定制
*
* @return
*/
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
Object.class);
// 初始化objectmapper
ObjectMapper mapper = new ObjectMapper();
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
return jackson2JsonRedisSerializer;
}
/**
* 將訂閱器綁定到容器
*
* @param connectionFactory
* @param listener
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, new PatternTopic("redis.pubsub.*"));
return container;
}
/**
* 消息監(jiān)聽器县忌,使用MessageAdapter可實現(xiàn)自動化解碼及方法代理
*
* @param jackson2JsonRedisSerializer
* @param subscriber
* @return
*/
@Bean
public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) {
MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
adapter.setSerializer(jackson2JsonRedisSerializer);
adapter.afterPropertiesSet();
return adapter;
}
}
- 定義消息實體類
package com.mrcoder.sbredispubsub.model;
import lombok.Data;
import java.util.Date;
@Data
public class SimpleMessage {
private String publisher;
private String content;
private Date createTime;
}
- 消息發(fā)布類
package com.mrcoder.sbredispubsub.utils;
import com.mrcoder.sbredispubsub.model.SimpleMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import java.util.Date;
/**
* @Description: Redis發(fā)布訂閱
*/
public class RedisPubSubUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisPubSubUtil.class);
private RedisTemplate<String, Object> redisTemplate;
public RedisPubSubUtil(RedisTemplate<String, Object> redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}
public void publish(String publisher, String content) {
logger.info("message send {} by {}", content, publisher);
SimpleMessage simpleMessage = new SimpleMessage();
simpleMessage.setContent(content);
simpleMessage.setPublisher(publisher);
simpleMessage.setCreateTime(new Date());
ChannelTopic channelTopic = new ChannelTopic("redis.pubsub.msg");
redisTemplate.convertAndSend(channelTopic.getTopic(), simpleMessage);
}
}
- 定義訂閱者實體類
package com.mrcoder.sbredispubsub.model;
import com.mrcoder.sbredispubsub.utils.FastJsonUtil;
import org.springframework.stereotype.Component;
/**
* @Description: 消息訂閱類
*/
@Component
public class MessageSubscriber {
public void onMessage(SimpleMessage simpleMessage, String pattern) {
logger.info("topic {} received {}", pattern, FastJsonUtil.javaToJsonSnakeCase(simpleMessage));
}
}
- 控制器
package com.mrcoder.sbredispubsub.controller;
import com.mrcoder.sbredispubsub.utils.RedisPubSubUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.*;
@CrossOrigin
@RestController
public class RedisPubSubController {
@Autowired
private RedisPubSubUtil redisPubSubUtil;
@GetMapping("redisPubSub")
public void redisPubSub(){
redisPubSubUtil.publish("echo", "testMessage"));
}
}
改造
以上已經(jīng)實現(xiàn)了基于redis簡單的發(fā)布訂閱了.
那么,在此之上我們多做一點來更好的理解發(fā)布訂閱這塊的內(nèi)容.
- 我們實現(xiàn)推送內(nèi)容到企業(yè)微信
- 我們實現(xiàn)讀取文件的內(nèi)容來推送(可用于版本新功能的發(fā)布推送)
項目地址:
https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-redis-pubsub