本人微信公眾號(jwfy)歡迎關(guān)注
簡單的介紹RPC是什么桶至,RPC整個調(diào)用流程是什么,包含了什么組件镣屹。然后實際編寫一個RPC實例价涝,模擬100個線程調(diào)用以驗證RPC的可用性色瘩,穩(wěn)定性等。最后總結(jié)自己編寫的RPC框架存在哪些問題泞遗,可以去完善的,一個優(yōu)秀的RPC框架應該必備的功能點汹买。
什么是RPC
RPC(Remote Procedure Call)聊倔,遠程過程調(diào)用耙蔑,可通過網(wǎng)絡調(diào)用其他機器的服務請求。RPC是一種規(guī)范甸陌,和TCP钱豁、UDP都沒有關(guān)系,RCP可以采用TCP協(xié)議完成數(shù)據(jù)傳輸卵酪,甚至可以使用HTTP應用協(xié)議。RCP是C端模式溃卡,包含了服務端(服務提供方)蜒简、客戶端(服務使用方),采用特定的網(wǎng)絡傳輸協(xié)議最铁,把數(shù)據(jù)按照特定的協(xié)議包裝后進行傳輸操作等操作垮兑。先來了解下一個具體的RPC調(diào)用請求的執(zhí)行過程
本圖來自網(wǎng)絡
- 1系枪、服務調(diào)用方(Client)調(diào)用本地調(diào)用的方式調(diào)用本地代理對象
- 2、代理對象將類名稱雾棺、方法衬浑、參數(shù)等請求數(shù)據(jù)按照請求協(xié)議組裝成Request
- 3工秩、通過Request數(shù)據(jù)從服務治理獲取有效的服務端信息
- 4、將Request數(shù)據(jù)按照序列化協(xié)議序列化后浪听,使用網(wǎng)絡傳輸協(xié)議通過網(wǎng)絡發(fā)送到服務端中
- 5眉菱、服務端接收到序列化后到數(shù)據(jù),利用序列號協(xié)議反序列化操作生成Request數(shù)據(jù)
- 6克伊、通過Request數(shù)據(jù)找到具體的服務提供方华坦,并調(diào)用執(zhí)行特定的方法季春,計算出執(zhí)行結(jié)果
- 7、執(zhí)行結(jié)果包裝成Response耘拇,按照原路返回至客戶端
- 8宇攻、客戶端解析Response,得到對應的執(zhí)行結(jié)果嘉涌,又或者是具體的錯誤信息
這就是一個完整的RPC調(diào)用過程,對使用方而言就只暴露了本地代理對象扔役,剩下的數(shù)據(jù)解析警医、運輸?shù)榷急话b了预皇,從服務提供方的角度看還有服務暴露,如下圖DUBBO的架構(gòu)圖序仙。
RPC 實踐
學習寫RPC之前必須先了解動態(tài)代理和反射這兩個知識點鲁豪,如不了解先自行了解呈昔,本學習筆記不涉及到此內(nèi)容的介紹。
文件夾目錄
Request對象
// lombok
@Data
public class MethodParameter {
String className;
String methodName;
Object[] arguments;
Class<?>[] parameterTypes;
@Override
public String toString() {
return JSON.toJSONString(this);
}
public static MethodParameter convert(InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
String className = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
Object[] arguments = (Object[])input.readObject();
MethodParameter methodParameter = new MethodParameter();
methodParameter.setClassName(className);
methodParameter.setMethodName(methodName);
methodParameter.setArguments(arguments);
methodParameter.setParameterTypes(parameterTypes);
return methodParameter;
} catch (Exception e) {
throw new RuntimeException("解析請求錯誤:" + e.getMessage());
}
}
}
可以很清楚的看到convert方法就是從一個輸入流中讀取出類名稱、方法名等數(shù)據(jù)郭宝,組成一個MethodParameter對象粘室,也就是上面所說的Request
服務端 - 服務暴露
public class RpcExploreService {
private Map<String, Object> objectMap = new HashMap<>();
public void explore(String className, Object object) {
objectMap.put(className, object);
}
public Object invoke(MethodParameter methodParameter) {
Object object = objectMap.get(methodParameter.getClassName());
if (object == null) {
throw new RuntimeException("無對應執(zhí)行類:" + methodParameter.getClassName());
}
Method method = null;
try {
method = object.getClass().getMethod(methodParameter.getMethodName(), methodParameter.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new RuntimeException("無對應執(zhí)行方法:" + methodParameter.getClassName() + ", 方法:" + methodParameter.getMethodName());
}
try {
Object result = method.invoke(object, methodParameter.getArguments());
System.out.println(methodParameter);
return result;
} catch (Exception e) {
throw new RuntimeException("invoke方法執(zhí)行失敗:" + e.getMessage());
}
}
}
服務暴露存儲了一個Map<String, Object> objectMap
對象衔统,所有可對外提供服務的都必須添加到該容器中,以便于收到網(wǎng)絡數(shù)據(jù)后能找到對應的服務舱殿,然后采用反射invoke調(diào)用险掀,返回得到的結(jié)果。
服務端 - 網(wǎng)絡數(shù)據(jù)處理
public class IOService implements Runnable{
private int port;
private ServerSocket serverSocket;
private RpcExploreService rpcExploreService;
private volatile boolean flag;
public IOService(RpcExploreService rpcExploreService, int port) throws IOException {
this.rpcExploreService = rpcExploreService;
this.port = port;
this.serverSocket = new ServerSocket(port);
this.flag = true;
System.out.println("服務端啟動了");
// 優(yōu)雅關(guān)閉
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
flag = false;
System.out.println("服務端關(guān)閉了");
}
});
}
@Override
public void run() {
while (flag) {
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException e) {
}
if (socket == null) {
continue;
}
new Thread(new ServerSocketRunnable(socket)).start();
}
}
class ServerSocketRunnable implements Runnable {
private Socket socket;
public ServerSocketRunnable(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
MethodParameter methodParameter = MethodParameter.convert(inputStream);
Object result = rpcExploreService.invoke(methodParameter);
ObjectOutputStream output = new ObjectOutputStream(outputStream);
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
簡單的BIO模型侠鳄,開啟了一個ServerSocket后伟恶,接收到數(shù)據(jù)后就把套接字丟給一個新的線程處理十电,ServerSocketRunnable接受一個socket后叹螟,解析出MethodParameter這個請求對象,然后調(diào)用服務暴露的invoke方法畏线,再寫回到socket傳輸給客戶端
客戶端 - 服務訂閱
public class RpcUsedService {
private Map<String, Object> proxyObjectMap = new HashMap<>();
private Map<String, Class> classMap = new HashMap<>();
private IOClient ioClient;
public void setIoClient(IOClient ioClient) {
this.ioClient = ioClient;
}
public void register(Class clazz) {
String className = clazz.getName();
classMap.put(className, clazz);
if (!clazz.isInterface()) {
throw new RuntimeException("暫時只支持接口類型的");
}
try {
RpcInvocationHandler handler = new RpcInvocationHandler();
handler.setClazz(clazz);
Object proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, handler);
proxyObjectMap.put(className, proxyInstance);
// 然后需要包裝起來
} catch (Exception e) {
e.printStackTrace();
}
}
public <T> T get(Class<T> clazz) {
String className = clazz.getName();
return (T) proxyObjectMap.get(className);
}
class RpcInvocationHandler implements InvocationHandler {
private Class clazz;
public void setClazz(Class clazz) {
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 實際上proxy沒啥用處寝殴,不需要真正的反invoke射
MethodParameter methodParameter = new MethodParameter();
methodParameter.setClassName(clazz.getName());
methodParameter.setMethodName(method.getName());
methodParameter.setArguments(args);
methodParameter.setParameterTypes(method.getParameterTypes());
return ioClient.invoke(methodParameter);
}
}
}
服務使用方需要使用register進行服務的注冊蚣常,會生成對應的本地代理對象痊银,后續(xù)只需要通過本地代理對象。
客戶端 - 網(wǎng)絡處理
public class IOClient {
private String ip;
private int port;
public IOClient(String ip, int port) throws IOException {
this.ip = ip;
this.port = port;
}
public Object invoke(MethodParameter methodParameter) {
Socket socket = null;
try {
socket = new Socket(ip, port);
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream ouput = new ObjectOutputStream(outputStream);
ouput.writeUTF(methodParameter.getClassName());
ouput.writeUTF(methodParameter.getMethodName());
ouput.writeObject(methodParameter.getParameterTypes());
ouput.writeObject(methodParameter.getArguments());
InputStream inputStream = socket.getInputStream();
ObjectInputStream input = new ObjectInputStream(inputStream);
return input.readObject();
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
return null;
}
}
代理對象被調(diào)用后生成一個MethodParameter對象,通過此IOClient把數(shù)據(jù)傳輸?shù)椒斩酥孪。⑶曳祷貙臄?shù)據(jù)抖单。
實踐
服務端
public class Service {
public static void main(String[] args) {
RpcExploreService rpcExploreService = new RpcExploreService();
// 傳入的字符串是接口的全名稱
rpcExploreService.explore("new2019.rpc.rpc_v1.expore.Helloworld", new HelloWorldImpl());
try {
Runnable ioService = new IOService(rpcExploreService, 10001);
new Thread(ioService).start();
// 開啟了端口為10001的服務監(jiān)聽
} catch (IOException e) {
}
}
}
客戶端
public class Client {
public static void main(String[] args) {
RpcUsedService rpcUsedService = new RpcUsedService();
rpcUsedService.register(Helloworld.class);
try {
IOClient ioClient = new IOClient("127.0.0.1", 10001);
// 網(wǎng)絡套接字鏈接 同上是10001端口
rpcUsedService.setIoClient(ioClient);
Helloworld helloworld = rpcUsedService.get(Helloworld.class);
// 生成的本地代理對象 proxy
for(int i=0; i< 100; i++) {
// 開啟了100個縣城
new Thread(() -> {
long start = System.currentTimeMillis();
int a = new Random().nextInt(100);
int b = new Random().nextInt(100);
int c = helloworld.add(a, b);
// .add 操作就是屏蔽了所有的細節(jié)矛绘,提供給客戶端使用的方法
System.out.println("a: " + a + ", b:" + b + ", c=" + c + ", 耗時:" + (System.currentTimeMillis() - start));
}).start();
}
} catch (IOException e) {
}
}
}
測試服務
// Helloworld 接口
public interface Helloworld {
String hi();
int add(int a, int b);
}
// Helloworld 接口 實現(xiàn)類
public class HelloWorldImpl implements Helloworld {
@Override
public String hi() {
return "ok";
}
@Override
public int add(int a, int b) {
long start = System.currentTimeMillis();
try {
Thread.sleep(new Random().nextInt(10000));
// 故意添加了耗時操作蔑歌,以便于模擬真實的調(diào)用操作
} catch (InterruptedException e) {
e.printStackTrace();
}
int c = a + b;
System.out.println(Thread.currentThread().getName() + " 耗時:" + (System.currentTimeMillis() - start));
return c;
}
}
運行效果
總結(jié) & 思考
這只是一個非常簡單的RPC實踐次屠,包含了服務暴露雳刺、服務注冊(Proxy生成)裸违、BIO模型進行網(wǎng)絡傳輸,java默認的序列化方法枪汪,對RPC有一個初步的認識和了解怔昨,知道RPC必須包含的模塊。
不過還是有很多需要優(yōu)化的點以改進赖捌。
- IO模型:使用的是BIO模型矮烹,可以改進換成NIO模型,引入netty
- 池化:不要隨意新建線程卤唉,所有的線程都應有線程池統(tǒng)一管理
- 服務發(fā)現(xiàn):本地模擬的小demo仁期,并沒有服務發(fā)現(xiàn)蟀拷,可以采用zk管理
- 序列化:java本身自帶的序列化效率很低,可以換成Hessian(DUBBO默認采用其作為序列化工具)问芬、Protobuf(Protobuf是由Google提出的一種支持多語言的跨平臺的序列化框架)等
還有例如服務統(tǒng)計此衅、優(yōu)雅下線、負載均衡等也都是一個成熟的RPC框架必須要考慮到的點骑歹。
本人微信公眾號(搜索jwfy)歡迎關(guān)注