前菜
介紹
Handler機(jī)制為Android提供給予開發(fā)者用于線程間切換,處理Java層消息與Native層消息的一種機(jī)制毫炉。
相關(guān)類
Java層
- Thread
- ThreadLocal
- ThreadLocalMap
- Looper
- Message
- MessageQueue
- Handler
Native層
- Looper
- ALooper
- NativeMessageQueue
- Message
- MessageHandler
- WeakMessageHandler
基礎(chǔ)知識介紹
1. ThreadLocal
源碼鏈接
類關(guān)系
public class ThreadLocal<T> {
static class ThreadLocalMap {
private static final int INITIAL_CAPACITY = 16;
private Entry[] table = new Entry[INITIAL_CAPACITY];
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
}
一般使用ThreadLocal例子创橄,以Looper為例沸版。
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
...
sThreadLocal.set(new Looper(quitAllowed));
儲存數(shù)據(jù)
public class ThreadLocal<T> {
...
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);//第一次存儲椅您,該map一定為空
}
//從當(dāng)前線程的 threadLocals 變量獲取ThreadLocalMap對象
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
//創(chuàng)建ThreadLocalMap對象寡壮,并保存到當(dāng)前線程的threadLocals變量中。
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
}
public class Thread implements Runnable{
...
ThreadLocal.ThreadLocalMap threadLocals = null;
...
}
ThreadLocalMap結(jié)構(gòu)
static class ThreadLocalMap{
...
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
//如果ThreadLocal被回收了诀诊,則也進(jìn)行Entry數(shù)組的回收工作
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
}
讀取數(shù)據(jù)
public class ThreadLocal<T> {
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//第一次獲取盆顾,map為空,直接走初始化方法
return setInitialValue();
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
//如果之前沒有保存過該ThreadLocal畏梆,則先存入一個null進(jìn)入。
protected T initialValue() {
return null;
}
}
<meta name="source" content="lake">結(jié)論
- ThreadLocal 里面有一個靜態(tài)內(nèi)部類 ThreadLocalMap奈懒,ThreadLocalMap 有個靜態(tài)類 Entry奠涌,Entry繼承 WeakReference。
- ThreadLocal 是一個工具類磷杏,主要負(fù)責(zé)從當(dāng)前 Thread 中操作 ThreadLocalMap溜畅。
- ThreadLocalMap 里面維護(hù)著一個 Entry 數(shù)組,每一個 Entry 對象都保存著一個以ThreadLocal 為key极祸,Object 為value的結(jié)構(gòu)慈格。
- Entry 里面的key被封裝為弱引用,便于釋放回收處理遥金。
- 在同一個線程中浴捆,ThreadLocal對象與Object進(jìn)行組合,同一個ThreadLocal對象稿械,在Entry中只能保存一個值选泻。
2. epoll 機(jī)制
epoll 是 Linux 提供的高效訪問 I/O 的機(jī)制。
1.使用epoll前美莫,需要先通過 epoll_create 函數(shù)創(chuàng)建一個epoll句柄页眯。
// 創(chuàng)建能容納10個fd相關(guān)信息的緩存。
int epollHandle = epoll_create( 10 );
2.得到epoll句柄后厢呵,通過epoll_ctl 把需要監(jiān)聽的文件句柄加入 epoll 句柄中窝撵。
// 先定義一個event
struct epoll_event listenEvent;
// 指定該句柄的可讀事件
// EPOLLIN(句柄可讀)
// EPOLLOUT(句柄可寫)
// EPOLLERR(句柄錯誤)
// EPOLLUP(句柄斷)
listenEvent.events = EPOLLIN;
// epoll_event 中有個聯(lián)合體叫data,用來存儲上下文數(shù)據(jù)襟铭。
data.fd = listenEvent;
3.EPOLL_CTL_ADD將監(jiān)聽fd和監(jiān)聽事件加入epoll句柄的等待隊列中碌奉。
// EPOLL_CTL_DEL 將監(jiān)聽fd從epoll句柄中移除短曾。
// EPOLL_CTL_MOD修改fd的監(jiān)聽事件。
epoll_ctl(epollHandle, Epoll_CTL_ADD ,listener, &listenEvent)
4調(diào)用epoll_wait用于等待事件道批。
struct epoll_event resultEvents[10];
......
int timeout = -1;
while(1){
int nfds == epoll_wait(epollHandle,resultEvents,10,timeout);
// nfds 大于0表示所監(jiān)聽的句柄上有事件發(fā)生错英。
// nfds 等于0表示等待超時。
// nfds 小于0表示等待過程中發(fā)生了錯誤隆豹。
//
if(nfds == -1){
//錯誤
}else if(nfds == 0){
//超時
}else{
//resultEvents 用于返回哪些發(fā)生了事件的信息椭岩。
struct epoll_event &event = resultEvents[i];
if(event & EPOLLIN){
//收到可讀事件
......
}
......//其他處理
}
}
//添加監(jiān)聽
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) ;
//處理監(jiān)控請求
void Looper::pushResponse(int events, const Request& request) ;
正餐
Looper、MessageQueue璃赡、NativeMessageQueue創(chuàng)建過程
相關(guān)源碼鏈接:
一般使用Looper判哥,以ActivityThread為例。
public final class ActivityThread{
public static void main(String[] args) {
Looper.prepareMainLooper();
...
Looper.loop();
}
}
構(gòu)建Looper對象
使用ThreadLocal工具保存到ThreadLocalMap中碉考。
public final class Looper {
//ThreadLocal聲明為靜態(tài)對象塌计,確保在同一個線程下,只有一個Looper對象存在侯谁。
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
//創(chuàng)建Looper對象锌仅,并將Looper對象保存到ThreadLocalMap中。
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
//從ThreadLocalMap中獲取Looper對象墙贱。
public static Looper myLooper() {
return sThreadLocal.get();
}
}
構(gòu)建MessageQueue隊列
public final class Looper {
final MessageQueue mQueue;
final Thread mThread;
...
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
}
public final class MessageQueue {
private final boolean mQuitAllowed;
private long mPtr;
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//構(gòu)造函數(shù)調(diào)用naitveInit热芹,該函數(shù)由Native層實現(xiàn)。
//Native層的 NativeMessageQueue 對象引用指針惨撇。
mPtr = nativeInit();
}
}
構(gòu)建Native層 NativeMessageQueue 對象
nativeInit 方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 定義實現(xiàn)伊脓。
class NativeMessageQueue : public MessageQueue, public LooperCallback {
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//一個線程會有一個Looper循環(huán)來處理消息隊列中的消息。
//獲取保存在本地線程的存儲空間的Looper對象
mLooper = Looper::getForThread();
if (mLooper == NULL) {
//如果第一次進(jìn)來魁衙,則該線程沒有設(shè)置本地存儲报腔,所以需要先創(chuàng)建一個Looper,然后后再將其保存到TLS中剖淀,
//這是很常見的一種以線程為單位的單利模式纯蛾。
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
}
注冊epoll機(jī)制
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// Android 6.0 之前為 pipe, 6.0 之后為 eventfd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
// 如果有新的epoll事件,則把原來的epoll事件關(guān)閉
if (mEpollFd >= 0) {
mEpollFd.reset();
}
// Allocate the new epoll instance and register the wake pipe.
// 創(chuàng)建一個epoll實例纵隔,并注冊wake管道
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
// 監(jiān)聽可讀事件
eventItem.events = EPOLLIN;
// 監(jiān)控的fd為喚醒事件fd
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
// 如果請求隊列中有數(shù)據(jù)茅撞,則還需要將請求中的事件注冊到epoll實例中
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
// 注冊請求中的事件到epoll實例中
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
<meta name="source" content="lake">Handler
相關(guān)源碼鏈接:
構(gòu)建Handler
public class Handler {
final Looper mLooper;
final MessageQueue mQueue;
public Handler() {
this(null, false);
}
public Handler(Callback callback) {
this(callback, false);
}
public Handler(Looper looper) {
this(looper, null, false);
}
...
public Handler(Looper looper,Callback callback) {
this(looper, callback, false);
}
public Handler(@Nullable Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
}
通過Handler發(fā)送Message
舉個栗子
Handler mHandler = new Handler();
#1.1 發(fā)送空消息
mHandler.sendEmptyMessage(what);
#1.3 發(fā)送延時消息
mHandler.sendEmptyMessageDelayed(what, 0);
#2.1 發(fā)送Message對象
Message msg = Message.obtail();
msg.what = ...;
msg.obj = ...;
mHandler.sendMessage(msg);
#2.2 發(fā)送延時消息
mHandler.sendMessageDelayed(msg,1000);
#2.3 在指定時間發(fā)送消息
mHandler.sendMessageAtTime(msg,uptimeMillis);
#4. 發(fā)送Runnable消息
mHandler.post(Runnable r);
...
public class Handler {
//方式一
public final boolean sendEmptyMessage(int what) {
return sendEmptyMessageDelayed(what, 0);
}
public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
Message msg = Message.obtain();
msg.what = what;
return sendMessageDelayed(msg, delayMillis);
}
//方式二
public final boolean sendMessage(Message msg) {
return sendMessageDelayed(msg, 0);
}
//方式三
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
//這里將當(dāng)前Handler進(jìn)行保存,主要為了后續(xù)分發(fā)到對應(yīng) Handler 的對象的 handleMessage 方法巨朦。
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
}
MessageQueue處理邏輯
Message結(jié)構(gòu)
public final class Message implements Parcelable {
public int what;
public int arg1;
public int arg2;
public Object obj;
Handler target;//發(fā)送該Message的對象米丘,用于回調(diào)
Runnable callback;
Message next;
public long when;//執(zhí)行時間
...
}
MessageQueue 與 Message 關(guān)系圖
MessageQueue里面維護(hù)的是Message鏈表。
在Java層投遞Message
public final class MessageQueue {
boolean enqueueMessage(Message msg, long when) {
......
boolean needWake;
synchronized (this) {
if (mQuitting) {
msg.recycle();
return false;
}
......
msg.when = when;
Message p = mMessages;
if (p == null || when == 0 || when < p.when) {
// 如果p為空糊啡,表明消息隊列中沒有消息拄查,那么msg將是第一個消息,needwake需要根據(jù)mBlocked的
//情況考慮是否觸發(fā)棚蓄。
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//如果p不為空堕扶,表明消息隊列中還有剩余的消息碍脏,需要將新的msg添加到對應(yīng)的位置。
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
//從消息隊列中取出消息稍算,判斷該消息的觸發(fā)時間典尾,與要新添加的Message的時間對比。
//如果新添加的Message時間小于 當(dāng)前消息隊列中獲取的Message的時間糊探,則直接break钾埂。
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//將新添加的Message添加到該隊列的指定位置。如下圖所示
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
//最后根據(jù)情況判斷是否需要喚醒
if (needWake) {
// 調(diào)用nativeWake科平,以觸發(fā)nativePollOnce函數(shù)結(jié)束等待褥紫。
nativeWake(mPtr);
}
}
return true;
}
}
在Java層提取Message
public final class MessageQueue {
Message next() {
......
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// ptr 保存了 NativeMessageQueue的指針,調(diào)用nativePollOnce進(jìn)行等待
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 嘗試先從消息隊列中尋找異步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
// 從消息隊列中拿到的Message的執(zhí)行時間瞪慧,比當(dāng)前時間還后面髓考,則計算其差值,用于后面休眠弃酌。
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//如果從消息隊列中獲取的Message小于當(dāng)前時間氨菇,則返回給Looper進(jìn)行派發(fā)和處理。
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;//斷開引用鏈妓湘,便于GC回收
msg.markInUse();
return msg;
}
} else {
// No more messages.
// 消息隊列中沒有更多的消息了查蓉。則進(jìn)行長時間休眠。
// -1代表長時間等待多柑。
nextPollTimeoutMillis = -1;
}
......
// 下面主要IdleHandler的相關(guān)邏輯
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
//這里函數(shù)的返回值,覺得后續(xù)是否還會調(diào)用該IdleHandler的方法楣责。
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
// queueIdle 返回false的時候竣灌,就會將其從ArrayList隊列中移除掉,下次就不會再接收到函數(shù)調(diào)用秆麸。
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
}
nativeWake函數(shù)分析
nativeWake函數(shù)分析
frameworks/base/core/jni/android_os_MessageQueue.cpp
class NativeMessageQueue : public MessageQueue, public LooperCallback {
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
}
system/core/libutils/Looper.cpp
void Looper::wake() {
......
uint64_t inc = 1;
//向管道的寫端寫入一個字符(舊版本是寫入一個"W"字符)初嘹。
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
......
}
}
//舊版
//父進(jìn)程、子進(jìn)程
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
nativePollOnce函數(shù)分析
frameworks/base/core/jni/android_os_MessageQueue.cpp
class NativeMessageQueue : public MessageQueue, public LooperCallback {
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
......
mLooper->pollOnce(timeoutMillis);
......
}
}
system/core/libutils/Looper.cpp
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
timeoutMillis:超時等待時間沮趣。
如果為-1屯烦,則表示無限等待,直到有事件發(fā)生為止房铭。
如果值為0驻龟,則無需等待立即返回。
outFd:用來存儲發(fā)生事件的那個文件描述符缸匪。
outEvents:用來存儲在該文件描述符上發(fā)生了哪些事件翁狐,目前支持可讀、可寫凌蔬、錯誤和中斷4個事件露懒。(從epoll事件對應(yīng)而來)
outData:用來存儲上下文數(shù)據(jù)闯冷,這個上下文數(shù)據(jù)是由用戶在添加監(jiān)聽句柄時傳遞的,它的作用和 pthread_create 函數(shù)的最后
一個參數(shù) parame 一樣懈词,用來傳遞用戶數(shù)據(jù)蛇耀。
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) { //無限循環(huán)
// mResponses 為 Vector數(shù)據(jù)結(jié)構(gòu)。
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
......
調(diào)用epoll函數(shù)等待坎弯,查看epoll函數(shù)返回結(jié)果;
超時纺涤、錯誤、有事件發(fā)生;
如果是管道讀端有事件荞怒,則認(rèn)為是控制命令洒琢,可以直接讀取管道中的數(shù)據(jù);(eventfd)
如果是其他fd發(fā)生事件,則根據(jù)Request構(gòu)造Response褐桌,并push到Response數(shù)組中衰抑。
真正處理事件,首先處理Native的Message荧嵌。調(diào)用Native Handler 的 handleMessage處理該Message呛踊。
處理Response數(shù)組中帶有callback的事件。
}
Handler啦撮、Looper谭网、MessageQueue、Thread關(guān)系
<meta name="source" content="lake">- 一個線程里面有一個Looper赃春。
- 一個Looper對應(yīng)一個MessageQueue愉择。
- 一個Looper可能對應(yīng)多個Handler。
- 相同的Looper的Handler發(fā)消息织中,都發(fā)送到同一個MessageQueue锥涕。
- MessageQueue處理完成后,分發(fā)消息會根據(jù)target字段狭吼,找到Handler的引用层坠,并完成分發(fā)工作。
甜點
1. IdelHandler
public static interface IdleHandler {
boolean queueIdle();
}
當(dāng)一個線程準(zhǔn)備等待更多消息時刁笙,即其將消息分發(fā)完之后就會回調(diào)這個接口破花。
queueIdle返回false代表一次性,返回true可能有多次回調(diào)疲吸。
添加IdleHandler
public final class MessageQueue {
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}
}
2. 消息屏障
<meta name="source" content="lake">Message類型分為3種類型:
- normal 普通消息
- barrier 消息屏障
- async 異步消息
插入消息屏障
public final class MessageQueue {
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// 按照時間排序座每,將這個msg插入到消息隊列中
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
}
消息屏障有6點注意事項!
- 沒有設(shè)置target摘悴。
- 消息屏障也是帶著時間戳的尺栖,也是按時間來進(jìn)行排序,只影響后面他后面的消息烦租。
- 消息隊列是可以插入多個消息屏障的延赌。
- 插入到消息隊列的時候除盏,是沒有喚醒線程的。
- 插入消息屏障的時候會返回一個token挫以,后續(xù)撤除這個消息屏障者蠕,需要使用token去消息隊列中查找。
- 消息屏障沒有對外開放掐松,要使用需要利用反射機(jī)制踱侣。
移除消息屏障
public final class MessageQueue {
//根據(jù)token移除消息屏障。
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
}
處理屏障
next方法里面處理大磺,具體邏輯為:
public final class MessageQueue {
Message next() {
for (;;) {
nativePollOnce(ptr, nextPollTimeoutMillis);
......
如果第一條消息是就是屏障抡句,就往后面遍歷查看是否有異步消息。
如果沒有杠愧,就無限休眠待榔,等待被別人喚醒。
如果有流济,就看這個消息觸發(fā)時間還有多長锐锣,設(shè)置一個超時,繼續(xù)休眠绳瘟。
}
}
}
- 如果將消息插入到隊列頭部雕憔,不受消息屏障的影響,喚醒線程糖声,去處理消息斤彼。
- 如果沒有插入到隊列頭部芯丧,并且頭部是消息屏障船老,此時插入普通消息不喚醒消息隊列。
- 如果此時插入的是異步消息莹弊,并且插入的位置前面沒有其他異步消息蟋恬,則進(jìn)行隊列的喚醒翁潘。
Android中內(nèi)存屏障使用例子
總結(jié)
- Java層提供了Looper類和MessageQueue類趁冈,其中Looper類提供循環(huán)處理消息的機(jī)制,以及插入渗勘、刪除和提取消息的函數(shù)接口沐绒。
- Handler常用于用戶與MessageQueue處理相關(guān)。
- MessageQueue內(nèi)部通過mPtr變量保存了一個Native層的NativeMessageQueue對象旺坠。
- NativeMessageQueue保存了一個Native層的Looper對象乔遮,該Looper從ALooper派生,提供pollOnce 和addFd等函數(shù)取刃。
- Java層有Message類和Handler類蹋肮,而Native層也有對應(yīng)的Message類和MessageHandler抽象類出刷。