在上一篇對整個Client端的調用邏輯我們做了分析,這一章對Server端的源碼做一些分析,整個Server端的類繼承關系如下。
整個的繼承關系還是主要分兩個分支
其中AbstractServer 主要工作在transport層
ExchangeServer 主要工作在exchange層。
我們從下往上的進行分析。
AbstractServer的源碼如下
/**
 * Transport-layer skeleton shared by concrete server implementations.
 * <p>
 * The constructor derives the local and bind addresses from the URL, reads the
 * connection limit and idle timeout, asks the subclass to open the listening
 * endpoint through {@link #doOpen()}, and finally looks up the executor from the
 * {@code DataStore} extension.
 */
public abstract class AbstractServer extends AbstractEndpoint implements Server {

    /** Name of the thread pool used by the server. */
    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);

    /** Executor fetched from the DataStore after the server has been opened. */
    ExecutorService executor;
    /** Address built directly from the URL (reported as the "export" address in logs). */
    private InetSocketAddress localAddress;
    /** Address the server binds to. */
    private InetSocketAddress bindAddress;
    /** Maximum number of client channels; the limit is only enforced when it is positive. */
    private int accepts;
    /** Idle timeout; overwritten from the URL in the constructor. */
    private int idleTimeout = 600; //600 seconds

    /**
     * Resolves addresses and limits from {@code url} and opens the server.
     *
     * @param url     configuration of this server
     * @param handler handler passed on to the parent endpoint
     * @throws RemotingException if {@link #doOpen()} (or the start-up logging) fails
     */
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        // Explicit bind ip/port win; otherwise fall back to the URL's host and port.
        String configuredIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        boolean bindAnyHost = url.getParameter(Constants.ANYHOST_KEY, false)
                || NetUtils.isInvalidLocalHost(configuredIp);
        String bindIp = bindAnyHost ? NetUtils.ANYHOST : configuredIp;
        bindAddress = new InetSocketAddress(bindIp, bindPort);

        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        try {
            // The actual opening is delegated to the subclass.
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }

        //fixme replace this with better method
        // The executor is looked up by port only after doOpen() has run.
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

    /** Opens the underlying server; implemented by the concrete transport. */
    protected abstract void doOpen() throws Throwable;

    /** Closes the underlying server; implemented by the concrete transport. */
    protected abstract void doClose() throws Throwable;

    /**
     * Re-applies the tunable settings found in {@code url} (connection limit, idle
     * timeout, thread pool size) and merges its parameters into this server's URL.
     * Each setting is applied independently; a failure in one is logged and does
     * not prevent the others.
     *
     * @param url URL carrying the new parameters; {@code null} is ignored
     */
    @Override
    public void reset(URL url) {
        if (url == null) {
            return;
        }
        resetAccepts(url);
        resetIdleTimeout(url);
        resetThreadPool(url);
        super.setUrl(getUrl().addParameters(url.getParameters()));
    }

    /** Updates the connection limit when the URL carries a positive value. */
    private void resetAccepts(URL url) {
        try {
            if (!url.hasParameter(Constants.ACCEPTS_KEY)) {
                return;
            }
            int limit = url.getParameter(Constants.ACCEPTS_KEY, 0);
            if (limit > 0) {
                this.accepts = limit;
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    /** Updates the idle timeout when the URL carries a positive value. */
    private void resetIdleTimeout(URL url) {
        try {
            if (!url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                return;
            }
            int timeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
            if (timeout > 0) {
                this.idleTimeout = timeout;
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    /** Resizes a live ThreadPoolExecutor according to the URL's threads parameter. */
    private void resetThreadPool(URL url) {
        try {
            boolean resizable = url.hasParameter(Constants.THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown();
            if (!resizable) {
                return;
            }
            ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
            int threads = url.getParameter(Constants.THREADS_KEY, 0);
            int max = pool.getMaximumPoolSize();
            int core = pool.getCorePoolSize();
            if (threads <= 0 || (threads == max && threads == core)) {
                return;
            }
            // A pool whose core and max sizes were equal keeps them equal after the resize.
            boolean fixedSize = core == max;
            if (threads < core) {
                // Shrinking: lower the core size first so it never exceeds the maximum.
                pool.setCorePoolSize(threads);
                if (fixedSize) {
                    pool.setMaximumPoolSize(threads);
                }
            } else {
                // Growing (or threads == core): raise the maximum before the core size.
                pool.setMaximumPoolSize(threads);
                if (fixedSize) {
                    pool.setCorePoolSize(threads);
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    /**
     * Sends {@code message} to every currently connected client channel.
     */
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        for (Channel client : getChannels()) {
            if (!client.isConnected()) {
                continue;
            }
            client.send(message, sent);
        }
    }

    /**
     * Closes the server: shuts the executor down, closes the parent endpoint and
     * then the underlying transport. Failures of the last two steps are logged.
     */
    @Override
    public void close() {
        if (logger.isInfoEnabled()) {
            logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
        ExecutorUtil.shutdownNow(executor, 100);
        try {
            super.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            doClose();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Gracefully shuts the executor down with the given timeout, then performs
     * the regular {@link #close()}.
     */
    @Override
    public void close(int timeout) {
        ExecutorUtil.gracefulShutdown(executor, timeout);
        close();
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        return localAddress;
    }

    public InetSocketAddress getBindAddress() {
        return bindAddress;
    }

    public int getAccepts() {
        return accepts;
    }

    public int getIdleTimeout() {
        return idleTimeout;
    }

    /**
     * Handles a channel's connected event. The channel is closed instead of being
     * passed on when the server is closing/closed or when the number of channels
     * exceeds the configured limit.
     */
    @Override
    public void connected(Channel ch) throws RemotingException {
        // If the server has entered the shutdown process, reject any new connection
        boolean shuttingDown = this.isClosing() || this.isClosed();
        if (shuttingDown) {
            logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        Collection<Channel> current = getChannels();
        boolean overLimit = accepts > 0 && current.size() > accepts;
        if (overLimit) {
            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
            ch.close();
            return;
        }
        super.connected(ch);
    }

    /**
     * Handles a channel's disconnected event, logging a hint once no channel is left.
     */
    @Override
    public void disconnected(Channel ch) throws RemotingException {
        Collection<Channel> remaining = getChannels();
        if (remaining.isEmpty()) {
            logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");
        }
        super.disconnected(ch);
    }
}
還是選取NettyServer作為AbstractServer的實現類來進行分析。源碼如下
public class NettyServer extends AbstractServer implements Server {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
//遠(yuǎn)程Client與Channel的映射關(guān)系惦辛,這里的Channel是dubbo的Channel
private Map<String, Channel> channels; // <ip:port, channel>
//服務(wù)端的啟動(dòng)類
private ServerBootstrap bootstrap;
//服務(wù)端對(duì)應(yīng)的channel
private io.netty.channel.Channel channel;
//主Group
private EventLoopGroup bossGroup;
//工作Group
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
//直接通過nettyServerHandler拿到所有關(guān)聯(lián)的channels
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
//將最后創(chuàng)建的nettyServerHandler注冊(cè)為handler
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
//拿到server對(duì)應(yīng)的channel
channel = channelFuture.channel();
}
@Override
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (bootstrap != null) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
@Override
public Collection<Channel> getChannels() {
Collection<Channel> chs = new HashSet<Channel>();
for (Channel channel : this.channels.values()) {
if (channel.isConnected()) {
chs.add(channel);
} else {
channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
}
}
return chs;
}
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
return channels.get(NetUtils.toAddressString(remoteAddress));
}
@Override
public boolean isBound() {
return channel.isActive();
}
}
我們比較好奇NettyServer里面的channels是怎么來的,這個channels是在NettyServerHandler里面進行維護的,那NettyServerHandler的channels是怎么來的呢,因為NettyServerHandler是NettyServer出入站處理器,所以所有的netty事件的處理都由他負責,于是dubbo的開發者們通過裝飾模式,緩存了所有的生效的channels,關鍵源碼如下
// Cache of channels, keyed by the remote address string.
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
/**
 * Invoked when a Netty channel becomes active: forwards the event down the
 * pipeline, caches the corresponding Dubbo channel under its remote address
 * and notifies the handler that the channel is connected.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Original author's open question: why is super not called here?
    ctx.fireChannelActive();

    // Wrap the now-active Netty channel as a Dubbo channel (cached by NettyChannel).
    NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        if (dubboChannel != null) {
            // Cache it a second time here, keyed by the remote address.
            String remoteKey = NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress());
            channels.put(remoteKey, dubboChannel);
        }
        handler.connected(dubboChannel);
    } finally {
        // Drop the NettyChannel cache entry again if the channel is already disconnected.
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
/**
 * Invoked when a Netty channel becomes inactive: removes the channel from the
 * address-keyed cache and notifies the handler that it is disconnected.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // Forget the channel before telling the handler about the disconnect.
        String remoteKey = NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress());
        channels.remove(remoteKey);
        handler.disconnected(dubboChannel);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
如上我們通過裝飾模式,就可以很方便的將所有的channel進行緩存,并返回給上層進行使用了。
剛才的AbstractServer都是工作在transport層,我們接著分析exchange層的ExchangeServer。
/**
 * Exchange-layer view of a {@link Server}: in addition to the transport-level
 * operations it exposes the connected peers as {@link ExchangeChannel}s.
 */
public interface ExchangeServer extends Server {
/**
 * Returns all ExchangeChannels associated with this server.
 *
 * @return the associated exchange channels
 */
Collection<ExchangeChannel> getExchangeChannels();
/**
 * Returns the ExchangeChannel associated with the given remote (client) address.
 *
 * @param remoteAddress address of the client
 * @return the exchange channel for that address
 */
ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}
HeaderExchangeServer
/**
 * Exchange-layer server that decorates a transport-layer {@link Server}.
 * <p>
 * Most operations are delegated to the wrapped server. On top of that this class
 * exposes the server's channels as {@link ExchangeChannel}s, runs a periodic
 * heartbeat task when a positive heartbeat interval is configured, and
 * implements a graceful close that waits for clients to disconnect.
 */
public class HeaderExchangeServer implements ExchangeServer {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    /** Single-threaded scheduler that runs the heartbeat task. */
    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory(
                    "dubbo-remoting-server-heartbeat",
                    true));
    /** The wrapped transport-layer server. */
    private final Server server;
    // heartbeat timer
    private ScheduledFuture<?> heartbeatTimer;
    // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
    private int heartbeat;
    private int heartbeatTimeout;
    /** Set once by doClose() so the shutdown work runs at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Wraps {@code server} and starts the heartbeat timer if one is configured.
     *
     * @param server the transport-layer server to decorate
     * @throws IllegalArgumentException if {@code server} is {@code null}
     * @throws IllegalStateException    if the heartbeat timeout is less than twice the interval
     */
    public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeartbeatTimer();
    }

    public Server getServer() {
        return server;
    }

    @Override
    public boolean isClosed() {
        return server.isClosed();
    }

    /** The server counts as running while at least one client channel is connected. */
    private boolean isRunning() {
        Collection<Channel> channels = getChannels();
        for (Channel channel : channels) {
            if (channel.isConnected()) {
                return true;
            }
        }
        return false;
    }

    /** Stops the heartbeat machinery and closes the wrapped server immediately. */
    @Override
    public void close() {
        doClose();
        server.close();
    }

    /**
     * Gracefully closes the server. When {@code timeout} is positive, clients are
     * optionally told the server is going read-only and this method then waits,
     * polling every 10 ms, until no client is connected or {@code timeout}
     * milliseconds have elapsed.
     * <p>
     * If the waiting thread is interrupted, the wait ends early and the
     * interrupt status is restored so callers further up can observe it; the
     * remaining close steps still run.
     *
     * @param timeout maximum time to wait, in milliseconds
     */
    @Override
    public void close(final int timeout) {
        startClose();
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                sendChannelReadOnlyEvent();
            }
            while (HeaderExchangeServer.this.isRunning()
                    && System.currentTimeMillis() - start < max) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                    // Do not swallow the interrupt: restore the flag and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        doClose();
        server.close(timeout);
    }

    @Override
    public void startClose() {
        server.startClose();
    }

    /**
     * Sends a one-way READONLY event to every connected client channel, announcing
     * that the server is about to close. Send failures are logged per channel.
     */
    private void sendChannelReadOnlyEvent() {
        Request request = new Request();
        request.setEvent(Request.READONLY_EVENT);
        request.setTwoWay(false);
        request.setVersion(Version.getProtocolVersion());

        // The flag is the same for every channel, so read it once.
        final boolean sent = getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true);
        Collection<Channel> channels = getChannels();
        for (Channel channel : channels) {
            try {
                if (channel.isConnected()) {
                    channel.send(request, sent);
                }
            } catch (RemotingException e) {
                logger.warn("send cannot write message error.", e);
            }
        }
    }

    /** Stops the heartbeat timer and its scheduler; only the first call has any effect. */
    private void doClose() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        stopHeartbeatTimer();
        try {
            scheduled.shutdown();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }

    /**
     * Wraps each channel of the underlying server as an exchange-layer channel.
     */
    @Override
    public Collection<ExchangeChannel> getExchangeChannels() {
        Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
        Collection<Channel> channels = server.getChannels();
        if (channels != null && !channels.isEmpty()) {
            for (Channel channel : channels) {
                exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
            }
        }
        return exchangeChannels;
    }

    @Override
    public ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress) {
        Channel channel = server.getChannel(remoteAddress);
        return HeaderExchangeChannel.getOrAddChannel(channel);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Collection<Channel> getChannels() {
        // Every ExchangeChannel is handed out as a Channel; the raw cast bridges the generic types.
        return (Collection) getExchangeChannels();
    }

    @Override
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return getExchangeChannel(remoteAddress);
    }

    @Override
    public boolean isBound() {
        return server.isBound();
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        return server.getLocalAddress();
    }

    @Override
    public URL getUrl() {
        return server.getUrl();
    }

    @Override
    public ChannelHandler getChannelHandler() {
        return server.getChannelHandler();
    }

    /**
     * Resets the wrapped server and, when the URL carries heartbeat settings that
     * differ from the current ones, restarts the heartbeat timer with them.
     * Invalid heartbeat settings are logged, not propagated.
     */
    @Override
    public void reset(URL url) {
        server.reset(url);
        try {
            if (url.hasParameter(Constants.HEARTBEAT_KEY)
                    || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
                int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
                if (t < h * 2) {
                    throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
                }
                if (h != heartbeat || t != heartbeatTimeout) {
                    heartbeat = h;
                    heartbeatTimeout = t;
                    startHeartbeatTimer();
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    @Override
    @Deprecated
    public void reset(com.alibaba.dubbo.common.Parameters parameters) {
        reset(getUrl().addParameters(parameters.getParameters()));
    }

    /**
     * @throws RemotingException if this server has already been closed, or if the
     *                           wrapped server fails to send
     */
    @Override
    public void send(Object message) throws RemotingException {
        if (closed.get()) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message);
    }

    /**
     * @throws RemotingException if this server has already been closed, or if the
     *                           wrapped server fails to send
     */
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed.get()) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message, sent);
    }

    /**
     * (Re)starts the heartbeat task. Any previous timer is cancelled first; a new
     * one is scheduled only when the heartbeat interval is positive.
     */
    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        @Override
                        public Collection<Channel> getChannels() {
                            return Collections.unmodifiableCollection(
                                    HeaderExchangeServer.this.getChannels());
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    /** Cancels the current heartbeat timer, if any, and clears the reference. */
    private void stopHeartbeatTimer() {
        try {
            ScheduledFuture<?> timer = heartbeatTimer;
            if (timer != null && !timer.isCancelled()) {
                timer.cancel(true);
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        } finally {
            heartbeatTimer = null;
        }
    }
}
通過這樣的分析之后,我們可以看到Server端的實現與Client的實現一脈相承,如果認真點看,難度不大