ubus消息內(nèi)存模型
發(fā)送數(shù)據(jù)的時(shí)候以blob_attr形式發(fā)送箍铭,但是blob_attr以blob_buf的形式進(jìn)行管理。
struct blob_buf {
struct blob_attr *head;
bool (*grow)(struct blob_buf *buf, int minlen);
int buflen;
void *buf;
};
struct blob_attr {
uint32_t id_len;
char data[];
} __packed;
struct blobmsg_hdr {
uint16_t namelen;
uint8_t name[];
} __packed;
下面以如下調(diào)用為例蜜唾,分析ubus構(gòu)造消息的過程
blobmsg_add_string(&b, "name", "dtwdebug");
blobmsg_add_string 調(diào)用了 blobmsg_add_field帖旨, BLOBMSG_TYPE_STRING = 3
blobmsg_add_field調(diào)用blobmsg_new創(chuàng)建一個(gè)blob_attr
blobmsg_new里調(diào)用blob_new生成的blob_attr的id_len的十六進(jìn)制形式為03 00 00 15,其中0x03為type表示字符串灵妨,0x15表示整個(gè)blob_attr的長度(id_len加data的長度)解阅,0x15 = 21 = 17 + 4 = attrlen + sizeof(id_len) = blobmsg_hdrlen(4) + 9 + 4 = 二字節(jié)對(duì)齊(sizeof(blobmsg_hdr + 4 + 1)) + 9 + 4 = 二字節(jié)對(duì)齊(2 + 4 + 1) + 9 + 4 = 21.
總而言之,blob_attr.data部分的長度attrlen為 二字節(jié)對(duì)齊(sizeof(blobmsg_hdr) + namelen + 1) + payload_len泌霍,此處namelen = strlen("name") = 4, payload_len = strlen("dtwdebug") + 1 = 9
由此货抄,構(gòu)造了一個(gè)83 00 00 15 00 04 6e 61 6d 65 00 00 64 74 77 64 65 62 75 67 00的blob_attr
00 00 00 1c // blob_attr.id_len = 28 = 四字節(jié)對(duì)齊(21) + 四字節(jié)對(duì)齊(4)
83 00 00 15 // blob_attr.id_len = 21
00 04 // blobmsg_hdr.namelen = 4
6e 61 6d 65 00 blobmsg_hdr.name = "name"
00 // padding
64 74 77 64 65 62 75 67 00 // blobmsg_data = "dtwdebug"
下面簡要分析ubus_notify發(fā)送的數(shù)據(jù)的內(nèi)存模型
ubus_notify(ctx, &student_object, "student.remove", b.head, 10000);
ubus_notify發(fā)送的十六進(jìn)制數(shù)據(jù)
0000 00 0a 00 02 dd 4d 31 68 00 00 00 3c 03 00 00 08 .....M1h...<....
0010 dd 4d 31 68 04 00 00 13 73 74 75 64 65 6e 74 2e .M1h....student.
0020 72 65 6d 6f 76 65 00 00 07 00 00 1c 83 00 00 15 remove..........
0030 00 04 6e 61 6d 65 00 00 64 74 77 64 65 62 75 67 ..name..dtwdebug
0040 00 00 00 00 ....
///////////////////////////////////////////////第一層為一個(gè)ubus_msghdr和一個(gè)blob_attr
////////////// ubus_msghdr部分
struct ubus_msghdr {
uint8_t version;
uint8_t type;
uint16_t seq;
uint32_t peer;
} __packetdata;
00 // version
0a // type
00 02 // seq
dd 4d 31 68 // peer
///////////// blob_attr
struct blob_attr {
uint32_t id_len;
char data[];
} __packed;
///////////////////////////////////////////////第二層是一個(gè)整體的blob_attr述召,開頭四個(gè)字節(jié)表示總數(shù)據(jù)的長度 60
00 00 00 3c // blob_attr.id_len = 60
03 00 00 08 dd 4d 31 68 04 00 00 13 73 74 75 64
65 6e 74 2e 72 65 6d 6f 76 65 00 00 07 00 00 1c
83 00 00 15 00 04 6e 61 6d 65 00 00 64 74 77 64
65 62 75 67 00 00 00 00
///////////////////////////////////////////////第三層是ubus_notify函數(shù)需要攜帶的一些信息,包括 ojbect id蟹地,notyfy的type("student.remove"), 攜帶的數(shù)據(jù)积暖,其中攜帶的數(shù)據(jù)也是一個(gè)blob_attr
03 00 00 08 // type UBUS_ATTR_OBJID = 3
dd 4d 31 68 // value student_object.id
04 00 00 13 // type UBUS_ATTR_METHOD = 4
73 74 75 64 65 6e 74 2e 72 65 6d 6f 76 65 00 00 // value "student.remove",最后一個(gè)0x00為二字節(jié)對(duì)齊后的padding
07 00 00 1c // type UBUS_ATTR_DATA = 7
83 00 00 15 00 04 6e 61 6d 65 00 00 64 74 77 64 65 62 75 67 00 00 00 00 // value 是一個(gè)blob_attr
///////////////////////////////////////////////第四層是notify攜帶的數(shù)據(jù)就是前面分析的blobmsg_add_string構(gòu)造的blob_attr
83 00 00 15 // type BLOBMSG_TYPE_STRING = 3, 第一個(gè)比特位為BLOB_ATTR_EXTENDED的標(biāo)志位怪与,
00 04 6e 61 6d 65 00 00 64 74 77 64 65 62 75 67 00 00 00 00
第五層是blobmsg部分
struct blobmsg_hdr {
uint16_t namelen;
uint8_t name[];
} __packed;
注意:blobmsg_hdr的長度為 二字節(jié)對(duì)齊(2 + strlen(name) + 1)
00 04 // blobmsg_hdr.namelen = 4
6e 61 6d 65 00 00 // blobmsg_hdr.name = "name", 最后的0x00為二字節(jié)對(duì)齊后的padding
64 74 77 64 65 62 75 67 00 // blobmsg_data = "dtwdebug"
ubus如何解決粘包
發(fā)送消息
// ubus_notify調(diào)用棧
libubus.so!writev_retry(int fd, struct iovec * iov, int iov_len, int sock_fd) (/home/anlan/Desktop/ubus/ubus/libubus-io.c:84)
libubus.so!ubus_send_msg(struct ubus_context * ctx, uint32_t seq, struct blob_attr * msg, int cmd, uint32_t peer, int fd) (/home/anlan/Desktop/ubus/ubus/libubus-io.c:148)
libubus.so!__ubus_start_request(struct ubus_context * ctx, struct ubus_request * req, struct blob_attr * msg, int cmd, uint32_t peer) (/home/anlan/Desktop/ubus/ubus/libubus-req.c:66)
libubus.so!ubus_start_request(struct ubus_context * ctx, struct ubus_request * req, struct blob_attr * msg, int cmd, uint32_t peer) (/home/anlan/Desktop/ubus/ubus/libubus-req.c:76)
libubus.so!__ubus_notify_async(struct ubus_context * ctx, struct ubus_object * obj, const char * type, struct blob_attr * msg, struct ubus_notify_request * req, _Bool reply) (/home/anlan/Desktop/ubus/ubus/libubus-req.c:291)
libubus.so!ubus_notify(struct ubus_context * ctx, struct ubus_object * obj, const char * type, struct blob_attr * msg, int timeout) (/home/anlan/Desktop/ubus/ubus/libubus-req.c:317)
main(int argc, char ** argv) (/home/anlan/Desktop/ubus/ubustest/ubus_pub.c:224)
由前面的分析已知ubus_notify發(fā)送的消息最外層分為兩部分夺刑,ubusmsg_hdr + blob_attr,具體處理發(fā)送邏輯的函數(shù)為 writev_retry分别,這個(gè)函數(shù)要么把消息完整發(fā)出去遍愿,要么返回失敗。在ubus_send_msg中可以看到耘斩,如果消息沒有被完整發(fā)出去沼填,那么就當(dāng)斷開連接處理。
static int writev_retry(int fd, struct iovec *iov, int iov_len, int sock_fd)
{
uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
struct msghdr msghdr = { 0 };
struct cmsghdr *cmsg;
int len = 0;
int *pfd;
msghdr.msg_iov = iov,
msghdr.msg_iovlen = iov_len,
msghdr.msg_control = fd_buf;
msghdr.msg_controllen = sizeof(fd_buf);
cmsg = CMSG_FIRSTHDR(&msghdr);
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
pfd = (int *) CMSG_DATA(cmsg);
msghdr.msg_controllen = cmsg->cmsg_len;
do {
ssize_t cur_len;
if (sock_fd < 0) {
msghdr.msg_control = NULL;
msghdr.msg_controllen = 0;
} else {
*pfd = sock_fd;
}
cur_len = sendmsg(fd, &msghdr, 0);
if (cur_len < 0) {
switch(errno) {
case EAGAIN:
wait_data(fd, true);
break;
case EINTR:
break;
default:
return -1;
}
continue;
}
if (len > 0)
sock_fd = -1;
len += cur_len;
while (cur_len >= (ssize_t) iov->iov_len) {
cur_len -= iov->iov_len;
iov_len--;
iov++;
if (!iov_len)
return len;
}
iov->iov_base += cur_len;
iov->iov_len -= cur_len;
msghdr.msg_iov = iov;
msghdr.msg_iovlen = iov_len;
} while (1);
/* Should never reach here */
return -1;
}
int __hidden ubus_send_msg(struct ubus_context *ctx, uint32_t seq,
struct blob_attr *msg, int cmd, uint32_t peer, int fd)
{
struct ubus_msghdr hdr;
struct iovec iov[2] = {
STATIC_IOV(hdr)
};
int ret;
hdr.version = 0;
hdr.type = cmd;
hdr.seq = cpu_to_be16(seq);
hdr.peer = cpu_to_be32(peer);
if (!msg) {
blob_buf_init(&b, 0);
msg = b.head;
}
iov[1].iov_base = (char *) msg;
iov[1].iov_len = blob_raw_len(msg);
ret = writev_retry(ctx->sock.fd, iov, ARRAY_SIZE(iov), fd);
if (ret < 0) // 如果消息沒有被完整發(fā)出去括授,那么當(dāng)斷開連接處理
ctx->sock.eof = true; // 置為true表示斷開連接
if (fd >= 0)
close(fd);
return ret;
}
接收消息
如果給ubus_notify設(shè)置超時(shí)時(shí)間坞笙,那么client端就會(huì)處理ubusd返回的數(shù)據(jù),在get_next_msg函數(shù)中會(huì)調(diào)用兩次 recv_retry接收數(shù)據(jù)荚虚。首先說明recv_retry這個(gè)函數(shù)會(huì)接收固定長度的數(shù)據(jù)薛夜。
// ubus_notify處理ubusd返回?cái)?shù)據(jù)的調(diào)用棧
libubus.so!recv_retry(struct ubus_context * ctx, struct iovec * iov, _Bool wait, int * recv_fd) (/home/anlan/Desktop/ubus/ubus/libubus-io.c:159)
libubus.so!get_next_msg(struct ubus_context * ctx, int * recv_fd) (/home/anlan/Desktop/ubus/ubus/libubus-io.c:306)
libubus.so!ubus_handle_data(struct uloop_fd * u, unsigned int events) (/home/anlan/Desktop/ubus/ubus/libubus-io.c:321)
libubus.so!ubus_poll_data(struct ubus_context * ctx, int timeout) (/home/anlan/Desktop/ubus/ubus/libubus-io.c:344)
libubus.so!ubus_complete_request(struct ubus_context * ctx, struct ubus_request * req, int req_timeout) (/home/anlan/Desktop/ubus/ubus/libubus-req.c:163)
libubus.so!ubus_notify(struct ubus_context * ctx, struct ubus_object * obj, const char * type, struct blob_attr * msg, int timeout) (/home/anlan/Desktop/ubus/ubus/libubus-req.c:326)
main(int argc, char ** argv) (/home/anlan/Desktop/ubus/ubustest/ubus_pub.c:224)
接收返回消息的函數(shù)為get_next_msg,在這個(gè)函數(shù)中會(huì)調(diào)用兩次recv_retry版述,第一次調(diào)用recv_retry的時(shí)候設(shè)置的固定長度為12却邓,前面已經(jīng)分析ubus發(fā)送的消息分為兩部分,ubusmsg_hdr加一個(gè)blob_attr院水,12正好為ubusmsg_hdr的大小加blob_attr.id_len的內(nèi)存大小 8 + 4 = 12。
第二次調(diào)用recv_retry简十,設(shè)置的接收數(shù)據(jù)的大小為20檬某,因?yàn)榈谝淮握{(diào)用已經(jīng)把ubusmsg_hdr和blob_attr.id_len讀取過了,所以20為根據(jù)id_len算出來的blob_attr.data的長度螟蝙。
0000 00 01 00 02 90 67 e9 ee 00 00 00 18 03 00 00 08 .....g..........
0010 90 67 e9 ee 0b 00 00 04 01 00 00 08 00 00 00 00 .g..............
00 01 00 02 90 67 e9 ee // ubusmsg_hdr
00 00 00 18 // blob_attr.id_len = 24
03 00 00 08 90 67 e9 ee 0b 00 00 04 01 00 00 08 00 00 00 00 // blob_attr.data的長度為20
static int recv_retry(struct ubus_context *ctx, struct iovec *iov, bool wait, int *recv_fd)
{
uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
struct msghdr msghdr = { 0 };
struct cmsghdr *cmsg;
int total = 0;
int bytes;
int *pfd;
int fd;
fd = ctx->sock.fd;
msghdr.msg_iov = iov,
msghdr.msg_iovlen = 1,
msghdr.msg_control = fd_buf;
msghdr.msg_controllen = sizeof(fd_buf);
cmsg = CMSG_FIRSTHDR(&msghdr);
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
pfd = (int *) CMSG_DATA(cmsg);
while (iov->iov_len > 0) {
if (recv_fd) {
msghdr.msg_control = fd_buf;
msghdr.msg_controllen = cmsg->cmsg_len;
} else {
msghdr.msg_control = NULL;
msghdr.msg_controllen = 0;
}
*pfd = -1;
bytes = recvmsg(fd, &msghdr, 0);
if (!bytes)
return -1;
if (bytes < 0) {
bytes = 0;
if (errno == EINTR)
continue;
if (errno != EAGAIN)
return -1;
}
if (!wait && !bytes)
return 0;
if (recv_fd)
*recv_fd = *pfd;
recv_fd = NULL;
wait = true;
iov->iov_len -= bytes;
iov->iov_base += bytes;
total += bytes;
if (iov->iov_len > 0)
wait_data(fd, false);
}
return total;
}
如何實(shí)現(xiàn)接受消息超時(shí)等待的功能
ubus_notify接收消息的超時(shí)等待功能主要在ubus_complete_request中實(shí)現(xiàn)
int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req,
int req_timeout)
{
ubus_complete_handler_t complete_cb = req->complete_cb;
int status = UBUS_STATUS_NO_DATA;
int64_t timeout = 0, time_end = 0;
if (req_timeout)
time_end = get_time_msec() + req_timeout;
ubus_complete_request_async(ctx, req);
req->complete_cb = ubus_sync_req_cb;
ctx->stack_depth++;
while (!req->status_msg) {
if (req_timeout) { // 如果設(shè)置了超時(shí)時(shí)間
timeout = time_end - get_time_msec(); // 計(jì)算超時(shí)時(shí)間
if (timeout <= 0) {
ubus_set_req_status(req, UBUS_STATUS_TIMEOUT);
break;
}
}
ubus_poll_data(ctx, (unsigned int) timeout); // 在超時(shí)時(shí)間內(nèi)等待緩沖區(qū)是否可讀恢恼,如果可讀或者超時(shí)都會(huì)走到get_next_msg,
// get_next_msg會(huì)調(diào)用recv_retry,recv_retry這個(gè)函數(shù)會(huì)嘗試讀取數(shù)據(jù)胰默,如果緩沖區(qū)可讀场斑,那么會(huì)一直把完整的消息從緩沖區(qū)讀取出來,如果超時(shí)緩沖區(qū)無數(shù)據(jù)可讀牵署,那么直接走下面的邏輯
if (ctx->sock.eof) {
ubus_set_req_status(req, UBUS_STATUS_CONNECTION_FAILED);
ctx->cancel_poll = true;
break;
}
}
ctx->stack_depth--;
if (ctx->stack_depth)
ctx->cancel_poll = true;
if (req->status_msg)
status = req->status_code;
req->complete_cb = complete_cb;
if (req->complete_cb)
req->complete_cb(req, status);
if (!ctx->stack_depth && !ctx->sock.registered)
ctx->pending_timer.cb(&ctx->pending_timer);
return status;
}
// 讀取固定長度的數(shù)據(jù)漏隐,長度由iov->iov_len指定,wait = true時(shí)表示要么出錯(cuò)奴迅,要么必須讀取到固定長度的數(shù)據(jù)青责,wait = false表示嘗試讀一下,如果沒讀到數(shù)據(jù)那么返回0
// 返回-1 表示出現(xiàn)錯(cuò)誤,返回0表示wait = false而且當(dāng)前緩沖區(qū)內(nèi)無數(shù)據(jù)可讀脖隶,返回指定的數(shù)據(jù)長度扁耐,表示成功讀取
static int recv_retry(struct ubus_context *ctx, struct iovec *iov, bool wait, int *recv_fd)
{
uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
struct msghdr msghdr = { 0 };
struct cmsghdr *cmsg;
int total = 0;
int bytes;
int *pfd;
int fd;
fd = ctx->sock.fd;
msghdr.msg_iov = iov,
msghdr.msg_iovlen = 1,
msghdr.msg_control = fd_buf;
msghdr.msg_controllen = sizeof(fd_buf);
cmsg = CMSG_FIRSTHDR(&msghdr);
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
pfd = (int *) CMSG_DATA(cmsg);
while (iov->iov_len > 0) { // 剩余數(shù)據(jù)的長度大于0
if (recv_fd) {
msghdr.msg_control = fd_buf;
msghdr.msg_controllen = cmsg->cmsg_len;
} else {
msghdr.msg_control = NULL;
msghdr.msg_controllen = 0;
}
*pfd = -1;
bytes = recvmsg(fd, &msghdr, 0); // 讀取數(shù)據(jù)
if (!bytes) // 返回0表示對(duì)端關(guān)閉連接
return -1;
if (bytes < 0) {
bytes = 0;
if (errno == EINTR) // 被信號(hào)打斷,繼續(xù)讀取
continue;
if (errno != EAGAIN) // EAGAIN表示當(dāng)前無數(shù)據(jù)可讀产阱,也應(yīng)該繼續(xù)讀取婉称,如果errno既不是EINTR也不是EAGAIN,表示出錯(cuò)了返回-1
return -1;
}
if (!wait && !bytes) // 如果參數(shù)wait為false而且未讀取到數(shù)據(jù)构蹬,那么直接返回0
return 0;
if (recv_fd)
*recv_fd = *pfd;
recv_fd = NULL;
wait = true; // 如果讀到了數(shù)據(jù)王暗,那么就必須把數(shù)據(jù)接收完整,所以wait置為true
iov->iov_len -= bytes; // 更新剩余要讀取的數(shù)據(jù)長度
iov->iov_base += bytes; // 更新保存剩余數(shù)據(jù)的位置
total += bytes; // 更新已經(jīng)讀取的數(shù)據(jù)長度
if (iov->iov_len > 0) // 如果還有數(shù)據(jù)要讀取怎燥,那么就用poll去等待緩沖區(qū)可讀
wait_data(fd, false);
}
return total;
}
static void wait_data(int fd, bool write)
{
struct pollfd pfd = { .fd = fd };
pfd.events = write ? POLLOUT : POLLIN;
poll(&pfd, 1, -1);
}