epoll機制
一句話解釋:epoll
機制可以監(jiān)聽特定的fd
既绩,當(dāng)fd
收到內(nèi)容時怠褐,發(fā)送事件回調(diào)昌腰。相比select
和poll
機制褂删,效率更高飞醉。
epoll API
epoll_create(int size)
參數(shù):
- size:表示最多可以監(jiān)聽多少個fd,新版本已棄用屯阀。
返回值:epoll實例的fd
-
>= 0
成功 -
< 0
失敗
作用:
初始化epoll機制缅帘,調(diào)用API后,操作系統(tǒng)內(nèi)核會產(chǎn)生一個eventpoll實例难衰,并返回一個fd钦无,這個fd就是epoll實例的句柄。
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
參數(shù):
- epfd: 方法1中創(chuàng)建的epoll實例的fd
- op: 操作指令
- EPOLL_CTL_ADD: 注冊新的fd到epfd中
- EPOLL_CTL_MOD:修改已注冊的fd的監(jiān)聽事件
- EPOLL_CTL_DEL:從epfd中刪除一個fd
- fd:要監(jiān)聽的fd
- event:要監(jiān)聽的event
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; // 表示監(jiān)聽的事件類型(EPOLLIN/EPOLLHUP/EPOLLOUT...)
epoll_data_t data; // 用戶自定義數(shù)據(jù)盖袭,當(dāng)事件發(fā)生時將會原樣返回給用戶
};
返回值:
-
>= 0
成功 -
< 0
失敗
作用:
注冊失暂、修改或刪除監(jiān)聽的fd和事件。
epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
參數(shù):
- epfd:方法1中創(chuàng)建的epoll實例的fd
- events:和2中結(jié)構(gòu)一樣
- maxevents:events數(shù)量
- timeout:等待超時時間鳄虱。如果超過timeout還沒有事件弟塞,則返回
返回值:
到來事件的個數(shù),返回的事件存儲在events數(shù)組中拙已。
MessageQueue原理
大家都知道决记,Android的主線程的Looper
,本質(zhì)是運行了一個死循環(huán)倍踪,不斷從MessageQueue
中取消息執(zhí)行系宫,如果沒有消息,則會等待在nativePollOnce
方法上建车,這個方法的底層原理就是epoll_wait
.
下面一起來看源碼扩借,弄清楚Looper
的整個流程是怎樣的。
首先看Looper.java中的loop方法癞志,這是我們啟動一個線程looper的入口往枷。
// Looper.java
public static void loop() {
final Looper me = myLooper();
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
這個方法就是開啟一個無限循環(huán)框产,調(diào)用loopOnce
凄杯。
// Looper.java
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
// 從messagequeue中獲取下一條message
Message msg = me.mQueue.next();
// 執(zhí)行message
msg.target.dispatchMessage(msg);
// Android10開始,可以通過添加observer的方式秉宿,監(jiān)聽messsage的執(zhí)行情況
if (observer != null) {
observer.messageDispatched(token, msg);
}
// 回收message
msg.recycleUnchecked();
return true;
}
當(dāng)loopOnce
方法返回true之后戒突,又會再次循環(huán)調(diào)到loopOnce
,重復(fù)執(zhí)行描睦。
接下來我們看MessageQueue
的next()
方法膊存。
// MessageQueue.java
Message next() {
for (;;) {
// 通過epoll機制,等待消息,或超時喚醒
nativePollOnce(ptr, nextPollTimeoutMills);
synchronized (this) {
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// msg.target == null 表示是同步屏障
// 如果有同步屏障隔崎,則直接跳到下一個異步的消息(同步的消息都過濾掉今艺,先不處理)
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// 如果當(dāng)前還沒到達message的執(zhí)行時間, 則獲取當(dāng)前的時間差作為timeout
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
}else{
// 省略一些鏈表的操作 prevMsg.next = msg.next; msg.next = null;
// 直接返回已經(jīng)到達執(zhí)行時間的,第一條message
return msg;
}
}
}
}
}
這個方法爵卒,首先會調(diào)用nativePollOnce
這個native方法虚缎,等nativePollOnce
返回后,會去MessageQueue
的鏈表中取下一條待執(zhí)行的message钓株。
取message的方法:
- 取鏈表頭的第一個message(
MessageQueue
中的message是按照時間順序排列的实牡,所以第一個就是最近的待執(zhí)行的message) - 如果這個消息是同步屏障,則跳過所有同步消息轴合,直接取下一個異步消息创坞,返回
- 否則,判斷當(dāng)前message是否到執(zhí)行時間受葛,如果到執(zhí)行時間题涨,則直接返回,否則繼續(xù)調(diào)
nativePollOnce
等待总滩。
接下來看nativePollOnce
的實現(xiàn)携栋。
// android_os_MessageQueue_nativePollOnce()
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
//將Java層傳遞下來的mPtr轉(zhuǎn)換為nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 【3】
}
經(jīng)過一系列調(diào)用,最后調(diào)到了Looper
中的pollInner
方法
Looper.cpp
int Looper::pollInner(int timeoutMillis) {
...
struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //fd最大個數(shù)為16
//等待事件發(fā)生或者超時咳秉,在nativeWake()方法婉支,向管道寫端寫入字符,則該方法會返回澜建;
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//循環(huán)遍歷向挖,處理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
// 如果是喚醒事件,則讀取并清空管道數(shù)據(jù)
awoken();
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
// 如果是之前在mRequests中注冊過的fd
//處理request炕舵,生成對應(yīng)的reponse對象何之,push到響應(yīng)數(shù)組
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
Done: ;
//處理Native的Message,調(diào)用相應(yīng)回調(diào)方法
while (mMessageEnvelopes.size() != 0) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
handler->handleMessage(message); // 處理消息事件
}
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
//處理帶有Callback()方法的Response事件咽筋,執(zhí)行Reponse相應(yīng)的回調(diào)方法
for (size_t i = 0; i < mResponses.size(); i++) {
if (response.request.ident == POLL_CALLBACK) {
// 處理請求的回調(diào)方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq); //移除fd
}
}
}
return result;
}
Looper.pollInner
主要做如下事情:
- 調(diào)用
epoll_wait
溶推,等待在一些特定的fd上 - 當(dāng)
epoll_wait
返回后(fd發(fā)生寫入或超時時間到),執(zhí)行喚醒的事件奸攻。- 如果喚醒的是
mWakeEventFd
蒜危,則直接調(diào)用awoken方法。 - 如果喚醒的是之前注冊在
mRequests
中的fd睹耐,則將Request
生成一個對應(yīng)的Response
辐赞,加入mResponses
集合
- 如果喚醒的是
- 處理native message,執(zhí)行相應(yīng)的回調(diào)方法
- 處理
mResponses
集合中的所有Response
事件硝训,調(diào)用他們callback
的handleEvent
回調(diào)方法响委。(點擊事件就是在這里被執(zhí)行的)
我們來看看awoken
方法新思。它的邏輯很簡單,就是循環(huán)讀取fd中的全部內(nèi)容赘风。
// Looper.cpp
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
關(guān)于mRequests
和mResponses
的邏輯夹囚,先挖個坑,后面的文章再講邀窃。
epoll使用示例
下面我們寫一個epoll
機制使用的示例代碼崔兴。
在這個例子中,我們監(jiān)聽了一個sockfd
的管道端口蛔翅,啟動一個線程敲茄,等待在epoll_wait
上。一旦sockfd中寫入數(shù)據(jù)山析,就可以喚醒我們的線程堰燎,進行讀取。
#include <sys/socket.h>
void MonitorInit::createEpoll(int sockfd) {
if(mSockFd == sockfd) return;
mEpollFd = epoll_create(EPOLL_MAX_EVENTS);
int epollEvents = 0;
epollEvents |= EPOLLIN;
struct epoll_event eventItem;
memset(&eventItem, 0, sizeof(epoll_event));
eventItem.events = epollEvents;
eventItem.data.fd = sockfd;
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, sockfd, &eventItem);
LLOG_ERROR(TAG_LOOPER, "Adding epoll event resutl %d", epollResult);
if(epollResult < 0){
LLOG_ERROR(TAG_LOOPER, "Error adding epoll event, fd %d, errno %d", sockfd, epollResult);
}
pthread_t thd;
// 開啟一個線程笋轨,這個線程用來監(jiān)聽epoll_wait
pthread_create(&thd, nullptr, epollCallback, nullptr);
pthread_detach(thd);
mSockFd = sockfd;
}
void epollCallback(void *arg){
// 死循環(huán)秆剪,等待fd消息
while(loop){
int timeoutMillis = 100000;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
LLOG_ERROR(TAG_LOOPER, "receive event count %d", eventCount);
if(eventCount < 0){
LLOG_ERROR(TAG_LOOPER, "Poll failed with an unexpected error, errno=%d", errno);
}
if(eventCount == 0){
// 超時時間到
LLOG_ERROR(TAG_LOOPER, "pollOnce - timeout");
}
for(int i = 0; i < eventCount; i++){
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if(fd == mSockFd){
// 將fd的內(nèi)容讀出來
}
}
}
}
總結(jié)
MessageQueue核心原理:主線程通過Looper
中的死循環(huán),不斷從MessageQueue
中獲取待指定的message爵政。
- 如果有到執(zhí)行時間的消息時仅讽,直接執(zhí)行。
- 如果還沒有到執(zhí)行時間的消息钾挟,會通過
epoll_wait
等待在mWakeReadPipeFd
端口洁灵,等待內(nèi)容寫入,超時時間是下一個message
執(zhí)行時間到現(xiàn)在的時間差掺出。- 如果在等待的過程中徽千,有新的消息插入隊列,會往
mWakeReadPipeFd
端口寫入數(shù)據(jù)汤锨,這樣就能喚醒等待在這個上面的pollInner
方法双抽,從而繼續(xù)執(zhí)行之后的message
。 - 如果等待的過程中闲礼,沒有新的消息插入牍汹,則會在
timeout
時間到達的時候,喚醒柬泽,處理后面的message
慎菲。
- 如果在等待的過程中徽千,有新的消息插入隊列,會往