學(xué)習(xí)完整課程請移步 互聯(lián)網(wǎng) Java 全棧工程師
概述
在實際生產(chǎn)中翔脱,我們需要發(fā)布和訂閱的消息可能不止一種 Topic 熔掺,故此時就需要使用自定義 Binding 來幫我們實現(xiàn)多 Topic 的發(fā)布和訂閱功能
生產(chǎn)者
自定義 Output 接口岩灭,代碼如下:
public interface MySource {
@Output("output1")
MessageChannel output1();
@Output("output2")
MessageChannel output2();
}
發(fā)布消息的案例代碼如下:
@Autowired
private MySource source;
public void send(String msg) throws Exception {
source.output1().send(MessageBuilder.withPayload(msg).build());
}
消費(fèi)者
自定義 Input 接口涤姊,代碼如下:
public interface MySink {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
@Input("input4")
SubscribableChannel input4();
}
接收消息的案例代碼如下:
@StreamListener("input1")
public void receiveInput1(String receiveMsg) {
System.out.println("input1 receive: " + receiveMsg);
}
Application
配置 Input 和 Output 的 Binding 信息并配合 @EnableBinding
注解使其生效籽懦,代碼如下:
@SpringBootApplication
@EnableBinding({ MySource.class, MySink.class })
public class RocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args);
}
}
application.yml
生產(chǎn)者
spring:
application:
name: rocketmq-provider
cloud:
stream:
rocketmq:
binder:
namesrv-addr: 192.168.10.149:9876
bindings:
output1: {destination: test-topic1, content-type: application/json}
output2: {destination: test-topic2, content-type: application/json}
消費(fèi)者
spring:
application:
name: rocketmq-consumer
cloud:
stream:
rocketmq:
binder:
namesrv-addr: 192.168.10.149:9876
bindings:
input: {consumer.orderly: true}
bindings:
input1: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
input2: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
input3: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
input4: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}