之前的幾個(gè)章節(jié)都在講解Dubbo的種種流程性的邏輯丑掺,首先講到了服務(wù)啟動(dòng)和服務(wù)調(diào)用猪钮,然后又講到了服務(wù)治理的一些內(nèi)容品山。作為一個(gè)成熟的RPC框架,這些都是必要的內(nèi)容烤低,但是有一點(diǎn)往往是容易被人忽略的肘交,那就是優(yōu)雅停機(jī)。今天我們就一起來(lái)看一下Dubbo對(duì)于優(yōu)雅停機(jī)的一些支持性動(dòng)作扑馁。
優(yōu)雅停機(jī)主要用在服務(wù)版本迭代上線的過(guò)程中涯呻,比如我們發(fā)布了新的服務(wù)版本,經(jīng)常性是直接替換線上正在跑的服務(wù)腻要,這個(gè)時(shí)候如果在服務(wù)切換的過(guò)程中老的服務(wù)沒(méi)有正常關(guān)閉的話复罐,容易造成內(nèi)存清理問(wèn)題,所以優(yōu)雅停機(jī)也是重要的一環(huán)雄家。
Dubbo的優(yōu)雅停機(jī)是依賴于JDK的ShutdownHook函數(shù)效诅,下面先了解一下JDK的ShutdownHook函數(shù)會(huì)在哪些時(shí)候生效:
- 程序正常退出
- 程序中使用System.exit()退出JVM
- 系統(tǒng)發(fā)生OutofMemory異常
- 使用kill pid干掉JVM進(jìn)程的時(shí)候(kill -9時(shí)候是不能觸發(fā)ShutdownHook生效的)
Dubbo優(yōu)雅停機(jī)代碼解讀
dubbo的優(yōu)雅停機(jī)代碼入口就在于AbstractConfig的靜態(tài)代碼塊中:
static {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (logger.isInfoEnabled()) {
logger.info("Run shutdown hook now.");
}
ProtocolConfig.destroyAll();
}
}, "DubboShutdownHook"));
}
//在停機(jī)的時(shí)候往往要注意的是:此時(shí)服務(wù)器很大可能性既是consumer又是provider,所以要在兩方面都進(jìn)行一定的處理
public static void destroyAll() {
// 關(guān)閉所有注冊(cè)中心咳短,清空注冊(cè)中心的內(nèi)容填帽。
// 關(guān)閉zk連接,這時(shí)候consumer端從zk上已經(jīng)找不到關(guān)閉的服務(wù)了
// 取消所有的注冊(cè)和訂閱關(guān)系咙好,作為consumer則不再監(jiān)聽數(shù)據(jù)變更篡腌,作為provider則簡(jiǎn)單斷開于zk的連接
AbstractRegistryFactory.destroyAll();
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
for (String protocolName : loader.getLoadedExtensions()) {
try {
Protocol protocol = loader.getLoadedExtension(protocolName);
if (protocol != null) {
//關(guān)閉Server
protocol.destroy();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
/**
* 關(guān)閉所有已創(chuàng)建注冊(cè)中心
*/
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// 鎖定注冊(cè)中心關(guān)閉過(guò)程,防止一個(gè)注冊(cè)中心被多次關(guān)閉
LOCK.lock();
try {
for (Registry registry : getRegistries()) {
try {
//以zk注冊(cè)中心為例講解勾效,zkRegistry->FailbackRegistry->AbstractRegistry
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();
} finally {
// 釋放鎖
LOCK.unlock();
}
}
//ZookeeperRegistry.destory()
//可以看到的是這一層關(guān)閉的核心就是關(guān)閉zkClient
public void destroy() {
super.destroy();
try {
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
//FailbackRegistry.destory()
//首先要明白FailbackRegistry的核心就在于失敗重試嘹悼,所以這一層的關(guān)閉只要關(guān)閉retryFuture就可以
public void destroy() {
super.destroy();
try {
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
//AbstractRegistry.destory()
//處理通用的destory邏輯
public void destroy() {
if (logger.isInfoEnabled()){
logger.info("Destroy registry:" + getUrl());
}
//作為provider,取消所有的服務(wù)注冊(cè)
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (! destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
//從已注冊(cè)的列表中移除該URL
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
//作為consumer层宫,取消所有的訂閱關(guān)系
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (! destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
//將listener從訂閱者對(duì)應(yīng)的listener集合中移除(監(jiān)聽的服務(wù)變更將不再進(jìn)行通知)
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " +t.getMessage(), t);
}
}
}
}
}
下面總結(jié)一下AbstractRegistryFactory.destroyAll()做的所有事情:
斷開于zk的連接杨伙。 默認(rèn)情況下服務(wù)端的dynamic為true,也就是dubbo自己管理服務(wù)的注冊(cè)萌腿,所以這時(shí)候會(huì)在zk節(jié)點(diǎn)上創(chuàng)建臨時(shí)的URL節(jié)點(diǎn)信息限匣,在客戶端與zk端開之后,zk監(jiān)測(cè)到連接關(guān)閉毁菱,所以客戶端創(chuàng)建的臨時(shí)節(jié)點(diǎn)信息也會(huì)直接移除(zk臨時(shí)節(jié)點(diǎn)的特性)米死。作為provider,這時(shí)候在zk節(jié)點(diǎn)上已經(jīng)沒(méi)有自己的信息了贮庞,所以這時(shí)候consumer理論上已經(jīng)不會(huì)再看到該provider的信息了峦筒,也就是說(shuō)不會(huì)有新的請(qǐng)求在過(guò)來(lái),但是如果集群比較龐大的話窗慎,可能不止有一個(gè)zk節(jié)點(diǎn)物喷,這時(shí)候可能依然會(huì)有請(qǐng)求過(guò)來(lái)卤材。作為consumer,因?yàn)閏onsumer在zk上注冊(cè)的為持久節(jié)點(diǎn)峦失,所以在連接斷開時(shí)候并不會(huì)刪除該節(jié)點(diǎn)扇丛,只是會(huì)移除對(duì)應(yīng)的監(jiān)聽器。但是這里有一個(gè)容易忽略的問(wèn)題就是宠进,服務(wù)端注冊(cè)的節(jié)點(diǎn)在zk上并不會(huì)刪除晕拆,那么下次當(dāng)consumer再次subscribe的時(shí)候依然后創(chuàng)建該節(jié)點(diǎn),這時(shí)候因?yàn)樵摴?jié)點(diǎn)在上次停機(jī)的時(shí)候已經(jīng)創(chuàng)建過(guò)了材蹬,重新創(chuàng)建就會(huì)拋異常了,這要怎么處理吝镣?哈哈
堤器,Dubbo的做法是直接捕獲NodeExistsException然后什么都不做,如果出現(xiàn)該異常了默認(rèn)就是創(chuàng)建成功末贾,只不過(guò)會(huì)再次重新注冊(cè)監(jiān)聽器而已闸溃。
接下來(lái)就看一下Protocol.destory(),因?yàn)镻rotocol的實(shí)現(xiàn)類中主要分為兩類拱撵,一類是RegistryProtoocl辉川,另外一類是可擴(kuò)展的Protocol(DubboProtocol)。
//RegistryProtocol.destory()
public void destroy() {
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
for(Exporter<?> exporter :exporters){
exporter.unexport();
}
bounds.clear();
}
//DubboExporter.destory()
//主要是將exporter從對(duì)應(yīng)的exporterMap.remove中移除
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
//AbstractExporter.destory()
//將exporter中引用的invoker進(jìn)行destroy調(diào)用拴测,因?yàn)镮nvoker有包裝類乓旗,所以在ExtensionLoader加載的時(shí)候?qū)嶋H上會(huì)加上包裝。
public void unexport() {
if (unexported) {
return ;
}
unexported = true;
getInvoker().destroy();
}
//DubboInvoker
public void destroy() {
//防止client被關(guān)閉多次.在connect per jvm的情況下集索,client.close方法會(huì)調(diào)用計(jì)數(shù)器-1屿愚,當(dāng)計(jì)數(shù)器小于等于0的情況下,才真正關(guān)閉
if (super.isDestroyed()){
return ;
} else {
//dubbo check ,避免多次關(guān)閉
destroyLock.lock();
try{
if (super.isDestroyed()){
return ;
}
super.destroy();
if (invokers != null){
invokers.remove(this);
}
for (ExchangeClient client : clients) {
try {
client.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}finally {
destroyLock.unlock();
}
}
}
RegistryProtocol中文翻譯就是注冊(cè)協(xié)議务荆,注冊(cè)協(xié)議只關(guān)心跟注冊(cè)有關(guān)的內(nèi)容妆距,而Exporter和Invoker都是RegistryProtocol下層的內(nèi)容,所以在調(diào)用注冊(cè)協(xié)議關(guān)閉服務(wù)的時(shí)候會(huì)講其下的Exporter和Invoker都關(guān)閉掉函匕。
注冊(cè)協(xié)議和服務(wù)協(xié)議的區(qū)別就是注冊(cè)協(xié)議只關(guān)系服務(wù)注冊(cè)的相關(guān)邏輯娱据,而不會(huì)考慮服務(wù)暴露,服務(wù)引用的一些內(nèi)容盅惜,這些內(nèi)容要在DubboProtocol中去處理:
public void destroy() {
//關(guān)停所有的Server中剩,作為provider將不再接收新的請(qǐng)求
for (String key : new ArrayList<String>(serverMap.keySet())) {
//HeaderExchangeServer
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo server: " + server.getLocalAddress());
}
server.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
//關(guān)停所有的Client,作為consumer將不再發(fā)送新的請(qǐng)求
for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
//對(duì)于幽靈客戶端的處理邏輯暫時(shí)先忽略
stubServiceMethodsMap.clear();
super.destroy();
}
//HeaderExchangeServer.close(timeout)
public void close(final int timeout) {
if (timeout > 0) {
final long max = (long) timeout;
final long start = System.currentTimeMillis();
if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, false)){
sendChannelReadOnlyEvent();
}
//作為server在關(guān)閉的時(shí)候很有可能仍然有任務(wù)在進(jìn)行中酷窥,這時(shí)候這個(gè)timeout的時(shí)間就是用來(lái)等待相應(yīng)處理結(jié)束的咽安,每隔10ms進(jìn)行一次重試,直到最后超時(shí)
while (HeaderExchangeServer.this.isRunning()
&& System.currentTimeMillis() - start < max) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
}
doClose();
//NettyServer
server.close(timeout);
}
//關(guān)閉處理心跳任務(wù)的定時(shí)器
private void doClose() {
if (closed) {
return;
}
closed = true;
stopHeartbeatTimer();
try {
scheduled.shutdown();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
//AbstractService.close()
//作者的本意就是在這里關(guān)閉掉業(yè)務(wù)線程池蓬推,這里提到的業(yè)務(wù)線程池也就是dubbo處理所有自定義業(yè)務(wù)使用的線程池妆棒,關(guān)閉這個(gè)線程池十分重要,但是老版本的代碼在這里有BUG
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor ,timeout);
//close方法就是強(qiáng)制關(guān)閉業(yè)務(wù)線程池,并且關(guān)閉NettyServer中相關(guān)Channel
close();
}
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
//不再接收新的任務(wù)糕珊,將原來(lái)未執(zhí)行完的任務(wù)執(zhí)行完
es.shutdown();
} catch (SecurityException ex2) {
return ;
} catch (NullPointerException ex2) {
return ;
}
try {//如果到達(dá)timeout時(shí)間之后仍然沒(méi)有關(guān)閉任務(wù)动分,就直接調(diào)用shutdownNow,強(qiáng)制關(guān)閉所有任務(wù)
if(! es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
//不要生吞InterruptedException红选,所以在本地調(diào)用中依然將本線程的interrupted置位澜公,以便上層能夠發(fā)現(xiàn)
Thread.currentThread().interrupt();
}
//如果到這里都沒(méi)有關(guān)閉成功的話,就重新開線程關(guān)閉業(yè)務(wù)線程池
if (!isShutdown(es)){
newThreadToCloseExecutor(es);
}
}
上面提到了一個(gè)Bug喇肋,這里簡(jiǎn)單介紹一下:
因?yàn)樵陉P(guān)閉executor的時(shí)候坟乾,作者本意就是這里的executor就是業(yè)務(wù)線程池,但是實(shí)際上這里并不是業(yè)務(wù)線程池蝶防。原因如下:
在初始server的時(shí)候在Abstract中有這么一段代碼:
if (handler instanceof WrappedChannelHandler ){
executor = ((WrappedChannelHandler)handler).getExecutor();
}
在NettyServer中對(duì)于handler包裝的最后結(jié)果導(dǎo)致這個(gè)handler實(shí)際上是MultiMessageHandler甚侣,而MultiMessageHandler跟WrappedChannelHandler沒(méi)有繼承關(guān)閉,所以這里的executor實(shí)際上是null间学,沒(méi)有引用到實(shí)際的業(yè)務(wù)線程池殷费,所以在關(guān)閉的時(shí)候?qū)е聵I(yè)務(wù)線程池沒(méi)有成功關(guān)閉。這個(gè)BUG已經(jīng)在后面的dubbo其他版本中修復(fù)低葫,這里可以查看其中一種修復(fù)方法:解決方案
下面看一下ExchangeClient.destory()
public void close(int timeout) {
doClose();
channel.close(timeout);
}
//HeaderExchangeChannel.clse()
//關(guān)閉心跳處理
private void doClose() {
stopHeartbeatTimer();
}
// graceful close
public void close(int timeout) {
if (closed) {
return;
}
closed = true;
if (timeout > 0) {
//這里作者的本意是看一下客戶端是否有發(fā)出去的請(qǐng)求详羡,但是還沒(méi)有收到相應(yīng)的,然后等到timeout時(shí)間看請(qǐng)求是否返回
//但是因?yàn)镈efaultFuture在發(fā)送請(qǐng)求時(shí)候的key是成員變量channel嘿悬,而不是HeaderExchangeChannel.this实柠,所以這代碼有BUG
long start = System.currentTimeMillis();
while (DefaultFuture.hasFuture(HeaderExchangeChannel.this)
&& System.currentTimeMillis() - start < timeout) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
}
close();
}
public void close() {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
基本上到這里,所有的清理操作都介紹的差不多了鹊漠。撒花~~~