android-priority-jobqueue是一個后臺任務(wù)隊列框架,可以對任務(wù)進(jìn)行磁盤緩存馋嗜,當(dāng)網(wǎng)絡(luò)恢復(fù)連接的時候繼續(xù)執(zhí)行任務(wù)满力。
1. 介紹
1.1 優(yōu)點
- 便于解耦A(yù)pplication的業(yè)務(wù)邏輯作喘,讓你的代碼更加健壯厢蒜,易于重構(gòu)和測試减响。
- 不處理AsyncTask的生命周期。
- Job Queue關(guān)心優(yōu)先Jobs郭怪,檢測網(wǎng)絡(luò)連接,并行運行等刊橘。
- 可以延遲jobs鄙才。
- 分組jobs來確保串行執(zhí)行。
- 默認(rèn)情況下促绵,Job Queue監(jiān)控網(wǎng)絡(luò)連接(所以你不需要擔(dān)心)攒庵,當(dāng)設(shè)備處于離線狀態(tài),需要網(wǎng)絡(luò)的jobs不會運行败晴,直到網(wǎng)絡(luò)重新連接浓冒。
1.2 UML
1.3 主類
- JobManager:Job管理類,負(fù)責(zé)任務(wù)的添加尖坤、刪除等稳懒。
- MessageFactory:Message工廠類,負(fù)責(zé)創(chuàng)建相應(yīng)的Message慢味,包括AddJobMessage场梆、JobConsumerIdleMessage墅冷、RunJobMessage等。
- PriorityMessageQueue:優(yōu)先級Message隊列或油。
- MessageQueue:Message隊列接口寞忿,負(fù)責(zé)新增、停止顶岸、清空消息腔彰。
- SafeMessageQueue:非優(yōu)先級Message隊列。
- JobManagerThread:Job Runnable對象辖佣,輪詢消息隊列進(jìn)行處理霹抛。
- Scheduler:調(diào)度器,喚醒a(bǔ)pp或者JobManager凌简。
2. 基本用例
2.1 創(chuàng)建Job
public class PostTweetJob extends Job {
public static final int PRIORITY = 1;
private String text;
public PostTweetJob(String text) {
// requireNetwork,需要網(wǎng)絡(luò)連接
// persist雏搂,需要持久化
super(new Params(PRIORITY).requireNetwork().persist());
}
@Override
public void onAdded() {
// Job已經(jīng)被保存到磁盤里藕施,可以用來更新UI
}
@Override
public void onRun() throws Throwable {
// 在這里處理Job邏輯,例如網(wǎng)絡(luò)請求等凸郑,所有的工作就是異步完成
webservice.postTweet(text);
}
@Override
protected RetryConstraint shouldReRunOnThrowable(Throwable throwable, int runCount,
int maxRunCount) {
// 在onRun里發(fā)生異常處理
return RetryConstraint.createExponentialBackoff(runCount, 1000);
}
@Override
protected void onCancel(@CancelReason int cancelReason, @Nullable Throwable throwable) {
// Job被取消是調(diào)用
}
}
2.2 發(fā)送Job
//...
public void onSendClick() {
final String status = editText.getText().toString();
if(status.trim().length() > 0) {
jobManager.addJobInBackground(new PostTweetJob(status));
editText.setText("");
}
}
//...
3. 源碼分析
AndroidPriorityJobQueue細(xì)節(jié)非常多裳食,就不一一分析,在這里我主要帶大家一起看下Job是如何被添加到隊列里被執(zhí)行以及如果再網(wǎng)絡(luò)連接的時候繼續(xù)完成Job芙沥。其他部分可以自行查看诲祸。
3.1 流程圖
3.2 添加Job
我們先來看下如何添加Job到異步線程處理。
public void addJobInBackground(Job job) {
AddJobMessage message = messageFactory.obtain(AddJobMessage.class);
message.setJob(job);
messageQueue.post(message);
}
MessageFactory通過obtain創(chuàng)建AddJobMessage而昨,將Job設(shè)置到message里面救氯,然后通過messageQueue.post發(fā)送。
public <T extends Message> T obtain(Class<T> klass) {
final Type type = Type.mapping.get(klass);
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (type) {
Message message = pools[type.ordinal()];
if (message != null) {
pools[type.ordinal()] = message.next;
counts[type.ordinal()] -= 1;
message.next = null;
//noinspection unchecked
return (T) message;
}
try {
return klass.newInstance();
} catch (InstantiationException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a empty" +
" constructor.");
} catch (IllegalAccessException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a public" +
" empty constructor.");
}
}
return null;
}
創(chuàng)建AddJobMessage歌憨,如果pools緩存里面有該Message着憨,則使用,否則通過newInstance創(chuàng)建务嫡。
@Override
public void post(Message message) {
synchronized (LOCK) {
postJobTick = true;
int index = message.type.priority;
if (queues[index] == null) {
queues[index] = new UnsafeMessageQueue(factory, "queue_" + message.type.name());
}
queues[index].post(message);
timer.notifyObject(LOCK);
}
}
我們可以看到甲抖,queues是一個UnsafeMessageQueue數(shù)組,根據(jù)Message的優(yōu)先級進(jìn)行排列心铃,將message保存到UnsafeMessageQueue里面准谚,并且通知監(jiān)控該對象的線程。
3.4 執(zhí)行Job
@Override
public void run() {
messageQueue.consume(new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
case ADD_JOB:
handleAddJob((AddJobMessage) message);
break;
\\去除無關(guān)代碼
...
}
}
\\去除無關(guān)代碼
...
});
}
@Override
public void consume(MessageQueueConsumer consumer) {
if(running.getAndSet(true)) {
throw new IllegalStateException("only 1 consumer per MQ");
}
while (running.get()) {
Message message = next(consumer);
if (message != null) {
JqLog.d("[%s] consuming message of type %s", LOG_TAG, message.type);
consumer.handleMessage(message);
factory.release(message);
}
}
}
如果running已經(jīng)被設(shè)置成true去扣,則拋出異常柱衔,每一個MessageQueue只能存在一個consumer。獲取下一個Message,交給consumer進(jìn)行處理秀存,處理完進(jìn)行釋放捶码。
獲取下一個Message。
public Message next(MessageQueueConsumer consumer) {
boolean calledOnIdle = false;
while (running.get()) {
//暫時去除無關(guān)代碼
...
for (int i = Type.MAX_PRIORITY; i >= 0; i--) {
UnsafeMessageQueue mq = queues[i];
if (mq == null) {
continue;
}
Message message = mq.next();
if (message != null) {
return message;
}
}
//暫時去除無關(guān)代碼
...
}
return null;
}
因為Message類型是ADD_JOB或链,執(zhí)行handleAddJob惫恼。
private void handleAddJob(AddJobMessage message) {
//暫時去除無用代碼
...
final boolean insert = oldJob == null || consumerManager.isJobRunning(oldJob.getId());
if (insert) {
JobQueue queue = job.isPersistent() ? persistentJobQueue : nonPersistentJobQueue;
if (oldJob != null) { //the other job was running, will be cancelled if it fails
consumerManager.markJobsCancelledSingleId(TagConstraint.ANY, new String[]{job.getSingleInstanceId()});
queue.substitute(jobHolder, oldJob);
} else {
queue.insert(jobHolder);
}
} else {
JqLog.d("another job with same singleId: %s was already queued", job.getSingleInstanceId());
}
jobHolder.getJob().onAdded();
//暫時去除無用代碼
...
}
如果要求持久化,則進(jìn)行數(shù)據(jù)庫保存澳盐,否則進(jìn)行內(nèi)存緩存祈纯。回調(diào)Job的onAdded
至此完成新增Job叼耙,但一直沒找到Job的onRun腕窥,別急,我們回頭再去看下next筛婉。
public Message next(MessageQueueConsumer consumer) {
boolean calledOnIdle = false;
while (running.get()) {
//暫時去除無關(guān)代碼
...
if (!calledOnIdle) {
consumer.onIdle();
calledOnIdle = true;
}
//暫時去除無關(guān)代碼
...
}
return null;
}
原來當(dāng)next找不到下一個Message時簇爆,會通知consumer,目前處于閑置狀態(tài)爽撒。
final MessageQueueConsumer queueConsumer = new MessageQueueConsumer() {
//暫時去除無用代碼
...
@Override
public void onIdle() {
JqLog.d("consumer manager on idle");
JobConsumerIdleMessage idle = factory.obtain(JobConsumerIdleMessage.class);
idle.setWorker(Consumer.this);
idle.setLastJobCompleted(lastJobCompleted);
parentMessageQueue.post(idle);
}
};
發(fā)出JobConsumerIdleMessage消息入蛆,在JobManagerThread線程里進(jìn)行處理。
@Override
public void run() {
messageQueue.consume(new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
case JOB_CONSUMER_IDLE:
boolean busy = consumerManager.handleIdle((JobConsumerIdleMessage) message);
if (!busy) {
invokeSchedulersIfIdle();
}
break;
\\去除無關(guān)代碼
...
}
}
\\去除無關(guān)代碼
...
});
}
boolean handleIdle(@NonNull JobConsumerIdleMessage message) {
//暫時去除無用代碼
...
if (nextJob != null) {
consumer.hasJob = true;
runningJobGroups.add(nextJob.getGroupId());
RunJobMessage runJobMessage = factory.obtain(RunJobMessage.class);
runJobMessage.setJobHolder(nextJob);
runningJobHolders.put(nextJob.getJob().getId(), nextJob);
if (nextJob.getGroupId() != null) {
runningJobGroups.add(nextJob.getGroupId());
}
consumer.messageQueue.post(runJobMessage);
return true;
} else {
//暫時去除無用代碼
...
}
return false;
}
}
發(fā)出RunJobMessage消息硕勿。
final MessageQueueConsumer queueConsumer = new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
case RUN_JOB:
handleRunJob((RunJobMessage) message);
lastJobCompleted = timer.nanoTime();
removePokeMessages();
break;
//暫時去除無用代碼
...
}
}
//暫時去除無用代碼
...
};
private void handleRunJob(RunJobMessage message) {
JqLog.d("running job %s", message.getJobHolder().getClass().getSimpleName());
JobHolder jobHolder = message.getJobHolder();
//運行Job
int result = jobHolder.safeRun(jobHolder.getRunCount(), timer);
RunJobResultMessage resultMessage = factory.obtain(RunJobResultMessage.class);
resultMessage.setJobHolder(jobHolder);
resultMessage.setResult(result);
resultMessage.setWorker(this);
parentMessageQueue.post(resultMessage);
}
int safeRun(int currentRunCount, Timer timer) {
return job.safeRun(this, currentRunCount, timer);
}
final int safeRun(JobHolder holder, int currentRunCount, Timer timer) {
//暫時去除無用代碼
...
try {
onRun();
} catch (Throwable t) {
//暫時去除無用代碼
...
}
if (!failed) {
return JobHolder.RUN_RESULT_SUCCESS;
}
if (holder.isCancelledSingleId()) {
return JobHolder.RUN_RESULT_FAIL_SINGLE_ID;
}
if (holder.isCancelled()) {
return JobHolder.RUN_RESULT_FAIL_FOR_CANCEL;
}
if (reRun) {
return JobHolder.RUN_RESULT_TRY_AGAIN;
}
if (cancelForDeadline) {
return JobHolder.RUN_RESULT_HIT_DEADLINE;
}
if (currentRunCount < getRetryLimit()) {
holder.setThrowable(throwable);
return JobHolder.RUN_RESULT_FAIL_SHOULD_RE_RUN;
} else {
holder.setThrowable(throwable);
return JobHolder.RUN_RESULT_FAIL_RUN_LIMIT;
}
}
private void handleRunJob(RunJobMessage message) {
JobHolder jobHolder = message.getJobHolder();
int result = jobHolder.safeRun(jobHolder.getRunCount(), timer);
RunJobResultMessage resultMessage = factory.obtain(RunJobResultMessage.class);
resultMessage.setJobHolder(jobHolder);
resultMessage.setResult(result);
resultMessage.setWorker(this);
parentMessageQueue.post(resultMessage);
}
如果運行成功哨毁,則返回RUN_RESULT_SUCCESS,如果失敗源武,分別返回失敗原因為被取消,需要再次運行粱栖,運行次數(shù)超過限制等,發(fā)出RunJobResultMessage消息闹究。
private void handleRunJobResult(RunJobResultMessage message) {
final int result = message.getResult();
final JobHolder jobHolder = message.getJobHolder();
callbackManager.notifyOnRun(jobHolder.getJob(), result);
RetryConstraint retryConstraint = null;
switch (result) {
case JobHolder.RUN_RESULT_SUCCESS:
removeJob(jobHolder);
break;
//暫時去除無用代碼
...
case JobHolder.RUN_RESULT_TRY_AGAIN:
retryConstraint = jobHolder.getRetryConstraint();
insertOrReplace(jobHolder);
break;
//暫時去除無用代碼
...
}
consumerManager.handleRunJobResult(message, jobHolder, retryConstraint);
callbackManager.notifyAfterRun(jobHolder.getJob(), result);
//暫時去除無用代碼
...
}
如果執(zhí)行Job成功,則移除該Job。至此我們大致分析完整個Job從新增到執(zhí)行的全部流程叛买。
3.5 監(jiān)控網(wǎng)絡(luò)連接
當(dāng)網(wǎng)絡(luò)連接時繼續(xù)執(zhí)行Job,具體實現(xiàn)在NetworkUtilImpl里率挣。
public NetworkUtilImpl(Context context) {
context = context.getApplicationContext();、
//如果SDK>=21
if (VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
if (VERSION.SDK_INT >= Build.VERSION_CODES.M) {
//如果SDK>=23,則監(jiān)聽IDLE模式
listenForIdle(context);
}
//監(jiān)聽網(wǎng)絡(luò)連接狀態(tài)
listenNetworkViaConnectivityManager(context);
} else {
context.registerReceiver(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
//處理網(wǎng)絡(luò)狀態(tài)改變
dispatchNetworkChange(context);
}
}, getNetworkIntentFilter());
}
}
private void listenForIdle(Context context) {
context.registerReceiver(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
dispatchNetworkChange(context);
}
}, new IntentFilter(PowerManager.ACTION_DEVICE_IDLE_MODE_CHANGED));
}
private void listenNetworkViaConnectivityManager(final Context context) {
ConnectivityManager cm = (ConnectivityManager) context
.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkRequest request = new NetworkRequest.Builder()
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
.build();
cm.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
@Override
public void onAvailable(Network network) {
dispatchNetworkChange(context);
}
});
}
void dispatchNetworkChange(Context context) {
if(listener == null) {//shall not be but just be safe
return;
}
listener.onNetworkChange(getNetworkStatus(context));
}
一旦網(wǎng)絡(luò)連接捶箱,則回調(diào)到JobManagerThread智什。
@Override
public void onNetworkChange(@NetworkUtil.NetworkStatus int networkStatus) {
ConstraintChangeMessage constraint = messageFactory.obtain(ConstraintChangeMessage.class);
messageQueue.post(constraint);
}
發(fā)送ConstraintChangeMessage消息,回調(diào)到JobManagerThread丁屎。
@Override
public void run() {
messageQueue.consume(new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
//暫時去除無用代碼
...
case CONSTRAINT_CHANGE:
consumerManager.handleConstraintChange();
break;
//暫時去除無用代碼
...
}
}
//暫時去除無用代碼
...
});
}
void handleConstraintChange() {
considerAddingConsumers(true);
}
private void considerAddingConsumers(boolean pokeAllWaiting) {
//暫時去除無用代碼
...
boolean isAboveLoadFactor = isAboveLoadFactor();
if (isAboveLoadFactor) {
addWorker();
}
}
private void addWorker() {
//新增Consumer
Consumer consumer = new Consumer(jobManagerThread.messageQueue,
new SafeMessageQueue(timer, factory, "consumer"), factory, timer);
final Thread thread;
if (threadFactory != null) {
thread = threadFactory.newThread(consumer);
} else {
thread = new Thread(threadGroup, consumer, "job-queue-worker-" + UUID.randomUUID());
thread.setPriority(threadPriority);
}
consumers.add(consumer);
thread.start();
}
在Consumer里執(zhí)行荠锭。
@Override
public void run() {
messageQueue.consume(queueConsumer);
}
至此,又開始繼續(xù)執(zhí)行Jobs晨川。
4. 設(shè)計之美
AndroidPriorityJobQueue設(shè)計了2個消息隊列证九,一個是存在優(yōu)先級,用來保存新增Job共虑、處理空閑狀態(tài)愧怜、處理Job結(jié)果、取消等消息妈拌,一個不存在優(yōu)先級拥坛,用來保存運行Job的消息,分別用2個線程來對于處理這2個隊列尘分,這樣使處理Job的線程職責(zé)更加清晰猜惋。目前看到里面用到了3中設(shè)計模式,分別是工廠模式音诫、代理模式和Builder模式。
4.1 工廠模式
MessageFactory
public <T extends Message> T obtain(Class<T> klass) {
final Type type = Type.mapping.get(klass);
synchronized (type) {
Message message = pools[type.ordinal()];
if (message != null) {
pools[type.ordinal()] = message.next;
counts[type.ordinal()] -= 1;
message.next = null;
return (T) message;
}
try {
return klass.newInstance();
} catch (InstantiationException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a empty" +
" constructor.");
} catch (IllegalAccessException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a public" +
" empty constructor.");
}
}
return null;
}
通過newInstance來創(chuàng)建對象竭钝,值得一提的是,當(dāng)消息處理完成香罐,會通過release釋放Message,保存到pools庇茫,這樣下次創(chuàng)建Message時,如果存在則直接使用旦签。
DefaultQueueFactory
@Override
public JobQueue createPersistentQueue(Configuration configuration, long sessionId) {
return new CachedJobQueue(new SqliteJobQueue(configuration, sessionId, jobSerializer));
}
@Override
public JobQueue createNonPersistent(Configuration configuration, long sessionId) {
return new CachedJobQueue(new SimpleInMemoryPriorityQueue(configuration, sessionId));
}
4.2 代理模式
CachedJobQueue
public class CachedJobQueue implements JobQueue {
private JobQueue delegate;
private Integer cachedCount;
public CachedJobQueue(JobQueue delegate) {
this.delegate = delegate;
}
@Override
public boolean insert(@NonNull JobHolder jobHolder) {
invalidateCache();
return delegate.insert(jobHolder);
}
private void invalidateCache() {
cachedCount = null;
}
//省略代碼
@Override
public int count() {
if(cachedCount == null) {
cachedCount = delegate.count();
}
return cachedCount;
}
private boolean isEmpty() {
return cachedCount != null && cachedCount == 0;
}
@Override
public int countReadyJobs(@NonNull Constraint constraint) {
if (isEmpty()) {
return 0;
}
return delegate.countReadyJobs(constraint);
}
@Override
public JobHolder nextJobAndIncRunCount(@NonNull Constraint constraint) {
if(isEmpty()) {
return null;//we know we are empty, no need for querying
}
JobHolder holder = delegate.nextJobAndIncRunCount(constraint);
if (holder != null && cachedCount != null) {
cachedCount -= 1;
}
return holder;
}
//省略代碼
}
該代理模式的設(shè)計主要是為了緩存等待Jobs的數(shù)量。
4.3 Builder模式
Configuration
public class Configuration {
//省略代碼
@Nullable
public ThreadFactory getThreadFactory() {
return threadFactory;
}
@SuppressWarnings("unused")
public static final class Builder {
private Configuration configuration;
public Builder(@NonNull Context context) {
this.configuration = new Configuration();
this.configuration.appContext = context.getApplicationContext();
}
//省略代碼
@NonNull
public Builder threadFactory(@Nullable final ThreadFactory threadFactory) {
configuration.threadFactory = threadFactory;
return this;
}
@NonNull
public Configuration build() {
if(configuration.queueFactory == null) {
configuration.queueFactory = new DefaultQueueFactory();
}
if(configuration.networkUtil == null) {
configuration.networkUtil = new NetworkUtilImpl(configuration.appContext);
}
if (configuration.timer == null) {
configuration.timer = new SystemTimer();
}
return configuration;
}
}
}
我們可以看到配置類很適合用Builder模式來設(shè)計宁炫,對配置項和使用類進(jìn)行解耦偿曙,便于配置項的拓展。
5. 總結(jié)
AndroidPriorityJobQueue從13年開源到現(xiàn)在望忆,經(jīng)歷了2個大的版本,完整的看下來也花費了不少時間稿壁,看到了一些設(shè)計后臺任務(wù)隊列框架的思路傅是,特別是對任務(wù)隊列進(jìn)行分組確保串行落午,對網(wǎng)絡(luò)恢復(fù)時繼續(xù)進(jìn)行Job等設(shè)計溃斋,可能理解的還不夠深入吸申,有錯誤的地方還大家望指正截碴。
6. 參考資料
AndroidPriorityJobQueue Github
可以隨意轉(zhuǎn)發(fā)日丹,也歡迎關(guān)注我的簡書,我會堅持給大家?guī)矸窒怼?/p>