lettuce-core版本: 5.1.7.RELEASE
在上一篇介紹了Lettuce是如何基于Netty與Redis建立連接的介褥,其中提到了一個(gè)很重要的CommandHandler類,這一期會(huì)介紹CommandHandler是如何在發(fā)送Command到Lettuce中發(fā)揮作用的亿乳,以及Lettuce是如何實(shí)現(xiàn)多線程共享同一個(gè)物理連接的鲫售。
還是先看一下我們的示例代碼共螺,這一篇主要是跟進(jìn)去sync.get方法看看Lettuc是如何發(fā)送get命令到Redis以及是如何讀取Redis的命令的。
/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
//構(gòu)建RedisClient對(duì)象情竹,RedisClient包含了Redis的基本配置信息藐不,可以基于RedisClient創(chuàng)建RedisConnection
RedisClient client = RedisClient.create("redis://localhost");
//創(chuàng)建一個(gè)線程安全的StatefulRedisConnection,可以多線程并發(fā)對(duì)該connection操作,底層只有一個(gè)物理連接.
StatefulRedisConnection<String, String> connection = client.connect();
//獲取SyncCommand秦效。Lettuce支持SyncCommand雏蛮、AsyncCommands、ActiveCommand三種command
RedisStringCommands<String, String> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);
//獲取SyncCommand阱州。Lettuce支持SyncCommand挑秉、AsyncCommands、ActiveCommand三種command
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<String> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}
在看sync.get方法之前先看一下RedisStringCommands是如何生成生成的苔货,從下面的代碼可以看到RedisStringCommands其實(shí)是對(duì)RedisAsyncCommands<String, String>方法調(diào)用的同步阻塞版本犀概。
//創(chuàng)建一個(gè)sync版本的RedisCommand
protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
//async()方法返回的就是該Connection對(duì)應(yīng)的RedisAsyncCommand
return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}
//返回一個(gè)動(dòng)態(tài)代理類立哑,代理類的實(shí)現(xiàn)在FutureSyncInvocationHandler類中
protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
//基于FutureSyncInvocationHandler生成動(dòng)態(tài)代理類
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}
//異步轉(zhuǎn)同步的關(guān)鍵
class FutureSyncInvocationHandler extends AbstractInvocationHandler {
...
@Override
@SuppressWarnings("unchecked")
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
try {
Method targetMethod = this.translator.get(method);
Object result = targetMethod.invoke(asyncApi, args);
// RedisAsyncCommand返回的大部分對(duì)象類型都是RedisFuture類型的
if (result instanceof RedisFuture<?>) {
RedisFuture<?> command = (RedisFuture<?>) result;
if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
return null;
}
//獲取配置的超時(shí)時(shí)間
long timeout = getTimeoutNs(command);
//阻塞的等待RedisFuture返回結(jié)果
return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
}
return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
}
...
所以sync.get操作最終調(diào)用的還是async.get操作,接下來(lái)看async.get是怎么做的姻灶。還是先看一張時(shí)序圖铛绰,心里有一個(gè)概念。
AbstractRedisAsyncCommands
@Override
public RedisFuture<V> get(K key) {
return dispatch(commandBuilder.get(key));
}
commandBuilder.get(key)
這一步驟主要是根據(jù)用戶的輸入?yún)?shù)key产喉、命令類型get捂掰、序列化方式來(lái)生成一個(gè)command對(duì)象。而這個(gè)command對(duì)象會(huì)按照Redis的協(xié)議格式把命令序列化成字符串曾沈。
Command<K, V, V> get(K key) {
notNullKey(key);
//Valueoutput基于序列化
return createCommand(GET, new ValueOutput<>(codec), key);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return createCommand(type, output, args);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
return new Command<K, V, T>(type, output, args);
}
AbstractRedisAsyncCommands.dispatch
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
//用AsyncCommand對(duì)RedisCommand做一個(gè)包裝處理这嚣,這個(gè)AsyncCommand實(shí)現(xiàn)了RedisFuture接口,最后返回給調(diào)用方的就是這個(gè)對(duì)象晦譬。當(dāng)Lettuce收到Redis的返回結(jié)果時(shí)會(huì)調(diào)用AsyncCommand的complete方法疤苹,異步的方式返回?cái)?shù)據(jù)。
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
//調(diào)用connection的dispatch方法把Command發(fā)送給Redis敛腌,這個(gè)connection就是上一篇中說(shuō)的那個(gè)StatefulRedisConnectionImpl
RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand<K, V, T>) dispatched;
}
return asyncCommand;
}
StatefulRedisConnectionImpl.dispatch
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
//對(duì)command做預(yù)處理卧土,當(dāng)前主要是根據(jù)不同的命令配置一些異步處理,如:auth命令之后成功之后把password寫入到相應(yīng)變量中像樊,select db操作成功之后把db值寫入到相應(yīng)變量中等等尤莺。
RedisCommand<K, V, T> toSend = preProcessCommand(command);
try {
//真正的dispatch是在父類實(shí)現(xiàn)的
return super.dispatch(toSend);
} finally {
if (command.getType().name().equals(MULTI.name())) {
multi = (multi == null ? new MultiOutput<>(codec) : multi);
}
}
}
//父類RedisChannelHandler的dispatch方法
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
//tracingEnable的代碼先不用看
if (tracingEnabled) {
RedisCommand<K, V, T> commandToSend = cmd;
TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
if (provider == null) {
commandToSend = new TracedCommand<>(cmd, clientResources.tracing()
.initialTraceContextProvider().getTraceContext());
}
return channelWriter.write(commandToSend);
}
//其實(shí)就是直接調(diào)用channelWriter.write方法,而這個(gè)channelWriter就是上一節(jié)說(shuō)的那個(gè)屏蔽底層channel實(shí)現(xiàn)的DefaultEndpoint類
return channelWriter.write(cmd);
}
DefaultEndpoint.write
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
LettuceAssert.notNull(command, "Command must not be null");
try {
//sharedLock是Lettuce自己實(shí)現(xiàn)的一個(gè)共享排他鎖生棍。incrementWriters相當(dāng)于獲取一個(gè)共享鎖颤霎,當(dāng)channel狀態(tài)發(fā)生變化的時(shí)候,如斷開連接時(shí)會(huì)獲取排他鎖執(zhí)行一些清理操作涂滴。
sharedLock.incrementWriters();
// validateWrite是驗(yàn)證當(dāng)前操作是否可以執(zhí)行友酱,Lettuce內(nèi)部維護(hù)了一個(gè)保存已經(jīng)發(fā)送但是還沒有收到Redis消息的Command的stack,可以配置這個(gè)stack的長(zhǎng)度柔纵,防止Redis不可用時(shí)stack太長(zhǎng)導(dǎo)致內(nèi)存溢出缔杉。如果這個(gè)stack已經(jīng)滿了,validateWrite會(huì)拋出異常
validateWrite(1);
//autoFlushCommands默認(rèn)為true搁料,即每執(zhí)行一個(gè)Redis命令就執(zhí)行Flush操作發(fā)送給Redis或详,如果設(shè)置為false,則需要手動(dòng)flush郭计。由于flush操作相對(duì)較重霸琴,在某些場(chǎng)景下需要繼續(xù)提升Lettuce的吞吐量可以考慮設(shè)置為false。
if (autoFlushCommands) {
if (isConnected()) {
//寫入channel并執(zhí)行flush操作昭伸,核心在這個(gè)方法的實(shí)現(xiàn)中
writeToChannelAndFlush(command);
} else {
// 如果當(dāng)前channel連接已經(jīng)斷開就先放入Buffer中梧乘,直接返回AsyncCommand,重連之后會(huì)把Buffer中的Command再次嘗試通過(guò)channel發(fā)送到Redis中
writeToDisconnectedBuffer(command);
}
} else {
writeToBuffer(command);
}
} finally {
//釋放共享鎖
sharedLock.decrementWriters();
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
}
}
return command;
}
DefaultEndpoint.writeToChannelAndFlush
private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
//queueSize字段做cas +1操作
QUEUE_SIZE.incrementAndGet(this);
ChannelFuture channelFuture = channelWriteAndFlush(command);
//Lettuce的可靠性:保證最多一次庐杨。由于Lettuce的保證是基于內(nèi)存的宋下,所以并不可靠(系統(tǒng)crash時(shí)內(nèi)存數(shù)據(jù)會(huì)丟失)
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
}
//Lettuce的可靠性:保證最少一次嗡善。由于Lettuce的保證是基于內(nèi)存的,所以并不可靠(系統(tǒng)crash時(shí)內(nèi)存數(shù)據(jù)會(huì)丟失)
if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channelFuture.addListener(RetryListener.newInstance(this, command));
}
}
//可以看到最終還是調(diào)用了channle的writeAndFlush操作学歧,這個(gè)Channel就是netty中的NioSocketChannel
private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
}
return channel.writeAndFlush(command);
}
到這里其實(shí)就牽扯到Netty的Channel、EventLoop相關(guān)概念了各吨,簡(jiǎn)單的說(shuō)channel會(huì)把需要write的對(duì)象放入Channel對(duì)應(yīng)的EventLoop的隊(duì)列中就返回了枝笨,EventLoop是一個(gè)SingleThreadEventExector,它會(huì)回調(diào)Bootstrap時(shí)配置的CommandHandler的write方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (debugEnabled) {
logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
}
if (msg instanceof RedisCommand) {
//如果是單個(gè)的RedisCommand就直接調(diào)用writeSingleCommand返回
writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
return;
}
if (msg instanceof List) {
List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
if (batch.size() == 1) {
writeSingleCommand(ctx, batch.get(0), promise);
return;
}
//批量寫操作揭蜒,暫不關(guān)心
writeBatch(ctx, batch, promise);
return;
}
if (msg instanceof Collection) {
writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
}
}
writeSingleCommand 核心在這里
Lettuce使用單一連接支持多線程并發(fā)向Redis發(fā)送Command横浑,那Lettuce是怎么把請(qǐng)求Command與Redis返回的結(jié)果對(duì)應(yīng)起來(lái)的呢,秘密就在這里屉更。
private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise)
{
if (!isWriteable(command)) {
promise.trySuccess();
return;
}
//把當(dāng)前command放入一個(gè)特定的棧中徙融,這一步是關(guān)鍵
addToStack(command, promise);
// Trace操作,暫不關(guān)心
if (tracingEnabled && command instanceof CompleteableCommand) {
...
}
//調(diào)用ChannelHandlerContext把命令真正發(fā)送給Redis瑰谜,當(dāng)然在發(fā)送給Redis之前會(huì)由CommandEncoder類對(duì)RedisCommand進(jìn)行編碼后寫入ByteBuf
ctx.write(command, promise);
private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
try {
//再次驗(yàn)證隊(duì)列是否滿了欺冀,如果滿了就拋出異常
validateWrite(1);
//command.getOutput() == null意味這個(gè)這個(gè)Command不需要Redis返回影響。一般不會(huì)走這個(gè)分支
if (command.getOutput() == null) {
// fire&forget commands are excluded from metrics
complete(command);
}
//這個(gè)應(yīng)該是用來(lái)做metrics統(tǒng)計(jì)用的萨脑,暫時(shí)先不考慮
RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
//無(wú)論promise是什么類型的隐轩,最終都會(huì)把command放入到stack中,stack是一個(gè)基于數(shù)組實(shí)現(xiàn)的雙向隊(duì)列
if (promise.isVoid()) {
//如果promise不是Future類型的就直接把當(dāng)前command放入到stack
stack.add(redisCommand);
} else {
//如果promise是Future類型的就等f(wàn)uture完成后把當(dāng)前command放入到stack中渤早,當(dāng)前場(chǎng)景下就是走的這個(gè)分支
promise.addListener(AddToStack.newInstance(stack, redisCommand));
}
} catch (Exception e) {
command.completeExceptionally(e);
throw e;
}
}
}
那么Lettuce收到Redis的回復(fù)消息之后是怎么通知RedisCommand职车,并且把結(jié)果與RedisCommand對(duì)應(yīng)上的呢。Netty在收到Redis服務(wù)端返回的消息之后就會(huì)回調(diào)CommandHandler的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
...
try {
...
//重點(diǎn)在這里
decode(ctx, buffer);
} finally {
input.release();
}
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
//如果stack為空鹊杖,則直接返回悴灵,這個(gè)時(shí)候一般意味著返回的結(jié)果找到對(duì)應(yīng)的RedisCommand了
if (pristine && stack.isEmpty() && buffer.isReadable()) {
...
return;
}
while (canDecode(buffer)) {
//重點(diǎn)來(lái)了。從stack的頭上取第一個(gè)RedisCommand
RedisCommand<?, ?, ?> command = stack.peek();
if (debugEnabled) {
logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
}
pristine = false;
try {
//直接把返回的結(jié)果buffer給了stack頭上的第一個(gè)RedisCommand骂蓖。
//decode操作實(shí)際上拿到RedisCommand的commandoutput對(duì)象對(duì)Redis的返回結(jié)果進(jìn)行反序列化的积瞒。
if (!decode(ctx, buffer, command)) {
return;
}
} catch (Exception e) {
ctx.close();
throw e;
}
if (isProtectedMode(command)) {
onProtectedMode(command.getOutput().getError());
} else {
if (canComplete(command)) {
stack.poll();
try {
complete(command);
} catch (Exception e) {
logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
}
}
}
afterDecode(ctx, command);
}
if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}
從上面的代碼可以看出來(lái),當(dāng)Lettuce收到Redis的回復(fù)消息時(shí)就從stack的頭上取第一個(gè)RedisCommand涯竟,這個(gè)RedisCommand就是與該Redis返回結(jié)果對(duì)應(yīng)的RedisCommand赡鲜。為什么這樣就能對(duì)應(yīng)上呢,是因?yàn)長(zhǎng)ettuce與Redis之間只有一條tcp連接庐船,在Lettuce端放入stack時(shí)是有序的银酬,tcp協(xié)議本身是有序的,redis是單線程處理請(qǐng)求的筐钟,所以Redis返回的消息也是有序的揩瞪。這樣就能保證Redis中返回的消息一定對(duì)應(yīng)著stack中的第一個(gè)RedisCommand。當(dāng)然如果連接斷開又重連了篓冲,這個(gè)肯定就對(duì)應(yīng)不上了李破,Lettuc對(duì)斷線重連也做了特殊處理宠哄,防止對(duì)應(yīng)不上。
Command.encode
public void encode(ByteBuf buf) {
buf.writeByte('*');
//寫入?yún)?shù)的數(shù)量
CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
//換行
buf.writeBytes(CommandArgs.CRLF);
//寫入命令的類型嗤攻,即get
CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());
if (args != null) {
//調(diào)用Args的編碼毛嫉,這里面就會(huì)使用我們之前配置的codec序列化,當(dāng)前使用的是String.UTF8
args.encode(buf);
}
}