遠(yuǎn)程服務(wù)調(diào)用
在上一步,我們實(shí)現(xiàn)了一個(gè)服務(wù)發(fā)布程序唠亚,這一步我們要進(jìn)行客戶端調(diào)用链方。
實(shí)現(xiàn)細(xì)節(jié)
- 序列化,客戶端和服務(wù)端使用相同的序列化協(xié)議灶搜,也就是protostuff侄柔。
- 網(wǎng)絡(luò),也使用netty發(fā)起客戶端請(qǐng)求占调,客戶端連接保持長(zhǎng)鏈接暂题。
- 動(dòng)態(tài)代理,使用jdk原生的動(dòng)態(tài)代理究珊,也就是必須基于接口生成代理類薪者。rpc調(diào)用本身需要依賴服務(wù)端API,因此沒有問題剿涮。至于不需要依賴API的泛化調(diào)用言津,不是這一步考慮的問題。
設(shè)計(jì)
類圖概況
整體的類圖如下:
時(shí)序圖
重要類描述
- ConsumerFactory取试,負(fù)責(zé)根據(jù)提供的ConsumerDescriptor生成代理類悬槽,內(nèi)部使用的JDK動(dòng)態(tài)代理。
- RemoteProcedureInvoker瞬浓,consumerFactory生產(chǎn)的代理類只是一個(gè)門面初婆,內(nèi)部實(shí)際調(diào)用委托給此類。這是最核心的一個(gè)骨架類猿棉。
- ServiceInstanceSelector磅叛,服務(wù)實(shí)例選擇器,根據(jù)制定的ConsumerDescriptor獲取一個(gè)遠(yuǎn)程的服務(wù)實(shí)例(即ServiceInstance)萨赁,ServiceInstance表示具體的服務(wù)實(shí)例弊琴,如集群中的某一個(gè)機(jī)器上的某一個(gè)服務(wù)。
- 目前先實(shí)現(xiàn)一個(gè)直連的實(shí)現(xiàn)杖爽,DirectServiceInstanceSelector敲董,直接根據(jù)Consumer配置的信息返回實(shí)例,不查詢注冊(cè)中心之類的組件慰安。未來可以有其它實(shí)現(xiàn)腋寨。
- ConnectionSupplier,實(shí)際是connection的工廠類泻帮,根據(jù)ServiceInstance生成ConsumingConnection精置,實(shí)現(xiàn)的時(shí)候需要考慮ServiceInstance指向的服務(wù)的協(xié)議,目前不考慮多協(xié)議锣杂。
- 默認(rèn)提供基于Netty NIO的長(zhǎng)鏈接實(shí)現(xiàn)脂倦,序列化使用Protostuff。
- ServiceInstanceSelector磅叛,服務(wù)實(shí)例選擇器,根據(jù)制定的ConsumerDescriptor獲取一個(gè)遠(yuǎn)程的服務(wù)實(shí)例(即ServiceInstance)萨赁,ServiceInstance表示具體的服務(wù)實(shí)例弊琴,如集群中的某一個(gè)機(jī)器上的某一個(gè)服務(wù)。
代碼實(shí)現(xiàn)
RemoteProcedureInvoker
RemoteServiceAccessor
package io.destinyshine.storks.consumer;
import ...
/**
* a base class for RemoteProcedureInvoker
*
* @author liujianyu.ljy
* @date 2017/09/07
*/
@Slf4j
public abstract class RemoteServiceAccessor {
protected ServiceInstanceSelector serviceInstanceSelector;
protected ConnectionSupplier connectionSupplier;
protected ConsumingConnection obtainConnection(ConsumerDescriptor<?> desc) throws Exception {
ServiceKey serviceKey = ServiceKey.of(desc);
Optional<ServiceInstance> instOpt = serviceInstanceSelector.select(desc);
if (instOpt.isPresent()) {
ServiceInstance inst = instOpt.get();
return (connectionSupplier.getConnection(inst));
} else {
throw new ServiceNotFoundException("cannot found service of " + serviceKey + " in registry.");
}
}
public void setServiceInstanceSelector(ServiceInstanceSelector serviceInstanceSelector) {
this.serviceInstanceSelector = serviceInstanceSelector;
}
public void setConnectionSupplier(ConnectionSupplier connectionSupplier) {
this.connectionSupplier = connectionSupplier;
}
}
DefaultRemoteProcedureInvoker
package io.destinyshine.storks.consumer;
import ...
/**
* @author destinyliu
*/
@Slf4j
public class DefaultRemoteProcedureInvoker extends RemoteServiceAccessor implements RemoteProcedureInvoker {
@Override
public ResponseMessage invoke(ConsumerDescriptor desc, RequestMessage requestMessage) throws Exception {
ConsumingConnection connection = obtainConnection(desc);
Future<ResponseMessage> responsePromise = connection.sendRequest(requestMessage);
return responsePromise.get();
}
}
ServiceInstanceSelector
DirectServiceInstanceSelector
package io.destinyshine.storks.consumer.connect;
import ...
/**
* 只支持直連的服務(wù)選擇器
*
* @author liujianyu
* @date 2017/09/03
*/
public class DirectServiceInstanceSelector implements ServiceInstanceSelector {
@Override
public Optional<ServiceInstance> select(ConsumerDescriptor desc) {
if (desc.isDirect()) {
ServiceInstance inst = ServiceInstance.builder()
.host(desc.getRemoteHost())
.port(desc.getRemotePort())
.serviceInterface(desc.getServiceInterface().getName())
.serviceVersion(desc.getServiceVersion())
.build();
return Optional.of(inst);
}
return Optional.empty();
}
}
ConnectionSupplier
AbstractConnectionSupplier
package io.destinyshine.storks.consumer.connect;
import ...
/**
* @author destinyliu
*/
@Slf4j
public abstract class AbstractConnectionSupplier implements ConnectionSupplier {
private Map<ServiceInstance, ConsumingConnection> connectionCache = new ConcurrentHashMap<>();
@Override
public synchronized ConsumingConnection getConnection(ServiceInstance instance) throws Exception {
ConsumingConnection con = connectionCache.computeIfAbsent(instance,
instance1 -> {
try {
return createConnectionInternal(instance1);
} catch (Exception e) {
e.printStackTrace();
}
return null;
});
return con;
}
/**
* create connection
*
* @param instance
* @return
* @throws Exception
*/
protected abstract ConsumingConnection createConnectionInternal(ServiceInstance instance) throws Exception;
public void shutdown() {
logger.warn("shutting down connectionManager...");
connectionCache.forEach((serviceKey, con) -> {
logger.warn("closing all connections of serviceKey={}", serviceKey);
try {
con.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.warn(
"closed connection of serviceKey={}, {}",
serviceKey,
con
);
logger.warn("closed all connections of serviceKey={}", serviceKey);
});
logger.warn("shutdown connectionManager finished.");
}
}
SocketChannelConsumingConnectionSupplier
package io.destinyshine.storks.consumer.connect;
import ...
/**
* NIO consuming connection supplier.
*
* @author liujianyu
* @date 2017/09/03
*/
@Slf4j
public class SocketChannelConsumingConnectionSupplier extends AbstractConnectionSupplier implements ConnectionSupplier {
@Override
public ConsumingConnection createConnectionInternal(ServiceInstance instance) throws Exception {
logger.info("will create connection of instance {}", instance);
String remoteHost = instance.getHost();
int remotePort = instance.getPort();
SocketChannelConsumingConnection con = new SocketChannelConsumingConnection(remoteHost, remotePort);
con.connect();
return con;
}
}
ConsumingConnection
在ConsumingConnection的實(shí)現(xiàn)過程中元莫,我們使用基于Netty 的NIO方式實(shí)現(xiàn)赖阻。由于NIO的response返回是異步的,并且發(fā)送Request后不會(huì)阻塞線程知道遠(yuǎn)程服務(wù)端返回詳細(xì)踱蠢;所以需要維護(hù)一個(gè)ConcurrentLinkedQueue<Promise<ResponseMessage>>隊(duì)列火欧,當(dāng)發(fā)送一個(gè)請(qǐng)求的時(shí)候,增加一個(gè)Promise到隊(duì)列中茎截,并返回這個(gè)Promise苇侵。收到響應(yīng)的時(shí)候取隊(duì)頭的Promise設(shè)置結(jié)果,在Promise上等待的線程可收到結(jié)果企锌∮芘ǎ可參考下面的代碼。
Netty中的Promise是Future的子接口撕攒,類似于java8自帶的CompletableFuture陡鹃,能夠增加監(jiān)聽器監(jiān)聽其是否完成,也能夠手動(dòng)設(shè)置結(jié)果抖坪∑季ǎ可參考java8的CompletableFuture.
實(shí)現(xiàn)代碼。
package io.destinyshine.storks.consumer.support;
import ...
/**
* @author liujianyu
*/
public class SocketChannelConsumingConnection implements AutoCloseable, ConsumingConnection {
private final Logger logger = LoggerFactory.getLogger(getClass());
private ConcurrentLinkedQueue<Promise<ResponseMessage>> responsePromises = new ConcurrentLinkedQueue<>();
private final String remoteHost;
private final int remotePort;
private Channel channel;
private long connectedTime;
public SocketChannelConsumingConnection(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void connect() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.remoteAddress(new InetSocketAddress(remoteHost, remotePort));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ProtostuffEncoder<>(RequestMessage.class))
.addLast(Protocol.newFrameDecoder())
.addLast(new ProtostuffDecoder<>(ResponseMessage.class, ResponseMessage::new))
.addLast(new SimpleChannelInboundHandler<ResponseMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg)
throws Exception {
Promise<ResponseMessage> promise;
if ((promise = responsePromises.poll()) != null) {
promise.setSuccess(msg);
} else {
logger.error("remote server closed!");
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();
this.channel = channelFuture.channel();
this.connectedTime = System.currentTimeMillis();
channelFuture.addListener(future -> {
if (future.isSuccess()) {
logger.debug(future.toString() + "client connected");
} else {
logger.debug(future.toString() + "server attemp failed", future.cause());
}
});
} finally {
}
}
@Override
public Promise<ResponseMessage> sendRequest(RequestMessage requestMessage) {
Promise<ResponseMessage> promise = this.channel.eventLoop().newPromise();
this.responsePromises.add(promise);
this.channel.writeAndFlush(requestMessage);
return promise;
}
@Override
public String toString() {
return "SocketChannelConsumingConnection{" +
"remoteHost='" + remoteHost + '\'' +
", remotePort=" + remotePort +
'}';
}
@Override
public void close() {
channel.close().awaitUninterruptibly();
}
}
運(yùn)行客戶端
同樣使用原始的方式執(zhí)行客戶端代碼擦俐。
在這個(gè)實(shí)例中脊阴,用多個(gè)線程發(fā)起并行RPC調(diào)用,試驗(yàn)是否能夠正確響應(yīng)蚯瞧,多個(gè)線程之間是否會(huì)錯(cuò)亂蹬叭。經(jīng)過試驗(yàn)多線程正常調(diào)用和響應(yīng)。
package io.destinyshine.storks.sample.service;
import ...
/**
* @author liujianyu
*/
public class DirectClientMain {
private static final Logger logger = LoggerFactory.getLogger(DirectClientMain.class);
public static void main(String[] args) throws Exception {
DefaultRemoteProcedureInvoker invoker = new DefaultRemoteProcedureInvoker();
invoker.setConnectionSupplier(new SocketChannelConsumingConnectionSupplier());
invoker.setServiceInstanceSelector(new DirectServiceInstanceSelector());
ConsumerDescriptor<HelloService> desc = ConsumerBuilder
.ofServiceInterface(HelloService.class)
.remoteServer("127.0.0.1")
.remotePort(39874)
.serviceVersion("1.0.0")
.direct(true)
.build();
ConsumerFactory consumerFactory = new DefaultConsumerFactory(invoker);
HelloService helloServiceConsumer = consumerFactory.getConsumer(desc);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
int finalI = i;
executorService.submit(() -> {
String input = null;
String result = null;
try {
input = "tom,direct," + Thread.currentThread().getName() + "," + finalI;
result = helloServiceConsumer.hello(input);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info("input={}, get result: {}", input, result);
});
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
invoker.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}));
}
}