在1.0版本的基礎上，這次將服務注冊中心抽取出來作為一個單獨的module。
先看服務注冊中心的實現
1.創建用于保存服務信息的實體
public class RegistServiceEntity implements Serializable {
private String host;
private int port;
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public RegistServiceEntity(String host, int port) {
this.host = host;
this.port = port;
}
}
2.創建服務注冊中心框架類
package com.zhang.frame;
import com.zhang.entity.RegistServiceEntity;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Service registry: a TCP server that records which {@code host:port} addresses provide
 * each named service and returns that list to clients on request.
 *
 * <p>Protocol handled per connection (all values written through object streams):
 * <ul>
 *   <li>a boolean flag; {@code true} means "register", {@code false} means "look up";</li>
 *   <li>register: service name (UTF), host (UTF), port (int); the reply is boolean {@code true};</li>
 *   <li>look up: service name (UTF); the reply is the service name (UTF) followed by a
 *       {@code Set<RegistServiceEntity>} of providers, empty when none is registered.</li>
 * </ul>
 */
public class RpcRegistCenter {
    // Thread pool that runs one SocketTask per accepted connection
    // (the original author notes that creating the pool this way is not recommended).
    private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // Port the registry listens on.
    private int port;
    // Registered providers keyed by service name. Shared by all pool threads, so every
    // access below is made while holding the monitor of this map.
    private final static Map<String, Set<RegistServiceEntity>> serviceHolder = new HashMap<>();

    /**
     * @param port TCP port the registry will listen on once {@link #startService()} is called
     */
    public RpcRegistCenter(int port) {
        this.port = port;
    }

    /**
     * Handles a single registry connection: reads the request flag and dispatches to
     * registration or lookup.
     */
    private static class SocketTask implements Runnable {
        private Socket socket;

        public SocketTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // The socket is listed first so it is closed even when constructing one of
            // the object streams fails. The input stream is created before the output
            // stream, matching the peers, which create their output stream first.
            try (Socket connection = socket;
                 ObjectInputStream inputStream = new ObjectInputStream(connection.getInputStream());
                 ObjectOutputStream outputStream = new ObjectOutputStream(connection.getOutputStream())) {
                // true = service registration, false = service lookup
                boolean isRegist = inputStream.readBoolean();
                if (isRegist) {
                    registService(inputStream);
                    outputStream.writeBoolean(true);
                    outputStream.flush();
                } else {
                    getService(inputStream, outputStream);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Answers a lookup: reads the service name and writes back the name followed by
         * a snapshot of the registered provider addresses.
         *
         * @throws IOException if reading the request or writing the reply fails
         */
        private void getService(ObjectInputStream inputStream, ObjectOutputStream outputStream) throws IOException {
            String serviceName = inputStream.readUTF();
            Set<RegistServiceEntity> addressSet;
            synchronized (serviceHolder) {
                Set<RegistServiceEntity> registered = serviceHolder.get(serviceName);
                // Copy under the lock so serialization below cannot race with a concurrent
                // registration; an unknown service yields an empty set instead of null.
                addressSet = (registered == null)
                        ? new HashSet<RegistServiceEntity>()
                        : new HashSet<RegistServiceEntity>(registered);
            }
            outputStream.writeUTF(serviceName);
            outputStream.writeObject(addressSet);
            outputStream.flush();
            System.out.println("Service: "+serviceName +" has been call by client;");
        }

        /**
         * Handles a registration: reads service name, host and port and adds the address
         * to the provider set of that service.
         *
         * @throws IOException if reading the request fails
         */
        private void registService(ObjectInputStream inputStream) throws IOException {
            String serviceName = inputStream.readUTF();
            String host = inputStream.readUTF();
            int port = inputStream.readInt();
            RegistServiceEntity address = new RegistServiceEntity(host, port);
            synchronized (serviceHolder) {
                Set<RegistServiceEntity> serviceEntities = serviceHolder.get(serviceName);
                if (serviceEntities == null) {
                    serviceEntities = new HashSet<>();
                    serviceHolder.put(serviceName, serviceEntities);
                }
                serviceEntities.add(address);
            }
            System.out.println("Service: "+host+":"+port+"/"+serviceName+"has been regist.");
        }
    }

    /**
     * Binds the listening socket and accepts connections forever, handing each one to
     * the thread pool. Only returns by throwing.
     *
     * @throws IOException if the port cannot be bound or accepting a connection fails
     */
    public void startService() throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("注冊(cè)中心啟動(dòng)");
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.execute(new SocketTask(socket));
            }
        }
    }

    /**
     * Entry point: starts a registry on port 8888 in a separate thread.
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    RpcRegistCenter rpcRegistCenter = new RpcRegistCenter(8888);
                    rpcRegistCenter.startService();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
服務端的實現
1.服務端框架類
package com.zhang.frame;
import java.io.*;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * RPC provider framework: registers service implementations with the registry and
 * serves incoming method-call requests by reflection.
 *
 * <p>Request format read from each client connection: service interface name (UTF),
 * method name (UTF), parameter types ({@code Class<?>[]}), arguments ({@code Object[]}).
 * The reply is the method's return value written with {@code writeObject}.
 */
public class RpcServerFrame {
    // Thread pool that runs one ServerTask per accepted client connection.
    private static ExecutorService executorService
            = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());
    // Local cache: service interface name -> implementation class.
    private static final Map<String, Class<?>> serviceHolder = new HashMap<>();
    // Port this RPC server listens on; also the port announced to the registry.
    private int port;

    /**
     * @param port TCP port on which {@link #startService()} will listen
     */
    public RpcServerFrame(int port) {
        this.port = port;
    }

    /**
     * Registers a service with the registry at 127.0.0.1:8888 and remembers its
     * implementation class locally.
     *
     * <p>An {@link IOException} during registration is printed and not rethrown, so a
     * failed registration does not stop the caller. The {@code throws} clause is kept
     * for source compatibility with existing callers.
     *
     * @param serviceInterface interface whose fully qualified name identifies the service
     * @param impl             class instantiated to serve calls to that interface
     * @throws IOException declared for compatibility; failures are currently logged only
     */
    public void registServerToCenter(Class<?> serviceInterface, Class<?> impl) throws IOException {
        try (Socket socket = new Socket()) {
            // Connect to the registry's server socket.
            socket.connect(new InetSocketAddress("127.0.0.1", 8888));
            // The output stream is created first: the registry creates its input stream
            // first and waits for the stream header written by this constructor.
            try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                // Cache the implementation class under the service name.
                serviceHolder.put(serviceInterface.getName(), impl);
                outputStream.writeBoolean(true);
                outputStream.writeUTF(serviceInterface.getName());
                outputStream.writeUTF("127.0.0.1");
                outputStream.writeInt(port);
                outputStream.flush();
                if (inputStream.readBoolean()) {
                    System.out.println(serviceInterface.getName() + "regist success");
                } else {
                    System.out.println(serviceInterface.getName() + "regist failed");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Serves one client connection: reads a single call request, invokes the target
     * method on a fresh instance of the registered implementation and writes the result.
     * If anything fails the stack trace is printed and no reply is written.
     */
    private static class ServerTask implements Runnable {
        private Socket client = null;

        public ServerTask(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            // NOTE(review): readObject() deserializes whatever the peer sends; only
            // expose this port to trusted clients or add an ObjectInputFilter.
            try (Socket connection = client;
                 ObjectInputStream inputStream =
                         new ObjectInputStream(connection.getInputStream());
                 ObjectOutputStream outputStream =
                         new ObjectOutputStream(connection.getOutputStream())) {
                // Name of the interface that declares the method.
                String serviceName = inputStream.readUTF();
                // Name of the method to call.
                String methodName = inputStream.readUTF();
                // Parameter types of the method.
                Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject();
                // Argument values of the method.
                Object[] args = (Object[]) inputStream.readObject();
                Class<?> serviceClass = serviceHolder.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " Not Found");
                }
                Method method = serviceClass.getMethod(methodName, paramTypes);
                Object target = serviceClass.getDeclaredConstructor().newInstance();
                Object result = method.invoke(target, args);
                outputStream.writeObject(result);
                outputStream.flush();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the server socket to the configured port and accepts client connections
     * forever, handing each one to the thread pool. Only returns by throwing.
     *
     * @throws IOException if the port cannot be bound or accepting a connection fails
     */
    public void startService() throws IOException {
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:" + port + ":運(yùn)行");
            while (true) {
                executorService.execute(new ServerTask(serverSocket.accept()));
            }
        }
    }
}
2.創建具體service類和序列化實體類
與1.0創建的一樣，不羅列代碼
3.rpc調用
/**
 * Launcher for the SMS provider: on a separate thread it creates an RPC server on port
 * 9001, registers {@code SendSmsImpl} under the {@code SendSms} interface and then
 * starts serving requests.
 */
public class SendSmsRpc {
    public static void main(String[] args) {
        final Runnable bootstrap = new Runnable() {
            @Override
            public void run() {
                final RpcServerFrame frame = new RpcServerFrame(9001);
                try {
                    // Announce the service first, then block in the accept loop.
                    frame.registServerToCenter(SendSms.class, SendSmsImpl.class);
                    frame.startService();
                } catch (IOException failure) {
                    failure.printStackTrace();
                }
            }
        };
        final Thread worker = new Thread(bootstrap);
        worker.start();
    }
}
客戶端代碼實現
1.客戶端rpc框架代碼
package com.zhang.frame;
import com.zhang.entity.RegistServiceEntity;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Random;
import java.util.Set;
/**
 * RPC client framework: creates dynamic proxies whose method calls are forwarded to a
 * remote provider looked up through the registry at 127.0.0.1:8888.
 */
public class RpcClientFrame {
    /**
     * Creates a proxy implementing {@code serviceInterface} that performs every call
     * remotely.
     *
     * @param serviceInterface interface to proxy; its fully qualified name is the service name
     * @param <T>              type the caller assigns the proxy to; not checked here, so it
     *                         must be {@code serviceInterface} or one of its supertypes
     * @return a proxy instance backed by {@link DynProxy}
     */
    @SuppressWarnings("unchecked") // the proxy implements serviceInterface by construction
    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8888);
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new DynProxy(serviceInterface, addr));
    }

    /**
     * Invocation handler that asks the registry for providers on every call, picks one
     * at random and forwards the call to it.
     */
    public static class DynProxy implements InvocationHandler {
        // Shared generator; avoids allocating a new Random for every invocation.
        private static final Random RANDOM = new Random();

        private final Class<?> serviceInterface;
        private final InetSocketAddress socketAddress;
        // Provider list from the most recent successful registry lookup; used as a
        // fallback when the registry cannot be reached.
        private RegistServiceEntity[] serviceArray;

        /**
         * @param serviceInterface interface whose name identifies the remote service
         * @param socketAddress    address of the registry
         */
        public DynProxy(Class<?> serviceInterface, InetSocketAddress socketAddress) {
            this.serviceInterface = serviceInterface;
            this.socketAddress = socketAddress;
        }

        /**
         * Looks up the providers, selects one at random and performs the remote call.
         * No method is special-cased, so every call the proxy dispatches here is sent
         * to the provider.
         *
         * @return the value read back from the provider
         * @throws IllegalStateException if no provider is known for the service
         * @throws Throwable             I/O or deserialization failures of the remote call
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            IOException lookupFailure = null;
            try {
                refreshProviders();
            } catch (IOException e) {
                // Registry unreachable: report it and fall back to the cached list.
                e.printStackTrace();
                lookupFailure = e;
            }
            RegistServiceEntity[] providers = serviceArray;
            if (providers == null || providers.length == 0) {
                throw new IllegalStateException(
                        "No provider available for service " + serviceInterface.getName(),
                        lookupFailure);
            }
            // Pick one provider at random from the list.
            RegistServiceEntity chosen = providers[RANDOM.nextInt(providers.length)];
            InetSocketAddress socketAddr = new InetSocketAddress(chosen.getHost(), chosen.getPort());
            return callProvider(socketAddr, method, args);
        }

        /** Queries the registry for this service and replaces the cached provider list. */
        private void refreshProviders() throws IOException, ClassNotFoundException {
            try (Socket socket = new Socket()) {
                socket.connect(socketAddress);
                try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
                    // false tells the registry this is a lookup, not a registration.
                    outputStream.writeBoolean(false);
                    outputStream.writeUTF(serviceInterface.getName());
                    outputStream.flush();
                    try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                        System.out.println("Get services from registered center success:" + inputStream.readUTF());
                        @SuppressWarnings("unchecked") // the registry protocol sends a Set of entities
                        Set<RegistServiceEntity> result = (Set<RegistServiceEntity>) inputStream.readObject();
                        // Build the array completely before publishing it in the field.
                        RegistServiceEntity[] refreshed = (result == null)
                                ? new RegistServiceEntity[0]
                                : result.toArray(new RegistServiceEntity[0]);
                        serviceArray = refreshed;
                    }
                }
            }
        }

        /** Sends one call request to the given provider and returns the reply object. */
        private Object callProvider(InetSocketAddress provider, Method method, Object[] args)
                throws IOException, ClassNotFoundException {
            try (Socket socket = new Socket()) {
                socket.connect(provider);
                try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
                    // Interface that declares the method.
                    outputStream.writeUTF(serviceInterface.getName());
                    // Method name.
                    outputStream.writeUTF(method.getName());
                    // Method parameter types.
                    outputStream.writeObject(method.getParameterTypes());
                    // Method arguments.
                    outputStream.writeObject(args);
                    outputStream.flush();
                    try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                        return inputStream.readObject();
                    }
                }
            }
        }
    }
}
2.創建接口對應服務端，創建序列化實體類
復制服務端service接口，entity;復制注冊中心entity
3.客戶端調用
/**
 * Sample client: obtains a remote proxy for {@code SendSms}, calls {@code sendMail}
 * with a sample user and prints the returned value.
 */
public class Client {
    public static void main(String[] args) {
        final UserInfo recipient = new UserInfo("張三","1359999999");
        final SendSms smsService = RpcClientFrame.getRemoteProxyObj(SendSms.class);
        // Held as Object so the printed text stays the same whatever type sendMail returns.
        final Object reply = smsService.sendMail(recipient);
        System.out.println("Send mail: "+ reply);
    }
}