lifecycle policy的存儲機制
當我們為一個bucket配置lifecycle policy時,lifecycle相關的數據會存儲在2個位置:
- 在bucket.instance對象的xattr中寫入key為`user.rgw.lc`、value為LifecycleConfig的屬性(即真正的lifecycle rules列表)。
- 在lc.0 - lc.31共32個對象(對象個數可通過`rgw_lc_max_objs`配置)中選擇其中一個,向其omap寫入該bucket的lifecycle狀態信息。其實該omap對應的header中也有lc相關的信息,比如用於記錄當前omap的lifecycle遍歷進度的marker,不過這些數據並不是在設置lc時寫入的。
下面我們來驗證一下:
首先,我們通過boto向一個名為bucket1的bucket配置lifecycle policy:
# Attach a single lifecycle rule to bucket1: objects whose key starts with
# "test/" expire after 1 day.
# Assumes `conn` is an existing boto S3 connection and that Lifecycle and
# Expiration come from boto.s3.lifecycle (neither is shown in this snippet).
bucket1 = conn.get_bucket('bucket1')
expire_after_one_day = Expiration(days=1)
lc = Lifecycle()
lc.add_rule(prefix="test/", expiration=expire_after_one_day)
bucket1.configure_lifecycle(lc)
之後,去bucket1對應的bucket.instance對象的xattr中查看:
$ rados -p default.rgw.meta ls --namespace root
bucket1
.bucket.meta.bucket1:38d08ed7-3883-49de-ab89-0dea7c8c960f.4162.1
$ rados -p default.rgw.meta --namespace root listxattr .bucket.meta.bucket1:38d08ed7-3883-49de-ab89-0dea7c8c960f.4162.1
ceph.objclass.version
user.rgw.acl
user.rgw.lc
其中`user.rgw.lc`對應的value就是該bucket的lifecycle rule列表。
然後,再去查看lc.xx對象,可以看到lc命名空間下有lc.0到lc.31共32個對象:
$ rados -p default.rgw.log --namespace=lc ls
lc.6
lc.14
lc.29
lc.8
lc.10
lc.26
lc.22
lc.17
lc.27
lc.4
lc.11
lc.18
lc.20
lc.7
lc.2
lc.13
lc.16
lc.12
lc.30
lc.24
lc.9
lc.15
lc.19
lc.21
lc.23
lc.31
lc.25
lc.5
lc.3
lc.28
lc.1
lc.0
RGWPutLC::execute()代碼
lifecycle的組織方式也可以在put lc操作的代碼中窺見一斑。
// Handle an S3 PUT Bucket lifecycle request.
//
// Flow:
//   1. Require the Content-MD5 header and verify it against the request body.
//   2. Parse the body as a <LifecycleConfiguration> XML document and rebuild it
//      into new_config.
//   3. Store the encoded configuration in the bucket instance xattrs under
//      RGW_ATTR_LC (all other existing bucket attrs are kept).
//   4. Register the bucket in one of the lc.N shard objects by writing the omap
//      entry (tenant:bucket_name:bucket_id -> lc_uninitial) while holding the
//      shard's exclusive lc_index_lock_name lock.
//
// The outcome is reported through op_ret (plus s->err.message for client errors).
void RGWPutLC::execute()
{
  bufferlist bl;

  RGWLifecycleConfiguration_S3 *config = NULL;
  RGWLCXMLParser_S3 parser(s->cct);
  RGWLifecycleConfiguration_S3 new_config(s->cct);

  // Content-MD5 is mandatory for this request.
  content_md5 = s->info.env->get("HTTP_CONTENT_MD5");
  if (content_md5 == nullptr) {
    op_ret = -ERR_INVALID_REQUEST;
    s->err.message = "Missing required header for this request: Content-MD5";
    ldout(s->cct, 5) << s->err.message << dendl;
    return;
  }

  // Decode the base64 header value into the raw digest bytes.
  std::string content_md5_bin;
  try {
    content_md5_bin = rgw::from_base64(boost::string_view(content_md5));
  } catch (...) {
    s->err.message = "Request header Content-MD5 contains character "
                     "that is not base64 encoded.";
    ldout(s->cct, 5) << s->err.message << dendl;
    op_ret = -ERR_BAD_DIGEST;
    return;
  }

  // The header is client-controlled. A value that decodes to fewer than
  // CEPH_CRYPTO_MD5_DIGESTSIZE bytes would make the memcmp below read past the
  // end of content_md5_bin. A longer value could match on its prefix alone.
  // Only an exact-length digest can be a valid MD5, so reject anything else.
  if (content_md5_bin.size() != CEPH_CRYPTO_MD5_DIGESTSIZE) {
    op_ret = -ERR_BAD_DIGEST;
    s->err.message = "The Content-MD5 you specified did not match what we received.";
    ldout(s->cct, 5) << s->err.message
                     << " Specified content md5: " << content_md5
                     << dendl;
    return;
  }

  if (!parser.init()) {
    op_ret = -EINVAL;
    return;
  }

  // Read the request body into this->data (length this->len).
  op_ret = get_params();
  if (op_ret < 0)
    return;

  ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;

  // MD5 of the body, compared with the client-supplied digest to detect corruption.
  MD5 data_hash;
  unsigned char data_hash_res[CEPH_CRYPTO_MD5_DIGESTSIZE];
  data_hash.Update(reinterpret_cast<const byte*>(data), len);
  data_hash.Final(data_hash_res);

  if (memcmp(data_hash_res, content_md5_bin.c_str(), CEPH_CRYPTO_MD5_DIGESTSIZE) != 0) {
    // data_hash_res is raw binary with no NUL terminator. Streaming the array
    // directly would print it as a C string (out-of-bounds read), so log it
    // as lowercase hex instead.
    static const char hex_digits[] = "0123456789abcdef";
    std::string calculated_md5_hex;
    calculated_md5_hex.reserve(2 * CEPH_CRYPTO_MD5_DIGESTSIZE);
    for (const unsigned char byte_val : data_hash_res) {
      calculated_md5_hex.push_back(hex_digits[byte_val >> 4]);
      calculated_md5_hex.push_back(hex_digits[byte_val & 0x0f]);
    }

    op_ret = -ERR_BAD_DIGEST;
    s->err.message = "The Content-MD5 you specified did not match what we received.";
    ldout(s->cct, 5) << s->err.message
                     << " Specified content md5: " << content_md5
                     << ", calculated content md5: " << calculated_md5_hex
                     << dendl;
    return;
  }

  // Parse the XML body. The parser builds a tree with one node per tag:
  //
  //   class XMLObj
  //   {
  //     XMLObj *parent;
  //     ......
  //     multimap<string, XMLObj *> children;
  //     ......
  //   }
  //
  // Each node points to its parent and holds its children.
  if (!parser.parse(data, len, 1)) {
    op_ret = -ERR_MALFORMED_XML;
    return;
  }

  config = static_cast<RGWLifecycleConfiguration_S3 *>(parser.find_first("LifecycleConfiguration"));
  if (!config) {
    op_ret = -ERR_MALFORMED_XML;
    return;
  }

  // Copy the rules of the parsed config into new_config (its rule_map and prefix_map).
  op_ret = config->rebuild(store, new_config);
  if (op_ret < 0)
    return;

  if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
    ldout(s->cct, 15) << "New LifecycleConfiguration:";
    new_config.to_xml(*_dout);
    *_dout << dendl;
  }

  // Encode the rules into bl and store them as RGW_ATTR_LC on a copy of the
  // bucket attrs. Then write the attrs to the bucket.instance object's xattrs.
  new_config.encode(bl);
  map<string, bufferlist> attrs;
  attrs = s->bucket_attrs;
  attrs[RGW_ATTR_LC] = bl;
  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
  if (op_ret < 0)
    return;

  string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;
  string oid;
  // Choose one of the lc.N objects in the lc pool; N is an integer in [0, max_objs).
  // get_lc_oid (defined elsewhere) looks like this:
  //
  //   static void get_lc_oid(struct req_state *s, string& oid){
  //     string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
  //     int max_objs = (s->cct->_conf->rgw_lc_max_objs > HASH_PRIME)?HASH_PRIME:s->cct->_conf->rgw_lc_max_objs;
  //     int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
  //     oid = lc_oid_prefix;
  //     char buf[32];
  //     snprintf(buf, 32, ".%d", index);
  //     oid.append(buf);
  //     return;
  //   }
  get_lc_oid(s, oid);

  // The omap entry to register: bucket key -> lc_uninitial.
  pair<string, int> entry(shard_id, lc_uninitial);

  int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
  rados::cls::lock::Lock l(lc_index_lock_name);
  utime_t time(max_lock_secs, 0);
  l.set_duration(time);
  l.set_cookie(cookie);

  librados::IoCtx *ctx = store->get_lc_pool_ctx();
  // Release the shard lock only if it was actually acquired.
  bool locked = false;
  do {
    op_ret = l.lock_exclusive(ctx, oid);
    if (op_ret == -EBUSY) {
      // Another holder of lc_index_lock_name on this shard (e.g. RGWLC::process):
      // back off and retry.
      dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
      sleep(5);
      continue;
    }
    if (op_ret < 0) {
      dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << op_ret << dendl;
      break;
    }
    locked = true;
    // Write the entry into the omap of the chosen lc.N object.
    op_ret = cls_rgw_lc_set_entry(*ctx, oid, entry);
    if (op_ret < 0) {
      dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << op_ret << dendl;
    }
    break;
  } while (1);

  if (locked) {
    l.unlock(ctx, oid);
  }
  return;
}
lifecycle的作用機制
RGWLC類是負責執行lc的類,它會根據用戶的配置開啟1個或多個worker線程。這些worker線程在一個無限循環中,每隔一段時間(生產環境是一天一次,測試環境比較頻繁)判斷一下當前是否應該執行lifecycle的遍歷工作。如果是,就調用RGWLC類的process方法:隨機選擇32個lc.xx對象中的一個,根據其header中的標記(marker)取出下一個尚未遍歷的omap entry,將該entry的狀態更新為processing,再把header中的標記更新為該entry;然後處理該entry,即遍歷該entry對應的bucket中的所有對象,根據lc規則刪除(或轉換)bucket中過期的object,並寫日志(注意:L版本實際只支持刪除,見文末)。
代碼追蹤如下:
下面這個函數是worker線程的執行內容。可以看到,它在一個do-while循環中每隔一段時間判斷一次should_work,如果通過,就調用lc->process()函數進行遍歷;然後計算下一次被喚醒的時間,進入阻塞狀態。
void *RGWLC::LCWorker::entry() {
do {
utime_t start = ceph_clock_now();
if (should_work(start)) {
dout(2) << "life cycle: start" << dendl;
int r = lc->process();
if (r < 0) {
dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
}
dout(2) << "life cycle: stop" << dendl;
}
if (lc->going_down())
break;
utime_t end = ceph_clock_now();
int secs = schedule_next_start_time(start, end);
utime_t next;
next.set_from_double(end + secs);
dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
lock.Lock();
cond.WaitInterval(lock, utime_t(secs, 0));
lock.Unlock();
} while (!lc->going_down());
return NULL;
}
在RGWLC::process函數中,主要做了以下幾件事(步驟1-3在持有lc.xx對象排他鎖的情況下完成):
1. 從lc.xx對象的header中讀取marker,獲得omap中要遍歷的下一個entry(如果當天還沒有執行過,會先重置header中的start_date和marker,並調用bucket_lc_prepare)
2. 將拿到的entry的狀態設為processing(正在處理)
3. 將header中的marker更新為當前entry,下次從它之後繼續遍歷
4. 釋放鎖,調用bucket_lc_process函數處理當前entry對應的lc規則,然後調用bucket_lc_post收尾
// Process one lifecycle shard object, obj_names[index] (one of the lc.N objects).
//
// Each loop iteration takes the exclusive lc_index_lock_name cls lock on the
// shard for max_lock_secs. While holding it, the iteration:
//   - reads the shard head. If if_already_run_today(head.start_date) is false,
//     it sets head.start_date to now, clears head.marker and calls
//     bucket_lc_prepare(index);
//   - fetches the next omap entry after head.marker, stopping when none is left;
//   - marks that entry lc_processing and advances head.marker to it, writing
//     the head back.
// The lock is then released while bucket_lc_process() handles the bucket named
// by the entry. bucket_lc_post() is called with the result, and the next
// iteration re-acquires the lock.
//
// @param index          index into obj_names: which lc.N object to process
// @param max_lock_secs  duration of the shard lock; must be > 0
// @return -EAGAIN if max_lock_secs <= 0, otherwise 0. Lock errors other than
//         -EBUSY, and failures of the cls_rgw_lc_* / bucket_lc_prepare calls,
//         are not propagated: they are logged (the lock error silently) and 0
//         is returned.
int RGWLC::process(int index, int max_lock_secs)
{
  rados::cls::lock::Lock l(lc_index_lock_name);
  do {
    utime_t now = ceph_clock_now();
    // entry.first: the bucket key stored in the shard omap. RGWPutLC::execute
    // writes it as tenant:bucket_name:bucket_id (TODO confirm that all writers
    // agree). entry.second: the LC bucket status, e.g. lc_uninitial / lc_processing.
    pair<string, int > entry;
    // NOTE(review): max_lock_secs is never modified inside this loop, so this
    // check only has an effect on the first iteration.
    if (max_lock_secs <= 0)
      return -EAGAIN;

    utime_t time(max_lock_secs, 0);
    l.set_duration(time);

    int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
    if (ret == -EBUSY) { /* already locked by another lc processor */
      // Back off for 5 seconds and retry; there is no retry limit.
      dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
      sleep(5);
      continue;
    }
    // NOTE(review): any other lock error is swallowed and reported as success (0).
    if (ret < 0)
      return 0;

    // Read the head of the lc.N object.
    // NOTE(review): `marker` below is unused; the marker actually used is head.marker.
    string marker;
    cls_rgw_lc_obj_head head;
    ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
    if (ret < 0) {
      dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
      goto exit;
    }

    // First pass of the day: restart the traversal from the beginning.
    // NOTE(review): these head changes are only persisted by
    // cls_rgw_lc_put_head further down. If the shard has no entry to process
    // (the entry.first.empty() exit), they are never written back, so
    // bucket_lc_prepare would run again on the next call. Confirm this is intended.
    if(!if_already_run_today(head.start_date)) {
      head.start_date = now;
      head.marker.clear();
      ret = bucket_lc_prepare(index);
      if (ret < 0) {
        dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
        goto exit;
      }
    }

    // Fetch the next omap entry after head.marker.
    ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
    if (ret < 0) {
      dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
      goto exit;
    }

    // No entry left: the shard is done.
    if (entry.first.empty())
      goto exit;

    // Mark the entry as being processed.
    entry.second = lc_processing;
    ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
    if (ret < 0) {
      dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
      goto exit;
    }

    // Advance the head's marker to this entry so the next lookup starts after it.
    head.marker = entry.first;
    ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
    if (ret < 0) {
      dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
      goto exit;
    }

    // Drop the shard lock while the bucket itself is processed; the next
    // iteration re-acquires it.
    l.unlock(&store->lc_pool_ctx, obj_names[index]);

    // Apply the lc rules of the bucket named by this entry.
    ret = bucket_lc_process(entry.first);
    // Presumably records the final status of the entry based on ret
    // (max_lock_secs suggests it takes the shard lock itself). TODO confirm.
    // Its return value is ignored here.
    bucket_lc_post(index, max_lock_secs, entry, ret);
  }while(1);

  // Reached only via goto, i.e. while the shard lock is held.
exit:
  l.unlock(&store->lc_pool_ctx, obj_names[index]);
  return 0;
}
而bucket_lc_process函數做的就是最終的處理工作:遍歷某條lc規則對應的bucket中的所有objects,根據prefix和tagging找到lc規則作用的object,然後判斷這些objects是否過期;如果過期,就做對應的刪除處理。
要注意的是,目前L(Luminous)版本的ceph僅支持到期刪除的lifecycle,也就是Expiration,不支持Transition。