簡要說明:有時候接口設(shè)計出于性能考慮均驶,會出現(xiàn)你提交一個請求,但是他不會同步返回結(jié)果,他會通過另外一個渠道來告訴你這個請求的結(jié)果赁炎,這樣的話就需要客戶端去維護(hù)這個請求,但是程序開發(fā)的時候钾腺,是需要知道這個請求的結(jié)果才能執(zhí)行下一步的徙垫,最近做了一下類似的功能,簡單的寫個DEMO總結(jié)一下放棒。
netty就是一個NIO的框架姻报,就用這個來做演示了
服務(wù)器端的代碼很簡單,接到請求以后 開啟一個線程间螟,過幾秒以后在返回這條數(shù)據(jù)的結(jié)果
這里簡單約定了一下數(shù)據(jù)的格式 “messageId|messageBody”吴旋, 其中messageId就是客戶端用來維護(hù)這個請求的,
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
System.out.println("接受到消息:"+request);
Thread demoThread = new Thread(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// String messageId = request.split("|")[0];
// ctx.channel().writeAndFlush(messageId+"|resp\r\n");
ctx.channel().writeAndFlush(request+" 回復(fù)\r\n");
});
demoThread.start();
}
客戶端東西比較多厢破,有幾個組件
ConnectPool 用來維護(hù)Netty的鏈接荣瑟,方便程序在handel以外的地方發(fā)送信息,里面有兩個變量
public class ConnectPool {
/**
* 緩存鏈接
*/
private static Map<String, ChannelFuture> CONNECT_POOL = new ConcurrentHashMap<>();
/**
* 緩存消息對象
*/
private static Map<String, SyncMessage> MESSAGE_CACHE = new ConcurrentHashMap<>();
public static void addConnect(String host,ChannelFuture channelFuture){
CONNECT_POOL.put(host,channelFuture);
}
public static ChannelFuture getConnect(String host){
return CONNECT_POOL.get(host);
}
public static void addMessage(SyncMessage message){
MESSAGE_CACHE.put(message.getMessageId(),message);
}
public static SyncMessage getMessage(String messageId){
return MESSAGE_CACHE.get(messageId);
}
public static SyncMessage removeMessage(String messageId){
return MESSAGE_CACHE.remove(messageId);
}
}
SyncMessage摩泪,封裝的消息對象笆焰,其中CountDownLatch 用來做同步控制(也可以使用其他具有相同功能的類,這里看個人喜歡)加勤,createReqMessage封裝了一下消息發(fā)送的格式(\r\n這里是nettyDecode的結(jié)束符)
public class SyncMessage {
private String messageId;
private String req;
private String resp;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getReq() {
return req;
}
public void setReq(String req) {
this.req = req;
}
public String getResp() {
return resp;
}
public void setResp(String resp) {
this.resp = resp;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public String createReqMessage(){
return messageId+"|"+req+"\r\n";
}
}
SyncMessageSender仙辟,封裝了發(fā)送數(shù)據(jù)的功能同波,即封裝了發(fā)送的數(shù)據(jù)格式與消息ID,消息ID對外不可見叠国,其他一些注意事項(xiàng)見注釋
public class SyncMessageSender {
public static String sendMessage(String host,String message){
ChannelFuture connect = ConnectPool.getConnect(host);
if(connect == null){
throw new RuntimeException("為鏈接");
}
String messageId = UUID.randomUUID().toString();
SyncMessage syncMessage = new SyncMessage();
syncMessage.setMessageId(messageId);
syncMessage.setReq(message);
String messageReq = syncMessage.createReqMessage();
//添加緩存
ConnectPool.addMessage(syncMessage);
connect.channel().writeAndFlush(messageReq);
try {
//這里一定要加上超時時間未檩,不然會發(fā)生線程無法釋放的情況
syncMessage.getCountDownLatch().await(10, TimeUnit.SECONDS);
return syncMessage.getResp();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//根據(jù)情況清理加入的緩存,建議加入緩存和刪除緩存在同一個方法體內(nèi)
//如果不清理粟焊,會引起OOM
ConnectPool.removeMessage(messageId);
}
return null;
}
}
客戶端的Handler 就是讀取消息冤狡,返回結(jié)果,并且出來同步控制
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
String[] split = msg.split("\\|");
SyncMessage message = ConnectPool.getMessage(split[0]);
if(message == null){
return;
}
message.setResp(split[1]);
message.getCountDownLatch().countDown();
}
運(yùn)行服務(wù)端项棠,客戶端后查看一下運(yùn)行結(jié)果
總結(jié):這種模式需要客戶端去維護(hù)這個請求悲雳,其實(shí)就是用一種基于內(nèi)存的組件去控制服務(wù)中的請求,需要服務(wù)器和客戶端約定好數(shù)據(jù)格式香追,并且定義好消息ID合瓢,當(dāng)然這種模式也不是僅僅適用于netty,任何需要異步轉(zhuǎn)同步的方法都可以使用,這篇文章只是總結(jié)一下透典,拋磚引玉晴楔;
demo地址:demo/NettySyncDemo at master · MaHanZhen/demo (github.com)