spring-amqp-support
A Spring AMQP extension component that makes it easy to send and receive MQ event messages and automatically converts messages into event objects.
Info
Built on spring-boot-starter-amqp:2.6.3
by: 林同学 (765371578@qq.com)
Getting Started
Add the dependency:
<dependency>
    <groupId>com.github.LinYuanBaoBao</groupId>
    <artifactId>spring-amqp-support</artifactId>
    <version>1.0.0-RELEASE</version>
</dependency>
Define an AMQP configuration class:
@Configuration
public class AmqpConfiguration {

    @Bean
    public MessageConverter messageConverter(EventMessageClassMapper classMapper) {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper); // specify class mapper
        return converter;
    }

    @Bean
    public EventMessageClassMapper eventMessageClassMapper(EventMessageTypeMapping eventMessageTypeMapping) {
        return new EventMessageClassMapper(eventMessageTypeMapping);
    }

    @Bean
    public EventMessageTypeMapping eventMessageTypeMapping() {
        return new EventMessageTypeMapping.Builder()
                .pkg("my.package")
                .build();
    }
}
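The configuration above ties an event code to a Java class so that the converter can choose a target type. How the library does this internally is not shown in this README. The following is only a minimal, dependency-free sketch of the general idea; the `EventCode` annotation, `UserRegisterEventSketch`, and `EventTypeRegistry` are hypothetical stand-ins and are not part of spring-amqp-support.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an event annotation; not the library's @EventMessage.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface EventCode {
    String value();
}

// Hypothetical event type used only for this illustration.
@EventCode("user-register")
class UserRegisterEventSketch {
}

// Hypothetical registry mapping an event code to the class that represents it.
class EventTypeRegistry {
    private final Map<String, Class<?>> typesByCode = new HashMap<>();

    // Registers a class under the code declared on its annotation.
    void register(Class<?> type) {
        EventCode code = type.getAnnotation(EventCode.class);
        if (code == null) {
            throw new IllegalArgumentException(type.getName() + " has no @EventCode");
        }
        Class<?> previous = typesByCode.putIfAbsent(code.value(), type);
        if (previous != null && previous != type) {
            throw new IllegalStateException("Duplicate event code: " + code.value());
        }
    }

    // Looks up the class for a code, for example one read from a message header.
    Class<?> resolve(String code) {
        Class<?> type = typesByCode.get(code);
        if (type == null) {
            throw new IllegalArgumentException("Unknown event code: " + code);
        }
        return type;
    }
}
```

Rejecting duplicate codes at registration time is a design choice of this sketch: it surfaces a misconfiguration at startup instead of when a message arrives.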
Define an event with the @EventMessage annotation:
// my.package.UserRegisterEvent
@EventMessage(exchange = "user", code = "user-register")
public class UserRegisterEvent {
    private Integer userId;
    private Map<String, Object> userInfo;
    // ...
}
Send an event message:
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMessage() {
    UserRegisterEvent event = new UserRegisterEvent();
    event.setUserId(1001);
    Map<String, Object> userInfo = new HashMap<>();
    userInfo.put("username", "linyuan");
    event.setUserInfo(userInfo);
    rabbitTemplate.convertAndSend(
            EventMessageUtils.getEventExchange(event.getClass()), "default", event, message -> {
                MessageProperties properties = message.getMessageProperties();
                properties.setMessageId(UUID.randomUUID().toString());
                properties.setTimestamp(new Date());
                return message;
            });
}
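The README does not show how `EventMessageUtils.getEventExchange` is implemented. One plausible approach is to read the `exchange` attribute from the class-level annotation via reflection. The sketch below illustrates that idea with hypothetical stand-ins (`EventMeta`, `AnnotatedUserEvent`, `EventMetaReader`); none of these names come from the library.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in annotation carrying an exchange name and an event code.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface EventMeta {
    String exchange();

    String code();
}

// Hypothetical event type used only for this illustration.
@EventMeta(exchange = "user", code = "user-register")
class AnnotatedUserEvent {
}

// Hypothetical helper that reads annotation attributes reflectively.
final class EventMetaReader {
    private EventMetaReader() {
    }

    // Returns the exchange declared on the event class.
    static String exchangeOf(Class<?> eventType) {
        return metaOf(eventType).exchange();
    }

    // Returns the event code declared on the event class.
    static String codeOf(Class<?> eventType) {
        return metaOf(eventType).code();
    }

    // Fails fast when the class is not annotated, rather than returning null.
    private static EventMeta metaOf(Class<?> eventType) {
        EventMeta meta = eventType.getAnnotation(EventMeta.class);
        if (meta == null) {
            throw new IllegalArgumentException(eventType.getName() + " is not annotated with @EventMeta");
        }
        return meta;
    }
}
```

The annotation needs `RetentionPolicy.RUNTIME`; with the default retention, `getAnnotation` would return `null` at run time.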
The message format is as follows:
Properties
timestamp: 1644401093
message_id: dc7d0646-a6e6-4183-b755-79ffeefc1159
headers: _EVENT_CODE_: user-register
content_encoding: UTF-8
content_type: application/json
Payload
{"userId":1001,"userInfo":{"username":"linyuan"}}
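The `timestamp` value in the sample above has ten digits, which suggests seconds since the Unix epoch rather than the milliseconds held by `java.util.Date`. A small sketch, assuming that interpretation, converts it to a readable instant; the class name `TimestampSketch` is made up for illustration.

```java
import java.time.Instant;

// Hypothetical helper for reading a seconds-based timestamp property.
final class TimestampSketch {
    private TimestampSketch() {
    }

    // Interprets the value as whole seconds since 1970-01-01T00:00:00Z.
    static Instant fromEpochSeconds(long seconds) {
        return Instant.ofEpochSecond(seconds);
    }
}
```

Under this assumption, `TimestampSketch.fromEpochSeconds(1644401093L)` corresponds to 2022-02-09T10:04:53Z.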
Receive an event message:
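// Declared in a class annotated with @RabbitListener (for example, on the "user.event" queue).
@RabbitHandler
public void listen(UserRegisterEvent message, @Headers Map<String, Object> headers) {
    // do something here...
}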
Alternatively, use the approach below, but messages received this way do not pass through the InvocableHandlerMethodDecorator processing chain:
@Autowired
private RabbitTemplate rabbitTemplate;

public void recMessage() {
    UserRegisterEvent msg = (UserRegisterEvent) rabbitTemplate.receiveAndConvert("user.event");
}
InvocableHandlerMethodDecorator
You can register multiple InvocableHandlerMethodDecorator beans to form a message processing chain that performs additional operations on messages:
@Bean
public InvocableHandlerMethodDecorator invocableHandlerMethodDecorator() {
    return invocableHandlerMethod -> new InvocableHandlerMethodDecoration(invocableHandlerMethod) {
        @Override
        public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
            // do something here...
            return super.invoke(message, providedArgs);
        }
    };
}
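To make the chaining idea concrete, here is a dependency-free sketch of how several decorators can wrap one handler. It uses a hypothetical, simplified `Handler` interface in place of the Spring types, and the order in which the library itself applies decorators is not stated in this README; in this sketch the last decorator in the list ends up outermost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified handler: takes a payload and returns a result.
interface Handler {
    Object invoke(String payload);
}

final class DecoratorChainSketch {
    private DecoratorChainSketch() {
    }

    // Wraps the target with each decorator in list order; the last one becomes the outermost layer.
    static Handler decorate(Handler target, List<UnaryOperator<Handler>> decorators) {
        Handler current = target;
        for (UnaryOperator<Handler> decorator : decorators) {
            current = decorator.apply(current);
        }
        return current;
    }

    // A decorator that records an entry before and after delegating to the wrapped handler.
    static UnaryOperator<Handler> tracing(String name, List<String> log) {
        return next -> payload -> {
            log.add(name + ":before");
            try {
                return next.invoke(payload);
            } finally {
                log.add(name + ":after");
            }
        };
    }

    // Builds a two-decorator chain, invokes it once, and returns the recorded call order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Handler target = payload -> {
            log.add("handle:" + payload);
            return payload;
        };
        Handler chain = decorate(target, List.of(tracing("a", log), tracing("b", log)));
        chain.invoke("event");
        return log;
    }
}
```

Calling `DecoratorChainSketch.demo()` yields the order `b:before`, `a:before`, `handle:event`, `a:after`, `b:after`, which shows each layer running around the one it wraps.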
IdempotentValidator
Register an IdempotentValidator bean to perform idempotency checks on messages:
@Bean
public IdempotentValidator idempotentValidator() {
    return new IdempotentValidator() {
        @Override
        public void valid(Message<?> message) {
            // do valid here
        }
    };
}
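What the check inside `valid` should do is left to the application. A common approach is to remember message IDs that have already been processed and reject repeats. The sketch below shows that idea with a hypothetical in-memory class, `SeenMessageIds`, which is not part of the library.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory duplicate detector keyed by message ID.
final class SeenMessageIds {
    // Concurrent set so that several consumer threads can check IDs safely.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is observed and false on every repeat.
    boolean markIfFirst(String messageId) {
        if (messageId == null) {
            throw new IllegalArgumentException("message ID is required");
        }
        return seen.add(messageId);
    }
}
```

An implementation of `valid` could read the message ID from the message headers and throw an exception when `markIfFirst` returns `false`; how the library reacts to such an exception is not documented here. An in-memory set is lost on restart and is not shared between application instances, so a real deployment would likely need a shared store with expiry.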
Authenticator
Register an Authenticator bean to log in before a message is consumed and log out afterwards:
@Bean
public Authenticator authenticator() {
    return new Authenticator() {
        @Override
        public void login(Message<?> message) {
            // do login here
        }

        @Override
        public void logout(Message<?> message) {
            // do logout here
        }
    };
}
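One way to think about the login and logout pair is as a thread-bound context that is set before the handler runs and always cleared afterwards. The following sketch illustrates that pattern with a hypothetical `ConsumerIdentity` holder; it is an assumption about a typical use, not the library's implementation.

```java
import java.util.function.Supplier;

// Hypothetical thread-bound holder for the identity attached to the message being consumed.
final class ConsumerIdentity {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private ConsumerIdentity() {
    }

    // Binds the identity to the current thread.
    static void login(String userId) {
        CURRENT.set(userId);
    }

    // Clears the identity so it cannot leak into the next message handled by this thread.
    static void logout() {
        CURRENT.remove();
    }

    // Returns the identity bound to the current thread, or null if none is set.
    static String current() {
        return CURRENT.get();
    }

    // Runs the action with the identity set, clearing it even if the action throws.
    static <T> T runAs(String userId, Supplier<T> action) {
        login(userId);
        try {
            return action.get();
        } finally {
            logout();
        }
    }
}
```

Clearing the value in `finally` matters because listener threads are usually pooled and reused across messages.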
Recording Message Information - MessageRecorder
Register a MessageRecorder bean to record message information:
@Bean
public MessageRecorder messageRecorder() {
    return new MessageRecorder() {
        @Override
        public void record(Message<?> message, boolean success, Object result, Exception error) {
            // do record here
        }
    };
}
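As an illustration of what a recorder might keep, the sketch below stores one entry per consumed message in memory, using the result on success and the error on failure. `InMemoryRecorder` and its simplified `record` signature (a message ID instead of a `Message<?>`) are hypothetical and chosen only to keep the example free of dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical recorder that keeps a summary of each consumed message in memory.
final class InMemoryRecorder {

    // One recorded outcome: which message, whether it succeeded, and a short detail string.
    static final class Entry {
        final String messageId;
        final boolean success;
        final String detail;

        Entry(String messageId, boolean success, String detail) {
            this.messageId = messageId;
            this.success = success;
            this.detail = detail;
        }
    }

    private final List<Entry> entries = Collections.synchronizedList(new ArrayList<>());

    // Stores the result text on success, or the exception type and message on failure.
    void record(String messageId, boolean success, Object result, Exception error) {
        String detail;
        if (success) {
            detail = String.valueOf(result);
        } else if (error == null) {
            detail = "unknown error";
        } else {
            detail = error.getClass().getSimpleName() + ": " + error.getMessage();
        }
        entries.add(new Entry(messageId, success, detail));
    }

    // Returns a snapshot so callers cannot modify the internal list.
    List<Entry> entries() {
        return new ArrayList<>(entries);
    }
}
```

A real recorder would more likely write to a log or a database table than hold entries in memory.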