前言
從 Android 2.3 開始驯妄,Google 把 Handler 的阻塞/喚醒方案從 Object#wait() / notify()典徊,改成了用 Linux epoll 來實現(xiàn)
原因是 Native 層也引入了一套消息管理機制孽锥,用于提供給 C/C++ 開發(fā)者使用咐汞,而現(xiàn)有的阻塞/喚醒方案是為 Java 層準(zhǔn)備的,只支持 Java
簡述
一蠢挡、 I/O多路復(fù)用: epoll
epoll 提供的三個函數(shù):
// 用于創(chuàng)建一個 epoll 池
int epoll_create(int size);
// 用來執(zhí)行 fd 的 “增刪改” 操作弧岳,最后一個參數(shù) event 是告訴內(nèi)核 需要監(jiān)聽什么事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 使用戶線程阻塞的方法,它的第二個參數(shù) events 接受的是一個集合對象袒哥,
// 如果有多個事件同時發(fā)生缩筛,events 對象可以從內(nèi)核得到發(fā)生的事件的集合
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
二、 Linux的eventfd
eventfd 是專門用來傳遞事件的 fd 堡称,它提供的功能也非常簡單:累計計數(shù)
理解write() 函數(shù)與read() 函數(shù):
int efd = eventfd();
write(efd, 1);//寫入數(shù)字1
write(efd, 2);//再寫入數(shù)字2
int res = read(efd);
printf(res);//輸出值為 3
write()函數(shù):向 eventfd 中寫入一個 int 類型的值瞎抛,并且,只要沒有發(fā)生 讀 操作却紧,eventfd 中保存的值將會一直累加
read() 函數(shù):將 eventfd 保存的值讀了出來桐臊,并且,在沒有新的值加入之前晓殊,再次調(diào)用 read() 方法會發(fā)生阻塞断凶,直到有人重新向 eventfd 寫入值
總結(jié): 只要 eventfd 計數(shù)不為 0 ,那么表示 fd 是可讀的巫俺。再結(jié)合 epoll 的特性认烁,我們可以非常輕松的創(chuàng)建出 生產(chǎn)者/消費者模型
Handler 機制的底層邏輯就是利用 epoll + eventfd,其中消費者大部分時候處于阻塞休眠狀態(tài),而一旦有請求入隊(eventfd 被寫入值)却嗡,消費者就立刻喚醒處理舶沛,
流程
一、java層中MessageQueue 類中的幾個 jni 方法:
nativeInit()窗价、nativePollOnce() 和 nativeWake()
/frameworks/base/core/java/android/os/MessageQueue.java
class MessageQueue {
// 初始化消息隊列
private native static long nativeInit();
// 消息的循環(huán)與阻塞
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
// 消息的分送與喚醒
private native static void nativeWake(long ptr);
}
二如庭、消息隊列初始化
Java層MessageQueue 構(gòu)造函數(shù)中會調(diào)用 nativeInit() 方法
/frameworks/base/core/java/android/os/MessageQueue.java
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
在native層會創(chuàng)建一個消息隊列 NativeMessageQueue 對象,用于保存 Native 開發(fā)者發(fā)送的消息撼港,并且在NativeMessageQueue 構(gòu)造函數(shù)中會創(chuàng)建一個native層的Looper對象
/frameworks/base/core/jni/android_os_MessageQueue.cpp
class android_os_MessageQueue {
void android_os_MessageQueue_nativeInit() {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
}
NativeMessageQueue() {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
}
接著我們來看Native Looper 初始化流程
/system/core/libutils/Looper.cpp
class looper {
Looper::Looper() {
// 重頭戲:mWakeEventFd 是用來監(jiān)聽 MessageQueue 是否有新消息加入
int mWakeEventFd = eventfd();
rebuildEpollLocked();
}
void rebuildEpollLocked(){
// 重頭戲:在 Looper 初始化時創(chuàng)建了 epoll 對象
int mEpollFd = epoll_create();
// 把用于喚醒消息隊列的eventfd 添加到 epoll 池
epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
}
}
消息隊列初始化總結(jié):
重點:mWakeEventFd 與 mEpollFd
1坪它、Looper 的構(gòu)造函數(shù)首先創(chuàng)建了 eventfd 對象 :mWakeEventFd,作用是用來監(jiān)聽 MessageQueue 是否有新消息加入
2帝牡、 Looper 初始化時創(chuàng)建了 epoll 對象:mEpollFd
3往毡、把mWakeEventFd 添加到epoll池中
三、消息循環(huán)與阻塞
Java層MessageQueue的next方法中會調(diào)用 nativePollOnce() 方法否灾。
鏈路如下:
Looper#loop() -> MessageQueue#next() -> MessageQueue#nativePollOnce()
在native層的nativePollOnce()調(diào)用NativeMessageQueue中的pollOnce()方法卖擅,最終會進入Looper的poollPnce()方法
/frameworks/base/core/jni/android_os_MessageQueue.cpp
class android_os_MessageQueue {
//jni方法,轉(zhuǎn)到 NativeMessageQueue#pollOnce()
void android_os_MessageQueue_nativePollOnce(){
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
class NativeMessageQueue : MessageQueue {
/轉(zhuǎn)到 Looper#pollOnce() 方法
void pollOnce(){
mLooper->pollOnce(timeoutMillis);
}
}
}
接著我們看native中Looper里的pollOnce()方法
//system/core/libutils/Looper.cpp
class looper {
int pollOnce(int timeoutMillis){
int result = 0;
for (;;) {
if (result != 0) {
return result;
}
result = pollInner(timeoutMillis);//超時
}
}
int pollInner(int timeoutMillis){
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//調(diào)用 epoll_wait() 等待事件的產(chǎn)生
}
}
消息循環(huán)與阻塞總結(jié):
整體鏈路如下:
Looper#loop() -> MessageQueue#next() -> MessageQueue#nativePollOnce()
->NativeMessageQueue#pollOnce() -> Looper#pollOnce()-> Looper#pollInner() -> epoll_wait()
消息隊列在初始化成功以后墨技,Java 層的 Looper#loop() 會開始無限輪詢惩阶,不停的獲取下一條消息。如果消息隊列為空扣汪,調(diào)用 epoll_wait 使線程進入到阻塞態(tài)断楷,讓出 CPU 調(diào)度
四、消息的發(fā)送/喚醒機制
Java層MessageQueue的enqueueMessage方法中會調(diào)用 nativewake() 方法崭别。
鏈路如下:
Handler #enqueueMessage() -> MessageQueue#enqueueMessage() -> MessageQueue#nativeWake()
Java 和 Native 都各自維護了一套消息隊列冬筒,所以他們發(fā)送消息的入口也不一樣
1、Java 開發(fā)使用 Handler#sendMessage() / post()
2茅主、C/C++ 開發(fā)使用 Looper#sendMessage()
先看Java層:
Handler 發(fā)送消息時舞痰,不管調(diào)用的是 sendMessage 還是 post,最后都是調(diào)用到 MessageQueue#enqueueMessage() 方法將消息入列诀姚,入列的順序是按照執(zhí)行時間先后排序
注:如果我們發(fā)送的消息需要馬上被執(zhí)行响牛,那么將 needWake 變量置為 true,接著使用 nativeWake() 喚醒線程
/frameworks/base/core/java/android/os/Handler.java
class Handler {
boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
return queue.enqueueMessage(msg, uptimeMillis);
}
}
/frameworks/base/core/java/android/os/MessageQueue.java
class MessageQueue {
boolean enqueueMessage(Message msg, long when) {
//...按照到期時間將消息插入消息隊列
if (needWake) {
nativeWake(mPtr);
}
}
}
在native層的nativeWake()調(diào)用NativeMessageQueue中的wake()方法赫段,最終會進入Looper的wake()方法
/frameworks/base/core/jni/android_os_MessageQueue.cpp
class android_os_MessageQueue {
// jni方法呀打,轉(zhuǎn)到 NativeMessageQueue#wake()
void android_os_MessageQueue_nativeWake(){
nativeMessageQueue->wake();
}
class NativeMessageQueue : MessageQueue {
// 轉(zhuǎn)到 Looper#wake() 方法
void wake(){
mLooper->wake();
}
}
}
接著我們看native中Looper里的wake()方法
/system/core/libutils/Looper.cpp
class looper {
void Looper::wake() {
int inc = 1;
write(mWakeEventFd, &inc);
}
}
總結(jié)java層喚醒:
鏈路如下:
Handler #enqueueMessage() -> MessageQueue#enqueueMessage() -> MessageQueue#nativeWake()->NativeMessageQueue#wake() -> Looper#wake() -> write()
Java 發(fā)送消息鏈路走完,然后我們看 Native 層如何發(fā)送消息
/system/core/libutils/Looper.cpp
class looper {
void Looper::sendMessageAtTime(uptime, handler,message) {
int i = 0;
int messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
mMessageEnvelopes.insertAt(messageEnvelope(uptime, handler, message), i, 1);
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
void Looper::wake() {
int inc = 1;
write(mWakeEventFd, &inc);
}
}
總結(jié)喚醒:
java與native發(fā)消息的方式 糯笙、消息類型 贬丛、 送達的消息隊列 都不相同,但是當(dāng)需要喚醒線程時给涕,Java 和 Native 都會執(zhí)行到 Looper#wake() 方法線程如何被喚醒?為什么這樣的操作會喚醒豺憔?
1额获、write() 一行方法調(diào)用,向 mWakeEventFd 寫入了一個 1則會喚醒焕阿。
2咪啡、mWakeEventFd 被寫入值后首启,狀態(tài)會從 不可讀 變成 可讀暮屡,內(nèi)核監(jiān)聽到 fd 的可讀寫狀態(tài)發(fā)生變化,會將事件從內(nèi)核返回給 epoll_wait() 方法調(diào)用
而 epoll_wait() 方法一旦返回毅桃,阻塞狀態(tài)將會被取消褒纲,線程繼續(xù)向下執(zhí)行
總結(jié)
重點: java層的looper、messageQueue在c++層均有對應(yīng)的類钥飞,然后通過將looper初始化時創(chuàng)建的 eventFd 返回的 wakeEventFd莺掠,注冊到 epoll_create創(chuàng)建的 epoll對象里,然后通過epoll_ctl去在wakeEventFd上添加一個 ADD類型的事件監(jiān)聽读宙,最后上層 調(diào)用nativePollOnce的時候彻秆,最終會調(diào)用 epoll 的 epoll_wait 通過監(jiān)聽fd的方式來形成阻塞。來消息后结闸,去往wakeEventFd去寫一個“1”值唇兑,此后epoll_wait監(jiān)聽到值了,就阻塞解除繼續(xù)執(zhí)行了桦锄,繼續(xù)取message執(zhí)行去了扎附。