Looper循環(huán)中,如果messageQueue沒有消失,還會一直循環(huán)下去嗎
這個問題涉及l(fā)inuex里面的pipe(管道)和epoll機制,
先給出答案:不會一直循環(huán)下去,阻塞起來
首先說下pipe
pipe:中文意思是管道,使用I/O流操作,實現(xiàn)跨進程通信,管道的一端的讀,另一端寫,標(biāo)準(zhǔn)的生產(chǎn)者消費者模式
下面說下5種I/O模型 參考大話 Select、Poll斗锭、Epoll https://cloud.tencent.com/developer/article/1005481
[1] blocking IO - 阻塞IO [2] nonblocking IO - 非阻塞IO [3] IO multiplexing - IO多路復(fù)用 [4] signal driven IO - 信號驅(qū)動IO [5] asynchronous IO - 異步IO
其中前面4種IO都可以歸類為synchronous IO - 同步IO淮菠,在介紹select慕趴、poll厌丑、epoll之前,首先介紹一下這幾種IO模型,signal driven IO平時用的比較少漾唉,這里就不介紹了。
1. IO - 同步堰塌、異步赵刑、阻塞、非阻塞
下面以network IO中的read讀操作為切入點场刑,來講述同步(synchronous) IO和異步(asynchronous) IO般此、阻塞(blocking) IO和非阻塞(non-blocking)IO的異同。一般情況下牵现,一次網(wǎng)絡(luò)IO讀操作會涉及兩個系統(tǒng)對象:(1) 用戶進程(線程)Process铐懊;(2)內(nèi)核對象kernel,兩個處理階段:
[1] Waiting for the data to be ready - 等待數(shù)據(jù)準(zhǔn)備好 [2] Copying the data from the kernel to the process - 將數(shù)據(jù)從內(nèi)核空間的buffer拷貝到用戶空間進程的buffer
IO模型的異同點就是區(qū)分在這兩個系統(tǒng)對象瞎疼、兩個處理階段的不同上科乎。
1.1 同步IO 之 Blocking IO
官方描述: 如上圖所示,用戶進程process在Blocking IO讀recvfrom操作的兩個階段都是等待的贼急。在數(shù)據(jù)沒準(zhǔn)備好的時候茅茂,process原地等待kernel準(zhǔn)備數(shù)據(jù)。kernel準(zhǔn)備好數(shù)據(jù)后竿裂,process繼續(xù)等待kernel將數(shù)據(jù)copy到自己的buffer玉吁。在kernel完成數(shù)據(jù)的copy后process才會從recvfrom系統(tǒng)調(diào)用中返回。
本人描述: 用戶進程向內(nèi)核對象請求數(shù)據(jù),如果內(nèi)核對象數(shù)據(jù)沒準(zhǔn)備好,則用戶進程一直等下去,直到內(nèi)核將數(shù)據(jù)準(zhǔn)備好,并且復(fù)制到用戶進程
1.2 同步IO 之 NonBlocking IO
官方描述: 從圖中可以看出腻异,process在NonBlocking IO讀recvfrom操作的第一個階段是不會block等待的进副,如果kernel數(shù)據(jù)還沒準(zhǔn)備好,那么recvfrom會立刻返回一個EWOULDBLOCK錯誤悔常。當(dāng)kernel準(zhǔn)備好數(shù)據(jù)后影斑,進入處理的第二階段的時候,process會等待kernel將數(shù)據(jù)copy到自己的buffer机打,在kernel完成數(shù)據(jù)的copy后process才會從recvfrom系統(tǒng)調(diào)用中返回矫户。
本人描述:實際上就是用戶進程不斷輪尋,看內(nèi)核進程的數(shù)據(jù)是否準(zhǔn)備好,當(dāng)然第二階段(kernel將數(shù)據(jù)copy到自己的buffer)用戶進程是需要等待的
1.3 同步IO 之 IO multiplexing
IO多路復(fù)用,就是我們熟知的select残邀、poll皆辽、epoll模型柑蛇。從圖上可見,在IO多路復(fù)用的時候驱闷,process在兩個處理階段都是block住等待的耻台。初看好像IO多路復(fù)用沒什么用,其實select空另、poll盆耽、epoll的優(yōu)勢在于可以以較少的代價來同時監(jiān)聽處理多個IO。
1.4 異步IO
從上圖看出扼菠,異步IO要求process在recvfrom操作的兩個處理階段上都不能等待摄杂,也就是process調(diào)用recvfrom后立刻返回,kernel自行去準(zhǔn)備好數(shù)據(jù)并將數(shù)據(jù)從kernel的buffer中copy到process的buffer在通知process讀操作完成了循榆,然后process在去處理析恢。遺憾的是,linux的網(wǎng)絡(luò)IO中是不存在異步IO的冯痢,linux的網(wǎng)絡(luò)IO處理的第二階段總是阻塞等待數(shù)據(jù)copy完成的氮昧。真正意義上的網(wǎng)絡(luò)異步IO是Windows下的IOCP(IO完成端口)模型。
很多時候浦楣,我們比較容易混淆non-blocking IO和asynchronous IO袖肥,認(rèn)為是一樣的。但是通過上圖振劳,幾種IO模型的比較椎组,會發(fā)現(xiàn)non-blocking IO和asynchronous IO的區(qū)別還是很明顯的,non-blocking IO僅僅要求處理的第一階段不block即可历恐,而asynchronous IO要求兩個階段都不能block住寸癌。
select與epoll
阻塞I/O模式下,一個線程只能處理一個流的I/O事件弱贼。如果想要同時處理多個流蒸苇,要么多進程(fork),要么多線程(pthread_create)吮旅,很不幸這兩種方法效率都不高溪烤。
我們只要不停的把所有流從頭到尾問一遍,又從頭開始庇勃。這樣就可以處理多個流了檬嘀,但這樣的做法顯然不好,因為如果所有的流都沒有數(shù)據(jù)责嚷,那么只會白白浪費CPU鸳兽。這里要補充一點,阻塞模式下罕拂,內(nèi)核對于I/O事件的處理是阻塞或者喚醒揍异,而非阻塞模式下則把I/O事件交給其他對象(后文介紹的select以及epoll)處理甚至直接忽略全陨。
為了避免CPU空轉(zhuǎn),可以引進了一個代理(一開始有一位叫做select的代理蒿秦,后來又有一位叫做poll的代理烤镐,不過兩者的本質(zhì)是一樣的)。這個代理比較厲害棍鳖,可以同時觀察許多流的I/O事件有咨,在空閑的時候儡司,會把當(dāng)前線程阻塞掉肃廓,當(dāng)有一個或多個流有I/O事件時蕾羊,就從阻塞態(tài)中醒來坡椒,于是我們的程序就會輪詢一遍所有的流(于是我們可以把“忙”字去掉了)
于是奥帘,如果沒有I/O事件產(chǎn)生猾骡,我們的程序就會阻塞在select處洪规。但是依然有個問題旧困,我們從select那里僅僅知道了醇份,有I/O事件發(fā)生了,但卻并不知道是那幾個流(可能有一個吼具,多個僚纷,甚至全部),我們只能無差別輪詢所有流拗盒,找出能讀出數(shù)據(jù)怖竭,或者寫入數(shù)據(jù)的流,對他們進行操作陡蝇。
但是使用select痊臭,我們有O(n)的無差別輪詢復(fù)雜度,同時處理的流越多登夫,沒一次無差別輪詢時間就越長广匙。再次
說了這么多,終于能好好解釋epoll了
epoll可以理解為event poll恼策,不同于忙輪詢和無差別輪詢鸦致,epoll之會把哪個流發(fā)生了怎樣的I/O事件通知我們。此時我們對這些流的操作都是有意義的戏蔑。(復(fù)雜度降低到了O(1))
epoll與select/poll的區(qū)別
select蹋凝,poll,epoll都是IO多路復(fù)用的機制总棵。I/O多路復(fù)用就通過一種機制鳍寂,可以監(jiān)視多個描述符,一旦某個描述符就緒情龄,能夠通知程序進行相應(yīng)的操作迄汛。
select的本質(zhì)是采用32個整數(shù)的32位捍壤,即32*32= 1024來標(biāo)識,fd值為1-1024鞍爱。當(dāng)fd的值超過1024限制時鹃觉,就必須修改FD_SETSIZE的大小。這個時候就可以標(biāo)識32*max值范圍的fd睹逃。
poll與select不同盗扇,通過一個pollfd數(shù)組向內(nèi)核傳遞需要關(guān)注的事件,故沒有描述符個數(shù)的限制沉填,pollfd中的events字段和revents分別用于標(biāo)示關(guān)注的事件和發(fā)生的事件疗隶,故pollfd數(shù)組只需要被初始化一次。
epoll還是poll的一種優(yōu)化翼闹,返回后不需要對所有的fd進行遍歷斑鼻,在內(nèi)核中維持了fd的列表。select和poll是將這個內(nèi)核列表維持在用戶態(tài)猎荠,然后傳遞到內(nèi)核中坚弱。與poll/select不同,epoll不再是一個單獨的系統(tǒng)調(diào)用关摇,而是由epoll_create/epoll_ctl/epoll_wait三個系統(tǒng)調(diào)用組成荒叶,后面將會看到這樣做的好處。epoll在2.6以后的內(nèi)核才支持拒垃。
select/poll的幾大缺點:
1停撞、每次調(diào)用select/poll,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài)悼瓮,這個開銷在fd很多時會很大
2戈毒、同時每次調(diào)用select/poll都需要在內(nèi)核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
3横堡、針對select支持的文件描述符數(shù)量太小了埋市,默認(rèn)是1024
為什么epoll相比select/poll更高效
傳統(tǒng)的poll函數(shù)相當(dāng)于每次調(diào)用都重起爐灶,從用戶空間完整讀入ufds命贴,完成后再次完全拷貝到用戶空間道宅,另外每次poll都需要對所有設(shè)備做至少做一次加入和刪除等待隊列操作,這些都是低效的原因胸蛛。
epoll的解決方案中污茵。每次注冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把所有的fd拷貝進內(nèi)核葬项,而不是在epoll_wait的時候重復(fù)拷貝泞当。epoll保證了每個fd在整個過程中只會拷貝一次。select, poll和epoll都是使用waitqueue調(diào)用callback函數(shù)去wakeup你的異步等待線程的民珍,如果設(shè)置了timeout的話就起一個hrtimer襟士,select和poll的callback函數(shù)并沒有做什么事情盗飒,但epoll的waitqueue callback函數(shù)把當(dāng)前的有效fd加到ready list,然后喚醒異步等待進程陋桂,所以epoll函數(shù)返回的就是這個ready list逆趣, ready list中包含所有有效的fd,這樣一來kernel不用去遍歷所有的fd嗜历,用戶空間程序也不用遍歷所有的fd宣渗,而只是遍歷返回有效fd鏈表。
epoll用法
1. int epoll_create(int size);
創(chuàng)建一個epoll的句柄秸脱,size用來告訴內(nèi)核需要監(jiān)聽的數(shù)目一共有多大落包。當(dāng)創(chuàng)建好epoll句柄后,它就是會占用一個fd值摊唇,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的涯鲁,所以在使用完epoll后巷查,必須調(diào)用close() 關(guān)閉,否則可能導(dǎo)致fd被耗盡抹腿。epoll通過epoll_ctl來對監(jiān)控的fds集合來進行增岛请、刪、改警绩,那么必須涉及到fd的快速查找問題崇败,于是,一個低時間復(fù)雜度的增肩祥、刪后室、改、查的數(shù)據(jù)結(jié)構(gòu)來組織被監(jiān)控的fds集合是必不可少的了混狠。在linux 2.6.8之前的內(nèi)核岸霹,epoll使用hash來組織fds集合,于是在創(chuàng)建epoll fd的時候将饺,epoll需要初始化hash的大小贡避。于是epoll_create(int size)有一個參數(shù)size,以便內(nèi)核根據(jù)size的大小來分配hash的大小予弧。在linux 2.6.8以后的內(nèi)核中刮吧,epoll使用紅黑樹來組織監(jiān)控的fds集合,于是epoll_create(int size)的參數(shù)size實際上已經(jīng)沒有意義了掖蛤。
2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注冊函數(shù)杀捻,第一個參數(shù)是 epoll_create() 的返回值,第二個參數(shù)表示動作坠七,使用如下三個宏來表示:
EPOLL_CTL_ADD //注冊新的fd到epfd中水醋;
EPOLL_CTL_MOD //修改已經(jīng)注冊的fd的監(jiān)聽事件旗笔;
EPOLL_CTL_DEL //從epfd中刪除一個fd;
第三個參數(shù)是需要監(jiān)聽的fd拄踪,第四個參數(shù)是告訴內(nèi)核需要監(jiān)聽什么事蝇恶,struct epoll_event 結(jié)構(gòu)如下:
typedef union epoll_data
{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events 可以是以下幾個宏的集合:
EPOLLIN //表示對應(yīng)的文件描述符可以讀(包括對端SOCKET正常關(guān)閉);
EPOLLOUT //表示對應(yīng)的文件描述符可以寫惶桐;
EPOLLPRI //表示對應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀(這里應(yīng)該表示有帶外數(shù)據(jù)到來)撮弧;
EPOLLERR //表示對應(yīng)的文件描述符發(fā)生錯誤;
EPOLLHUP //表示對應(yīng)的文件描述符被掛斷姚糊;
EPOLLET //將EPOLL設(shè)為邊緣觸發(fā)(Edge Triggered)模式贿衍,這是相對于水平觸發(fā)(Level Triggered)來說的。
EPOLLONESHOT//只監(jiān)聽一次事件救恨,當(dāng)監(jiān)聽完這次事件之后贸辈,如果還需要繼續(xù)監(jiān)聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里肠槽。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
參數(shù)events用來從內(nèi)核得到事件的集合擎淤,maxevents 告之內(nèi)核這個events有多大,這個 maxevents 的值不能大于創(chuàng)建 epoll_create() 時的size秸仙,參數(shù) timeout 是超時時間(毫秒嘴拢,0會立即返回,-1將不確定寂纪,也有說法說是永久阻塞)席吴。該函數(shù)返回需要處理的事件數(shù)目,如返回0表示已超時捞蛋。
LT VS ET
說這個之前孝冒,先說兩個概念
阻塞模式:針對的是fd的read/write阻塞,如果當(dāng)前沒有fd數(shù)據(jù)可讀,那么read阻塞襟交,如果當(dāng)前沒有數(shù)據(jù)可寫(可能管道滿了)迈倍,那么write阻塞
非阻塞模式:針對的是fd的read/write,如果當(dāng)前沒有fd數(shù)據(jù)可讀,那么read后返回調(diào)用返回-1捣域,errno值為EAGAIN啼染,如果當(dāng)前沒有數(shù)據(jù)可寫(可能管道滿了),那么write后返回調(diào)用返回-1焕梅,errno值為EAGAIN
EPOLL事件有兩種模型 Level Triggered (LT) 和 Edge Triggered (ET):
LT(level triggered迹鹅,水平觸發(fā)模式)是缺省的工作方式,并且同時支持 block 和 non-block socket贞言。在這種做法中斜棚,內(nèi)核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作弟蚀,內(nèi)核還是會繼續(xù)通知你的蚤霞,所以,這種模式編程出錯誤可能性要小一點义钉。
說白了就是以下兩點
當(dāng)管道不為空時昧绣,有數(shù)據(jù)可讀,則可讀事件一直觸發(fā)直到將管道中的數(shù)據(jù)read完畢
當(dāng)管道不為空時捶闸,有數(shù)據(jù)可寫夜畴,則可寫事件一直觸發(fā)
它強調(diào)的是處于某種狀態(tài)
補充一點:EPOLL可以監(jiān)聽多個fd,如果fd1的模式設(shè)為阻塞模式的話删壮,當(dāng)對fd1進行read操作時贪绘,不要單獨把read放在死循環(huán)里面讀取,因為read完所有數(shù)據(jù)之后央碟,再去read的話會阻塞税灌,導(dǎo)致該此代碼阻塞,此時如果fd2有數(shù)據(jù)可讀時(來自其他線程或者進程向fd2寫數(shù)據(jù))亿虽,也不會解除阻塞垄琐,導(dǎo)致無法處理數(shù)據(jù),這樣明顯是不對的经柴,除非有其他線程或者進程向fd1寫數(shù)據(jù),才能解除fd1的read阻塞墩朦,
所以fd1的模式設(shè)為阻塞模式的話坯认,可以將epoll_wait和read按順序一起放在死循環(huán),每次循環(huán)執(zhí)行epoll_wait和reada讀取固定的數(shù)據(jù)(epoll_wait在LT模式下氓涣,如果有數(shù)據(jù)可讀牛哺,不會阻塞),這樣當(dāng)fd1的數(shù)據(jù)讀取完畢之后劳吠,epoll_wait就會阻塞引润,不會執(zhí)行read操作,避免read阻塞痒玩,此時如果fd2有數(shù)據(jù)可讀時,epoll_wait就可以解除阻塞淳附,處理fd2的數(shù)據(jù)了,不會受其他fd的影響
ET(edge-triggered蠢古,邊緣觸發(fā)模式)是高速工作方式奴曙,只支持no-block socket。在這種模式下草讶,當(dāng)描述符從未就緒變?yōu)榫途w時洽糟,內(nèi)核通過epoll告訴你。然后它會假設(shè)你知道文件描述符已經(jīng)就緒,并且不會再為那個文件描述符發(fā)送更多的就緒通知坤溃,等到下次有新的數(shù)據(jù)進來的時候才會再次出發(fā)就緒事件拍霜。
說白了就是以下兩點
當(dāng)管道中有新的數(shù)據(jù)來臨時(有空變?yōu)榉强栈蛘咴诜强盏那闆r下有新的數(shù)據(jù)),則觸發(fā)可讀事件
當(dāng)管道中的有滿載變?yōu)榉菨M載狀態(tài)時薪介,有數(shù)據(jù)可寫祠饺,則觸發(fā)可寫事件
它強調(diào)的是某種狀態(tài)變化時觸發(fā)事件,只觸發(fā)一次昭灵,過了就不會觸發(fā)吠裆,
也就是如果管道中有新的數(shù)據(jù)來臨時,epoll_wait結(jié)束等待烂完,此時如果不讀取管道中的數(shù)據(jù)试疙,那么代碼再次循環(huán)到epoll_wait,阻塞抠蚣,直到下次有新的數(shù)據(jù)來臨時才會返回祝旷,所以如果是ET模式的話,可讀事件觸發(fā)時一定要將數(shù)據(jù)及時取出來嘶窄,不要死扛著怀跛,過了這村就沒那店了,另一方面因為可能數(shù)據(jù)比較大柄冲,所以要多次讀取吻谋,比如在while里面讀取例如110代碼,如果此時fd是阻塞模式现横,那么當(dāng)讀完畢之后漓拾,管道為null,必然會導(dǎo)致read阻塞戒祠,影響其他fd的數(shù)據(jù)處理骇两,明顯不符合題意,所以這里設(shè)置fd為非阻塞模式姜盈,當(dāng)讀取完畢之后會返回-1低千,而且errno == EAGAIN(115行代碼),這樣就可以直接break馏颂,結(jié)束這次讀取工作示血,不會導(dǎo)致阻塞.所以ET強制fd為非阻塞模型
來看個示例代碼
1 //
2 // Created by user on 2020/1/31.
3 //
4
5 #include "TestBKEPoll.h"
6
7 #include <sys/epoll.h>
8 #include <unistd.h>
9 #include <iostream>
10 #include <iostream>
11 #include <pthread.h>
12 #include <unistd.h>
13 #include <thread>
14 #include <stdio.h>
15 #include <sys/prctl.h>
16 #include <fcntl.h>
17
18 using namespace std;
19
20 void *threadwrite(void *data) {
21 printf("write pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(),
22 this_thread::get_id());
23 sleep(5);
24 int *fd = (int *) data;
25 char *buf = "iamcj";
26 write(*fd, buf, 3);
27 write(fd[1], buf, 3);
28 write(fd[2], buf, 3); char *buf1 = "kl";
29 // sleep(5);
30 // write (*fd, buf1, 5);
31 printf("\n");
32 printf("write *fd=%d\n", *fd);
33 // printf("start pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(),this_thread::get_id());
34 }
35
36 int setfd_nonblock(int fd) {
37 int old_flags = fcntl(fd, F_GETFD);
38 fcntl(fd, F_SETFL, old_flags | O_NONBLOCK);
39 return old_flags;
40 }
41
42 int testepoll() {
43 int ret = 0;
44 int pipe_fd[2];
45 int pipe_fd1[2];
46 int pipe_fd2[2];
47 /**
48 * 調(diào)用pipe函數(shù)時在內(nèi)核中開辟一塊緩沖區(qū)(稱為管道)用于通信,它有一個讀端和一個寫端饱亮,
49 * 然后通過fd參數(shù)傳出給用戶程序兩個文件描述符矾芙,fd[0]指向管道的讀端,fd[1]指向管道的寫端(
50 * 很好記近上,就像0是標(biāo)準(zhǔn)輸入1是標(biāo)準(zhǔn)輸出一樣)剔宪。
51 * 所以管道在用戶程序看起來就像一個打開的文件,
52 * 通過read(fd[0])或者write(fd[1])向這個文件讀寫數(shù)據(jù)其實是在讀寫內(nèi)核緩沖區(qū)。
53 * pipe函數(shù)調(diào)用成功返回0葱绒,調(diào)用失敗返回-1感帅。
54 */
55 if ((ret = pipe(pipe_fd)) < 0) {
56 cout << "create pipe fail:" << ret << ",errno:" << errno << endl;
57 return -1;
58 }
59 if ((ret = pipe(pipe_fd1)) < 0) {
60 cout << "create pipe1 fail:" << ret << ",errno:" << errno << endl;
61 return -1;
62 }
63 if ((ret = pipe(pipe_fd2)) < 0) {
64 cout << "create pipe1 fail:" << ret << ",errno:" << errno << endl;
65 return -1;
66 }
67 // child=fork();
68 printf("testCreateThread pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(), this_thread::get_id());
69
70 pthread_t thread;
71 //創(chuàng)建一個線程并自動執(zhí)行
72 int pd[3] = {pipe_fd[1], pipe_fd1[1], pipe_fd2[1]};
73 int id = pthread_create(&thread, NULL, threadwrite, (void *) pd);
74 printf("testCreateThread pipe_fd[0]=%d,pipe_fd1[0]=%d,pipe_fd2[0]=%d\n",
75 pipe_fd[0], pipe_fd1[0], pipe_fd2[0]);
76 printf("testCreateThread pipe_fd[1]=%d,pipe_fd1[1]=%d,pipe_fd2[1]=%d\n",
77 pipe_fd[1], pipe_fd1[1], pipe_fd2[1]);
78 struct epoll_event ev, ev1, ev2, events[20]; //事件臨時pipe_fd[1]變量
79 ev.data.fd = pipe_fd[0]; //設(shè)置監(jiān)聽文件描述符
80 ev.events = EPOLLET | EPOLLIN;
81 ev1.data.fd = pipe_fd1[0]; //設(shè)置監(jiān)聽文件描述符
82 ev1.events = EPOLLET | EPOLLIN; //設(shè)置要處理的事件類型
83 ev2.data.fd = pipe_fd2[0]; //設(shè)置監(jiān)聽文件描述符
84 ev2.events = EPOLLET | EPOLLIN; //設(shè)置要處理的事件類型
85 int epfd = epoll_create(1);
86 printf("epfd=%d\n", epfd);
87 if (ev.events & EPOLLET) {
88 set_nonblock(pipe_fd[0]);
89 }
90 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev);
91 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd1[0], &ev1);
92 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd2[0], &ev2);
93 for (;;) {
94 printf("sleep before\n");
95 sleep(2);
96 printf("sleep after\n");
97 int count = epoll_wait(epfd, events, 2, -1);
98 printf("count is %d,pid=%d,ppid=%d\n", count, getpid(), getppid());
99 char r_buf[100];
100 for (int i = 0; i < count; i++) {
101 printf("pipe_fd[0]=%d,events[%d].data.fd=%d,events=%d,%d\n",
102 pipe_fd[0], i, events[i].data.fd,
103 events[i].events, (events[i].events & EPOLLIN));
104 if ((events[i].data.fd == pipe_fd[0] ||
105 events[i].data.fd == pipe_fd1[0] ||
106 events[i].data.fd == pipe_fd2[0])
107 && (events[i].events & EPOLLIN)) {
108 printf("ok\n");
109 printf("EPOLLET\n");
110 // while (true) {
111 int r_num = read(events[i].data.fd, r_buf, 1);
112 printf(
113 "read r_num is %d bytes data from the pipe,value is %s atoi is %d and errno is %d \n",
114 r_num, r_buf, atoi(r_buf), errno);
115 if (r_num < 0 && errno == EAGAIN) {
116 break;
117 }
118 // }
119
120 }
121 }
122 }
123 return 0;
124 }
下面來解讀下這部分代碼,注意因為mac上沒有epoll機制地淀,所以這段代碼只能在linux上運行
先看看55行-66行失球,這部分代碼主要是調(diào)用pipe函數(shù)時在內(nèi)核中開辟一塊緩沖區(qū)(稱為管道)用于通信,它有一個讀端和一個寫端帮毁,
然后通過fd參數(shù)傳出給用戶程序兩個文件描述符实苞,fd[0]指向管道的讀端,fd[1]指向管道的寫端
執(zhí)行完這段代碼之后會生成相應(yīng)的文件描述符烈疚,可以通過
ls -l /proc/pid/fd查看黔牵,比如
lr-x------ 1 user user 64 1月 31 20:24 3 -> pipe:[83141]
l-wx------ 1 user user 64 1月 31 20:24 4 -> pipe:[83141]
lr-x------ 1 user user 64 1月 31 20:24 5 -> pipe:[83142]
l-wx------ 1 user user 64 1月 31 20:24 6 -> pipe:[83142]
lr-x------ 1 user user 64 1月 31 20:24 7 -> pipe:[83143]
l-wx------ 1 user user 64 1月 31 20:24 8 -> pipe:[83143]
3,4,5,6,7,8就是對應(yīng)的fd,在代碼里面打印fd的值也是跟這個是輸出是匹配的爷肝,其實3和4是一對猾浦,表示讀寫兩端,其他的以此類推灯抛,記住這些fd用完之后要釋放(close)金赦,避免占用資源73行:創(chuàng)建一個線程,為了向pipe中寫入數(shù)據(jù)对嚼,觸發(fā)可讀事件
79行:這行表示事件關(guān)聯(lián)的文件描述符夹抗,說白了就是當(dāng)有可讀或者可寫事件發(fā)生時,epoll得知道這個事件是來源于哪個fd纵竖,以便從這個fd讀寫數(shù)據(jù)兔朦,這行代碼就是這個作用,跟104行的代碼對應(yīng)起來了
85行:創(chuàng)建一個epoll的句柄,返回一個文件描述符磨确,可以認(rèn)為是操作多個fd監(jiān)聽事件的操作句柄,集合了一下,可以通過ls -l /proc/pid/fd查看該fd,示例如下
lrwx------ 1 user user 64 1月 31 20:24 9 -> anon_inode:[eventpoll]
后面也表示是eventpoll的fd87-89行:如果事件是ET(edge trigger)邊緣模式声邦,則設(shè)置監(jiān)聽該事件的fd是非阻塞了乏奥,上面LT VS ET已經(jīng)解釋過為什么這么做了。
90-92行:在三個不同的fd上注冊監(jiān)聽事件亥曹,這里堅監(jiān)聽的都是EPOLLIN邓了,表示可讀事件,其實筆者思考了下媳瞪,綜合79行代碼骗炉,可以看出event與fd是雙向關(guān)聯(lián)關(guān)系,相互持有蛇受,其實我很困惑為什么不在注冊監(jiān)聽event的時候句葵,把fd的句柄賦值到ev.data.fd,為什么還要單獨顯式的寫出來,剛開始學(xué)習(xí)這個的時候很容易忘記寫ev.data.fd = pipe_fd[0]乍丈,導(dǎo)致104行的判斷代碼一致失敗剂碴,很疑惑∏嶙ǎ可能是開放給開發(fā)者忆矛,給開發(fā)者更大的發(fā)揮空間
111行:讀取管道中的數(shù)據(jù),
對于LT模式來說请垛,如果這次read沒有讀取完畢(比如只讀取一部分?jǐn)?shù)據(jù))或者把這樣代碼注釋掉催训,那么在循環(huán)中epoll_wait將一直返回,它會執(zhí)行97行之后的語句宗收,不會阻塞,漫拭,因為epoll_wait原本的語意是:監(jiān)控并探測socket是否有數(shù)據(jù)可讀(對于讀事件而言)。LT模式保留了其原本的語意镜雨,只要socket還有數(shù)據(jù)可讀嫂侍,它就能不斷反饋,于是荚坞,我們想什么時候讀取處理都可以挑宠,我們永遠有再次poll的機會去探測是否有數(shù)據(jù)可以處理,這樣帶來了編程上的很大方便颓影,不容易死鎖造成某些socket餓死各淀。相反,ET模式修改了epoll_wait原本的語意诡挂,變成了:監(jiān)控并探測socket是否有新的數(shù)據(jù)可讀碎浇。
對于ET模式來說
即使這次read沒有讀取完畢(比如只讀取一部分?jǐn)?shù)據(jù))或者把這樣代碼注釋掉,再次循環(huán)回到epoll_wait時璃俗,它會阻塞奴璃,因為可讀事件只會觸發(fā)一次
補充:在Android的中epoll跟上面的說代碼很類似,只是Android的fd不是通過pipe獲取城豁,而是通過eventfd()函數(shù)獲取苟穆,Android代碼如下
大概了解下eventfd()函數(shù)
eventfd()函數(shù)會創(chuàng)建一個“eventfd”對象,用戶空間的應(yīng)用程序可以用這個eventfd來實現(xiàn)事件的等待或通知機制唱星,也可以用于內(nèi)核通知新的事件到用戶空間應(yīng)用程序雳旅。 這個對象包含一個64-bit的整形計數(shù)器,內(nèi)核空間維護這個計數(shù)器间聊,創(chuàng)建這個計數(shù)器的時候使用第一個入?yún)nitval來初始化計數(shù)器攒盈。
對于eventfd的讀寫操作對應(yīng)函數(shù)read和write
多余的不說了,可以參考下這兩篇文章
通過實例來理解 eventfd 函數(shù)機制
Linux進程間通信——eventfd
用我個人的了解就是eventfd創(chuàng)建的時候會初始化一個計數(shù)器哎榴,對于epoll_wait來說型豁,只要計數(shù)器的值不為0僵蛛,那么epoll_wait就不會阻塞,因為可讀偷遗,當(dāng)向eventfd中write數(shù)據(jù)時墩瞳,write函數(shù)相當(dāng)于是向計數(shù)器中進行“添加”,比如說計數(shù)器中的值原本是2氏豌,如果write了一個3喉酌,那么計數(shù)器中的值就變成了5,
當(dāng)read數(shù)據(jù)時:
針對無EFD_SEMAPHORE的flag來說泵喘,read一次就把計數(shù)器清零泪电,
執(zhí)行完read,繼續(xù)epoll_wait等待
針對有EFD_SEMAPHORE的flag來說纪铺,read一次計數(shù)器減1相速,循環(huán)read,直到計數(shù)器為0鲜锚,設(shè)置條件突诬,退出read,繼續(xù)epoll_wait等待
所以write函數(shù)里面的count參數(shù)芜繁,直對EFD_SEMAPHORE有效旺隙,沒有EFD_SEMAPHORE時,其實count>0即可骏令,無關(guān)次數(shù).
eventfd的示例代碼:
#include "TestBKEPoll.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <iostream>
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <thread>
#include <stdio.h>
#include <sys/prctl.h>
#include <fcntl.h>
using namespace std;
void *threadwrite(void *data) {
printf("write pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(),
this_thread::get_id());
sleep(5);
int *fd = (int *) data;
uint64_t count = 3;//寫入的計數(shù)器的值
write(*fd, &count, sizeof(uint64_t));
printf("\n");
printf("write *fd=%d\n", *fd);
}
int testepoll() {
printf("testCreateThread pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(), this_thread::get_id());
int testEventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
pthread_t thread;
//創(chuàng)建一個線程并自動執(zhí)行
int id = pthread_create(&thread, NULL, threadwrite, (void *) pd);
struct epoll_event ev蔬捷; //事件臨時pipe_fd[1]變量
ev.data.fd = testEventfd; //設(shè)置監(jiān)聽文件描述符
ev.events = EPOLLET | EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, eventfd, &ev);
for (;;) {
printf("sleep before\n");
sleep(2);
printf("sleep after\n");
int count = epoll_wait(epfd, events, 2, -1);
printf("count is %d,pid=%d,ppid=%d\n", count, getpid(), getppid());
char r_buf[100];
for (int i = 0; i < count; i++) {
printf("pipe_fd[0]=%d,events[%d].data.fd=%d,events=%d,%d\n",
pipe_fd[0], i, events[i].data.fd,
events[i].events, (events[i].events & EPOLLIN));
if ((events[i].data.fd == testEventfd)
&& (events[i].events & EPOLLIN)) {
printf("ok\n");
printf("EPOLLET\n");
uint64_t count = 0;//讀取的計數(shù)器值,eventfd的flag有EFD_SEMAPHORE時,每次read時count減1榔袋,沒有EFD_SEMAPHORE時周拐,只read一次,count為write過來的數(shù)字凰兑,即3
while (true) {
int r_num = read(events[i].data.fd, &count, sizeof(uint64_t));
printf(
"read r_num is %d bytes data from the pipe,value is %s atoi is %d and errno is %d \n",
r_num, count, atoi(count), errno);
if (r_num < 0 && errno == EAGAIN) {
break;
}
}
}
}
}
return 0;
}
思考
1.如果messagequeue里面沒有message妥粟,那么looper會阻塞,相當(dāng)于主線程阻塞,那么點擊事件是怎么傳入到主線程呢?
首先上面說loop之所有會阻塞吏够,是因為epoll機制罕容,代碼位于Loop.cpp中的pollOnce方法,所以要讓主線程接觸阻塞稿饰,必須要往管道里面write,那么當(dāng)點擊屏幕時
誰來往當(dāng)前app的主線程的管道wrtite呢露泊,看看調(diào)用棧
"main@4011" prio=5 runnable
java.lang.Thread.State: RUNNABLE
at android.os.MessageQueue.enqueueMessage(MessageQueue.java:534)
at android.os.Handler.enqueueMessage(Handler.java:631)
at android.os.Handler.sendMessageAtTime(Handler.java:600)
at android.os.Handler.sendMessageDelayed(Handler.java:570)
at android.os.Handler.postDelayed(Handler.java:398)
at android.view.View.postDelayed(View.java:13011)
at android.view.View.onTouchEvent(View.java:10370)
at android.widget.TextView.onTouchEvent(TextView.java:8300)
at android.view.View.dispatchTouchEvent(View.java:9300)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at com.android.internal.policy.PhoneWindow$DecorView.superDispatchTouchEvent(PhoneWindow.java:2403)
at com.android.internal.policy.PhoneWindow.superDispatchTouchEvent(PhoneWindow.java:1737)
at android.app.Activity.dispatchTouchEvent(Activity.java:2771)
at com.android.internal.policy.PhoneWindow$DecorView.dispatchTouchEvent(PhoneWindow.java:2364)
at android.view.View.dispatchPointerEvent(View.java:9520)
at android.view.ViewRootImpl$ViewPostImeInputStage.processPointerEvent(ViewRootImpl.java:4230)
at android.view.ViewRootImpl$ViewPostImeInputStage.onProcess(ViewRootImpl.java:4096)
at android.view.ViewRootImpl$InputStage.deliver(ViewRootImpl.java:3642)
at android.view.ViewRootImpl$InputStage.onDeliverToNext(ViewRootImpl.java:3695)
at android.view.ViewRootImpl$InputStage.forward(ViewRootImpl.java:3661)
at android.view.ViewRootImpl$AsyncInputStage.forward(ViewRootImpl.java:3787)
at android.view.ViewRootImpl$InputStage.apply(ViewRootImpl.java:3669)
at android.view.ViewRootImpl$AsyncInputStage.apply(ViewRootImpl.java:3844)
at android.view.ViewRootImpl$InputStage.deliver(ViewRootImpl.java:3642)
at android.view.ViewRootImpl$InputStage.onDeliverToNext(ViewRootImpl.java:3695)
at android.view.ViewRootImpl$InputStage.forward(ViewRootImpl.java:3661)
at android.view.ViewRootImpl$InputStage.apply(ViewRootImpl.java:3669)
at android.view.ViewRootImpl$InputStage.deliver(ViewRootImpl.java:3642)
at android.view.ViewRootImpl.deliverInputEvent(ViewRootImpl.java:5922)
at android.view.ViewRootImpl.doProcessInputEvents(ViewRootImpl.java:5896)
at android.view.ViewRootImpl.enqueueInputEvent(ViewRootImpl.java:5857)
at android.view.ViewRootImpl$WindowInputEventReceiver.onInputEvent(ViewRootImpl.java:6025)
at android.view.InputEventReceiver.dispatchInputEvent(InputEventReceiver.java:185)
at android.os.MessageQueue.nativePollOnce(MessageQueue.java:-1)
at android.os.MessageQueue.next(MessageQueue.java:323)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Method.java:-1)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
這個dispatchInputEvent實際上是從native方法里面調(diào)用的喉镰,Looper.cpp 中pollInner方法中的handleEvent最終會調(diào)用dispatchInputEvent,這個里面沒有
的調(diào)用實際上已經(jīng)在app的主線程啦惭笑,所以write不是在這兒侣姆,這里只是想看下調(diào)用棧
真正write是在system_server進程中主要是InputTransport.cpp中的send方法生真,這里面的調(diào)用比較復(fù)雜,可以參考
Input系統(tǒng)—事件處理全過程 http://gityuan.com/2016/12/31/input-ipc/捺宗,這篇文章
2.如果messagequeue里面沒有message柱蟀,那么looper會阻塞,相當(dāng)于主線程阻塞,那么廣播事件怎么傳入主線程?
直接看調(diào)用棧
"Binder_3@4082" prio=5 runnable
java.lang.Thread.State: RUNNABLE
at android.os.MessageQueue.enqueueMessage(MessageQueue.java:534)
at android.os.Handler.enqueueMessage(Handler.java:631)
at android.os.Handler.sendMessageAtTime(Handler.java:600)
at android.os.Handler.sendMessageDelayed(Handler.java:570)
at android.os.Handler.sendMessage(Handler.java:507)
at android.app.ActivityThread.sendMessage(ActivityThread.java:2281)
at android.app.ActivityThread.sendMessage(ActivityThread.java:2258)
at android.app.ActivityThread.-wrap25(ActivityThread.java:-1)
at android.app.ActivityThread$ApplicationThread.scheduleReceiver(ActivityThread.java:696)
at android.app.ApplicationThreadNative.onTransact(ApplicationThreadNative.java:217)
at android.os.Binder.execTransact(Binder.java:453)蚜厉,
长已,這個就很清晰了,在binder線程調(diào)用enqueueMessage昼牛,enqueueMessage方法里面有nativewait即可喚醒主線程
總結(jié):也就是說喚醒主線程有兩種方式
1.外部進程通過管道喚醒
2.自己的進程通過bind線程喚醒
參考鏈接:
我讀過的最好的epoll講解:http://blog.51cto.com/yaocoder/888374
大話 Select术瓮、Poll、Epoll : https://cloud.tencent.com/developer/article/1005481