摘要
最近在看WMS代碼迹炼,里面好多都涉及到Handler砸彬, Looper通信,相比Binder通信斯入,Handler適用于線程間通信砂碉,并且沒有Binder那么復(fù)雜,也容易理解刻两,對于更新UI操作更是需要Handler增蹭,本篇就專門介紹下Handler相關(guān)內(nèi)容,包括App層的使用磅摹,F(xiàn)WK和Native的具體實(shí)現(xiàn)滋迈,通過這塊內(nèi)容介紹, 可以對這塊有一個(gè)清晰的認(rèn)識(shí)户誓。
Handler 使用舉例
我們都知道對于App饼灿,只有UI線程才可以更新UI,其他線程更新UI會(huì)直接導(dǎo)致應(yīng)用crash帝美,對于非UI線程需要更新UI的碍彭,可以通過handler將數(shù)據(jù)發(fā)送過去, UI線程會(huì)讀取數(shù)據(jù)并執(zhí)行更新UI操作悼潭。下面是一個(gè)例子庇忌,每次點(diǎn)擊按鈕都會(huì)隨機(jī)生成一個(gè)字符串,并且將該字符串放入一個(gè)BlockingQueue女责,內(nèi)部一個(gè)非UI線程循環(huán)讀取該BlockingQueue漆枚,并將內(nèi)容通過Handler顯示到UI上创译。代碼如下:
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MyTest";
private static final BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
private static Thread thread = null;
private static final Lock lock = new ReentrantLock();
private static Handler handler = null;
private static TextView textView = null;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
init();
Log.d(TAG, "hello");
Button button = findViewById(R.id.button1);
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
sb.append((char)('a' + (int) (Math.random() * ('z' - 'a'))));
}
Log.i(TAG, "content " + sb.toString());
blockingQueue.offer(sb.toString());
}
});
}
private void init() {
lock.lock();
textView = (TextView)findViewById(R.id.textView);
handler = new MyHandler(Looper.myLooper());
if (thread == null) {
thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
String content = blockingQueue.take();
Message message = handler.obtainMessage(1);
message.obj = content;
handler.sendMessage(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
thread.start();
}
lock.unlock();
}
static class MyHandler extends Handler {
@Override
public void handleMessage(Message msg) {
Log.i(TAG, "receive " + msg.what);
switch (msg.what) {
case 1:
textView.setText((String)msg.obj);
break;
default:
Log.e(TAG, "unknown what");
}
}
public MyHandler(Looper looper) {
super(looper);
}
}
}
關(guān)鍵點(diǎn)就在于自己需要繼承Handler類并實(shí)現(xiàn)handleMessage方法抵知,并提供一個(gè)構(gòu)造函數(shù),可以使用指定的Looper來初始化該Handler软族。每個(gè)應(yīng)用在啟動(dòng)的時(shí)候會(huì)自動(dòng)創(chuàng)建一個(gè)Looper刷喜,每個(gè)線程只能有一個(gè)Looper,因?yàn)檫@個(gè)是一個(gè)線程私有變量立砸。下面是Looper成員的關(guān)系圖掖疮。
可以發(fā)現(xiàn)Fwk和Native都有一套Looper,而包含關(guān)系正好相反颗祝,F(xiàn)wk層是Looper包含MessageQueue浊闪,而Native是MessageQueue包含Looper恼布,F(xiàn)wk的具體實(shí)現(xiàn)是完全由Native提供的。
Looper代碼解讀
我們知道android應(yīng)用搁宾,包括system_server都會(huì)運(yùn)行ActivityThread的main函數(shù)折汞,并且將之作為主線程,我們就從這塊開始介紹
public static void main(String[] args) {
Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain");
// Install selective syscall interception
AndroidOs.install();
// CloseGuard defaults to true and can be quite spammy. We
// disable it here, but selectively enable it later (via
// StrictMode) on debug builds, but using DropBox, not logs.
CloseGuard.setEnabled(false);
Environment.initForCurrentUser();
// Make sure TrustedCertificateStore looks in the right place for CA certificates
final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());
TrustedCertificateStore.setDefaultUserDirectory(configDir);
// Call per-process mainline module initialization.
initializeMainlineModules();
Process.setArgV0("<pre-initialized>");
Looper.prepareMainLooper();
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
if (false) {
Looper.myLooper().setMessageLogging(new LogPrinter(Log.DEBUG, "ActivityThread"));
}
// End of event ActivityThreadMain.
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
Looper.loop();
...
這兒調(diào)用了Looper.prepareMainLooper()盖腿,這個(gè)就是為主線程生成默認(rèn)的Looper爽待,如果普通線程需要使用Handler就需要自己手動(dòng)創(chuàng)建Looper了◆娓看下具體實(shí)現(xiàn)
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
這塊的意思就是一個(gè)線程只允許擁有一個(gè)Looper鸟款,在初始化的時(shí)候會(huì)會(huì)主線程生成一個(gè)不允許退出的Looper并且設(shè)置到主線程變量中。對于普通線程茂卦,Looper就可以是允許退出的了何什,這塊可以通過prepare的參數(shù)來控制。那如何拿到主線程Looper呢疙筹?看下下面這個(gè)函數(shù)
public static Looper getMainLooper() {
synchronized (Looper.class) {
return sMainLooper;
}
}
通過調(diào)用getMainLooper就可以拿到主線程Looper富俄。
接下來看下loop實(shí)現(xiàn)
public static void loop() {
final Looper me = myLooper(); //拿到當(dāng)前線程Looper
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
if (me.mInLoop) {
Slog.w(TAG, "Loop again would have the queued messages be executed"
+ " before this one completed.");
}
me.mInLoop = true;
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity(); // 清理Binder pid,uid而咆,使得通過IPC接口拿到的uid和pid都是本進(jìn)程的uid霍比,pid
final long ident = Binder.clearCallingIdentity();
// Allow overriding a threshold with a system prop. e.g.
// adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
final int thresholdOverride =
SystemProperties.getInt("log.looper."
+ Process.myUid() + "."
+ Thread.currentThread().getName()
+ ".slow", 0);
boolean slowDeliveryDetected = false;
for (;;) {
Message msg = queue.next(); // might block // 阻塞式等待,最終是會(huì)阻塞到epoll_wait上等待消息暴备,這塊是通過讀寫eventfd實(shí)現(xiàn)的悠瞬,相比pipe優(yōu)勢很大,后面具體介紹
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
try {
msg.target.dispatchMessage(msg); // 調(diào)用handler的處理函數(shù)
if (observer != null) {
observer.messageDispatched(token, msg);
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
if (observer != null) {
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
...
msg.recycleUnchecked();
}
}
這個(gè)函數(shù)比較長涯捻,很多都是各種維測打印浅妆,我這邊把維測相關(guān)的信息去掉了,這樣只關(guān)心流程主線即可障癌。從上述代碼可以看到凌外,loop的流程如下:
- 循環(huán)從MessageQueue中拿到待處理的Message
- 調(diào)用Message中target的dispatchMessage方法, Message的target就是一個(gè)Handler涛浙。
- 回收Message康辑,Message做了一個(gè)對象池,創(chuàng)建Message先從對象池中獲取轿亮,如果獲取失敗再從堆上申請疮薇,釋放也是先放到對象池中,可以提升對象獲取速度我注,對性能和內(nèi)存都有好處按咒。
看下MessageQueue的next實(shí)現(xiàn):
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis); //按照最近Message的截止時(shí)間作為超時(shí)時(shí)間阻塞到epoll_wait上,返回后但骨,說明等待了足夠的時(shí)間励七,應(yīng)該有Message到時(shí)間了智袭。后面會(huì)詳細(xì)介紹具體實(shí)現(xiàn)
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); // 計(jì)算下一次最接近Message的等待時(shí)間
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
...
}
Handler代碼解讀
使用Handler的地方,一個(gè)是復(fù)寫它的handleMessage方法掠抬,一個(gè)是獲取Message對象补履,一個(gè)是發(fā)送Message對象,下面分別介紹下剿另。
handleMessage
/**
* Subclasses must implement this to receive messages.
*/
public void handleMessage(@NonNull Message msg) {
}
可以看到是空實(shí)現(xiàn)箫锤,所有需要自定義Message處理函數(shù)的場景均需要繼承Handler類并重寫下這個(gè)方法,當(dāng)然也可以直接在msg中提供一個(gè)callback雨女,這樣不重寫不繼承也沒問題谚攒,使用上更加簡單。
獲取Message對象
public final Message obtainMessage()
{
return Message.obtain(this);
}
public final Message obtainMessage(int what)
{
return Message.obtain(this, what);
}
public final Message obtainMessage(int what, @Nullable Object obj) {
return Message.obtain(this, what, obj);
}
public final Message obtainMessage(int what, int arg1, int arg2)
{
return Message.obtain(this, what, arg1, arg2);
}
public final Message obtainMessage(int what, int arg1, int arg2, @Nullable Object obj) {
return Message.obtain(this, what, arg1, arg2, obj);
}
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
/**
* Same as {@link #obtain()}, but copies the values of an existing
* message (including its target) into the new one.
* @param orig Original message to copy.
* @return A Message object from the global pool.
*/
public static Message obtain(Message orig) {
Message m = obtain();
m.what = orig.what;
m.arg1 = orig.arg1;
m.arg2 = orig.arg2;
m.obj = orig.obj;
m.replyTo = orig.replyTo;
m.sendingUid = orig.sendingUid;
m.workSourceUid = orig.workSourceUid;
if (orig.data != null) {
m.data = new Bundle(orig.data);
}
m.target = orig.target;
m.callback = orig.callback;
return m;
}
可以看到實(shí)現(xiàn)方法都是從對象池里面獲取一個(gè)Message氛堕,然后按照參數(shù)對應(yīng)賦值即可馏臭。
發(fā)送Message對象
public final boolean sendMessage(@NonNull Message msg) {
return sendMessageDelayed(msg, 0);
}
public final boolean sendEmptyMessage(int what)
{
return sendEmptyMessageDelayed(what, 0);
}
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this; // 關(guān)鍵操作:將target指定為當(dāng)前Handler
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
...
這兒發(fā)送Message的方法也很多,都是針對創(chuàng)建Message的各種封裝簡化讼稚,最終都是調(diào)用的enqueueMessage括儒, 這個(gè)函數(shù)里面有個(gè)關(guān)鍵操作就是將Message的target指定為當(dāng)前Handler,然后調(diào)用MessageQueue的enqueueMessage方法锐想。
看下該方法的實(shí)現(xiàn):
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) { // 每個(gè)Message有個(gè)觸發(fā)時(shí)間帮寻,這兒是按照觸發(fā)時(shí)間順序插入Message,越在前面的觸發(fā)時(shí)間越早
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr); //喚醒操作赠摇,后面具體介紹
}
}
return true;
}
這塊就是將Message按照時(shí)間順序插入到Message隊(duì)列中固逗,然后執(zhí)行下喚醒操作,那這兒的喚醒是如何喚醒next中的阻塞呢藕帜?這塊就需要了解native的實(shí)現(xiàn)了烫罩,下面開始看下吧。
NativeMessageQueue實(shí)現(xiàn)
MessageQueue是這樣構(gòu)造的
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
這兒的mPtr就是NativeMessageQueue對象的指針洽故,通過在Java中保存Native對象的指針來操作Native對象贝攒。看下具體實(shí)現(xiàn)
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue); //將native對象指針傳遞給java
}
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread(); // 獲取native的Looper
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
這兒就實(shí)現(xiàn)了Fwk使用Native的Looper时甚。接下來看下兩個(gè)關(guān)鍵調(diào)用的具體實(shí)現(xiàn)隘弊,一個(gè)是nativePollOnce, 一個(gè)是nativeWake撞秋。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
可以看到上面兩個(gè)函數(shù)最終都是調(diào)用的Native Looper的對應(yīng)調(diào)用长捧,接下來就看下Native的Looper是如何實(shí)現(xiàn)的嚣鄙。
Native Looper介紹
先看下下面這兩個(gè)函數(shù)吻贿,通過這兩個(gè)函數(shù)可以看出Native的Looper也是線程級的變量。
void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
if (looper != nullptr) {
looper->incStrong((void*)threadDestructor);
}
pthread_setspecific(gTLSKey, looper.get());
if (old != nullptr) {
old->decStrong((void*)threadDestructor);
}
}
sp<Looper> Looper::getForThread() {
int result = pthread_once(& gTLSOnce, initTLSKey);
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
return (Looper*)pthread_getspecific(gTLSKey);
}
看下Looper的初始化:
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)); //創(chuàng)建eventfd
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
mEpollFd.reset();
}
// Allocate the new epoll instance and register the wake pipe.
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem); //注冊eventfd 輸入事件
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
這塊最主要的就是eventfd的使用哑子,之前android使用的是pipe舅列,后來換成了eventfd肌割,這兒使用eventfd相對于pipe有以下幾個(gè)好處:
- 對于進(jìn)程間通信,如果使用pipe帐要,就需要每個(gè)進(jìn)程創(chuàng)建2個(gè)fd把敞,如果是n個(gè)進(jìn)程,那么就需要2n個(gè)fd榨惠,并且每個(gè)京城都需要維護(hù)這么多個(gè)fd奋早,而fd對于進(jìn)程是很寶貴的資源,一共也才1024個(gè)赠橙。而使用eventfd就只需要一個(gè)fd就可以了
- 使用pipe效率沒有eventfd高耽装,eventfd就是一個(gè)計(jì)數(shù)器,內(nèi)容就是一個(gè)32字節(jié)的整數(shù)期揪,傳輸開銷可以忽略不計(jì)掉奄,而pipe則需要內(nèi)存至少分配一個(gè)4k內(nèi)存。
接下來看下pollOnce的實(shí)現(xiàn):
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // 阻塞等待
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) { //對于Fwk的Looper凤薛,其實(shí)一般只有一個(gè)fd姓建,就是eventfd,除非也通過addFd設(shè)置了某些fd的事件回調(diào)
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) { // native Message消息處理缤苫,類似于Fwk的速兔,MessageEnvelope中保存有hander。
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
可以看出pollOnce主要就是在epoll_wait上阻塞等待活玲,要不有fd事件喚醒憨栽,要不就是超時(shí)返回,這塊也有native massage的處理翼虫,類似于Fwk的屑柔,調(diào)用Message相關(guān)的Handler中的handleMessage。
接下來看下wake的實(shí)現(xiàn):
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
看到這里應(yīng)該就有豁然開朗的感覺了珍剑,這兒就向eventfd中寫入一個(gè)數(shù)字掸宛,這樣就可以把阻塞到epoll_wait上的線程喚醒了。
總結(jié)
本篇通過例子招拙,源碼介紹了下Android中的Handler機(jī)制唧瘾,本質(zhì)上就是一個(gè)支持跨進(jìn)程的基于IO多路復(fù)用的生產(chǎn)消費(fèi)者框架,能理解到這里别凤,Handler應(yīng)該算是徹底明白了饰序。