RDMA通過kernel-bypass和協(xié)議棧offload兩大核心技術,實現了遠高于傳統(tǒng)TCP/IP的網絡通信性能。盡管RDMA的性能要遠好于TCP/IP令野,但目前RDMA的實際落地業(yè)務場景卻寥寥無幾夷恍,這其中制約RDMA技術大規(guī)模上線應用的主要原因有兩點:
- 主流互聯(lián)網公司普遍選擇RoCE(RDMA over Converged Ethernet)作為RDMA部署方案,而RoCE本質上是RDMA over UDP曼月,在網絡上無法保證不丟包。因此RoCE部署方案需要額外的擁塞控制機制來保證底層的無損網絡柔昼,如PFC哑芹、ECN等,這給大規(guī)模的上線部署帶來挑戰(zhàn)捕透。而且目前各大廠商對硬件擁塞控制的支持均還不完善聪姿,存在兼容性問題碴萧。
- RDMA提供了完全不同于socket的編程接口,因此要想使用RDMA末购,需要對現有應用進行改造破喻。而RDMA原生編程API(verbs/RDMA_CM)比較復雜,需要對RDMA技術有深入理解才能做好開發(fā)盟榴,學習成本較高曹质。
為了降低應用程序的改造成本,決定研發(fā)一個RDMA通信庫擎场,該通信庫直接基于ibvebrs和RDMA_CM羽德,避免對其他第三方庫的調用。本文主要對rdma編程的事件通知機制進行歸納總結迅办。
傳統(tǒng)socket編程中通常采用IO復用技術(select宅静、poll、epoll等)來實現事件通知機制站欺,那么對于rdma是否可以同樣基于IO復用技術來實現事件通知機制姨夹?答案是完全可以。
1. RDMA_CM API(For Connection)
在rdma編程時矾策,可以直接通過RDMA_CM API來建立RDMA連接磷账。
對rdma_create_id函數進行分析,其主要創(chuàng)建了rdma_cm_id對象贾虽,并將其注冊到驅動中逃糟。
int rdma_create_id(struct rdma_event_channel *channel,
struct rdma_cm_id **id, void *context,
enum rdma_port_space ps)
{
enum ibv_qp_type qp_type = (ps == RDMA_PS_IPOIB || ps == RDMA_PS_UDP) ?
IBV_QPT_UD : IBV_QPT_RC;
ret = ucma_init(); //查詢獲取所有IB設備,存放在cma_dev_array全局數組中榄鉴;檢測是否支持AF_IB協(xié)議
struct cma_id_private *id_priv =
ucma_alloc_id(channel, context, ps, qp_type); //創(chuàng)建并初始化id_priv對象:若未創(chuàng)建rdma_event_channel履磨,那么調用rdma_create_event_channel創(chuàng)建一個蛉抓。
CMA_INIT_CMD_RESP(&cmd, sizeof cmd, CREATE_ID, &resp, sizeof resp);
cmd.uid = (uintptr_t) id_priv;
cmd.ps = ps;
cmd.qp_type = qp_type;
ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); //將id_priv相關信息注冊到內核驅動中庆尘,不做過多分析
*id = &id_priv->id; //返回rdma_cm_id對象
}
rdma_cm_id數據結構定義如下:
struct rdma_cm_id {
struct ibv_context *verbs; //ibv_open_device
struct rdma_event_channel *channel; //rdma_create_event_channel創(chuàng)建;For Setup connection
void *context; //user specified context
struct ibv_qp *qp; //rdma_create_qp巷送,底層調用的是ibv_create_qp
struct rdma_route route;
enum rdma_port_space ps; //RDMA_PS_IPOIB or RDMA_PS_UDP or RDMA_PS_TCP
uint8_t port_num; //port數目
struct rdma_cm_event *event; //rdma_cm相關的事件events
struct ibv_comp_channel *send_cq_channel; //ibv_create_comp_channel創(chuàng)建驶忌;For data transfer
struct ibv_cq *send_cq; //發(fā)送CQ,通常和recv_cq是同一個CQ
struct ibv_comp_channel *recv_cq_channel; //ibv_create_comp_channel創(chuàng)建笑跛;For data transfer
struct ibv_cq *recv_cq; //接收CQ付魔,通常和send_cq是同一個CQ
struct ibv_srq *srq;
struct ibv_pd *pd; //ibv_open_device
enum ibv_qp_type qp_type; //IBV_QPT_RC or IBV_QPT_UD
};
在創(chuàng)建rdma_cm_id時,如果預先沒有創(chuàng)建rdma_event_channel飞蹂,那么需要調用rdma_create_event_channel函數几苍。
struct rdma_event_channel *rdma_create_event_channel(void)
{
struct rdma_event_channel *channel;
if (ucma_init()) //通過static局部變量,保證只做一次初始化
return NULL;
channel = malloc(sizeof *channel); //創(chuàng)建rdma_event_channel
if (!channel)
return NULL;
channel->fd = open("/dev/infiniband/rdma_cm", O_RDWR | O_CLOEXEC); //可以看出rdma_event_channel本質上就是一個fd
if (channel->fd < 0) {
goto err;
}
return channel;
err:
free(channel);
return NULL;
}
rdma_event_channel的定義如下:
struct rdma_event_channel {
int fd;
}
1.1 RDMA_CM原生事件通知實現(in block way)
static int cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event);
ret = rdma_get_cm_event(channel, &event); //阻塞操作陈哑,直到有rdma_cm event發(fā)生才返回
if (!ret) {
ret = cma_handler(event->id, event); //處理事件
rdma_ack_cm_event(event); //ack event
}
static int cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event) {
int ret = 0;
switch (event->event)
{
case RDMA_CM_EVENT_ADDR_RESOLVED:
ret = addr_handler(cma_id->context);
break;
case RDMA_CM_EVENT_MULTICAST_JOIN:
ret = join_handler(cma_id->context, &event->param.ud);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
case RDMA_CM_EVENT_ROUTE_ERROR:
case RDMA_CM_EVENT_MULTICAST_ERROR:
printf("mckey: event: %s, error: %d\n", rdma_event_str(event->event), event->status); connect_error();
ret = event->status;
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
/* Cleanup will occur after test completes. */
break;
default:
break;
}
可以看出妻坝,RDMA_CM的fd所偵測的都是建立連接相關的event伸眶,其不涉及數據傳輸相關的event,所以rdma_cm event只用于通知建連相關事件
enum rdma_cm_event_type {$
RDMA_CM_EVENT_ADDR_RESOLVED,
RDMA_CM_EVENT_ADDR_ERROR,
RDMA_CM_EVENT_ROUTE_RESOLVED,
RDMA_CM_EVENT_ROUTE_ERROR,
RDMA_CM_EVENT_CONNECT_REQUEST,
RDMA_CM_EVENT_CONNECT_RESPONSE,
RDMA_CM_EVENT_CONNECT_ERROR,
RDMA_CM_EVENT_UNREACHABLE,
RDMA_CM_EVENT_REJECTED,
RDMA_CM_EVENT_ESTABLISHED,
RDMA_CM_EVENT_DISCONNECTED,
RDMA_CM_EVENT_DEVICE_REMOVAL,
RDMA_CM_EVENT_MULTICAST_JOIN,
RDMA_CM_EVENT_MULTICAST_ERROR,
RDMA_CM_EVENT_ADDR_CHANGE,
RDMA_CM_EVENT_TIMEWAIT_EXIT
};$
1.2 IO復用poll/epoll(in non-block way)
rdma_cm fd不同于傳統(tǒng)socket fd刽宪,其只會向上拋POLLIN事件厘贼,表示有rdma_cm event事件發(fā)生,具體event類型需要通過rdma_get_cm_event來獲取圣拄。
/* change the blocking mode of the completion channel */
flags = fcntl(cm_id->channel->fd, F_GETFL);
rc = fcntl(cm_id->channel->fd, F_SETFL, flags | O_NONBLOCK); //設置rdma_cm fd為NONBLOCK
if (rc < 0) {
fprintf(stderr, "Failed to change file descriptor of Completion Event Channel\n");
return -1;
}
struct pollfd my_pollfd;
int ms_timeout = 10;
/*
* poll the channel until it has an event and sleep ms_timeout
* milliseconds between any iteration
*/
my_pollfd.fd = cm_id->channel->fd;
my_pollfd.events = POLLIN; //只需要監(jiān)聽POLLIN事件嘴秸,POLLIN事件意味著有rdma_cm event發(fā)生
my_pollfd.revents = 0;
do {
rc = poll(&my_pollfd, 1, ms_timeout); //非阻塞操作,有事件或者超時時返回
} while (rc == 0);
/* 注意:poll監(jiān)聽到有事件發(fā)生庇谆,只意味著有rdma_cm event事件發(fā)生岳掐,但具體event仍然需要通過rdma_get_cm_event來獲取。*/
ret = rdma_get_cm_event(channel, &event);
if (!ret) {
ret = cma_handler(event->id, event); //處理收到的事件
rdma_ack_cm_event(event); //ack event
}
2. verbs API(For data transfer)
從上一節(jié)可以看出族铆,RDMA_CM中的fd只涉及建連相關的事件岩四,其無法獲取數據傳輸相關的事件。
對于RDMA傳輸哥攘,數據傳輸是由NIC硬件完成的剖煌,完全不需要CPU參與。網卡硬件完成數據傳輸后逝淹,會向CQ(completion queue中)提交一個cqe耕姊,用于描述數據傳輸完成情況。
struct ibv_cq *ibv_create_cq(struct ibv_context *context, int cqe,
void *cq_context, struct ibv_comp_channel *channel, int comp_vector)
# 作用:創(chuàng)建CQ栅葡,每個QP都有對應的send cq和recv cq茉兰。
# 一個CQ可以被同一個QP的send queue和recv queue共享,也可以被多個不同的QP共享
# 注意:CQ僅僅只是一個queue欣簇,其本身沒有built-in的事件通知機制规脸。如果想要增加事件通知機制,那么需要指定channel對象熊咽。
verbs API提供了創(chuàng)建ibv_comp_channel的編程接口:
struct ibv_comp_channel *ibv_create_comp_channel(struct ibv_context *context)
# 作用:創(chuàng)建completion channel莫鸭,用于向user通知有新的completion queue event(cqe)已經被寫入CQ中。
struct ibv_comp_channel {
struct ibv_context *context;
int fd;
int refcnt;
};$
2.1 Verbs原生事件通知實現(in block way)
struct ibv_context *context;
struct ibv_cq *cq;
void *ev_ctx = NULL; /* can be initialized with other values for the CQ context */
/* Create a CQ, which is associated with a Completion Event Channel */
cq = ibv_create_cq(ctx, 1, ev_ctx, channel, 0);
if (!cq) {
fprintf(stderr, "Failed to create CQ\n");
return -1;
}
/* Request notification before any completion can be created (to prevent races) */
ret = ibv_req_notify_cq(cq, 0);
if (ret) {
fprintf(stderr, "Couldn't request CQ notification\n");
return -1;
}
/* The following code will be called each time you need to read a Work Completion */
struct ibv_cq *ev_cq;
void *ev_ctx;
int ret;
int ne;
/* Wait for the Completion event */
ret = ibv_get_cq_event(channel, &ev_cq, &ev_ctx); //阻塞函數横殴,直到有cqe發(fā)生才返回被因,ev_cq指向發(fā)生cqe的CQ
if (ret) {
fprintf(stderr, "Failed to get CQ event\n");
return -1;
}
/* Ack the event */
ibv_ack_cq_events(ev_cq, 1);
/* Request notification upon the next completion event */
ret = ibv_req_notify_cq(ev_cq, 0);
if (ret) {
fprintf(stderr, "Couldn't request CQ notification\n");
return -1;
}
/* Empty the CQ: poll all of the completions from the CQ (if any exist) */
do {
ne = ibv_poll_cq(cq, 1, &wc);
if (ne < 0) {
fprintf(stderr, "Failed to poll completions from the CQ: ret = %d\n",
ne);
return -1;
}
/* there may be an extra event with no completion in the CQ */
if (ne == 0)
continue;
if (wc.status != IBV_WC_SUCCESS) {
fprintf(stderr, "Completion with status 0x%x was found\n",
wc.status);
return -1;
}
} while (ne);
2.2 IO復用poll/epoll(in non-block way)
利用fcntl設置channel->fd的屬性為non-block,然后就可以用poll/epoll/select等來監(jiān)聽channel->fd的POLLIN事件衫仑,POLLIN事件意味著有新的completion queue event被填入CQ中梨与。user程序在被喚醒后,無需像傳統(tǒng)socket那樣進行read/write操作(因為data已經直接DMA到用戶態(tài)緩存中)文狱,而是需要做poll_cq操作粥鞋,對每一個cqe進行解析處理。
struct ibv_context *context;
struct ibv_cq *cq;
void *ev_ctx = NULL; /* can be initialized with other values for the CQ context */
/* Create a CQ, which is associated with a Completion Event Channel */
cq = ibv_create_cq(ctx, 1, ev_ctx, channel, 0);
if (!cq) {
fprintf(stderr, "Failed to create CQ\n");
return -1;
}
/* Request notification before any completion can be created (to prevent races) */
ret = ibv_req_notify_cq(cq, 0);
if (ret) {
fprintf(stderr, "Couldn't request CQ notification\n");
return -1;
}
/* The following code will be called only once, after the Completion Event Channel
was created瞄崇,to change the blocking mode of the completion channel */
int flags = fcntl(channel->fd, F_GETFL);
rc = fcntl(channel->fd, F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
fprintf(stderr, "Failed to change file descriptor of Completion Event Channel\n");
return -1;
}
/* The following code will be called each time you need to read a Work Completion */
struct pollfd my_pollfd;
struct ibv_cq *ev_cq;
void *ev_ctx;
int ne;
int ms_timeout = 10;
/*
* poll the channel until it has an event and sleep ms_timeout
* milliseconds between any iteration
*/
my_pollfd.fd = channel->fd;
my_pollfd.events = POLLIN; //只需要監(jiān)聽POLLIN事件呻粹,POLLIN事件意味著有新的cqe發(fā)生
my_pollfd.revents = 0;
do {
rc = poll(&my_pollfd, 1, ms_timeout); //非阻塞函數到踏,有cqe事件或超時時退出
} while (rc == 0);
if (rc < 0) {
fprintf(stderr, "poll failed\n");
return -1;
}
ev_cq = cq;
/* Wait for the completion event */
ret = ibv_get_cq_event(channel, &ev_cq, &ev_ctx); //獲取completion queue event。對于epoll水平觸發(fā)模式尚猿,必須要執(zhí)行ibv_get_cq_event并將該cqe取出窝稿,否則會不斷重復喚醒epoll
if (ret) {
fprintf(stderr, "Failed to get cq_event\n");
return -1;
}
/* Ack the event */
ibv_ack_cq_events(ev_cq, 1); //ack cqe
/* Request notification upon the next completion event */
ret = ibv_req_notify_cq(ev_cq, 0);
if (ret) {
fprintf(stderr, "Couldn't request CQ notification\n");
return -1;
}
/* Empty the CQ: poll all of the completions from the CQ (if any exist) */
do {
ne = ibv_poll_cq(cq, 1, &wc);
if (ne < 0) {
fprintf(stderr, "Failed to poll completions from the CQ: ret = %d\n",
ne);
return -1;
}
/* there may be an extra event with no completion in the CQ */
if (ne == 0)
continue;
if (wc.status != IBV_WC_SUCCESS) {
fprintf(stderr, "Completion with status 0x%x was found\n",
wc.status);
return -1;
}
} while (ne);
3. rpoll實現(rsocket)
rsocket是附在rdma_cm庫中的一個子模塊,提供了完全類似于socket接口的rdma調用凿掂。此處主要對rpoll的實現進行分析伴榔。
rpoll同時支持對rdma fd和正常socket fd進行監(jiān)聽,但對于rdma fd庄萎,其目前僅支持四種事件:POLLIN踪少、POLLOUT、POLLHUP糠涛、POLLERR援奢。
* Note that we may receive events on an rsocket that may not be reported
* to the user (e.g. connection events or credit updates). Process those
* events, then return to polling until we find ones of interest.
*/
int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
{
struct timeval s, e;
struct pollfd *rfds;
uint32_t poll_time = 0;
int ret;
do {
ret = rs_poll_check(fds, nfds); //主動輪詢查看是否有event發(fā)生
if (ret || !timeout) //如果有event發(fā)生或者timeout為0,直接返回
return ret;
if (!poll_time)
gettimeofday(&s, NULL);
gettimeofday(&e, NULL);
poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time); //嘗試輪詢polling_time時間忍捡,該時間內如果有event發(fā)生集漾,那么直接返回,否則進入后續(xù)邏輯
rfds = rs_fds_alloc(nfds); //創(chuàng)建新的pollfd數組rfds砸脊,用于添加到原生poll中具篇。
if (!rfds)
return ERR(ENOMEM);
do {
ret = rs_poll_arm(rfds, fds, nfds); //對所有verbs fd進行arm操作,并將待監(jiān)聽事件全部改為POLLIN
if (ret)
break;
ret = poll(rfds, nfds, timeout); //調用OS原生poll
if (ret <= 0)
break;
ret = rs_poll_events(rfds, fds, nfds); //將cqe或rdma_cm event轉化為具體event
} while (!ret);
rpoll中調用rs_poll_check進行輪詢凌埂,查看是否有event發(fā)生驱显。
static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
{
struct rsocket *rs;
int i, cnt = 0;
for (i = 0; i < nfds; i++) {
rs = idm_lookup(&idm, fds[i].fd); //根據fd找到對應的rsocket對象
if (rs)
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
//查看rsocket fd是否有event發(fā)生,手動向上拋事件
else
poll(&fds[i], 1, 0); //普通fd瞳抓,非阻塞poll一次埃疫,查詢是否有event發(fā)生
if (fds[i].revents)
cnt++;
}
return cnt;
}
static int rs_poll_rs(struct rsocket *rs, int events,
int nonblock, int (*test)(struct rsocket *rs))
{
struct pollfd fds;
short revents;
int ret;
check_cq:
if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
(rs->state == rs_disconnected) || (rs->state & rs_error))) {
rs_process_cq(rs, nonblock, test); //調用ibv_poll_cq遍歷cqe
//對于send cqe,可以在處理函數中將發(fā)送緩存重新放回到內存池中孩哑,
//對于recv cqe栓霜,可以在處理函數中更新可讀數據length和addr等
revents = 0;
if ((events & POLLIN) && rs_conn_have_rdata(rs)) //接收緩存有數據,拋POLLIN
事件
revents |= POLLIN;
if ((events & POLLOUT) && rs_can_send(rs)) //發(fā)送緩存可寫臭笆,拋POLLOUT事件
revents |= POLLOUT;
if (!(rs->state & rs_connected)) {
if (rs->state == rs_disconnected)
revents |= POLLHUP; //斷開連接叙淌,拋POLLHUP事件
else
revents |= POLLERR; //拋POLLERR事件
}
return revents;
} else if (rs->type == SOCK_DGRAM) { //UDP相關邏輯秤掌,不關注
ds_process_cqs(rs, nonblock, test);
revents = 0;
if ((events & POLLIN) && rs_have_rdata(rs))
revents |= POLLIN;
if ((events & POLLOUT) && ds_can_send(rs))
revents |= POLLOUT;
return revents;
}
if (rs->state == rs_listening) { //rmda_cm fd
fds.fd = rs->cm_id->channel->fd;
fds.events = events; //此處沒有將要監(jiān)聽的事件設置為POLLIN愁铺,why?
fds.revents = 0;
poll(&fds, 1, 0); //直接poll一次闻鉴,然后返回
return fds.revents;
}
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret && (errno == EINPROGRESS)) {
errno = 0;
} else {
goto check_cq;
}
}
if (rs->state == rs_connect_error) {
revents = 0;
if (events & POLLOUT)
revents |= POLLOUT;
if (events & POLLIN)
revents |= POLLIN;
revents |= POLLERR;
return revents;
}
return 0;
}
當主動輪詢polling_time時間后茵乱,如果仍然沒有event發(fā)生,且尚未超時孟岛,那么就需要調用rs_poll_arm函數瓶竭,其主要作用有兩點:1)對所有verbs fd進行arm操作(ibv_notify_cq_event)督勺;2)將所有rdma相關事件全部修改為監(jiān)聽POLLIN事件,然后丟給原生poll函數去監(jiān)聽斤贰。
static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
{
struct rsocket *rs;
int i;
for (i = 0; i < nfds; i++) {
rs = idm_lookup(&idm, fds[i].fd);
if (rs) { // rdma相關fd
fds[i].revents = rs_poll_rs(rs, fds[i].events, 0, rs_is_cq_armed);
if (fds[i].revents)
return 1;
if (rs->type == SOCK_STREAM) {
if (rs->state >= rs_connected)
rfds[i].fd = rs->cm_id->recv_cq_channel->fd; //verbs fd智哀,用于通知data傳輸event
else
rfds[i].fd = rs->cm_id->channel->fd; //rdma_cm fd,用于通知connect event
} else {
rfds[i].fd = rs->epfd;
}
rfds[i].events = POLLIN; //所有監(jiān)聽事件全部改為POLLIN
} else { //普通fd
rfds[i].fd = fds[i].fd;
rfds[i].events = fds[i].events;
}
rfds[i].revents = 0;
}
return 0;
}
原生poll在超時時間內如果監(jiān)聽到有事件發(fā)生荧恍,那么調用rs_poll_events函數瓷叫。
static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
{
struct rsocket *rs;
int i, cnt = 0;
for (i = 0; i < nfds; i++) {
if (!rfds[i].revents) //沒有事件發(fā)生,跳過
continue;
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fastlock_acquire(&rs->cq_wait_lock);
if (rs->type == SOCK_STREAM)
rs_get_cq_event(rs); //調用ibv_get_cq_event
else
ds_get_cq_event(rs);
fastlock_release(&rs->cq_wait_lock);
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); //手動向上拋事件
} else {
fds[i].revents = rfds[i].revents; //普通fd送巡,直接向上拋事件
}
if (fds[i].revents)
cnt++;
}
return cnt;
}
總結來看摹菠,對于rpoll實現,主要分兩個步驟:
- 主動遍歷輪詢polling_time時間骗爆,查看是否有event發(fā)生次氨;
- 如果polling_time時間內沒有event發(fā)生,那么將verbs/rdma_cm fd直接注冊到OS原生poll中摘投,并將待監(jiān)聽事件改為POLLIN煮寡,然后調用原生poll。如果poll監(jiān)聽到verbs/rdma_cm fd的事件犀呼,這只意味著有cqe事件或rdma_cm事件發(fā)生洲押,不能直接返回給用戶,需要額外進行邏輯判斷圆凰,以確定究竟是否要向上拋事件杈帐,以及拋什么事件。
4. 總結
對于rdma編程专钉,目前主流實現是利用rdma_cm來建立連接挑童,然后利用verbs來傳輸數據。
rdma_cm和ibverbs分別會創(chuàng)建一個fd跃须,這兩個fd的分工不同站叼。rdma_cm fd主要用于通知建連相關的事件,verbs fd則主要通知有新的cqe發(fā)生菇民。當直接對rdma_cm fd進行poll/epoll監(jiān)聽時尽楔,此時只能監(jiān)聽到POLLIN事件,這意味著有rdma_cm事件發(fā)生第练。當直接對verbs fd進行poll/epoll監(jiān)聽時阔馋,同樣只能監(jiān)聽到POLLIN事件,這意味著有新的cqe娇掏。