開篇
?本著尊重原創(chuàng)作者精神蚁署,我必須表明這個實現(xiàn)是由組內(nèi)的同事蜂神實現(xiàn)的便脊,我知識站在前人的肩膀上學(xué)習(xí)了下具體的實現(xiàn)!9飧辍哪痰!
?致敬原作者的牛逼技術(shù)K煸!晌杰!
PalDB限制
- PalDB is optimal in replacing the usage of large in-memory data storage but still use memory (off-heap, yet much less) to do its job. Disabling memory mapping and relying on seeks is possible but is not what PalDB has been optimized for.
- The size of the index is limited to 2GB. There's no limitation in the data size however.PalDB的索引大小不超過2GB
- PalDB is not thread-safe at the moment so synchronization should be done externally if multi-threaded.PalDB是不線程安全的
PalDB線程安全代碼實現(xiàn)
?在java的世界里線程安全的實現(xiàn)方法無非是通過鎖或者通過ThreadLocal變量跷睦,加鎖會對性能有一定的損耗,而ThreadLocal相比之下性能損耗幾乎為零乎莉,所以在實現(xiàn)上采用的就是ThreadLocal關(guān)鍵字送讲。
代碼實現(xiàn)比較
?在實現(xiàn)過程中我們把公用的變量都用ThreadLocal修飾,這樣不同的線程讀取數(shù)據(jù)的時候就不會相互干擾惋啃,主要變量由:
- dataInputOutput 讀取數(shù)據(jù)保存的緩沖區(qū)
- storage PalDB讀取實現(xiàn)類
- serialization PalDB自定義實現(xiàn)類的保存變量
//線程不安全版本的ReaderImpl實現(xiàn)類
public final class ReaderImpl implements StoreReader {
// Configuration
private final Configuration config;
// Buffer
private final DataInputOutput dataInputOutput = new DataInputOutput();
// Storage
private final StorageReader storage;
// Serialization
private final StorageSerialization serialization;
// Cache
private final StorageCache cache;
// File
private final File file;
// Opened?
private boolean opened;
//線程安全版本的ReaderImpl實現(xiàn)類
public final class ReaderImpl implements StoreReader {
// Configuration
private final Configuration config;
// Buffer
private final ThreadLocal<DataInputOutput> localDataInputOutput = new ThreadLocal<>();
// = new DataInputOutput();
// Storage
private final ThreadLocal<StorageReader> localStorage = new ThreadLocal<>();
// Serialization
private final ThreadLocal<StorageSerialization> localSerialization = new ThreadLocal<>();
private final Map<Thread, StorageReader> allStorageReader = new ConcurrentHashMap<Thread, StorageReader>();
線程不安全版本
?多線程并發(fā)讀取的時候會因為共同了storage造成讀取數(shù)據(jù)混亂哼鬓,所以線程安全版本就需要在這幾個會造成混亂的變量上做文章。包括:
- dataInputOutput 讀取數(shù)據(jù)保存的緩沖區(qū)
- storage PalDB讀取實現(xiàn)類
- serialization PalDB自定義實現(xiàn)類的保存變量
public <K> K get(Object key, K defaultValue) {
checkOpen();
if (key == null) {
throw new NullPointerException("The key can't be null");
}
K value = cache.get(key);
if (value == null) {
try {
byte[] valueBytes = storage.get(serialization.serializeKey(key));
if (valueBytes != null) {
Object v = serialization.deserialize(dataInputOutput.reset(valueBytes));
cache.put(key, v);
return (K) v;
} else {
return defaultValue;
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} else if (value == StorageCache.NULL_VALUE) {
return null;
}
return value;
}
線程安全版本
?每次都從本線程的ThreadLocal變量中獲取StorageReader讀取數(shù)據(jù)边灭,然后反序列化到ThreadLocal變量中保存的localDataInputOutput存儲序列化的數(shù)據(jù)异希。保證了變量線程的安全。
public <K> K get(Object key, K defaultValue) {
checkOpen();
if (key == null) {
throw new NullPointerException("The key can't be null");
}
K value = cache.get(key);
if (value == null) {
try {
byte[] valueBytes = localStorage.get().get(localSerialization.get().serializeKey(key));
if (valueBytes != null) {
Object v = localSerialization.get().deserialize(localDataInputOutput.get().reset(valueBytes));
cache.put(key, v);
return (K) v;
} else {
return defaultValue;
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} else if (value == StorageCache.NULL_VALUE) {
return null;
}
return value;
}
代碼實現(xiàn)核心
?在PalDB的api當(dāng)中所有的get/put動作都會有checkOpen方法調(diào)用绒瘦,所以在這個方法內(nèi)部我們把線程不安全的變量都進(jìn)行了拷貝并保存到ThreadLocal當(dāng)中称簿,這是整個實現(xiàn)的核心精華!6杳薄憨降!感謝原作者牛逼的設(shè)計!8眯铩授药!
private void checkOpen() {
if (!opened) {
throw new IllegalStateException("The store is closed");
}
StorageReader storage = localStorage.get();
if (null == storage || storage.isClosed()) {
storage = mainStorage.duplicate();
localStorage.set(storage);
allStorageReader.put(Thread.currentThread(), storage);
}
if (null == localDataInputOutput.get()) {
localDataInputOutput.set(new DataInputOutput());
}
if (null == localSerialization.get()) {
localSerialization.set(new StorageSerialization(this.config));
}
}