Redis watch機(jī)制的分析

Redis watch機(jī)制的分析

我的博客

我們可以用redis的watch和multi來(lái)處理一些涉及并發(fā)的操作,redis的watch+multi實(shí)際是一種樂觀鎖,今天我們來(lái)分析一下它的實(shí)現(xiàn)機(jī)制陕习。

常用的代碼段

$key = 'xxxx';
$redis->watch($key);
$redis->multi();
// 更新了key
$redis->set($key);
$flag = $redis->exec();

// 如果事務(wù)執(zhí)行失敗返回false
if ($flag === false) {
    
} else {
    
}

流程圖

redis_watch.png

當(dāng)客戶端A和客戶端B同時(shí)執(zhí)行這段代碼時(shí)候声滥,因?yàn)槭聞?wù)的執(zhí)行是串行的,假設(shè)A客戶端先于B執(zhí)行,那么當(dāng)A執(zhí)行完成時(shí)缚够,會(huì)將客戶端A從watch了這個(gè)key的列表中刪除隔缀,并且將列表中的所有客戶端都設(shè)置為CLIENT_DIRTY_CAS题造,之后當(dāng)B執(zhí)行的時(shí)候,事務(wù)發(fā)現(xiàn)B的狀態(tài)是CLIENT_DIRTY_CAS猾瘸,便終止事務(wù)并返回失敗界赔。

存儲(chǔ)

  1. redis 用了哈希表+鏈表存儲(chǔ)watch了key的客戶端:

哈希表key為redis的key,哈希表的value為客戶端組成的鏈表

定位key的復(fù)雜度O(1)牵触,查找和處理客戶端的復(fù)雜度為O(n)

Key1 => (client1->client2->client3...)

Key2 => (client1->client2->client3...)

  1. 每個(gè)客戶端也維護(hù)一個(gè)鏈表用來(lái)存儲(chǔ)已經(jīng)watch的key

相關(guān)源碼

涉及文件

multi.h

multi.c

db.c

t_string.c

watch

/* watch命令 */
void watchCommand(client *c) {
    int j;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;

/* watch一個(gè)key */
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* 檢查key是否已經(jīng)watch 如果已經(jīng)watch 直接返回 */
    // 創(chuàng)建一個(gè)迭代器
    listRewind(c->watched_keys,&li);
    // 遍歷客戶端已經(jīng)watch的key
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        // 當(dāng)發(fā)現(xiàn)已經(jīng)存在次key淮悼,直接返回
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* 沒有被watch,繼續(xù)一下處理 */
    // 獲取hash表中當(dāng)前key的客戶端鏈表
    clients = dictFetchValue(c->db->watched_keys,key);
    // 如果不存在揽思,則創(chuàng)建一個(gè)鏈表用于存儲(chǔ)
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 添加當(dāng)前客戶端到鏈表末尾
    listAddNodeTail(clients,c);
    /* 維護(hù)客戶端中的watch_keys 鏈表 */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

multi

/* multi 命令 */
void multiCommand(client *c) {
    // 如果已經(jīng)初始化了客戶端狀態(tài)袜腥,即已經(jīng)執(zhí)行了multi 則返回
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 初始化客戶端狀態(tài)為CLIENT_MULTI
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}

/* 初始化客戶端狀態(tài) 清空事務(wù)隊(duì)列 */
void initClientMultiState(client *c) {
    c->mstate.commands = NULL;
    c->mstate.count = 0;
}

exec

/* exec 命令 */
void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    int was_master = server.masterhost == NULL;
    
    // 未執(zhí)行multi,則返回
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    
    /*
     * 關(guān)鍵
     * 處理客戶端狀態(tài) 以下兩種狀態(tài)會(huì)直接終止事務(wù)钉汗,不會(huì)執(zhí)行事務(wù)隊(duì)列中的命令
     * 1. CLIENT_DIRTY_CAS => 當(dāng)因?yàn)閣atch的key被touch了
     * 2. CLIENT_DIRTY_EXEC => 當(dāng)客戶端入隊(duì)了不存在的命令
     */
    
    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        // 
        discardTransaction(c);
        goto handle_monitor;
    }

    /* 執(zhí)行隊(duì)列中的命令 */
    // 清空當(dāng)前客戶端中存儲(chǔ)的watch了的key羹令,和hash表中客戶端node
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    // 執(zhí)行隊(duì)列中的命令
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }
        // 這里會(huì)call相關(guān)的命令
        // 如果是涉及到修改相關(guān)的命令,不管有沒有更改值损痰,都會(huì)將hash表中watch了key的客戶端的狀態(tài)置為CLIENT_DIRTY_CAS
        call(c,CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        /* If inside the MULTI/EXEC block this instance was suddenly
         * switched from master to slave (using the SLAVEOF command), the
         * initial MULTI was propagated into the replication backlog, but the
         * rest was not. We need to make sure to at least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

/* 清空當(dāng)前事務(wù)數(shù)據(jù) */
void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

/* Unwatch all the keys watched by this client. To clean the EXEC dirty
 * flag is up to the caller. */
void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;

    if (listLength(c->watched_keys) == 0) return;
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        serverAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}

set,hset 以set命令為??

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    // 看這里?? 設(shè)置string的值
    setKey(c->db,key,val);
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
 *
 * 1) The ref count of the value object is incremented.
 * 2) clients WATCHing for the destination key notified.
 * 3) The expire time of the key is reset (the key is made persistent).
 *
 * All the new keys in the database should be craeted via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
        dbOverwrite(db,key,val);
    }
    incrRefCount(val);
    removeExpire(db,key);
    // 看這里?? 標(biāo)記hash表中所有已經(jīng)watch這個(gè)key的所有客戶端狀態(tài)為CLIENT_DIRTY_CAS
    // 如果我原先的值為1福侈,這里set為1,也會(huì)執(zhí)行這個(gè)方法卢未。所以說(shuō)和值變沒變沒關(guān)系肪凛。
    signalModifiedKey(db,key);
}

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

/* 更新hash表中相應(yīng)客戶端的狀態(tài)為CLIENT_DIRTY_CAS */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市尝丐,隨后出現(xiàn)的幾起案子显拜,更是在濱河造成了極大的恐慌,老刑警劉巖爹袁,帶你破解...
    沈念sama閱讀 222,807評(píng)論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件远荠,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡失息,警方通過查閱死者的電腦和手機(jī)譬淳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門档址,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人邻梆,你說(shuō)我怎么就攤上這事守伸。” “怎么了浦妄?”我有些...
    開封第一講書人閱讀 169,589評(píng)論 0 363
  • 文/不壞的土叔 我叫張陵尼摹,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我剂娄,道長(zhǎng)蠢涝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,188評(píng)論 1 300
  • 正文 為了忘掉前任阅懦,我火速辦了婚禮和二,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘耳胎。我一直安慰自己惯吕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評(píng)論 6 398
  • 文/花漫 我一把揭開白布怕午。 她就那樣靜靜地躺著废登,像睡著了一般。 火紅的嫁衣襯著肌膚如雪诗轻。 梳的紋絲不亂的頭發(fā)上钳宪,一...
    開封第一講書人閱讀 52,785評(píng)論 1 314
  • 那天,我揣著相機(jī)與錄音扳炬,去河邊找鬼吏颖。 笑死,一個(gè)胖子當(dāng)著我的面吹牛恨樟,可吹牛的內(nèi)容都是我干的半醉。 我是一名探鬼主播,決...
    沈念sama閱讀 41,220評(píng)論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼劝术,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼缩多!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起养晋,我...
    開封第一講書人閱讀 40,167評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤衬吆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后绳泉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逊抡,經(jīng)...
    沈念sama閱讀 46,698評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評(píng)論 3 343
  • 正文 我和宋清朗相戀三年零酪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了冒嫡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拇勃。...
    茶點(diǎn)故事閱讀 40,912評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖孝凌,靈堂內(nèi)的尸體忽然破棺而出方咆,到底是詐尸還是另有隱情,我是刑警寧澤蟀架,帶...
    沈念sama閱讀 36,572評(píng)論 5 351
  • 正文 年R本政府宣布瓣赂,位于F島的核電站,受9級(jí)特大地震影響片拍,放射性物質(zhì)發(fā)生泄漏钩述。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評(píng)論 3 336
  • 文/蒙蒙 一穆碎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧职恳,春花似錦所禀、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至操禀,卻和暖如春褂策,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背颓屑。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工斤寂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人揪惦。 一個(gè)月前我還...
    沈念sama閱讀 49,359評(píng)論 3 379
  • 正文 我出身青樓遍搞,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親器腋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子溪猿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • Redis 通過 MULTI 、 DISCARD 纫塌、 EXEC 和 WATCH 四個(gè)命令來(lái)實(shí)現(xiàn)事務(wù)功能诊县, 本章首先...
    binge1024閱讀 520評(píng)論 0 2
  • 超強(qiáng)、超詳細(xì)Redis入門教程 轉(zhuǎn)載2017年03月04日 16:20:02 16916 轉(zhuǎn)載自: http://...
    邵云濤閱讀 17,455評(píng)論 3 313
  • 分布式緩存技術(shù)PK:選擇Redis還是Memcached? 經(jīng)平臺(tái)同意授權(quán)轉(zhuǎn)載 作者:田京昆(騰訊后臺(tái)研發(fā)工程師)...
    meng_philip123閱讀 68,933評(píng)論 7 60
  • 文章已經(jīng)放到github上 媳荒,如果對(duì)您有幫助 請(qǐng)給個(gè)star[https://github.com/qqxuanl...
    尼爾君閱讀 2,287評(píng)論 0 22
  • 卡耐基金言: 天底下只有一種方法可以促使他人去做任何事––給他想要的東西抗悍。 在你每天的生活之旅中驹饺,別忘了為人間留下...
    千言萬(wàn)語(yǔ)娟娟閱讀 329評(píng)論 0 1