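1 Why a Merge Operator Is Needed
RocksDB is a high-performance, embedded, persistent key-value storage engine that provides the usual Put, Get, and Delete interfaces. Another common pattern is updating the value of a key that already exists. To do that, the user has to do three things:
- Call Get to fetch the current value
- Modify it
- Call Put to write it back to the database
Suppose we need to maintain a set of counters, each with its own name, and implement Set, Add, Get, and Remove operations for them.
The interface is defined as follows: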
class Counters {
public:
// (re)set the value of a named counter
virtual void Set(const string& key, uint64_t value);
// remove the named counter
virtual void Remove(const string& key);
// retrieve the current value of the named counter, return false if not found
virtual bool Get(const string& key, uint64_t *value);
// increase the named counter by value.
// if the counter does not exist, treat it as if the counter was initialized to zero
virtual void Add(const string& key, uint64_t value);
};
The Add interface might be implemented like this:
// implemented as get -> modify -> set
virtual void Add(const string& key, uint64_t value) {
uint64_t base;
if (!Get(key, &base)) {
base = kDefaultValue;
}
Set(key, base + value);
}
Get is a random read and is comparatively slow. If RocksDB offered an Add interface, it could be called like this:
virtual void Add(const string& key, uint64_t value) {
string serialized = Serialize(value);
db->Add(add_option, key, serialized);
}
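This looks reasonable, but it only fits the counter use case. Not all data stored in RocksDB is a counter. A value might, for example, be a list holding a user's location history, and we would want an Append interface that adds the user's new location to the end of that list. The semantics of this kind of update therefore depend on the user's data type. RocksDB abstracts the update operation so that users can define their own update semantics; in RocksDB this is called Merge.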
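To make "update semantics depend on the data type" concrete, here is a minimal sketch. It is a toy in-memory model, not the RocksDB API: `ToyStore`, `MergeFn`, `AddCounter`, and `AppendItem` are hypothetical names introduced only for illustration, and values are plain strings.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical toy store, NOT the RocksDB API: an in-memory map plus a
// caller-supplied function that defines what "update" means for its values.
// existing == nullptr means the key is not present yet.
using MergeFn = std::function<std::string(const std::string* existing,
                                          const std::string& operand)>;

class ToyStore {
 public:
  explicit ToyStore(MergeFn fn) : merge_fn_(std::move(fn)) {
    assert(static_cast<bool>(merge_fn_));  // a merge function is required
  }

  void Put(const std::string& key, const std::string& value) {
    data_[key] = value;
  }

  // Read-modify-write behind a single call; the semantics come from merge_fn_.
  void Merge(const std::string& key, const std::string& operand) {
    auto it = data_.find(key);
    const std::string* existing = (it == data_.end()) ? nullptr : &it->second;
    std::string merged = merge_fn_(existing, operand);
    data_[key] = std::move(merged);
  }

  // Returns false if the key is not present.
  bool Get(const std::string& key, std::string* value) const {
    auto it = data_.find(key);
    if (it == data_.end()) return false;
    *value = it->second;
    return true;
  }

 private:
  MergeFn merge_fn_;
  std::map<std::string, std::string> data_;
};

// Counter semantics: values are decimal strings, merging adds them.
inline std::string AddCounter(const std::string* existing,
                              const std::string& operand) {
  unsigned long long base = existing ? std::stoull(*existing) : 0ULL;
  return std::to_string(base + std::stoull(operand));
}

// List semantics: values are comma-separated items, merging appends one item.
inline std::string AppendItem(const std::string* existing,
                              const std::string& operand) {
  if (existing == nullptr || existing->empty()) return operand;
  return *existing + "," + operand;
}
```

The same `Merge` call behaves as "add" when the store is built with `AddCounter` and as "append" when it is built with `AppendItem`; the store itself never needs to know which data type it holds.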
2 Merge
Merge provides the following:
- It wraps read-modify-write semantics behind a single, simple abstract interface
- It spares the user the performance cost of issuing repeated Get calls
- It lets the back end optimize by choosing when and how operands are combined, without changing the underlying update semantics
- In some cases it can amortize the cost over all incremental updates, which gives an asymptotic gain in efficiency
3 Using the Merge Interface
MergeOperator
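MergeOperator defines the methods that tell RocksDB how to apply incremental updates to existing data: FullMerge applies merge operands to a base value, and PartialMerge combines merge operands with each other to form a new merge operand.
RocksDB also provides the AssociativeMergeOperator interface, which hides the details of partial merging and covers most use cases (those where the data type is associative).
The generic MergeOperator offers more functionality than AssociativeMergeOperator; its use cases are described later.
Declaration of the AssociativeMergeOperator interface: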
// The simpler, associative merge operator.
class AssociativeMergeOperator : public MergeOperator {
public:
virtual ~AssociativeMergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with
// new_value: (OUT) Client is responsible for filling the merge result
// here. The string that new_value is pointing to will be empty.
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success.
// All values passed in will be client-specific values. So if this method
// returns false, it is because client specified bad data or there was
// internal corruption. The client should assume that this will be treated
// as an error by the library.
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const = 0;
private:
// Default implementations of the MergeOperator functions
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override;
};
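Custom MergeOperator
The user defines a subclass that inherits from AssociativeMergeOperator or from the MergeOperator base class and overrides the methods it needs.
RocksDB holds a member of type MergeOperator and exposes a Merge interface. The user assigns an instance of the custom MergeOperator subclass to that member (merge_operator in Options) when opening the DB, so that RocksDB calls the user-defined merge method and the user controls the merge semantics.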
// In addition to Get(), Put(), and Delete(), the DB class now also has an additional method: Merge().
class DB {
...
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is
// determined by the user provided merge_operator when opening DB.
// Returns Status::NotSupported if DB does not have a merge_operator.
virtual Status Merge(
const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
...
};
struct Options {
...
// REQUIRES: The client must provide a merge operator if Merge operation
// needs to be accessed. Calling Merge on a DB without a merge operator
// would result in Status::NotSupported. The client must ensure that the
// merge operator supplied here has the same name and *exactly* the same
// semantics as the merge operator provided to previous open calls on
// the same DB. The only exception is reserved for upgrade, where a DB
// previously without a merge operator is introduced to Merge operation
// for the first time. It's necessary to specify a merge operator when
// opening the DB in this case.
// Default: nullptr
std::shared_ptr<MergeOperator> merge_operator;
...
};
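The wiring described above (an options struct carrying a shared operator, and Merge being rejected when no operator was supplied) can be sketched with a toy model. Everything here is a hypothetical stand-in, not RocksDB code: `ToyStatus`, `ToyMergeOperator`, `ConcatOperator`, `ToyOptions`, and `ToyDB` are invented names, and the real `Status` class is replaced by a small enum.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for rocksdb::Status, reduced to three outcomes.
enum class ToyStatus { kOk, kNotSupported, kMergeFailed };

// Hypothetical, simplified operator interface (strings instead of Slice).
class ToyMergeOperator {
 public:
  virtual ~ToyMergeOperator() {}
  // existing_value == nullptr means the key does not exist yet.
  virtual bool Merge(const std::string* existing_value,
                     const std::string& value,
                     std::string* new_value) const = 0;
  virtual const char* Name() const = 0;
};

// Example operator: string concatenation.
class ConcatOperator : public ToyMergeOperator {
 public:
  bool Merge(const std::string* existing_value, const std::string& value,
             std::string* new_value) const override {
    assert(new_value != nullptr);
    *new_value = existing_value ? *existing_value + value : value;
    return true;
  }
  const char* Name() const override { return "ConcatOperator"; }
};

struct ToyOptions {
  std::shared_ptr<ToyMergeOperator> merge_operator;  // default: nullptr
};

class ToyDB {
 public:
  explicit ToyDB(const ToyOptions& options)
      : merge_operator_(options.merge_operator) {}

  ToyStatus Merge(const std::string& key, const std::string& value) {
    // Without an operator the DB cannot know what "merge" means.
    if (!merge_operator_) return ToyStatus::kNotSupported;
    auto it = data_.find(key);
    const std::string* existing = (it == data_.end()) ? nullptr : &it->second;
    std::string result;
    if (!merge_operator_->Merge(existing, value, &result)) {
      return ToyStatus::kMergeFailed;
    }
    data_[key] = result;
    return ToyStatus::kOk;
  }

  bool Get(const std::string& key, std::string* value) const {
    auto it = data_.find(key);
    if (it == data_.end()) return false;
    *value = it->second;
    return true;
  }

 private:
  std::shared_ptr<ToyMergeOperator> merge_operator_;
  std::map<std::string, std::string> data_;
};
```

A `ToyDB` built from default `ToyOptions` answers every `Merge` with `kNotSupported`, mirroring the documented behavior of calling Merge on a DB opened without a merge operator. Unlike RocksDB, this toy applies the operator eagerly at write time.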
An example of defining and using a custom MergeOperator:
// A 'model' merge operator with uint64 addition semantics
class UInt64AddOperator : public AssociativeMergeOperator {
public:
virtual bool Merge(
const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// assuming 0 if no existing value
uint64_t existing = 0;
if (existing_value) {
if (!Deserialize(*existing_value, &existing)) {
// if existing_value is corrupted, treat it as 0
Log(logger, "existing value corruption");
existing = 0;
}
}
uint64_t oper;
if (!Deserialize(value, &oper)) {
// if operand is corrupted, treat it as 0
Log(logger, "operand value corruption");
oper = 0;
}
// "new" is a reserved word in C++, so the sum needs a different name
auto merged = existing + oper;
*new_value = Serialize(merged);
return true; // always return true for this, since we treat all errors as "zero".
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
};
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public RocksCounters {
public:
MergeBasedCounters(std::shared_ptr<DB> db);
// mapped to a RocksDB Merge operation
virtual void Add(const string& key, uint64_t value) override {
string serialized = Serialize(value);
db_->Merge(merge_option_, key, serialized);
}
};
// How to use it
DB* dbp;
Options options;
options.merge_operator.reset(new UInt64AddOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db(dbp);
MergeBasedCounters counters(db);
counters.Add("a", 1);
...
uint64_t v;
counters.Get("a", &v);
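The example above calls `Serialize` and `Deserialize` without defining them. One possible shape for these helpers is sketched below, assuming a fixed 8-byte little-endian encoding and `std::string` as the byte container (with a `Slice`, one would pass `slice.ToString()`). The encoding choice is an assumption for illustration; the source does not specify it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: encode a counter as exactly 8 little-endian bytes.
inline std::string Serialize(uint64_t value) {
  std::string out(sizeof(uint64_t), '\0');
  for (std::size_t i = 0; i < sizeof(uint64_t); ++i) {
    out[i] = static_cast<char>((value >> (8 * i)) & 0xff);
  }
  return out;
}

// Hypothetical helper: decode 8 little-endian bytes.
// Returns false (and leaves *value untouched) if the input is not 8 bytes.
inline bool Deserialize(const std::string& bytes, uint64_t* value) {
  assert(value != nullptr);
  if (bytes.size() != sizeof(uint64_t)) return false;
  uint64_t result = 0;
  for (std::size_t i = 0; i < sizeof(uint64_t); ++i) {
    result |= static_cast<uint64_t>(static_cast<unsigned char>(bytes[i]))
              << (8 * i);
  }
  *value = result;
  return true;
}
```

A fixed-width encoding keeps the stored value and the merge operand in the same format, which is what the associative operator relies on; the size check gives the operator a cheap way to detect a corrupted value.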
Generic MergeOperator
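Associativity and Non-Associativity
As mentioned earlier, AssociativeMergeOperator requires the data type to be associative, meaning:
- Data written to RocksDB through Put has the same format as the operands passed to Merge
- Multiple merge operands can be combined into one using the same user-defined merge operation
Take the counter example above: calling Set makes RocksDB store the data as a serialized 8-byte integer, and the data passed by Add is also an 8-byte integer.
MergeOperator can also be used to update non-associative data types.
For example, suppose JSON strings are stored in RocksDB, so the data written through Put is a valid JSON string, while Merge only needs to update a single field inside that JSON. The code might look like this: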
...
// Put/store the json string into the database
db_->Put(put_option_, "json_obj_key",
"{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
...
// Use a pre-defined "merge operator" to incrementally update the value of the json string
db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");
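AssociativeMergeOperator cannot handle this case because it assumes that Put and Merge use the same data format. Here the two formats have to be told apart, and multiple merge operands cannot be combined into one. This is where the generic MergeOperator is needed.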
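A small model of this non-associative case, with heavy simplifications: instead of JSON, the stored value is a flat `field=value;field=value` record, and each operand is an edit command of the form `set <field> <value>`. `ParseRecord`, `FormatRecord`, and `ToyFullMerge` are hypothetical names, and both formats are assumptions chosen to keep the sketch dependency-free.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <sstream>
#include <string>

typedef std::map<std::string, std::string> Record;

// Parse the stored (Put-style) format "a=1;b=2" into a map.
inline Record ParseRecord(const std::string& text) {
  Record fields;
  std::istringstream in(text);
  std::string item;
  while (std::getline(in, item, ';')) {
    std::size_t eq = item.find('=');
    if (eq == std::string::npos) continue;  // skip malformed items
    fields[item.substr(0, eq)] = item.substr(eq + 1);
  }
  return fields;
}

// Write a map back out in the stored format (fields sorted by name).
inline std::string FormatRecord(const Record& fields) {
  std::string out;
  for (Record::const_iterator it = fields.begin(); it != fields.end(); ++it) {
    if (!out.empty()) out += ';';
    out += it->first + "=" + it->second;
  }
  return out;
}

// Toy full merge: the base value is a record, but every operand is an edit
// command ("set <field> <value>"), i.e. a different format from the base.
// Operands are applied in order, front first. Returns false on a bad operand.
inline bool ToyFullMerge(const std::string* existing_value,
                         const std::deque<std::string>& operand_list,
                         std::string* new_value) {
  assert(new_value != nullptr);
  Record fields = existing_value ? ParseRecord(*existing_value) : Record();
  for (std::size_t i = 0; i < operand_list.size(); ++i) {
    std::istringstream in(operand_list[i]);
    std::string verb, field, value;
    if (!(in >> verb >> field >> value) || verb != "set") return false;
    fields[field] = value;
  }
  *new_value = FormatRecord(fields);
  return true;
}
```

Two edits to different fields have no single-operand equivalent in this command format, which is why the function takes the whole operand list rather than relying on pairwise combination.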
Generic MergeOperator Interface
// The Merge Operator
//
// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, edit data structure, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class MergeOperator {
public:
virtual ~MergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false on failure / error / corruption.
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const = 0;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types.
// Save the result in *new_value and return true. If it is impossible
// or infeasible to combine the two operations, return false instead.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
};
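- MergeOperator provides two methods, FullMerge and PartialMerge. The first is used when the operands sit on top of a Put or a Delete, which is passed in as existing_value (nullptr when the key does not exist). The second is used to combine two operands into one when that is possible.
- AssociativeMergeOperator inherits from MergeOperator, supplies default implementations of these methods, and exposes a simplified interface.
- MergeOperator's FullMerge takes existing_value and a sequence of operands, rather than a single operand.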
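How It Works
When DB::Put() or DB::Merge() is called, the final result does not have to be computed right away. RocksDB defers the computation until it is needed, for example the next time the user calls Get or when RocksDB decides to run a compaction. By the time the merge is actually performed, several operands may therefore have stacked up. MergeOperator::FullMerge handles this case: it takes existing_value and the sequence of operands and computes the final value.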
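The deferred evaluation described above can be modeled in a few lines. This is only a toy: `LazyCounterStore` and its `Compact` method are hypothetical, the values are plain `uint64_t` counters, and the per-key operand vector stands in for whatever RocksDB keeps internally.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy model of deferred merging for uint64 counters.
class LazyCounterStore {
 public:
  // A Put replaces the base value and discards older pending operands.
  void Put(const std::string& key, uint64_t value) {
    Entry& e = entries_[key];
    e.has_base = true;
    e.base = value;
    e.pending.clear();
  }

  // A Merge only records the operand: no read and no computation here.
  void Merge(const std::string& key, uint64_t operand) {
    entries_[key].pending.push_back(operand);
  }

  // A Get resolves base + stacked operands on demand, without storing it.
  bool Get(const std::string& key, uint64_t* value) const {
    assert(value != nullptr);
    std::map<std::string, Entry>::const_iterator it = entries_.find(key);
    if (it == entries_.end()) return false;
    *value = FullMerge(it->second);
    return true;
  }

  // A compaction-like step folds the stacked operands into the base value.
  void Compact(const std::string& key) {
    std::map<std::string, Entry>::iterator it = entries_.find(key);
    if (it == entries_.end()) return;
    Entry& e = it->second;
    e.base = FullMerge(e);
    e.has_base = true;
    e.pending.clear();
  }

  std::size_t PendingCount(const std::string& key) const {
    std::map<std::string, Entry>::const_iterator it = entries_.find(key);
    return it == entries_.end() ? 0 : it->second.pending.size();
  }

 private:
  struct Entry {
    bool has_base = false;
    uint64_t base = 0;
    std::vector<uint64_t> pending;  // operands in arrival order
  };

  // Applies every stacked operand to the base (0 if there is no base).
  static uint64_t FullMerge(const Entry& e) {
    uint64_t result = e.has_base ? e.base : 0;
    for (std::size_t i = 0; i < e.pending.size(); ++i) result += e.pending[i];
    return result;
  }

  std::map<std::string, Entry> entries_;
};
```

In this model a write costs one append regardless of how large the stored value is, and the work of combining operands is paid only when a reader or the compaction-like step asks for the result.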
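PartialMerge and Stacking
Sometimes, before FullMerge is called, some merge operands can be combined ahead of time instead of simply being kept around. That is the job of PartialMerge: it combines two operands into one, reducing the work FullMerge has to do.
When RocksDB encounters two merge operands, it first tries to combine them by calling the user's PartialMerge method, and keeps both operands only if PartialMerge returns false. When it reaches a Put or Delete, it calls FullMerge with the existing value and the sequence of operands to compute the final value.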
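The "try PartialMerge first, stack on failure" rule can be sketched as follows. The operand format is an assumption made up for this example: operands of the form `+<n>` are increments and can be combined, and anything else is treated as not combinable. `IsIncrement`, `ToyPartialMerge`, and `PushOperand` are hypothetical helpers, not RocksDB functions.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <deque>
#include <string>

// True for operands of the assumed form "+<digits>", e.g. "+42".
inline bool IsIncrement(const std::string& op) {
  if (op.size() < 2 || op[0] != '+') return false;
  for (std::size_t i = 1; i < op.size(); ++i) {
    if (!std::isdigit(static_cast<unsigned char>(op[i]))) return false;
  }
  return true;
}

// Toy partial merge: two increments collapse into one increment.
// Returns false when the pair cannot be combined.
inline bool ToyPartialMerge(const std::string& left, const std::string& right,
                            std::string* new_value) {
  assert(new_value != nullptr);
  if (!IsIncrement(left) || !IsIncrement(right)) return false;
  unsigned long long sum =
      std::stoull(left.substr(1)) + std::stoull(right.substr(1));
  *new_value = "+" + std::to_string(sum);
  return true;
}

// Adds an operand to the stack: first try to combine it with the newest
// stacked operand; if that fails, keep both (the operand is stacked).
inline void PushOperand(std::deque<std::string>* stack,
                        const std::string& operand) {
  assert(stack != nullptr);
  std::string combined;
  if (!stack->empty() && ToyPartialMerge(stack->back(), operand, &combined)) {
    stack->back() = combined;
  } else {
    stack->push_back(operand);
  }
}
```

Pushing `+1`, `+2`, `reset`, `+5`, `+6` in that order leaves three stacked operands (`+3`, `reset`, `+11`), so a later full merge has three operands to process instead of five.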
Merge Best Practice
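When to Use Merge
Merge is a good fit if you have the following needs:
- The data has to be updated incrementally
- You usually have to read the data before you know what the new value should be
When to Use Associative Merge
- The merge operands have the same format as the values written by Put
- Multiple consecutive merge operands can be combined into one
When to Use Generic Merge
- The merge operands have a different format from the values written by Put
- Have PartialMerge() return true when multiple merge operands can be combined
For reasons of length, this post is based mainly on the official wiki and covers how Merge is used and what its characteristics are; source code analysis is left for the next post.
References: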
https://github.com/facebook/rocksdb/wiki/Merge-Operator