一练般、背景
以前我們在spring boot中構(gòu)建一個(gè)消息驅(qū)動(dòng)的微服務(wù)應(yīng)用跋涣,通常會(huì)使用rabbitMQ或是kafka來做消息中間件蝗砾,應(yīng)用中均需代碼實(shí)現(xiàn)具體消息中間件的通信細(xì)節(jié)汇荐。此時(shí)如果再更換一個(gè)新的消息中間件日熬,這會(huì)我們又需新增這些通信代碼棍厌,寫起來會(huì)比較繁瑣,而stream出現(xiàn)就是為了簡化這一過程。
二耘纱、簡介
它是一個(gè)構(gòu)建消息驅(qū)動(dòng)的微服務(wù)應(yīng)用的框架敬肚。通過一些抽象出來的基礎(chǔ)概念,來簡化消息中間件的使用束析。我們可以看下官網(wǎng)上的處理模型圖:
- 關(guān)鍵概念
Inputs 接收消息的通道
Output 發(fā)送消息的通道
Binder 可理解為一個(gè)抽象的中間件艳馒,應(yīng)用通過在spring cloud stream中所注入的inputs,outputs通道來跟外界消息通信,而這些通道又是通過具體中間件的Binder實(shí)現(xiàn)來連接到消息隊(duì)列的服務(wù)器上员寇。有了Binder弄慰,甚至可以不改一行代碼,就切換中間件的類型蝶锋。目前Binder實(shí)現(xiàn)支持的具體中間件類型為:rabbitMQ 和 kfaka這倆
當(dāng)然MQ中的消費(fèi)組group 和 分區(qū) partion的概念他也有曹动,跟kafka里面的概念是一樣的。
Group:消費(fèi)組牲览,一個(gè)消息到達(dá)一個(gè)消費(fèi)組后墓陈,只能被這個(gè)消費(fèi)組的其中一個(gè)實(shí)例消費(fèi)掉;
Partion:消息分區(qū)第献,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上
三贡必、使用步驟
以下以rabbitMQ為具體中間件作為示例:
1. 在pom.xml中添加依賴
<dependency>
<groupid >org.springframework.cloud</groupid>
<artifactid>spring-cloud-starter-stream-rabbit</artifactid>
</dependency>
2.自定義通道的創(chuàng)建
自定義通道的創(chuàng)建有兩種方式,一種是提前在代碼里定義好的庸毫,另一種是在運(yùn)行時(shí)通過讀取完通道名來創(chuàng)建的仔拟。
2.1 方式一:提前定義好的通道
- 定義生產(chǎn)者
我們定義一個(gè)生產(chǎn)者類SampleSource,這類要完成2件事: - 2.1.1 自定義發(fā)送通道
- 2.1.2 完成發(fā)送消息的功能
@EnableBinding(SampleSource.MultiOutputSource.class)
public class SampleSource {
//自定義發(fā)送通道
public interface MultiOutputSource {
String OUTPUT1 = "output1";
String OUTPUT2 = "output2";
@Output(OUTPUT1)
MessageChannel output1();
@Output(OUTPUT2)
MessageChannel output2();
}
}
注意:要加上@EnableBinding 綁定通道才能夠發(fā)出消息到mq的服務(wù)器飒赃。
-
方式二:動(dòng)態(tài)創(chuàng)建通道
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @PathVariable("target") target,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, target, contentType);
}
private void sendMessage(String body, String target, Object contentType) {
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
這是官網(wǎng)的示例代碼利花,可以看到關(guān)鍵代碼是這兩句
@Autowired
private BinderAwareChannelResolver resolver;
//中間省略代碼...
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
resolveDestination里面的處理就是,先查找傳入的target通道名载佳,看下有沒創(chuàng)建過炒事,如果沒有就會(huì)默認(rèn)創(chuàng)建一個(gè)。
- 生產(chǎn)者實(shí)現(xiàn)發(fā)送消息的函數(shù)
在SampleSource這類里添加sendMessage函數(shù)
public class SampleSource{
@Bean
@InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public synchronized MessageSource<String> messageSource1() {
return new MessageSource<String>() {
public Message<String> receive() {
String message = "FromSource1";
System.out.println("******************");
System.out.println("From Source1");
System.out.println("******************");
System.out.println("Sending value: " + message);
return new GenericMessage(message);
}
};
}
@Bean
@InboundChannelAdapter(value = MultiOutputSource.OUTPUT2, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public synchronized MessageSource<String> timerMessageSource() {
return new MessageSource<String>() {
public Message<String> receive() {
String message = "FromSource2";
System.out.println("******************");
System.out.println("From Source2");
System.out.println("******************");
System.out.println("Sending value: " + message);
return new GenericMessage(message);
}
};
}
}
對應(yīng)的配置application.yml:
spring:
cloud.stream.bindings:
output1:
contentType: application/json #約定消息的內(nèi)容編碼格式
output2:
contentType: application/json #約定消息的內(nèi)容編碼格式
rabbitmq:
host: 127.0.0.1
port: 5672
username: sa
password: 123456
3. 消費(fèi)者類的實(shí)現(xiàn)
消費(fèi)者類要做的事也是相似的:
3.1.自定義接收通道
3.2消費(fèi)消息的功能實(shí)現(xiàn)
@EnableBinding(SampleSink.MultiInputSink.class)
public class SampleSink {
@StreamListener(MultiInputSink.INPUT1)
public synchronized void receive1(String message) {
System.out.println("******************");
System.out.println("At Sink1");
System.out.println("******************");
System.out.println("Received message " + message);
}
@StreamListener(MultiInputSink.INPUT2)
public synchronized void receive2(String message) {
System.out.println("******************");
System.out.println("At Sink2");
System.out.println("******************");
System.out.println("Received message " + message);
}
public interface MultiInputSink {
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT1)
SubscribableChannel input1();
@Input(INPUT2)
SubscribableChannel input2();
}
}
對應(yīng)的配置 application.yml (當(dāng)然用application.propertities)
spring:
cloud.stream:
bindings:
input1:
group: inputGroup #加上group是為了持久化
input2:
group: inputGroup2
rabbit:
host: 127.0.0.1
port: 5672
username: sa
password: 123456
4.死信隊(duì)列設(shè)置
問題列表:
消息在什么條件下進(jìn)入死信隊(duì)列蔫慧?發(fā)送失敗后挠乳,如何設(shè)置重試次數(shù) or TTL?
進(jìn)入死信隊(duì)列之后姑躲,若又需要該消息重新回到原隊(duì)列進(jìn)行處理睡扬,該怎么辦
轉(zhuǎn)入死信隊(duì)列的條件:
- 消息被拒絕(basic.reject/ basic.nack)并且requeue=false
- 消息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時(shí)間))
- 隊(duì)列達(dá)到最大長度
配置死信隊(duì)列及消息消費(fèi)失敗重試次數(shù)(application.yml):
-
配置消息消費(fèi)重試次數(shù)
兩種方式
1) 如果允許重試一定次數(shù):如上圖配置所示,設(shè)置max_attempt 黍析,大于1即可
2)如果不允許重發(fā)卖怜,消費(fèi)失敗了就進(jìn)入死信隊(duì)列,在配置中添加requeueRejected設(shè)為true
spring:
cloud.stream:
bindings:
input1:
group: inputGroup1
rabbit:
bindings:
input1:
consumer:
autoBindDlq: true #啟用死信隊(duì)列阐枣,默認(rèn)會(huì)生成一個(gè)DLX EXCHANGE马靠,當(dāng)消息重復(fù)消費(fèi)失敗后
dlqDeadLetterExchange: input-deadLetter.DLX #如果該列聲明牍戚,那么deadLetterExchange也要聲明,這個(gè)保持一致
deadLetterExchange: input-deadLetter.DLX #與dlqDeadLetterExchange保持一致
requeueRejected: true
host: 127.0.0.1
port: 5672
username: sa
password: 123456
- 在代碼中將某消息轉(zhuǎn)入死信隊(duì)列虑粥,另可見官網(wǎng)示例
5如孝、消息的格式:
(1)消息頭,包含了以下字段:
翻看源碼娩贷,MessageHeader是個(gè)Map結(jié)構(gòu)第晰,左邊是字段名,右邊是字段內(nèi)容彬祖∽率荩可以在創(chuàng)建MessageHeader的時(shí)候傳入已經(jīng)初始化的Map,注意我們可以在這指定body的contentType储笑。contentType能填什么內(nèi)容甜熔,查找下表即可(官網(wǎng)上找到的):
(2)至于消息體(payLoad),它支持自定義結(jié)構(gòu)突倍,格式自定腔稀。
6、如何保證消息的可靠性羽历?
一般是通過具體的消息中間件來保證焊虏。
配置組就可以來保證消息可靠性。見官網(wǎng)描述:消息的持久性
設(shè)置持久性的屬性:durableSubscription