lettuce-core版本: 5.1.7.RELEASE
先看一下Lettuce的基本使用方法嗤详,使用Lettuce大概分為如下幾步:
- 基于Redis連接信息創(chuàng)建RedisClient
- 基于RedisClient創(chuàng)建StatefulRedisConnection
- 從Connection中獲取Command浆竭,基于Command執(zhí)行Redis命令操作。
/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
//構(gòu)建RedisClient對(duì)象,RedisClient包含了Redis的基本配置信息,可以基于RedisClient創(chuàng)建RedisConnection
RedisClient client = RedisClient.create("redis://localhost");
//創(chuàng)建一個(gè)線程安全的StatefulRedisConnection,可以多線程并發(fā)對(duì)該connection操作,底層只有一個(gè)物理連接.
StatefulRedisConnection<String, String> connection = client.connect();
//獲取SyncCommand括荡。Lettuce支持SyncCommand、AsyncCommands溉旋、ActiveCommand三種command
RedisStringCommands<String, String> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);
//獲取SyncCommand畸冲。Lettuce支持SyncCommand、AsyncCommands观腊、ActiveCommand三種command
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<String> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}
先看一張建立連接的時(shí)序圖邑闲,有一個(gè)直觀的印象。
RedisClient
一個(gè)可擴(kuò)展梧油、線程安全的RedisClient苫耸,支持sync、async儡陨、reactor執(zhí)行模式褪子。
RedisClient.create只是傳入了一些配置信息,此時(shí)并沒(méi)有創(chuàng)建連接骗村。
// 使用默認(rèn)的ClientResource
public static RedisClient create(String uri) {
LettuceAssert.notEmpty(uri, "URI must not be empty");
return new RedisClient(null, RedisURI.create(uri));
}
// ClientResources中包含了一些配置和線程池信息嫌褪,是一個(gè)比較重的資源,多個(gè)RedisClient可以共享同一個(gè)ClientResource
protected RedisClient(ClientResources clientResources, RedisURI redisURI) {
super(clientResources);
assertNotNull(redisURI);
this.redisURI = redisURI;
setDefaultTimeout(redisURI.getTimeout());
}
RedisClient.connnect
可以看到connect方法有一些重載方法胚股,默認(rèn)的是用UTF8 String對(duì)key和value序列化笼痛,通過(guò)傳入RedisCodec支持自定義的對(duì)Key和Value的序列化方式。
public StatefulRedisConnection<String, String> connect() {
return connect(newStringStringCodec());
}
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
checkForRedisURI();
//connectStandaloneAsync是異步創(chuàng)建connection琅拌,返回的是Future對(duì)象缨伊,通過(guò)getConnection轉(zhuǎn)為同步操作
return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
}
//異步轉(zhuǎn)同步操作
protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
try {
return connectionFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
} catch (Exception e) {
if (e instanceof ExecutionException) {
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e.getCause());
}
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
}
}
RedisClient.connectStandaloneAsync
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
RedisURI redisURI, Duration timeout) {
assertNotNull(codec);
checkValidRedisURI(redisURI);
logger.debug("Trying to get a Redis connection for: " + redisURI);
//創(chuàng)建一個(gè)有狀態(tài)的EndPoint用于抽象底層channel的實(shí)現(xiàn),DefaultEndpoint內(nèi)部封裝斷線重連财忽、重連后成功后回放連接失敗期間的command倘核。同時(shí)封裝了AT_MOST_ONCE泣侮、AT_LEAST_ONCE的可靠性實(shí)現(xiàn)(該邏輯是基于內(nèi)存的即彪,所以并不可靠)。
DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
RedisChannelWriter writer = endpoint;
//進(jìn)一步封裝,添加支持過(guò)期時(shí)間的執(zhí)行命令
if (CommandExpiryWriter.isSupported(clientOptions)) {
writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
}
//創(chuàng)建StatefulRedisConnectionImpl對(duì)象隶校,StatefulRedisConnectionImpl對(duì)外提供RedisCommand對(duì)象漏益,內(nèi)部基于writer發(fā)送命令。此時(shí)并沒(méi)有真正的創(chuàng)建物理連接深胳,該類(lèi)本身是無(wú)狀態(tài)绰疤、線程安全的。
StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);
//異步創(chuàng)建Redis物理連接舞终,返回future對(duì)象轻庆。后面可以看到future中返回的對(duì)象其實(shí)還是上面的connection
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
() -> new CommandHandler(clientOptions, clientResources, endpoint));
future.whenComplete((channelHandler, throwable) -> {
if (throwable != null) {
connection.close();
}
});
return future;
}
//StatefulRedisConnectionImpl的構(gòu)造函數(shù),此時(shí)已經(jīng)創(chuàng)建了sync敛劝、async余爆、reactive三種類(lèi)型的RedisCommand】涿耍基于RedisCodec對(duì)key和value序列化蛾方,通過(guò)write把命令真正的發(fā)出去。
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {
super(writer, timeout);
this.codec = codec;
this.async = newRedisAsyncCommandsImpl();
this.sync = newRedisSyncCommandsImpl();
this.reactive = newRedisReactiveCommandsImpl();
}
RedisClient.connectStatefulAsync
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
RedisCodec<K, V> codec, Endpoint endpoint,
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
//構(gòu)建ConnectionBuidler上陕,通過(guò)ConnectionBuilder來(lái)創(chuàng)建connection
ConnectionBuilder connectionBuilder;
if (redisURI.isSsl()) {
SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
sslConnectionBuilder.ssl(redisURI);
connectionBuilder = sslConnectionBuilder;
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}
//填充StatefulRedisConnectionImpl
connectionBuilder.connection(connection);
//控制RedisClient行為的一些配置參數(shù)
connectionBuilder.clientOptions(clientOptions);
//ClientResource包含了一些EventLoopGroup信息
connectionBuilder.clientResources(clientResources);
//配置commandHandlerSupplier桩砰,這個(gè)commandHandler很重要,是實(shí)現(xiàn)StatefulRedisConnectionImpl線程安全的關(guān)鍵释簿,后面會(huì)詳細(xì)講亚隅。
connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
//connectionBuilder填充Bootstrap等更多的信息
//getSocketAddressSupplier是根據(jù)redisURI獲取真正的Redis連接信息,如:sentinel模式下辕万,需要從sentinel獲取到真實(shí)的redis連接地址
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
//配置netty的channeltype
channelType(connectionBuilder, redisURI);
if (clientOptions.isPingBeforeActivateConnection()) {
if (hasPassword(redisURI)) {
connectionBuilder.enableAuthPingBeforeConnect();
} else {
connectionBuilder.enablePingBeforeConnect();
}
}
//初始化channel枢步,在這一步才真正的異步的去創(chuàng)建物理連接
ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
ConnectionFuture<?> sync = future;
if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
//連接成功之后發(fā)送auth命令,做密碼的驗(yàn)證
sync = sync.thenCompose(channelHandler -> {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getPassword());
return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args);
});
}
//設(shè)置clientName渐尿,從Redis服務(wù)端執(zhí)行client list可以看到clientname
if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
sync = sync.thenApply(channelHandler -> {
connection.setClientName(redisURI.getClientName());
return channelHandler;
});
}
//選擇db
if (redisURI.getDatabase() != 0) {
sync = sync.thenCompose(channelHandler -> {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getDatabase());
return connection.async().dispatch(CommandType.SELECT, new StatusOutput<>(codec), args);
});
}
//返回connection對(duì)象
return sync.thenApply(channelHandler -> (S) connection);
}
RedisClient.connectionBuilder
//為ConnectionBuidler填充更多的信息醉途,如Bootstrap、channelGroup
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {
//創(chuàng)建Netty客戶(hù)端的Bootstrap對(duì)象
Bootstrap redisBootstrap = new Bootstrap();
//Bootstrap的一些配置參數(shù)砖茸,具體可以參考Netty的相關(guān)書(shū)籍(Netty權(quán)威指南)
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
SocketOptions socketOptions = getOptions().getSocketOptions();
redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
if (LettuceStrings.isEmpty(redisURI.getSocket())) {
//keepAlive參數(shù)隘擎,默認(rèn)為true
redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
//tcp_nodelay參數(shù),默認(rèn)為true
redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
}
connectionBuilder.timeout(redisURI.getTimeout());
connectionBuilder.password(redisURI.getPassword());
//把構(gòu)建出來(lái)的bootStrap對(duì)象賦值給connectionBuidler凉夯,由connectionBuilder創(chuàng)建連接
connectionBuilder.bootstrap(redisBootstrap);
//Netty的相關(guān)參數(shù)配置货葬,待研究
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
//配置socket地址提供者
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
RedisClient.initializeChannelAsync
//初始化redis連接,返回ChannelFuture對(duì)象
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
ConnectionBuilder connectionBuilder) {
Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();
if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
}
//創(chuàng)建socketAddressFuture 對(duì)象劲够,當(dāng)socketAddressSupplier異步獲取SocketAddress成功之后會(huì)把SocketAddress數(shù)據(jù)放入該對(duì)象中
CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
//創(chuàng)建channelReadyFuture震桶,當(dāng)連接建立成功之后會(huì)把Channel對(duì)象放入該對(duì)象中
CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();
//配置獲取SocketAddress異步操作之后的操作:
//1. 把SocketAddress對(duì)象放入socketAddressFuture中
//2. 基于SocketAddress調(diào)用initializeChannelAsync0方法真正去建立連接
socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete)
.subscribe(redisAddress -> {
if (channelReadyFuture.isCancelled()) {
return;
}
//異步建立真正的連接,如果建立成功會(huì)把生產(chǎn)的Channel對(duì)象放入channelReadyFuture中
initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
}, channelReadyFuture::completeExceptionally);
//建立連接成功之后返回的還是connectionBuilder的connection對(duì)象征绎,即StatefulRedisConnectionImpl
return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}
RedisClient.initializeChannelAsync0
//真正的去建立Redis物理連接蹲姐,這里面有很多基于Future的異步操作,如果看不太懂,建議先看看Future的相關(guān)知識(shí)柴墩,多看幾遍忙厌。
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
SocketAddress redisAddress) {
logger.debug("Connecting to Redis at {}", redisAddress);
Bootstrap redisBootstrap = connectionBuilder.bootstrap();
//創(chuàng)建PlainChannelInitializer對(duì)象,PlainChannelIntializer對(duì)象會(huì)在Channel初始化的時(shí)候添加很多Handlers(Netty的Handler概念可以參考Netty權(quán)威指南)江咳,如:CommandEncoder逢净、CommandHandler(非常重要的Handler)、ConnectionWatchdog(實(shí)現(xiàn)斷線重連)
RedisChannelInitializer initializer = connectionBuilder.build();
//RedisChannelInitializer配置到Bootstrap中
redisBootstrap.handler(initializer);
//調(diào)用一些通過(guò)ClientResources自定義的回調(diào)函數(shù)
clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
//獲取initFuture 對(duì)象歼指,如果Channel初始化完成爹土,可以通過(guò)該對(duì)象獲取到初始化的結(jié)果
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
//真正的通過(guò)Netty異步的方式去建立物理連接,返回ChannelFuture對(duì)象
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
//配置異常處理
channelReadyFuture.whenComplete((c, t) -> {
if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});
connectFuture.addListener(future -> {
//異常處理
if (!future.isSuccess()) {
logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
connectionBuilder.endpoint().initialState();
//賦值channelReadyFuture告知出現(xiàn)異常了
channelReadyFuture.completeExceptionally(future.cause());
return;
}
//當(dāng)Channel初始化完成之后踩身,根據(jù)初始化的結(jié)果做判斷
initFuture.whenComplete((success, throwable) -> {
//如果異常為空着饥,則初始化成功。
if (throwable == null) {
logger.debug("Connecting to Redis at {}: Success", redisAddress);
RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
connection.registerCloseables(closeableResources, connection);
//把成功之后的結(jié)果賦值給channelReadyFuture對(duì)象
channelReadyFuture.complete(connectFuture.channel());
return;
}
//如果初始化Channel的過(guò)程中出現(xiàn)異常的處理邏輯
logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
connectionBuilder.endpoint().initialState();
Throwable failure;
if (throwable instanceof RedisConnectionException) {
failure = throwable;
} else if (throwable instanceof TimeoutException) {
failure = new RedisConnectionException("Could not initialize channel within "
+ connectionBuilder.getTimeout(), throwable);
} else {
failure = throwable;
}
//賦值channelReadyFuture告知出現(xiàn)異常了
channelReadyFuture.completeExceptionally(failure);
});
});
}
至此惰赋,Redis的Connection的建立連接的主流程就結(jié)束了宰掉,具體的一些邏輯如:斷線重連是如何實(shí)現(xiàn)的,Redis模式下是怎么基于Sentinel獲取Redis實(shí)際連接的等等會(huì)在后續(xù)的文章中介紹赁濒。