一寂纪、在子線程中創(chuàng)建Handler点弯,“Can't create handler inside thread XXX that has not called Looper.prepare()”
從Handler構(gòu)造方法開(kāi)始,直接上源碼岳锁,
// Handler.java
public Handler(@Nullable Callback callback, boolean async) {
...
mLooper = Looper.myLooper();
if (mLooper == null) {
// 這里就是異常拋出的地方
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
...
}
接著看Looper.myLooper()
做了什么监嗜,
// Looper.java
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
這段代碼很簡(jiǎn)單,就是從sThreadLocal
拿東西环鲤,從返回值知道,拿的是一個(gè)Looper對(duì)象憎兽,并且可能為null冷离。接下來(lái)就需要好好了解一下這個(gè)ThreadLocal是個(gè)什么東西,簡(jiǎn)單講唇兑,ThreadLocal就是一個(gè)普通工具類酒朵,用于管理線程的本地變量桦锄,使用自定義數(shù)據(jù)結(jié)構(gòu)ThreadLocalMap(ThreadLocal的靜態(tài)內(nèi)部類扎附,本質(zhì)就是一個(gè)hashMap)來(lái)存儲(chǔ)這些變量。
ThreadLocal既然有g(shù)et()结耀,那就肯定有set()留夜,我們先不看get做了什么,先看set做了什么图甜,
// ThreadLocal.java
public void set(T value) {
// 獲取當(dāng)前線程
Thread t = Thread.currentThread();
// 獲取當(dāng)前線程的ThreadLocalMap變量
ThreadLocalMap map = getMap(t);
if (map != null)
// 以當(dāng)前ThreadLocal對(duì)象為key碍粥,存入傳進(jìn)來(lái)的value
map.set(this, value);
else
// 為當(dāng)前線程的ThreadLocalMap變量賦值
createMap(t, value);
}
// ThreadLocal#getMap(Thread t)
ThreadLocalMap getMap(Thread t) {
// threadLocals是Thread的一個(gè)類行為ThreadLocalMap的成員變量,用于保存該線程的本地變量
return t.threadLocals;
}
// ThreadLocal#createMap(Thread t, T firstValue)
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
現(xiàn)在我們知道ThreadLocal.set()
的作用就是給當(dāng)前線程的成員變量threadLocals
賦值黑毅,以Looper為例嚼摩,
// Looper.java
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
ThreadLocalMap的key就是Looper#sThreadLocal
,value就是新建的Looper
對(duì)象矿瘦。接下來(lái)我們看看ThreadLocal.get()
做了什么枕面,
// ThreadLocal.java
public T get() {
Thread t = Thread.currentThread();
// 從當(dāng)前線程獲取ThreadLocalMap,從前文可知缚去,如果沒(méi)有調(diào)用set潮秘,這個(gè)值為null
ThreadLocalMap map = getMap(t);
if (map != null) {
// 以Looper#sThreadLocal為key,獲取entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
/**
* Variant of set() to establish initialValue. Used instead
* of set() in case user has overridden the set() method.
*
* @return the initial value
*/
private T setInitialValue() {
// 默認(rèn)返回null
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
// 初始化ThreadLocalMap易结,但存入一個(gè)null值
createMap(t, value);
return value;
}
get()方法很簡(jiǎn)單枕荞,直接從當(dāng)前線程的ThreadLocalMap中以Looper#sThreadLocal
為key獲取Looper對(duì)象柜候,如果當(dāng)前線程ThreadLocalMap為null,則該方法也會(huì)返回null躏精。
現(xiàn)在來(lái)回答最開(kāi)始的問(wèn)題渣刷,子線程中創(chuàng)建Handler報(bào)異常,是因?yàn)闆](méi)有調(diào)用
Looper.prepare()
矗烛,向當(dāng)前線程(即子線程)的ThreadLocalMap類型的成員變量threadLocals
中存入Looper對(duì)象飞主,所以在構(gòu)造Handler的時(shí)候無(wú)法獲取當(dāng)前線程的Looper對(duì)象,故而拋出異常高诺。
二碌识、Handler機(jī)制中怎么保證一個(gè)Thread只有一個(gè)Looper?
上文我們知道虱而,在一個(gè)線程中筏餐,如果要用到Handler機(jī)制,必須先調(diào)用Looper.prepare()
才能使用Handler的sendXX和postXX接口牡拇,先看看prepare()方法里做了什么:
// Looper.java
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
很明顯魁瞪,上面代碼中異常就表明一個(gè)線程中只能有一個(gè)Looper,又回到了sThreadLocal
上惠呼。這里值得注意的一點(diǎn)是Looper#sThreadLocal
的初始化导俘,
// Looper.java
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
static final
說(shuō)明sThreadLocal
全局只會(huì)初始化一次,且不可變更剔蹋。
// ThreadLocal.java
public void set(T value) {
// 獲取當(dāng)前線程
Thread t = Thread.currentThread();
// 獲取當(dāng)前線程的ThreadLocalMap變量
ThreadLocalMap map = getMap(t);
if (map != null)
// 以當(dāng)前ThreadLocal對(duì)象為key旅薄,存入傳進(jìn)來(lái)的value
map.set(this, value);
else
// 為當(dāng)前線程的ThreadLocalMap變量賦值
createMap(t, value);
}
從set源碼可看到,往threadLocals
中存入數(shù)據(jù)的key始終是sThreadLocal
泣崩,這就說(shuō)明在調(diào)用Looper.prepare()
的時(shí)候始終只會(huì)更新當(dāng)前線程的Looper少梁,當(dāng)然這個(gè)會(huì)拋異常,不會(huì)存在更新Looper的情況矫付。
現(xiàn)在回答問(wèn)題凯沪,給線程設(shè)置Looper是通過(guò)調(diào)用
Looper.prepare()
實(shí)現(xiàn),在該方法中有做限制买优,一個(gè)線程僅能綁定一個(gè)Looper妨马。
三、MessageQueue是什么數(shù)據(jù)結(jié)構(gòu)杀赢,為什么采用這樣的數(shù)據(jù)結(jié)構(gòu)烘跺?
先看看Message的源碼:
public final class Message implements Parcelable {
...
/*package*/ Message next;
...
}
很明顯是鏈表結(jié)構(gòu),采用這種結(jié)構(gòu)有以下幾種好處:
1. 解耦:在項(xiàng)目啟動(dòng)之初來(lái)預(yù)測(cè)未來(lái)項(xiàng)目會(huì)遇到什么需求是很困難的葵陵。但消息隊(duì)列用于數(shù)據(jù)存取液荸,通過(guò)定義基于數(shù)據(jù)的借口層,存取兩邊都實(shí)現(xiàn)這一接口脱篙。這允許你獨(dú)立擴(kuò)展或修改兩邊的處理過(guò)程娇钱,只要確保他們遵循同樣的借口約束伤柄。
2. 冗余:有些情況下,處理數(shù)據(jù)的過(guò)程會(huì)失敗文搂。除非數(shù)據(jù)持久化适刀,否則將造成數(shù)據(jù)丟失。消息隊(duì)列將數(shù)據(jù)持久化直到他們被完全處理煤蹭,通過(guò)這一方式避免數(shù)據(jù)丟失的風(fēng)險(xiǎn)笔喉。在消息隊(duì)列所采用的“插入-獲取-刪除”范式中,在把一個(gè)消息刪除之前硝皂,需要在處理過(guò)程中明確指出該消息已經(jīng)被處理完畢常挚,以確保數(shù)據(jù)被安全的保存到使用完畢。
3. 擴(kuò)展性:因?yàn)橄㈥?duì)列解耦了處理過(guò)程稽物,所以增大消息入列和處理的頻率就變得很容易:只要另外增加處理邏輯就行奄毡,不需要更改代碼。
4. 緩沖:消息隊(duì)列本身就是一個(gè)很好的緩沖結(jié)構(gòu)贝或,寫入端的速度可以足夠快吼过,不用受讀取端效率影響。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度咪奖。
5. 順序保證:消息隊(duì)列本來(lái)就是“先進(jìn)先出”的數(shù)據(jù)結(jié)構(gòu)盗忱,能夠保證數(shù)據(jù)能夠按照特定的順序來(lái)處理。
6. 異步通信:消息隊(duì)列提供了異步處理機(jī)制羊赵,允許你把一個(gè)消息放入隊(duì)列趟佃,但并不立即處理它。即寫入端和讀取端是異步的慷垮。
7. 內(nèi)存友好:鏈表數(shù)據(jù)結(jié)構(gòu)在內(nèi)存占用上不連續(xù)揖闸,能提高內(nèi)存申請(qǐng)效率。
四料身、Handler中的Looper是怎么運(yùn)作的?
Looper作為Handler機(jī)制中中承上啟下的一個(gè)組件衩茸,其作用是啟動(dòng)一個(gè)死循環(huán)芹血,不斷從MessageQueue
中獲取message并處理,如果message為null楞慈,則退出循環(huán)幔烛;否則通過(guò)message.target.dispatchMessage()
將消息分發(fā)給handler處理。
// Looper.java
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 從當(dāng)前Thread綁定的Looper中獲取MessageQueue
final MessageQueue queue = me.mQueue;
...
// 啟動(dòng)死循環(huán)囊蓝,輪詢Message
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
try {
// 分發(fā)Message到Handler
msg.target.dispatchMessage(msg);
} finally {
...
}
...
// 回收Message
msg.recycleUnchecked();
}
}
這里的死循環(huán)保證能源源不斷的從MessageQueue中通過(guò)next()
獲取下一個(gè)需要執(zhí)行的Message饿悬,只有在無(wú)Message要處理的時(shí)候才會(huì)退出循環(huán)。下面看看next()
做了什么聚霜。
Message next() {
// 1. 如果 native消息隊(duì)列指針映射已經(jīng)為0狡恬,即虛引用珠叔,說(shuō)明消息隊(duì)列已經(jīng)退出,沒(méi)有消息了弟劲。
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
// 2. 死循環(huán)祷安,當(dāng)未獲取到需要 `分發(fā)處理` 的消息時(shí),保持空轉(zhuǎn)
for (;;) {
…
// 3. 調(diào)用native層方法兔乞,poll message汇鞭,注意,消息還存在于native層
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 如果頭節(jié)點(diǎn)消息是一個(gè)同步屏障庸追,則找到消息隊(duì)列中第一個(gè)異步消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
// 指針后移霍骄,直到指向第一個(gè)異步消息
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
// 還沒(méi)到執(zhí)行時(shí)間,計(jì)算需要等待的時(shí)間
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
// 將mMessages設(shè)置為下一個(gè)要執(zhí)行的消息
mMessages = msg.next;
}
// 釋放指針對(duì)下一個(gè)消息的引用
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
// 沒(méi)有消息淡溯,消息隊(duì)列一直阻塞
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// 下面是處理IdleHandler的邏輯
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[I];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
處理邏輯也比較清晰腕巡,如果頭節(jié)點(diǎn)是同步屏障,先處理異步消息血筑;由于消息隊(duì)列是按照處理時(shí)間排好序的绘沉,所以直接取第一個(gè)消息,根據(jù)需要處理的時(shí)間設(shè)置好等待時(shí)間豺总,等待時(shí)間為0則直接返回msg车伞;如果沒(méi)有取到消息,則一直等待下去喻喳;最后處理IdleHandler另玖。
另外提個(gè)問(wèn)題:為什么在Looper和MQ中都需要死循環(huán)?
- Looper中的循環(huán)負(fù)責(zé)消息隊(duì)列中消息的分發(fā)表伦,死循環(huán)保障消息分發(fā)一直處于運(yùn)行中谦去,不循環(huán)就停止分發(fā);
- MQ的
next()
中的死循環(huán)負(fù)責(zé)獲取消息蹦哼,保障Looper可以獲取有效的消息鳄哭,使Looper可以一直運(yùn)行下去,next()
只要發(fā)現(xiàn)有效消息就返回纲熏,即跳出死循環(huán)妆丘。
五、Handler插入延時(shí)消息是怎么處理的局劲?
Handler插入消息最終都是走到sendMessageAtTime(msg, long)
勺拣,然后通過(guò)enqueueMessage (queue, msg, long)
放入消息隊(duì)列:
// Handler.java
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
// Handler.java
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// 持有當(dāng)前Handler
msg.target = this;
// 是否為異步消息
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
上面有兩點(diǎn)值得注意:
sendMessageAtTime(msg, long)
第二個(gè)參數(shù)使用的是android.os.SystemClock#uptimeMillis
,這個(gè)時(shí)間從開(kāi)機(jī)開(kāi)始計(jì)算鱼填。msg.target = this;
:msg持有Handler引用药有,而Handler如果作為匿名內(nèi)部類,會(huì)持有外部類的引用苹丸,這里會(huì)出現(xiàn)引用的持有與釋放問(wèn)題愤惰,有內(nèi)存泄漏隱患苇经。
下面看看MQ的enqueueMessage做了什么:
boolean enqueueMessage(Message msg, long when) {
…
synchronized (this) {
…
msg.markInUse();
msg.when = when;
Message p = mMessages; // 當(dāng)前頭節(jié)點(diǎn)
boolean needWake;
// 如果消息隊(duì)列為空,或者頭節(jié)點(diǎn)消息處理時(shí)間小于當(dāng)前消息羊苟,則將當(dāng)前消息設(shè)置為頭節(jié)點(diǎn)塑陵,并設(shè)置喚醒消息隊(duì)列
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
// 此處意思是,如果當(dāng)前隊(duì)列頭部有一個(gè)同步屏障蜡励,并且入列的消息msg是一個(gè)馬上需要執(zhí)行的異步消息令花,此時(shí)就需要喚醒隊(duì)列
// 默認(rèn)消息入列的時(shí)候不需要喚醒隊(duì)列,即needWake = false凉倚。假如下面判斷needWake = true兼都,表明p是一個(gè)同步屏障
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
// 指針后移(使用臨時(shí)變量prev保存上一個(gè)節(jié)點(diǎn)信息,使p == prev.next恒成立)稽寒,直到移到消息隊(duì)列尾扮碧,或者當(dāng)前消息處理時(shí)間小于p的執(zhí)行時(shí)間
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
// 如果needWake = true,說(shuō)明p是同步屏障杏糙,此時(shí)已經(jīng)將p后移了慎王,則沒(méi)有必要喚醒消息隊(duì)列
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 將當(dāng)前消息插入到p消息和prev消息之間
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
// 喚醒消息隊(duì)列
nativeWake(mPtr);
}
}
return true;
}
Handler插入的延時(shí)消息是根據(jù)msg的when,選擇合適的位置插入消息隊(duì)列宏侍。因此赖淤,消息隊(duì)列是一個(gè)按照時(shí)間排序的有序隊(duì)列。
六谅河、消息隊(duì)列阻塞和喚醒的原理咱旱?
當(dāng)消息隊(duì)列中的第一個(gè)消息執(zhí)行時(shí)間還沒(méi)到,這個(gè)時(shí)候調(diào)用nativePollOnce(ptr, nextPollTimeoutMillis)
來(lái)阻塞當(dāng)前線程并進(jìn)入休眠绷耍,避免空轉(zhuǎn)吐限,當(dāng)?shù)却龝r(shí)間nextPollTimeoutMillis
到了的時(shí)候又會(huì)自動(dòng)喚醒線程,同時(shí)也可以使用nativeWake(mPtr)
來(lái)喚醒線程褂始,下面就來(lái)看看這兩個(gè)本地方法是怎么運(yùn)作的诸典。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
Java層通過(guò)JNI調(diào)到Native層的android_os_MessageQueue_nativePollOnce
方法,下面看看Looper#pollOnce
做了什么:
//Looper.h
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
//實(shí)現(xiàn)
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
...
result = pollInner(timeoutMillis);
}
}
先處理Native層滯留的Response病袄,然后調(diào)用pollInner搂赋,繼續(xù)往下看:
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
// 即將處于idle狀態(tài)
mPolling = true;
// fd最大的個(gè)數(shù)是16
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 等待時(shí)間發(fā)生或者超時(shí),在nativeWake()方法益缠,向管道寫端寫入字符,則方法會(huì)返回基公。
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
// 不再處于idle狀態(tài)
mPolling = false;
// 請(qǐng)求鎖 幅慌,因?yàn)樵贜ative Message的處理和添加邏輯上需要同步
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
// 如果需要,重建epoll
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
// epoll重建轰豆,直接跳轉(zhuǎn)到Done
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
// epoll事件個(gè)數(shù)小于0胰伍,發(fā)生錯(cuò)誤齿诞,直接跳轉(zhuǎn)Done
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
// 如果需要,重建epoll
if (eventCount == 0) {
//epoll事件個(gè)數(shù)等于0骂租,發(fā)生超時(shí)祷杈,直接跳轉(zhuǎn)Done
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
// 循環(huán)處理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
//首先處理mWakeEventFd
if (fd == mWakeEventFd) {
//如果是喚醒mWakeEventFd有反應(yīng)
if (epollEvents & EPOLLIN) {
/**重點(diǎn)代碼*/
// 已經(jīng)喚醒了,則讀取并清空管道數(shù)據(jù)
awoken(); // 該函數(shù)內(nèi)部就是read渗饮,從而使FD可讀狀態(tài)被清除
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 其他input fd處理但汞,其實(shí)就是將活動(dòng)放入response隊(duì)列,等待處理
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 處理request互站,生成對(duì)應(yīng)的response對(duì)象私蕾,push到響應(yīng)數(shù)組
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
// 再處理Native的Message,調(diào)用相應(yīng)回調(diào)方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
// 釋放鎖
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
// 處理消息事件
handler->handleMessage(message);
} // release handler
// 請(qǐng)求鎖
mLock.lock();
mSendingMessage = false;
// 發(fā)生回調(diào)
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
// 釋放鎖
mLock.unlock();
// Invoke all response callbacks.
// 處理帶有Callback()方法的response事件胡桃,執(zhí)行Response相應(yīng)的回調(diào)方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
// 處理請(qǐng)求的回調(diào)方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
// 移除fd
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
// 清除response引用的回調(diào)方法
response.request.callback.clear();
// 發(fā)生回調(diào)
result = POLL_CALLBACK;
}
}
return result;
}
pollInner()
方法的處理流程:
- 先調(diào)用epoll_wait()踩叭,這是阻塞方法,用于等待事件發(fā)生或者超時(shí)翠胰。
- 對(duì)于epoll_wait()返回容贝,當(dāng)且僅當(dāng)以下3種情況出現(xiàn):
- POLL_ERROR:發(fā)生錯(cuò)誤,直接跳轉(zhuǎn)Done
- POLL_TIMEOUT:發(fā)生超時(shí)之景,直接跳轉(zhuǎn)到Done
- 檢測(cè)到管道有事情發(fā)生斤富,則再根據(jù)情況做相應(yīng)處理:
a. 如果檢測(cè)到管道產(chǎn)生事件,則直接讀取管道的數(shù)據(jù)
b. 如果是其他事件闺兢,則處理request茂缚,生成對(duì)應(yīng)的response對(duì)象,push到response數(shù)組
- 進(jìn)入Done標(biāo)記位的代碼:
a. 先處理Native的Message屋谭,調(diào)用Native的Handler來(lái)處理該Message
b. 再處理Resposne數(shù)組嚎研,POLL_CALLBACK類型的事件
參考https://www.tinymind.net.cn/articles/479757df993a94
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
// 將Java層傳遞下來(lái)的mPtr轉(zhuǎn)換為nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//調(diào)用wake函數(shù)
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
// Looper.h
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// 向管道m(xù)WakeEventFd寫入字符1
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
Looper類的
wake()
函數(shù)只是往mWakeEventfd中寫了一些內(nèi)容,這個(gè)fd只是通知而已支示,類似于pipe查邢,最后會(huì)把epoll_wait()
喚醒,線程就不阻塞了繼續(xù)發(fā)送 Native層的消息我擂,然后處理之前的addFd事件衬以,然后處理Java層的消息。
這里面涉及pipe/epoll機(jī)制校摩。
pipe(管道)看峻,是指用于連接一個(gè)讀進(jìn)程和一個(gè)寫進(jìn)程的共享文件,又稱pipe文件衙吩。
向管道(共享文件)提供輸入的發(fā)送進(jìn)程(即寫進(jìn)程)互妓,以字符流的形式將大量數(shù)據(jù)送入管道(寫入過(guò)程);而接受管道輸出的接收進(jìn)程(即讀進(jìn)程),可從管道接收數(shù)據(jù)冯勉,標(biāo)準(zhǔn)的生產(chǎn)者消費(fèi)者模式澈蚌。
為了協(xié)調(diào)雙方的通信,管道通信機(jī)制必須提供以下3 方面的協(xié)調(diào)能力:
- 互斥灼狰。當(dāng)一個(gè)進(jìn)程正在對(duì) pipe 進(jìn)行讀/寫操作時(shí)宛瞄,另一個(gè)進(jìn)程必須等待。
- 同步交胚。當(dāng)寫(輸入)進(jìn)程把一定數(shù)量(如4KB)數(shù)據(jù)寫入 pipe 后份汗,便去睡眠等待,直到讀(輸出)進(jìn)程取走數(shù)據(jù)后承绸,再把它喚醒裸影。當(dāng)讀進(jìn)程讀到一空 pipe 時(shí),也應(yīng)睡眠等待军熏,直至寫進(jìn)程將數(shù)據(jù)寫入管道后轩猩,才將它喚醒。
-
對(duì)方是否存在荡澎。只有確定對(duì)方已存在時(shí)均践,才能進(jìn)行通信。
epoll是Linux I/O多路復(fù)用的一種機(jī)制摩幔,可以監(jiān)視多個(gè)描述符fd彤委,一旦某個(gè)描述符就緒,能夠通知程序進(jìn)行相應(yīng)的操作或衡。