前言
在對dubbo有了較為深入的使用和理解后与斤,來嘗試從dubbo框架的角度重新認識下它肪康,對照著dubbo官方的這張圖進行反復(fù)的理解后,我們可以從已有掌握的技術(shù)出發(fā)撩穿,來嘗試編寫一個簡單的dubbo實現(xiàn)磷支。
dubbo技術(shù)實現(xiàn)
dubbo詳細的說明這里就不再一一詳述了,重點理解下面這張圖
從這張圖食寡,可以得出下面幾點能夠指導(dǎo)我們編碼的要點:
- 服務(wù)提供方和服務(wù)消費端為兩個JVM進程雾狈;
- 服務(wù)提供方需要將服務(wù)注冊到某個地方(注冊中心),方便消費方找到服務(wù)并調(diào)用抵皱;
- 服務(wù)消費方從注冊中心找到服務(wù)后善榛,像調(diào)用本地接口一樣調(diào)用注冊中心的服務(wù);
上面三點的補充說明
1呻畸、服務(wù)提供方和服務(wù)消費端為兩個JVM進程
這意味著服務(wù)提供者和消費者的代碼需要分開
2锭弊、服務(wù)提供方需要將服務(wù)注冊到某個地方
需要提供一種方式,可以將服務(wù)接口注冊上去擂错,并被服務(wù)消費方的JVM獲取到
3、消費方像調(diào)用本地接口一樣調(diào)用注冊中心的服務(wù)
意味著需要通過某種機制樱蛤,能夠?qū)⒎?wù)接口的代理實現(xiàn)進行加載钮呀,在上圖中注意有個關(guān)鍵字 invoke
基于上面的實現(xiàn)思路,接下來讓我們編寫代碼進行實現(xiàn)吧
一昨凡、創(chuàng)建maven工程
導(dǎo)入基本的依賴
<dependencies>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>9.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
</dependency>
</dependencies>
二爽醋、服務(wù)提供方編碼實現(xiàn)
按照上面的編寫思路,服務(wù)提供方要實現(xiàn)的功能點主要包括如下幾點
提供服務(wù)接口實現(xiàn)便脊;
作為一個單獨的JVM進程蚂四,可以考慮 jetty,tomcat哪痰,或者netty等遂赠;
能夠解析服務(wù)消費方傳遞過來的請求,并找到服務(wù)接口的實現(xiàn)晌杰,并返回處理結(jié)果跷睦;
1、服務(wù)接口
public interface HelloService {
String sayHello(String message);
}
2肋演、接口實現(xiàn)
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String message) {
return "hello : " + message;
}
}
3抑诸、使用內(nèi)嵌式的tomcat容器發(fā)布服務(wù)
提供一個HttpServer
import org.apache.catalina.*;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;
public class HttpServer {
public void start(String hostname, Integer port) {
Tomcat tomcat = new Tomcat();
Server server = tomcat.getServer();
Service service = server.findService("Tomcat");
Connector connector = new Connector();
connector.setPort(port);
Engine engine = new StandardEngine();
engine.setDefaultHost(hostname);
Host host = new StandardHost();
host.setName(hostname);
String contextPath = "";
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
//將 Tomcat 接收到的所有請求都交給自定義的 DispatcherServlet 來處理
tomcat.addServlet(contextPath, "dispatcher", new MyServlet());
context.addServletMappingDecoded("/*", "dispatcher");
try {
tomcat.start();
tomcat.getServer().await();
} catch (LifecycleException e) {
e.printStackTrace();
}
}
}
4烂琴、自定義的Servlet
使用自定義的Servlet ,可以讓程序得到一個的擴展
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
public class MyServlet extends HttpServlet {
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
new HttpServletHandler().handler(req,resp);
}
}
5蜕乡、自定義HttpServletHandler
該類用于處理來自服務(wù)消費方的請求奸绷,并返回結(jié)果
import com.congge.framework.Invocation;
import com.congge.framework.resgister.LocalRegister;
import org.apache.commons.io.IOUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class HttpServletHandler {
public void handler(HttpServletRequest req, HttpServletResponse resp) {
try {
Invocation invocation = (Invocation) new ObjectInputStream(req.getInputStream()).readObject();
String interfaceName = invocation.getInterfaceName();
Class implClass = LocalRegister.get(interfaceName);
try {
Method method = implClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
try {
String result = (String)method.invoke(implClass.newInstance(), invocation.getParams());
System.out.println("執(zhí)行的結(jié)果:" + result);
IOUtils.write(result,resp.getOutputStream());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
}
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
6、服務(wù)注冊中心
盡管是一個簡單的實現(xiàn)层玲,發(fā)布出去的服務(wù)接口也需要一個注冊中心承載号醉,這里我們先使用一個簡單的map結(jié)構(gòu)實現(xiàn)服務(wù)注冊中心,自定義LocalRegister称簿,簡單來說扣癣,map的key為接口名,value為實現(xiàn)類
import java.util.HashMap;
import java.util.Map;
public class LocalRegister {
private static Map<String,Class> map = new HashMap<>();
public static void register(String interfaceName,Class implClass){
map.put(interfaceName,implClass);
}
public static Class get(String interfaceName){
return map.get(interfaceName);
}
}
6憨降、服務(wù)提供方啟動類
public class Provider {
public static void main(String[] args) {
//注冊接口
LocalRegister.register(HelloService.class.getName(),HelloServiceImpl.class);
HttpServer httpServer = new HttpServer();
httpServer.start("localhost",8081);
}
}
啟動main程序父虑,這樣服務(wù)生產(chǎn)方的接口就發(fā)布出去了,暴露的端口為8081
三授药、服務(wù)消費方編碼實現(xiàn)
照上面的編寫思路士嚎,服務(wù)消費方要實現(xiàn)的功能點主要包括如下幾點
- 從注冊中心獲取特定的服務(wù)接口并執(zhí)行調(diào)用;
- 作為一個單獨的JVM進程悔叽,可以考慮 jetty莱衩,tomcat恋捆,或者netty等立镶;
- 需要有個類耳幢,用于集中處理發(fā)起接口請求調(diào)用
1牙寞、提供一個HttpClient
該類用于發(fā)起請求杂瘸,請求特定的服務(wù)接口刑桑,試想在真實的調(diào)用中棕硫,比如是一個springboot的應(yīng)用搓萧,服務(wù)消費方要調(diào)用生產(chǎn)者的服務(wù)戚啥,也是要走http協(xié)議或者通過netty的方式進行調(diào)用的奋单;
import com.congge.framework.Invocation;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
public class HttpClient {
public String send(String hostname, Integer port, Invocation invocation) {
String result = null;
try {
URL url = new URL("http", hostname, port, "/");
try {
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setRequestMethod("POST");
httpURLConnection.setDoOutput(true);
OutputStream outputStream = httpURLConnection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(outputStream);
oos.writeObject(invocation);
oos.flush();
InputStream inputStream = httpURLConnection.getInputStream();
result = IOUtils.toString(inputStream);
return result;
} catch (IOException e) {
e.printStackTrace();
}
} catch (MalformedURLException e) {
e.printStackTrace();
}
return result;
}
}
2、提供一個用于封裝請求參數(shù)的對象類
該類封裝了從服務(wù)消費方發(fā)出請求的完整參數(shù)對象猫十,比如請求的接口名览濒,方法名,參數(shù)類型等
public class Invocation implements Serializable {
private String interfaceName; //接口名
private String methodName; //方法名
private Class[] paramTypes; //方法參數(shù)類型列表
private Object[] params; //方法參數(shù)值列表
public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) {
this.interfaceName = interfaceName;
this.methodName = methodName;
this.paramTypes = paramTypes;
this.params = params;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
public Class[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class[] paramTypes) {
this.paramTypes = paramTypes;
}
}
3拖云、消費方啟動類
import com.congge.framework.Invocation;
import com.congge.framework.ProxyFactory;
import com.congge.framework.http.HttpClient;
import com.congge.service.HelloService;
public class Consumer {
public static void main(String[] args) {
HttpClient httpClient = new HttpClient();
Invocation invocation = new Invocation(HelloService.class.getName(),
"sayHello",new Class[]{String.class},new Object[]{"jerry"});
String result = httpClient.send("localhost", 8081, invocation);
System.out.println(result);
}
}
啟動消費端main程序后贷笛,執(zhí)行服務(wù)調(diào)用,成功獲取到服務(wù)提供方接口的響應(yīng)結(jié)果
四宙项、優(yōu)化改進點
通過上面的案例代碼的演示昨忆,簡單實現(xiàn)了一個dubbo的調(diào)用過程,但這這是一個非常簡單的實現(xiàn)杉允,從生產(chǎn)者和消費端來看邑贴,均存在一些不太合理的地方席里,下面做幾處簡單的優(yōu)化改進;
1拢驾、消費方改進
從上面的這一段調(diào)用來說奖磁,顯得比較麻煩,很明顯繁疤,這一整段代碼可以通過一個類似代理工廠的方式進行封裝咖为,然后消費端只需要注入服務(wù)接口,調(diào)用服務(wù)接口的方法即可稠腊,這才是我們希望看到的躁染;
自定義一個請求代理工廠
我們知道,在使用dubbo的時候架忌,只需要注入服務(wù)接口即可吞彤,然后消費方就可以直接調(diào)用接口中的方法名了,但接口是不能執(zhí)行的叹放,最終需要一個代理類去執(zhí)行接口的方法調(diào)用饰恕,容易想到的就是使用JDK的動態(tài)代理的方式來完成這件事,該類要做的事情就是這些井仰;
這里我們先考慮用最簡單的方式實現(xiàn)(先不考慮使用分布式注冊中心)
import com.congge.framework.http.HttpClient;
import com.congge.framework.resgister.FileMapRegister;
import com.congge.service.HelloService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(
interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),
args);
HttpClient httpClient = new HttpClient();
//端口等信息的獲取
String result = httpClient.send("localhost", 8081, invocation);
//聰注冊中心獲取
//URL url = RemoteMapRegistry.get(interfaceClass.getName()).get(0);
//方式一:Consumer從本地文件獲取Provider地址
//URL url = FileMapRegister.getURL(interfaceClass.getName());
//方式二:Consumer從Zookeeper獲取Provider地址
//URL url = ZookeeperRegister.getURL(interfaceClass.getName());
//String result = httpClient.send(url.getHostname(), url.getPort(), invocation);
return result;
}
});
}
}
這樣改進之后埋嵌,那么在啟動類中就可以像下面這樣調(diào)用
HelloService helloService = ProxyFactory.getProxy(HelloService.class);
String result = helloService.sayHello("zhangfei");
System.out.println(result);
2、服務(wù)提供方改進
從上面的服務(wù)提供方一側(cè)來看俱恶,存在一個比較明顯的問題是雹嗦,服務(wù)注冊的時候,服務(wù)接口的注冊并不是注冊到分布式注冊中心上面合是,由于 provider和consumer是兩個JVM進程了罪,consumer想要從pr注冊中心拿到服務(wù)接口信息,這樣是行不通的端仰;
基于這個問題,從provider一側(cè)來說田藐,需要將服務(wù)注冊到分布式注冊中心上面去荔烧;
這里為了演示方便,我們提供兩種改進方式汽久,provider在啟動的時候鹤竭,將服務(wù)信息注冊到本地的文件中,和注冊到zk上景醇;
提供一個URL類臀稚,記錄provider的host和port信息
import java.io.Serializable;
public class URL implements Serializable {
private String hostname;
private Integer port;
public URL(String hostname, Integer port) {
this.hostname = hostname;
this.port = port;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
@Override
public String toString() {
return "URL{" +
"hostname='" + hostname + '\'' +
", port=" + port +
'}';
}
}
提供一個FileMapRegister類,模擬將服務(wù)接口寫入到本地文件
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class FileMapRegister {
private static Map<String, String> REGISTER = new HashMap<>();
public static void regist(String interfaceName, String implClass, String url) {
REGISTER.put(interfaceName, implClass + "::" + url);
saveFile();
}
public static URL getURL(String interfaceName) {
REGISTER = getFile();
String[] s = REGISTER.get(interfaceName).split("::")[1].split(":");
URL url = new URL(s[0],Integer.parseInt(s[1]));
return url;
}
public static Class getImplClass(String interfaceName) throws ClassNotFoundException {
REGISTER = getFile();
return Class.forName(REGISTER.get(interfaceName).split("::")[0]);
}
public static void saveFile() {
try {
FileOutputStream fileOutputStream = new FileOutputStream("D:\\temp.txt");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
objectOutputStream.writeObject(REGISTER);
} catch (IOException e) {
e.printStackTrace();
}
}
public static Map<String, String> getFile() {
try {
FileInputStream fileInputStream = new FileInputStream("D:\\temp.txt");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
return (Map<String, String>) objectInputStream.readObject();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
提供一個ZookeeperRegister 類三痰,可以將服務(wù)信息注冊到zk
import com.congge.framework.URL;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.HashMap;
import java.util.Map;
public class ZookeeperRegister {
static CuratorFramework client;
static Map<String, String> UrlCache = new HashMap<>();
static {
client = CuratorFrameworkFactory
.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
}
private static Map<String, String> REGISTER = new HashMap<>();
//Provider注冊服務(wù)
public static void regist(String interfaceName, String implClass, String url) {
try {
Stat stat = client.checkExists().forPath(String.format("/dubbo/service/%s", interfaceName));
if(stat != null){
client.delete().forPath(String.format("/dubbo/service/%s", interfaceName));
}
String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(String.format("/dubbo/service/%s", interfaceName),(implClass + "::" + url).getBytes());
System.out.println("Provier服務(wù)注冊: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
//獲取Provider URL
public static URL getURL(String interfaceName) {
URL url = null;
String urlString = null;
//先查詢緩存
if (UrlCache.containsKey(interfaceName)) {
urlString = UrlCache.get(interfaceName);
} else {
try {
byte[] bytes = client.getData().forPath(String.format("/dubbo/service/%s", interfaceName));
urlString = new String(bytes);
} catch (Exception e) {
e.printStackTrace();
}
}
String host = urlString.split("::")[1].split(":")[0];
String port = urlString.split("::")[1].split(":")[1];
return new URL(host,Integer.parseInt(port));
}
//獲取Provider實現(xiàn)類
public static Class getImplClass(String interfaceName) throws Exception {
byte[] bytes = client.getData().forPath(String.format("/dubbo/service/%s", interfaceName));
String urlString = new String(bytes);
return Class.forName(urlString.split("::")[0]);
}
}
改造provider的啟動類
服務(wù)啟動時吧寺,將服務(wù)相關(guān)信息注冊到本地文件
public class Provider {
public static void main(String[] args) {
//注冊接口
LocalRegister.register(HelloService.class.getName(),HelloServiceImpl.class);
//注冊服務(wù)的端口等信息
//URL url = new URL("localhost",8081);
//RemoteMapRegistry.register(HelloService.class.getName(), url);
FileMapRegister.regist(HelloService.class.getName(),HelloServiceImpl.class.getName(),"localhost:8081");
HttpServer httpServer = new HttpServer();
httpServer.start("localhost",8081);
}
}
改造consumer端的ProxyFactory類
將從本地的文件中獲取服務(wù)接口相關(guān)信息
public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(
interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),
args);
HttpClient httpClient = new HttpClient();
//方式一:Consumer從本地文件獲取Provider地址
//URL url = FileMapRegister.getURL(interfaceClass.getName());
//方式二:Consumer從Zookeeper獲取Provider地址
//URL url = ZookeeperRegister.getURL(interfaceClass.getName());
String result = httpClient.send(url.getHostname(), url.getPort(), invocation);
return result;
}
});
}
}
以上改造完畢后窜管,再次啟動provider和consumer,觀察運行效果稚机,consumer仍然可以正確拿到結(jié)果幕帆;
關(guān)于改造點,可以延續(xù)著這個思路繼續(xù)進行下去赖条,比如注冊中心使用zk或redis失乾,下面提幾點以供參考:
- 消費端的服務(wù)調(diào)用容錯處理,假如調(diào)用接口超時纬乍?或者異常碱茁?
- 消費端的服務(wù)重試;
- 消費端的負載均衡策略如何指定仿贬?
- 將嵌入式tomcat容器改為netty纽竣;