? ? ? ? 隨著soa的出現(xiàn)和普及,遠程分布式調用已經成為主流仗岸。逐漸流行起來的微服務更是讓RPC大放異彩耿焊。目前業(yè)界開源的RPC框架非常多,比較主流的RPC框架有facebook開發(fā)的Apache Thrift奕坟,Hadoop的Avro-RPC祥款,google開源的gRpc等。dubbo也是基于RPC框架之上的一個服務治理框架月杉。
? ? ? ? 下面將通過java原生的序列化刃跛,Socket通信,動態(tài)代理和反射機制苛萎,實現(xiàn)一個簡單的RPC框架桨昙。
????????它主要由三部分組成:
1检号、服務提供者:運行在服務端,負責提供服務接口定義和服務實現(xiàn)類蛙酪。
2齐苛、服務發(fā)布者:它運行在RPC服務端,負責將本地服務發(fā)布成遠程服務滤否,供其他消費者調用脸狸。
3、本地服務代理:它運行在RPC客戶端藐俺,通過代理調用遠程服務提供者炊甲,然后將結果進行封裝返回給本地消費者。
????????首先是服務端接口定義和實現(xiàn):
public interface EchoService {
????????String echo(String ping);
}
public class EchoServiceImpl implements EchoService{
????@Override
????public String echo(String ping) {
????????return ping!=null?ping +" I am Ok":" I am Ok";
????}
}
服務端的服務發(fā)布者代碼如下:
public class RpcExporter {
????????static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
????????public static void exporter(String hostName , int port) throws Exception{
????????????????ServerSocket server = new ServerSocket();
????????????????server.bind(new InetSocketAddress(hostName, port));
????????????????try{
????????????????????????while(true){
????????????????????????????executor.execute(new ExporterTask(server.accept()));
????????????????????????}
????????????????}catch(Exception e){
????????????????????e.printStackTrace();
????????????????}
????????}
????????private static class ExporterTask implements Runnable{
????????????Socket client = null ;
????????????public ExporterTask(Socket client){
????????????this.client = client;
????????????}
????????????@Override
????????????public void run(){
????????????????????ObjectInputStream input = null ;
????????????????????ObjectOutputStream out = null;
????????????????try{
????????????????????//從client獲取數(shù)據(jù)
????????????????????input = new ObjectInputStream(client.getInputStream());
????????????????????String interfaceName = input.readUTF();
????????????????????Class service = Class.forName(interfaceName);
????????????????????String methodName = input.readUTF();
????????????????????Class[] parameterTypes = (Class[])input.readObject();
????????????????????Object[] arguments = (Object[] )input.readObject();
????????????????????Method method = service.getMethod(methodName, parameterTypes);
????????????????????Object result = method.invoke(service.newInstance(), arguments);
????????????????????out = new ObjectOutputStream(client.getOutputStream());
????????????????????out.writeObject(result);
????????????????}catch(Exception e){
????????????????????e.printStackTrace();
????????????????}finally{
????????????????????????try {
????????????????????????????input.close();
????????????????????????????out.close();
????????????????????????} catch (IOException e) {
????????????????????????????// TODO Auto-generated catch block
????????????????????????????e.printStackTrace();
????????????????????????}
????????????}
????????}
????}
}
????????服務發(fā)布者的主要職責如下:
(1)作為服務端欲芹,監(jiān)聽客戶端的TCP連接卿啡,接收新的客戶端連接之后,將其封裝成Task菱父,讓線程池執(zhí)行颈娜。
(2)將客戶端發(fā)送的碼流反序列化成對象,反射調用服務實現(xiàn)者浙宜,獲取執(zhí)行結果官辽。
(3)將執(zhí)行結果對象反序列化,通過Socket發(fā)送給客戶端粟瞬。
(4)遠程服務調用完成之后同仆,釋放socket等連接資源,防止句柄泄露裙品。
????????接下來是客戶端本地服務代理源碼:
public class RpcImporter {
????????@SuppressWarnings("unchecked")
????????public S importer(final Class serviceClass , final InetSocketAddress addr){
????????????return (S)Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class[]{
????????????serviceClass.getInterfaces()[0]}, new InvocationHandler() {
????????????????????Socket socket = null ;
????????????????????ObjectOutputStream output = null ;
????????????????????ObjectInputStream input = null ;
????????????????????@Override
????????????????????public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
? ? ? ? ? ? ? ? ? ? ? ? ?try{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? socket = new Socket();
????????????????????????????????socket.connect(addr);
????????????????????????????????output = new ObjectOutputStream(socket.getOutputStream());
????????????????????????????????output.writeUTF(serviceClass.getName());
????????????????????????????????output.writeUTF(method.getName());
????????????????????????????????output.writeObject(method.getParameterTypes());
????????????????????????????????output.writeObject(args);
????????????????????????????????input = new ObjectInputStream(socket.getInputStream());
????????????????????????????????return input.readObject();
????????????????????????}finally{
????????????????????????????if(output!=null){
? ? ? ? ? ? ? ? ? ? ? ? ????? ? output.close();
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
????????????????????????????if(input!=null){
????????????????????????????????input.close();
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
????????????????????}
????????????????}
? ? ? ? ?});
? ? ?}
}
????????本地服務代理的主要功能如下:
(1)將本地的接口調用轉化為JDK的動態(tài)代理俗批,在動態(tài)代理中實現(xiàn)接口的遠程調用。
(2)創(chuàng)建Socket客戶端市怎,根據(jù)指定地址連接遠程服務提供者岁忘。
(3)同步阻塞等待服務端返回應答,回去應答之后返回区匠。
????????下面是測試代碼:
public class RpcTest {
????????public static void main(String[] args){
????????????//啟動服務端 輪詢接受客戶端的請求
????????????new Thread(new Runnable(){
????????????????@Override
????????????????public void run() {
????????????????????try{
????????????????????????RpcExporter.exporter("127.0.0.1", 8080);
? ? ? ? ? ? ? ? ? ? }catch(Exception e){
????????????????????????e.printStackTrace();
????????????????????}
? ? ? ? ? ? ?}
????????????}).start();
????????????RpcImporter importer = new RpcImporter();
????????????EchoService echo = importer.importer(EchoServiceImpl.class, new InetSocketAddress("localhost",8080));
????????????System.out.println(echo.echo("are you ok ? "));
????????}
}
? ? ? ? 首先干像,創(chuàng)建一個異步發(fā)布服務端的線程并啟動,用于接收RPC客戶端的請求驰弄,根據(jù)請求參數(shù)調用服務實現(xiàn)類麻汰,返回結果給客戶端。隨后揩懒,創(chuàng)建客戶端服務代理類什乙,構造RPC請求參數(shù),發(fā)起RPC調用已球,將調用結果輸出到控制臺臣镣。執(zhí)行結果如下所示:
????????are you ok ? I am Ok