Get
// Excerpt from DBImpl::GetImpl (surrounding code elided with "...").
// Pin the column family's current SuperVersion (mutable memtable, immutable
// memtables and the SST version) for the duration of this read.
// NOTE(review): `cfd` is the ColumnFamilyData resolved from the column family
// handle earlier in GetImpl (not shown in this excerpt).
SuperVersion* sv = GetAndRefSuperVersion(cfd);
SequenceNumber snapshot;
// Obtain the snapshot sequence number (i.e. the current largest sequence);
// code elided.
...
bool done = false;
if (!skip_memtable) {
  // Get value associated with key
  if (get_impl_options.get_value) {
    // Search the mutable memtable first.
    if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
                     &merge_context, &max_covering_tombstone_seq,
                     read_options, get_impl_options.callback,
                     get_impl_options.is_blob_index)) {
      done = true;
      get_impl_options.value->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
                            &merge_context, &max_covering_tombstone_seq,
                            read_options, get_impl_options.callback,
                            get_impl_options.is_blob_index)) {
      // The mutable memtable missed (or left a merge in progress) and the
      // status is still usable, so the immutable memtables were consulted
      // and produced the result.
      done = true;
      get_impl_options.value->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    }
  }
...
memtable get
存在memtable里的key是user key + (type and sequence),其中type和sequence合併編碼為8字節。
// Excerpt from MemTable::Get: consult the memtable bloom filter first (when
// one is configured) to compute may_contain for this key.
if (bloom_filter_) {
// when both memtable_whole_key_filtering and prefix_extractor_ are set,
// only do whole key filtering for Get() to save CPU
if (moptions_.memtable_whole_key_filtering) {
may_contain =
bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
} else {
assert(prefix_extractor_);
// A key outside the prefix extractor's domain cannot be ruled out by the
// prefix filter, so it is treated as possibly present.
may_contain =
!prefix_extractor_->InDomain(user_key) ||
bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
}
}
// Look the key up in the memtable itself.
// NOTE(review): the elided surrounding code presumably skips this call when
// may_contain is false -- confirm against the full MemTable::Get.
GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
is_blob_index, value, s, merge_context, seq,
&found_final_value, &merge_in_progress);
// Search this memtable's table_ for `key`. All lookup state and output
// pointers are bundled into a Saver, which is handed to table_->Get()
// together with the SaveValue callback.
void MemTable::GetFromTable(...){
// Build the Saver that carries lookup state and output slots into the
// SaveValue callback.
Saver saver;
saver.status = s;
saver.found_final_value = found_final_value;
saver.merge_in_progress = merge_in_progress;
saver.key = &key;
saver.value = value;
// Placeholder; SaveValue overwrites seq with the sequence number of the
// matching entry it examines.
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
saver.do_merge = do_merge;
// Run the lookup. In the base MemTableRep::Get, entries from the seek
// position are fed to SaveValue until it returns false.
table_->Get(key, &saver, SaveValue);
}
// Default point lookup for a memtable representation: seek a prefix-aware
// iterator to the lookup key, then hand each entry to callback_func until the
// callback returns false or the iterator is exhausted.
void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto it = GetDynamicPrefixIterator();
  // Position at the first entry whose memtable key is >= the lookup key.
  it->Seek(k.internal_key(), k.memtable_key().data());
  while (it->Valid()) {
    if (!callback_func(callback_args, it->key())) {
      break;  // callback has seen enough (match found or key passed)
    }
    it->Next();
  }
  // NOTE(review): `it` is never released here; confirm whether
  // GetDynamicPrefixIterator() with no arena returns caller-owned memory.
}
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
  // Position the iterator at the earliest node whose key is >= target, or at
  // nullptr if there is no such node.
  //
  // Entries are ordered ascending by user key. Entries sharing a user key are
  // ordered by descending sequence number. Because the lookup target carries
  // the read's sequence number, the seek therefore lands on the newest
  // version of the user key whose sequence is <= that number. If no such
  // version exists, it lands on the first entry of a larger user key, which
  // the caller's callback (SaveValue) detects by comparing user keys.
  node_ = list_->FindGreaterOrEqual(target);
}
// Callback used by MemTableRep::Get: inspects one memtable entry and records
// the outcome in the Saver pointed to by `arg`.
// Returns true to ask the caller to advance to the next entry, false to stop.
static bool SaveValue(void* arg, const char* entry) {
Saver* s = reinterpret_cast<Saver*>(arg);
assert(s != nullptr);
MergeContext* merge_context = s->merge_context;
SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
const MergeOperator* merge_operator = s->merge_operator;
assert(merge_context != nullptr);
// entry format is:
//    klength  varint32
//    userkey  char[klength-8]
//    tag      uint64
//    vlength  varint32
//    value    char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
uint32_t key_length;
// A varint32 occupies at most 5 bytes, hence the entry + 5 bound.
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
// The internal key is the user key followed by the 8-byte tag.
Slice user_key_slice = Slice(key_ptr, key_length - 8);
// Seek() may have landed on an entry whose user key is larger than the one
// being looked up. Only process the entry when the user keys match (ignoring
// any timestamp); otherwise fall through to `return false` and stop.
if (s->mem->GetInternalKeyComparator()
.user_comparator()
->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
ValueType type;
SequenceNumber seq;
// The tag packs the sequence number and the value type together.
UnPackSequenceAndType(tag, &seq, &type);
// If the value is not in the snapshot, skip it
if (!s->CheckCallback(seq)) {
return true; // to continue to the next seq
}
s->seq = seq;
// A value/merge/blob entry older than the newest covering range tombstone
// is treated as range-deleted.
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion;
}
switch (type) {
...
// Handle the entry according to its type (cases elided here; they may
// return before reaching the end of this function).
}
// s->state could be Corrupt, merge or notfound
return false;
}
ThreadLocalSuperVersion
Rocksdb利用線程局部緩存和atomic來替換掉原先leveldb的version加鎖的邏輯
// Called before a read to obtain a referenced SuperVersion (the current
// memtable / immutable memtables / version of this column family). The copy
// cached in this thread's local storage (local_sv_) is preferred. The DB mutex
// is taken only when that cached copy is obsolete or stale.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// Atomically swap kSVInUse into this thread's slot and take whatever was
// cached there. The slot keeps holding kSVInUse until
// ReturnThreadLocalSuperVersion is called. The exception is a concurrent
// scrape (done when a new SuperVersion is installed), which replaces it with
// kSVObsolete.
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
// Invariant:
// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
// (if no Scrape happens).
assert(ptr != SuperVersion::kSVInUse);
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
// The cached SuperVersion is unusable in two cases: it was scraped
// (kSVObsolete), or its version_number no longer matches
// super_version_number_ because a newer SuperVersion was installed. Drop the
// cached reference and take a reference to the current super_version_ under
// the DB mutex.
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
// kSVObsolete is represented as nullptr, so `sv &&` skips Unref() for a
// scraped slot. If this thread held the last reference, the old
// SuperVersion is cleaned up while holding the mutex.
if (sv && sv->Unref()) {
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
db->mutex()->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
sv->Cleanup();
if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
// Hand the object to the background purge instead of freeing it inline.
db->AddSuperVersionsToFreeQueue(sv);
db->SchedulePurge();
} else {
sv_to_delete = sv;
}
} else {
db->mutex()->Lock();
}
// The mutex must be held so that super_version_ cannot be replaced by
// InstallSuperVersion (which runs with the mutex held) while it is Ref()'d.
sv = super_version_->Ref();
db->mutex()->Unlock();
// Freed only after the mutex is released (no-op when nullptr).
delete sv_to_delete;
}
assert(sv != nullptr);
return sv;
}
ReturnAndCleanupSuperVersion
// Releases a SuperVersion obtained for a read.
// The SuperVersion is first offered back to the thread-local cache. The
// offer fails if a writer reset every thread's cache in the meantime. In that
// case the SuperVersion is stale and is cleaned up here instead.
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  const bool cached_again = cfd->ReturnThreadLocalSuperVersion(sv);
  if (cached_again) {
    return;
  }
  CleanupSuperVersion(sv);
}
// Tries to put `sv` back into this thread's local SuperVersion slot.
// Returns true when the slot still held kSVInUse: no scrape happened since
// GetThreadLocalSuperVersion, so `sv` is current and is cached again.
// Returns false when the slot was replaced with kSVObsolete in the meantime;
// `sv` is then obsolete and the caller is expected to clean it up.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  const bool put_back =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!put_back) {
    // A ThreadLocal scrape ran during this read: after the Swap() in
    // GetThreadLocalSuperVersion and before the CompareAndSwap() above.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return put_back;
}
InstallSuperVersion
InstallSuperVersionAndScheduleWork->
// Installs sv_context->new_superversion as this column family's current
// SuperVersion. The new SuperVersion is built from mem_, imm_.current() and
// current_. All thread-local cached SuperVersions are then invalidated, and
// the previous SuperVersion is released.
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options) {
// The DB mutex is already held by the caller.
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
// Publish the new SuperVersion and bump super_version_number_. Readers
// compare their cached SuperVersion's version_number against this counter to
// detect staleness.
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
super_version_->write_stall_condition =
RecalculateWriteStallConditions(mutable_cf_options);
if (old_superversion != nullptr) {
// Reset SuperVersions cached in thread local storage.
// This should be done before old_superversion->Unref(). That's to ensure
// that local_sv_ never holds the last reference to SuperVersion, since
// it has no means to safely do SuperVersion cleanup.
// (Each thread's slot is set to kSVObsolete, i.e. nullptr.)
ResetThreadLocalSuperVersions();
// Propagate a changed write_buffer_size to the active memtable.
if (old_superversion->mutable_cf_options.write_buffer_size !=
mutable_cf_options.write_buffer_size) {
mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
}
// Report a change in the write-stall condition.
if (old_superversion->write_stall_condition !=
new_superversion->write_stall_condition) {
sv_context->PushWriteStallNotification(
old_superversion->write_stall_condition,
new_superversion->write_stall_condition, GetName(), ioptions());
}
// If this dropped the last reference to old_superversion, clean it up and
// hand it to sv_context->superversions_to_free to be freed later.
if (old_superversion->Unref()) {
old_superversion->Cleanup();
sv_context->superversions_to_free.push_back(old_superversion);
}
}
}
rocksdb對leveldb的讀優化
Mutex用時也是Atomic的3倍。
rocksdb就是將leveldb里Get()實現中一上來就用mutex加鎖的操作,換成atomic+線程私有存儲(thread-local storage)的方式來進行優化。優化後讀操作基本很少再會有互斥,性能提高不少。