如上所示伟葫,在dubbo的自介里面拧晕,是這樣介紹exchange層與transport層的
exchange 信息交換層:封裝請求響應(yīng)模式十酣,同步轉(zhuǎn)異步,以 Request, Response 為中心蚊丐,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 網(wǎng)絡(luò)傳輸層:抽象 mina 和 netty 為統(tǒng)一接口,以 Message 為中心艳吠,擴(kuò)展接口為 Channel, Transporter, Client, Server, Codec
exchange封裝語義信息麦备,等于說操作的是Request Response的語義封裝,而transport層做為傳輸層是不care任何的語義信息的昭娩,它只負(fù)責(zé)單向的數(shù)據(jù)傳輸凛篙。
為了理解上面所述,我們先看下dubbo里面提供的Endpoint接口题禀。
public interface Endpoint {
//當(dāng)前Endpoint對應(yīng)的url(這個(gè)url是dubbo提供的一個(gè)信息載體)
URL getUrl();
//當(dāng)前Endpoint對應(yīng)的ChannelHandler
ChannelHandler getChannelHandler();
//當(dāng)前Endpoint對應(yīng)的本地地址
InetSocketAddress getLocalAddress();
//使用Endpoint發(fā)送消息
void send(Object message) throws RemotingException;
//與上相同鞋诗,添加的sent用于冪等校驗(yàn)
void send(Object message, boolean sent) throws RemotingException;
//關(guān)閉Endpoint
void close();
//在timeout時(shí)間內(nèi)關(guān)閉Endpoint
void close(int timeout);
//開始關(guān)閉Endpoint
void startClose();
//Endpoint是否已經(jīng)關(guān)閉
boolean isClosed();
}
dubbo為什么抽象出Endpoint的概念,因?yàn)閷τ贑lient和Service來說迈嘹,其實(shí)都是一個(gè)Endpoint削彬,只是語義上的不同,兩者在物理上是p2p的概念秀仲,所以用Endpoint來統(tǒng)一抽象一下融痛。如下
Client和Service都extends于Endpoint,這里有個(gè)很關(guān)鍵的概念要說下神僵,dubbo里面的Client到底是個(gè)啥雁刷,Client就是對應(yīng)一個(gè)虛擬機(jī)嗎?如果你這樣理解的話保礼,會(huì)有很多不理解的地方沛励,在dubbo里面,每個(gè)DubboInvoker都內(nèi)置了一個(gè)ExchangeClient[] clients;默認(rèn)情況下是一個(gè)client炮障,所以如果refer的dubbo Provider 服務(wù)處于不同的虛擬機(jī)的話目派,一個(gè)虛擬機(jī)里面會(huì)有多個(gè)Client(根據(jù)refer的地址來緩存共享Client)。
所以看到上面的圖我們就明白了胁赢,為啥Client是繼承Channel的企蹭,因?yàn)橐粋€(gè)Client只關(guān)聯(lián)到一個(gè)遠(yuǎn)程的Provider的server,那么其關(guān)聯(lián)的Channel也就是一對一的智末,所以我們可以這樣的繼承和實(shí)現(xiàn)谅摄。
我們接著看下Channel接口的實(shí)現(xiàn),Channel是連接p2p的通道系馆,
public interface Channel extends Endpoint {
//拿到遠(yuǎn)程地址送漠,從這個(gè)方法的語義出發(fā),我們可以看大Channel是關(guān)聯(lián)到Client上的
InetSocketAddress getRemoteAddress();
//是否已經(jīng)連接由蘑,那么肯定是針對服務(wù)端來說的螺男,就是是否已經(jīng)連接上了服務(wù)端
boolean isConnected();
boolean hasAttribute(String key);
Object getAttribute(String key);
void setAttribute(String key, Object value);
void removeAttribute(String key);
}
關(guān)于Attribute的幾個(gè)簡單方法我們先不談棒厘,從Channel的方法的語義上分析,我們可以定位到Channel是為Client而準(zhǔn)備的下隧。
故而我們可以看到Client直接的實(shí)現(xiàn)了對Channel的繼承,源碼如下
public interface Client extends Endpoint, Channel, Resetable {
//從新連接
void reconnect() throws RemotingException;
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
我們可以看到Client只是新增了一個(gè)reconnect方法谓媒,因?yàn)槟J(rèn)是使用tcp來打開keepalive連接的,那么如果發(fā)生網(wǎng)絡(luò)抖動(dòng)句惯,我們是需要重連的土辩,保證通道的可用。
而接口Resetable抢野,表明實(shí)現(xiàn)類可以通過url進(jìn)行屬性的重新設(shè)置拷淘,在前面說過,dubbo里面的peer端都是通過url這個(gè)對象進(jìn)行屬性的更新和設(shè)置的指孤。
public interface Resetable {
void reset(URL url);
}
我們再看下Server接口的定義
public interface Server extends Endpoint, Resetable {
//是否已經(jīng)綁定到本地启涯,也就是是否啟動(dòng)成功,因?yàn)槭荢erver恃轩,所以不用connect到遠(yuǎn)程
//只需要監(jiān)聽bind的本地port就可以
boolean isBound();
//拿到所有連接到此Server的Channel结洼,可以認(rèn)為這個(gè)Channel是對Client的抽象
Collection<Channel> getChannels();
//根據(jù)遠(yuǎn)程地址(也就是Client的地址)返回綁定的Channel
Channel getChannel(InetSocketAddress remoteAddress);
@Deprecated //作廢,不介紹叉跛,使用Resetable里面的reset方法松忍。
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
由于exchange 信息交換層:封裝請求響應(yīng)模式,同步轉(zhuǎn)異步筷厘,以 Request, Response 為中心鸣峭,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
我們通過看前面的接口發(fā)現(xiàn)Channel只有單向send能力(繼承于Endpoint接口),而ExchangeChannel位于exchange層酥艳,具有了接收response的能力摊溶,我們看接口源碼
public interface ExchangeChannel extends Channel {
//抽象出request方法,在Endpont的簡單的send的基礎(chǔ)上抽象出了request和response的能力
ResponseFuture request(Object request) throws RemotingException;
//帶超時(shí)時(shí)間的request方法
ResponseFuture request(Object request, int timeout) throws RemotingException;
//后面再分析這個(gè)
ExchangeHandler getExchangeHandler();
//指定時(shí)間內(nèi)關(guān)閉這個(gè)close玖雁。
void close(int timeout);
}
而針對 Channel的不同的狀態(tài)事件更扁,我們綁定ChannelHandler來進(jìn)行邏輯上的處理,接口定義如下
@SPI
public interface ChannelHandler {
//當(dāng)channel connected時(shí)的處理方法
void connected(Channel channel) throws RemotingException;
//當(dāng)channel disconnected時(shí)的處理方法
void disconnected(Channel channel) throws RemotingException;
//當(dāng)channel send message時(shí)的處理方法
void sent(Channel channel, Object message) throws RemotingException;
//當(dāng) channel received message時(shí)的處理方法
void received(Channel channel, Object message) throws RemotingException;
//當(dāng)channel caught exception時(shí)的處理方法
void caught(Channel channel, Throwable exception) throws RemotingException;
}
ChannelHandler不關(guān)心語義的處理赫冬,而ExchangeHandler關(guān)心語義浓镜,所以其添加了reply方法,如下
public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
//其中的request是具有語義的request或是response
Object reply(ExchangeChannel channel, Object request) throws RemotingException;
}
好劲厌,看了如上之后膛薛,我們看下transport層的Channel的繼承體系,如下
可以看到transport的Channel對Netty补鼻,Grizzly哄啄,Mina的Channel做了統(tǒng)一的封裝雅任,這樣用戶可以靈活的使用spi進(jìn)行配置。
我們看下AbstractChannel的源碼如下
public abstract class AbstractChannel extends AbstractPeer implements Channel {
public AbstractChannel(URL url, ChannelHandler handler) {
super(url, handler);
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (isClosed()) {
throw new RemotingException(this, "Failed to send message "
+ (message == null ? "" : message.getClass().getName()) + ":" + message
+ ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
}
}
@Override
public String toString() {
return getLocalAddress() + " -> " + getRemoteAddress();
}
}
基本沒有實(shí)現(xiàn)什么功能咨跌,只是簡單的做了一個(gè)Channel的狀態(tài)的判斷沪么。所有的實(shí)現(xiàn)都由子類來實(shí)現(xiàn)。AbstractPeer只是為了抽象出p2p的概念锌半,很簡單禽车,不用介紹。
由于最出名的nio框架還是netty刊殉,我們選取NettyChannel進(jìn)行分析殉摔,如下
final class NettyChannel extends AbstractChannel {
private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);
//這里封裝了netty的Channel與dubbo自己封裝的NettyChannel的緩存關(guān)系
private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
//這個(gè)是實(shí)際引用的netty的Channel
private final Channel channel;
//屬性
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
//構(gòu)造函數(shù)為私有,所以只能自己調(diào)用
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel; //典型的適配器模式记焊,將netty的Channel適配成了dubbo的NettyChannel
}
//將netty的Channel封裝成dubbo的NettyChannel
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
if (ch.isActive()) {
ret = channelMap.putIfAbsent(ch, nettyChannel);
}
if (ret == null) {
ret = nettyChannel;
}
}
return ret;
}
//如果Channel Disconnected 逸月,對緩存進(jìn)行移除
static void removeChannelIfDisconnected(Channel ch) {
if (ch != null && !ch.isActive()) {
channelMap.remove(ch);
}
}
//典型的代理模式
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
//拿到遠(yuǎn)程Server的地址
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}
//是否Connected
public boolean isConnected() {
return !isClosed() && channel.isActive();
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
@Override
public void close() {
try {
super.close();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
try {
removeChannelIfDisconnected(channel);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
try {
attributes.clear();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
try {
if (logger.isInfoEnabled()) {
logger.info("Close netty channel " + channel);
}
channel.close();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
@Override
public boolean hasAttribute(String key) {
return attributes.containsKey(key);
}
@Override
public Object getAttribute(String key) {
return attributes.get(key);
}
@Override
public void setAttribute(String key, Object value) {
if (value == null) { // The null value unallowed in the ConcurrentHashMap.
attributes.remove(key);
} else {
attributes.put(key, value);
}
}
@Override
public void removeAttribute(String key) {
attributes.remove(key);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
NettyChannel other = (NettyChannel) obj;
if (channel == null) {
if (other.channel != null) return false;
} else if (!channel.equals(other.channel)) return false;
return true;
}
@Override
public String toString() {
return "NettyChannel [channel=" + channel + "]";
}
}
如上可以看到NettyChannel的能力都是借用了netty的Channel的能力。NettyChannel只是做了簡單的封裝遍膜。
由于在客戶端碗硬,Channel與Client一一對應(yīng),我們看下transport層的Client的繼承體系捌归,如下
從繼承關(guān)系看由于AbstractClient繼承于ChannelHandler肛响,所以其擁有了自我handler的能力。
我們先看下AbstractPeer的源碼如下
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
//代理的ChannelHandler
private final ChannelHandler handler;
//url就是配置信息
private volatile URL url;
//是否在關(guān)閉中
private volatile boolean closing;
//是否已經(jīng)關(guān)閉
private volatile boolean closed;
public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
@Override
public void close() {
closed = true;
}
@Override
public void close(int timeout) {
close();
}
@Override
public void startClose() {
if (isClosed()) {
return;
}
closing = true;
}
@Override
public URL getUrl() {
return url;
}
protected void setUrl(URL url) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;
}
@Override
public ChannelHandler getChannelHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
} else {
return handler;
}
}
/**
* @return ChannelHandler
*/
@Deprecated
public ChannelHandler getHandler() {
return getDelegateHandler();
}
/**
* Return the final handler (which may have been wrapped). This method should be distinguished with getChannelHandler() method
*
* @return ChannelHandler
*/
public ChannelHandler getDelegateHandler() {
return handler;
}
@Override
public boolean isClosed() {
return closed;
}
public boolean isClosing() {
return closing && !closed;
}
@Override
public void connected(Channel ch) throws RemotingException {
if (closed) {
return;
}
handler.connected(ch);
}
@Override
public void disconnected(Channel ch) throws RemotingException {
handler.disconnected(ch);
}
@Override
public void sent(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.sent(ch, msg);
}
@Override
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
@Override
public void caught(Channel ch, Throwable ex) throws RemotingException {
handler.caught(ch, ex);
}
}
如上可以看到AbstractPeer內(nèi)部引用了一個(gè)ChannelHandler來實(shí)現(xiàn)ChannelHandler的功能惜索,典型的裝飾模式特笋。
我們再看看AbstractEndpoint的源碼如下
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {
private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);
//內(nèi)部封裝的編解碼器,對于transport層的Endpoint來說巾兆,我操作的都是字符數(shù)組猎物,怎么編解碼需要Codec2來負(fù)責(zé)黏包,半包的問題角塑。
private Codec2 codec;
//超時(shí)時(shí)間
private int timeout;
//鏈接超時(shí)時(shí)間
private int connectTimeout;
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
//通過url拿到自定義的(如果有的話)的codec
this.codec = getChannelCodec(url);
//通過url拿到自定義的(如果有的話)的timeout
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// //通過url拿到自定義的(如果有的話)的connectTimeout
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
//根據(jù)url重置timeout與connectTimeout屬性
@Override
public void reset(URL url) {
if (isClosed()) {
throw new IllegalStateException("Failed to reset parameters "
+ url + ", cause: Channel closed. channel: " + getLocalAddress());
}
try {
if (url.hasParameter(Constants.TIMEOUT_KEY)) {
int t = url.getParameter(Constants.TIMEOUT_KEY, 0);
if (t > 0) {
this.timeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) {
int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0);
if (t > 0) {
this.connectTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.CODEC_KEY)) {
this.codec = getChannelCodec(url);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
@Deprecated
public void reset(com.alibaba.dubbo.common.Parameters parameters) {
reset(getUrl().addParameters(parameters.getParameters()));
}
protected Codec2 getCodec() {
return codec;
}
protected int getTimeout() {
return timeout;
}
protected int getConnectTimeout() {
return connectTimeout;
}
}
如上可以看到AbstractEndpoint封裝了Codec2和兩個(gè)超時(shí)的屬性蔫磨。
我們再看下AbstractClient。
public abstract class AbstractClient extends AbstractEndpoint implements Client {
//對應(yīng)的線程池的名字的前綴
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
//對應(yīng)的線程池的名字id
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
//定時(shí)任務(wù)圃伶,用于client的connect狀態(tài)的檢測和重連
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
//connect時(shí)的鎖
private final Lock connectLock = new ReentrantLock();
//在send的時(shí)候堤如,如果send_reconnect為ture,那么會(huì)檢查下client的狀態(tài)窒朋,如果是disconnect狀態(tài)搀罢,那么會(huì)發(fā)起連接
private final boolean send_reconnect;
//重連的次數(shù)的統(tǒng)計(jì)
private final AtomicInteger reconnect_count = new AtomicInteger(0);
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
private final long shutdown_timeout;
//client對應(yīng)的線程池
protected volatile ExecutorService executor;
//重連返回的結(jié)果的占位符Future
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
// the last successed connected time
private long lastConnectedTime = System.currentTimeMillis();
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
//默認(rèn)是false
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
//默認(rèn)15分鐘
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
//子類實(shí)現(xiàn)
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
//得到這個(gè)Client關(guān)聯(lián)的線程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
/**
* @param url
* @return 0-false Reconnect間隔時(shí)間
*/
private static int getReconnectParam(URL url) {
int reconnect;
String param = url.getParameter(Constants.RECONNECT_KEY);
if (param == null || param.length() == 0 || "true".equalsIgnoreCase(param)) {
reconnect = Constants.DEFAULT_RECONNECT_PERIOD;
} else if ("false".equalsIgnoreCase(param)) {
reconnect = 0;
} else {
try {
reconnect = Integer.parseInt(param);
} catch (Exception e) {
throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
}
if (reconnect < 0) {
throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
}
}
return reconnect;
}
/**
* init reconnect thread 這個(gè)重連線程會(huì)在client斷開的時(shí)候發(fā)起重連
*/
private synchronized void initConnectStatusCheckCommand() {
//reconnect=false to close reconnect
int reconnect = getReconnectParam(getUrl());
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
Runnable connectStatusCheckCommand = new Runnable() {
@Override
public void run() {
try {
if (!isConnected()) {
connect();
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
//不進(jìn)行重連的定時(shí)操作
private synchronized void destroyConnectStatusCheckCommand() {
try {
if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) {
reconnectExecutorFuture.cancel(true);
reconnectExecutorService.purge();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
//創(chuàng)建新的線程池
protected ExecutorService createExecutor() {
return Executors.newCachedThreadPool(new NamedThreadFactory(CLIENT_THREAD_POOL_NAME + CLIENT_THREAD_POOL_ID.incrementAndGet() + "-" + getUrl().getAddress(), true));
}
public InetSocketAddress getConnectAddress() {
return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort());
}
@Override
public InetSocketAddress getRemoteAddress() {
Channel channel = getChannel();
if (channel == null)
return getUrl().toInetSocketAddress();
return channel.getRemoteAddress();
}
@Override
public InetSocketAddress getLocalAddress() {
Channel channel = getChannel();
if (channel == null)
return InetSocketAddress.createUnresolved(NetUtils.getLocalHost(), 0);
return channel.getLocalAddress();
}
//還是拋給了channel去判斷
@Override
public boolean isConnected() {
Channel channel = getChannel();
if (channel == null)
return false;
return channel.isConnected();
}
@Override
public Object getAttribute(String key) {
Channel channel = getChannel();
if (channel == null)
return null;
return channel.getAttribute(key);
}
@Override
public void setAttribute(String key, Object value) {
Channel channel = getChannel();
if (channel == null)
return;
channel.setAttribute(key, value);
}
@Override
public void removeAttribute(String key) {
Channel channel = getChannel();
if (channel == null)
return;
channel.removeAttribute(key);
}
@Override
public boolean hasAttribute(String key) {
Channel channel = getChannel();
if (channel == null)
return false;
return channel.hasAttribute(key);
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
//如果send_reconnect為true,那么進(jìn)行狀態(tài)檢測侥猩,如果狀態(tài)是非連接態(tài)榔至,進(jìn)行重連
if (send_reconnect && !isConnected()) {
connect();
}
//留給子類實(shí)現(xiàn)
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
//通過channel將消息發(fā)出去
channel.send(message, sent);
}
//發(fā)起連接
protected void connect() throws RemotingException {
connectLock.lock();
try {
if (isConnected()) {
return;
}
initConnectStatusCheckCommand();
//核心方法由子類實(shí)現(xiàn)
doConnect();
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnect_count.set(0);
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
public void disconnect() {
connectLock.lock();
try {
destroyConnectStatusCheckCommand();
try {
Channel channel = getChannel();
if (channel != null) {
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
//核心方法由子類實(shí)現(xiàn)
doDisConnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}
//提供重連的方法,先斷開欺劳,再連接
@Override
public void reconnect() throws RemotingException {
disconnect();
connect();
}
//close的時(shí)候先銷毀外圍資源
@Override
public void close() {
try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
disconnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}
@Override
public String toString() {
return getClass().getName() + " [" + getLocalAddress() + " -> " + getRemoteAddress() + "]";
}
/**
* Open client.
*
* @throws Throwable
*/
protected abstract void doOpen() throws Throwable;
/**
* Close client.
*
* @throws Throwable
*/
protected abstract void doClose() throws Throwable;
/**
* Connect to server.
*
* @throws Throwable
*/
protected abstract void doConnect() throws Throwable;
/**
* disConnect to server.
*
* @throws Throwable
*/
protected abstract void doDisConnect() throws Throwable;
/**
* Get the connected channel.
*
* @return channel
*/
protected abstract Channel getChannel();
}
如上我們可以看到AbstractClient對Client的鏈接態(tài)做了判斷和重連機(jī)制唧取,所有的核心實(shí)現(xiàn)還是由子類來實(shí)現(xiàn)我們看下子類是怎么實(shí)現(xiàn)的铅鲤,由于netty最為有名,我們選取NettyClient做分析枫弟。
源碼如下
public class NettyClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
//netty客戶端啟動(dòng)類
private Bootstrap bootstrap;
//volatile,使用引用的方式進(jìn)行使用
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
//wrapChannelHandler我們后面分析
super(url, wrapChannelHandler(url, handler));
}
//開啟客戶端邢享,其實(shí)就是對Client進(jìn)行設(shè)置,為connect前做準(zhǔn)備
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
//緩存使用池化技術(shù)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
//鏈接超時(shí)時(shí)間設(shè)置成3秒之上
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
//設(shè)置編碼器
.addLast("decoder", adapter.getDecoder())
//設(shè)置解碼器
.addLast("encoder", adapter.getEncoder())
//設(shè)置handler媒区,其實(shí)就是本身
.addLast("handler", nettyClientHandler);
}
});
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
//發(fā)起連接
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
//銷毀原來關(guān)聯(lián)的老的channel
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
//將新生成的channl賦值給當(dāng)前驼仪,這樣就完成了整個(gè)的初始化
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
//關(guān)閉連接,其實(shí)就是移除緩存袜漩,為啥不把這個(gè)nioEventLoopGroup也干掉呢,因?yàn)檫@個(gè)是共享的湾碎,別人還在用
@Override
protected void doDisConnect() throws Throwable {
try {
NettyChannel.removeChannelIfDisconnected(channel);
} catch (Throwable t) {
logger.warn(t.getMessage());
}
}
@Override
protected void doClose() throws Throwable {
//can't shutdown nioEventLoopGroup
//nioEventLoopGroup.shutdownGracefully();
}
//返回當(dāng)前的channel并將其緩存并封裝成dubbo的channel進(jìn)行返回
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isActive())
return null;
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
如上的代碼還是比較容易的宙攻。如上是對transport層的client的分析,對于transport來說沒有任何的業(yè)務(wù)含義在里面介褥,所以接下來我們繼續(xù)的分析exchange層的client座掘。
整個(gè)exchange層的client的繼承關(guān)系如下
前面我們分析過,Client與Channel是一一對應(yīng)的柔滔,所以dubbo抽象出了個(gè)接口ExchangeClient溢陪,源碼如下
public interface ExchangeClient extends Client, ExchangeChannel {
}
而例外一個(gè)實(shí)現(xiàn)類HeaderExchangeChannel是在exchange層具有業(yè)務(wù)語義的Channel,我們看下其源碼實(shí)現(xiàn)。
final class HeaderExchangeChannel implements ExchangeChannel {
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);
private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";
//內(nèi)部封裝的channel睛廊,我們可以看到這是個(gè)裝飾模式形真,給沒有業(yè)務(wù)語義的channel
//添加業(yè)務(wù)語義
private final Channel channel;
private volatile boolean closed = false;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
this.channel = channel;
}
//
static HeaderExchangeChannel getOrAddChannel(Channel ch) {
if (ch == null) {
return null;
}
HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
if (ret == null) {
ret = new HeaderExchangeChannel(ch);
if (ch.isConnected()) {
ch.setAttribute(CHANNEL_KEY, ret);
}
}
return ret;
}
static void removeChannelIfDisconnected(Channel ch) {
if (ch != null && !ch.isConnected()) {
ch.removeAttribute(CHANNEL_KEY);
}
}
@Override
public void send(Object message) throws RemotingException {
send(message, getUrl().getParameter(Constants.SENT_KEY, false));
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
//如果message是Request 或者 Response 或者String(兼容telnet)直接的發(fā)送
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
//不然將其封裝成Request進(jìn)行發(fā)送
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
//單向發(fā)送,不關(guān)注返回結(jié)果超全,即時(shí)的返回即可
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
//發(fā)送咆霜,并拿到返回結(jié)果,這里就有了response的信息了
@Override
public ResponseFuture request(Object request) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
//需要返回結(jié)果嘶朱,會(huì)進(jìn)行超時(shí)等待返回結(jié)果
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public void close() {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
// graceful close
@Override
public void close(int timeout) {
if (closed) {
return;
}
closed = true;
if (timeout > 0) {
long start = System.currentTimeMillis();
while (DefaultFuture.hasFuture(channel)
&& System.currentTimeMillis() - start < timeout) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
}
close();
}
@Override
public void startClose() {
channel.startClose();
}
@Override
public InetSocketAddress getLocalAddress() {
return channel.getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
return channel.getRemoteAddress();
}
@Override
public URL getUrl() {
return channel.getUrl();
}
@Override
public boolean isConnected() {
return channel.isConnected();
}
@Override
public ChannelHandler getChannelHandler() {
return channel.getChannelHandler();
}
@Override
public ExchangeHandler getExchangeHandler() {
return (ExchangeHandler) channel.getChannelHandler();
}
@Override
public Object getAttribute(String key) {
return channel.getAttribute(key);
}
@Override
public void setAttribute(String key, Object value) {
channel.setAttribute(key, value);
}
@Override
public void removeAttribute(String key) {
channel.removeAttribute(key);
}
@Override
public boolean hasAttribute(String key) {
return channel.hasAttribute(key);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
HeaderExchangeChannel other = (HeaderExchangeChannel) obj;
if (channel == null) {
if (other.channel != null) return false;
} else if (!channel.equals(other.channel)) return false;
return true;
}
@Override
public String toString() {
return channel.toString();
}
}
如上我們可以看到HeaderExchangeChannel已經(jīng)有了Request和Response的業(yè)務(wù)語義了蛾坯。
即時(shí)是在exchange層,Channel也是底層的實(shí)現(xiàn)疏遏,我們業(yè)務(wù)需要關(guān)心的是Client脉课。
我們看下ExchangeClient的幾個(gè)實(shí)現(xiàn)。
ReferenceCountExchangeClient略過财异,只是簡單的記錄對某個(gè)ExchangeClient的引用倘零。
HeaderExchangeClient底層也是使用了ExchangeChannel但是擁有了heartbeat的能力。
LazyConnectExchangeClient 只有在真正的發(fā)起連接的時(shí)候宝当,才初始化客戶端视事。
這3個(gè)Client其實(shí)都是針對Channel的封裝,使用很簡單庆揩,我們就不說了俐东。
最后跌穗,我們看下exchange與transport層是如何關(guān)聯(lián)起來的,其實(shí)在LazyConnectExchangeClient已經(jīng)有了虏辫,其有個(gè)initClient方法蚌吸,我們看下
private void initClient() throws RemotingException {
if (client != null)
return;
if (logger.isInfoEnabled()) {
logger.info("Lazy connect to " + url);
}
connectLock.lock();
try {
if (client != null)
return;
//這一句是重點(diǎn)
this.client = Exchangers.connect(url, requestHandler);
} finally {
connectLock.unlock();
}
}
我們再看Exchangers里面的connect方法,如下
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
//
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
Exchanger其實(shí)就是對exchange層的最高級(jí)的封裝砌庄,如下
public interface Exchanger {
//生成服務(wù)端的ExchangeServer
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
//生成客戶端的ExchangeClient
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
其具體實(shí)現(xiàn)是HeaderExchanger
如下
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
//返回的就是我們前面講的HeaderExchangeClient
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
照這樣分析羹唠,ExchangeClient應(yīng)該是封裝了一個(gè)Transporter層的client,果不其然娄昆,
我們看
Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
的具體返回佩微。
Transporters是對Transporter層的最高級(jí)封裝,如下
public class Transporters {
static {
// check duplicate jar package 防癡呆設(shè)計(jì)萌焰,防止重復(fù)的jar沖突
Version.checkDuplicate(Transporters.class);
Version.checkDuplicate(RemotingException.class);
}
private Transporters() {
}
public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
return connect(URL.valueOf(url), handler);
}
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
//
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
而getTransporter方法哺眯,默認(rèn)返回的就是我們前面分析的NettyClient,這樣整個(gè)就串起來扒俯。