前言:接上篇若治,看完了注冊中心破加,該看看RPC框架了——《分布式服務(wù)框架XXL-RPC》
老樣子,想看看它自己怎么吹的
1.1 概述>
XXL-RPC 是一個分布式服務(wù)框架武学,提供穩(wěn)定高性能的RPC遠程服務(wù)調(diào)用功能绳泉。擁有"高性能逊抡、分布式、注冊中心零酪、負載均衡冒嫡、服務(wù)治理"等特性。現(xiàn)已開放源代碼四苇,開箱即用孝凌。>
1.2 特性>
- 1、快速接入:接入步驟非常簡潔月腋,兩分鐘即可上手蟀架;
- 2、服務(wù)透明:系統(tǒng)完整的封裝了底層通信細節(jié)榆骚,開發(fā)時調(diào)用遠程服務(wù)就像調(diào)用本地服務(wù)片拍,在提供遠程調(diào)用能力時不損失本地調(diào)用的語義簡潔性;
- 3寨躁、多調(diào)用方案:支持 SYNC穆碎、ONEWAY、FUTURE职恳、CALLBACK 等方案;
- 4方面、多通訊方案:支持 TCP 和 HTTP 兩種通訊方式進行服務(wù)調(diào)用放钦;其中 TCP 提供可選方案 NETTY 或 MINA ,HTTP 提供可選方案 NETTY_HTTP 或 Jetty恭金;
- 5操禀、多序列化方案:支持 HESSIAN、HESSIAN1横腿、PROTOSTUFF颓屑、KRYO斤寂、JACKSON 等方案;
- 6揪惦、負載均衡/軟負載:提供豐富的負載均衡策略遍搞,包括:輪詢、隨機器腋、LRU溪猿、LFU、一致性HASH等纫塌;
- 7诊县、注冊中心:可選組件,支持服務(wù)注冊并動態(tài)發(fā)現(xiàn)措左;可選擇不啟用依痊,直接指定服務(wù)提供方機器地址通訊;選擇啟用時怎披,內(nèi)置可選方案:“XXL-REGISTRY 輕量級注冊中心”(推薦)抗悍、“ZK注冊中心”、“Local注冊中心”等钳枕;
- 8缴渊、服務(wù)治理:提供服務(wù)治理中心,可在線管理注冊的服務(wù)信息鱼炒,如服務(wù)鎖定衔沼、禁用等;
- 9昔瞧、服務(wù)監(jiān)控:可在線監(jiān)控服務(wù)調(diào)用統(tǒng)計信息以及服務(wù)健康狀況等(計劃中)指蚁;
- 10、容錯:服務(wù)提供方集群注冊時自晰,某個服務(wù)節(jié)點不可用時將會自動摘除凝化,同時消費方將會移除失效節(jié)點將流量分發(fā)到其余節(jié)點,提高系統(tǒng)容錯能力酬荞。
- 11搓劫、解決1+1問題:傳統(tǒng)分布式通訊一般通過nginx或f5做集群服務(wù)的流量負載均衡,每次請求在到達目標服務(wù)機器之前都需要經(jīng)過負載均衡機器混巧,即1+1枪向,這將會把流量放大一倍。而XXL-RPC將會從消費方直達服務(wù)提供方咧党,每次請求直達目標機器秘蛔,從而可以避免上述問題;
- 12、高兼容性:得益于優(yōu)良的兼容性與模塊化設(shè)計深员,不限制外部框架负蠕;除 spring/springboot 環(huán)境之外,理論上支持運行在任何Java代碼中倦畅,甚至main方法直接啟動運行遮糖;
- 13、泛化調(diào)用:服務(wù)調(diào)用方不依賴服務(wù)方提供的API滔迈;
還是老套路止吁,直接代碼下下來,跑個demo看看(ps:注冊中心它自己推薦的XXL-REGISTRY)
先看看provider的代碼燎悍。provider提供了一個簡單的sayHi方法敬惦,代碼如下。
@XxlRpcService
@Service
public class DemoServiceImpl implements DemoService {
private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
@Override
public UserDTO sayHi(String name) {
String word = MessageFormat.format("Hi {0}, from {1} as {2}",
name, DemoServiceImpl.class.getName(), String.valueOf(System.currentTimeMillis()));
if ("error".equalsIgnoreCase(name)) throw new RuntimeException("test exception.");
UserDTO userDTO = new UserDTO(name, word);
logger.info(userDTO.toString());
return userDTO;
}
}
結(jié)著按照說明稍微配置一下注冊中心谈山,provider走你俄删!
成功啟動后可以在注冊中心看到剛剛啟動的provider
點開編輯看看
很好,看起來非常美好奏路,注冊key就是接口畴椰,注冊信息就是provider的url,看起來非常ok鸽粉。
那么我們再把consumer跑起來看看斜脂,配置同一個注冊中心,走你触机!
控制臺表示成功運行起來了帚戳。那么我們看看怎么測試一下遠程調(diào)用。
翻一翻consumer代碼儡首,看到一個controller片任,然后在controller里面會調(diào)用遠程方法。
@Controller
public class IndexController {
@XxlRpcReference
private DemoService demoService;
@RequestMapping("")
@ResponseBody
public UserDTO http(String name) {
try {
return demoService.sayHi(name);
} catch (Exception e) {
e.printStackTrace();
return new UserDTO(null, e.getMessage());
}
}
}
ok蔬胯,讓我們打開瀏覽器試一試对供。和預(yù)期的一樣,得到了provider的response
這樣一個最簡單的遠程調(diào)用就完成了氛濒。
那么接下來产场,就該看看這些功能是怎么實現(xiàn)的了。先帖個框架自己對rpc的描述
4.4 RPC工作原理剖析
概念
1泼橘、serialization:序列化涝动,通訊數(shù)據(jù)需要經(jīng)過序列化,從而支持在網(wǎng)絡(luò)中傳輸;
2、deserialization:反序列化尝偎,服務(wù)接受到序列化的請求數(shù)據(jù),需要序列化為底層原始數(shù)據(jù)米愿;
3、stub:體現(xiàn)在XXL-RPC為服務(wù)的api接口鼻吮;
4育苟、skeleton:體現(xiàn)在XXL-RPC為服務(wù)的實現(xiàn)api接口的具體服務(wù);
5椎木、proxy:根據(jù)遠程服務(wù)的stub生成的代理服務(wù)违柏,對開發(fā)人員透明;
6香椎、provider:遠程服務(wù)的提供方漱竖;
7、consumer:遠程服務(wù)的消費方畜伐;
RPC通訊馍惹,可大致劃分為四個步驟,可參考上圖進行理解:(XXL-RPC提供了多種調(diào)用方案玛界,此處以 “SYNC” 方案為例講解万矾;)>
1、consumer發(fā)起請求:consumer會根據(jù)遠程服務(wù)的stub實例化遠程服務(wù)的代理服務(wù)慎框,在發(fā)起請求時良狈,代理服務(wù)會封裝本次請求相關(guān)底層數(shù)據(jù),如服務(wù)iface笨枯、methos薪丁、params等等,然后將數(shù)據(jù)經(jīng)過serialization之后發(fā)送給provider猎醇;
2窥突、provider接收請求:provider接收到請求數(shù)據(jù),首先會deserialization獲取原始請求數(shù)據(jù)硫嘶,然后根據(jù)stub匹配目標服務(wù)并調(diào)用阻问;
3、provider響應(yīng)請求:provider在調(diào)用目標服務(wù)后沦疾,封裝服務(wù)返回數(shù)據(jù)并進行serialization称近,然后把數(shù)據(jù)傳輸給consumer;
4哮塞、consumer接收響應(yīng):consumer接受到相應(yīng)數(shù)據(jù)后刨秆,首先會deserialization獲取原始數(shù)據(jù),然后根據(jù)stub生成調(diào)用返回結(jié)果忆畅,返回給請求調(diào)用處衡未。結(jié)束。
其實已經(jīng)講得比較清楚,在官方提供的demo里缓醋,
- consumer調(diào)用sayHi方法
- 通過注冊中心找到provider
- 代理類封裝請求并序列化后發(fā)送給provider
- provider反序列化數(shù)據(jù)如失,發(fā)現(xiàn)調(diào)用的是sayHi方法
- 把調(diào)用結(jié)果序列化返回給consumer
- consumer反序列化返回結(jié)果
接下來回到代碼本身,看看這一系列過程是怎么實現(xiàn)的送粱。
先從provider的demo入手吧褪贵。
先看看配置,只配置了一個XxlRpcSpringProviderFactory
的 bean
從配置代碼來看抗俄,配置了provider的端口脆丁,注冊中心的類型(xxl-registry or zookeeper or local,這里是xxl-registry) 动雹,已經(jīng)注冊中心的一些參數(shù)(這里是對應(yīng)注冊中心xxl-registry需要的配置:注冊中心地址槽卫,環(huán)境,token)
@Configuration
public class XxlRpcProviderConfig {
private Logger logger = LoggerFactory.getLogger(XxlRpcProviderConfig.class);
@Value("${xxl-rpc.remoting.port}")
private int port;
@Value("${xxl-rpc.registry.xxlregistry.address}")
private String address;
@Value("${xxl-rpc.registry.xxlregistry.env}")
private String env;
@Value("${xxl-rpc.registry.xxlregistry.token}")
private String token;
@Bean
public XxlRpcSpringProviderFactory xxlRpcSpringProviderFactory() {
XxlRpcSpringProviderFactory providerFactory = new XxlRpcSpringProviderFactory();
providerFactory.setPort(port);
providerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
providerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
put(XxlRegistryServiceRegistry.ENV, env);
put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
}});
logger.info(">>>>>>>>>>> xxl-rpc provider config init finish.");
return providerFactory;
}
}
ok洽胶,那我們在看看XxlRpcSpringProviderFactory
有什么花頭晒夹。
實現(xiàn)了3個接口
implements ApplicationContextAware, InitializingBean,DisposableBean
,并繼承自XxlRpcProviderFactory
ps:這幾個接口都是一些spring bean的一些擴展姊氓,詳細的可以自行搜索丐怯, 下面給出一些簡單的描述
InitialingBean
是一個接口,提供了一個唯一的方法afterPropertiesSet()
翔横。
DisposableBean
也是一個接口读跷,提供了一個唯一的方法destory()
。
前者顧名思義在Bean屬性都設(shè)置完畢后調(diào)用afterPropertiesSet()
方法做一些初始化的工作禾唁,后者在Bean生命周期結(jié)束前調(diào)用destory()
方法做一些收尾工作
實現(xiàn)
ApplicationContextAware
接口的Bean效览,在Bean加載的過程中可以獲取到Spring的ApplicationContext
,這個尤其重要荡短,ApplicationContext
是Spring應(yīng)用上下文丐枉,從ApplicationContext
中可以獲取包括任意的Bean在內(nèi)的大量Spring容器內(nèi)容和信息
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
這里比較好理解,因為XxlRpcSpringProviderFactory
是針對spring的客戶端掘托,所需需要額外實現(xiàn)幾個接口瘦锹,主要的邏輯是在它的父類XxlRpcProviderFactory
里面。
先看看第一段
// ---------------------- config ----------------------
里面的內(nèi)容
netType
定義了網(wǎng)絡(luò)通信協(xié)議(NETTY闪盔,NETTY_HTTP弯院,MINA,JETTY)
Serializer
定義了序列化方式
ip,port,accessToken
還不知道干嘛泪掀,留著
serviceRegistryClass
和serviceRegistryParam
定義注冊中心類和參數(shù)
private NetEnum netType;
private Serializer serializer;
private String ip; // for registry
private int port; // default port
private String accessToken;
private Class<? extends ServiceRegistry> serviceRegistryClass;
private Map<String, String> serviceRegistryParam;
再往下看听绳,這些屬性的設(shè)置全是在initConfig
方法中設(shè)值的。
通過這段代碼异赫,就可以知道椅挣,ip其實是給consumer使用的本地ip(會注冊到注冊中心的ip)头岔,port其實就是用來和consumer通信的端口號(比如剛剛demo里面的7080)
public void initConfig(NetEnum netType,
Serializer serializer,
String ip,
int port,
String accessToken,
Class<? extends ServiceRegistry> serviceRegistryClass,
Map<String, String> serviceRegistryParam) {
// init
this.netType = netType;
this.serializer = serializer;
this.ip = ip;
this.port = port;
this.accessToken = accessToken;
this.serviceRegistryClass = serviceRegistryClass;
this.serviceRegistryParam = serviceRegistryParam;
// valid
if (this.netType==null) {
throw new XxlRpcException("xxl-rpc provider netType missing.");
}
if (this.serializer==null) {
throw new XxlRpcException("xxl-rpc provider serializer missing.");
}
if (this.ip == null) {
this.ip = IpUtil.getIp();
}
if (this.port <= 0) {
this.port = 7080;
}
if (NetUtil.isPortUsed(this.port)) {
throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used.");
}
if (this.serviceRegistryClass != null) {
if (this.serviceRegistryParam == null) {
throw new XxlRpcException("xxl-rpc provider serviceRegistryParam is missing.");
}
}
}
這段代碼其實就是一些基礎(chǔ)參數(shù)的config。比如用什么通信協(xié)議贴妻,開房什么端口切油,用什么注冊中心等等蝙斜。
再接著往下看
// ---------------------- start / stop ----------------------
看看start和stop代碼有什么
private Server server;
private ServiceRegistry serviceRegistry;
private String serviceAddress;
先往下看名惩,定義了start和stop方法,仔細一看是針對server的start
和stop
孕荠,server是netType
的一個instance娩鹉。
那么就比較好理解了,server就是負責(zé)通信的實例(demo里是netty)稚伍。
首先拿到server的實例弯予,然后設(shè)置了setStartedCallback
和setStopedCallback
,并調(diào)用了start方法个曙。
public void start() throws Exception {
// start server
serviceAddress = IpUtil.getIpPort(this.ip, port);
server = netType.serverClass.newInstance();
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistry.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
serviceRegistry.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
server.start(this);
}
public void stop() throws Exception {
// stop server
server.stop();
}
那我們再深入看看server這些callback
和start
方法都干了什么吧锈嫩。
server是一個抽象類(nettyServer
會繼承這個類),定義了BaseCallback
類型的startedCallback
和stopedCallback
垦搬,根據(jù)名字猜測是通信server調(diào)用start
后呼寸,會調(diào)用startedCallback.run
方法,server調(diào)用stop
之后會調(diào)用stopedCallback.run
方法猴贰。好像比較抽象对雪,畢竟是抽象類。
public abstract class Server {
protected static final Logger logger = LoggerFactory.getLogger(Server.class);
private BaseCallback startedCallback;
private BaseCallback stopedCallback;
public void setStartedCallback(BaseCallback startedCallback) {
this.startedCallback = startedCallback;
}
public void setStopedCallback(BaseCallback stopedCallback) {
this.stopedCallback = stopedCallback;
}
/**
* start server
*
* @param xxlRpcProviderFactory
* @throws Exception
*/
public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception;
/**
* callback when started
*/
public void onStarted() {
if (startedCallback != null) {
try {
startedCallback.run();
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e);
}
}
}
/**
* stop server
*
* @throws Exception
*/
public abstract void stop() throws Exception;
/**
* callback when stoped
*/
public void onStoped() {
if (stopedCallback != null) {
try {
stopedCallback.run();
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e);
}
}
}
}
那我們直接進入繼承他的NettyServer
看看米绕,這樣應(yīng)該就比較清晰了瑟捣。
start
方法里面直接開了一個守護線程,線程做的事情非常簡單:配置并開啟netty服務(wù)栅干,并調(diào)用onStarted
方法
stop方法更簡單迈套,直接interrupt
,并調(diào)用onStoped
方法碱鳞。
public class NettyServer extends Server {
private Thread thread;
@Override
public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0,0,10, TimeUnit.MINUTES))
.addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializer()))
.addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializer()))
.addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
}
})
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort());
onStarted();
// wait util stop
future.channel().closeFuture().sync();
} catch (Exception e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
}
} finally {
// stop
try {
serverHandlerPool.shutdown(); // shutdownNow
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true);
thread.start();
}
@Override
public void stop() throws Exception {
// destroy server thread
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
// on stop
onStoped();
logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
}
}
讓我們再回到剛才的代碼桑李,startedCallback
方法就是在netty服務(wù)啟動完成之后,把provider的信息注冊到注冊中心
ServiceRegistry是個注冊中心的抽象劫笙,demo里面用的是XxlRegistryServiceRegistry芙扎,其實就是對注冊中心操作的一些封裝,代碼非常簡單填大,不懂可以參考上一篇源碼閱讀:分布式服務(wù)注冊中心XXL-REGISTRY(基于1.0.2)
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistry.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
stopedCallback
也比較好理解了戒洼,就是在netty服務(wù)關(guān)閉之后,從注冊中心移除自己允华。
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
serviceRegistry.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
最后再看看server invoke
里面有什么吧
看起來像是用來記rpc service的圈浇,先不管他寥掐,接著往下看
// ---------------------- server invoke ----------------------
/**
* init local rpc service map
*/
private Map<String, Object> serviceData = new HashMap<String, Object>();
public Map<String, Object> getServiceData() {
return serviceData;
}
好像就是字符串的拼接,不知道干嘛的磷蜀,先往下看
/**
* make service key
*
* @param iface
* @param version
* @return
*/
public static String makeServiceKey(String iface, String version){
String serviceKey = iface;
if (version!=null && version.trim().length()>0) {
serviceKey += "#".concat(version);
}
return serviceKey;
}
addService
用到了makeServiceKey
召耘,看來這個是用來做唯一主鍵的。
根據(jù)名字推測褐隆,應(yīng)該是往前面的serviceData
里面把serviceBean
放進去污它。
/**
* add service
*
* @param iface
* @param version
* @param serviceBean
*/
public void addService(String iface, String version, Object serviceBean){
String serviceKey = makeServiceKey(iface, version);
serviceData.put(serviceKey, serviceBean);
logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
}
通過IDE看看哪里用了addService
方法
發(fā)現(xiàn)在剛才的XxlRpcSpringProviderFactory
就有用到!
看下代碼庶弃,其實很簡單:
- 從spring上下文找到加了
XxlRpcService
注解的bean - 接口名+版本號作為唯一主鍵把bean放入
serviceData
里面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO衫贬,addServices by api + prop
}
最后一個方法,從方法名就能看出來歇攻,調(diào)用service固惯,接受一個xxlRpcRequest
參數(shù)
從serviceData
里面取出request里面要調(diào)用的bean
通過反射調(diào)用方法并返回response
/**
* invoke service
*
* @param xxlRpcRequest
* @return
*/
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid
if (serviceBean == null) {
xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
return xxlRpcResponse;
}
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
return xxlRpcResponse;
}
try {
// invoke
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
// catch error
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
ok,到這里為止缴守,XxlRpcProviderFactory
的代碼看完了葬毫,來總結(jié)一下它究竟能干什么事情
- 配置通信協(xié)議,序列化方式屡穗,注冊中心
- 開啟通信server
- 把
serviceData
里所有的provider服務(wù)注冊到注冊中心 - 通過反射機制贴捡,提供調(diào)用服務(wù)的(
invokeService
)方法
看完了XxlRpcProviderFactory
,我們再回到XxlRpcSpringProviderFactory
與父類不同的是鸡捐,提供了netType
默認使用netty
栈暇,序列化默認使用hessian
// ---------------------- config ----------------------
private String netType = NetEnum.NETTY.name();
private String serialize = Serializer.SerializeEnum.HESSIAN.name();
再看看必須實現(xiàn)的幾個接口
這個方法前面已經(jīng)看到過,這里是把所有帶有XxlRpcService
注解的bean放到serverData
這個map里面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO箍镜,addServices by api + prop
}
最后兩個方法源祈,很簡單,就是配置一下基本參數(shù)色迂,以及調(diào)用父類的方法香缺。
@Override
public void afterPropertiesSet() throws Exception {
this.prepareConfig();
super.start();
}
@Override
public void destroy() throws Exception {
super.stop();
}
這樣一來,XxlRpcSpringProviderFactory
就全部閱讀完了歇僧。我們重新梳理一遍流程XxlRpcInvokerConfig
干的事情
- 配置通信框架(
netty
)图张,序列化框架(hessian
),注冊中心(xxl-registry) - 把使用了
XxlRpcService
注解的bean全部put到Map<String,Object> serviceData
里面诈悍,key為bean繼承的接口名+版本號祸轮,Object為service的bean本身 - 啟動通信框架(
netty
),啟動成功后把serviceData
里面的bean注冊到注冊中心
這樣侥钳,provider就已經(jīng)完全啟動完成了适袜,一切準備就緒,就等客戶端調(diào)用了舷夺!
那么苦酱,我們再看看客戶端的代碼售貌!
老套路,從配置開始看起疫萤,和provider的配置差不多颂跨,就不在贅述
@Configuration
public class XxlRpcInvokerConfig {
private Logger logger = LoggerFactory.getLogger(XxlRpcInvokerConfig.class);
@Value("${xxl-rpc.registry.xxlregistry.address}")
private String address;
@Value("${xxl-rpc.registry.xxlregistry.env}")
private String env;
@Value("${xxl-rpc.registry.xxlregistry.token}")
private String token;
@Bean
public XxlRpcSpringInvokerFactory xxlJobExecutor() {
XxlRpcSpringInvokerFactory invokerFactory = new XxlRpcSpringInvokerFactory();
invokerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
invokerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
put(XxlRegistryServiceRegistry.ENV, env);
put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
}});
logger.info(">>>>>>>>>>> xxl-rpc invoker config init finish.");
return invokerFactory;
}
}
直接去XxlRpcSpringInvokerFactory
里面看看吧,InitializingBean
,DisposableBean
不再贅述
實現(xiàn)
BeanFactoryAware
接口的Bean扯饶,在Bean加載的過程中可以獲取到加載該Bean的BeanFactory
InstantiationAwareBeanPostProcessor
作用的是Bean實例化前后恒削,即:
1、Bean構(gòu)造出來之前調(diào)用postProcessBeforeInstantiation()
方法
2帝际、Bean構(gòu)造出來之后調(diào)用postProcessAfterInstantiation()
方法
public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
ok蔓同,看看具體類里面的代碼,先看第一段config相關(guān)
這段代碼似曾相識蹲诀,在ProviderFactory里面也有:注冊中心配置
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
接著往下看,定義了一個 XxlRpcInvokerFactory
// ---------------------- util ----------------------
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
進到XxlRpcInvokerFactory
里面看看吧
首先很明顯弃揽,這是一個單例模式
然后也配置了注冊中心
public class XxlRpcInvokerFactory {
private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);
// ---------------------- default instance ----------------------
private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);
public static XxlRpcInvokerFactory getInstance() {
return instance;
}
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public XxlRpcInvokerFactory() {
}
public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam) {
this.serviceRegistryClass = serviceRegistryClass;
this.serviceRegistryParam = serviceRegistryParam;
}
// 略
}
再往下看脯爪,似曾相識的代碼
start
方法是開始想注冊中心注冊
stop
方法先從注冊中心移除,然后在吧stopCallbackList
里面還沒執(zhí)行的方法執(zhí)行了
那什么時候調(diào)用addStopCallBack
把stopCallBack
加進list呢矿微?用IDE搜一搜痕慢,
發(fā)現(xiàn)在JettyClient
和ConnectClient
的時候用到了,好像暫時和我們demo的代碼沒什么關(guān)系涌矢,先放一放
最后把responseCallbackThreadPool
線程池shutDown
了掖举。
// ---------------------- start / stop ----------------------
public void start() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
}
}
public void stop() throws Exception {
// stop registry
if (serviceRegistry != null) {
serviceRegistry.stop();
}
// stop callback
if (stopCallbackList.size() > 0) {
for (BaseCallback callback: stopCallbackList) {
try {
callback.run();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
// stop CallbackThreadPool
stopCallbackThreadPool();
}
// ---------------------- service registry ----------------------
private ServiceRegistry serviceRegistry;
public ServiceRegistry getServiceRegistry() {
return serviceRegistry;
}
// ---------------------- service registry ----------------------
private List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();
public void addStopCallBack(BaseCallback callback){
stopCallbackList.add(callback);
}
那
responseCallbackThreadPool
線程池是用來干什么的?用IDE搜一搜娜庇,就在stopCallbackThreadPool
上面executeResponseCallback
接受一個Runnable
對象塔次,并初始化線程池,并放入線程池那么
executeResponseCallback
什么時候會被用到名秀?
// ---------------------- response callback ThreadPool ----------------------
private ThreadPoolExecutor responseCallbackThreadPool = null;
public void executeResponseCallback(Runnable runnable){
if (responseCallbackThreadPool == null) {
synchronized (this) {
if (responseCallbackThreadPool == null) {
responseCallbackThreadPool = new ThreadPoolExecutor(
10,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
}
}); // default maxThreads 300, minThreads 60
}
}
}
responseCallbackThreadPool.execute(runnable);
}
public void stopCallbackThreadPool() {
if (responseCallbackThreadPool != null) {
responseCallbackThreadPool.shutdown();
}
}
其實就在上面
定義了一個concurrentMap
励负,參數(shù)是一個string和一個XxlRpcFutureResponse
根據(jù)future這個名字,可以猜一下應(yīng)該是個future(多線程)操作相關(guān)的
set和remove方法看起來就是對map進行一些關(guān)于request的操作
notifyInvokerFuture
方法匕得,從futureResponsePool
根據(jù)requestId
取出一個XxlRpcFutureResponse
對象
然后判斷Response的狀態(tài)继榆,并做一些設(shè)置,然后從futureResponsePool
移出這個requestId
到這里汁掠,還比較蒙逼略吨,大概只能看出,利用了future考阱,對response做一些處理翠忠。
那么問題來了,response是什么時候生產(chǎn)的羔砾?為什么會有一堆callback
方法负间?這樣做的目的是什么偶妖?
因為沒有看到調(diào)用遠程服務(wù)的代碼,看不懂很正常政溃!
// ---------------------- future-response pool ----------------------
// XxlRpcFutureResponseFactory
private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
futureResponsePool.put(requestId, futureResponse);
}
public void removeInvokerFuture(String requestId){
futureResponsePool.remove(requestId);
}
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
// get
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
// notify
if (futureResponse.getInvokeCallback()!=null) {
// callback type
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// other nomal type
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);
}
讓我們回到XxlRpcSpringInvokerFactory
趾访,還剩最后一個方法postProcessAfterInstantiation
看看都干了些什么吧
取出加了XxlRpcReference
注解的字段(field)
組裝成XxlRpcReferenceBean
,并根據(jù)名字猜測董虱,通過這個bean得到一個service的代理對象扼鞋!
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
// collection
final Set<String> serviceKeyList = new HashSet<>();
// parse XxlRpcReferenceBean
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
if (field.isAnnotationPresent(XxlRpcReference.class)) {
// valid
Class iface = field.getType();
if (!iface.isInterface()) {
throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
}
XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
// init reference bean
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
rpcReference.netType(),
rpcReference.serializer().getSerializer(),
rpcReference.callType(),
rpcReference.loadBalance(),
iface,
rpcReference.version(),
rpcReference.timeout(),
rpcReference.address(),
rpcReference.accessToken(),
null,
xxlRpcInvokerFactory
);
Object serviceProxy = referenceBean.getObject();
// set bean
field.setAccessible(true);
field.set(bean, serviceProxy);
logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
// collection
String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version());
serviceKeyList.add(serviceKey);
}
}
});
// mult discovery
if (xxlRpcInvokerFactory.getServiceRegistry() != null) {
try {
xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return super.postProcessAfterInstantiation(bean, beanName);
}
看看getObject
做了什么吧
- 配置一個動態(tài)代理(猜測就是調(diào)用遠程服務(wù)用的)
- 根據(jù)
XxlRpcInvokerFactory
配置的注冊中心,查找provider地址 - 通過通信框架愤诱,把數(shù)據(jù)發(fā)送到provider機器
-
NettyClientHandler
獲得響應(yīng)的時候云头, 會調(diào)用futureResponse.setResponse(xxlRpcResponse);
把拿到的response放進futureResponse里面 - 再通過
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
拿到response(同步調(diào)用)
// ---------------------- util ----------------------
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// method param
String className = method.getDeclaringClass().getName(); // iface.getName()
String varsion_ = version;
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = args;
// filter for generic
if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
Class<?>[] paramTypes = null;
if (args[3]!=null) {
String[] paramTypes_str = (String[]) args[3];
if (paramTypes_str.length > 0) {
paramTypes = new Class[paramTypes_str.length];
for (int i = 0; i < paramTypes_str.length; i++) {
paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
}
}
}
className = (String) args[0];
varsion_ = (String) args[1];
methodName = (String) args[2];
parameterTypes = paramTypes;
parameters = (Object[]) args[4];
}
// filter method like "Object.toString()"
if (className.equals(Object.class.getName())) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String finalAddress = address;
if (finalAddress==null || finalAddress.trim().length()==0) {
if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
// discovery
String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
// load balance
if (addressSet==null || addressSet.size()==0) {
// pass
} else if (addressSet.size()==1) {
finalAddress = addressSet.first();
} else {
finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
}
}
}
if (finalAddress==null || finalAddress.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(methodName);
xxlRpcRequest.setParameterTypes(parameterTypes);
xxlRpcRequest.setParameters(parameters);
// send
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
} else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
return null;
} else if (CallType.ONEWAY == callType) {
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
}
}
});
}
回到前面的代碼
getObject
的代理之后,為field設(shè)置了代理淫半,并把serviceKey
(接口名+版本號)放在里一個set里面溃槐。
現(xiàn)先總結(jié)一下XxlRpcInvokerConfig
干的事情吧
- 初始化一個
XxlRpcSpringInvokerFactory
的bean - 配置注冊中心,通信框架科吭,序列化框架
- 向注冊中心注冊
- 為
@XxlRpcReference
注解的field配置代理
是不是似曾相識昏滴,沒錯,和provider的config其實幾乎一樣对人。
那我們從consumer角度看看谣殊,一次調(diào)用是怎么完成的吧!看完了牺弄,前面的疑問應(yīng)該都能解決了姻几,吧
看看調(diào)用代碼,用@XxlRpcReference
注解了provider提供的接口
然后直接通過接口調(diào)用方法势告。
ok蛇捌,那么所有貓膩都在@XxlRpcReference
這個注解里面了
@Controller
public class IndexController {
@XxlRpcReference
private DemoService demoService;
@RequestMapping("")
@ResponseBody
public UserDTO http(String name) {
try {
return demoService.sayHi(name);
} catch (Exception e) {
e.printStackTrace();
return new UserDTO(null, e.getMessage());
}
}
}
先看看這個注解本身吧
給了幾個默認值:默認使用netty,使用HESSIAN培慌,使用同步調(diào)用豁陆,負責(zé)均衡方式是輪詢
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlRpcReference {
NetEnum netType() default NetEnum.NETTY;
Serializer.SerializeEnum serializer() default Serializer.SerializeEnum.HESSIAN;
CallType callType() default CallType.SYNC;
LoadBalance loadBalance() default LoadBalance.ROUND;
//Class<?> iface;
String version() default "";
long timeout() default 1000;
String address() default "";
String accessToken() default "";
//XxlRpcInvokeCallback invokeCallback() ;
}
搜一搜那里對這個注解做了處理吧
似曾相識,就是前面提到過的XxlRpcSpringInvokerFactory
的postProcessAfterInstantiation
方法吵护!
所以整個調(diào)用過程應(yīng)該就是調(diào)用代理方法的過程盒音。
這樣整個客戶端調(diào)用過程就比較清晰了
- 初始化的時候,配置客戶端的通信框架馅而,序列化框架祥诽,注冊中心
- 通過掃描
@XxlRpcReference
注解,初始化provider提供的接口的代理 - 進行遠程調(diào)用的時候瓮恭,實際是代理調(diào)用
- 通過代理通信協(xié)議客戶端和注冊中心雄坪,向provider請求數(shù)據(jù)
發(fā)一次請求,下個斷點看看
果然進到這個代理調(diào)用里面了
看看服務(wù)端是怎么接收請求的吧屯蹦,有了前面的服務(wù)端代碼閱讀和客戶端調(diào)用代碼閱讀维哈,其實服務(wù)端的就比較簡單了
長話短說:
在裝載XxlRpcSpringProviderFactory
的時候會掃描@XxlRpcService
注解的類绳姨,并把它作為service放進(put
)服務(wù)map里面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO,addServices by api + prop
}
當發(fā)生遠程調(diào)用的時候阔挠,會調(diào)用invokeService
方法飘庄,主要的代碼就是這段通過反射來獲取真正需要調(diào)用的方法
try {
// invoke
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
// catch error
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
ok,到這里整個同步調(diào)用的流程就比較清楚了购撼。我們在回顧一下它吹噓的功能跪削。
- 1、快速接入:接入步驟非常簡潔迂求,兩分鐘即可上手碾盐;
確實還行
- 2、服務(wù)透明:系統(tǒng)完整的封裝了底層通信細節(jié)揩局,開發(fā)時調(diào)用遠程服務(wù)就像調(diào)用本地服務(wù)毫玖,在提供遠程調(diào)用能力時不損失本地調(diào)用的語義簡潔性;
通過掃描注解和反射的方式谐腰,做到了本地調(diào)用的效果
- 3孕豹、多調(diào)用方案:支持 SYNC、ONEWAY十气、FUTURE、CALLBACK 等方案春霍;
現(xiàn)在我們只試過同步調(diào)用(SYNC)砸西,接下來看看其他調(diào)用方式吧
ONEWAY
用ONEWAY模式拿到的返回值是null。
provider的代碼如下址儒,其實就是發(fā)起了一個不需要結(jié)果的調(diào)用
else if (CallType.ONEWAY == callType) {
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
}
FUTURE
直接返回null芹枷,拿到結(jié)果要從future里面get
else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
}
}
@RequestMapping("")
@ResponseBody
public UserDTO http(String name) {
try {
// dto is null
UserDTO dto = demoService.sayHi(name);
Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);
return future.get();
} catch (Exception e) {
e.printStackTrace();
return new UserDTO(null, e.getMessage());
}
}
callback
調(diào)用完成后會調(diào)用onSuccess
或onFailure
方法
else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
}
return null;
}
XxlRpcInvokeCallback.setCallback(new XxlRpcInvokeCallback<UserDTO>() {
@Override
public void onSuccess(UserDTO result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable exception) {
exception.printStackTrace();
}
});
demoService.sayHi("[CALLBACK]jack");
ps:這部分有很多值得深度閱讀的地方,暫時
todo
- 4莲趣、多通訊方案:支持 TCP 和 HTTP 兩種通訊方式進行服務(wù)調(diào)用鸳慈;其中 TCP 提供可選方案 NETTY 或
MINA ,HTTP 提供可選方案 NETTY_HTTP 或 Jetty喧伞;
通過繼承同一接口來完成調(diào)用方的細節(jié)隱藏
- 5走芋、多序列化方案:支持 HESSIAN、HESSIAN1潘鲫、PROTOSTUFF翁逞、KRYO、JACKSON 等方案溉仑;
和通信的方式差不多
- 6挖函、負載均衡/軟負載:提供豐富的負載均衡策略,包括:輪詢浊竟、隨機怨喘、LRU津畸、LFU、一致性HASH等必怜;
這部分比較簡單肉拓,不在贅述
- 7、注冊中心:可選組件棚赔,支持服務(wù)注冊并動態(tài)發(fā)現(xiàn)帝簇;可選擇不啟用,直接指定服務(wù)提供方機器地址通訊靠益;選擇啟用時丧肴,內(nèi)置可選方案:“XXL-REGISTRY 輕量級注冊中心”(推薦)、“ZK注冊中心”胧后、“Local注冊中心”等芋浮;
前面已經(jīng)提到
- 8、服務(wù)治理:提供服務(wù)治理中心壳快,可在線管理注冊的服務(wù)信息纸巷,如服務(wù)鎖定、禁用等眶痰;
通過注冊中心實現(xiàn)
- 9瘤旨、服務(wù)監(jiān)控:可在線監(jiān)控服務(wù)調(diào)用統(tǒng)計信息以及服務(wù)健康狀況等(計劃中);
pass
- 10竖伯、容錯:服務(wù)提供方集群注冊時存哲,某個服務(wù)節(jié)點不可用時將會自動摘除,同時消費方將會移除失效節(jié)點將流量分發(fā)到其余節(jié)點七婴,提高系統(tǒng)容錯能力祟偷。
通過注冊中心實現(xiàn)
- 11、解決1+1問題:傳統(tǒng)分布式通訊一般通過nginx或f5做集群服務(wù)的流量負載均衡打厘,每次請求在到達目標服務(wù)機器之前都需要經(jīng)過負載均衡機器修肠,即1+1,這將會把流量放大一倍户盯。而XXL-RPC將會從消費方直達
服務(wù)提供方嵌施,每次請求直達目標機器,從而可以避免上述問題先舷;
通過注冊中心實現(xiàn)
- 12艰管、高兼容性:得益于優(yōu)良的兼容性與模塊化設(shè)計,不限制外部框架蒋川;除 spring/springboot 環(huán)境之外牲芋,理論上支持運行在任何Java代碼中,甚至main方法直接啟動運行;
demo里面有缸浦,比較簡單夕冲,就是不通過反射,手動創(chuàng)建對象裂逐。
- 13歹鱼、泛化調(diào)用:服務(wù)調(diào)用方不依賴服務(wù)方提供的API;
同上
ok卜高,就剩下一個todo了弥姻,那就是各種調(diào)用方案的具體實現(xiàn)
先todo了...