NVMEDevice
是基于SPDK針對NVME設(shè)備的一種BlockDevice
實現(xiàn)题篷,模塊的主要類圖如下:
接下來對主要的流程進行分析
初始化設(shè)備 NVMEDevice::open
配置項如:
bluestore_block_path = spdk:55cd2e404bd73932
int NVMEDevice::open(const string& p, int path_fd)
{
string serial_number;
int fd = ::open(p.c_str(), O_RDONLY | O_CLOEXEC);
...
char buf[100];
r = ::read(fd, buf, sizeof(buf));
...
// 讀取到設(shè)備sn
serial_number = string(buf, i);
// 調(diào)用NVMEManager進行設(shè)備加載
r = manager.try_get(serial_number, &driver);
// 將NVMEDevice注冊到Driver
driver->register_device(this);
block_size = driver->get_block_size();
size = driver->get_size();
name = serial_number;
...
return 0;
}
主要的初始化工作在manager.try_get
中完成,其中進行NVME設(shè)備的發(fā)現(xiàn)、注冊
int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
{
// 處理參數(shù)bluestore_spdk_coremask指定的core mask
// 需要至少2個core來運行spdk
...
// 首次init = false
if (!init) {
init = true;
// 啟動一個線程慢显,監(jiān)視probe_queue,當(dāng)其中有ProbeContext插入時熔萧,進行 spdk_nvme_probe 操作來發(fā)現(xiàn)設(shè)備
// 省略部分不重要的細節(jié)车猬,保留主要代碼邏輯
// probe_cb 用于指示是否連接設(shè)備
// attach_cb 用于自定義連接設(shè)備后的操作
dpdk_thread = std::thread(
[this, coremask_arg, m_core_arg, mem_size_arg]() {
...
// env 參數(shù)初始化
spdk_env_opts_init(&opts);
...
// 初始化env, DPDK庫
spdk_env_init(&opts);
...
std::unique_lock<std::mutex> l(probe_queue_lock);
while (true) {
if (!probe_queue.empty()) {
ProbeContext* ctxt = probe_queue.front();
probe_queue.pop_front();
// 遍歷總線NVME設(shè)備,并通過uio/vfio將其連接到用戶態(tài)NVME設(shè)備驅(qū)動
// 是否連接設(shè)備淆党,取決于probe_cb返回值酷师,返回true的設(shè)備才連接
r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
...
probe_queue_cond.notify_all();
} else {
probe_queue_cond.wait(l);
}
}
}
);
// 用于probe的dpdk_thread將一直運行,監(jiān)聽probe_queue染乌,重復(fù)設(shè)備發(fā)現(xiàn)過程
dpdk_thread.detach();
}
// 向probe_queue插入本次需要發(fā)現(xiàn)設(shè)備的ProbeContext
// 觸發(fā)dpdk_thread的設(shè)備probe
ProbeContext ctx = {sn_tag, this, nullptr, false};
{
std::unique_lock<std::mutex> l(probe_queue_lock);
probe_queue.push_back(&ctx);
while (!ctx.done)
probe_queue_cond.wait(l);
}
// 等待本次ProbeContext處理完成以后山孔,獲得SharedDriverData實例:ctx.driver
if (!ctx.driver)
return -1;
// 至此,NVMEDevice與SharedDriverData的關(guān)聯(lián)關(guān)系建立起來荷憋,為接下來的IO做好了準(zhǔn)備
*driver = ctx.driver;
return 0;
}
在上面的probe流程中台颠,有兩個最關(guān)鍵的函數(shù):probe_cb和attach_cb,接下來進行分析
probe_cb
注意參數(shù)cb_ctx
即為此前spdk_nvme_probe
調(diào)用時的ProbeContext實例指針
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
char serial_number[128];
struct spdk_pci_addr pci_addr;
struct spdk_pci_device *pci_dev = NULL;
int result = 0;
// 非本地NVME設(shè)備不做連接勒庄,還可能存在遠程的基于NVMe-oF的設(shè)備(NVMe over Fabrics)串前,Ceph僅使用本地NVME設(shè)備
if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
dout(0) << __func__ << " only probe local nvme device" << dendl;
return false;
}
// 獲取設(shè)備PCI地址,獲取失敗的設(shè)備不做連接
result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
if (result) {
dout(0) << __func__ << " failed to get pci address from %s, " << trid->traddr << " return value is: %d" << result << dendl;
return false;
}
// 獲取PCI設(shè)備信息实蔽,獲取失敗的設(shè)備不連接
pci_dev = spdk_pci_get_device(&pci_addr);
if (!pci_dev) {
dout(0) << __func__ << " failed to get pci device" << dendl;
return false;
}
// 讀取設(shè)備SN荡碾,讀取失敗的設(shè)備不連接
result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
if (result < 0) {
dout(10) << __func__ << " failed to get serial number from %p" << pci_dev << dendl;
return false;
}
// 對比配置參數(shù)的SN和設(shè)備的SN,若不一致局装,則為其它的非指定NVME設(shè)備坛吁,不連接
if (ctx->sn_tag.compare(string(serial_number, 16))) {
dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") not match " << serial_number << dendl;
return false;
}
// 一切正常劳殖,且SN匹配的設(shè)備,連接
return true;
}
attach_cb
static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
struct spdk_pci_addr pci_addr;
struct spdk_pci_device *pci_dev = NULL;
spdk_pci_addr_parse(&pci_addr, trid->traddr);
pci_dev = spdk_pci_get_device(&pci_addr);
...
NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
// 注冊NVME設(shè)備控制器
ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
}
// 設(shè)備控制器注冊邏輯
void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
SharedDriverData **driver) {
// 確保manage的互斥鎖拨脉,因為注冊操作非線程安全
assert(lock.is_locked());
// 獲取設(shè)備的namespace數(shù)量哆姻,至少要有1個,若超過1個女坑,也僅使用第一個namespace
spdk_nvme_ns *ns;
int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
assert(num_ns >= 1);
...
ns = spdk_nvme_ctrlr_get_ns(c, 1);
...
// 實際上填具,manager僅管理一個driver,因為現(xiàn)在的版本匆骗,OSD只能使用一個NVME設(shè)備
assert(shared_driver_datas.empty());
// 初始化SharedDriverData并將其加入driver列表劳景,實際上只會有一個driver
shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
*driver = shared_driver_datas.back();
}
// SharedDriverData初始化邏輯
SharedDriverData(unsigned _id, const std::string &sn_tag,
spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
: id(_id),
sn(sn_tag),
ctrlr(c),
ns(ns) {
int i;
// 獲得size、sector size等基本信息
sector_size = spdk_nvme_ns_get_sector_size(ns);
block_size = std::max(CEPH_PAGE_SIZE, sector_size);
size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
// 之前通過core mask指定了可用的核碉就,這里將在除了主核之外的可用核上盟广,各創(chuàng)建一個SharedDriverQueueData
RTE_LCORE_FOREACH_SLAVE(i) {
queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
}
// 調(diào)用每個SharedDriverQueueData的start()方法
_aio_start();
}
// SharedDriverQueueData的初始化
SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
: driver(driver),
...
// 指定queue的執(zhí)行函數(shù)為 _aio_thread()
run_func([this]() { _aio_thread(); }),
completed_op_seq(0), queue_op_seq(0) {
// 核心就一行代碼,創(chuàng)建spdk nvme qpair
qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
...
}
// SharedDriverQueueData的start()
void start() {
// DPDK提供的函數(shù)瓮钥,啟動一個線程筋量,執(zhí)行初始化時指定的函數(shù):_aio_thread(),
// 并設(shè)置線程的CPU親和性碉熄,指定到core_id對應(yīng)的核
// 達到在一個邏輯核上啟動一個工作線程的目的
int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
core_id);
assert(r == 0);
}
_aio_thread()
的流程桨武,結(jié)合后續(xù)的讀寫流程來討論
WRITE
Bluestore的寫流程中,在_txc_add_transaction
過程中會調(diào)用BlockDevice
的aio_write
,
aio_write
過程主要完成IOContext的裝配工作
在Bluestore的_txc_state_proc
狀態(tài)機中锈津,對于SimpleWrite會在STATE_PREPARE
階段調(diào)用BlockDevice
的aio_submit
提交IOContext
對于defferedWrite呀酸,會在kv_finalize_thread
中調(diào)用BlockDevice
的aio_submit
提交IOContext
接下來分析NVMEDevice
的aio_write
和aio_submit
過程
int NVMEDevice::aio_write(
uint64_t off,
bufferlist &bl,
IOContext *ioc,
bool buffered)
{
uint64_t len = bl.length();
// 確保IO的合法性,offset和length必須與block_size對齊琼梆,且不會越出設(shè)備大小邊界
assert(off % block_size == 0);
assert(len % block_size == 0);
assert(len > 0);
assert(off < size);
assert(off + len <= size);
// 初始化NVMEDevice特有的Task
Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
t->write_bl = std::move(bl);
if (buffered) {
// buffered write性誉,默認關(guān)閉,可通過參數(shù)打開
// 直接將Task提交給SharedDriverQueueData茎杂,將被它的_aio_thread()處理
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t);
} else {
// 默認的非Buffered write错览,此處僅進行IOContext組裝,并不提交IO
// 經(jīng)由后續(xù)的aio_submit來提交Task
t->ctx = ioc;
Task *first = static_cast<Task*>(ioc->nvme_task_first);
Task *last = static_cast<Task*>(ioc->nvme_task_last);
if (last)
last->next = t;
if (!first)
ioc->nvme_task_first = t;
ioc->nvme_task_last = t;
++ioc->num_pending;
}
return 0;
}
void NVMEDevice::aio_submit(IOContext *ioc)
{
int pending = ioc->num_pending.load();
Task *t = static_cast<Task*>(ioc->nvme_task_first);
// num_pending在aio_write時自增煌往,ioc->nvme_task_first也被設(shè)置為Task類型實例指針
// 對于默認的非buffered write來說倾哺,此處的條件總是滿足
if (pending && t) {
ioc->num_running += pending;
ioc->num_pending -= pending;
// 確認只有本線程在處理這個IOContext,不應(yīng)存在多個線程處理同一個IOContext的情況
assert(ioc->num_pending.load() == 0);
// 提交Task到SharedDriverQueueData刽脖,將被它的_aio_thread()處理
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
經(jīng)過aio_submit
之后悼粮,Task被提交到SharedDriverQueueData
的task_queue
隊列中
需要注意的是,Task通過其next指針將多個Task串聯(lián)起來曾棕,入隊列的只是head Task
READ
BlueStore
會調(diào)用BlockDevice
的read
和aio_read
兩個方法來讀取數(shù)據(jù)
相同點在于:
- 它們都會產(chǎn)生一個Task,類型為
READ_COMMAND
; - 都會分配一個頁對齊的buffer用于接收讀取到的數(shù)據(jù)
區(qū)別在于:
-
read
會直接提交Task到SharedDriverQueueData
的task_queue
菜循,然后同步等待直至Task被處理完成翘地,獲得讀取到的數(shù)據(jù)同步返回 -
aio_read
僅完成Task組裝和IOContext組裝,與aio_write
一樣,需要經(jīng)過aio_submit
來提交Task衙耕,為異步讀取數(shù)據(jù)
_aio_thread()
由上面的分析可知昧穿,aio_write
和aio_read
操作最后都會將Task提交到SharedDriverQueueData
的task_queue
_aio_thread()
的處理邏輯如下:
void SharedDriverQueueData::_aio_thread()
{
// 準(zhǔn)備數(shù)據(jù)buffer緩沖區(qū)
if (data_buf_mempool.empty()) {
for (uint16_t i = 0; i < data_buffer_default_num; i++) {
void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
if (!b) {
derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
assert(b);
}
data_buf_mempool.push_back(b);
}
}
Task *t = nullptr;
int r = 0;
uint64_t lba_off, lba_count;
ceph::coarse_real_clock::time_point cur, start
= ceph::coarse_real_clock::now();
// 開始線程的處理邏輯循環(huán)
while (true) {
bool inflight = queue_op_seq.load() - completed_op_seq.load();
again:
// 當(dāng)存在在途未完成的請求時,進行一次completion收割操作橙喘,若沒有完成的請求时鸵,則線程自旋等待
// _mm_pause() 由DPDK庫的librte_env實現(xiàn)
// 若有完成的IO,則會依次調(diào)用其注冊的回調(diào)函數(shù):io_complete
// 回調(diào)操作由本線程在此處完成
if (inflight) {
if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
dout(30) << __func__ << " idle, have a pause" << dendl;
_mm_pause();
}
}
// 由于t還未被初始化厅瞎,首次將跳過此for循環(huán)饰潜,之后t會被賦值,下一輪循環(huán)就可能進入for循環(huán)中
for (; t; t = t->next) {
t->queue = this;
lba_off = t->offset / sector_size;
lba_count = t->len / sector_size;
switch (t->command) {
case IOCommand::WRITE_COMMAND:
{
// 分配并拷貝task的數(shù)據(jù)區(qū)域內(nèi)存,若分配失敗則goto again重試
r = alloc_buf_from_pool(t, true);
...
//提交寫請求到qpair和簸,參數(shù)含義如下
// ns 提交 I/O.的namespace
// qpair 提交 I/O 的qpair
// lba_off 寫請求起始LBA號.
// lba_count 寫請求的LBA數(shù)量
// io_complete 寫請求完成時的回調(diào)函數(shù)
// t io_complete回調(diào)時傳入的參數(shù).
// io_flags I/O flag
// data_buf_reset_sgl 重置數(shù)據(jù)區(qū)域的回調(diào)函數(shù).
// data_buf_next_sge 遍歷數(shù)據(jù)各內(nèi)存區(qū)域的回調(diào)函數(shù)
r = spdk_nvme_ns_cmd_writev(
ns, qpair, lba_off, lba_count, io_complete, t, 0,
data_buf_reset_sgl, data_buf_next_sge);
...
break;
}
case IOCommand::READ_COMMAND:
{
// 分配task的數(shù)據(jù)區(qū)域內(nèi)存彭雾,不做拷貝,若分配失敗則goto again重試
r = alloc_buf_from_pool(t, false);
...
//提交讀請求到qpair,其參數(shù)含義與寫一致
r = spdk_nvme_ns_cmd_readv(
ns, qpair, lba_off, lba_count, io_complete, t, 0,
data_buf_reset_sgl, data_buf_next_sge);
...
break;
}
case IOCommand::FLUSH_COMMAND:
{
// 暫無使用FLUSH_COMMAND的場景
...
break;
}
}
}
if (!queue_empty.load()) {
// queue非空時锁保,取出隊首的Task薯酝,下一輪循環(huán)時,由上面的for循環(huán)處理
Mutex::Locker l(queue_lock);
if (!task_queue.empty()) {
t = task_queue.front();
task_queue.pop();
logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
}
if (!t)
queue_empty = true;
} else {
// 隊列為空時爽柒,喚醒上層因flush阻塞的線程吴菠,因為沒有在途IO,說明所有數(shù)據(jù)已經(jīng)安全落盤浩村,flush可以安全返回
if (flush_waiters.load()) {
Mutex::Locker l(flush_lock);
if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
flush_cond.Signal();
}
if (!inflight) {
...
Mutex::Locker l(queue_lock);
if (queue_empty.load()) {
// 運行到此處做葵,說明本輪循環(huán)開始時沒有在途IO,到本輪循環(huán)結(jié)束時也沒有在途IO
// 滿足線程安全退出的條件
// 檢查是否在外層設(shè)置了aio_stop標(biāo)示穴亏,若設(shè)置了蜂挪,則本線程退出,停止IO處理
if (aio_stop)
break;
queue_cond.Wait(queue_lock);
}
}
}
}
...
}
FLUSH
int NVMEDevice::flush()
{
...
SharedDriverQueueData *queue = driver->get_queue(queue_id);
assert(queue != NULL);
queue->flush_wait();
...
}
// SharedDriverQueueData的flush_wait()實現(xiàn)
void flush_wait() {
uint64_t cur_seq = queue_op_seq.load();
uint64_t left = cur_seq - completed_op_seq.load();
if (cur_seq > completed_op_seq) {
// 存在在途IO嗓化,則等待棠涮,在_aio_thread()中處理完在途IO后會喚醒本flush線程,
// 喚醒后再次檢查queue_op_seq和completed_op_seq刺覆,確認沒有在途IO后严肪,flush才能返回
Mutex::Locker l(flush_lock);
++flush_waiters;
flush_waiter_seqs.insert(cur_seq);
while (cur_seq > completed_op_seq.load()) {
flush_cond.Wait(flush_lock);
}
flush_waiter_seqs.erase(cur_seq);
--flush_waiters;
}
}