Redisson 中的 CommandExecutor

轉(zhuǎn)自:https://github.com/angryz/my-blog/issues/6

上篇Redisson 分布式鎖實(shí)現(xiàn)分析中提到了RedissonLock中的redis命令都是通過CommandExecutor來發(fā)送到redis服務(wù)執(zhí)行的哪替,本篇就來了解一下它的實(shí)現(xiàn)方式。

先來看其源碼

public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
}

可以看到它同時(shí)繼承了 同步和異步(sync/async) 兩種調(diào)用方式古劲。

Note:

  • 在分布式鎖的實(shí)現(xiàn)中是用了同步的 CommandExecutor照宝,是因?yàn)殒i的獲取和釋放是有強(qiáng)一致性要求的颜懊,需要實(shí)時(shí)知道結(jié)果方可進(jìn)行下一步操作年堆。
  • 上篇分布式鎖分析中我提到 Redisson 的同步實(shí)現(xiàn)實(shí)際上是基于異步實(shí)現(xiàn)的斑匪,這在下文中也會得到解釋呐籽。

在Redisson中,除了提供同步和異步的方式執(zhí)行命令之外蚀瘸,還通過 Reactive Streams 實(shí)現(xiàn)了 Reactive 方式的命令執(zhí)行器狡蝶。見下圖

預(yù)備知識

Redisson 大量使用了 Redis 的 EVAL 命令來執(zhí)行 Lua 腳本,所以先要對 EVAL 有所了解贮勃。

EVAL命令格式和示例

EVAL script numkeys key [key ...] arg [arg ...]

> eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
OK

從 Redis 2.6.0 版本開始牢酵,通過內(nèi)置的 Lua 解釋器,可以使用 EVAL 命令對 Lua 腳本進(jìn)行求值衙猪。

參數(shù)的說明本文不再詳述馍乙,可查閱 Redis命令參考

重點(diǎn)是這個(gè):Redis 使用單個(gè) Lua 解釋器去運(yùn)行所有腳本垫释,并且 Redis 也保證腳本會以原子性(atomic)的方式執(zhí)行丝格,當(dāng)某個(gè)腳本正在運(yùn)行的時(shí)候,不會有其他腳本或 Redis 命令被執(zhí)行棵譬。所以 Redisson 中使用了 EVAL 來保證執(zhí)行命令操作數(shù)據(jù)時(shí)的安全性显蝌。

例子

這里就使用 Redisson 參考文檔中的一個(gè) RAtomicLong 對象的例子吧。

RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步方式
longObject.compareAndSet(3, 401);
// 異步方式
longObject.compareAndSetAsync(3, 401);

RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive方式
longObject.compareAndSet(3, 401);

根據(jù)此例,我們分別來看 compareAndSet/compareAndSetAsync 的實(shí)現(xiàn)曼尊,其他命令原理都一樣酬诀。

異步

既然同步和Reactive的實(shí)現(xiàn)都繼承了異步的實(shí)現(xiàn),那我們就先來看看CommandAsyncService吧骆撇。

例子中的 longObject.compareAndSetAsync(3, 401); 實(shí)際執(zhí)行的是 RedissonAtomicLong 實(shí)現(xiàn)類的 compareAndSetAsync 方法瞒御,如下

public Future<Boolean> compareAndSetAsync(long expect, long update) {
    return commandExecutor.evalWriteAsync(getName(),
                                          StringCodec.INSTANCE,
                                          RedisCommands.EVAL_BOOLEAN,
                                          "...此處省略...",
                                          Collections.<Object>singletonList(getName()),
                                          expect, update);
}

此處的 evalWriteAsync 就是在 CommandAsyncService 中實(shí)現(xiàn)的,如下

public <T, R> Future<R> evalWriteAsync(String key,
                                       Codec codec,
                                       RedisCommand<T> evalCommandType,
                                       String script,
                                       List<Object> keys,
                                       Object ... params) {
    NodeSource source = getNodeSource(key);
    return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}

private <T, R> Future<R> evalAsync(NodeSource nodeSource,
                                   boolean readOnlyMode,
                                   Codec codec,
                                   RedisCommand<T> evalCommandType,
                                   String script,
                                   List<Object> keys,
                                   Object ... params) {
    Promise<R> mainPromise = connectionManager.newPromise();
    List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
    args.add(script);
    args.add(keys.size());
    args.addAll(keys);
    args.addAll(Arrays.asList(params));
    async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
    return mainPromise;
}

追根溯源神郊,最后來看看 async 方法的實(shí)現(xiàn)肴裙,

protected <V, R> void async(final boolean readOnlyMode,
                            final NodeSource source,
                            final Codec codec,
                            final RedisCommand<V> command,
                            final Object[] params,
                            final Promise<R> mainPromise,
                            final int attempt) {
    // ....省略部分代碼....
    // AsyncDetails 是一個(gè)包裝對象,用來將異步調(diào)用過程中的對象引用包裝起來方便使用
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    details.init(connectionFuture, attemptPromise,
            readOnlyMode, source, codec, command, params, mainPromise, attempt);

    // retryTimerTask 用來實(shí)現(xiàn) Redisson 提供的重試機(jī)制
    final TimerTask retryTimerTask = new TimerTask() {

        @Override
        public void run(Timeout t) throws Exception {
            // ....省略部分代碼....
            int count = details.getAttempt() + 1;
            // ....省略部分代碼....
            async(details.isReadOnlyMode(), details.getSource(),
                    details.getCodec(), details.getCommand(),
                    details.getParams(), details.getMainPromise(), count);
            AsyncDetails.release(details);
        }
    };
    // 啟用重試機(jī)制
    Timeout timeout = connectionManager.newTimeout(retryTimerTask,
            connectionManager.getConfig().getRetryInterval(),
            TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);

    // checkConnectionFuture 用于檢查客戶端是否與服務(wù)端集群建立連接涌乳,如果連接建立
    // 則可發(fā)送命令到服務(wù)端執(zhí)行
    if (connectionFuture.isDone()) {
        checkConnectionFuture(source, details);
    } else {
        connectionFuture.addListener(new FutureListener<RedisConnection>() {
            @Override
            public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
                checkConnectionFuture(source, details);
            }
        });
    }

    // ....省略部分代碼....
}

private <R, V> void checkConnectionFuture(final NodeSource source,
        final AsyncDetails<V, R> details) {
    // ....省略部分代碼....
    // 獲取客戶端與服務(wù)端集群建立的連接
    final RedisConnection connection = details.getConnectionFuture().getNow();

    if (details.getSource().getRedirect() == Redirect.ASK) {
        // 客戶端接收到 ASK 轉(zhuǎn)向, 先發(fā)送一個(gè) ASKING 命令蜻懦,然后再發(fā)送真正的命令請求
        // ....省略部分代碼....
    } else {
        // ....省略部分代碼....
        // 客戶端發(fā)送命令到服務(wù)端
        ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
                details.getCodec(), details.getCommand(), details.getParams()));
        details.setWriteFuture(future);
    }
    // ....省略部分代碼....
    // 釋放本次連接
    releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
            details.getAttemptPromise(), details);
}

由于代碼太長,我只貼出了和執(zhí)行命令有關(guān)的部分代碼夕晓,我們可以從上面代碼中看到

  • Redisson 對每次操作都提供了重試機(jī)制宛乃,可配置 retryAttempts 來控制重試次數(shù)(缺省為3次),可配置 retryInterval 來控制重試間隔(缺省為 1000 ms)蒸辆。Redisson 中使用了 Netty 的 TimerTaskTimeout 工具來實(shí)現(xiàn)其重試機(jī)制烤惊。
  • Redisson 中也大量使用了 Netty 實(shí)現(xiàn)的異步工具 FutureFutureListener,使得異步調(diào)用執(zhí)行完成后能夠立刻做出對應(yīng)的操作吁朦。
  • RedissonConnection 是基于 Netty 實(shí)現(xiàn)的柒室,發(fā)送命令的 send 方法實(shí)現(xiàn)是使用 Netty 的 Channel.writeAndFlush 方法。

以上便是 Redisson 的異步實(shí)現(xiàn)逗宜。

同步

Redisson 里的同步都是基于異步來實(shí)現(xiàn)的雄右,為什么這么說,來看看 RedissonAtomicLongcompareAndSet方法纺讲,

public boolean compareAndSet(long expect, long update) {
    return get(compareAndSetAsync(expect, update));
}

可見是在之前的異步方法外套了一個(gè) get 方法擂仍,而這個(gè) get 方法實(shí)際上也是在異步實(shí)現(xiàn)類 CommandAsyncService 中實(shí)現(xiàn)的,至于同步的實(shí)現(xiàn)類 CommandSyncService 有興趣大家可以去看看熬甚,基本上都是在異步實(shí)現(xiàn)返回的 Future 外套了一個(gè) get 方法逢渔。那么我們就看看 get 的實(shí)現(xiàn),

public <V> V get(Future<V> future) {
    final CountDownLatch l = new CountDownLatch(1);
    future.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> future) throws Exception {
            l.countDown();
        }
    });
    try {
        // 阻塞當(dāng)前線程
        l.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (future.isSuccess()) {
        return future.getNow();
    }
    throw convertException(future);
}

原來是利用了 CountDownLatch 在異步調(diào)用結(jié)果返回前將當(dāng)前線程阻塞乡括,然后通過 Netty 的 FutureListener在異步調(diào)用完成后解除阻塞肃廓,并返回調(diào)用結(jié)果。

Reactive

從例子中可以看到诲泌,Reactive 的客戶端和對象實(shí)現(xiàn)都是獨(dú)立的盲赊,先來看看 RedissonAtomicLongReactivecompareAndSet 方法,

public Publisher<Boolean> compareAndSet(long expect, long update) {
    return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
                 + "redis.call('set', KEYS[1], ARGV[2]); "
                 + "return 1 "
               + "else "
                 + "return 0 end",
            Collections.<Object>singletonList(getName()), expect, update);
}

它調(diào)用的是 CommandReactiveService 中實(shí)現(xiàn)的 evalWriteReactive 方法敷扫,

public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
        RedisCommand<T> evalCommandType, String script, List<Object> keys,
        Object... params) {
  Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
  return new NettyFuturePublisher<R>(f);
}

我們看到這里還是基于異步調(diào)用實(shí)現(xiàn)的哀蘑,只是將異步調(diào)用返回的 Future 封裝在了一個(gè) NettyFuturePublisher 對象中返回,這個(gè)對象繼承了 Reactive Streams 中的 Stream,所以我的解讀也只能到此為止了绘迁,Reactive Streams 的相關(guān)知識目前我還不具備合溺。

總結(jié)

  • Redisson 提供了 同步、異步 和 Reactive 三種命令執(zhí)行方式缀台。
  • 同步 和 Reactive 的實(shí)現(xiàn)是基于 異步 的實(shí)現(xiàn)的棠赛。
  • Redisson 使用 Netty 連接 Redis 服務(wù),并依賴 Netty 異步工具類來實(shí)現(xiàn)異步通信将硝、重試機(jī)制恭朗、阻塞等特性屏镊。
  • Redisson 使用 Reactive Streams 來實(shí)現(xiàn) Reactive 特性依疼。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市而芥,隨后出現(xiàn)的幾起案子律罢,更是在濱河造成了極大的恐慌,老刑警劉巖棍丐,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件误辑,死亡現(xiàn)場離奇詭異,居然都是意外死亡歌逢,警方通過查閱死者的電腦和手機(jī)巾钉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來秘案,“玉大人砰苍,你說我怎么就攤上這事≮甯撸” “怎么了赚导?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長赤惊。 經(jīng)常有香客問我吼旧,道長,這世上最難降的妖魔是什么未舟? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任圈暗,我火速辦了婚禮,結(jié)果婚禮上裕膀,老公的妹妹穿的比我還像新娘厂置。我一直安慰自己,他們只是感情好魂角,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布昵济。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪访忿。 梳的紋絲不亂的頭發(fā)上瞧栗,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天,我揣著相機(jī)與錄音海铆,去河邊找鬼迹恐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛卧斟,可吹牛的內(nèi)容都是我干的殴边。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼珍语,長吁一口氣:“原來是場噩夢啊……” “哼锤岸!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起板乙,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤是偷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后募逞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛋铆,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年放接,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了刺啦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡纠脾,死狀恐怖玛瘸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情乳乌,我是刑警寧澤捧韵,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站汉操,受9級特大地震影響再来,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜磷瘤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一芒篷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧采缚,春花似錦针炉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽殖侵。三九已至,卻和暖如春镰烧,著一層夾襖步出監(jiān)牢的瞬間拢军,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工怔鳖, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留茉唉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓结执,卻偏偏與公主長得像度陆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子献幔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評論 2 355