前言
在開發(fā)中如果碰到需要執(zhí)行一些耗時比較長的任務渤早,但是又要保證任務不能丟失神妹,比如執(zhí)行過程中由于某種原因app發(fā)生了crash骑脱,需要在下次app啟動后的適當時機重新執(zhí)行任務的情況痴柔,該如何解決裳涛?
解決方案
- 假設我們需要在后臺執(zhí)行的任務類型有上傳圖片的任務A,發(fā)送聊天消息的任務B迁筛,視頻編輯處理的任務C煤蚌,所有耗時比較久并且重要性很高,需要保證不能丟失的任務都可以抽象為一個TaskModel。這個TaskModel保留了任務執(zhí)行需要的基本信息铺然,以及用于隊列管理的必要屬性俗孝。方案的主要思路就是維護一個用于管理TaskModel的串行任務隊列TaskQueueService(后面簡稱為TaskQueue)酒甸,這個TaskQueue結(jié)合數(shù)據(jù)庫對外提供管理TaskModel的各類方法魄健,比如AddTask等,在一個任務需要被執(zhí)行時插勤,就會通過TaskQueue的add方法添加到隊列中沽瘦,并且同時將TaskModel持久化到數(shù)據(jù)庫中,保證任務不會丟失农尖。然后TaskQueue根據(jù)自己的某種規(guī)則從數(shù)據(jù)庫中取出TaskModel析恋,對任務進行調(diào)度執(zhí)行。
類結(jié)構(gòu)說明
-
我們抽象出任務的基類為TaskModel盛卡,需要執(zhí)行的具體任務類型A助隧、B、C為繼承TaskModel的類TaskModelA滑沧、TaskModelB并村、TaskModelC,然后在各自的類中添加業(yè)務需要的基本信息滓技,以及根據(jù)需要重寫基類的方法哩牍。TaskModel結(jié)構(gòu)如下類圖所示:
TaskModel.png
taskID為自增Id,當task被添加到隊列令漂,寫入數(shù)據(jù)庫中時會自動加1膝昆。
status表示任務執(zhí)行的狀態(tài),是一個枚舉值叠必,枚舉類型分為init荚孵、running、suspend纬朝、finish收叶、fail、remove六種狀態(tài)玄组,在taskmodel剛被寫入數(shù)據(jù)中時是init的狀態(tài)滔驾,在taskModel執(zhí)行失敗后會將狀態(tài)置為fail。
priority表示Task的優(yōu)先級俄讹,默認值為low哆致,在taskQueue中可以根據(jù)task的優(yōu)先級對高優(yōu)先級的任務優(yōu)先調(diào)度執(zhí)行。
customID用于對task做某種標記患膛,方便從數(shù)據(jù)庫查詢摊阀。
className存儲具體任務的類型如TaskModelA。
data內(nèi)存儲TaskModel歸檔后的data數(shù)據(jù)
runCount用于存儲task執(zhí)行的次數(shù),可以用于控制重試次數(shù)
run()方法是子類繼承TaskModel后必須實現(xiàn)的方法胞此,里面寫任務執(zhí)行的邏輯
prepareForAddToQueue()方法是在run之前會執(zhí)行的方法臣咖,如果重寫了該方法,那么在taskQueue執(zhí)行task之前會先調(diào)用prepare方法漱牵,返回為yes才會繼續(xù)執(zhí)行task的run方法夺蛇,prepare方法可以用于對task的一些校驗。
retryNextTime()方法通過調(diào)用taskQueue的retryTask方法將自身的status重置為init狀態(tài)酣胀,那么在taskQueue執(zhí)行接下來的任務時刁赦,就會重新執(zhí)行到該task。
suspend()方法是將自身的status改為suspend掛起狀態(tài)闻镶,那么taskQueue在執(zhí)行接下來的task時甚脉,由于只會取status為init的task執(zhí)行,就不會執(zhí)行到suspend狀態(tài)的task铆农。當需要重新執(zhí)行已掛起的task時牺氨,調(diào)用retry方法就可以重新將該task添加到執(zhí)行隊列中。
-
TaskQueueService是我們維護的任務隊列墩剖,它是一個單例對象猴凹。TaskQueueService的結(jié)構(gòu)如下所示:
TaskQueueService.png
taskSignal用于在task執(zhí)行改變狀態(tài)時對外發(fā)送信號,在對應的controller中可以通過taskSignal傳出的task做一些ui或者業(yè)務邏輯涛碑。
runningTask表示當前隊列正在執(zhí)行的taskModel
suspendTasks保存了所有被掛起的taskModel
- runNextTask()方法為TaskQueue最核心的方法精堕,該方法中用異步執(zhí)行通過某種規(guī)則從數(shù)據(jù)庫中取出的最合適的taskModel。
架構(gòu)圖
- 如上圖所示蒲障,假設所有的task都為相同優(yōu)先級歹篓,TaskQueue的runNextTask方法中取從數(shù)據(jù)庫篩選出來status為init狀態(tài)并且根據(jù)taskId排序,拿到最先進入的一個task去執(zhí)行揉阎。在task執(zhí)行完成后庄撮,又會觸發(fā)runNextTask方法,從而繼續(xù)從數(shù)據(jù)庫讀取下一個最合適的taskModel去執(zhí)行任務毙籽。
實現(xiàn)
TaskModel
- 首先TaskModel是需要存儲在數(shù)據(jù)庫中的洞斯,結(jié)合上一章WCDB的使用,我們將TaskModel繼承自RSModel類坑赡,以支持數(shù)據(jù)庫寫入烙如。TaskModel的.h文件十分簡潔,主要是一些關鍵屬性的暴露和task操作方法的抽象:
#import "RSModel.h"
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelStatus) {
RSTaskQueueTaskModelStatusInit,
RSTaskQueueTaskModelStatusRunning,
RSTaskQueueTaskModelStatusSuspend,
RSTaskQueueTaskModelStatusFinish,
RSTaskQueueTaskModelStatusFail,
RSTaskQueueTaskModelStatusRemove,
};
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelPriority) {
RSTaskQueueTaskModelPriorityLow,
RSTaskQueueTaskModelPriorityMiddle,
RSTaskQueueTaskModelPriorityHigh,
};
@interface RSTaskQueueTaskModel : RSModel
@property (nonatomic, assign) NSInteger taskId;
@property (nonatomic, assign) RSTaskQueueTaskModelStatus status;
@property (nonatomic, assign) RSTaskQueueTaskModelPriority priority;
@property (nonatomic, strong) NSString *customId;
@property (nonatomic, strong) NSString *customType;
@property (nonatomic, strong) NSString *className;
@property (nonatomic, strong) NSData *data;
@property (nonatomic, assign) NSInteger runCount;
-(void)run;//任務運行主入口毅否,子類需要實現(xiàn)它
-(void)suspend;//執(zhí)行異步操作時調(diào)用亚铁,可以掛起當前任務,防止任務隊列被阻塞
-(void)pop;//任務執(zhí)行成功需要顯式調(diào)用螟加,將任務從隊列中移除
-(void)fail;//任務執(zhí)行失敗需要顯式調(diào)用, 修改任務的狀態(tài)
-(void)retryNextTime;//重新入隊徘溢,等待重試
-(BOOL)prepareForAddToQueue;//任務被插入任務隊列前吞琐,自動調(diào)用,子類可以重載它然爆。返回失敗則任務不用被加入任務隊列
-(BOOL)remove;//從任務隊列中移除任務
@end
TaskModel的.m文件需要定義類文件中綁定到數(shù)據(jù)庫表的字段以及主鍵的設置站粟、默認值的設置以及約束等。
#import "RSTaskQueueTaskModel+WCTTableCoding.h"
#import "RSTaskQueueTaskModel.h"
#import <WCDB/WCDB.h>
#import "RSDBService.h"
#import "RSTaskQueueService.h"
#import <YYModel.h>
@implementation RSTaskQueueTaskModel
WCDB_IMPLEMENTATION(RSTaskQueueTaskModel)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, taskId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customType)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, className)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, data)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, runCount)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, status, RSTaskQueueTaskModelStatusInit)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, priority, RSTaskQueueTaskModelPriorityLow)
WCDB_PRIMARY_ASC_AUTO_INCREMENT(RSTaskQueueTaskModel, taskId)
//設置status和priority的默認值曾雕,以及taskId自增
-(instancetype)init {
self = [super init];
if (self) {
static dispatch_once_t token;
dispatch_once(&token, ^{
[RSTaskQueueTaskModel createDBTable];
});
self.isAutoIncrement = YES;
self.runCount = 0;
}
return self;
}
-(instancetype)initWithCoder:(NSCoder *)aDecoder {
self = [super initWithCoder:aDecoder];
if (self) {
}
return self;
}
+(void)createDBTable {
if ([[RSDBService db] createTableAndIndexesOfName:NSStringFromClass([RSTaskQueueTaskModel class]) withClass:[RSTaskQueueTaskModel class]]) {
NSLog(@"creat table RSTaskQueueTaskModel success");
} else {
NSLog(@"creat table RSTaskQueueTaskModel fail");
}
}
-(BOOL)prepareForAddToQueue {
self.className = NSStringFromClass([self class]);
self.data = [self yy_modelToJSONData];
//這里使用YYModel將model轉(zhuǎn)換為data存入數(shù)據(jù)庫
return YES;
}
-(void)run {
}
-(void)suspend {
[[RSTaskQueueService shareInstance] suspendTask:self];
}
-(void)pop {
[[RSTaskQueueService shareInstance] popTask:self];
}
-(void)retryNextTime {
[[RSTaskQueueService shareInstance] retryTask:self];
}
-(void)fail {
[[RSTaskQueueService shareInstance] failTask:self];
}
-(BOOL)remove {
return [[RSTaskQueueService shareInstance] removeTask:self];
}
//以上的操作方法實現(xiàn)實際上就是調(diào)用TaskQueue的方法奴烙,傳入?yún)?shù)為本身,所以重點還是在TaskQueue對任務的調(diào)度翻默。TaskModel只是一個支持寫入數(shù)據(jù)庫的對Task抽象的Model
@end
TaskQueueService
TaskQueueService的.h基本和類圖中的屬性方法一致缸沃,按照實際需求多寫了幾個從數(shù)據(jù)庫查詢taskModel的接口。
#import <Foundation/Foundation.h>
#import "RSTaskQueueTaskModel.h"
@interface RSTaskQueueService : NSObject
@property (nonatomic, strong) RACSubject *taskSignal;
@property (nonatomic, strong) RSTaskQueueTaskModel *runingTask;
@property (nonatomic, strong) NSMutableArray<RSTaskQueueTaskModel *> *suspendTasks;
+(RSTaskQueueService *)shareInstance;
-(BOOL)addTask:(RSTaskQueueTaskModel *)task;
-(BOOL)retryTask:(RSTaskQueueTaskModel *)task;
-(void)runNextTask;
-(void)popTask:(RSTaskQueueTaskModel *)task;
-(BOOL)failTask:(RSTaskQueueTaskModel *)task;
-(BOOL)removeTask:(RSTaskQueueTaskModel *)task;
//-(void)finishTaskWaitForNextTime:(RSTaskQueueTaskModel *)task;
-(BOOL)suspendTask:(RSTaskQueueTaskModel *)task;
-(void)resetAllTasks;
-(void)retryAllFailTasks;
- (BOOL)deleteTaskWithClassName:(NSString *)className customId:(NSString*)customId;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className customId:(NSString*)customId;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className status:(RSTaskQueueTaskModelStatus)status;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className;
-(BOOL)isAllTaskFinish;
@end
在.m中修械,需要實現(xiàn)taskQueue的單例初始化方法,通過調(diào)用[RSTaskQueueService shareInstance]來獲取taskQueue。
+(RSTaskQueueService *)shareInstance {
static dispatch_once_t once;
static id sharedInstance;
dispatch_once(&once, ^{
sharedInstance = [[RSTaskQueueService alloc] init];
});
return sharedInstance;
}
-(instancetype)init {
self = [super init];
if (self) {
self.runingTask = nil;
self.taskSignal = [RACSubject subject];
self.suspendTasks = [[NSMutableArray alloc] init];
}
return self;
}
在一個任務TaskModel需要被執(zhí)行是检盼,會調(diào)用TaskQueue的addTask方法肯污,如果該任務是符合執(zhí)行規(guī)定的,那么就會將任務寫入TaskModel數(shù)據(jù)庫吨枉,同時對外發(fā)送一個taskSignal信號蹦渣,以同步當前task的狀態(tài),然后調(diào)用runNextTask方法去做任務的調(diào)度執(zhí)行貌亭。
-(BOOL)addTask:(RSTaskQueueTaskModel *)task {
if ([task prepareForAddToQueue]) {
@synchronized(self) {
BOOL result = [[RSDBService db] insertObject:task into:NSStringFromClass([RSTaskQueueTaskModel class])];
if (result) {
[self.taskSignal sendNext:task];
[self runNextTask];
}
return result;
}
}
return NO;
}
在runNextTask方法中柬唯,首先是開辟了一個異步線程,然后判斷是否有task正在執(zhí)行圃庭,如果有的話就返回锄奢,沒有的話從數(shù)據(jù)庫中讀取最合適的taskModel來跑。如何判斷是否是最合適的篩選條件剧腻,也就是task調(diào)度的方法了拘央。在這里選取的是status為init并且taskId最小,也就是最先加入的任務來執(zhí)行书在。
-(void)runNextTask {
[RSUtils dispatch_async_background:^{
@synchronized(self) {
if (self.runingTask) {
//有任務在執(zhí)行中
return;
}
NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusInit orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
if ([tmp count] > 0) {
RSLogInfo(@"RSTaskQueueService task count:%ld", tmp.count);
RSTaskQueueTaskModel *task = [tmp firstObject];
Class clazz = NSClassFromString(task.className);
RSTaskQueueTaskModel * object = [[clazz alloc] init];
[object yy_modelSetWithJSON:task.data];
[object setTaskId:task.taskId];
[object setStatus:RSTaskQueueTaskModelStatusRunning];
object.runCount = object.runCount + 1;
BOOL dbResult = [[RSDBService db] updateRowsInTable:DBTableName(RSTaskQueueTaskModel) onProperties:{RSTaskQueueTaskModel.status, RSTaskQueueTaskModel.runCount} withObject:object where:RSTaskQueueTaskModel.taskId==object.taskId];
if (dbResult) {
self.runingTask = object;
[self.taskSignal sendNext:object];
RSLogInfo(@"RSTaskQueueService run taskId:%d", object.taskId);
[object run];
} else {
RSLogError(@"RSTaskQueueService update task status fail taskId:%d", object.taskId);
}
} else {
RSLogInfo(@"RSTaskQueueService is empty");
}
}
}];
}
resetAllTasks是將所有未完成的任務重置的方法灰伟,主要用于app初始化的時候調(diào)用,以重置之前未執(zhí)行成功的tasks儒旬。
-(void)resetAllTasks {
@synchronized(self) {
self.runingTask = nil;
[self.suspendTasks removeAllObjects];
RSTaskQueueTaskModel *task = [RSTaskQueueTaskModel new];
task.status = RSTaskQueueTaskModelStatusInit;
[[RSDBService db] updateRowsInTable:NSStringFromClass([RSTaskQueueTaskModel class]) onProperty:RSTaskQueueTaskModel.status withObject:task where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusRunning||RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusSuspend];
}
}
retryAllFailTasks方法將所有執(zhí)行失敗的task重新執(zhí)行栏账,原理還是將status狀態(tài)由fail改為init狀態(tài)寫入數(shù)據(jù)庫,下一次taskQueue開始run的時候就會考慮執(zhí)行到這些task了栈源。
-(void)retryAllFailTasks {
NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusFail orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
for (RSTaskQueueTaskModel *task in tmp) {
Class clazz = NSClassFromString(task.className);
RSTaskQueueTaskModel * object = [[clazz alloc] init];
[object yy_modelSetWithJSON:task.data];
[object setTaskId:task.taskId];
[object setStatus:task.status];
[object retryNextTime];
}
}
其他的方法就不列出了挡爵,代碼都差不多,核心點還是在于結(jié)合數(shù)據(jù)庫修改taskModel的狀態(tài)凉翻,然后taskQueue在執(zhí)行runNextTask的時候就會自動執(zhí)行最合適的task了讨。
結(jié)束
其實整個方案的架構(gòu)并不復雜捻激,通俗易懂,主要是在于實現(xiàn)以及解決應用中的問題前计。方案是可以根據(jù)需求隨時修改的胞谭,沒有最好的方案,只有最合適的方案男杈。當然文中的taskQueueService還有更多優(yōu)化的空間丈屹,有不合理的地方大家可以提出,一起交流伶棒,共同進步旺垒。