轉(zhuǎn)自:https://github.com/angryz/my-blog/issues/6
上篇Redisson 分布式鎖實(shí)現(xiàn)分析中提到了RedissonLock
中的redis命令都是通過CommandExecutor
來發(fā)送到redis服務(wù)執(zhí)行的哪替,本篇就來了解一下它的實(shí)現(xiàn)方式。
先來看其源碼
public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
}
可以看到它同時(shí)繼承了 同步和異步(sync/async) 兩種調(diào)用方式古劲。
Note:
- 在分布式鎖的實(shí)現(xiàn)中是用了同步的 CommandExecutor照宝,是因?yàn)殒i的獲取和釋放是有強(qiáng)一致性要求的颜懊,需要實(shí)時(shí)知道結(jié)果方可進(jìn)行下一步操作年堆。
- 上篇分布式鎖分析中我提到 Redisson 的同步實(shí)現(xiàn)實(shí)際上是基于異步實(shí)現(xiàn)的斑匪,這在下文中也會得到解釋呐籽。
在Redisson中,除了提供同步和異步的方式執(zhí)行命令之外蚀瘸,還通過 Reactive Streams 實(shí)現(xiàn)了 Reactive 方式的命令執(zhí)行器狡蝶。見下圖
預(yù)備知識
Redisson 大量使用了 Redis 的 EVAL
命令來執(zhí)行 Lua 腳本,所以先要對 EVAL
有所了解贮勃。
EVAL命令格式和示例
EVAL script numkeys key [key ...] arg [arg ...]
> eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
OK
從 Redis 2.6.0 版本開始牢酵,通過內(nèi)置的 Lua 解釋器,可以使用 EVAL 命令對 Lua 腳本進(jìn)行求值衙猪。
參數(shù)的說明本文不再詳述馍乙,可查閱 Redis命令參考。
重點(diǎn)是這個(gè):Redis 使用單個(gè) Lua 解釋器去運(yùn)行所有腳本垫释,并且 Redis 也保證腳本會以原子性(atomic)的方式執(zhí)行丝格,當(dāng)某個(gè)腳本正在運(yùn)行的時(shí)候,不會有其他腳本或 Redis 命令被執(zhí)行棵譬。所以 Redisson 中使用了 EVAL 來保證執(zhí)行命令操作數(shù)據(jù)時(shí)的安全性显蝌。
例子
這里就使用 Redisson 參考文檔中的一個(gè) RAtomicLong
對象的例子吧。
RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步方式
longObject.compareAndSet(3, 401);
// 異步方式
longObject.compareAndSetAsync(3, 401);
RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive方式
longObject.compareAndSet(3, 401);
根據(jù)此例,我們分別來看 compareAndSet/compareAndSetAsync
的實(shí)現(xiàn)曼尊,其他命令原理都一樣酬诀。
異步
既然同步和Reactive的實(shí)現(xiàn)都繼承了異步的實(shí)現(xiàn),那我們就先來看看CommandAsyncService
吧骆撇。
例子中的 longObject.compareAndSetAsync(3, 401);
實(shí)際執(zhí)行的是 RedissonAtomicLong
實(shí)現(xiàn)類的 compareAndSetAsync
方法瞒御,如下
public Future<Boolean> compareAndSetAsync(long expect, long update) {
return commandExecutor.evalWriteAsync(getName(),
StringCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"...此處省略...",
Collections.<Object>singletonList(getName()),
expect, update);
}
此處的 evalWriteAsync
就是在 CommandAsyncService
中實(shí)現(xiàn)的,如下
public <T, R> Future<R> evalWriteAsync(String key,
Codec codec,
RedisCommand<T> evalCommandType,
String script,
List<Object> keys,
Object ... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
private <T, R> Future<R> evalAsync(NodeSource nodeSource,
boolean readOnlyMode,
Codec codec,
RedisCommand<T> evalCommandType,
String script,
List<Object> keys,
Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
追根溯源神郊,最后來看看 async
方法的實(shí)現(xiàn)肴裙,
protected <V, R> void async(final boolean readOnlyMode,
final NodeSource source,
final Codec codec,
final RedisCommand<V> command,
final Object[] params,
final Promise<R> mainPromise,
final int attempt) {
// ....省略部分代碼....
// AsyncDetails 是一個(gè)包裝對象,用來將異步調(diào)用過程中的對象引用包裝起來方便使用
final AsyncDetails<V, R> details = AsyncDetails.acquire();
details.init(connectionFuture, attemptPromise,
readOnlyMode, source, codec, command, params, mainPromise, attempt);
// retryTimerTask 用來實(shí)現(xiàn) Redisson 提供的重試機(jī)制
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
// ....省略部分代碼....
int count = details.getAttempt() + 1;
// ....省略部分代碼....
async(details.isReadOnlyMode(), details.getSource(),
details.getCodec(), details.getCommand(),
details.getParams(), details.getMainPromise(), count);
AsyncDetails.release(details);
}
};
// 啟用重試機(jī)制
Timeout timeout = connectionManager.newTimeout(retryTimerTask,
connectionManager.getConfig().getRetryInterval(),
TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
// checkConnectionFuture 用于檢查客戶端是否與服務(wù)端集群建立連接涌乳,如果連接建立
// 則可發(fā)送命令到服務(wù)端執(zhí)行
if (connectionFuture.isDone()) {
checkConnectionFuture(source, details);
} else {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(source, details);
}
});
}
// ....省略部分代碼....
}
private <R, V> void checkConnectionFuture(final NodeSource source,
final AsyncDetails<V, R> details) {
// ....省略部分代碼....
// 獲取客戶端與服務(wù)端集群建立的連接
final RedisConnection connection = details.getConnectionFuture().getNow();
if (details.getSource().getRedirect() == Redirect.ASK) {
// 客戶端接收到 ASK 轉(zhuǎn)向, 先發(fā)送一個(gè) ASKING 命令蜻懦,然后再發(fā)送真正的命令請求
// ....省略部分代碼....
} else {
// ....省略部分代碼....
// 客戶端發(fā)送命令到服務(wù)端
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
// ....省略部分代碼....
// 釋放本次連接
releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
details.getAttemptPromise(), details);
}
由于代碼太長,我只貼出了和執(zhí)行命令有關(guān)的部分代碼夕晓,我們可以從上面代碼中看到
- Redisson 對每次操作都提供了重試機(jī)制宛乃,可配置
retryAttempts
來控制重試次數(shù)(缺省為3次),可配置retryInterval
來控制重試間隔(缺省為 1000 ms)蒸辆。Redisson 中使用了 Netty 的TimerTask
和Timeout
工具來實(shí)現(xiàn)其重試機(jī)制烤惊。 - Redisson 中也大量使用了 Netty 實(shí)現(xiàn)的異步工具
Future
和FutureListener
,使得異步調(diào)用執(zhí)行完成后能夠立刻做出對應(yīng)的操作吁朦。 - RedissonConnection 是基于 Netty 實(shí)現(xiàn)的柒室,發(fā)送命令的
send
方法實(shí)現(xiàn)是使用 Netty 的Channel.writeAndFlush
方法。
以上便是 Redisson 的異步實(shí)現(xiàn)逗宜。
同步
Redisson 里的同步都是基于異步來實(shí)現(xiàn)的雄右,為什么這么說,來看看 RedissonAtomicLong
的 compareAndSet
方法纺讲,
public boolean compareAndSet(long expect, long update) {
return get(compareAndSetAsync(expect, update));
}
可見是在之前的異步方法外套了一個(gè) get
方法擂仍,而這個(gè) get
方法實(shí)際上也是在異步實(shí)現(xiàn)類 CommandAsyncService
中實(shí)現(xiàn)的,至于同步的實(shí)現(xiàn)類 CommandSyncService
有興趣大家可以去看看熬甚,基本上都是在異步實(shí)現(xiàn)返回的 Future
外套了一個(gè) get
方法逢渔。那么我們就看看 get
的實(shí)現(xiàn),
public <V> V get(Future<V> future) {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
l.countDown();
}
});
try {
// 阻塞當(dāng)前線程
l.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
原來是利用了 CountDownLatch
在異步調(diào)用結(jié)果返回前將當(dāng)前線程阻塞乡括,然后通過 Netty 的 FutureListener
在異步調(diào)用完成后解除阻塞肃廓,并返回調(diào)用結(jié)果。
Reactive
從例子中可以看到诲泌,Reactive 的客戶端和對象實(shí)現(xiàn)都是獨(dú)立的盲赊,先來看看 RedissonAtomicLongReactive
的 compareAndSet
方法,
public Publisher<Boolean> compareAndSet(long expect, long update) {
return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect, update);
}
它調(diào)用的是 CommandReactiveService
中實(shí)現(xiàn)的 evalWriteReactive
方法敷扫,
public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
RedisCommand<T> evalCommandType, String script, List<Object> keys,
Object... params) {
Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return new NettyFuturePublisher<R>(f);
}
我們看到這里還是基于異步調(diào)用實(shí)現(xiàn)的哀蘑,只是將異步調(diào)用返回的 Future
封裝在了一個(gè) NettyFuturePublisher
對象中返回,這個(gè)對象繼承了 Reactive Streams 中的 Stream
,所以我的解讀也只能到此為止了绘迁,Reactive Streams 的相關(guān)知識目前我還不具備合溺。
總結(jié)
- Redisson 提供了 同步、異步 和 Reactive 三種命令執(zhí)行方式缀台。
- 同步 和 Reactive 的實(shí)現(xiàn)是基于 異步 的實(shí)現(xiàn)的棠赛。
- Redisson 使用 Netty 連接 Redis 服務(wù),并依賴 Netty 異步工具類來實(shí)現(xiàn)異步通信将硝、重試機(jī)制恭朗、阻塞等特性屏镊。
- Redisson 使用 Reactive Streams 來實(shí)現(xiàn) Reactive 特性依疼。