之前進(jìn)程間通信的知識都是應(yīng)付面試的時(shí)候臨時(shí)補(bǔ)的屡律,最近還是要通過碼代碼來學(xué)習(xí)下腌逢,主要參考《Linux/Unix系統(tǒng)編程手冊》,本來照著這本書的示例一路敲下來就足夠了超埋,但是書上給的都是比較完整的例子(畢竟便于即敲即用嘛)搏讶,這里我盡量簡化佳鳖。
1. 準(zhǔn)備
這里給出下文可能用到的變量的定義
mqd_t mqd; // 消息隊(duì)列句柄,可用于IO多路復(fù)用
struct mq_attr attr; // 消息隊(duì)列屬性媒惕,下面為字段說明系吩,[]內(nèi)為可以設(shè)置/獲取該字段的API
// .mq_flags 0或者O_NONBLOCK [mq_getattr(), mq_setattr()]
// .mq_maxmsg 最大消息數(shù)量 [mq_open(), mq_getattr()]
// .mq_msgsize 最大消息大小 [mq_open(), mq_getattr()]
// .mq_curmsgs 隊(duì)列中消息數(shù)量 [mq_getattr()]
const char* mq_name = "/mq"; // 消息隊(duì)列名稱,必須以'/'開頭
消息隊(duì)列的maxmsg
和msgsize
都是在創(chuàng)建時(shí)指定的妒蔚,之后不能改變穿挨。是否非阻塞讀寫則可以手動設(shè)定。消息數(shù)量也是只讀的肴盏,是隨著消息隊(duì)列的讀取/寫入而改變的科盛,每次寫入則加1,每次讀取則減1菜皂。
另外贞绵,相關(guān)API都是返回-1作為錯(cuò)誤碼,之后API說明略去檢查返回值的步驟恍飘。
gcc
編譯時(shí)需要加上-lrt
選項(xiàng)榨崩,動態(tài)鏈接到共享庫librt.so
。
2. 基本API
創(chuàng)建消息隊(duì)列
// 參數(shù)2和3同系統(tǒng)調(diào)用open(2)章母,參數(shù)4設(shè)置自定義消息隊(duì)列屬性母蛛,若為NULL則不設(shè)置,也可以不要參數(shù)4
mqd = mq_open(mq_name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attr);
生產(chǎn)者
下列代碼發(fā)送3個(gè)消息到隊(duì)列中
mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
const char* messages[] = {"msg-1", "msg-2", "msg-3"};
int priorities[] = {4, 0, 6};
for (int i = 0; i < 3; ++i)
mq_send(mqd, messages[i], strlen(messages[i]), priorities[i]);
注意對現(xiàn)有的消息隊(duì)列調(diào)用mq_open
時(shí)乳怎,第2個(gè)參數(shù)不帶O_CREAT
時(shí)不需要指定權(quán)限彩郊。
發(fā)送時(shí)需要指定優(yōu)先級,消息按照優(yōu)先級降序存在消息隊(duì)列中舞肆,因此隊(duì)列中的消息依次是"msg-3", "msg-1", "msg-2"
焦辅。
消費(fèi)者
讀取消息隊(duì)列中所有消息并顯示優(yōu)先級
mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
mq_getattr(mqd, &attr); // 取得隊(duì)列屬性,從而定義合適大小的緩沖區(qū)
char* buffer = new buffer[attr.mq_msgsize + 1];
unsigned int priority;
ssize_t num_read;
while ((num_read = mq_receive(mqd, buffer, attr.mq_msgsize, &priority)) != -1) {
buffer[num_read] = '\0';
printf("[%2u] %s\n", priority, buffer);
}
// 若errno != EAGAIN則需要處理錯(cuò)誤椿胯,非阻塞模式下若隊(duì)列為空errno會被設(shè)置為EAGAIN
delete[] buffer;
另外關(guān)于EINTR
錯(cuò)誤,若設(shè)置信號處理器時(shí)指定了SA_RESTART
標(biāo)志剃根,則mq_receive
和mq_send
會自動重啟哩盲,所以無需代碼處理。但是這針對的是阻塞式I/O狈醉,參考man 7 signal
如下內(nèi)容
If a blocked call to one of the following interfaces is interrupted by a signal handler, then the call will be automatically restarted after the signal handler returns if the SA_RESTART flag was used; otherwise the call will fail with the error EINTR
至于非阻塞式I/O廉油,個(gè)人覺得根本就不會被打斷,因?yàn)闆]有意義苗傅,但是實(shí)際情況還是得看內(nèi)核怎么實(shí)現(xiàn)抒线,有篇討論可以參考。EINTR and non-blocking calls
超時(shí)讀寫
mq_timedsend()
和mq_timedreceive
相比mq_send()
和mq_receive()
僅僅多出一個(gè)參數(shù)const struct timespec *abs_timeout
渣慕,用于指定超時(shí)時(shí)間嘶炭,僅僅在O_NONBLOCK
標(biāo)記不起作用時(shí)才有效抱慌。
struct timespec
{
__time_t tv_sec; /* Seconds. */
__syscall_slong_t tv_nsec; /* Nanoseconds. */
};
注意該參數(shù)設(shè)置的是絕對時(shí)間,因此可以用clock_gettime()
來獲取CLOCK_REALTIME
時(shí)鐘的當(dāng)前值眨猎,并在該值上加上所需的時(shí)間量來生成一個(gè)恰當(dāng)初始化的timespec
結(jié)構(gòu)抑进。
2. 消息通知
示例程序的流程是順序的,先創(chuàng)建消息隊(duì)列睡陪,再寫入若干消息寺渗,再依次讀取所有消息。實(shí)際上用于進(jìn)程間通信時(shí)兰迫,寫者和讀者是并發(fā)執(zhí)行的信殊,即生產(chǎn)者-消費(fèi)者問題。
POSIX消息隊(duì)列提供了消息通知API汁果,在隊(duì)列為空時(shí)若有新的消息到來涡拘,能夠接收通知。通過下列API可以注冊通知须鼎,指定具體通知方式(比如信號)后鲸伴,若新消息到來使得隊(duì)列從空變成非空,就會調(diào)用自定義的通知處理函數(shù)晋控。
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
以下幾點(diǎn)需要特別注意:
- 任一時(shí)刻只能有1個(gè)進(jìn)程能夠向特定消息隊(duì)列注冊通知汞窗,如果已經(jīng)存在,再次注冊會失敗赡译,
errno
被置為EBUSY
仲吏。 - 若對非空隊(duì)列注冊通知,只有等到隊(duì)列被清空后蝌焚,新消息到來時(shí)才能發(fā)出通知裹唆。
- 若向注冊進(jìn)程發(fā)送了一個(gè)通知之后就會刪除注冊信息,這樣其他進(jìn)程就可以向隊(duì)列注冊通知只洒。
- 若有其他進(jìn)程調(diào)用
mq_receive()
發(fā)生阻塞许帐,則有新消息到來時(shí),其他進(jìn)程接收消息毕谴,注冊進(jìn)程繼續(xù)等待通知成畦。 - 可以傳入NULL來撤銷通知。
通過man 7 sigevent
可以查看該結(jié)構(gòu)的詳細(xì)信息
union sigval { /* 通知傳遞的數(shù)據(jù) */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* 通知方法 */
int sigev_signo; /* 通知信號 */
union sigval sigev_value; /* 通知傳遞的數(shù)據(jù) */
void (*sigev_notify_function) (union sigval);
/* 線程通知的函數(shù) (SIGEV_THREAD) */
void *sigev_notify_attributes;
/* 線程通知的線程屬性 (SIGEV_THREAD) */
pid_t sigev_notify_thread_id;
/* 用于接收信號的線程ID (SIGEV_THREAD_ID) */
};
其中sigev_notify
指定通知的方法涝开,有SIGEV_NONE
(不作任何處理)循帐、SIGEV_SIGNAL
(使用信號通知)和SIGEV_THREAD
(使用線程通知)。
用示例代碼解釋典型用法(忽略了錯(cuò)誤處理)舀武,預(yù)定義變量如下
struct sigevent sev;
constexpr int NOTIFY_SIG = SIGUSR1; // 自定義通知信號的種類
使用信號通知的框架如下
int main() {
// ...
signal(SIGUSR1, [](int){}); // 僅僅用于跳出sigsuspend
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = NOTIFY_SIG;
mq_notify(mqd, &sev);
sigset_t empty_mask;
sigemptyset(&empty_mask);
while (true) {
sigsuspend(&empty_mask);
mq_notify(mqd, &sev);
// TODO: 處理由空變?yōu)榉强盏南㈥?duì)列
}
}
使用線程通知的框架如下
static void notifySetup(mqd_t& mqd); // 注冊通知線程拄养,函數(shù)的前置聲明
static void notifyFunc(union sigval sv) {
auto& mqd = *reinterpret_cast<mqd_t*>(sv.sival_ptr);
notifySetup(mqd);
// TODO: 處理由空變?yōu)榉强盏南㈥?duì)列
}
static void notifySetup(mqd_t& mqd) {
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = notifyFunc;
sev.sigev_notify_attributes = NULL;
sev.sigev_value.sival_ptr = &mqd;
mq_notify(mqd, &sev);
}
int main() {
// ...
notifySetup(mqd);
pause(); // 主線程永遠(yuǎn)中止,因?yàn)槎〞r(shí)器通知是在一個(gè)單獨(dú)的線程中調(diào)用notifyFunc()來分發(fā)的
}
3. 相關(guān)系統(tǒng)命令
常見的命令银舱,ipcs
顯示IPC狀態(tài)瘪匿,ipcrm
釋放IPC跛梗。但是它們都只能用于System V IPC,對POSIX IPC沒有作用柿顶。POSIX IPC被實(shí)現(xiàn)成了虛擬文件系統(tǒng)的文件茄袖,可以直接使用ls
和rm
來列出和刪除這些文件,掛載在/dev/
目錄下嘁锯,因此可以用常見的Linux命令來管理宪祥。
/dev/mqueue/xxx
:記錄了消息隊(duì)列/xxx
的狀態(tài),這也解釋了為何IPC名字要以/
開頭家乘。
# cat /dev/mqueue/mq
QSIZE:13 NOTIFY:0 SIGNO:10 NOTIFY_PID:30527
其中QSIZE
為所有未消費(fèi)的消息大小之和蝗羊,并且進(jìn)程30527在等待信號10(SIGUSR1)的通知。這是我在啟動之前的mq_notify
示例程序監(jiān)聽所致仁锯。
如果rm
掉該文件耀找,那么之后消息隊(duì)列也無效了。
/proc/sys/fs/mqueue/
下記錄了一些文件
-
msg_default
:消息隊(duì)列默認(rèn)屬性的mq_maxmsg
字段的值业崖; -
msg_max
:消息隊(duì)列默認(rèn)屬性的mq_maxmsg
字段的上限野芒; -
msgsize_default
:消息隊(duì)列默認(rèn)屬性的mq_msgsize
字段的值; -
msgsize_max
:消息隊(duì)列默認(rèn)屬性的mq_msgsize
字段的上限双炕; -
queues_max
:系統(tǒng)可以創(chuàng)建的消息隊(duì)列個(gè)數(shù)上限狞悲。
# tail -n +1 /proc/sys/fs/mqueue/* | grep -v "^$"
==> /proc/sys/fs/mqueue/msg_default <==
10
==> /proc/sys/fs/mqueue/msg_max <==
10
==> /proc/sys/fs/mqueue/msgsize_default <==
8192
==> /proc/sys/fs/mqueue/msgsize_max <==
8192
==> /proc/sys/fs/mqueue/queues_max <==
256
其中tail
的+NUM
代表打印第NUM行開始的內(nèi)容。
4. I/O多路復(fù)用
POSIX消息隊(duì)列相比System V消息隊(duì)列的最大優(yōu)點(diǎn)就是妇斤,mqd_t
類型的句柄可以被select/poll/epoll
監(jiān)聽摇锋。
示例代碼epoll_consumer.cc
5. 總結(jié)
消息隊(duì)列本質(zhì)是消息組成的鏈表,允許進(jìn)程以消息的形式交換數(shù)據(jù)站超,和數(shù)據(jù)報(bào)socket一樣荸恕。不同于TCP的流式傳輸,消息具有邊界死相,N次寫入對應(yīng)N次讀取融求,若讀取緩沖區(qū)太小,則剩余的部分會被舍棄算撮,而不會留給下次繼續(xù)讀双肤。
隊(duì)列具有容量上限和單條消息的大小上限,隊(duì)列填滿時(shí)钮惠,寫入操作會被阻塞,非阻塞模式下會失敗并設(shè)置errno
為EAGAIN
七芭,因此采用非阻塞模式能用循環(huán)讀取整個(gè)隊(duì)列而進(jìn)行后續(xù)操作素挽。單次發(fā)送的數(shù)據(jù)超過消息大小上限時(shí),會發(fā)送失敗狸驳。
相比System V消息隊(duì)列预明,POSIX消息隊(duì)列的優(yōu)點(diǎn)是:
- 支持I/O多路復(fù)用缩赛;
- 支持隊(duì)列從空變?yōu)榉强諘r(shí)的消息異步通知。
- 如同其他POSIX IPC撰糠,POSIX消息隊(duì)列維護(hù)了引用計(jì)數(shù)酥馍,支持安全地刪除。
缺點(diǎn)在于可移植性較差阅酪,因?yàn)檎Q生較晚旨袒。大多情況下這不是問題,不維護(hù)老代碼的話完全可以用POSIX消息隊(duì)列术辐。另一方面POSIX消息隊(duì)列嚴(yán)格按照優(yōu)先級排序砚尽,System V消息隊(duì)列支持按照消息的類型字段來讀取消息。