從epoll機制看MessageQueue

epoll機制

一句話解釋:epoll機制可以監(jiān)聽特定的fd既绩,當(dāng)fd收到內(nèi)容時怠褐,發(fā)送事件回調(diào)昌腰。相比selectpoll機制褂删,效率更高飞醉。

epoll API

  1. epoll_create(int size)

參數(shù):

  • size:表示最多可以監(jiān)聽多少個fd,新版本已棄用屯阀。

返回值:epoll實例的fd

  • >= 0 成功
  • < 0 失敗

作用:
初始化epoll機制缅帘,調(diào)用API后,操作系統(tǒng)內(nèi)核會產(chǎn)生一個eventpoll實例难衰,并返回一個fd钦无,這個fd就是epoll實例的句柄。

  1. epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

參數(shù):

  • epfd: 方法1中創(chuàng)建的epoll實例的fd
  • op: 操作指令
    • EPOLL_CTL_ADD: 注冊新的fd到epfd中
    • EPOLL_CTL_MOD:修改已注冊的fd的監(jiān)聽事件
    • EPOLL_CTL_DEL:從epfd中刪除一個fd
  • fd:要監(jiān)聽的fd
  • event:要監(jiān)聽的event
typedef union epoll_data {
   void        *ptr;
   int          fd;
   uint32_t     u32;
   uint64_t     u64;
} epoll_data_t;

struct epoll_event {
   uint32_t     events;  // 表示監(jiān)聽的事件類型(EPOLLIN/EPOLLHUP/EPOLLOUT...)
   epoll_data_t data; // 用戶自定義數(shù)據(jù)盖袭,當(dāng)事件發(fā)生時將會原樣返回給用戶
};

返回值:

  • >= 0 成功
  • < 0 失敗

作用:
注冊失暂、修改或刪除監(jiān)聽的fd和事件。

  1. epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

參數(shù):

  • epfd:方法1中創(chuàng)建的epoll實例的fd
  • events:和2中結(jié)構(gòu)一樣
  • maxevents:events數(shù)量
  • timeout:等待超時時間鳄虱。如果超過timeout還沒有事件弟塞,則返回

返回值:
到來事件的個數(shù),返回的事件存儲在events數(shù)組中拙已。

MessageQueue原理

大家都知道决记,Android的主線程的Looper,本質(zhì)是運行了一個死循環(huán)倍踪,不斷從MessageQueue中取消息執(zhí)行系宫,如果沒有消息,則會等待在nativePollOnce方法上建车,這個方法的底層原理就是epoll_wait.

下面一起來看源碼扩借,弄清楚Looper的整個流程是怎樣的。

首先看Looper.java中的loop方法癞志,這是我們啟動一個線程looper的入口往枷。
// Looper.java

public static void loop() {
  final Looper me = myLooper();
  for (;;) {
    if (!loopOnce(me, ident, thresholdOverride)) {
      return;
    }
  }
}

這個方法就是開啟一個無限循環(huán)框产,調(diào)用loopOnce凄杯。
// Looper.java

private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
  // 從messagequeue中獲取下一條message
  Message msg = me.mQueue.next();
  // 執(zhí)行message
  msg.target.dispatchMessage(msg);
  // Android10開始,可以通過添加observer的方式秉宿,監(jiān)聽messsage的執(zhí)行情況
  if (observer != null) {
    observer.messageDispatched(token, msg);
  }
  //  回收message
  msg.recycleUnchecked();
  return true;
}

當(dāng)loopOnce方法返回true之后戒突,又會再次循環(huán)調(diào)到loopOnce,重復(fù)執(zhí)行描睦。
接下來我們看MessageQueuenext()方法膊存。

// MessageQueue.java

Message next() {
  for (;;) {
    // 通過epoll機制,等待消息,或超時喚醒
    nativePollOnce(ptr, nextPollTimeoutMills);
    synchronized (this) {
       Message prevMsg = null;
       Message msg = mMessages;
       if (msg != null && msg.target == null) {
          // msg.target == null 表示是同步屏障
          // 如果有同步屏障隔崎,則直接跳到下一個異步的消息(同步的消息都過濾掉今艺,先不處理)
          do {
             prevMsg = msg;
             msg = msg.next;
          } while (msg != null && !msg.isAsynchronous());
       }
       if (msg != null) {
         if (now < msg.when) {
          // 如果當(dāng)前還沒到達message的執(zhí)行時間, 則獲取當(dāng)前的時間差作為timeout
          nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
         }else{
           // 省略一些鏈表的操作 prevMsg.next = msg.next; msg.next = null;
           // 直接返回已經(jīng)到達執(zhí)行時間的,第一條message
           return msg;
         }
       }
    }
  }
}

這個方法爵卒,首先會調(diào)用nativePollOnce這個native方法虚缎,等nativePollOnce返回后,會去MessageQueue的鏈表中取下一條待執(zhí)行的message钓株。

取message的方法:

  • 取鏈表頭的第一個message(MessageQueue中的message是按照時間順序排列的实牡,所以第一個就是最近的待執(zhí)行的message)
  • 如果這個消息是同步屏障,則跳過所有同步消息轴合,直接取下一個異步消息创坞,返回
  • 否則,判斷當(dāng)前message是否到執(zhí)行時間受葛,如果到執(zhí)行時間题涨,則直接返回,否則繼續(xù)調(diào)nativePollOnce等待总滩。

接下來看nativePollOnce的實現(xiàn)携栋。

// android_os_MessageQueue_nativePollOnce()

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    //將Java層傳遞下來的mPtr轉(zhuǎn)換為nativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 【3】
}

經(jīng)過一系列調(diào)用,最后調(diào)到了Looper中的pollInner方法

Looper.cpp

int Looper::pollInner(int timeoutMillis) {
    ...
    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //fd最大個數(shù)為16
    //等待事件發(fā)生或者超時咳秉,在nativeWake()方法婉支,向管道寫端寫入字符,則該方法會返回澜建;
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    //循環(huán)遍歷向挖,處理所有的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                // 如果是喚醒事件,則讀取并清空管道數(shù)據(jù)
                awoken(); 
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                // 如果是之前在mRequests中注冊過的fd
                //處理request炕舵,生成對應(yīng)的reponse對象何之,push到響應(yīng)數(shù)組
                pushResponse(events, mRequests.valueAt(requestIndex));
            }
        }
    }
Done: ;
    //處理Native的Message,調(diào)用相應(yīng)回調(diào)方法
    while (mMessageEnvelopes.size() != 0) {
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            {
                handler->handleMessage(message);  // 處理消息事件
            }
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    //處理帶有Callback()方法的Response事件咽筋,執(zhí)行Reponse相應(yīng)的回調(diào)方法
    for (size_t i = 0; i < mResponses.size(); i++) {
        if (response.request.ident == POLL_CALLBACK) {
            // 處理請求的回調(diào)方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq); //移除fd
            }
        }
    }
    return result;
}

Looper.pollInner主要做如下事情:

  • 調(diào)用epoll_wait溶推,等待在一些特定的fd上
  • 當(dāng)epoll_wait返回后(fd發(fā)生寫入或超時時間到),執(zhí)行喚醒的事件奸攻。
    • 如果喚醒的是mWakeEventFd蒜危,則直接調(diào)用awoken方法。
    • 如果喚醒的是之前注冊在mRequests中的fd睹耐,則將Request生成一個對應(yīng)的Response辐赞,加入mResponses集合
  • 處理native message,執(zhí)行相應(yīng)的回調(diào)方法
  • 處理mResponses集合中的所有Response事件硝训,調(diào)用他們callbackhandleEvent回調(diào)方法响委。(點擊事件就是在這里被執(zhí)行的)

我們來看看awoken方法新思。它的邏輯很簡單,就是循環(huán)讀取fd中的全部內(nèi)容赘风。
// Looper.cpp

void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

關(guān)于mRequestsmResponses的邏輯夹囚,先挖個坑,后面的文章再講邀窃。

epoll使用示例

下面我們寫一個epoll機制使用的示例代碼崔兴。

在這個例子中,我們監(jiān)聽了一個sockfd的管道端口蛔翅,啟動一個線程敲茄,等待在epoll_wait上。一旦sockfd中寫入數(shù)據(jù)山析,就可以喚醒我們的線程堰燎,進行讀取。

#include <sys/socket.h>

void MonitorInit::createEpoll(int sockfd) {
    if(mSockFd == sockfd) return;
    mEpollFd = epoll_create(EPOLL_MAX_EVENTS);
    int epollEvents = 0;
    epollEvents |= EPOLLIN;
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event));
    eventItem.events = epollEvents;
    eventItem.data.fd = sockfd;
    int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, sockfd, &eventItem);
    LLOG_ERROR(TAG_LOOPER, "Adding epoll event resutl %d", epollResult);
    if(epollResult < 0){
        LLOG_ERROR(TAG_LOOPER, "Error adding epoll event, fd %d, errno %d", sockfd, epollResult);
    }
    pthread_t thd;
    // 開啟一個線程笋轨,這個線程用來監(jiān)聽epoll_wait
    pthread_create(&thd, nullptr, epollCallback, nullptr);
    pthread_detach(thd);
    mSockFd = sockfd;
}

void epollCallback(void *arg){
    // 死循環(huán)秆剪,等待fd消息
    while(loop){
        int timeoutMillis = 100000;
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
        LLOG_ERROR(TAG_LOOPER, "receive event count %d", eventCount);
        if(eventCount < 0){
            LLOG_ERROR(TAG_LOOPER, "Poll failed with an unexpected error, errno=%d", errno);
        }
        if(eventCount == 0){
            // 超時時間到
            LLOG_ERROR(TAG_LOOPER, "pollOnce - timeout");
        }
        for(int i = 0; i < eventCount; i++){
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if(fd == mSockFd){
                // 將fd的內(nèi)容讀出來
            }
        }
    }
}

總結(jié)

MessageQueue核心原理:主線程通過Looper中的死循環(huán),不斷從MessageQueue中獲取待指定的message爵政。

  • 如果有到執(zhí)行時間的消息時仅讽,直接執(zhí)行。
  • 如果還沒有到執(zhí)行時間的消息钾挟,會通過epoll_wait等待在mWakeReadPipeFd端口洁灵,等待內(nèi)容寫入,超時時間是下一個message執(zhí)行時間到現(xiàn)在的時間差掺出。
    • 如果在等待的過程中徽千,有新的消息插入隊列,會往mWakeReadPipeFd端口寫入數(shù)據(jù)汤锨,這樣就能喚醒等待在這個上面的pollInner方法双抽,從而繼續(xù)執(zhí)行之后的message
    • 如果等待的過程中闲礼,沒有新的消息插入牍汹,則會在timeout時間到達的時候,喚醒柬泽,處理后面的message慎菲。

擴展閱讀

深入理解Linux的epoll機制

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市聂抢,隨后出現(xiàn)的幾起案子钧嘶,更是在濱河造成了極大的恐慌棠众,老刑警劉巖琳疏,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件有决,死亡現(xiàn)場離奇詭異,居然都是意外死亡空盼,警方通過查閱死者的電腦和手機书幕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來揽趾,“玉大人台汇,你說我怎么就攤上這事±橄梗” “怎么了苟呐?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長俐筋。 經(jīng)常有香客問我牵素,道長,這世上最難降的妖魔是什么澄者? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任笆呆,我火速辦了婚禮,結(jié)果婚禮上粱挡,老公的妹妹穿的比我還像新娘赠幕。我一直安慰自己,他們只是感情好询筏,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布榕堰。 她就那樣靜靜地躺著,像睡著了一般嫌套。 火紅的嫁衣襯著肌膚如雪局冰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天灌危,我揣著相機與錄音康二,去河邊找鬼。 笑死勇蝙,一個胖子當(dāng)著我的面吹牛沫勿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播味混,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼产雹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了翁锡?” 一聲冷哼從身側(cè)響起蔓挖,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎馆衔,沒想到半個月后瘟判,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體怨绣,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年拷获,在試婚紗的時候發(fā)現(xiàn)自己被綠了篮撑。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡匆瓜,死狀恐怖赢笨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情驮吱,我是刑警寧澤茧妒,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站左冬,受9級特大地震影響嘶伟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜又碌,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一九昧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧毕匀,春花似錦铸鹰、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至躁垛,卻和暖如春剖毯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背教馆。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工逊谋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人土铺。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓胶滋,卻偏偏與公主長得像,于是被迫代替她去往敵國和親悲敷。 傳聞我的和親對象是個殘疾皇子究恤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容