PS:簡(jiǎn)書(shū)的markdown與筆記格式不太一樣,原文請(qǐng)戳這里
一语淘、鎖的實(shí)現(xiàn)方式
涉及的類:
Locker -> AbstractLocker -> ConsistentKeyLocker
LocalLockMediator
文檔中對(duì)這個(gè)接口的解釋:
該接口代表線程安全的任意的鎖,可能是在JanusGraph進(jìn)程中的籽前,也可能是在多個(gè)進(jìn)程之間的种冬。Lock本身就是一個(gè)確認(rèn)了的KeyColumn的實(shí)例。如果兩個(gè)KeyColumn使用了同一個(gè)Lock粗井,則這兩個(gè)KeyColumn調(diào)用equals方法應(yīng)該返回true尔破。
獲取鎖的線程是通過(guò)StoreTransaction來(lái)確認(rèn)的。
該接口使用三步鎖模型來(lái)獲取鎖浇衬,支持阻塞鎖和非阻塞鎖懒构。不過(guò)不是所有的StoreTransaction都需要鎖并使用這個(gè)接口。如果StoreTransaction需要一個(gè)或者多個(gè)鎖耘擂,JanusGraph會(huì)按照以下順序調(diào)用接口中的方法(tx代表每一步中的同一個(gè)StoreTransaction):
writeLock(kc, tx) 使用tx事務(wù)對(duì)象獲取對(duì)某個(gè)KeyColumn的鎖胆剧。每個(gè)KeyColumn對(duì)象都調(diào)用一次或多次。
checkLocks(tx) 檢查當(dāng)前的事務(wù)之前調(diào)用的writeLock方法實(shí)際上有沒(méi)有成功醉冤。如果用戶提交事務(wù)或者用戶放棄了這個(gè)事務(wù)時(shí),會(huì)調(diào)用一次。
deleteLocks(tx) 釋放當(dāng)前事務(wù)所持有的鎖峰鄙。不論checkLocks方法有沒(méi)有調(diào)用茎杂,這個(gè)方法都會(huì)調(diào)用一次。
Doc中的解釋:
構(gòu)建Locker的抽象基礎(chǔ)類。實(shí)現(xiàn)了線程之間的鎖(使用LocakLockMediator)。進(jìn)程之間的鎖由子類來(lái)實(shí)現(xiàn)。
AbstractLocker中實(shí)現(xiàn)了Locker接口中的方法移宅。針對(duì)這些實(shí)現(xiàn)的方法,又添加了幾個(gè)新的方法由子類實(shí)現(xiàn)(例如椿疗,Locker中是writeLock方法漏峰,AbstractLocker類中,又添加了writeSingleLock方法届榄,此方法在writeLock中被調(diào)用)浅乔。而實(shí)現(xiàn)的Locker接口中的方法都需要調(diào)用新定義的這幾個(gè)方法。
publicabstractclassAbstractLocker<SextendsLockStatus>implementsLocker{abstractSwriteSingleLock(KeyColumnlockID,StoreTransactiontx);abstractvoid checkSingleLock(KeyColumnlockID,SlockStatus,StoreTransactiontx);abstractvoid deleteSingleLock(KeyColumnlockID,SlockStatus,StoreTransactiontx);}
ConsistentKeyLocker實(shí)現(xiàn)了AbstractLocker中定義的方法铝条。
ConsistentKeyLocker是一個(gè)全局的鎖靖苇,通過(guò)AbstractLocker中的方法實(shí)現(xiàn)了線程之間的鎖的競(jìng)爭(zhēng)。并通過(guò)使用KeyColumnValueStore來(lái)進(jìn)行數(shù)據(jù)的讀和寫(xiě)實(shí)現(xiàn)了進(jìn)程之間的鎖的競(jìng)爭(zhēng)班缰。
加鎖的過(guò)程如下:
加鎖是通過(guò)兩個(gè)階段來(lái)完成的:首先在進(jìn)程之間的線程中獲取鎖贤壁,然后再JanusGraph Cluster的多個(gè)進(jìn)程之間來(lái)獲取鎖。
階段一:線程之間的鎖的競(jìng)爭(zhēng)
在一個(gè)共享的進(jìn)程中埠忘,線程的鎖的競(jìng)爭(zhēng)是由LocalLockMeditator來(lái)“仲裁(原文是arbitrated)”的脾拆。這個(gè)仲裁者使用concurrent包中的類確保對(duì)于任何一個(gè)KeyColumn,在任何時(shí)間段內(nèi)最多只有一個(gè)線程持有他的鎖莹妒。這段代碼是在AbstractLocker中名船。
階段二:進(jìn)程之間的鎖的競(jìng)爭(zhēng)
如果在階段一中,meditator發(fā)出信號(hào)表示當(dāng)前線程持有了進(jìn)程內(nèi)的鎖旨怠,則接下來(lái)會(huì)通過(guò)讀和寫(xiě)KeyColumnValueStore來(lái)檢查是否當(dāng)前進(jìn)程是唯一持有該鎖的進(jìn)程渠驼。在Cassandra或者HBase中,都有專用的store來(lái)專門存儲(chǔ)鎖數(shù)據(jù)(BigTable模型中鉴腻,store一般表示column family)迷扇。
進(jìn)程之間競(jìng)爭(zhēng)鎖所涉及的IO操作:
寫(xiě)一個(gè)column來(lái)存儲(chǔ)key(KeyColumn.getKey()+KeyColumn.getValue()), column(大致的時(shí)間戳+進(jìn)程的rid)和value(0的byte表示)。
如果寫(xiě)失敗了或者超過(guò)了LockWait的實(shí)現(xiàn)爽哎,就重試寫(xiě)操作谋梭,帶一個(gè)修改的時(shí)間戳,直到超出了配置的重試次數(shù)(獲取失斁肭唷)或者在LockWait的時(shí)間內(nèi)完成了寫(xiě)操作(但寫(xiě)操作成功了不代表獲取鎖成功了)。
如果需要的話盹舞,執(zhí)行等待产镐,直到lockWait的時(shí)間耗盡隘庄,或者寫(xiě)成功。
獲取這個(gè)key的所有的column癣亚。
擦除掉timestamp已經(jīng)過(guò)了lockExpire的記錄丑掺。
如果我們的column是第一個(gè)讀的column,或者只有當(dāng)前的rid持有該column(階段一的meditator保證了一個(gè)KeyColumn在一個(gè)進(jìn)程中只有一個(gè)線程能獲取到鎖)述雾,則獲取鎖成功街州。否則失敗。
釋放鎖時(shí)玻孟,刪除掉記錄中的該條column(注意不是整個(gè)key都刪掉)唆缴。
結(jié)合代碼:
publicvoidwriteLock(KeyColumn lockID, StoreTransaction tx)throwsTemporaryLockingException, PermanentLockingException{? ? .....//調(diào)用meditator先贏得線程之間的鎖的競(jìng)爭(zhēng)。if(lockLocally(lockID, tx)){booleanok =false;try{//往HBase中寫(xiě)一條ColumnS stat = writeSingleLock(lockID, tx);//修改local lock的過(guò)期時(shí)間lockLocally(lockID, stat.getExpirationTimestamp(), tx);// lockState是記錄鎖狀態(tài)的緩存lockState.take(tx, lockID, stat);? ? ? ? ? ? ok =true;? ? ? ? }catch(){//對(duì)異常的處理}finally{if(!ok){? ? ? ? ? ? ? ? unlockLocally(lockID, tx);? ? ? ? ? ? ? ? ....? ? ? ? ? ? }? ? ? ? }? ? }else{thrownewPermanentLockingException("Local lock contention");? ? }}
4. LocalLockMediator的實(shí)現(xiàn):
LocalLockMediator是用于在同一個(gè)JVM之間的事務(wù)競(jìng)爭(zhēng)鎖而使用的黍翎。其底層是基于一個(gè)ConcurrentHashMap的putIfAbsent面徽。putIfAbsent返回一個(gè)Map中的Value,這個(gè)value如果是null匣掸,代表ConcurrentHashMap中原本不存在這個(gè)key趟紊。但如果返回不是null,則放棄put操作碰酝。當(dāng)key不存在時(shí)霎匈,putIfAbsent成功了就是獲取鎖成功了。
在Backend類中铛嘱,有一個(gè)ConcurrentHashMap用于保存Locker:
publicclassBackendimplementsLockerProvider,AutoCloseable{? ? privatefinalFunction lockerCreator;? ? privatefinalConcurrentHashMap lockers =newConcurrentHashMap<>();@Overridepublic Locker getLocker(StringlockerName) {? ? ? ? Preconditions.checkNotNull(lockerName);? ? ? ? Locker l = lockers.get(lockerName);if(null== l) {? ? ? ? ? ? l = lockerCreator.apply(lockerName);finalLocker x = lockers.putIfAbsent(lockerName, l);if(null!= x) {? ? ? ? ? ? ? ? l = x;? ? ? ? ? ? }? ? ? ? }returnl;? ? }}
而B(niǎo)ackend中的getLocker方法最終所使用的地方則是ExpectedValueCheckingStore。
ExpectedValueChecking家族是指以ExpectedValueChecking開(kāi)頭的幾個(gè)類弄痹。分別包括:
ExpectedValueCheckingStore
ExpectedValueCheckingStoreManager
ExpectedValueCheckingTransaction
該類是KeyColumnValueStore的一個(gè)包裝類。KeyColumnValueStore是一個(gè)接口嵌器,該接口表示的是針對(duì)BigTable模型的數(shù)據(jù)操作接口肛真。該接口提供了讀和寫(xiě)數(shù)據(jù)的方法。
ExpectedValueCheckingStore主要是對(duì)KeyColumnValueStore中的mutate方法和acquireLock方法進(jìn)行了封裝爽航,其內(nèi)部有一個(gè)KeyColumnValueStore變量蚓让,在這兩個(gè)方法前后又加了一些邏輯,其余的方法都是直接調(diào)用KeyColumnValueStore的對(duì)應(yīng)方法讥珍。其最終仍然需要依賴一個(gè)內(nèi)部的KeyColumnValueStore變量历极。
這個(gè)類一般是跟ExpectedValueCheckingTransaction一起,為每個(gè)StoreTransaction跟蹤所有的傳入acquireLock方法中的expectedValue參數(shù)衷佃。當(dāng)事務(wù)調(diào)用mutate方法時(shí)趟卸,這些類會(huì)協(xié)同一起檢查所有之前提供的expected value是否匹配實(shí)際的值。如果不匹配拋出異常。
//對(duì)KeyColumnValueStore的基礎(chǔ)代理類锄列,所實(shí)現(xiàn)的繼承的方法都是直接調(diào)用其內(nèi)部的store變量來(lái)實(shí)現(xiàn)图云。publicclassKCVSProxyimplementsKeyColumnValueStore{? ? ? ? protectedfinal KeyColumnValueStore store;}publicclassExpectedValueCheckingStoreextendsKCVSProxy{finalLocker locker;//對(duì)象的初始化需要使用代理的KeyColumnValueStore,以及l(fā)ocker邻邮。publicExpectedValueCheckingStore(KeyColumnValueStore store, Locker locker){super(store);this.locker = locker;? ? }// (1)確認(rèn)事務(wù)txh之前調(diào)用acquireLock獲取鎖是否成功// (2)如果成功了就將additions和deletetions寫(xiě)到底層存儲(chǔ)的key中// (3) Deletions是在additions之前執(zhí)行的竣况。也就是說(shuō),如果某個(gè)column既有deletion也有addition筒严,會(huì)先刪除丹泉,然后添加。//如果實(shí)現(xiàn)類中不支持鎖鸭蛙,則跳過(guò)鎖的認(rèn)證階段摹恨,并執(zhí)行后來(lái)的階段。publicvoidmutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh)throwsBackendException{? ? ? ? .....? ? }//試圖獲取key value對(duì)所聲明的鎖规惰。鎖是隨機(jī)分配的睬塌。//如果鎖獲取失敗了,可以拋出PermanentLockingException歇万。但這并不是強(qiáng)制的揩晴,如果獲取鎖失敗了,也可以不拋出異常贪磺。鎖的獲取只要在KeyColumnValueStore調(diào)用mutate方法時(shí)確認(rèn)獲取成功即可硫兰。// expectedValue必須匹配key value代表的publicvoidacquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh)throwsBackendException{? ? ? ? ....? ? }}
ExpectedValueCheckingTransaction
ExpectedValueCheckingTransaction是一個(gè)StoreTransaction的實(shí)現(xiàn)類。支持通過(guò)LocalLockMediator來(lái)獲取鎖寒锚,在ExpectedValueCheckingSotre中用于讀和寫(xiě)鎖記錄劫映。
其父類StoreTransaction代表一個(gè)事務(wù)的把手,用于唯一標(biāo)識(shí)后端存儲(chǔ)的一個(gè)事務(wù)刹前。所有對(duì)后端的修改操作必須有一個(gè)單個(gè)的事務(wù)作為context泳赋。這樣的事務(wù)能被JanusGraph中間件識(shí)別出來(lái)就是通過(guò)StoreTransaction。圖的事務(wù)依賴于底層存儲(chǔ)的事務(wù)喇喉。
要注意StoreTransaction本身不提供任何的隔離以及一致性的保證祖今。如果對(duì)應(yīng)的后端支持的話,Graph Transaction必須自己去實(shí)現(xiàn)拣技。
在這個(gè)類中千诬,有一個(gè)強(qiáng)一致的事務(wù)和一個(gè)不一致的事務(wù)。
publicclassExpectedValueCheckingTransactionimplementsStoreTransaction{//在事務(wù)鎖的階段一致是false膏斤。事務(wù)中調(diào)用mutate或者mutateMany方法開(kāi)始時(shí)設(shè)置為true徐绑。privatebooleanisMutationStarted;//用于對(duì)鎖相關(guān)的元數(shù)據(jù)進(jìn)行讀和寫(xiě)的事務(wù)。privatefinalStoreTransaction strongConsistentTx;//用于讀和寫(xiě)客戶端數(shù)據(jù)的事務(wù)莫辨。不保證強(qiáng)一致性傲茄。privatefinalStoreTransaction inconsistentTx;}
Backend中毅访,關(guān)鍵的元數(shù)據(jù)的初始化如下,以HBase為例:
publicclassBackendimplementsLockerProvider,AutoCloseable{//HBaseStoreManager烫幕。Backend中有多個(gè)KeyColumnValueStore的初始化俺抽,是通過(guò)該變量來(lái)實(shí)現(xiàn)的。這些變量各自負(fù)責(zé)hbase的一個(gè)column family的操作(所以较曼,在mutate或者mutateMany方法中,是沒(méi)有column family的)振愿。privatefinalKeyColumnValueStoreManager storeManager;//ExpectedKeyColumnValueStoreManagerprivatefinalKeyColumnValueStoreManager storeManagerLocking;privatefinalMap stores;privatefinalConcurrentHashMap lockers =newConcurrentHashMap<>();//構(gòu)造方法捷犹。構(gòu)造方法調(diào)用完之后調(diào)用initialize方法。publicBackend(Configuration config){? ? ? ? storeManager = ...;? ? ? ? storeManagerLocking =newExpectedValueCheckingStoreManager(storeManager,LOCK_STORE_SUFFIX,this,maxReadTime);? ? ? ? ? ? }publicvoidinitialize(){//使用ExpectedValueCheckingStoreManager來(lái)創(chuàng)建ExpectedValueCheckingStore冕末。創(chuàng)建的對(duì)象包括://IDAuthority idAuthority;//KCVSCache edgeStore//KCVSCache indexStore//KCVSCache txLogStore//KCVSConfiguration systemConfig//KCVSConfiguration userConfig}publicLockergetLocker(String lockerName){//創(chuàng)建ConsistentKeyLocker萍歉。}}
這里的設(shè)計(jì)完美的提現(xiàn)了面向接口編程,代理模式档桃。
KeyColumnValueStore的實(shí)現(xiàn)類:HbaseKeyColumnValueStore ExpectedValueCheckingStore其中使用了HBaseKeyColumnValueStore枪孩,作為代理,添加了調(diào)用writeLock的方法保證了進(jìn)程之間只有一個(gè)線程能獲取KeyColumn的鎖藻肄。
KeyColumnValueStoreManager用于創(chuàng)建KeyColumnValueStore(openDatebase)方法蔑舞。各個(gè)不同的存儲(chǔ)后端都需要實(shí)現(xiàn)mutateMany方法用于將緩存中的內(nèi)容持久化到DB中。其實(shí)現(xiàn)類是HBaseStoreManager嘹屯。
在Backend創(chuàng)建systemConfig和userConfig時(shí)攻询,會(huì)創(chuàng)建ExpectedValueCheckingStoreTransaction(通過(guò)ExpectedValueCheckingStoreManager.beginTransaction方法)。ExpectedValueCheckingStoreTransaction中使用的兩個(gè)StoreTransaction最終都是HBaseTransaction類州弟。至少?gòu)腍Base的角度來(lái)講钧栖,這里兩個(gè)變量的設(shè)計(jì)似乎略顯多余,因?yàn)镠Base不是強(qiáng)一致的婆翔,由于CAP定理的限制拯杠,HBase采用的是最終一致性。
關(guān)于ExpectedValueCheckingTransaction如何檢查expectedValue的啃奴,可以查看checkSingleExpectedValueUnsafe這個(gè)私有方法潭陪。
ExpectedValueCheckingStore在執(zhí)行mutate方法時(shí),會(huì)調(diào)用該檢查纺腊。
```
@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
? ? ExpectedValueCheckingTransaction etx = (ExpectedValueCheckingTransaction)txh;
? ? // 這一步會(huì)對(duì)expectedValue做檢查畔咧。如果檢查失敗會(huì)拋異常。
? ? boolean hasAtLeastOneLock = etx.prepareForMutations();
? ? if (hasAtLeastOneLock) {
? ? ? ? // Force all mutations on this transaction to use strong consistency
? ? ? ? store.mutate(key, additions, deletions, getConsistentTx(txh));
? ? } else {
? ? ? ? store.mutate(key, additions, deletions, unwrapTx(txh));
? ? }
}
```