IPC學(xué)習(xí)之POSIX消息隊(duì)列

之前進(jìn)程間通信的知識都是應(yīng)付面試的時(shí)候臨時(shí)補(bǔ)的屡律,最近還是要通過碼代碼來學(xué)習(xí)下腌逢,主要參考《Linux/Unix系統(tǒng)編程手冊》,本來照著這本書的示例一路敲下來就足夠了超埋,但是書上給的都是比較完整的例子(畢竟便于即敲即用嘛)搏讶,這里我盡量簡化佳鳖。

1. 準(zhǔn)備

這里給出下文可能用到的變量的定義

mqd_t mqd;  // 消息隊(duì)列句柄,可用于IO多路復(fù)用
struct mq_attr attr;  // 消息隊(duì)列屬性媒惕,下面為字段說明系吩,[]內(nèi)為可以設(shè)置/獲取該字段的API
                      // .mq_flags 0或者O_NONBLOCK [mq_getattr(), mq_setattr()]
                      // .mq_maxmsg 最大消息數(shù)量 [mq_open(), mq_getattr()]
                      // .mq_msgsize 最大消息大小 [mq_open(), mq_getattr()]
                      // .mq_curmsgs 隊(duì)列中消息數(shù)量 [mq_getattr()]
const char* mq_name = "/mq";  // 消息隊(duì)列名稱,必須以'/'開頭

消息隊(duì)列的maxmsgmsgsize都是在創(chuàng)建時(shí)指定的妒蔚,之后不能改變穿挨。是否非阻塞讀寫則可以手動設(shè)定。消息數(shù)量也是只讀的肴盏,是隨著消息隊(duì)列的讀取/寫入而改變的科盛,每次寫入則加1,每次讀取則減1菜皂。
另外贞绵,相關(guān)API都是返回-1作為錯(cuò)誤碼,之后API說明略去檢查返回值的步驟恍飘。
gcc編譯時(shí)需要加上-lrt選項(xiàng)榨崩,動態(tài)鏈接到共享庫librt.so

2. 基本API

創(chuàng)建消息隊(duì)列

// 參數(shù)2和3同系統(tǒng)調(diào)用open(2)章母,參數(shù)4設(shè)置自定義消息隊(duì)列屬性母蛛,若為NULL則不設(shè)置,也可以不要參數(shù)4
mqd = mq_open(mq_name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attr);

生產(chǎn)者

下列代碼發(fā)送3個(gè)消息到隊(duì)列中

mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
const char* messages[] = {"msg-1", "msg-2", "msg-3"};
int priorities[] = {4, 0, 6};
for (int i = 0; i < 3; ++i)
    mq_send(mqd, messages[i], strlen(messages[i]), priorities[i]);

注意對現(xiàn)有的消息隊(duì)列調(diào)用mq_open時(shí)乳怎,第2個(gè)參數(shù)不帶O_CREAT時(shí)不需要指定權(quán)限彩郊。
發(fā)送時(shí)需要指定優(yōu)先級,消息按照優(yōu)先級降序存在消息隊(duì)列中舞肆,因此隊(duì)列中的消息依次是"msg-3", "msg-1", "msg-2"焦辅。

消費(fèi)者

讀取消息隊(duì)列中所有消息并顯示優(yōu)先級

mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
mq_getattr(mqd, &attr);  // 取得隊(duì)列屬性,從而定義合適大小的緩沖區(qū)
char* buffer = new buffer[attr.mq_msgsize + 1];
unsigned int priority;
ssize_t num_read;
while ((num_read = mq_receive(mqd, buffer, attr.mq_msgsize, &priority)) != -1) {
    buffer[num_read] = '\0';
    printf("[%2u] %s\n", priority, buffer); 
}
// 若errno != EAGAIN則需要處理錯(cuò)誤椿胯,非阻塞模式下若隊(duì)列為空errno會被設(shè)置為EAGAIN
delete[] buffer;

另外關(guān)于EINTR錯(cuò)誤,若設(shè)置信號處理器時(shí)指定了SA_RESTART標(biāo)志剃根,則mq_receivemq_send會自動重啟哩盲,所以無需代碼處理。但是這針對的是阻塞式I/O狈醉,參考man 7 signal如下內(nèi)容

If a blocked call to one of the following interfaces is interrupted by a signal handler, then the call will be automatically restarted after the signal handler returns if the SA_RESTART flag was used; otherwise the call will fail with the error EINTR

至于非阻塞式I/O廉油,個(gè)人覺得根本就不會被打斷,因?yàn)闆]有意義苗傅,但是實(shí)際情況還是得看內(nèi)核怎么實(shí)現(xiàn)抒线,有篇討論可以參考。EINTR and non-blocking calls

超時(shí)讀寫

mq_timedsend()mq_timedreceive相比mq_send()mq_receive()僅僅多出一個(gè)參數(shù)const struct timespec *abs_timeout渣慕,用于指定超時(shí)時(shí)間嘶炭,僅僅在O_NONBLOCK標(biāo)記不起作用時(shí)才有效抱慌。

 struct timespec
   {     
     __time_t tv_sec;        /* Seconds.  */
     __syscall_slong_t tv_nsec;  /* Nanoseconds.  */                                                                                                             
   };    

注意該參數(shù)設(shè)置的是絕對時(shí)間,因此可以用clock_gettime()來獲取CLOCK_REALTIME時(shí)鐘的當(dāng)前值眨猎,并在該值上加上所需的時(shí)間量來生成一個(gè)恰當(dāng)初始化的timespec結(jié)構(gòu)抑进。

2. 消息通知

示例程序的流程是順序的,先創(chuàng)建消息隊(duì)列睡陪,再寫入若干消息寺渗,再依次讀取所有消息。實(shí)際上用于進(jìn)程間通信時(shí)兰迫,寫者和讀者是并發(fā)執(zhí)行的信殊,即生產(chǎn)者-消費(fèi)者問題。
POSIX消息隊(duì)列提供了消息通知API汁果,在隊(duì)列為空時(shí)若有新的消息到來涡拘,能夠接收通知。通過下列API可以注冊通知须鼎,指定具體通知方式(比如信號)后鲸伴,若新消息到來使得隊(duì)列從空變成非空,就會調(diào)用自定義的通知處理函數(shù)晋控。

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

以下幾點(diǎn)需要特別注意

  • 任一時(shí)刻只能有1個(gè)進(jìn)程能夠向特定消息隊(duì)列注冊通知汞窗,如果已經(jīng)存在,再次注冊會失敗赡译,errno被置為EBUSY仲吏。
  • 若對非空隊(duì)列注冊通知,只有等到隊(duì)列被清空后蝌焚,新消息到來時(shí)才能發(fā)出通知裹唆。
  • 若向注冊進(jìn)程發(fā)送了一個(gè)通知之后就會刪除注冊信息,這樣其他進(jìn)程就可以向隊(duì)列注冊通知只洒。
  • 若有其他進(jìn)程調(diào)用mq_receive()發(fā)生阻塞许帐,則有新消息到來時(shí),其他進(jìn)程接收消息毕谴,注冊進(jìn)程繼續(xù)等待通知成畦。
  • 可以傳入NULL來撤銷通知。

通過man 7 sigevent可以查看該結(jié)構(gòu)的詳細(xì)信息

       union sigval {          /* 通知傳遞的數(shù)據(jù) */
           int     sival_int;         /* Integer value */
           void   *sival_ptr;         /* Pointer value */
       };

       struct sigevent {
           int          sigev_notify; /* 通知方法 */
           int          sigev_signo;  /* 通知信號 */
           union sigval sigev_value;  /* 通知傳遞的數(shù)據(jù) */
           void       (*sigev_notify_function) (union sigval);
                            /* 線程通知的函數(shù) (SIGEV_THREAD) */
           void        *sigev_notify_attributes;
                            /* 線程通知的線程屬性 (SIGEV_THREAD) */
           pid_t        sigev_notify_thread_id;
                            /* 用于接收信號的線程ID (SIGEV_THREAD_ID) */
       };

其中sigev_notify指定通知的方法涝开,有SIGEV_NONE(不作任何處理)循帐、SIGEV_SIGNAL(使用信號通知)和SIGEV_THREAD(使用線程通知)。
用示例代碼解釋典型用法(忽略了錯(cuò)誤處理)舀武,預(yù)定義變量如下

struct sigevent sev;
constexpr int NOTIFY_SIG = SIGUSR1;  // 自定義通知信號的種類

使用信號通知的框架如下

int main() {
    // ...
    signal(SIGUSR1, [](int){});  // 僅僅用于跳出sigsuspend

    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = NOTIFY_SIG;
    mq_notify(mqd, &sev);

    sigset_t empty_mask;
    sigemptyset(&empty_mask);
    while (true) {
        sigsuspend(&empty_mask);
        mq_notify(mqd, &sev);
        // TODO: 處理由空變?yōu)榉强盏南㈥?duì)列
    }
}

使用線程通知的框架如下

static void notifySetup(mqd_t& mqd);  // 注冊通知線程拄养,函數(shù)的前置聲明

static void notifyFunc(union sigval sv) {
    auto& mqd = *reinterpret_cast<mqd_t*>(sv.sival_ptr);
    notifySetup(mqd);
    // TODO: 處理由空變?yōu)榉强盏南㈥?duì)列
}

static void notifySetup(mqd_t& mqd) {
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = notifyFunc;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mqd;
    mq_notify(mqd, &sev);
}

int main() {
    // ...
    notifySetup(mqd);
    pause();  // 主線程永遠(yuǎn)中止,因?yàn)槎〞r(shí)器通知是在一個(gè)單獨(dú)的線程中調(diào)用notifyFunc()來分發(fā)的
}

3. 相關(guān)系統(tǒng)命令

常見的命令银舱,ipcs顯示IPC狀態(tài)瘪匿,ipcrm釋放IPC跛梗。但是它們都只能用于System V IPC,對POSIX IPC沒有作用柿顶。POSIX IPC被實(shí)現(xiàn)成了虛擬文件系統(tǒng)的文件茄袖,可以直接使用lsrm來列出和刪除這些文件,掛載在/dev/目錄下嘁锯,因此可以用常見的Linux命令來管理宪祥。
/dev/mqueue/xxx:記錄了消息隊(duì)列/xxx的狀態(tài),這也解釋了為何IPC名字要以/開頭家乘。

# cat /dev/mqueue/mq
QSIZE:13         NOTIFY:0     SIGNO:10    NOTIFY_PID:30527 

其中QSIZE為所有未消費(fèi)的消息大小之和蝗羊,并且進(jìn)程30527在等待信號10(SIGUSR1)的通知。這是我在啟動之前的mq_notify示例程序監(jiān)聽所致仁锯。
如果rm掉該文件耀找,那么之后消息隊(duì)列也無效了。
/proc/sys/fs/mqueue/下記錄了一些文件

  • msg_default:消息隊(duì)列默認(rèn)屬性的mq_maxmsg字段的值业崖;
  • msg_max:消息隊(duì)列默認(rèn)屬性的mq_maxmsg字段的上限野芒;
  • msgsize_default:消息隊(duì)列默認(rèn)屬性的mq_msgsize字段的值;
  • msgsize_max:消息隊(duì)列默認(rèn)屬性的mq_msgsize字段的上限双炕;
  • queues_max:系統(tǒng)可以創(chuàng)建的消息隊(duì)列個(gè)數(shù)上限狞悲。
# tail -n +1 /proc/sys/fs/mqueue/* | grep -v "^$"
==> /proc/sys/fs/mqueue/msg_default <==
10
==> /proc/sys/fs/mqueue/msg_max <==
10
==> /proc/sys/fs/mqueue/msgsize_default <==
8192
==> /proc/sys/fs/mqueue/msgsize_max <==
8192
==> /proc/sys/fs/mqueue/queues_max <==
256

其中tail+NUM代表打印第NUM行開始的內(nèi)容。

4. I/O多路復(fù)用

POSIX消息隊(duì)列相比System V消息隊(duì)列的最大優(yōu)點(diǎn)就是妇斤,mqd_t類型的句柄可以被select/poll/epoll監(jiān)聽摇锋。
示例代碼epoll_consumer.cc

5. 總結(jié)

消息隊(duì)列本質(zhì)是消息組成的鏈表,允許進(jìn)程以消息的形式交換數(shù)據(jù)站超,和數(shù)據(jù)報(bào)socket一樣荸恕。不同于TCP的流式傳輸,消息具有邊界死相,N次寫入對應(yīng)N次讀取融求,若讀取緩沖區(qū)太小,則剩余的部分會被舍棄算撮,而不會留給下次繼續(xù)讀双肤。
隊(duì)列具有容量上限和單條消息的大小上限,隊(duì)列填滿時(shí)钮惠,寫入操作會被阻塞,非阻塞模式下會失敗并設(shè)置errnoEAGAIN七芭,因此采用非阻塞模式能用循環(huán)讀取整個(gè)隊(duì)列而進(jìn)行后續(xù)操作素挽。單次發(fā)送的數(shù)據(jù)超過消息大小上限時(shí),會發(fā)送失敗狸驳。
相比System V消息隊(duì)列预明,POSIX消息隊(duì)列的優(yōu)點(diǎn)是:

  1. 支持I/O多路復(fù)用缩赛;
  2. 支持隊(duì)列從空變?yōu)榉强諘r(shí)的消息異步通知。
  3. 如同其他POSIX IPC撰糠,POSIX消息隊(duì)列維護(hù)了引用計(jì)數(shù)酥馍,支持安全地刪除。

缺點(diǎn)在于可移植性較差阅酪,因?yàn)檎Q生較晚旨袒。大多情況下這不是問題,不維護(hù)老代碼的話完全可以用POSIX消息隊(duì)列术辐。另一方面POSIX消息隊(duì)列嚴(yán)格按照優(yōu)先級排序砚尽,System V消息隊(duì)列支持按照消息的類型字段來讀取消息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辉词,一起剝皮案震驚了整個(gè)濱河市必孤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瑞躺,老刑警劉巖敷搪,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異幢哨,居然都是意外死亡赡勘,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進(jìn)店門嘱么,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狮含,“玉大人,你說我怎么就攤上這事曼振〖钙” “怎么了须误?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵昌妹,是天一觀的道長茸时。 經(jīng)常有香客問我令境,道長绷跑,這世上最難降的妖魔是什么昨稼? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任回还,我火速辦了婚禮双戳,結(jié)果婚禮上抛人,老公的妹妹穿的比我還像新娘弛姜。我一直安慰自己,他們只是感情好妖枚,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布廷臼。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪荠商。 梳的紋絲不亂的頭發(fā)上寂恬,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天,我揣著相機(jī)與錄音莱没,去河邊找鬼初肉。 笑死,一個(gè)胖子當(dāng)著我的面吹牛饰躲,可吹牛的內(nèi)容都是我干的牙咏。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼属铁,長吁一口氣:“原來是場噩夢啊……” “哼眠寿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起焦蘑,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤盯拱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后例嘱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狡逢,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年拼卵,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了奢浑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡腋腮,死狀恐怖雀彼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情即寡,我是刑警寧澤徊哑,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站聪富,受9級特大地震影響莺丑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜墩蔓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一梢莽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧奸披,春花似錦昏名、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽份殿。三九已至,卻和暖如春嗽交,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背颂斜。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工夫壁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人沃疮。 一個(gè)月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓盒让,卻偏偏與公主長得像,于是被迫代替她去往敵國和親司蔬。 傳聞我的和親對象是個(gè)殘疾皇子邑茄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評論 2 359