上一篇主要介紹了RedisCommand在Lettuce中是如何流轉(zhuǎn)的,以及Lettuce是如何通過單一連接處理多線程請求的蒸苇。這一篇主要介紹一下Lettuce中是如何實現(xiàn)Redis斷線重連的,在介紹之前先看一張圖回憶一下RedisComman流轉(zhuǎn)過程中牽扯到的幾個類,以及類之間的關(guān)系掂铐。
如上圖所示RedisCommand在Lettuce中經(jīng)歷了如下流程:
- RedisAsyncCommands調(diào)用StatefulRedisConnectionImpl的dispatch
- StatefulRedisConnectionImpl調(diào)用DefaultEndpoint的writeCommand
- 與Redis連接正常辱姨,autoFlush為true是,DefaultEndpoint會直接把Command通過Netty的channel發(fā)送出去
- Netty收到RedisCommand之后镜悉,會在EventLoop中回調(diào)我們的初始化Bootstrap時配置的CommandHandler的write方法醇份,在這個write方法中CommandHandler會把RedisCommand放入stack(雙向隊列)的尾部
- 把RedisCommand序列化之后發(fā)送給Redis
- Netty在收到Redis的response之后會在EventLoop中回調(diào)CommandHandler的channelRead方法拗盒,CommandHandler會在這個方法調(diào)用中從stack的頭部取一個RedisCommand痊臭,基于這個RedisCommand對Redis的response反序列化然后調(diào)用RedisCommand的complete方法鸦致,該RedisCommand的調(diào)用方就會收到通知收到Redis消息了。至此RedisCommand就算結(jié)束了旅程弧蝇。
這個時候可能會有疑問鹃觉?CommandHandler怎么確保Redis返回的消息就一定能與stack雙向隊列的第一個RedisCommand對應(yīng)上的呢沉填,也就是說Redis返回的消息為什么就剛好是第一個RedisCommand請求的結(jié)果呢。
其實上一篇已經(jīng)介紹了,在正常場景下CommandHandler接收RedisCommand的是串行有序的,把RedisCommand通過tcp協(xié)議寫入Redis也是有序的些楣,Redis本身是單線程處理請求,所以Redis內(nèi)部處理以及返回結(jié)果也是有序的1這樣就能保證先進入CommandHandler的RedisCommand一定先收到Redis的響應(yīng)道宅。(這里可以思考一下泞当,如果Redis不是單線程的,比如Dubbo也是單一長連接,但是服務(wù)端是多線程并發(fā)處理請求的,所以對于請求的返回是無序的,用這種stack數(shù)據(jù)結(jié)構(gòu)是否可行呢?)
上面說了正常場景下CommandHandler的stack結(jié)構(gòu)可以保證請求與Redis的返回結(jié)果對應(yīng)上,那如果連接斷開又連接上了崇败,這種順序還能保證嗎岸霹?答案是不能保證,下面就具體看一下Lettuce的斷線重連是如何實現(xiàn)杀捻,以及斷線重連期間都做了什么工作保證RedisCommand能與Redis影響請求對應(yīng)上的墓拜。
Lettuce實現(xiàn)斷線重連的核心類是ConnectionWatchdog,那么ConnectionWatchdog具體是如何被實例化、被應(yīng)用的秸仙,需要回過頭來看下Redis連接的初始化過程孝冒。
- 初始化Netty的Bootstrap時設(shè)置PlainChannelInitializer
- Netty的channel連接初始化時會回調(diào)PlainChannelInitializer的initChannel方法
- 在initChannel方法中會調(diào)用ConnectionBuidler.buildHandlers方法獲取所有的handler放入channel的pipeline中。(Netty對于收到和發(fā)送的所有消息都會挨個調(diào)用pipeline,具體可以參考Netty權(quán)威指南這本書)
- ConnectionBuidler.build方法中會負責(zé)創(chuàng)建ConnectionWatchdog
//RedisClient.initializeChannelAsync0
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
SocketAddress redisAddress) {
logger.debug("Connecting to Redis at {}", redisAddress);
Bootstrap redisBootstrap = connectionBuilder.bootstrap();
//創(chuàng)建一個RedisChannelInitializer
RedisChannelInitializer initializer = connectionBuilder.build();
//把initializer賦值給RedisBootstrap,Netty會在Channel初始化的時候回調(diào)該initializer
redisBootstrap.handler(initializer);
...
}
// ConnectionBuidler.build
public RedisChannelInitializer build() {
//創(chuàng)建PlainChannelInitializer對象捶闸,這個地方要注意this::buildHandlers方法税灌,PlainChannelInitializer會在Channel初始化的時候調(diào)用該this::buildHandler方法獲取所有的handler放入Channel的handler pipeline中。
return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
}
// RedisChannelInitializer.initChannel 該方法會在建立連接,Channel初始化的時候被調(diào)用
@Override
protected void initChannel(Channel channel) throws Exception {
...
//調(diào)用ConnectionBuidler.buildHandlers方法獲取所有的handler放入channel的pipeline中痒玩。(對于Netty的pipeline機制可以參考Netty權(quán)威指南這本書)
for (ChannelHandler handler : handlers.get()) {
channel.pipeline().addLast(handler);
}
clientResources.nettyCustomizer().afterChannelInitialized(channel);
}
//ConnectionBuidler.buildHandlers負責(zé)創(chuàng)建Channel所使用的ChannelHandler對象
protected List<ChannelHandler> buildHandlers() {
...
handlers.add(new ChannelGroupListener(channelGroup));
handlers.add(new CommandEncoder());
handlers.add(commandHandlerSupplier.get());
// 判斷如果配置了自動重連就添加ConnectionWatchdog
if (clientOptions.isAutoReconnect()) {
handlers.add(createConnectionWatchdog());
}
handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));
//這個地方又檢查并添加了一遍坤溃,不太明白再次創(chuàng)建的目的是什么道偷,有知道的朋友嘶窄,歡迎指出漓拾。
if (clientOptions.isAutoReconnect()) {
handlers.add(createConnectionWatchdog());
}
return handlers;
}
上面可以看到ConnectionWatchdog是如何被應(yīng)用到Netty的ChannelHandler中的示血,下面看下ConnectionWatchdog是如何構(gòu)建的麸拄,以及如何自動重連的地淀。
- 基于配置創(chuàng)建ConnectionWatchdog
- ConnectionWatchdog的ChannelActive和ChannelInActive會在Channel建立成功和斷開連接的時候被回調(diào)
- 在ConnectionWatchdog的ChannelInActive方法中會嘗試重連烈疚,斷開連接之后并不是立即重連灯抛,而是根據(jù)一個延時重連的策略來延遲執(zhí)行重連任務(wù)对嚼。
protected ConnectionWatchdog createConnectionWatchdog() {
// 可以看到即使上面被調(diào)用了兩次靡砌,其實對象只有一個。另外因為對于一個StatefulConnectionImpl來說骗炉,ConnectionBuilder是同一個的忆矛,所以即使Channel斷線重連了混稽,ConnectionWatchdog也還是這個對象采驻。
if (connectionWatchdog != null) {
return connectionWatchdog;
}
...
//基于一些配置項構(gòu)建ConnectionWatchdog對象
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer,
clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection);
//把watchdog傳給endpoint,前面已經(jīng)說過endpoint是更高級別的抽象匈勋,用來抽象底層channel礼旅,注冊給endpoint是為了讓endpoint在某些場景下直接調(diào)用配置watchdog。
endpoint.registerConnectionWatchdog(watchdog);
//把創(chuàng)建的watchdog賦值給當前的ConnectionBuilder對象
connectionWatchdog = watchdog;
return watchdog;
}
//Channel建立成功之后回調(diào)channelActive颓影,channelActive方法中其實沒做什么實質(zhì)性的工作各淀,主要是把reconnectSchedulerSync設(shè)置為false,相當于釋放鎖
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//reconnectSchedulerSync可以理解是重連定時任務(wù)的鎖诡挂,設(shè)置為false表示鎖是釋放的碎浇。
reconnectSchedulerSync.set(false);
channel = ctx.channel();
reconnectScheduleTimeout = null;
logPrefix = null;
remoteAddress = channel.remoteAddress();
logPrefix = null;
logger.debug("{} channelActive()", logPrefix());
super.channelActive(ctx);
}
//斷開連接的時候channelInactive會被調(diào)用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
...
//把舊的channl設(shè)置為null,舊的channel就是斷開連接的channel
channel = null;
if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
// 真正的重連邏輯在這里A住EА!城豁!
scheduleReconnect();
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
}
super.channelInactive(ctx);
}
//scheduleReconnect苟穆,顧名思義計劃派發(fā)重連,并不是真正的重連
public void scheduleReconnect() {
...
// 通過對reconnectSchedulerSync做cas的方式獲取鎖
if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {
attempts++;
final int attempt = attempts;
//根據(jù)重試次數(shù)獲取延遲執(zhí)行重連的時間,這個應(yīng)該也好理解雳旅,當連接斷開的時候并不是立即重連的(因為此時重連大概率也是失敻ァ),默認的重連策略是等待X時間再嘗試連接攒盈,這個X是遞增的抵拘,也就是說失敗的次數(shù)越多,下次重試之前間隔的時間越長型豁,當然也有一個上限僵蛛。
int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);
this.reconnectScheduleTimeout = timer.newTimeout(it -> {
...
//通過reconnectWorkers來真正的執(zhí)行重連邏輯,而不是在當前線程中
reconnectWorkers.submit(() -> {
//真正的重連邏輯S洹3湮尽!衣形!
ConnectionWatchdog.this.run(attempt);
return null;
});
}, timeout, TimeUnit.MILLISECONDS);
// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
if (!reconnectSchedulerSync.get()) {
reconnectScheduleTimeout = null;
}
} else {
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
}
}
下面看一下ConnectionWatchdog真正執(zhí)行重連的邏輯
//ConnectionWatchdog.run是真正執(zhí)行重連的邏輯驼侠,并且是在其他線程中執(zhí)行的
public void run(int attempt) throws Exception {
//設(shè)置為false,表示釋放reconnectSchedulerSync的鎖
reconnectSchedulerSync.set(false);
...
try {
reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));
logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);
//真正的重連邏輯在這里1么@岬纭!纪铺!
CompletableFuture<Channel> future = reconnectionHandler.reconnect();
future.whenComplete((c, t) -> {
//如果連接建立成功了就直接返回
if (c != null && t == null) {
return;
}
...
//如果連接建立失敗相速,就重新嘗試重連
if (!isReconnectSuspended()) {
scheduleReconnect();
}
});
} catch (Exception e) {
logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
}
}
//ReconnectionHandler.reconnect
protected CompletableFuture<Channel> reconnect() {
CompletableFuture<Channel> future = new CompletableFuture<>();
//通過socketAddressSupplier獲取實際Redis地址,獲取到Redis地址之后執(zhí)行真正的重連邏輯
socketAddressSupplier.subscribe(remoteAddress -> {
if (future.isCancelled()) {
return;
}
//真正的重連邏輯
reconnect0(future, remoteAddress);
}, future::completeExceptionally);
return this.currentFuture = future;
}
//ReconnectionHandler.reconnect0
private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {
//其實重連就是調(diào)用bootstrap的connect方法
ChannelFuture connectFuture = bootstrap.connect(remoteAddress);
ChannelPromise initFuture = connectFuture.channel().newPromise();
logger.debug("Reconnecting to Redis at {}", remoteAddress);
result.whenComplete((c, t) -> {
if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});
initFuture.addListener((ChannelFuture it) -> {
if (it.cause() != null) {
connectFuture.cancel(true);
close(it.channel());
result.completeExceptionally(it.cause());
} else {
result.complete(connectFuture.channel());
}
});
//異常和超時邏輯處理
...
}
所以其實真正重連的實現(xiàn)方法就是調(diào)用bootstrap.connect方法鲜锚,這里可能會有一個疑問:connection方法會的ChannelFuture對象并沒有被使用突诬,前面的文章中提到過DefaultEndpoint抽象了channel的調(diào)用,所以DefaultEndpoint對象中是有對Channel對象的引用的芜繁,那重新連接成功創(chuàng)建的Channel是如何告知DefaultEndpoint的呢旺隙。
其實根源還是PlainChannelInitializer中,PlainChannelInitializer對象是配置到Netty的bootstrap中的骏令,所以當每次該bootstrap對象創(chuàng)建一個channel的時候都會調(diào)用PlainChannelInitializer的initchannel方法蔬捷,從而把ConnectionBuilder中得handlers注冊到channel中。這個handlers中有一個CommandHandler(雖然每次創(chuàng)建新的channel都會創(chuàng)建新的CommandHandler榔袋,但是所有的CommandHandler對象引用的DefaultEndpoint是同一個)周拐。實現(xiàn)如下:
//CommandHandler.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
...
//調(diào)用Endpoint的notifyChannelActive方法告知Channel變了
endpoint.notifyChannelActive(ctx.channel());
super.channelActive(ctx);
...
}
//DefaultEndpoint.notifyChannelActive
@Override
public void notifyChannelActive(Channel channel) {
this.logPrefix = null;
//把自己的channel指向為新的channel對象
this.channel = channel;
this.connectionError = null;
...
//讓connectionWatchdog重新監(jiān)聽斷開連接事件
if (connectionWatchdog != null) {
connectionWatchdog.arm();
}
//獲取悲觀鎖
sharedLock.doExclusive(() -> {
try {
// Move queued commands to buffer before issuing any commands because of connection activation.
// That's necessary to prepend queued commands first as some commands might get into the queue
// after the connection was disconnected. They need to be prepended to the command buffer
...
//調(diào)用StatefulConnectionImpl的activated方法,這個里面也做很多總要的事情;硕摇M姿凇!吏够!
connectionFacade.activated();
//把斷開連接時緩存的Command重新通過Channel發(fā)送出去
flushCommands(disconnectedBuffer);
} catch (Exception e) {
...
}
});
}
可能還會有一個疑問勾给,就是我們在第一次創(chuàng)建連接的時候滩报,在連接成功之后有判斷是否有密碼,有密碼就發(fā)送AUTH命令播急,有選擇DB就發(fā)送Select命令等脓钾,在重連的時候卻并沒有看到這個操作,其實就是在上面的代碼connectionFacade.activated()的實現(xiàn)中桩警。
//StatefulRedisConnectionImpl.activated惭笑。
public void activated() {
super.activated();
// do not block in here, since the channel flow will be interrupted.
//如果密碼不為空就設(shè)置密碼
if (password != null) {
async.authAsync(password);
}
//如果db!=0就設(shè)置db
if (db != 0) {
async.selectAsync(db);
}
if (clientName != null) {
setClientName(clientName);
}
if (readOnly) {
async.readOnly();
}
}
從上面可以看到當Channel重新連接成功時StatefulRedisConnectionImpl的activated方法會被調(diào)用生真,在該方法中會檢測密碼不為空就調(diào)用auth命令,那么StatefulRedisConnectionImpl是如何知道密碼的呢捺宗。原因是在preProcessCommand方法中:
//StatefulRedisConnectionImpl.preProcessCommand柱蟀,該方法會在每次dispatchCommand的時候被調(diào)用,Lettuce在第一次建立連接的時候會調(diào)用AUTH和SELECT方法蚜厉,在調(diào)用這些方法的時候StatefulRedisConnectionImpl就會記住password和db长已。從而在斷線重連的時候會自動執(zhí)行AUTH和SELECT方法。
protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> command) {
RedisCommand<K, V, T> local = command;
//如果該Command是AUTH昼牛,就等該Command返回成功之后記錄下password
if (local.getType().name().equals(AUTH.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
char[] password = CommandArgsAccessor.getFirstCharArray(command.getArgs());
if (password != null) {
this.password = password;
} else {
String stringPassword = CommandArgsAccessor.getFirstString(command.getArgs());
if (stringPassword != null) {
this.password = stringPassword.toCharArray();
}
}
}
});
}
//如果該Command是SELECT术瓮,就等該Command返回成功之后記錄下db
if (local.getType().name().equals(SELECT.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
Long db = CommandArgsAccessor.getFirstInteger(command.getArgs());
if (db != null) {
this.db = db.intValue();
}
}
});
}
//如果該Command是READONLY,就等該Command返回成功之后記錄下readonly為true
if (local.getType().name().equals(READONLY.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
this.readOnly = true;
}
});
}
...
return local;
}
至此Lettuce的重連邏輯完成了贰健,因為第一次創(chuàng)建連接的時候Bootstrap對象已經(jīng)被配置好了胞四,所以在斷線重連的時候邏輯簡單了很多,而且很多AUTH伶椿、SELECT等命令被放在了Channel的pipeline相對應(yīng)類中去實現(xiàn)了辜伟。