Implementing asynchronous RPC calls with Spring AMQP
Example
Server side
Application startup class:
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("===server startup======");
TimeUnit.SECONDS.sleep(120);
context.close();
}
}
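Configuration class:
The container listens on the sms queue, which is the queue that client request messages are sent to. A MessageListenerAdapter is configured on the container; the adapter invokes the service, and the value the adapter's handler returns is the RPC result that the server sends back to the client.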
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("sms");
container.setAcknowledgeMode(AcknowledgeMode.NONE);
// Use the adapter approach: the adapter delegates to SendSMSHandler
container.setMessageListener(new MessageListenerAdapter(new SendSMSHandler()));
return container;
}
}
Handler: the handler calls the actual service. In this example the handler method returns a boolean.
import java.util.concurrent.TimeUnit;
public class SendSMSHandler {
public boolean handleMessage(byte[] body){
String _body = new String(body);
System.out.println(_body);
String[] sms = _body.split(":");
String phone = sms[0];
String content = sms[1];
boolean is = SendSMSTool.sendSMS(phone,content);
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
return is;
}
}
Service class:
public class SendSMSTool {
public static boolean sendSMS(String phone,String content){
System.out.println("Sending SMS [" + content + "] to phone number: " + phone);
return phone.length() > 6;
}
}
Server-side steps
- The message handling method must have a return value. That return value is the result the server replies to the client with, for example the value returned by SendSMSHandler.handleMessage.
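The step above can be pictured with a small reflection sketch. It is not Spring AMQP code: AdapterSketch, SmsHandler and invokeHandler are hypothetical names, and the only idea taken from the article is that an adapter calls a handleMessage method on a plain object and treats whatever it returns as the reply.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a listener adapter; uses only the JDK.
final class AdapterSketch {

    // Plain handler object, shaped like the article's SendSMSHandler.
    static final class SmsHandler {
        public boolean handleMessage(byte[] body) {
            String[] sms = new String(body, StandardCharsets.UTF_8).split(":");
            return sms[0].length() > 6; // same rule as SendSMSTool.sendSMS
        }
    }

    // Finds handleMessage(byte[]) reflectively and returns its result,
    // which an adapter would then convert into the reply message.
    static Object invokeHandler(Object delegate, byte[] body) {
        try {
            Method method = delegate.getClass().getMethod("handleMessage", byte[].class);
            return method.invoke(delegate, (Object) body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("handler invocation failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] request = "15634344321:hello".getBytes(StandardCharsets.UTF_8);
        Object reply = invokeHandler(new SmsHandler(), request);
        System.out.println("reply = " + reply);
    }
}
```

A handler whose method returned void would give the adapter nothing to send back, which is why the return value matters for RPC.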
Client side
Application startup class:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// Set the reply timeout, in milliseconds
rabbitTemplate.setReplyTimeout(10000);
String phone = "15634344321";
String content = "Anniversary sale, 50% off";
MessageProperties messageProperties = new MessageProperties();
Message message = new Message((phone+":"+content).getBytes(),messageProperties);
//rabbitTemplate.send("","sms",message);
Message reply = rabbitTemplate.sendAndReceive("","sms",message,
new CorrelationData(UUID.randomUUID().toString()));
System.out.println(reply);
System.out.println("message,body:"+new String(reply.getBody()));
System.out.println("message,properties:"+reply.getMessageProperties());
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
Configuration class:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
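If the server sleeps for 6 s, the Message object that the client gets back from sendAndReceive is null, because the default reply timeout is 5 s. To fix this, raise the timeout on the client with rabbitTemplate.setReplyTimeout(10000);.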
Client-side steps
- Send the message with sendAndReceive. The method returns a Message object, which is the result returned by the server.
- If sendAndReceive has not received a result within 5 s, it returns null. The timeout can be changed with rabbitTemplate.setReplyTimeout().
- Pay attention to the type the server returns, because the outcome depends on the MessageConverter. The default org.springframework.amqp.support.converter.SimpleMessageConverter Java-serializes Serializable values such as the primitive wrapper types, so the client receives a serialized Java object. Choose the MessageConverter accordingly.
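The timeout behaviour in the second step can be imitated with plain JDK futures. This is an analogy, not RabbitTemplate code: awaitReply and slowServer are hypothetical helpers, and the delays are scaled down from the article's 6 s sleep and 5 s / 10 s timeouts to 600 ms, 500 ms and 1000 ms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only analogy for a blocking request/reply with a timeout.
final class ReplyTimeoutDemo {

    // Waits for the reply; returns null when it does not arrive in time.
    static String awaitReply(CompletableFuture<String> pending, long timeoutMillis) {
        try {
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no reply within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Simulated server that answers after the given delay.
    static CompletableFuture<String> slowServer(long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "SUCCESS";
        });
    }

    public static void main(String[] args) {
        // Server slower than the timeout: the caller gets null.
        System.out.println(awaitReply(slowServer(600), 500));
        // Timeout raised above the server's delay: the reply arrives.
        System.out.println(awaitReply(slowServer(600), 1000));
    }
}
```

Client code that calls sendAndReceive should therefore check the returned Message for null before reading its body.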
In the example code the server returns a Boolean to the client.
Start the server and then the client.
Server console output:
15634344321:Anniversary sale, 50% off
Sending SMS [Anniversary sale, 50% off] to phone number: 15634344321
Client console output:
message,body:????sr??java.lang.Boolean? r???????Z??valuexp?
message,properties:MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAB5yYWJiaXRAaVpicDFqY3d4N3NmYjFud3pyZWh5NloAAFu0AAAABwI=.kHL9zxtdQmtcxl0mQF8zrg==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null]
The data received by the client is unreadable. Change the return type of the server-side handler to String:
import java.util.concurrent.TimeUnit;
public class SendSMSHandler {
public String handleMessage(byte[] body){
String _body = new String(body);
System.out.println(_body);
String[] sms = _body.split(":");
String phone = sms[0];
String content = sms[1];
boolean is = SendSMSTool.sendSMS(phone,content);
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
return is ? "success":"false";
}
}
Now the message data received by the client is readable. Why? To summarize: when the server-side handler returns a boolean, the reply body is unreadable when printed as text; when it returns a String, the reply body is readable.
Earlier we looked at the org.springframework.amqp.support.converter.MessageConverter interface. When the client's message reaches the server, it is converted with the fromMessage method; when the server replies to the client, the server-side object is converted into a Message object with the toMessage method.
The default implementation of org.springframework.amqp.support.converter.MessageConverter is org.springframework.amqp.support.converter.SimpleMessageConverter, and toMessage is implemented in its parent class AbstractMessageConverter.
The logic of AbstractMessageConverter.toMessage is:
@Override
public final Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
if (messageProperties == null) {
messageProperties = new MessageProperties();
}
// Convert the object into a Message
Message message = createMessage(object, messageProperties);
messageProperties = message.getMessageProperties();
if (this.createMessageIds && messageProperties.getMessageId() == null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
return message;
}
The createMessage method of SimpleMessageConverter is what converts the object into a Message object:
/**
* Creates an AMQP Message from the provided Object.
*/
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
// Boolean implements Serializable, so a boolean return value is Java-serialized here
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
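In our program the client turns the Java-serialized bytes directly into a string, which is why the output is unreadable. When the handler returns a String, the string is first converted into the corresponding byte array and then returned via new Message(bytes, messageProperties), so it prints correctly.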
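The difference can be reproduced with the JDK alone. The sketch below assumes that the Serializable branch amounts to standard Java serialization, which is what the content type application/x-java-serialized-object in the client output indicates; SerializedReplyDemo and its helper methods are hypothetical names, not Spring AMQP classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it uses only the JDK, not Spring AMQP.
final class SerializedReplyDemo {

    // Java-serializes a value into a byte array.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads a Java-serialized value back from a byte array.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] booleanBody = serialize(Boolean.TRUE);
        // Treating a serialization stream as text gives unreadable output
        // with the class name java.lang.Boolean embedded in it.
        System.out.println(new String(booleanBody, StandardCharsets.ISO_8859_1));
        // Reading it back as an object recovers the value.
        System.out.println(deserialize(booleanBody));

        // A String reply is just its encoded characters, so it prints as-is.
        byte[] textBody = "success".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(textBody, StandardCharsets.UTF_8));
    }
}
```

So the Boolean reply is not corrupted; it is only being printed with new String(...) instead of being deserialized.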
Going further: when the server returns a custom object, the client receives an empty body.
The returned object:
public class SendStatus {
private String phone;
private String result;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
Return this object from the handler:
public class SendSMSHandler {
public SendStatus handleMessage(byte[] body){
String _body = new String(body);
System.out.println(_body);
String[] sms = _body.split(":");
String phone = sms[0];
String content = sms[1];
boolean is = SendSMSTool.sendSMS(phone,content);
SendStatus sendStatus = new SendStatus();
sendStatus.setPhone(phone);
sendStatus.setResult(is ? "SUCCESS":"FAILURE");
return sendStatus;
}
}
Server console:
15634344321:Anniversary sale, 50% off
Sending SMS [Anniversary sale, 50% off] to phone number: 15634344321
Client console:
message,body:
message,properties:MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/octet-stream, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAB5yYWJiaXRAaVpicDFqY3d4N3NmYjFud3pyZWh5NloAAFxBAAAABwI=.01fOGW/nvS2nz6gKza+cjg==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null]
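Why? Our SendStatus class does not match any of the if branches in createMessage, so bytes stays null and the reply is created with an empty body. There are two ways to fix this: write your own org.springframework.amqp.support.converter.MessageConverter implementation, or replace the default with a different org.springframework.amqp.support.converter.MessageConverter implementation.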
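The branch order can be checked in isolation. The sketch below copies only the instanceof chain of the createMessage listing and reports which content type each branch would set; ConverterBranchDemo, PlainStatus and SerializableStatus are hypothetical names introduced for illustration.

```java
import java.io.Serializable;

// Hypothetical stand-in that mirrors only the branch order of createMessage.
final class ConverterBranchDemo {

    // Plain bean like the article's SendStatus: not a byte[], String or Serializable.
    static final class PlainStatus {
        String phone = "15634344321";
    }

    // The same bean, but marked Serializable.
    static final class SerializableStatus implements Serializable {
        private static final long serialVersionUID = 1L;
        String phone = "15634344321";
    }

    // Returns the content type the matching branch would set,
    // or null when no branch matches and the body stays empty.
    static String branchFor(Object reply) {
        if (reply instanceof byte[]) {
            return "application/octet-stream";
        } else if (reply instanceof String) {
            return "text/plain";
        } else if (reply instanceof Serializable) {
            return "application/x-java-serialized-object";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(branchFor("success"));
        System.out.println(branchFor(Boolean.TRUE));
        System.out.println(branchFor(new PlainStatus()));
        System.out.println(branchFor(new SerializableStatus()));
    }
}
```

Under this reading, making SendStatus implement Serializable would route it through the third branch, but the client would again receive Java-serialized bytes rather than readable text, so swapping the converter is the more useful fix.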
Revised example
This version uses Jackson2JsonMessageConverter, a message converter that ships with Spring AMQP.
Server side
Application startup class:
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("===server startup======");
TimeUnit.SECONDS.sleep(120);
context.close();
}
}
Configuration class, with the JSON message converter passed to the adapter:
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("sms");
container.setAcknowledgeMode(AcknowledgeMode.NONE);
// Use the adapter approach, with a JSON converter for the reply
container.setMessageListener(new MessageListenerAdapter(new SendSMSHandler(),new Jackson2JsonMessageConverter()));
return container;
}
}
Handler, which returns the custom SendStatus type:
import java.util.concurrent.TimeUnit;
public class SendSMSHandler {
public SendStatus handleMessage(byte[] body){
String _body = new String(body);
System.out.println(_body);
String[] sms = _body.split(":");
String phone = sms[0];
String content = sms[1];
boolean is = SendSMSTool.sendSMS(phone,content);
SendStatus sendStatus = new SendStatus();
sendStatus.setPhone(phone);
sendStatus.setResult(is ? "SUCCESS":"FAILURE");
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
return sendStatus;
}
}
Service class:
public class SendSMSTool {
public static boolean sendSMS(String phone,String content){
System.out.println("Sending SMS [" + content + "] to phone number: " + phone);
return phone.length() > 6;
}
}
Object returned by the server:
public class SendStatus {
private String phone;
private String result;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
Client side
Application startup class:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// Set the reply timeout, in milliseconds
rabbitTemplate.setReplyTimeout(10000);
String phone = "15634344321";
String content = "Anniversary sale, 50% off";
MessageProperties messageProperties = new MessageProperties();
Message message = new Message((phone+":"+content).getBytes(),messageProperties);
//rabbitTemplate.send("","sms",message);
Message reply = rabbitTemplate.sendAndReceive("","sms",message,
new CorrelationData(UUID.randomUUID().toString()));
System.out.println(reply);
System.out.println("message,body:"+new String(reply.getBody()));
System.out.println("message,properties:"+reply.getMessageProperties());
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
Configuration class:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
Start the server and then the client. The client prints:
message,body:{"phone":"15634344321","result":"SUCCESS"}
message,properties:MessageProperties [headers={__TypeId__=rpc.server.SendStatus}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to
The reply is the JSON representation of SendStatus, because the Jackson2JsonMessageConverter message converter is used.