問題:客戶端發(fā)起遠程調(diào)用,如果服務端長時間不返回怎么辦鞠苟?
這就涉及到一個調(diào)用超時的問題乞榨,平時我們應用中很多場景都會規(guī)定超時時間,比如:sql查詢超時当娱,http請求超時等吃既。那么如果服務端方法執(zhí)行的時間超過規(guī)定的timeout時間,那么客戶端就需要調(diào)出當前調(diào)用跨细,拋出TimeoutException鹦倚。
好了,下面開始對RpcBuidler進行改造了冀惭,讓其支持超時情況的處理震叙。同樣掀鹅,先給出預期的測試方案和結果:
// 業(yè)務類UserService在之前的基礎上增加超時調(diào)用的方法:
public interface UserService {
// other method
/**
* 超時測試
*/
public boolean timeoutTest();
}
//實現(xiàn)類
public class UserServiceImpl implements UserService {
// other method
@Override
public boolean timeoutTest() {
try {
//模擬長時間執(zhí)行
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {}
return true;
}
}
ClientTest中測試代碼:
@Test
public void timeoutTest(){
long beginTime = System.currentTimeMillis();
try {
boolean result = userService.timeoutTest();
} catch (Exception e) {
long period = System.currentTimeMillis() - beginTime;
System.out.println("period:" + period);
Assert.assertTrue(period < 3100);
}
}
有了異步方法的實現(xiàn)經(jīng)驗,其實這個超時處理過程和異步非常類似捐友,都是利用Future機制來實現(xiàn)的淫半,下面對doInvoke方法進行重構,返回一個異步任務:
private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
//構造并提交FutureTask異步任務
Future<RpcResponse> retVal = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
@Override
public RpcResponse call() throws Exception {
Object res = null;
try{
//創(chuàng)建連接,獲取輸入輸出流
Socket socket = new Socket(host,port);
try{
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
try{
//發(fā)送
out.writeObject(request);
//接受server端的返回信息---阻塞
res = in.readObject();
}finally{
out.close();
in.close();
}
}finally{
socket.close();
}
}catch(Exception e){
throw e;
}
return (RpcResponse)res;
}
});
return retVal;
}
回調(diào)方法invoke修改如下:
@Override
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
//如果是異步方法,立即返回null
if(asyncMethods.get().contains(method.getName())) return null;
Object retVal = null;
RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
RpcResponse rpcResp = null;
try{
Future<RpcResponse> response = doInvoke(request);
//獲取異步結果
rpcResp = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
}catch(TimeoutException e){
throw e;
}catch(Exception e){}
if(!rpcResp.isError()){
retVal = rpcResp.getResponseBody();
}else{
throw new RpcException(rpcResp.getErrorMsg());
}
return retVal;
}
可見匣砖,經(jīng)過這樣改造后科吭,所有的方法調(diào)用都是通過Future獲取結果。
提供Hook猴鲫,讓開發(fā)人員進行RPC層面的AOP对人。
首先看下題目提供的Hook接口:
public interface ConsumerHook {
public void before(RpcRequest request);
public void after(RpcRequest request);
}
//實現(xiàn)類
public class UserConsumerHook implements ConsumerHook{
@Override
public void before(RpcRequest request) {
RpcContext.addAttribute("hook key","this is pass by hook");
}
@Override
public void after(RpcRequest request) {
System.out.println("I have finished Rpc calling.");
}
}
hook實現(xiàn)的功能很簡單,即在客戶端進行遠程調(diào)用的前后執(zhí)行before和after方法拂共。
public final class RpcConsumer implements InvocationHandler{
//牺弄。。宜狐。
//鉤子
private ConsumerHook hook;
public RpcConsumer hook(ConsumerHook hook){
this.hook = hook;
return this;
}
static{
userService = (UserService)consumer.targetHostPort(host, port)
.interfaceClass(UserService.class)
.timeout(TIMEOUT)
.hook(new UserConsumerHook())//新增鉤子
.newProxy();
}
//势告。。抚恒。
}
//UserServiceImpl中的測試方法
public Map<String, Object> getMap() {
Map<String,Object> newMap = new HashMap<String,Object>();
newMap.put("name","getMap");
newMap.putAll(RpcContext.getAttributes());
return newMap;
}
我們只需要在doInvoke方法開始出添加鉤子函數(shù)的執(zhí)行邏輯即可咱台。如下:
private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
//插入鉤子
hook.before(request);
//。俭驮。回溺。
}
同時在asyncCall和invoke方法的結束添加after的執(zhí)行邏輯。具體實現(xiàn)可以看源碼混萝。