最近收到好幾個類似的問題:使用Spring Cloud Stream操作RabbitMQ或Kafka的時候飒焦,出現(xiàn)消息重復(fù)消費的問題。通過溝通與排查下來主要還是用戶對消費組的認識不夠悼沿。其實等舔,在之前的博文以及《Spring Cloud微服務(wù)實戰(zhàn)》一書中都有提到關(guān)于消費組的概念以及作用。
那么什么是消費組呢糟趾?為什么要用消費組慌植?它解決什么問題呢?摘錄一段之前博文的內(nèi)容义郑,來解答這些疑問:
通常在生產(chǎn)環(huán)境蝶柿,我們的每個服務(wù)都不會以單節(jié)點的方式運行在生產(chǎn)環(huán)境,當同一個服務(wù)啟動多個實例的時候非驮,這些實例都會綁定到同一個消息通道的目標主題(Topic)上交汤。默認情況下,當生產(chǎn)者發(fā)出一條消息到綁定通道上劫笙,這條消息會產(chǎn)生多個副本被每個消費者實例接收和處理(出現(xiàn)上述重復(fù)消費問題)芙扎。但是有些業(yè)務(wù)場景之下,我們希望生產(chǎn)者產(chǎn)生的消息只被其中一個實例消費填大,這個時候我們需要為這些消費者設(shè)置消費組來實現(xiàn)這樣的功能戒洼。
下面,通過一個例子來看看如何使用消費組:
問題重現(xiàn)
構(gòu)建消息消費端
第一步:創(chuàng)建綁定接口允华,綁定example-topic輸入通道(默認情況下圈浇,會綁定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。
interfaceExampleBinder{
? ? String NAME = "example-topic";
? ? @Input(NAME)
? ? SubscribableChannelinput();
}
第二步:對上述輸入通道創(chuàng)建監(jiān)聽與處理邏輯靴寂。
@EnableBinding(ExampleBinder.class)public classExampleReceiver{
? ? private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class);
? ? @StreamListener(ExampleBinder.NAME)
? ? publicvoidreceive(String payload){
? ? ? ? logger.info("Received: " + payload);
? ? }
}
第三步汉额;創(chuàng)建應(yīng)用主類和配置文件
@SpringBootApplicationpublic classExampleApplication{
? ? publicstaticvoidmain(String[] args){
? ? ? ? SpringApplication.run(ExampleApplication.class, args);
? ? }
}
spring.application.name=stream-consumer-groupserver.port=0
這里設(shè)置server.port=0,以方便在本地啟動多實例來重現(xiàn)問題榨汤。
完成上述操作之后蠕搜,啟動兩個該應(yīng)用的實例,以備后續(xù)調(diào)用收壕。
構(gòu)建消息生產(chǎn)端
比較簡單妓灌,需要注意的是,使用@Output創(chuàng)建一個同名的輸出綁定蜜宪,這樣發(fā)出的消息才能被上述啟動的實例接收到虫埂。具體實現(xiàn)如下:
@RunWith(SpringRunner.class)@EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class})public classExampleApplicationTests{
@Autowired private ExampleBinder exampleBinder;
@Test publicvoidexampleBinderTester(){
? ? ? ? exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build());
}
public interfaceExampleBinder{
String NAME = "example-topic";
@Output(NAME)
MessageChanneloutput();
}
}
啟動上述測試用例之后,可以發(fā)現(xiàn)之前啟動的兩個實例都收到的消息圃验,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com掉伏。消息重復(fù)消費的問題成功重現(xiàn)!
使用消費組解決問題
如何解決上述消息重復(fù)消費的問題呢?我們只需要在配置文件中增加如下配置即可:
spring.cloud.stream.bindings.example-topic.group=aaa
當我們指定了某個綁定所指向的消費組之后斧散,往當前主題發(fā)送的消息在每個訂閱消費組中供常,只會有一個訂閱者接收和消費,從而實現(xiàn)了對消息的負載均衡鸡捐。只所以之前會出現(xiàn)重復(fù)消費的問題栈暇,是由于默認情況下,任何訂閱都會產(chǎn)生一個匿名消費組箍镜,所以每個訂閱實例都會有自己的消費組源祈,從而當有消息發(fā)送的時候,就形成了廣播的模式色迂。
另外香缺,需要注意上述配置中example-topic是在代碼中@Output和@Input中傳入的名字。
覺得不錯請點贊支持歇僧,歡迎留言或進我的個人群855801563領(lǐng)取【架構(gòu)資料專題目合集90期】图张、【BATJTMD大廠JAVA面試真題1000+】,本群專用于學(xué)習(xí)交流技術(shù)馏慨、分享面試機會埂淮,拒絕廣告,我也會在群內(nèi)不定期答題写隶、探討